セーブポイント

概要

セーブポイントは、Flinkプログラムを停止および再開、あるいは更新するために使うことができるチェックポイントを外部的に格納します。それらはストリーミングプログラムの状態のスナップショットを作成し、チェックポイントのメタデータを外部ファイルシステムに書き出すために、Flinkのチェックポイントの仕組み を使います。

このページはセーブポイントの起動、再開、廃棄に伴う全てのステップをカバーします。プログラムとFlinkのバージョン間でアップグレードできるようにするためには、オペレータにIDを割り当てについての章を調べることが重要です。

一般的にFlinkが状態および障害を扱う方法についての詳細は、ストリーミング プログラムでの状態ページを調べてください。

オペレータIDの割り当て

将来プログラムをアップグレードできるようにするために、この章で説明されるようにプログラムを調整することを強くお勧めします。主要な必要とされる変更は、uid(String)メソッドを使って手動でオペレータIDを指定することです。これらのIDは各オペレータの状態を探すために使われます。

DataStream<String> stream = env.
  // Stateful source (e.g. Kafka) with ID
  .addSource(new StatefulSource())
  .uid("source-id") // ID for the source operator
  .shuffle()
  // Stateful mapper with ID
  .map(new StatefulMapper())
  .uid("mapper-id") // ID for the mapper
  // Stateless printing sink
  .print(); // Auto-generated ID

IDを手動で指定しない場合、それらは自動的に生成されるでしょう。これらのIDが変更されない限りはセーブポイントから自動的に回復することができます。生成されたIDはプログラムの構造に依存し、プログラムの変更に敏感です。従って、これらのIDを手動で割り当てることを強くお勧めします。

セーブポイントの状態

各statefulオペレータについて、セーブポイントをOperator ID -> State のマップを持つと考えることができます:

Operator ID | State
------------+------------------------
source-id   | State of StatefulSource
mapper-id   | State of StatefulMapper

In the above example, the print sink is stateless and hence not part of the savepoint state. By default, we try to map each entry of the savepoint back to the new program.

操作

コマンドライン クライアントセーブポイントを起動, セーブポイントを持つジョブを中止, セーブポイントからの再開, およびセーブポイントの破棄するために使うことができます。

セーブポイントの起動

セーブポイントを起動する時、チェックポイントのメタデータを含む1つのセーブポイントファイルが生成されます。実際のチェックポイントの状態は設定されたチェックポイントのディレクトリ内あたりにあるでしょう。例えば、FsStateBackend あるいは RocksDBStateBackendを使うと:

# Savepoint file contains the checkpoint meta data
/savepoints/savepoint-123123

# Checkpoint directory contains the actual state
/checkpoints/:jobid/chk-:id/...

セーブポイントファイルは通常実際のチェックポイントされた状態よりもかなり小さいです。MemoryStateBackendを使う場合、セーブポイントファイルは自己内包して全ての状態を含むだろうことに注意してください。

セーブポイントを起動

$ bin/flink savepoint :jobId [:targetDirectory]

これはID :jobidを持つジョブのためのセーブポイントを起動するでしょう。更に、セーブポイントを格納する目的のファイルシステム ディレクトリを指定することができます。ディレクトリはジョブマネージャーによってアクセス可能である必要があります。

目的のディレクトリを指定しない場合、設定されたデフォルトのディレクトリを持つ必要があります。そうでなければ、セーブポイントの起動は失敗するでしょう。

セーブポイントを使ってジョブを取り消す

$ bin/flink cancel -s [:targetDirectory] :jobId

これはID :jobid を持つジョブのためのセーブポイントをアトミックに起動し、ジョブをキャンセルするでしょう。更に、セーブポイントを格納する目的のファイルシステム ディレクトリを指定することができます。ディレクトリはジョブマネージャーによってアクセス可能である必要があります。

目的のディレクトリを指定しない場合、設定されたデフォルトのディレクトリを持つ必要があります。そうでなければ、セーブポイントを持つジョブの中止は失敗するでしょう。

セーブポイントからの再開

$ bin/flink run -s :savepointPath [:runArgs]

これはジョブをサブミットしセーブポイントのパスを指定します。実行はそれぞれのセーブポイントの状態から再開するでしょう。セーブポイントファイルはチェックポイントのメタデータを保持し、実際のチェックポイントファイルを指します。セーブポイントファイルが通常実際のチェックポイントの状態よりもはるかに小さいのはこのためです。

回復されない状態の許容

デフォルトでは、再開オペレーションはセーブポイントの全ての状態を回復しようとしているプログラムにマップしようとするでしょう。オペレータを落とした場合、--allowNonRestoredState (短縮: -n) オプションを使って、新しいプログラムにマップすることができない状態をスキップすることができます。

$ bin/flink run -s :savepointPath -n [:runArgs]

セーブポイントの廃棄

$ bin/flink savepoint -d :savepointPath

これは :savepointPathに格納されているセーブポイントを破棄します。

セーブポイントは常にファイルシステムに行くため、通常のファイルシステムの操作を通じてセーブポイントを手動で削除することが可能なことに注意してください。しかしセーブポイントは実際のチェックポイントデータを指し示すメタデータのみを格納することを忘れないでください。従って、もし手動でセーブポイントを削除したい場合は、チェックポイント ファイルも含む必要があるでしょう。現在のところ、どのようにセーブポイントがチェックポイントにマップするかを見つけ出す素直な方法が無いため、上で説明したようにこのためにセーブポイントツールを使うことをお勧めします。

設定

state.savepoints.dir キーを使ってデフォルトのセーブポイントの目的ディレクトリを設定できます。セーブポイントを起動する時、このディレクトリはセーブポイントのメタデータを格納するために使われるでしょう。起動コマンドを使って、独自の目的ディレクトリを指定することでデフォルトを上書きすることができます (:targetDirectory 引数を見てください)。

# Default savepoint target directory
state.savepoints.dir: hdfs:///flink/savepoints

デフォルトを設定せず、独自の目的ディレクトリも指定しない場合、セーブポイントの起動は失敗するでしょう。

F.A.Q

ジョブ内の全てのオペレータにIDを割り当てる必要がありますか?

経験則では、はい、です。厳密に言うと、uidメソッドを使ってIDをジョブ内のstatefulオペレータへ割り当てるだけで十分です。セーブポイントはこれらのオペレータのための状態のみを含み、オペレータはセーブポイントの一部ではありません。

実際問題としては、ウィンドウ操作のオペレータのようなFlinkの組み込みのオペレータのいくつかもstatefulでどの組み込みのオペレータが実際にstatefulかどれがそうでないかが明らかではないため、全てのオペレータに割り当てることをお勧めします。オペレータがstatelessだと絶対に確認している場合は、uidメソッドをスキップすることができます。

なぜセーブポイントファイルはそんなに小さいのでしょうか?

セーブポイントファイルはチェックポイントのメタデータのみを含み、チェックポイントの状態へのポインタを持ちます。チェックポイントの状態は通常はより大きいです。MemoryStateBackendを使う場合、チェックポイントは全ての状態を含みますが、バックエンドによって小さな状態に制約されるでしょう。

状態を必要とする新しいオペレータをジョブに追加する時に何が起きますか?

新しいオペレータをジョブに追加する場合、それは状態無しで初期化されるでしょう。セーブポイントは各statefulオペレータの状態を含みます。statelessのオペレータは単なるセーブポイントの一部ではありません。新しいオペレータはstatelessオペレータと似た挙動をします。

状態を持つオペレータをジョブから削除する時に何が起きますか?

デフォルトでは、セーブポイントは全ての状態を回復されたジョブに合致しようとするでしょう。削除されたオペレータについての状態を含むセーブポイントから回復する場合、その結果失敗するでしょう。

実行コマンドと一緒に--allowNonRestoredState (短縮: -n) を設定することで回復無しの状態にすることができます。

$ bin/flink run -s :savepointPath -n [:runArgs]

statefulなオペレータをジョブ内で追加注文した時に何が起きますか?

IDをこれらのオペレータに割り当てた場合、それらは通常通りに回復されるでしょう。

IDを割り当てなかった場合、statefulオペレータの自動生成されたIDはおそらく追加注文の後で変更されるでしょう。これは以前のセーブポイントから回復することができない結果になるでしょう。

状態を持たないオペレータを追加あるいは削除あるいは追加注文した時に何が起きますか?

statefulオペレータにIDを割り当てる場合、statelessオペレータはセーブポイントの回復に影響しないでしょう。

IDを割り当てなかった場合、statefulオペレータの自動生成されたIDはおそらく追加注文の後で変更されるでしょう。これは以前のセーブポイントから回復することができない結果になるでしょう。

回復時にプログラムの並行堂を変更すると何が起きますか?

セーブポイントがFlink >= 1.2.0 を使って起動され、Checkpointedのような非推奨ではない状態APIを使う場合、セーブポイントから簡単にプログラムを回復し、新し並行度を指定することができます。

Flink < 1.2.0 を使って起動されるか、今では非推奨のAPIを使ってセーブポイントから再開する場合、並行度を変更する前に、まずFlink 1.2.0へジョブとセーブポイントを移設する必要があります。ジョブとFlinkのバージョンのアップグレード ガイドを見てください。

現在の制限

  • チェイン: 繋げられたオペレータは最初のタスクのIDによって識別されます。繋げられたタスクの中間にIDを手動で割り当てることはできません。例えば、[ a -> b -> c ]のチェインでは、a だけが手動で割り当てられたIDを持つことができますが、b または cはできません。これの代替策として手動でタスクのチェインを定義することができます。自動的なIDの割り当てを頼みにする場合、繋がりの挙動内での変更はIDも変更するでしょう。
TOP
inserted by FC2 system