Stateful Stream Processing
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

ステートフルストリーム処理 #

State とは何ですか? #

データフロー内の多くのオペレーションは単純に1つの個々のそのときのイベントを調べるだけですが(例えば、イベントパーサー)、いくつかのオペレーションは多数のイベントに渡って情報を記録します(例えば、ウィンドウ オペレーション)。これらのオペレーションはstatefulと呼ばれます。

以下はステートフルオペレーションの例です:

  • アプリケーションが特定のイベントのパターンを検索すると、状態にはこれまで発生したイベントのシーケンスが保存されます。
  • 分/時間/日ごとにイベントを集約する場合、状態は保留中の集約を保持します。 aggregates.
  • データポイントのストリーム上で機械学習のモデルを訓練する場合、状態はモデルパラメータの現在のバージョンを保持します。
  • 履歴データを管理する必要がある場合、状態により過去に発生したイベントに効率的にアクセスできます。

Flink は、checkpointssavepoints を使って耐障害性を持つために、状態を認識する必要があります。

状態に関する知識により、Flink アプリケーションの再スケーリングも可能になります。つまり、Flink は並列インスタンス間で状態を再分散することを意味します。

状態を扱う場合、Flink の状態バックエンドについて読むことも役立つかもしれません。Flink は状態を保存する方法と場所を指定する様々な状態バックエンドを提供します。

Back to top

キー付けされた状態 #

キー付き状態は組み込みキー/値ストアとみなすことができるもので維持されます。状態はステートフルオペレータによって読み込まれたストリームと一緒に厳密にパーティション化され分散されます。したがって、キー/値状態へのアクセスはキー付きストリーム、つまりキー付き/パーティション化されたデータの交換後のみ可能であり、現在のイベントのキーに関連付けられた値に制限されます。ストリームと状態のキーを調整することで、全ての状態の更新がローカル操作となり、トランザクションのオーバーヘッド無しに一貫性が保証されます。 この調整により、Flinkが状態を再分配しストリームのパーティショニングを透過的に調整することも可能になります。

State and Partitioning

キー付き状態は、いわゆるキーグループにさらに組織化されます。キーグループはFlinkがキー付けされた状態を再分配するアトミックな単位です; 定義された最大並列処理と正確に同じ数のキーグループがあります。実行の間、キー付きオペレータの各並列インスタンスは1つ以上のキーグループのキーを操作します。

状態の永続性 #

Flink は、stream replaycheckpointing を組み合わせて耐障害性を実装します。チェックポイントは、各入力ストリーム内の特定のポイントを、各オペレータの対応する状態とともにマークします。ストリーミングデータフローは、オペレータの状態の回復およびチェックポイントの場所からのイベントの再生により、一貫性を維持*(確実に一回の処理セマンティクス)*しながらチェックポイントから再開することができます。

チェックポイントの間隔は、実行中の耐障害性のオーバーヘッドと回復時間(再生する必要があるレコードの数)をトレードオフする手段です。

耐障害性の仕組みは、分散ストリーミングデータフローのスナップショットを継続的に引き出します。状態が小さいストリーミングアプリケーションの場合、これらのスナップショットは非常に軽量であり、パフォーマンスに大きな影響を与えることなく頻繁に引き出せます。ストリーミングアプリケーションの状態は、設定可能な場所、通常は分散ファイルシステム、に格納されます。

(マシーン、ネットワーク、あるいはソフトウェア障害による)プログラムの障害の場合は、Flinkは分散ストリーミングデータフローを停止します。次にシステムはオペレータを再起動し、最後に成功したチェックポイントにリセットします。入力ストリームは状態スナップショットの時点にリセットされます。再開された並列データフローの一部として処理されるレコードは、以前にチェックポイントが設定された状態に影響を与えないことが保証されます。

デフォルトでは、チェックポイントは無効です。チェックポイントを有効にして設定する方法の詳細については、Checkpointingを参照してください。
この仕組みが完全な保証を実現するには、データストリームソース(メッセージキューやブローカーなど)がストリームを定義された最近のポイントまで巻き戻すことができる必要があります。Apache Kafka にはこの機能があり、KafkaへのFlinkコネクタはこれを利用します。Flink コネクタによって提供される保証の詳細については、データソースとシンクの耐障害性の保証を参照してください。
Flinkのチェックポイントは分散スナップショットを通じて実現されるため、スナップショットチェックポイントという単語を同じ意味で使います。多くの場合、スナップショットという用語は、チェックポイントまたはセーブポイントを意味するために使われます。

チェックポイント #

Flinkの耐障害性の仕組みの中心部分は、分散データストリームとオペレータ状態の一貫したスナップショットを取り出すことです。これらのスナップショットは、障害時にシステムがフォールバックできる一貫したチェックポイントとして機能します。これらのスナップショットを取り出すためのFlinkの仕組みは、"分散データフロー用の軽量非同期スナップショット“で説明されています。これは分散スナップショット用の標準のChandy-Lamport アルゴリズムに着想を得ており、特に Flink の実行モデルに合わせて調整されています。

チェックポイントに関する全ての作業は非同期で実行されることに注意してください。チェックポイントバリアはロック段階で移動せず、オペレーションは非同期でそれらの状態のスナップショットを取得できます。

Flink 1.11 以降、チェックポイントはアライメントの有無に関わらず取得できます。このセクションでは、最初に整列チェックポイントについて説明します。

バリア #

Flinkの分散スナップショットの主な要素はストリームバリアです。 これらのバリアはデータストリームに挿入され、データストリームの一部としてレコードと共に流れます。バリアはレコードを追い越すことなく、厳密に一列に並んで流れます。バリアは、データストリーム内のレコードを、現在のスナップショットに入るレコードのセットと、次のスナップショットに入るレコードのセットに分離します。各バリアは、その前にプッシュされたレコードのスナップショットのIDを保持します。バリアはストリームの流れを妨げないため、非常に軽量です。異なるスナップショットからの複数のバリアが同時にストリームに存在する可能性があります。これは様々なスナップショットが同時に発生する可能性があることを意味します。

{{< img src="../../../fig/stream_barriers.svg" alt="Checkpoint barriers in data streams" width="60%" >}}

ストリームのバリアはストリームソースで並行するデータフローに挿入されます。 スナップショット n のバリが挿入されるポイント( Sn と呼びます)は、スナップショットがデータをカバーするソースストリーム内の位置です。例えば、Apache Kafka では、この位置はパーティション内の最後のレコードのオフセットになります。この位置 Sn は、チェックポイントコーディネーター (Flink の JobManager) に報告されます。

次に、バリアは下流に流れます。中間のオペレータがすべtれの入力ストリームからスナップショット n のバリアを受信すると、スナップショット n のバリアを全ての出力ストリームに発行します。シンクオペレータ(ストリーミング DAGの終端)が全ての入力ストリームからバリア n を受信すると、そのスナップショット n をチェックポイントコーディネーターに知らせます。全ての真紅がスナップショットを認識すると、スナップショットが完了したと見なされます。

スナップショット n が完了すると、その時点でこれらのレコード(およびそれらの後続のレコード)はデータフロートポロジ全体を通過するため、ジョブは Sn より前のレコードをソースに再度要求することはありません。

{{< img src="../../../fig/stream_aligning.svg" alt="Aligning data streams at operators with multiple inputs" width="60%" >}}

1つ以上の入力ストリームを受け取るオペレータは、スナップショットバリア上で入力ストリームを合わせる必要があります。上の図は以下のことを表しています:

  • オペレータが入力ストリームからスナップショット バリアnを受け取るとすぐに、他の入力からも同様にバリア n を受け取るまではそれ以上のレコードをストリームから処理することができません。そうでなければ、スナップショット n に属するレコードとスナップショット n+1 に属するレコードを混ぜてしまうでしょう。
  • 一度最後のストリームがバリア n を受け取ると、オペレータは全ての待機中の外へ向かうレコードを放出します。そしてスナップショット n バリア自身を放出します。
  • 状態のスナップショットを取り、全ての入力ストリームからのレコードの処理を再開します。ストリームからのレコード処理する前に入力バッファからのレコードを処理します。
  • 最後に、オペレータは状態を非同期で状態バックエンドに書き込みます。

アラインメントは、複数の入力を持つ全てのオペレータと、複数の上流サブタスクの出力ストリームを消費するシャッフル後のオペレータに必要であることに注意してください。

オペレータ状態のスナップショット #

オペレータに任意の形式のstateが含まれる場合、この状態もスナップショットの一部である必要があります。

オペレータは、入力ストリームから全てのスナップショットバリアを受信した時点で、出力ストリームにバリアを発行する前に、状態のスナップショットを作成します。その時点で、バリアが作成される前のレコードから状態への全ての更新が行われ、バリアが適用された後のレコードに依存する更新は行われません。スナップショットの状態は大きい場合があるため、設定可能な*state backend*に格納されます。デフォルトでは、これは JobManager のメモリですが、運用環境で使う場合は(HDFSのような)信頼性の高い分散ストレージを使う必要があります。状態が保存された後で、オペレータはチェックポイントを確認し、出力ストリームにスナップショットバリアを発行して処理を続けます。

これで結果のスナップショットは以下を含みます:

  • 各並列ストリームデータソースの、スナップショットが開始された時のストリーム内のオフセット/ポジション。
  • オペレータごとに、スナップショットの一部として保存された状態へのポインタ。
Illustration of the Checkpointing Mechanism

回復 #

この仕組みでの回復は簡単です: 障害時には、Flinkは最新の完了したチェックポイント kを選択します。次に、システムは全体の分散データフローを再配備し、各オペレータにチェックポイント kの一部としてスナップショットされた状態を提供します。ソースは、ポジション Sk からストリームの読み取りを開始するように設定されています。例えば、Apache Kafka では、コンシューマにオフセット Sk からフェッチを開始するように指示することを意味します。

状態が増分スナップショットされた場合、オペレータは最新の完全なスナップショットの状態から始め、次にその状態に一連の増分スナップショットの更新を適用します。

詳細については、Restart Strategies を参照してください。

整列していないチェックポイント #

チェックポイントは整列していない状態で実行することもできます。 基本的な考え方は、処理中のデータがオペレータ状態の一部である限り、チェックポイントは全ての処理中のデータを追い越すことができるということです。

このやり方は実際にはChandy-Lamport アルゴリズムに近いですが、Flink はチェックポイントコーディネータの過負荷を避けるためにソースにバリアを挿入することに注意してください。

{{< img src="../../../fig/stream_unaligning.svg" alt="Unaligned checkpointing" >}}

この図は、オペレータが整列されていないチェックポイントバリアを処理する方法を示しています:

  • オペレータは、入力バッファに格納されている最初のバリアに反応します。
  • 出力バッファの末尾にバリアを追加することにより、バリアをすぐに下流のストリームに転送します。
  • オペレータは、追い越された全てのレコードを非同期で保存するようにマークし、自身の状態のスナップショットを作成します。

従って、オペレータはバッファをマークするために入力の処理を一時的に停止するだけで、バリアを転送し、他の状態のスナップショットを作成します。

整列していないチェックポイントにより、バリアができるだけ早く真紅に到達することが保証されます。これは、整列時間が数時間に及ぶ可能性がある、低速で移動するデータパスが少なくとも1つ以上あるアプリケーションに特に適しています。ただし、追加の I/O プレッシャーがかかるため、状態のバックエンドへの I/O がボトルネックになる場合には役に立ちません。そのほかの制限については、ops を参照してください。

セーブポイントは常に整列されることに注意してください。

整列していないリカバリー #

オペレーターは、整列していないチェックポイントで上流のオペレータからのデータの処理を開始する前に、処理中のデータを回復します。それ以外では、整列されたチェックポイントの回復の時と同じ手順が実行されます。

バックエンドの状態 #

キー/値インデックスが保存される正確なデータ構造は、選択したstate backendに依存します。1つの状態バックエンドはメモリ内のハッシュマップにデータを保存し、別の状態バックエンドはキー/値ストアとしてRocksDBを使います。状態を保持するデータ構造の定義に加えて、状態バックエンドはキー/値 状態のある時点のスナップショットを取り、スナップショットをチェックポイントの一部として格納するロジックも実装します。アプリケーションのロジックを変更せずに、状態バックエンドを設定できます。

checkpoints and snapshots

Back to top

セーブポイント #

チェックポイントを使う全てのプログラムは、savepointから実行を再開できます。 セーブポイントを使うと、状態を失おうことなくプログラムと Flink クラスタの両方を更新できます。

Savepoints は、手動でトリガーされるチェックポイントで、プログラムのスナップショットを取得して状態バックエンドに書き込みます。これらを行うには、通常のチェックポイントの仕組みに依存します。

セーブポイントはチェックポイントと似ていますが、ユーザによってトリガーされることと、新しいチェックポイントが完了しても自動的に期限切れにならない点が異なります。 セーブポイントを適切に使うには、checkpointssavepoints の違いを理解することが重要です。これについては、checkpoints vs. savepointsで説明されています。

Back to top

確実に一回 vs. 少なくとも一回 #

整列の段階はストリーミング プログラムにレイテンシを追加するかもしれません。通常、この余分なレイテンシは数ミリ秒程度ですが、一部の異常値のレイテンシが著しく増加するケースが確認されています。一貫して特に低いレンテンシ(2,3ミリ秒)を必要とするアプリケーションについては、Flinkはチェックポイント中のストリーム整列をスキップするように切り替えるスイッチを持っています。チェックポイントのスナップショットは、オペレータが各入力からチェックポイントのバリアを見つけるとすぐに取られます。

整列がスキップされた場合、たとえチェックポイント n のためのいくつかのチェックポイントバリアが到着したとしても、オペレータは全ての入力の処理を続けます。そのようにして、チェックポイント n のための状態スナップショットが取られる前に、オペレータはチェックポイント n+1 に所属する要素を処理します。リストアでは、これらのレコードは重複として発生します。これらはどちらもチェックポイント n の状態スナップショットの中に含まれていて、チェックポイント n の後にデータとして再生されるためです。

整列は、複数の先行者(joins)と、複数の送信者(ストリームの再分割/シャッフル)を持つオペレータに対してのみ発生します。Because of that, dataflows with only embarrassingly parallel streaming operations (map(), flatMap(), filter(), …) actually give exactly once guarantees even in at least once mode.

Back to top

バッチプログラムでの状態と耐障害性 #

Flink は、BATCH ExecutionMode でストリーミングプログラムの特殊なケースとして、バッチプログラムを実行します。ストリームは制限されています(要素数が有限)。 従って、上記の概念はストリーミングプログラムに適用される方法と同じようにバッチプログラムに適用されますが、若干の例外があります:

  • バッチプログラムの耐障害性 はチェックポイントを使いません。 does not use checkpointing. リカバリはストリームを完全に再生することで行われます。入力が制限されているため、それらが可能です。このことはリカバリのためのコストを押し上げますが、チェックポイントを回避するため通常の処理を手軽にします。

  • バッチ実行モードの状態バックエンドは、キー/値インデックスではなく、単純化されたメモリ内/コア外データ構造を使います。

Back to top

inserted by FC2 system