This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
柔軟なスケーリング #
Apache Flinkによりジョブを再スケールできます。これを手動で行うには、ジョブを停止し、シャットダウン中に作成したセーブポイントから別のへ並列処理で再開します。
このページはFlinkが並列処理を自動的に調整するオプションについて説明します。
リアクティブモード #
リアクティブモードはMVP (“minimum viable product”)機能です。Flinkコミュニティは、メーリングリストを通じてユーザからのフィードバックを積極的に求めています。このページに記載されている制限事項をご確認ください。
リアクティブモードは、クラスタ内で利用可能な全てのリソースを常に使うようにジョブを設定します。TaskManagerの追加はジョブをスケールアップし、リソースを削除するとスケールダウンします。Flinkはジョブの並列度を管理し、常に可能な限り高い値に設定します。
リアクティブモードは再スケーリングイベントでジョブを再開し、最後に完了したチェックポイントからジョブを復元します。これは、セーブポイント(ジョブを手動で再スケーリングするために必要)を作成するオーバーヘッドが無いことを意味します。また、再スケーリング後に再処理されるデータの量は、チェックポイントの間隔によって異なり、回復時間は状態のサイズによって異なります。
リアクティブモードにより、Flinkユーザは、外部サービスにコンシューマのラグ、総CPU使用率、スループット、レイテンシなどの特定のメトリクスを監視させることで、強力な自動化の仕組みを実装できます。これらのメトリクスが特定の閾値を上回るか下回ると、追加のTaskManagerをFlinkのクラスタに追加または削除できます。これは、Kubernetesデプロイメントのreplica factor または、AWS上の自動化スケーリンググループを変更することで実装できます。この外部サービスは、リソースの割り当てと割りて解除を処理することのみを必要とします。Flinkは利用可能なリソースを使ってジョブの実行を維持します。
開始 #
リアクティブモードを試してみたいだけの場合は、次の手順に従ってください。Flinkの単一のマシンに配備していることを前提とします。
# these instructions assume you are in the root directory of a Flink distribution.
# Put Job into lib/ directory
cp ./examples/streaming/TopSpeedWindowing.jar lib/
# Submit Job in Reactive Mode
./bin/standalone-job.sh start -Dscheduler-mode=reactive -Dexecution.checkpointing.interval="10s" -j org.apache.flink.streaming.examples.windowing.TopSpeedWindowing
# Start first TaskManager
./bin/taskmanager.sh start
使用されている送信コマンドを簡単に調べてみましょう:
./bin/standalone-job.sh start
は、アプリケーションモードでFlinkをデプロイします-Dscheduler-mode=reactive
はリアクティブモードを有効にします。-Dexecution.checkpointing.interval="10s"
はチェックポイントと再起動戦略を設定します。- 最後の引数はジョブのmainクラス名を渡します。
これで、リアクティブモードでFlinkジョブが開始されました。web interfaceはジョブが1つのTaskManagerで実行されていることを示します。ジョブをスケールアップしたい場合は、クラスタに別のTaskManagerを追加するだけです:
# Start additional TaskManager
./bin/taskmanager.sh start
スケールダウンするには、TaskManagerインスタンスを削除します。
# Remove a TaskManager
./bin/taskmanager.sh stop
使い方 #
設定 #
リアクティブモードを有効にするには、scheduler-mode
をreactive
に設定する必要があります。
ジョブ内の個々のオペレータの並列度は、スケジューラによって決定されます。これは設定可能ではなく、個々のオペレータまたはジョブ全体で明示的に設定しても無視されます。
並列度に影響を与える唯一の方法は、オペレータの最大並列度を設定することです(これはスケジューラによって尊重されます)。maxParallelismは2^15 (32768)の制限があります。 個々のオペレータまたはジョブ全体に並列度を設定しない場合、デフォルトの並列度ルールが適用され、可能な最大値よりも下限が適用される可能性があります。デフォルトのスケジュールモードと同様に、並列度のベストプラクティスを考慮にいれてください。
Flinkの一部の内部構造を維持するにはさらに多くの内部構造が必要となるため、このように最大並列度が高いとジョブのパフォーマンスに影響を与える可能性があることに注意してください。
リアクティブモードを有効にする場合、jobmanager.adaptive-scheduler.resource-wait-timeout
設定キーはデフォルトで-1
になります。これは、JobManagerが十分なリソースを待機しながら永久に実行されることを意味します。
ジョブを実行するのに十分なTaskManagerが無い状態で一定時間後にJobManagerを停止したい場合は、jobmanager.adaptive-scheduler.resource-wait-timeout
を設定します。
リアクティブモードを有効にすると、jobmanager.adaptive-scheduler.resource-stabilization-timeout
設定キーはデフォルトで0
に設定されます: 十分なリソースが利用可能になるとすぐに、Flinkはジョブを開始します。
TaskManagersが同時に接続せず、次々とゆっくり接続するシナリオでは、この動作によりTaskManagerが接続するたびにジョブが再起動されます。ジョブをスケジュールする前にリソースが安定するまで待機する場合、この設定値を増やします。
さらに、jobmanager.adaptive-scheduler.min-parallelism-increase
を設定できます: この設定オプションは、スケールアップをトリガーする前に追加の合計並列度の追加の最小値を指定します。例えば、ソース(並列度=2)とシンク(並列度=2)を持つジョブがある場合、合計の並列度は4になります。デフォルトでは、設定キーは1に設定されているため、並列度の合計が増加すると、再起動が引き起こされます。
推奨事項 #
-
ステートフルジョブの定期的なチェックポイントの設定: リアクティブモードは、再スケールイベントで最後に完了したチェックポイントから回復します。定期的なチェックポイントが有効になっていない場合、プログラムは状態を失います。チェックポイントは再起動戦略も設定します。リアクティブモードは設定された再起動戦略を尊重します: 再起動戦略が設定されていない場合、リアクティブモードはジョブをスケーリングせずに失敗します。
-
TaskManagerが適切にシャットダウンされていない(つまり、SIGTERMシグナルの代わりにSIGKILLシグナルが使われた)場合、リアクティブモードのダウンスケーリングはさらに時間が掛かる可能性があります。この場合、FlinkはJobManagerと停止したTaskManagerの間のハートビートがタイムアウトするまで待機します。より低い並列度でジョブを再デプロイするまで、Flinkジョブが約50秒停止することが分かります。
デフォルトのタイムアウトは50秒に設定されています。インフラストラクチャが許すようであれば、
heartbeat.timeout
設定を低い値に調整します。ハートビートタイムアウトを低く設定すると、ネットワークの服装やガベージコレクションの長時間の一時停止により、TaskManagerがハートビートに応答できない場合に障害に繋がる可能性があります。heartbeat.interval
は常にタイムアウトよりも低くする必要があることに注意してください。
制限事項 #
リアクティブモードは新しい実験的な機能であるため、デフォルトのスケジューラでサポートされている全ての機能がリアクティブモード(とその適応型スケジューラ)でも使えるわけではありません。Flinkコミュニティはこれらの制限の対処に取り組んでいます。
-
デプロイメントはスタンドアローンアプリケーションのデプロイメントとしてのみサポートされます。(ネイティブKubernetes、Yarnなどの)アクティブリソースプロバイダは明示的にサポートされません。スタンドアローンセッションクラスタもサポートされません。アプリケーションのデプロイメントは単一のジョブのアプリケーションに制限されます。
サポートされるデプロイメントオプションは、アプリケーションモードのスタンドアローン (このページで説明されます)、アプリケーションモードのDocker、スタンドアローンKubernetesアプリケーションクラスタのみです。
適応型スケジューラの制限は、リアクティブモードにも適用されます。
適応型スケジューラ #
複数のジョブを含むセッションクラスタでのスロット割り当てが定義されていないため、適応型スケジューラを(リアクティブモードではなく)直接使うことは、上級のユーザのみにお勧めします。
適応型スケジューラは、利用可能なスロットに基づいてジョブの並列度を調整できます。最初に設定された並列度でジョブを実行するのに十分なスロットが利用できない場合、自動的に並列度が減らされます; 送信時に利用可能なリソースが十分ではないこと、またはジョブの実行中にTaskManagerが停止したことが原因です。新しいスロットが利用可能になると、ジョブは設定された並列度まで再びスケールアップされます。 リアクティブモード(上記を参照)では、設定された並列度が無視され、無限に設定されていたかのように扱われ、ジョブが常に可能な限り多くのリソースを使うようにします。 リアクティブモード無しで適応型スケジューラを使うこともできますが、実際的な制限がいくつかあります:
- セッションクラスタで適応型スケジューラを使っている場合、同じセッション内で実行中の複数のジョブ間でのスロットの分散に関する保証はありません。
デフォルトのスケジューラに対する適応型スケジューラの利点の1つは、タスクマネージャの損失を適切に処理できることです。これは、このような場合にスケールダウンするだけであるためです。
使い方 #
以下の設定パラメータを設定する必要があります:
jobmanager.scheduler: adaptive
: デフォルトのスケジューラから適応型スケジューラに変更します
The behavior of Adaptive Scheduler is configured by all configuration options containing adaptive-scheduler
in their name.
制限事項 #
- ストリーミングジョブのみ: 適応型スケジューラはストリーミングジョブのみで実行されます。バッチジョブを送信する時、Flinkはバッチジョブのデフォルトのスケジューラ、つまり適応型バッチスケジューラを使います。
- 部分的なフェイルオーバーはサポートされません: 部分的なフェイルオーバーとは、スケジューラがジョブ全体ではなく、失敗したジョブの一部(Flink内部の"regions")を再起動できることを意味します。この制限は、厄介な並列ジョブの回復時間にのみ影響します: Flinkのデフォルトのスケジューラは失敗した部分を再起動できますが、適応型スケジューラはジョブ全体を再起動します。
- スケーリングイベントはジョブとタスクの再起動を引き起こし、タスクの試行回数が増加します。
適応型バッチスケジューラ #
適応型バッチスケジューラは実行計画を自動的に調整できるバッチジョブスケジューラです。現在、バッチジョブのオペレータの並列度の自動決定をサポートします。オペレータに並列度が設定されていない場合、スケジューラは消費されるデータセットのサイズに応じてオペレータの並列度を決定します。これは多くの利点をもたらします:
- バッチジョブのユーザは並列度のチューニングから解放されます
- 自動的に調整された並列度により、ボリュームサイズが毎日変化する消費されたデータセットに適切に適合できます。
- SQLバッチジョブのオペレータは自動的に調整される様々な並列度を割り当てることができます。
現時点では、適当型バッチスケジューラはFlinkバッチジョブのデフォルトのスケジューラです。他のスケジューラが明示的に設定されない限り、追加の設定は必要ありません。例えば、jobmanager.scheduler: default
)。Note that you need to
leave the execution.batch-shuffle-mode
unset or explicitly set it to ALL_EXCHANGES_BLOCKING
(default value) or ALL_EXCHANGES_HYBRID_FULL
or ALL_EXCHANGES_HYBRID_SELECTIVE
due to “BLOCKING or HYBRID jobs only”.
オペレータの並列度を自動的に決定 #
使い方 #
適応型バッチスケジューラを使ってオペレータの並列度を自動的に決定するには、以下のことをする必要があります:
-
機能をオンに切り替えます:
適応型バッチスケジューラは、デフォルトで自動並列度導出を有効にします。
execution.batch.adaptive.auto-parallelism.enabled
を使ってこの機能を切り替えます。 さらに、適応型バッチスケジューラを使ってオペレータの並列度を自動的に決定する場合、調整が必要な関連する設定オプションがいくつかあります。execution.batch.adaptive.auto-parallelism.min-parallelism
: 適応的に設定できる並列度の下限。execution.batch.adaptive.auto-parallelism.max-parallelism
: 適当的に設定できる並列度の上限。parallelism.default
またはStreamExecutionEnvironment#setParallelism()
によって設定されたデフォルトの並列度は、設定されていない場合は許可された並列度の上限として使われます。execution.batch.adaptive.auto-parallelism.avg-data-volume-per-task
: 各インスタンスの処理が予想されるデータ量の平均サイズ。データのスキューが発生するか、決定された並列度が(データが多すぎるために)最大並列度に達した場合、一部のタスクで実際に処理されるデータがこの値を大幅に超える可能性があることに注意してください。execution.batch.adaptive.auto-parallelism.default-source-parallelism
: データソースのデフォルトの並列度。
-
オペレータの並列度の設定は避けてください:
適応型バッチスケジューラは並列度が設定されていないオペレータの並列度のみを設定します。したがって、オペレータの並列度を自動的に決定したい場合は、‘setParallelism()‘メソッドを使ってオペレータの並列度を設定することは避ける必要があります。
さらに、DataSetジョブには次の設定が必要です:
parallelism.default: -1
を設定します。ExecutionEnvironment
でsetParallelism()
を呼ばないでください。
パフォーマンスの調整 #
- Sort Shuffleを使い、
taskmanager.network.memory.buffers-per-channel
を0
に設定することをお勧めします。これにより必要なネットワークメモリを並列度から切り離すことができるため、大規模なジョブで"Insufficient number of network buffers"エラーが発生する可能性が低くなります。 execution.batch.adaptive.auto-parallelism.max-parallelism
を最悪の場合に必要となると予想される並列度に設定します。過度の値はパフォーマンスに影響する可能性があるため、それより大きい値はお勧めできません。このオプションはアップストリームによって生成されるサブパーティションの数に影響を与える可能性があり、巣部パーティションの数が多いと、パケットが小さいためにハッシュシャッフルのパフォーマンスとネットワーク送信のパフォーマンスが低下する可能性があります。
制限事項 #
- Batch jobs only: 適応型バッチスケジューラはバッチジョブのみをサポートします。ストリーミングジョブが送信された場合、例外が投げられます。
- BLOCKING or HYBRID jobs only: 現在のところ、適応型バッチスケジューラはシャッフルモードが
ALL_EXCHANGES_BLOCKING / ALL_EXCHANGES_HYBRID_FULL / ALL_EXCHANGES_HYBRID_SELECTIVE
のジョブのみをサポートします。 - FileInputFormat sources are not supported: FileInputFormat sources are not supported, including
StreamExecutionEnvironment#readFile(...)
StreamExecutionEnvironment#readTextFile(...)
andStreamExecutionEnvironment#createInput(FileInputFormat, ...)
. 適応型バッチスケジューラを使う場合は、ファイルを読み取るために新しいソースFileSystem DataStream ConnectorまたはFileSystem SQL Connector)を使う必要があります。 - Inconsistent broadcast results metrics on WebUI: When use Adaptive Batch Scheduler to automatically decide parallelisms for operators, for broadcast results, the number of bytes/records sent by the upstream task counted by metric is not equal to the number of bytes/records received by the downstream task, which may confuse users when displayed on the Web UI. 詳細はFLIP-187を参照してください。