YARN セットアップ

クイックススタート

4つのタスクマネージャーを持つYARNセッションを開始します(それぞれは4GのHeapspaceを持ちます):

# get the hadoop2 package from the Flink download page at
# http://flink.apache.org/downloads.html
curl -O <flink_hadoop2_download_url>
tar xvzf flink-1.3-SNAPSHOT-bin-hadoop2.tgz
cd flink-1.3-SNAPSHOT/
./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096

タスクマネージャーあたりの処理スロットの数のために -s フラグを指定します。スロットの数をマシーンあたりのプロセッサの数に設定することをお勧めします。

一度セッションが開始されると、./bin/flink ツールを使ってクラスタにジョブをサブミットすることができます。

# get the hadoop2 package from the Flink download page at
# http://flink.apache.org/downloads.html
curl -O <flink_hadoop2_download_url>
tar xvzf flink-1.3-SNAPSHOT-bin-hadoop2.tgz
cd flink-1.3-SNAPSHOT/
./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 ./examples/batch/WordCount.jar

Apache Hadoop YARN はクラスタのリソース管理フレームワークです。クラスタ上で様々な分散アプリケーションを実行することができます。FlinkはYARN上で他のアプリケーションの隣で実行します。すでにYARNセットアップがある場合は、ユーザは何もセットアップあるいはインストールする必要はありません。

必要条件

  • 少なくとも Apache Hadoop 2.2
  • HDFS (Hadoop 分散ファイルシステム) (あるいはHadoopによってサポートされる他の分散ファイルシステム)

Flink YARNクライアントの使用に問題がある場合は、FAQ の章を見て下さい。

YARNクラスタ内でFlinkセッションを起動する方法を学ぶために指示に従ってください。

クラスタにプログラムをサブミットできるようにセッションは全ての必要なFlinkサービス(ジョブマネージャーとタスクマネージャー)を開始するでしょう。セッションごとに複数のプログラムを実行できることに注意してください。

ダウンロードページから Hadoop >=2 のためのFlinkパッケージをダウンロードします。それには必要なファイルが含まれています。

以下を使ってパッケージを解凍します:

tar xvzf flink-1.3-SNAPSHOT-bin-hadoop2.tgz
cd flink-1.3-SNAPSHOT/

セッションを開始します

セッションを開始するために以下のコマンドを使います

./bin/yarn-session.sh

このコマンドは以下の概要を示すでしょう:

使い方:
   必要条件
     -n,--container <arg>   Number of YARN container to allocate (=Number of Task Managers)
   オプション
     -D <arg>                        Dynamic properties
     -d,--detached                   Start detached
     -jm,--jobManagerMemory <arg>    Memory for JobManager Container [in MB]
     -nm,--name                      Set a custom name for the application on YARN
     -q,--query                      Display available YARN resources (memory, cores)
     -qu,--queue <arg>               Specify YARN queue.
     -s,--slots <arg>                Number of slots per TaskManager
     -tm,--taskManagerMemory <arg>   Memory per TaskManager Container [in MB]
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for HA mode

YARNおよびHDFS設定を読み込むためにクライアントはYARN_CONF_DIR あるいは HADOOP_CONF_DIR 環境変数を設定されることを必要とすることに注意してください。

例: それぞれ8GBのメモリと32個の処理スロットを持つ10個のタスクマネージャーを割り当てるために以下のコマンドを発行します:

./bin/yarn-session.sh -n 10 -tm 8192 -s 32

The system will use the configuration in conf/flink-conf.yaml. 何か変更したい場合は、設定ガイド に従ってください。

YARN上のFlinkは以下の設定パラメータ jobmanager.rpc.address (ジョブマネージャーは常に異なるマシーン上に割り当てられるため)、taskmanager.tmp.dirs (YARNによって指定されたtmpディレクトリを使っています) および スロットの数が指定された場合はparallelism.default を上書きするでしょう。

設定パラメータを設定するために設定ファイルを変更したくない場合は、-D フラグを使って動的にプロパティを渡す方法があります。つまり以下のようにしてパラメータを渡すことができます: -Dfs.overwrite-files=true -Dtaskmanager.network.numberOfBuffers=16368

アプリケーションマスタとジョブマネージャーのために追加の1つのコンテナがあるために、例の発動は(10個のコンテナが要求されたにも関わらず)11個のコンテナを開始します。

Flinkが一度YARNクラスタ内でデプロイされると、ジョブマネージャーの接続の詳細を示すでしょう。

unixプロセスを停止sるうか、クライアントに'stop'を入力することでYARNセッションは停止します。

YARN上のFlinkは十分なリソースがクラスタ上で利用可能であれば、全てのリクエストされたコンテナを開始するでしょう。ほとんどのYARNスケジューラはコンテナの要求されたメモリを占め、いくつかはvcoreの数も占めます。デフォルトでは、vcoreの数は処理スロット(-s)引数と等しいです。yarn.containers.vcoresは独自の値でvcoreの数を上書きすることができます。

YARNセッションをデタッチする

Flink YARNクライアントをずっと実行しておきたくない場合は、detached YARN セッションを開始することもできます。そのパラメータは-d or --detachedです。

その場合、Flink YARNクライアントはFlinkをクラスタにサブミットだけし、それ自身を終了します。この場合、Flinkを使ってYARNセッションを停止できないことに注意してください。

YARNセッションを停止するには、YARNユーティリティ (yarn application -kill <appId>) を使ってください。

既存のセッションをアタッチする

セッションを開始するために以下のコマンドを使います

./bin/yarn-session.sh

このコマンドは以下の概要を示すでしょう:

使い方:
   必要条件
     -id,--applicationId <yarnAppId> YARN アプリケーション Id

既に述べたように、YARNおよびHDFS設定を読み込むためにクライアントはYARN_CONF_DIR あるいは HADOOP_CONF_DIR 環境変数が設定されていなければなりません。

例: 実行中のFlink YARN セッション application_1463870264508_0029 にアタッチするために以下のコマンドを発行します:

./bin/yarn-session.sh -id application_1463870264508_0029

実行中のセッションへのアタッチは、ジョブマネージャーのRPCポートを決定するためにYARN リソースマネージャーを使います。

unixプロセスを停止sるうか、クライアントに'stop'を入力することでYARNセッションは停止します。

FlinkプログラムをYARNクラスタにサブミットするために以下のコマンドを使います:

./bin/flink

Please refer to the documentation of the command-line client.

コマンドは以下のようなヘルプメニューを示すでしょう:

[...]
Action "run" compiles and runs a program.

  Syntax: run [OPTIONS] <jar-file> <arguments>
  "run" action arguments:
     -c,--class <classname>           Class with the program entry point ("main"
                                      method or "getPlan()" method. Only needed
                                      if the JAR file does not specify the class
                                      in its manifest.
     -m,--jobmanager <host:port>      Address of the JobManager (master) to
                                      which to connect. Use this flag to connect
                                      to a different JobManager than the one
                                      specified in the configuration.
     -p,--parallelism <parallelism>   The parallelism with which to run the
                                      program. Optional flag to override the
                                      default value specified in the
                                      設定

ジョブをYARNにサブミットするにはrun アクションを使います。クライアントはジョブマネージャーのアドレスを決定することができます。In the rare event of a problem, you can also pass the JobManager address using the -m argument. ジョブマネージャーのアドレスはYARNコンソールで見ることができます。

wget -O LICENSE-2.0.txt http://www.apache.org/licenses/LICENSE-2.0.txt
hadoop fs -copyFromLocal LICENSE-2.0.txt hdfs:/// ...
./bin/flink run ./examples/batch/WordCount.jar \
        hdfs:///..../LICENSE-2.0.txt hdfs:///.../wordcount-result.txt

以下のエラーがある場合は、全てのタスクマネージャーが開始したことを確かめてください:

Exception in thread "main" org.apache.flink.compiler.CompilerException:
    Available instances could not be determined from job manager: Connection timed out.

ジョブマネージャーのwebインタフェースの中でタスクマネージャーの数を調べることができます。このインタフェースのアドレスはYARNセッションコンソール内に出力されています。

タスクマネージャーが1分後に現れない場合は、ログファイルを使って問題を調査しなければなりません。

上のドキュメントはどうやってHadoop YARN環境内でFlinkクラスタを開始するかを説明します。1つのジョブを実行するためだけにYARN内でFlinkを起動することもできます。

クライアントは-yn の値(タスクマネージャーの数)が設定されている事を期待することに注意してください。

例:

./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar

YARNセッションのコマンドラインオプションは ./bin/flink ツール内でも利用可能です。それらはy あるいは yarn (長い引数のオプション)を使って事前に定義されます。

注意: 環境変数FLINK_CONF_DIRを設定することで、ジョブごとに異なる設定ディレクトリを使うことができます。これを使うには、Flinkの配布物からconf ディレクトリをコピーし、例えばジョブ事のログ設定を変更します。

Note: It is possible to combine -m yarn-cluster with a detached YARN submission (-yd) to “fire and forget” a Flink job to the YARN cluster. In this case, your application will not get any accumulator results or exceptions from the ExecutionEnvironment.execute() call!

FlinkのYARNクライアントは、コンテナが失敗した時にどう振る舞うかを制御するための以下の設定パラメータを持ちます。これらのパラメータはconf/flink-conf.yaml あるいはYARNセッションから開始している場合は-D パラメータを使うかどちらかの方法で設定することができます。

  • yarn.reallocate-failed: このパラメータはFlinkが失敗したタスクマネージャーのコンテナを再割り当てすべきかどうかを制御します。デフォルト: true
  • yarn.maximum-failed-containers: アプリケーションマスタがYARNセッションを失敗するまでに受け付ける失敗のコンテナの最大数。デフォルト: 最初に要求されるタスクマネージャーの数 (-n).
  • yarn.application-attempts: アプリケーションマスター (+ そのタスクマネージャーのコンテナ)の試行数。この値が1(デフォルト)に設定された場合、アプリケーションマスターが失敗した場合にYARNセッション全体が失敗するでしょう。Higher values specify the number of restarts of the ApplicationMaster by YARN.

失敗したYARNセッションをデバッグ

FlinkのYARNセッションの配備が失敗するかもしれない原因はたくさんあります。Hadoopセットアップの設定ミス(HDFSパーミッション、YARN設定)、バージョンの非互換性(Cloudera Hadoop上でvanilla Hadoopと一緒に実行するFlink)、あるいはそのほかのエラー。

ログファイル

FlinkのYARNセッションが自身を配備中に失敗する場合、ユーザはHadoop YARNのログの可能性を頼みにしなければなりません。そのためのもっとも有用な機能はYARNのログ集約です。それを有効にするためには、ユーザはyarn-site.xmlの中でyarn.log-aggregation-enable プロパティを trueに設定しなければなりません。一度有効にすると、ユーザは(失敗した)YARNセッションのすべてのログファイルを扱うために以下のコマンドを使うことができます。

yarn logs -applicationId <application ID>

ログが現れるまでセッションが完了した後で数秒かかることに注意してください。

YARN クライアント コンソール & Web インタフェース

実行時にエラーが起きた場合(例えば、タスクマネージャーが動作を停止した場合)、Flink YARN クライアントもターミナル内にエラーメッセージを出力します。

それに加えてYARNリソースマネージャーのwebインタフェース(デフォルトはポート8080)があります。リソースマネージャーのwebインタフェースのポートはyarn.resourcemanager.webapp.address 設定値によって決定されます。

実行中のYARNアプリケーションについてのログにアクセスすることができ、失敗したアプリケーションの診断を表示します。

特定のHadoopバージョンのためのYARNクライアントをビルド

Users using Hadoop distributions from companies like Hortonworks, Cloudera or MapR might have to build Flink against their specific versions of Hadoop (HDFS) and YARN. 詳細はビルドの説明を読んでください。

いくつかのYARNクラスタはクラスタとそのほかのネットワーク間のネットワーク通信を制御するためにファイアウォールを使います。それらのセットアップでは、Flinkのジョブはクラスタのネットワーク内(ファイアウォールの後ろ)からのみYARNセッションへサブミットされます。プロダクションの利用では実現可能では無い場合は、Flinkは全ての関係するサービスのためのポート範囲を設定することができます。これらの範囲の設定を使って、ユーザはファイアウォールを横断してジョブをFlinkにサブミットすることもできます。

現在のところ、ジョブをサブミットするには二つのサービスが必要です:

  • ジョブマネージャー (YARN内のアプリケーションマスター)
  • ジョブマネージャー内で動作しているBlobServer。

ジョブをFlinkにサブミットする場合、BlobServerはユーザコードと一緒に全てのワーカーノード(タスクマネージャー)にjarを分配するでしょう。ジョブマネージャーはジョブ自身を受け取り、実行を開始します。

ポートを指定するための二つの設定パラメータは以下の通りです:

  • yarn.application-master.port
  • blob.server.port

これら二つの設定オプションは、1つのポート(例えば: "50010")、範囲 ("50000-50025")、あるいは両方の組み合わせ ("50010,50011,50020-50025,50050-50075")を受け付けます。

(Hadoop は似たような仕組みを使っています。そこでは設定パラメータは yarn.app.mapreduce.am.job.client.port-rangeと呼ばれます。)

背景 / 内部

この章はFlinkとYARNがどのようにやり取りをするかを端的に説明します。

YARNクライアントはYARNリソースマネージャーとHDFSに接続するために、Hadoop設定へのアクセスを必要とします。以下の戦略を使ってHadoop設定を決定します:

  • YARN_CONF_DIR, HADOOP_CONF_DIR あるいは HADOOP_CONF_PATH が(その順番で)設定されているかどうかをテストします。これらの変数の一つが設定されている場合、それらが設定を読み込むために使われます。
  • 上の戦略が失敗した場合(これは正しいYARNセットアップで起こるべきではありません)、クライアントは HADOOP_HOME 環境変数を使っています。もしそれが設定されている場合、クライアントは$HADOOP_HOME/etc/hadoop (Hadoop 2) と $HADOOP_HOME/conf (Hadoop 1)にアクセスしようとします。

新しいFlink YARNセッションが開始する場合、クライアントはまずリクエストされたリソース(コンテナとメモリ)が利用可能かどうかを調べます。その後で、Flinkと設定を含むjarをHDFSにアップロードします(ステップ1)。

クライアントの次のステップは、YARNコンテナにApplicationMasterを開始 (ステップ3)するようにリクエストする(ステップ2)ことです。Since the client registered the configuration and jar-file as a resource for the container, the NodeManager of YARN running on that particular machine will take care of preparing the container (e.g. downloading the files). 一旦それが完了すると、ApplicationMaster (AM) が開始されます。

JobManager AM は同じコンテナ内で実行されます。一旦それらの開始が成功すると、AMはジョブマネージャー(それ自身のホスト)のアドレスを知ります。(ジョブマネージャーに接続できるように)タスクマネージャーのための新しいFlink設定ファイルを生成します。ファイルはHDFSにもアップロードされます。さらに、AM コンテナはFlinkのwebインタフェースにも提供されます。YARNコードが割り当てられている全てのポートは 短命ポートです。これによりユーザは複数のYARNセッションを並行して実行することができます。

その後で、AMはFlinkのタスクマネージャーのためのコンテナの割り当てを開始します。これはjarファイルをダウンロードし、HDFSから修正された設定をダウンロードするでしょう。これらのステップが完了すると、Flinkが準備されジョブの受付の準備ができています。

TOP
inserted by FC2 system