バックエンドの状態

データストリーム API で書かれたプログラムはしばしば様々な形式で状態を保持します:

  • プログラムが起動されるまで、Windows は要素を集め、あるいは集約します
  • 変換関数は値を格納するためにキー/値の状態を使うかもしれません
  • 変換関数はそれらのローカルな変数を障害性のあるものにするためにCheckpointedインタフェースを実装するかもしれません

ストリーミング API ガイド内の 状態との連携 も見てください。

チェックポインティングが活性化された場合、データの紛失に対する保護と一貫性のある回復のために、そのような状態はチェックポイント上に維持されます。内部的にどのように状態が表現されるか、そしてチェックポイント上にどのようにそしてどこに維持されるかは、選択された状態バックエンドに依存します。

利用可能な状態バックエンド

そのままの状態で、Flinkはこれらの状態バックエンドをバンドルします:

  • MemoryStateBackend
  • FsStateBackend
  • RocksDBStateBackend

何も設定されない場合は、システムは MemoryStateBackend を使うでしょう。

MemoryStateBackend

MemoryStateBackend は内部的にデータをJavaヒープ上にオブジェクトとして保持します。Key/value 状態とウィンドウ オペレータは値、トリガーなどを格納するハッシュテーブルを保持します。

チェックポイントジに、この状態バックエンドは状態をスナップショットし、それをチェックポイントのジョブマネージャ(マスター)への通知メッセージの一部として送信するでしょう。これは同様にヒープ上に格納します。

MemoryStateBackendの制限:

  • それぞれの状態のサイズはデフォルトで5MBまでに制限されます。この値は MemoryStateBackend のコンストラクタの中で増やすことができます。
  • 設定された最大状態サイズに関係なく、状態はakka フレームサイズ (設定を見てください)より大きくできません。
  • aggregate 状態はジョブマネージャのメモリに収まる必要があります。

The MemoryStateBackend is encouraged for:

  • ローカル開発とデバッギング
  • Jobs that do hold little state, such as jobs that consist only of record-at-a-time functions (Map, FlatMap, Filter, …). Kafka コンシューマはほとんど状態を必要としません。

FsStateBackend

FsStateBackend は“hdfs://namenode:40010/flink/checkpoints” あるいは “file:///data/flink/checkpoints”のような、ファイルシステムURL(型、アドレス、パス) を使って設定されます。

FsStateBackend はタスクマネージャのメモリ内で実行中のデータを保持します。チェックポイント時に、状態のスナップショットを設定されたファイルシステムとディレクトリ内のファイルに書き込みます。最小のメタデータはジョブマネージャのメモリ(あるいは、高可用性モードでは、メタデータのチェックポイント内)に格納されます。

The FsStateBackend is encouraged for:

  • 大規模な状態を持つジョブ、長いウィンドウ、大きなキー/値の状態。
  • 全て高可用性なセットアップ。

RocksDBStateBackend

RocksDBStateBackend は“hdfs://namenode:40010/flink/checkpoints” あるいは “file:///data/flink/checkpoints”のような、ファイルシステムURL(型、アドレス、パス) を使って設定されます。

RocksDBStateBackend は(デフォルトで)タスクマネージャのデータディレクトリ内に格納されるRocksDB データベース内で実行中のデータを保持します。チェックポイント時には、RocksDBデータベース全体が設定されたファイルシステムとディレクトリ内にチェックポイントされるでしょう。最小のメタデータはジョブマネージャのメモリ(あるいは、高可用性モードでは、メタデータのチェックポイント内)に格納されます。

The RocksDBStateBackend is encouraged for:

  • とても大規模な状態を持つジョブ、長いウィンドウ、大きなキー/値の状態。
  • 全て高可用性なセットアップ。

保持できるデータの総量は利用可能なディスク空間の容量によってのみ制限されることに注意してください。これにより状態をメモリ内に格納するFsStateBackendに比べてとても大規模な状態を保持することができます。しかし、このことはまた達成可能な最大スループットはこの状態バックエンドをつかった時に低いだろうことを意味します。

状態バックエンドの設定

状態バックエンドはジョブごとに設定することができます。更に、ジョブが明示的に状態バックエンドを定義しない時に使われるデフォルトの状態爆炎度を定義することができます。

ジョブ毎の状態バックエンドの設定

ジョブ毎の状態バックエンドは、以下の例で示されるように、ジョブのStreamExecutionEnvironment で設定されます:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"))

デフォルトの状態バックエンドの設定

デフォルトの状態バックエンドは設定キーstate.backendを使ってflink-conf.yaml内で設定することができます。

設定エントリに可能な値はjobmanager (MemoryStateBackend), filesystem (FsStateBackend)、あるいはRocksDBStateBackendのためのorg.apache.flink.contrib.streaming.state.RocksDBStateBackendFactoryのような状態バックエンド ファクトリー FsStateBackendFactoryを実装する完全修飾クラス名です。

デフォルトの状態バックエンドが filesystem に設定された場合、エントリ state.backend.fs.checkpointdir はチェックポイントデータが格納されるだろうディレクトリを定義します。

設定ファイル内のサンプルのセクションは以下のように見えるでしょう:

# The backend that will be used to store operator state checkpoints

state.backend: filesystem


# Directory for storing checkpoints

state.backend.fs.checkpointdir: hdfs://namenode:40010/flink/checkpoints
TOP
inserted by FC2 system