YARN
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

Apache Hadoop YARN #

はじめに #

このはじめにのセクションでは、YARN上に完全に機能するFlinkクラスタをセットアップする手順を説明します。

はじめに #

Apache Hadoop YARNは多くのデータ処理フレームワークで人気のあるリソースプロバイダです。 FlinkサービスはYARNのResourceManagerに送信され、YARN NodeManagersによって管理されるマシーン上にコンテナが生成されます。FlinkはJobManagerとTaskManagerインスタンスをそのようなコンテナにデプロイします。

Flinkは、JobManagerで実行されるジョブに必要な処理スロットの数に応じてTaskManagerリソースを動的に割り当てたり割り当てを解除したりできます。

準備 #

このはじめにセクションは、バージョン2.10.2移行の機能的なYARN環境を前提としています。YARN環境は、Amazon EMR、Google Cloud DataProc、Clouderaなどのプロダクトなどのサービスを通じて最も便利に提供されます。ローカルでYARN環境を手動でセットアップするクラスタは、このはじめにのチュートリアルを進めるには推奨されません。

  • yarn topを実行して、YARNクラスタがFlinkアプリケーションを受け入れる準備ができていることを確認します。エラーメッセージは表示されないはずです。
  • 最新のFlink配布物をダウンロードページからダウンロードし、解凍します。
  • 重要 HADOOP_CLASSPATH環境変数がセットアップされていることを確認してください(echo $HADOOP_CLASSPATHを実行することで確認できます)。そうでない場合は、次を使って設定します
export HADOOP_CLASSPATH=`hadoop classpath`

YARN上でFlinkセッションを開始 #

HADOOP_CLASSPATH環境変数が設定されていることを確認したら、Flink on YARNセッションを開始し、サンプルのジョブを送信できます:

# we assume to be in the root directory of 
# the unzipped Flink distribution

# (0) export HADOOP_CLASSPATH
export HADOOP_CLASSPATH=`hadoop classpath`

# (1) Start YARN Session
./bin/yarn-session.sh --detached

# (2) You can now access the Flink Web Interface through the
# URL printed in the last lines of the command output, or through
# the YARN ResourceManager web UI.

# (3) Submit example job
./bin/flink run ./examples/streaming/TopSpeedWindowing.jar

# (4) Stop YARN session (replace the application id based 
# on the output of the yarn-session.sh command)
echo "stop" | ./bin/yarn-session.sh -id application_XXXXX_XXX

おめでとう!YARNにFlinkをデプロイすることで、Flinkアプリケーションが正常に実行されました。

Back to top

プロダクション環境で使う場合は、アプリケーション間の分離を強化するため、アプリケーションモードでFlinkアプリケーションをデプロイすることをお勧めします。

アプリケーションモード #

アプリケーションモードの背景にある高レベルの直感については、配備モードの概要を参照してください。

アプリケーションモードはYARN上でFlinkクラスタが起動され、アプリケーションjarのmain()メソッドがYARNのJobManagerで実行されます。 クラスタはアプリケーションが終了するとすぐにシャットダウンされます。yarn application -kill <ApplicationId>を使うか、Flinkジョブをキャンセルすることで、クラスタを手動で停止できます。

./bin/flink run-application -t yarn-application ./examples/streaming/TopSpeedWindowing.jar

アプリケーションモードクラスタがデプロイされると、キャンセルやセーブポイントの取得などの操作を行うためにクラスタとやり取りできるようになります。

# List running job on the cluster
./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
# Cancel running job
./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>

アプリケーションクラスタでジョブをキャンセルすると、クラスタが停止することに注意してください。

アプリケーションモードの可能性を完全に引き出すには、yarn.provided.lib.dirs設定オプションと一緒に使うことを検討し、クラスタ内の全てのノードがアクセスできる場所にアプリケーションjarを事前にアップロードします。この場合、コマンドは次のようになります:

./bin/flink run-application -t yarn-application \
	-Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist-dir" \
	hdfs://myhdfs/jars/my-application.jar

上記により、必要なFlink jarとアプリケーションjarが、クライアントによってクラスタに送信されるのではなく、指定されたリモートの場所に取得されるため、ジョブの送信が非常に軽くなります。

セッションモード #

セッションモードの背景にある高レベルの直感については、配備モードの概要を参照してください。

セッションモードを使ったデプロイメントについては、ページ上部のはじめにガイドで説明しています。

セッションモードには次の2つのオペレーションモードがあります:

  • アタッチモード (デフォルト): yarn-session.shクライアントはFlinkクラスタをYARNに送信しますが、クラインとは時刻を続けてクラスタの状態を追跡します。クラスタに障害が発生すると、クライアントはエラーを表示します。クライアントが終了すると、クラスタにもシャットダウンするように通知されます。
  • デタッチモード (-d or --detached): yarn-session.shクライアントはFlinkクラスタをYARNに送信し、クライアントは戻ります。Flinkクラスタを停止するには、クライアントまたはYARNツールを再度呼び出す必要があります。

セッションモードでは、/tmp/.yarn-properties-<username>に非表示のYARNプロパティファイルが作成されます。このファイルはジョブの送信時に個安堵ラインインタフェースによりクラスタ検出のために選択されます。

Flinkジョブを送信する時に、コマンドラインインタフェースでターゲットYARNクラスタを手動で指定することができます。以下に例を示します:

./bin/flink run -t yarn-session \
  -Dyarn.application.id=application_XXXX_YY \
  ./examples/streaming/TopSpeedWindowing.jar

次のコマンドを使って、YARNセッションに再アタッチできます:

./bin/yarn-session.sh -id application_XXXX_YY

conf/flink-conf.yamlファイル経由で設定を渡す以外に、-Dkey=value引数を使って./bin/yarn-session.shクライアントへ送信時に任意の設定を渡すこともできます。

YARNセッションクライアントには、一般的に使われる設定用の"shortcut arguments"も幾つかあります。これらは、./bin/yarn-session.sh -hで一覧表示できます。

Back to top

Per-Jobモード(非推奨) #

per-job モードは YARN でのみサポートされており、Flink 1.15 で非推奨になりました。 FLINK-26000 でドロップされます。 YARN 上でジョブごとに専用のクラスタを起動するアプリケーションモードを検討してください。
per-jobモードの背後にある高度な直感については、デプロイメントモードの概要を参照してください。

Per-jobクラスタモードではYARN上でFlinkクラスタが起動され、次に提供されたアプリケーションjarがローカルで実行され、最後にJobGraphをYARN上のJobManagerに送信されます。--detached引数を渡すと、送信が受け入れられるとクライアントは停止します。

ジョブが停止すると、YARNクラスタも停止します。

./bin/flink run -t yarn-per-job --detached ./examples/streaming/TopSpeedWindowing.jar

Per-Jobクラスタがデプロイされると、キャンセルやセーブポイントの取得などの操作を行うためにクラスタとやり取りできるようになります。

# List running job on the cluster
./bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
# Cancel running job
./bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>

Per-Jobクラスタでジョブをキャンセルすると、クラスタが停止することに注意してください。

YARN固有の設定は、設定ページにリストされています。

以下の設定パラメータは、実行時にフレームワークによって上書きされる可能性があるため、Flink on YARNによって管理されます。

  • jobmanager.rpc.address (Flink on YARNによってJobManagerコンテナのアドレスに動的に設定されます)
  • io.tmp.dirs (設定されていない場合、FlinkはYARNによって定義された一時的なディレクトリを設定します)
  • high-availability.cluster-id (HAサービス内の複数クラスタを識別するために自動的に生成されるID)

追加のHadoop設定ファイルをFlinkに渡す必要がある場合、Hadoop設定ファイルを含むディレクトリ名を受け入れるHADOOP_CONF_DIR環境変数を介して行うことができます。デフォルトでは、必要な全てのHadoop設定ファイルはHADOOP_CLASSPATH環境変数を介してクラスパスからロードされます。

リソース割り当ての動作 #

YARN上で実行されているJobManagerは、既存のリソースで送信された全てのジョブを実行できない場合、追加のTaskManagerを要求します。特に、セッションモードで実行している場合、JobManagerは、追加のジョブが送信されると、必要に応じて追加のTaskManagerを割り当てます。未使用のTaskManagerはタイムアウト後に再度解放されます。

JobManagerとTaskManagerプロセスのメモリ設定は、YARN実装によって尊重されます。報告された仮想コアの数は、デフォルトではTaskManagerごとに設定されたスロットの数と等しくなります。yarn.containers.vcoresでは、仮想コアの数を独自の値で上書きできます。このパラメータが機能するには、YARNクラスタでCPUスケジューリングを有効にする必要があります。

失敗したコンテナ(JobManagerを含む)は、YARNによって置き換えられます。JobManagerコンテナの最大再起動数は、yarn.application-attempts (デフォルトは1)で設定されます。全ての試行が完了すると、YARNアプリケーションは失敗します。

YARNの高可用性 #

YARNの高可用性は、YARNと高可用性サービスの組み合わせによって実現されます。

HAサービスが設定されると、JobManagerメタデータが保持され、リーダーの選出が実行されます。

YARNは失敗したJobMangerの再起動を処理します。JobManagerの最大再起動数は、2つの設定パラメータで定義されます。最初のFlinkのyarn.application-attempts設定はデフォルトで2になります。この値はYARNのyarn.resourcemanager.am.max-attemptsで制限され、これもデフォルトは2です。

YARNにデプロイする時、Flinkはhigh-availability.cluster-id設定パラメータを管理していることに注意してください。 Flinkは、デフォルトでYARNアプリケーションIDに設定します。 YARNにHAclusterをデプロイする時は、このパラメータを上書きしないでください。 クラスタIDはHAバックエンド(ZooKeeperなど)内の複数のHAclusterを区別するために使われます。 この設定パラメータの上書きは、複数のYARNクラスタがお互いに影響を与える可能性があります。

コンテナのシャットダウンの挙動 #

  • YARN 2.3.0 < version < 2.4.0. アプリケーションマスタが失敗した場合に全てのコンテナが再起動されます。
  • YARN 2.4.0 < version < 2.6.0. タスクマネージャーはアプリケーションマスタの失敗を超えて維持され続けます。これはスタートアップの時間が早く、コンテナリソースを再び取得するまでユーザが待つ必要が無いという利点があります。
  • YARN 2.6.0 <= version: Sets the attempt failure validity interval to the Flinks’ Pekko timeout value. 試行の障害有効間隔は、アプリケーションがシステムが1つの間隔の間でアプリケーションの試行の最大数を知った後でアプリケーションが単純にkillされるように伝えます。これにより、長時間続いているジョブによってアプリケーションの試行が枯渇することが回避されます。
Hadoop YARN 2.4.0 には、再起動されたApplication Master/Job Managerコンテナの再起動を妨げる重大なバグ(2.5.0で修正されました)があります。詳細は、 FLINK-4142をご覧ください。YARN上での高可用性のために少なくともHadoop 2.5.0 を使うことをお勧めします。

サポートされるHadoopバージョン。 #

Flink on YARNはHadoop 2.10.2に対してコンパイルされており、Hadoop 3.xを含む全てのHadoopバージョン>= 2.10.2がサポートされます。

Flinkに必要なHadoop依存関係を提供するには、Getting Started / Preparationセクションですでに紹介されているHADOOP_CLASSPATH環境変数を設定することをお勧めします。

それが不可能な場合は、依存関係をFlinkのlib/フォルダに置くこともできます。

Flinkは、webサイトのDownloads / Additional Componentsセクションの、lib/フォルダに配置できる事前にバンドルされたHadoop fat jarも提供しています。これらの事前にバンドルされたfat jarは、共通ライブラリとの依存関係の競合を避けるためにシェードされています。Flinkコミュニティはこれらの事前バンドルされたjarに対してYARN統合をテストしていません。

Firewallの後ろのYARN上でFlinkを実行 #

いくつかのYARNクラスタはクラスタとそのほかのネットワーク間のネットワーク通信を制御するためにファイアウォールを使います。 これらのセットアップでは、Flinkのジョブはクラスタのネットワーク内(ファイアウォールの内側)からのみYARNセッションに送信できます。 これがプロダクション環境での使用が不可能な場合、Flinkでは、クライアントとクラスタの通信に使われるRESTエンドポイントのポート範囲を設定できます。この範囲を設定すると、ユーザはファイアウォールを超えてFlinkにジョブを送信することもできます。

RESTエンドポイントポートを指定するための設定パラメータは、rest.bind-portです。この設定オプションは、単一のポート(例えば:“50010”)、範囲(“50000-50025”)、両方の組み合わせを受け入れます。

ユーザのjarsとクラスパス #

セッションモード

YARNにセッションモードでFlinkをデプロイする場合、起動コマンドで指定されたJARファイルだけがユーザjarとして認識され、ユーザクラスパスに組み込まれます。

PerJobモードとアプリケーションモード

YARN上でPerJob/ApplicationモードでFlinkをデプロイする場合、起動コマンドで指定されたJARファイルとFlinkのusrlibフォルダ内の全てのJARファイルはユーザjarsとして認識されます。 デフォルトで、Flinkはユーザjarsをシステムのクラスパスに含みます。この動作はyarn.classpath.include-user-jarパラメータで制御されます。

これをDISABLEDに設定した場合、Flinkは代わりにユーザクラスパスにjarを含めます。

クラスパス内のユーザjarsの位置は、パラメータを次のいずれかに設定することで制御できます:

  • ORDER: (デフォルト) 辞書順に基づいてjarをシステムのクラスパスに追加します。
  • FIRST: jarをシステムのクラスパスの先頭に追加します。
  • LAST: jarをシステムのクラスパスの末尾に追加します。

詳細については、クラスローディングのデバッグを参照してください。

Back to top

inserted by FC2 system