チェックポイント

Flinkの各関数やオペレータはstatefulかもしれません (詳細は状態との連携を見てください)。Stateful functions store data across the processing of individual elements/events, making state a critical building block for any type of more elaborate operation.

状態を耐障害性のあるものにするためには、Flinkは状態をチェックポイント する必要があります。失敗のない実行としてアプリケーションに同じセマンティクスを与えるために、チェックポイントによりFlinkはストリーム内の状態と位置を回復することができます

ストリーミングの耐障害性のドキュメント はFlinkのストリーミングの耐障害性の仕組みの背景にある技術を詳細に説明します。

必要条件

Flinkのチェックポイントの仕組みはストリームと状態のための耐久力のあるストレージとやりとりをします。一般的に、以下が必要です:

  • 一定期間の間のレコードを再生することができる永続的 (あるいは 耐久性のある) データソース。そのようなソースの例は、永続的なメッセージキュー (例えば Apache Kafka, RabbitMQ, Amazon Kinesis, Google PubSub) あるいはファイルシステム (例えば HDFS, S3, GFS, NFS, Ceph, …) です。
  • 状態のための永続的なストレージ、一般的に分散ファイルシステム (例えば HDFS, S3, GFS, NFS, Ceph, …)

チェックポイントの有効化と設定

デフォルトではチェックポイントは無効です。チェックポイントを有効にするには、StreamExecutionEnvironment上でenableCheckpointing(n)を呼びます。ここで、n はチェックポイントのミリ秒の間隔です。

チェックポイントのための他のパラメータには以下が含まれます:

  • 確実に1回 vs. 少なくとも1回: 2つの保証レベルの間で選択するために任意でenableCheckpointing(n) メソッドにモードを渡すことができます。確実に1回がほとんどのアプリケーションで好ましいです。少なくとも1回は特定の超低レイテンシ(確実に2,3ミリ秒)アプリケーションに適しているかもしれません。

  • チェックポイントのタイムアウト: 進行中のチェックポイントがその時間までに完了しなかった場合は、それが破棄されるまでの時間。

  • チェックポイント間の最小の時間: ストリーミング アプリケーションがチェックポイント間で幾らかの進捗をすることを確実にするために、チェックポイント間でどれだけの時間が過ぎる必要があるかを定義することができます。この値が例えば5000に設定された場合、チェックポイントの持続期間とチェックポイントの間隔に関係なく、次のチェックポイントは以前のチェックポイントが完了してから5秒未満では開始されないでしょう。これはチェックポイントの間隔がこのパラメータより小さくならないだろう事を意味することに注意してください。

    (例えば目的のストレージシステムが一時的に遅い場合は)チェックポイントは時には平均より長く掛かるかもしれないという事実に“チェックポイントの間の時間”は影響を受けないため、“チェックポイントの間の時間”そしてチェックポイントの間隔を定義することでアプリケーションを設定することはしばしば簡単です。

    この値は同時に起こるチェックポイントの数が1つであることも意味することに注意してください。

  • 同時に起こるチェックポイントの数: デフォルトでは、システムは1つのチェックポイントがまだ実行中の間は他のチェックポイントは引き起こされないでしょう。これは、トポロジはあまりに長くの時間をチェックポイントに費やさず、ストリームを処理しながら進行しないことを確実にします。複数のオーバーラップするチェックポイントを許可することは可能です。これは(例えば関数が応答をするのに少し時間が必要な外部サービスを呼び出すため)ある程度の処理遅延を持つが障害時に再処理にほとんど掛からないためにとても頻繁なチェックポイント(ミリ秒の100個)をまだしたいパイプラインにとって興味深いです。

    このオプションはチェックポイント間の最小時間が定義されている場合は使うことができません。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);

// advanced options:

// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);

// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
val env = StreamExecutionEnvironment.getExecutionEnvironment()

// start a checkpoint every 1000 ms
env.enableCheckpointing(1000)

// advanced options:

// set mode to exactly-once (this is the default)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)

// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig.setCheckpointTimeout(60000)

// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

上に戻る

状態バックエンドの選択

The checkpointing mechanism stores the progress in the data sources and data sinks, the state of windows, as well as the user-defined state consistently to provide exactly once processing semantics. チェックポイントがどこに(例えば、ジョブマネージャのメモリ、ファイルシステム、データベース)格納されるかは設定された状態バックエンドに依存します。

デフォルトでは、状態はメモリ内に維持され、チェックポイントはマスターノード(ジョブマネージャ)のメモリ内に格納されるでしょう。大きな状態の適切な永続性のために、Flinkは状態バックエンドと呼ばれる様々な形式の格納とチェックポイント状態をサポートします。これらはStreamExecutionEnvironment.setStateBackend(…)を使って設定することができます。

利用可能なバックエンドとジョブ単位およびクラスタ単位の設定のためのオプションについての詳細は状態バックエンドを見てください。

繰り返しジョブでの状態のチェックポイント

Flinkは現在のところ繰り返し無しのジョブのための処理の保証のみを提供します。繰り返しのジョブのチェックポイントを有効にすると例外が起きます。繰り返しのプログラム上でチェックポイントを強制するには、チェックポイントを有効にする時に特別なフラグを設定する必要があります: env.enableCheckpointing(interval, force = true).

ループで処理中のレコード (およびそれらに関連する状態の変更)は、障害時に喪失されるでしょう。

上に戻る

ストラテジの再実行

Flinkは障害時にジョブがどのように再起動するかを制御する異なる再起動戦略をサポートします。詳しい情報については、再起動戦略を見てください。

上に戻る

TOP
inserted by FC2 system