重要: Scalaに依存するMaven アーティファクトはScalaのメジャーバージョンが後ろに付きます。例えば、"2.10" あるいは "2.11"。プロジェクトwiki上のマイグレーションガイドに相談してください。

クリックスタート: セットアップ

上のFlinkのプログラムの例を取得し、2,3の簡単なステップで実行する。

セットアップ: ダウンロードと開始

FlinkはLinux, Mac OS X, and Windowsで動作します。Flinkを実行できるようにするための必要条件はJava 7.x (あるいは以上)のインストレーションのみです。Windows ユーザは、ローカルセットアップのためのWindows上でFlinkを実行する方法を説明しているWindows上のFlink ガイドを見てください。

ダウンロード

ダウンロードページからバイナリをダウンロードします。好きな Hadoop/Scala の組み合わせを選ぶことができます。

  1. ダウンロードディレクトリに行きます。
  2. ダウンロードされた書庫を解凍します。
  3. Flinkを開始します。
$ cd ~/Downloads # Go to download directory
$ tar xzf flink-*.tgz # Unpack the downloaded archive
$ cd flink-1.1-SNAPSHOT
$ bin/start-local.sh # Start Flink

http://localhost:8081ジョブマネージャーのwebフロントエンドをチェックし、全てが立ち上がっていて実行中であることを確認します。webフロントエンドは1つの利用可能なタスクマネージャーインスタンスを報告するはずです。

ジョブマネージャー: 概要

例の実行

今、SocketWindowWordCount の例を実行しようとしていて、ソケットからテキストを読み込み、個々の単語の数を数えます。

  • まず最初に、以下のようにローカルサーバを開始するためにnetcatを使います

    $ nc -l 9000
  • Flinkプログラムをサブミットします:

    $ bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000
    
    03/08/2016 17:21:56 Job execution switched to status RUNNING.
    03/08/2016 17:21:56 Source: Socket Stream -> Flat Map(1/1) switched to SCHEDULED
    03/08/2016 17:21:56 Source: Socket Stream -> Flat Map(1/1) switched to DEPLOYING
    03/08/2016 17:21:56 Keyed Aggregation -> Sink: Unnamed(1/1) switched to SCHEDULED
    03/08/2016 17:21:56 Keyed Aggregation -> Sink: Unnamed(1/1) switched to DEPLOYING
    03/08/2016 17:21:56 Source: Socket Stream -> Flat Map(1/1) switched to RUNNING
    03/08/2016 17:21:56 Keyed Aggregation -> Sink: Unnamed(1/1) switched to RUNNING

    プログラムはソケットに接続し入力を待ちます。期待した通りにジョブを実行しているかどうかを検証するためにwebインタフェースをチェックすることができます:

    ジョブマネージャー: 概要(cont'd)
    ジョブマネージャー: 実行中のジョブ
  • 数はstdoutに出力されます。ジョブマネージャーの出力ファイルを監視し、なんらかのテキストを ncに書き込みます:

    $ nc -l 9000
    lorem ipsum
    ipsum ipsum ipsum
    bye

    .out ファイルは即座に数を出力するでしょう:

    $ tail -f log/flink-*-jobmanager-*.out
    (lorem,1)
    (ipsum,1)
    (ipsum,2)
    (ipsum,3)
    (ipsum,4)
    (bye,1)

    完了時にFlinkを停止するには、以下をタイプします:

    $ bin/stop-local.sh

    クリックスタート: セットアップ

次のステップ

Flinkプログラミング APIの最初の感触を得るためにステップ バイ ステップの例を調べてください。それを完了して、ストリーミング ガイドに進んでください。

クラスタ設定

クラスタ上でFlinkを動かす は、ローカルで実行するのと同じくらい簡単です。全てのクラスタノード上でpasswordless SSH および同じディレクトリ構造にすることで、すべてを制御するためのスクリプトを使うことができます。

  1. 解凍したflink ディレクトリをダウンロード書庫からセットアップの各ノード上の同じファイルシステムパスにコピーします。
  2. マスターノード (JobManager) を選択し、conf/flink-conf.yaml内のjobmanager.rpc.address キーをそのIPアドレスまたはホスト名に設定します。クラスタ内のすべてのノードが同じjobmanager.rpc.address 設定を持つようにしてください。
  3. 全てのワーカーノード (TaskManager) のIPあるいはホスト名(行ごとに1つ)をconf/slaves内のスレーブファイルに追加します。

これで、マスターノードでbin/start-cluster.shを使ってクラスターを開始する ことができます。

以下のは3つのノード(10.0.0.1 から 10.0.0.3 のIPアドレスで、ホスト名master, worker1, worker2を持つ)のセットアップを図示し、設定ファイルの内容を示します。これは全てのマシーン上の同じパスでアクセス可能である必要があります。

/path/to/flink/conf/
flink-conf.yaml

jobmanager.rpc.address: 10.0.0.1

/path/to/flink/
conf/slaves

10.0.0.2
10.0.0.3

他に利用可能な設定オプションを見るために、ドキュメントの設定の章を見てください。Flinkが効果的に動作するために、2,3の設定値が設定される必要があります。

特に、

  • タスクマネージャーごとに利用可能なメモリの総量 (taskmanager.heap.mb)、
  • マシーンごとに利用可能なCPUの数 (taskmanager.numberOfTaskSlots)、
  • クラスタ内のCPUの総数 (parallelism.default) および
  • 一時的なディレクトリ (taskmanager.tmp.dirs)

は、とても重要な設定値です。

既存のYARN クラスタ上でFlinkを容易にデプロイすることができます。

  1. Flink Hadoop2 パッケージをダウンロードする: Flink with Hadoop 2
  2. YARNおよびHDFS設定を読み込むために、HADOOP_HOME (あるいは YARN_CONF_DIR または HADOOP_CONF_DIR) 環境変数が設定されているようにしてください。
  3. YARNクライアントを実行する: ./bin/yarn-session.sh。それぞれ8GBのメモリを持つ10個のタスクマネージャーを割り当てるために、-n 10 -tm 8192 オプションを付けてクライアントを実行することができます。
TOP
inserted by FC2 system