バックエンドの状態

データストリーム API で書かれたプログラムはしばしば様々な形式で状態を保持します:

  • プログラムが起動されるまで、Windows は要素を集め、あるいは集約します
  • 変換関数は値を格納するためにキー/値の状態を使うかもしれません
  • 変換関数はそれらのローカルな変数を障害性のあるものにするためにCheckpointedインタフェースを実装するかもしれません

See also state section in the streaming API guide.

チェックポインティングが活性化された場合、データの紛失に対する保護と一貫性のある回復のために、そのような状態はチェックポイント上に維持されます。内部的にどのように状態が表現されるか、そしてチェックポイント上にどのようにそしてどこに維持されるかは、選択された状態バックエンドに依存します。

利用可能な状態バックエンド

そのままの状態で、Flinkはこれらの状態バックエンドをバンドルします:

  • MemoryStateBackend
  • FsStateBackend
  • RocksDBStateBackend

何も設定されない場合は、システムは MemoryStateBackend を使うでしょう。

MemoryStateBackend

MemoryStateBackend は内部的にデータをJavaヒープ上にオブジェクトとして保持します。Key/value 状態とウィンドウ オペレータは値、トリガーなどを格納するハッシュテーブルを保持します。

チェックポイントジに、この状態バックエンドは状態をスナップショットし、それをチェックポイントのジョブマネージャ(マスター)への通知メッセージの一部として送信するでしょう。これは同様にヒープ上に格納します。

The MemoryStateBackend can be configured to use asynchronous snapshots. While we strongly encourage the use of asynchronous snapshots to avoid blocking pipelines, please note that this is a new feature and currently not enabled by default. To enable this feature, users can instantiate a MemoryStateBackend with the corresponding boolean flag in the constructor set to true, e.g.:

new MemoryStateBackend(MAX_MEM_STATE_SIZE, true);

MemoryStateBackendの制限:

  • それぞれの状態のサイズはデフォルトで5MBまでに制限されます。この値は MemoryStateBackend のコンストラクタの中で増やすことができます。
  • 設定された最大状態サイズに関係なく、状態はakka フレームサイズ (設定を見てください)より大きくできません。
  • aggregate 状態はジョブマネージャのメモリに収まる必要があります。

The MemoryStateBackend is encouraged for:

  • ローカル開発とデバッギング
  • Jobs that do hold little state, such as jobs that consist only of record-at-a-time functions (Map, FlatMap, Filter, …). Kafka コンシューマはほとんど状態を必要としません。

FsStateBackend

FsStateBackend は“hdfs://namenode:40010/flink/checkpoints” あるいは “file:///data/flink/checkpoints”のような、ファイルシステムURL(型、アドレス、パス) を使って設定されます。

FsStateBackend はタスクマネージャのメモリ内で実行中のデータを保持します。チェックポイント時に、状態のスナップショットを設定されたファイルシステムとディレクトリ内のファイルに書き込みます。最小のメタデータはジョブマネージャのメモリ(あるいは、高可用性モードでは、メタデータのチェックポイント内)に格納されます。

The FsStateBackend uses asynchronous snapshots by default to avoid blocking the processing pipeline while writing state checkpoints. To disable this feature, users can instantiate a FsStateBackend with the corresponding boolean flag in the constructor set to false, e.g.:

new FsStateBackend(path, false);

The FsStateBackend is encouraged for:

  • 大規模な状態を持つジョブ、長いウィンドウ、大きなキー/値の状態。
  • 全て高可用性なセットアップ。

RocksDBStateBackend

RocksDBStateBackend は“hdfs://namenode:40010/flink/checkpoints” あるいは “file:///data/flink/checkpoints”のような、ファイルシステムURL(型、アドレス、パス) を使って設定されます。

The RocksDBStateBackend holds in-flight data in a RocksDB database that is (per default) stored in the TaskManager data directories. Upon checkpointing, the whole RocksDB database will be checkpointed into the configured file system and directory. 最小のメタデータはジョブマネージャのメモリ(あるいは、高可用性モードでは、メタデータのチェックポイント内)に格納されます。

The RocksDBStateBackend always performs asynchronous snapshots.

Limitations of the RocksDBStateBackend:

  • As RocksDB’s JNI bridge API is based on byte[], the maximum supported size per key and per value is 2^31 bytes each. IMPORTANT: states that use merge operations in RocksDB (e.g. ListState) can silently accumulate value sizes > 2^31 bytes and will then fail on their next retrieval. This is currently a limitation of RocksDB JNI.

The RocksDBStateBackend is encouraged for:

  • とても大規模な状態を持つジョブ、長いウィンドウ、大きなキー/値の状態。
  • 全て高可用性なセットアップ。

Note that the amount of state that you can keep is only limited by the amount of disk space available. これにより状態をメモリ内に格納するFsStateBackendに比べてとても大規模な状態を保持することができます。しかし、このことはまた達成可能な最大スループットはこの状態バックエンドをつかった時に低いだろうことを意味します。

RocksDBStateBackend is currently the only backend that offers incremental checkpoints (see here).

状態バックエンドの設定

The default state backend, if you specify nothing, is the jobmanager. If you wish to establish a different default for all jobs on your cluster, you can do so by defining a new default state backend in flink-conf.yaml. The default state backend can be overridden on a per-job basis, as shown below.

ジョブ毎の状態バックエンドの設定

ジョブ毎の状態バックエンドは、以下の例で示されるように、ジョブのStreamExecutionEnvironment で設定されます:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"))

デフォルトの状態バックエンドの設定

デフォルトの状態バックエンドは設定キーstate.backendを使ってflink-conf.yaml内で設定することができます。

Possible values for the config entry are jobmanager (MemoryStateBackend), filesystem (FsStateBackend), rocksdb (RocksDBStateBackend), or the fully qualified class name of the class that implements the state backend factory FsStateBackendFactory, such as org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory for RocksDBStateBackend.

デフォルトの状態バックエンドが filesystem に設定された場合、エントリ state.backend.fs.checkpointdir はチェックポイントデータが格納されるだろうディレクトリを定義します。

設定ファイル内のサンプルのセクションは以下のように見えるでしょう:

# The backend that will be used to store operator state checkpoints

state.backend: filesystem


# Directory for storing checkpoints

state.backend.fs.checkpointdir: hdfs://namenode:40010/flink/checkpoints

上に戻る

TOP
inserted by FC2 system