データストリーム API で書かれたプログラムはしばしば様々な形式で状態を保持します:
Checkpointed
インタフェースを実装するかもしれませんSee also state section in the streaming API guide.
チェックポインティングが活性化された場合、データの紛失に対する保護と一貫性のある回復のために、そのような状態はチェックポイント上に維持されます。内部的にどのように状態が表現されるか、そしてチェックポイント上にどのようにそしてどこに維持されるかは、選択された状態バックエンドに依存します。
そのままの状態で、Flinkはこれらの状態バックエンドをバンドルします:
何も設定されない場合は、システムは MemoryStateBackend を使うでしょう。
MemoryStateBackend は内部的にデータをJavaヒープ上にオブジェクトとして保持します。Key/value 状態とウィンドウ オペレータは値、トリガーなどを格納するハッシュテーブルを保持します。
チェックポイントジに、この状態バックエンドは状態をスナップショットし、それをチェックポイントのジョブマネージャ(マスター)への通知メッセージの一部として送信するでしょう。これは同様にヒープ上に格納します。
The MemoryStateBackend can be configured to use asynchronous snapshots. While we strongly encourage the use of asynchronous snapshots to avoid blocking pipelines, please note that this is a new feature and currently not enabled
by default. To enable this feature, users can instantiate a MemoryStateBackend
with the corresponding boolean flag in the constructor set to true
, e.g.:
new MemoryStateBackend(MAX_MEM_STATE_SIZE, true);
MemoryStateBackendの制限:
The MemoryStateBackend is encouraged for:
FsStateBackend は“hdfs://namenode:40010/flink/checkpoints” あるいは “file:///data/flink/checkpoints”のような、ファイルシステムURL(型、アドレス、パス) を使って設定されます。
FsStateBackend はタスクマネージャのメモリ内で実行中のデータを保持します。チェックポイント時に、状態のスナップショットを設定されたファイルシステムとディレクトリ内のファイルに書き込みます。最小のメタデータはジョブマネージャのメモリ(あるいは、高可用性モードでは、メタデータのチェックポイント内)に格納されます。
The FsStateBackend uses asynchronous snapshots by default to avoid blocking the processing pipeline while writing state checkpoints. To disable this feature, users can instantiate a FsStateBackend
with the corresponding boolean flag in the constructor set to false
, e.g.:
new FsStateBackend(path, false);
The FsStateBackend is encouraged for:
RocksDBStateBackend は“hdfs://namenode:40010/flink/checkpoints” あるいは “file:///data/flink/checkpoints”のような、ファイルシステムURL(型、アドレス、パス) を使って設定されます。
The RocksDBStateBackend holds in-flight data in a RocksDB database that is (per default) stored in the TaskManager data directories. Upon checkpointing, the whole RocksDB database will be checkpointed into the configured file system and directory. 最小のメタデータはジョブマネージャのメモリ(あるいは、高可用性モードでは、メタデータのチェックポイント内)に格納されます。
The RocksDBStateBackend always performs asynchronous snapshots.
Limitations of the RocksDBStateBackend:
The RocksDBStateBackend is encouraged for:
Note that the amount of state that you can keep is only limited by the amount of disk space available. これにより状態をメモリ内に格納するFsStateBackendに比べてとても大規模な状態を保持することができます。しかし、このことはまた達成可能な最大スループットはこの状態バックエンドをつかった時に低いだろうことを意味します。
RocksDBStateBackend is currently the only backend that offers incremental checkpoints (see here).
The default state backend, if you specify nothing, is the jobmanager. If you wish to establish a different default for all jobs on your cluster, you can do so by defining a new default state backend in flink-conf.yaml. The default state backend can be overridden on a per-job basis, as shown below.
ジョブ毎の状態バックエンドは、以下の例で示されるように、ジョブのStreamExecutionEnvironment
で設定されます:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"))
デフォルトの状態バックエンドは設定キーstate.backend
を使ってflink-conf.yaml
内で設定することができます。
Possible values for the config entry are jobmanager (MemoryStateBackend), filesystem (FsStateBackend), rocksdb (RocksDBStateBackend), or the fully qualified class
name of the class that implements the state backend factory FsStateBackendFactory,
such as org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory
for RocksDBStateBackend.
デフォルトの状態バックエンドが filesystem に設定された場合、エントリ state.backend.fs.checkpointdir
はチェックポイントデータが格納されるだろうディレクトリを定義します。
設定ファイル内のサンプルのセクションは以下のように見えるでしょう:
# The backend that will be used to store operator state checkpoints
state.backend: filesystem
# Directory for storing checkpoints
state.backend.fs.checkpointdir: hdfs://namenode:40010/flink/checkpoints