Checkpointing
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

チェックポイント #

Flinkの全ての関数とオペレータはステートフルにすることができます (詳細はworking with stateを参照してください)。 ステートフル関数は個々の要素/イベントの処理全体にわたってデータを保存し、状態をあらゆるタイプのより複雑なオペレータにとって重要な構成要素にします。

状態を耐障害性のあるものにするために、Flinkは状態をチェックポイントする必要があります。チェックポイントを使ってFlinkがストリーム内の状態と位置を回復して、アプリケーションに失敗のない実行と同じセマンティクスを与えることができます。

ストリーミング耐障害性のドキュメントでは、Flinkのストリーミングの耐障害性の仕組みの背後にある技術が詳しく説明されています。

必要条件 #

Flinkのチェックポイントの仕組みはストリームと状態のための耐久力のあるストレージとやりとりをします。一般的に、以下が必要です:

  • 一定期間レコードを再生できる永続的 (または耐久力のある)データソース。そのようなソースの例は、永続的なメッセージキュー(例えば Apache Kafka, RabbitMQ, Amazon Kinesis, Google PubSub)あるいはファイルシステム(例えば HDFS, S3, GFS, NFS, Ceph, …)です。
  • 状態のための永続的なストレージ、一般的に分散ファイルシステム (例えば HDFS, S3, GFS, NFS, Ceph, …)

チェックポイントの有効化と設定 #

デフォルトではチェックポイントは無効です。チェックポイントを有効にするには、StreamExecutionEnvironmentenableCheckpointing(n)を呼び出します。ここで、nチェックポイント間隔のミリ秒です。

チェックポイントのための他のパラメータには以下が含まれます:

  • チェックポイントストレージ: チェックポイントのスナップショットを永続的に保存する場所を設定できます。デフォルトではFlinkはJobManagerのヒープを使います。プロダクションのデプロイメントでは、代わりに耐久性のあるファイルシステムを使うことをお勧めします。ジョブ全体およびクラスタ全体の設定で利用可能なオプションの詳細については、チェックポイントストレージを参照してください。

  • 確実に1回 vs. 少なくとも1回: 2つの保証レベルの間で選択するために任意でenableCheckpointing(n)メソッドにモードを渡すことができます。 確実に1回がほとんどのアプリケーションで好ましいです。少なくとも1回は特定の超低レイテンシ(確実に2,3ミリ秒)アプリケーションに適しているかもしれません。

  • チェックポイントのタイムアウト: 進行中のチェックポイントがその時間までに完了しなかった場合は、それが破棄されるまでの時間。

  • チェックポイント間の最小の時間: ストリーミング アプリケーションがチェックポイント間で幾らかの進捗をすることを確実にするために、チェックポイント間でどれだけの時間が過ぎる必要があるかを定義することができます。この値が例えば 5000に設定された場合、チェックポイントの持続期間とチェックポイントの間隔に関係なく、次のチェックポイントは以前のチェックポイントが完了してから5秒未満では開始されないでしょう。 これはチェックポイントの間隔がこのパラメータより小さくならないだろう事を意味することに注意してください。

    (例えば目的のストレージシステムが一時的に遅い場合は)チェックポイントは時には平均より長く掛かるかもしれないという事実に“チェックポイントの間の時間”はあまり影響を受けないため、チェックポイントの間隔よりも“チェックポイントの間の時間”を定義してアプリケーションを設定することのほうが簡単です。

    この値は同時に起こるチェックポイントの数が1つであることも意味することに注意してください。

  • 許容チェックポイント失敗数: これは、ジョブ全体がフェイルオーバーされるまでに許容される連続するチェックポイントの失敗の回数を定義します。デフォルト値は0で、チェックポイントの失敗は許容されず、最初に報告されたチェックポイントの失敗でジョブが失敗することを意味します。 これは次の失敗理由のみに適用されます: JobManagerでのIOException、Task Managerの非同期フェーズの失敗、タイムアウトによるチェックポイントの有効期限切れ。Task Managerの同期フェーズに起因する失敗は、影響を受けるタスクのフェイルオーバーを常に強制します。Other types of checkpoint failures (such as checkpoint being subsumed) are being ignored.

  • 同時チェックポイントの数: デフォルトでは、システムはチェックポイントの進行中は別のチェックポイントをトリガーしません。 これは、トポロジはあまりに長くの時間をチェックポイントに費やさず、ストリームを処理しながら進行しないことを確実にします。 複数のオーバーラップするチェックポイントを許可することは可能です。これは(例えば関数が応答をするのに少し時間が必要な外部サービスを呼び出すため)ある程度の処理遅延を持つが障害時に再処理にほとんど掛からないためにとても頻繁なチェックポイント(ミリ秒の100個)をまだしたいパイプラインにとって興味深いです。

    このオプションはチェックポイント間の最小時間が定義されている場合は使うことができません。

  • 外部化チェックポイント: 定期的なチェックポイントを外部に永続化するように設定できます。外部化されたチェックポイントはメタデータを永続ストレージに書き込み、ジョブが失敗する時に自動的にクリーンアップされません。このやり方で、ジョブが失敗した時に再開するためのチェックポイントをそのあたりに持つでしょう。詳細については、[外部化チェックポイントに関するデプロイノート](/docs/ops/state/checkpoints/#externalized-checkpointsを参照してください。

  • 整列されていないチェックポイント: 整列されていないチェックポイントを有効にして、バックプレッシャー時のチェックポイントを大幅に削減できます。これは、確実に1回のチェックポイントと1つの同時チェックポイントでのみ動作します。

  • 終了したタスクのチェックポイント: デフォルトでは、DAGの一部が全てのレコードの処理を終了した場合でも、Flinkはチェックポイントの実行を継続します。詳細は重要な考慮事項を参照してください。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);

// advanced options:

// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);

// only two consecutive checkpoint failures are tolerated
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);

// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// enable externalized checkpoints which are retained
// after job cancellation
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// enables the unaligned checkpoints
env.getCheckpointConfig().enableUnalignedCheckpoints();

// sets the checkpoint storage where checkpoint snapshots will be written
env.getCheckpointConfig().setCheckpointStorage("hdfs:///my/checkpoint/dir");

// enable checkpointing with finished tasks
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
env.configure(config);
val env = StreamExecutionEnvironment.getExecutionEnvironment()

// start a checkpoint every 1000 ms
env.enableCheckpointing(1000)

// advanced options:

// set mode to exactly-once (this is the default)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)

// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig.setCheckpointTimeout(60000)

// only two consecutive checkpoint failures are tolerated
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2)

// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

// enable externalized checkpoints which are retained 
// after job cancellation
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

// enables the unaligned checkpoints
env.getCheckpointConfig.enableUnalignedCheckpoints()

// sets the checkpoint storage where checkpoint snapshots will be written
env.getCheckpointConfig.setCheckpointStorage("hdfs:///my/checkpoint/dir")

// enable checkpointing with finished tasks
val config = new Configuration()
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true)
env.configure(config)
env = StreamExecutionEnvironment.get_execution_environment()

# start a checkpoint every 1000 ms
env.enable_checkpointing(1000)

# advanced options:

# set mode to exactly-once (this is the default)
env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)

# make sure 500 ms of progress happen between checkpoints
env.get_checkpoint_config().set_min_pause_between_checkpoints(500)

# checkpoints have to complete within one minute, or are discarded
env.get_checkpoint_config().set_checkpoint_timeout(60000)

# only two consecutive checkpoint failures are tolerated
env.get_checkpoint_config().set_tolerable_checkpoint_failure_number(2)

# allow only one checkpoint to be in progress at the same time
env.get_checkpoint_config().set_max_concurrent_checkpoints(1)

# enable externalized checkpoints which are retained after job cancellation
env.get_checkpoint_config().enable_externalized_checkpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

# enables the unaligned checkpoints
env.get_checkpoint_config().enable_unaligned_checkpoints()

関連する設定オプション #

さらにいくつかのパラメータやデフォルトは、conf/flink-conf.yaml経由で設定できます (完全なガイドについては設定を参照してください):

Key Default Type Description
state.backend.incremental
false Boolean Option whether the state backend should create incremental checkpoints, if possible. For an incremental checkpoint, only a diff from the previous checkpoint is stored, rather than the complete checkpoint state. Once enabled, the state size shown in web UI or fetched from rest API only represents the delta checkpoint size instead of full checkpoint size. Some state backends may not support incremental checkpoints and ignore this option.
state.backend.local-recovery
false Boolean This option configures local recovery for this state backend. By default, local recovery is deactivated. Local recovery currently only covers keyed state backends (including both the EmbeddedRocksDBStateBackend and the HashMapStateBackend).
state.checkpoint-storage
(none) String The checkpoint storage implementation to be used to checkpoint state.
The implementation can be specified either via their shortcut name, or via the class name of a CheckpointStorageFactory. If a factory is specified it is instantiated via its zero argument constructor and its CheckpointStorageFactory#createFromConfig(ReadableConfig, ClassLoader) method is called.
Recognized shortcut names are 'jobmanager' and 'filesystem'.
state.checkpoints.dir
(none) String The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes(i.e. all TaskManagers and JobManagers).
state.checkpoints.num-retained
1 Integer The maximum number of completed checkpoints to retain.
state.savepoints.dir
(none) String The default directory for savepoints. Used by the state backends that write savepoints to file systems (HashMapStateBackend, EmbeddedRocksDBStateBackend).
state.storage.fs.memory-threshold
20 kb MemorySize The minimum size of state data files. All state chunks smaller than that are stored inline in the root checkpoint metadata file. The max memory threshold for this configuration is 1MB.
state.storage.fs.write-buffer-size
4096 Integer The default size of the write buffer for the checkpoint streams that write to file systems. The actual write buffer size is determined to be the maximum of the value of this option and option 'state.storage.fs.memory-threshold'.
taskmanager.state.local.root-dirs
(none) String The config parameter defining the root directories for storing file-based state for local recovery. Local recovery currently only covers keyed state backends. If not configured it will default to <WORKING_DIR>/localState. The <WORKING_DIR> can be configured via process.taskmanager.working-dir

Back to top

チェックポイントストレージの選択 #

Flinkのチェックポイントの仕組みは、コネクタ、ウィンドウ、任意のユーザ定義の状態を含む、タイマーと状態オペレータの全ての状態の一貫したスナップショットを保持します。 チェックポイントが保存される場所(例えば、JobManagerのメモリ、ファイルシステム、データベース)は、設定されたチェックポイントストレージによって異なります。

デフォルトでは、チェックポイントはJobManagerのメモリに保存されます。大きな状態を適切に永続化するために、Flinkは他の場所に状態をチェックポイントするための様々な方法をサポートしています。 チェックポイントストレージの選択は、StreamExecutionEnvironment.getCheckpointConfig().setCheckpointStorage(…)を介して設定できます。 プロダクションデプロイメントでは、チェックポイントを高可用性ファイルシステムに保存することを強くお勧めします。

ジョブ全体およびクラスタ全体の設定で利用可能なオプションの詳細については、 チェックポイントストレージを参照してください。

繰り返しジョブでの状態のチェックポイント #

Flinkは現在のところ繰り返し無しのジョブのための処理の保証のみを提供します。繰り返しのジョブのチェックポイントを有効にすると例外が起きます。反復プログラムでチェックポイントを強制するには、チェックポイントを有効にする特別なフラグを設定する必要があります: env.enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE, force = true)

ループで処理中のレコード (およびそれらに関連する状態の変更)は、障害時に喪失されるでしょう。

グラフの一部が完了した状態でのチェックポイント設定 #

Flink 1.14以降では、ジョブグラフの一部が全てのデータの処理を完了した場合でもチェックポイントの実行を続行できるようになりました。これは制限されたソースが含まれている場合に発生する可能性があります。この昨日は1.15以降デフォルトで有効になっており、機能フラグを使って無効にできます:

Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, false);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);

タスク/サブタスクが完了すると、それらはチェックポイントに関与しなくなります。これは、カスタムオペレータやUDF(ユーザ定義関数)を実装する際の重要な考慮事項です。

終了したタスクのチェックポイント設定をサポートするために、タスクのライフサイクルを調整し、 StreamOperator#finish メソッドを導入しました。 このメソッドは残りのバッファ状態をフラッシュするための明確なカットオフポイントになることが期待されます。終了メソッドが呼び出された後に取得された全てのチェックポイントは、(ほとんどの場合は)空白であり、このデータを出力する方法がないため、バッファリングされたデータを含めるべきではありません。注目すべき例外の1つは、オペレータが外部システム内のトランダクションへのポインタを持っている場合です(つまり、確実に1回のセマンティクスを実装するため)。このような場合、finish()メソッドの呼び出し後に取得されたチェックポイントは、オペレータが閉じられる前の最後のチェックポイントでコミットされる最後のトランザクションへのポインタを保持する必要があります。この組み込みの良い例は、確実に1回のシンクとTwoPhaseCommitSinkFunctionです。

これはオペレータの状態にどのような影響を与えるのでしょうか? #

UnionListStateには特別な処理があり、外部システムのオフセットに対するグローバルビューを実装するために良く使われます(つまり、Kafkaのパーティションの現在のオフセットを保持します)。closeメソッドが呼ばれた単一のサブタスクの状態を破棄していたら、割り当てられていたパーティションのオフセットが失われていたでしょう。この問題を回避するために、UnionListStateを使うサブタスクが全く終了しないか、全てが終了した場合にのみチェックポイントを成功させます。

ListStateが同様の方法で使われているのを見たことが無いですが、closeメソッドの後にチェックポイントされた状態はすべて破棄され、回復後には使えなくなることに注意してください。

再スケーリングの準備ができているオペレータは、部分的に終了するタスクでも適切に動作するはずです。 タスクのサブセットのみが終了したチェックポイントからの回復は、実行中のタスクと同じ数の新しいサブタスクを含むタスクを回復することと同じです。

タスク終了前に最終チェックポイントを待機 #

2フェーズコミットを使ってオペレータに対して全てのレコードを確実にコミットできるようにするために、タスクは全てのオペレータが終了した後、最終チェックポイントが正常に完了するまで待機します。 最終チェックポイントは全てのオペレータがデータの終わりに到達した直後に定期的なトリガーを待たずにトリガーされますが、ジョブはこの最終チェックポイントが完了するまで待つ必要があります。

Back to top

inserted by FC2 system