セーブポイント

概要

Savepoints are externally stored self-contained checkpoints that you can use to stop-and-resume or update your Flink programs. They use Flink’s checkpointing mechanism to create a (non-incremental) snapshot of the state of your streaming program and write the checkpoint data and meta data out to an external file system.

このページはセーブポイントの起動、再開、廃棄に伴う全てのステップをカバーします。一般的にFlinkが状態および障害を扱う方法についての詳細は、ストリーミング プログラムでの状態ページを調べてください。

Attention: In order to allow upgrades between programs and Flink versions, it is important to check out the following section about assigning IDs to your operators.

オペレータIDの割り当て

将来プログラムをアップグレードできるようにするために、この章で説明されるようにプログラムを調整することを強くお勧めします。主要な必要とされる変更は、uid(String)メソッドを使って手動でオペレータIDを指定することです。これらのIDは各オペレータの状態を探すために使われます。

DataStream<String> stream = env.
  // Stateful source (e.g. Kafka) with ID
  .addSource(new StatefulSource())
  .uid("source-id") // ID for the source operator
  .shuffle()
  // Stateful mapper with ID
  .map(new StatefulMapper())
  .uid("mapper-id") // ID for the mapper
  // Stateless printing sink
  .print(); // Auto-generated ID

IDを手動で指定しない場合、それらは自動的に生成されるでしょう。これらのIDが変更されない限りはセーブポイントから自動的に回復することができます。生成されたIDはプログラムの構造に依存し、プログラムの変更に敏感です。従って、これらのIDを手動で割り当てることを強くお勧めします。

セーブポイントの状態

各statefulオペレータについて、セーブポイントをOperator ID -> State のマップを持つと考えることができます:

Operator ID | State
------------+------------------------
source-id   | State of StatefulSource
mapper-id   | State of StatefulMapper

In the above example, the print sink is stateless and hence not part of the savepoint state. By default, we try to map each entry of the savepoint back to the new program.

操作

コマンドライン クライアントセーブポイントを起動, セーブポイントを持つジョブを中止, セーブポイントからの再開, およびセーブポイントの破棄するために使うことができます。

With Flink >= 1.2.0 it is also possible to resume from savepoints using the webui.

セーブポイントの起動

When triggering a savepoint, a new savepoint directory beneath the target directory is created. In there, the data as well as the meta data will be stored. For example with a FsStateBackend or RocksDBStateBackend:

# Savepoint target directory
/savepoints/

# Savepoint directory
/savepoints/savepoint-:shortjobid-:savepointid/

# Savepoint file contains the checkpoint meta data
/savepoints/savepoint-:shortjobid-:savepointid/_metadata

# Savepoint state
/savepoints/savepoint-:shortjobid-:savepointid/...
Note: Although it looks as if the savepoints may be moved, it is currently not possible due to absolute paths in the _metadata file. Please follow FLINK-5778 for progress on lifting this restriction.

Note that if you use the MemoryStateBackend, metadata and savepoint state will be stored in the _metadata file. Since it is self-contained, you may move the file and restore from any location.

セーブポイントを起動

$ bin/flink savepoint :jobId [:savepointDirectory]

This will trigger a savepoint for the job with ID :jobId, and returns the path of the created savepoint. savepoints を回復および破棄するにはこのパスが必要です。

Furthermore, you can optionally specify a target file system directory to store the savepoint in. ディレクトリはジョブマネージャーによってアクセス可能である必要があります。

If you don’t specify a target directory, you need to have configured a default directory (see Savepoints). そうでなければ、セーブポイントの起動は失敗するでしょう。

Trigger a Savepoint with YARN

$ bin/flink savepoint :jobId [:savepointDirectory] -yid :yarnAppId

This will trigger a savepoint for the job with ID :jobId and YARN application ID :yarnAppId, and returns the path of the created savepoint.

Everything else is the same as described in the above Trigger a Savepoint section.

セーブポイントを使ってジョブを取り消す

$ bin/flink cancel -s [:targetDirectory] :jobId

これはID :jobid を持つジョブのためのセーブポイントをアトミックに起動し、ジョブをキャンセルするでしょう。更に、セーブポイントを格納する目的のファイルシステム ディレクトリを指定することができます。ディレクトリはジョブマネージャーによってアクセス可能である必要があります。

目的のディレクトリを指定しない場合、設定されたデフォルトのディレクトリを持つ必要があります。そうでなければ、セーブポイントを持つジョブの中止は失敗するでしょう。

セーブポイントからの再開

$ bin/flink run -s :savepointPath [:runArgs]

This submits a job and specifies a savepoint to resume from. You may give a path to either the savepoint’s directory or the _metadata file.

回復されない状態の許容

デフォルトでは、再開オペレーションはセーブポイントの全ての状態を回復しようとしているプログラムにマップしようとするでしょう。オペレータを落とした場合、--allowNonRestoredState (短縮: -n) オプションを使って、新しいプログラムにマップすることができない状態をスキップすることができます。

$ bin/flink run -s :savepointPath -n [:runArgs]

セーブポイントの廃棄

$ bin/flink savepoint -d :savepointPath

これは :savepointPathに格納されているセーブポイントを破棄します。

Note that it is possible to also manually delete a savepoint via regular file system operations without affecting other savepoints or checkpoints (recall that each savepoint is self-contained). Up to Flink 1.2, this was a more tedious task which was performed with the savepoint command above.

設定

state.savepoints.dir キーを使ってデフォルトのセーブポイントの目的ディレクトリを設定できます。When triggering savepoints, this directory will be used to store the savepoint. 起動コマンドを使って、独自の目的ディレクトリを指定することでデフォルトを上書きすることができます (:targetDirectory 引数を見てください)。

# Default savepoint target directory
state.savepoints.dir: hdfs:///flink/savepoints

デフォルトを設定せず、独自の目的ディレクトリも指定しない場合、セーブポイントの起動は失敗するでしょう。

F.A.Q

ジョブ内の全てのオペレータにIDを割り当てる必要がありますか?

経験則では、はい、です。厳密に言うと、uidメソッドを使ってIDをジョブ内のstatefulオペレータへ割り当てるだけで十分です。セーブポイントはこれらのオペレータのための状態のみを含み、オペレータはセーブポイントの一部ではありません。

実際問題としては、ウィンドウ操作のオペレータのようなFlinkの組み込みのオペレータのいくつかもstatefulでどの組み込みのオペレータが実際にstatefulかどれがそうでないかが明らかではないため、全てのオペレータに割り当てることをお勧めします。オペレータがstatelessだと絶対に確認している場合は、uidメソッドをスキップすることができます。

状態を必要とする新しいオペレータをジョブに追加する時に何が起きますか?

新しいオペレータをジョブに追加する場合、それは状態無しで初期化されるでしょう。セーブポイントは各statefulオペレータの状態を含みます。statelessのオペレータは単なるセーブポイントの一部ではありません。新しいオペレータはstatelessオペレータと似た挙動をします。

状態を持つオペレータをジョブから削除する時に何が起きますか?

デフォルトでは、セーブポイントは全ての状態を回復されたジョブに合致しようとするでしょう。削除されたオペレータについての状態を含むセーブポイントから回復する場合、その結果失敗するでしょう。

実行コマンドと一緒に--allowNonRestoredState (短縮: -n) を設定することで回復無しの状態にすることができます。

$ bin/flink run -s :savepointPath -n [:runArgs]

statefulなオペレータをジョブ内で追加注文した時に何が起きますか?

IDをこれらのオペレータに割り当てた場合、それらは通常通りに回復されるでしょう。

IDを割り当てなかった場合、statefulオペレータの自動生成されたIDはおそらく追加注文の後で変更されるでしょう。これは以前のセーブポイントから回復することができない結果になるでしょう。

状態を持たないオペレータを追加あるいは削除あるいは追加注文した時に何が起きますか?

statefulオペレータにIDを割り当てる場合、statelessオペレータはセーブポイントの回復に影響しないでしょう。

IDを割り当てなかった場合、statefulオペレータの自動生成されたIDはおそらく追加注文の後で変更されるでしょう。これは以前のセーブポイントから回復することができない結果になるでしょう。

回復時にプログラムの並行堂を変更すると何が起きますか?

セーブポイントがFlink >= 1.2.0 を使って起動され、Checkpointedのような非推奨ではない状態APIを使う場合、セーブポイントから簡単にプログラムを回復し、新し並行度を指定することができます。

If you are resuming from a savepoint triggered with Flink < 1.2.0 or using now deprecated APIs you first have to migrate your job and savepoint to Flink >= 1.2.0 before being able to change the parallelism. ジョブとFlinkのバージョンのアップグレード ガイドを見てください。

上に戻る

TOP
inserted by FC2 system