Speculative Execution
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

投機的実行 #

このページでは、投機的実行の背景、使い方、効果の確認方法について説明します。

背景 #

投機的実行は、問題のあるノードによって引き起こされるジョブの速度の低下を軽減する仕組みです。 問題のあるノードには、ハードウェアの問題、偶発的なI/Oビジー、高いCPU負荷がある可能性があります。これらの問題により、ホストされたタスクの実行が他のノード上のタスクよりも大幅に遅くなり、バッチジョブの全体的な実行時間に影響を与える可能性があります。

このような場合、投機的実行は、問題があると検出されなかったノード上で遅いタスクの新しい試行が開始されます。新しい試行は同じ入力データを処理し、古い試行と同じデータを生成します。古い試行は影響を受けず、引き続き実行されます。最初に完了した試行は許可され、その出力はダウンストリームのタスクによって表示および消費され、残りの試行はキャンセルされます。

これを実現するために、Flinkは遅いタスクの検出機能を使って遅いタスクを検出します。遅いタスクが存在するノードは問題のあるノードとしてブラックリストの仕組みによってブロックされます。スケジューラは、遅いタスクに対して新しい試行を作成し、ブロックされていないノードにそれらをデプロイします。

使い方 #

このセクションでは、投機的実行を有効にする方法、調整する方法、投機的実行で動作する独自のソースを開発/改善する方法など、投機的実行の使い方を説明します。

注意: DataSetは近い将来非推奨になるため、FlinkはDataSetジョブの投機的実行をサポートしません。DataStream APIは、Flinkバッチジョブを開発するために推奨される低レベルAPIになりました。

党的実行を有効にする #

以下の設定項目を使って投機的実行を有効にできます:

  • execution.batch.speculative.enabled: true

現在、Adaptive Batch Schedulerのみが投機的実行をサポートしていることに注意してください。また、別のスケジューラが明示的に設定されない限り、Flinkバッチジョブはデフォルトでこのスケジューラを使います。

調整の設定 #

様々なジョブに対して投機実行をより適切に機能させるには、スケジューラの次の設定オプションを調整できます:

低速タスク検出機能の以下の設定オプションを調整することもできます:

現在、投機的実行では、実行時間に基づいて遅いタスク検出機能を使って遅いタスクを検出します。 The detector will periodically count all finished executions, if the finished execution ratio reaches the configured ratio(slow-task-detector.execution-time.baseline-ratio), the baseline will be defined as the execution time median multiplied by the configured multiplier(slow-task-detector.execution-time.baseline-multiplier), then the running task whose execution time exceeds the baseline will be detected as a slow task. It is worth mentioning that the execution time will be weighted with the input data volume of the execution vertex, so the executions with large data volume differences but close computing power will not be detected as a slow task, when data skew occurs. これは、不必要な投機的な試行の開始を避けるのに役立ちます。

注意: ノードがSourceまたはHybrid Shuffleモードの場合、入力データ量を判断できないため、入力データ量で実行時間を重み付けする最適化は効果がありません。

投機的実行のソースを有効にする #

ジョブがカスタムの Source を使い、そのソースがカスタム SourceEvent を使う場合、そのソースの SplitEnumerator を変更して、 SupportsHandleExecutionAttemptSourceEvent インタフェースを実装する必要があります。

public interface SupportsHandleExecutionAttemptSourceEvent {
    void handleSourceEvent(int subtaskId, int attemptNumber, SourceEvent sourceEvent);
}

これは、SplitEnumeratorがイベントを送信する試みを認識する必要があることを意味します。そうしなければ、jobmanagerがタスクからソースイベントを受信した時に例外が発生し、ジョブの失敗につながります。

SourceFunction sources InputFormat sources new sources を含む投機的実行を使用する他のリソースについては、追加の変更は必要ありません。 Apache Flinkが提供する全てのソースコネクタは投機的時刻で動作できます。

投機的実行のシンクを有効にする #

シンクの投機的実行は、 SupportsConcurrentExecutionAttempts インタフェースを実装しない限りデフォルトで無効になっています。これは、互換性を考慮したためです。

public interface SupportsConcurrentExecutionAttempts {}

SupportsConcurrentExecutionAttempts は、 Sink SinkFunction OutputFormat に対して機能します。

タスク内のいずれかのオペレータが投機的実行をサポートしない場合、タスク全体が投機的実行をサポートしていないとしてマークされます。 つまり、シンクが投機的実行をサポートしない場合、シンクオペレータを含むタスクは投機的に実行できません。
Sink 実装の場合、Flinkは Committer ( WithPreCommitTopology WithPostCommitTopology によって拡張された演算子を含む)の投機的実行を無効にします。 ユーザの経験が浅い場合、同時コミットによって予期せぬ問題が発生する可能性があるからです。 また、コミッターがバッチジョブのボトルネックになる可能性はほとんどありません。

投機的実行の有効性を確認する #

投機的実行有効にした後、投機手実行をトリガーする遅いタスクがある場合、web UIはジョブページの頂点のSubTasksタブに投機的実行が表示されます。web UIでは、Flinkクラスタの概要Task Managersページにブロックされたtaskmanagerも表示されます。

これらのメトリクスをチェックして、投機的実行の効果を確認することもできます。

Back to top

inserted by FC2 system