データストリーム API で書かれたプログラムはしばしば様々な形式で状態を保持します:
Checkpointed
インタフェースを実装するかもしれませんストリーミング API ガイド内の 状態との連携 も見てください。
チェックポインティングが活性化された場合、データの紛失に対する保護と一貫性のある回復のために、そのような状態はチェックポイント上に維持されます。内部的にどのように状態が表現されるか、そしてチェックポイント上にどのようにそしてどこに維持されるかは、選択された状態バックエンドに依存します。
そのままの状態で、Flinkはこれらの状態バックエンドをバンドルします:
何も設定されない場合は、システムは MemoryStateBackend を使うでしょう。
MemoryStateBackend は内部的にデータをJavaヒープ上にオブジェクトとして保持します。Key/value 状態とウィンドウ オペレータは値、トリガーなどを格納するハッシュテーブルを保持します。
チェックポイントジに、この状態バックエンドは状態をスナップショットし、それをチェックポイントのジョブマネージャ(マスター)への通知メッセージの一部として送信するでしょう。これは同様にヒープ上に格納します。
MemoryStateBackendの制限:
The MemoryStateBackend is encouraged for:
FsStateBackend は“hdfs://namenode:40010/flink/checkpoints” あるいは “file:///data/flink/checkpoints”のような、ファイルシステムURL(型、アドレス、パス) を使って設定されます。
FsStateBackend はタスクマネージャのメモリ内で実行中のデータを保持します。チェックポイント時に、状態のスナップショットを設定されたファイルシステムとディレクトリ内のファイルに書き込みます。最小のメタデータはジョブマネージャのメモリ(あるいは、高可用性モードでは、メタデータのチェックポイント内)に格納されます。
The FsStateBackend is encouraged for:
RocksDBStateBackend は“hdfs://namenode:40010/flink/checkpoints” あるいは “file:///data/flink/checkpoints”のような、ファイルシステムURL(型、アドレス、パス) を使って設定されます。
RocksDBStateBackend は(デフォルトで)タスクマネージャのデータディレクトリ内に格納されるRocksDB データベース内で実行中のデータを保持します。チェックポイント時には、RocksDBデータベース全体が設定されたファイルシステムとディレクトリ内にチェックポイントされるでしょう。最小のメタデータはジョブマネージャのメモリ(あるいは、高可用性モードでは、メタデータのチェックポイント内)に格納されます。
The RocksDBStateBackend is encouraged for:
保持できるデータの総量は利用可能なディスク空間の容量によってのみ制限されることに注意してください。これにより状態をメモリ内に格納するFsStateBackendに比べてとても大規模な状態を保持することができます。しかし、このことはまた達成可能な最大スループットはこの状態バックエンドをつかった時に低いだろうことを意味します。
状態バックエンドはジョブごとに設定することができます。更に、ジョブが明示的に状態バックエンドを定義しない時に使われるデフォルトの状態爆炎度を定義することができます。
ジョブ毎の状態バックエンドは、以下の例で示されるように、ジョブのStreamExecutionEnvironment
で設定されます:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"))
デフォルトの状態バックエンドは設定キーstate.backend
を使ってflink-conf.yaml
内で設定することができます。
設定エントリに可能な値はjobmanager (MemoryStateBackend), filesystem (FsStateBackend)、あるいはRocksDBStateBackendのためのorg.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory
のような状態バックエンド ファクトリー FsStateBackendFactoryを実装する完全修飾クラス名です。
デフォルトの状態バックエンドが filesystem に設定された場合、エントリ state.backend.fs.checkpointdir
はチェックポイントデータが格納されるだろうディレクトリを定義します。
設定ファイル内のサンプルのセクションは以下のように見えるでしょう:
# The backend that will be used to store operator state checkpoints
state.backend: filesystem
# Directory for storing checkpoints
state.backend.fs.checkpointdir: hdfs://namenode:40010/flink/checkpoints