Checkpointing

Every function and operator in Flink can be stateful (see working with state for details). Stateful functions store data across the processing of individual elements/events, making state a critical building block for any type of more elaborate operation.

In order to make state fault tolerant, Flink needs to checkpoint the state. Checkpoints allow Flink to recover state and positions in the streams, giving the application the same semantics as a failure-free execution.

The documentation on streaming fault tolerance describes in detail the technique behind Flink's streaming fault tolerance mechanism.

Prerequisites

Flink's checkpointing mechanism interacts with durable storage for streams and state. In general, it requires:

  • A persistent (or durable) data source that can replay records for a certain amount of time. Examples for such sources are persistent message queues (e.g., Apache Kafka, RabbitMQ, Amazon Kinesis, Google PubSub) or file systems (e.g., HDFS, S3, GFS, NFS, Ceph, …).
  • A persistent storage for state, typically a distributed file system (e.g., HDFS, S3, GFS, NFS, Ceph, …)
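
As a rough sketch of how these requirements fit together in practice, the Java snippet below reads from Apache Kafka (a replayable source) with checkpointing enabled. The bootstrap address, topic name, and the FlinkKafkaConsumer011 connector class are illustrative assumptions that depend on the connector version you deploy; they are not prescribed by this page.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// checkpoint every 10 seconds; the source's read positions (offsets) become part of the checkpointed state
env.enableCheckpointing(10000);

// hypothetical Kafka address and topic -- any replayable source works the same way
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "checkpointing-example");

DataStream<String> stream = env.addSource(
    new FlinkKafkaConsumer011<>("example-topic", new SimpleStringSchema(), props));

stream.print();

env.execute("Checkpointed Kafka job");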

Enabling and Configuring Checkpointing

Checkpointing is disabled by default. To enable checkpointing, call enableCheckpointing(n) on the StreamExecutionEnvironment, where n is the checkpoint interval in milliseconds.

Other parameters for checkpointing include:

  • exactly-once vs. at-least-once: You can optionally pass a mode to the enableCheckpointing(n) method to choose between the two guarantee levels. Exactly-once is preferable for most applications. At-least-once may be relevant for certain super-low-latency (consistently few milliseconds) applications.

  • checkpoint timeout: The time after which a checkpoint-in-progress is aborted, if it did not complete by then.

  • minimum time between checkpoints: To make sure that the streaming application makes a certain amount of progress between checkpoints, one can define how much time needs to pass between checkpoints. If this value is set for example to 5000, the next checkpoint will be started no sooner than 5 seconds after the previous checkpoint completed, regardless of the checkpoint duration and the checkpoint interval. Note that this implies that the checkpoint interval will never be smaller than this parameter.

    It is often easier to configure applications by defining the “time between checkpoints” than the checkpoint interval, because the “time between checkpoints” is not susceptible to the fact that checkpoints may sometimes take longer than on average (for example if the target storage system is temporarily slow).

    Note that this value also implies that the number of concurrent checkpoints is one.

  • number of concurrent checkpoints: By default, the system will not trigger another checkpoint while one is still in progress. This ensures that the topology does not spend too much time on checkpoints while making no progress with processing the streams. It is possible to allow for multiple overlapping checkpoints, which is interesting for pipelines that have a certain processing delay (for example because the functions call external services that need some time to respond) but that still want to do very frequent checkpoints (100s of milliseconds) to re-process very little upon failures.

    This option cannot be used when a minimum time between checkpoints is defined.

  • externalized checkpoints: You can configure periodic checkpoints to be persisted externally. Externalized checkpoints write their metadata out to persistent storage and are not automatically cleaned up when the job fails. This way, you will have a checkpoint around to resume from if your job fails. There are more details in the deployment notes on externalized checkpoints.

  • fail/continue task on checkpoint errors: This determines if a task will be failed if an error occurs in the execution of the task’s checkpoint procedure. Failing the task is the default behaviour. Alternatively, when this is disabled, the task will simply decline the checkpoint to the checkpoint coordinator and continue running.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);

// advanced options:

// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);

// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

val env = StreamExecutionEnvironment.getExecutionEnvironment()

// start a checkpoint every 1000 ms
env.enableCheckpointing(1000)

// advanced options:

// set mode to exactly-once (this is the default)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)

// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig.setCheckpointTimeout(60000)

// prevent the tasks from failing if an error happens in their checkpointing; the checkpoint will just be declined
env.getCheckpointConfig.setFailOnCheckpointingErrors(false)

// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

Some more parameters and/or defaults may be set via conf/flink-conf.yaml (see configuration for a full guide):

  • state.backend: The backend that will be used to store operator state checkpoints if checkpointing is activated. Supported backends:
    • jobmanager: In-memory state, backed up to the JobManager’s/ZooKeeper’s memory. Should only be used for minimal state (Kafka offsets) or for testing and local debugging.
    • filesystem: State is held in memory on the TaskManagers, and state snapshots are stored in a file system. Supported are all file systems supported by Flink, for example HDFS, S3, …
  • state.backend.fs.checkpointdir: Directory for storing checkpoints in a Flink-supported file system. Note: The state backend must be accessible from the JobManager; use file:// only for local setups.

  • state.backend.rocksdb.checkpointdir: The local directory for storing RocksDB files, or a list of directories separated by the system’s directory delimiter (for example ':' (colon) on Linux/Unix). (Default: the value of taskmanager.tmp.dirs)

  • state.checkpoints.dir: The target directory for the metadata of externalized checkpoints.

  • state.checkpoints.num-retained: The number of completed checkpoint instances to retain. Having more than one allows recovery to fall back to an earlier checkpoint if the latest checkpoint is corrupt. (Default: 1)
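
For illustration, a flink-conf.yaml fragment using the keys above might look as follows; the HDFS paths and the retained-checkpoint count are placeholders, not values taken from this page.

# illustrative flink-conf.yaml fragment -- replace the placeholder paths with real locations
state.backend: filesystem
state.backend.fs.checkpointdir: hdfs://namenode:8020/flink/checkpoints
state.checkpoints.dir: hdfs://namenode:8020/flink/external-checkpoints
state.checkpoints.num-retained: 3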

Back to top

Selecting a State Backend

Flink’s checkpointing mechanism stores consistent snapshots of all the state in timers and stateful operators, including connectors, windows, and any user-defined state. Where the checkpoints are stored (e.g., JobManager memory, file system, database) depends on the configured State Backend.

By default, state is kept in memory in the TaskManagers and checkpoints are stored in memory in the JobManager. For proper persistence of large state, Flink supports various approaches for storing and checkpointing state in other state backends. The choice of state backend can be configured via StreamExecutionEnvironment.setStateBackend(…).
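
As a minimal sketch, assuming a file-system backend and a placeholder HDFS path, the backend could be selected in code like this (FsStateBackend lives in org.apache.flink.runtime.state.filesystem):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// keep working state on the TaskManagers, write snapshots to a (placeholder) HDFS directory
env.setStateBackend(new FsStateBackend("hdfs://namenode:8020/flink/checkpoints"));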

See state backends for more details on the available state backends and options for job-wide and cluster-wide configuration.

State Checkpoints in Iterative Jobs

Flink currently only provides processing guarantees for jobs without iterations. Enabling checkpointing on an iterative job causes an exception. In order to force checkpointing on an iterative program the user needs to set a special flag when enabling checkpointing: env.enableCheckpointing(interval, force = true).
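
A minimal Java sketch, assuming the deprecated enableCheckpointing overload that accepts a force flag is available in your Flink version (the interval value is arbitrary):

// force checkpointing for an iterative program despite the weaker guarantees
env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE, true);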

Please note that records in flight in the loop edges (and the state changes associated with them) will be lost during failure.

Back to top

Restart Strategies

Flink supports different restart strategies which control how jobs are restarted in case of a failure. For more information, see Restart Strategies.
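
For example, a fixed-delay restart strategy can be set on the environment; the attempt count and delay below are arbitrary illustration values.

// restart the job up to 3 times, waiting 10 seconds between attempts
// (uses org.apache.flink.api.common.restartstrategy.RestartStrategies and org.apache.flink.api.common.time.Time)
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));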

Back to top
