データストリーミングの耐障害性

このドキュメントはストリーミングデータフローのためのFlinkの耐障害性機構について説明します。

はじめに

Apache Flink はデータストリーミングアプリケーションの状態を一貫性を持って回復するための耐障害性機構を提供します。この機構は、障害の目前であってもプログラムの状態がデータストリームから最終的に確実に一回各レコードを反映することを保証します。保証を少なくとも一回 (以下で説明します)にダウングレードするための切り替えがあることに注意してください。

耐障害性の機構は連続して分散ストリームデータフローのスナップショットを引き出します。For streaming applications with small state, these snapshots are very light-weight and can be drawn frequently without impacting the performance much. ストリーミングアプリケーションの状態は(マスターノードあるいはHDFSのような)設定可能な場所に格納されます。

(マシーン、ネットワーク、あるいはソフトウェア障害による)プログラムの障害の場合は、Flinkは分散ストリーミングデータフローを停止します。そしてシステムはオペレータを再起動し、それらを最後に成功したチェックポイントに再設定します。入力ストリームは状態のスナップショットの場所に再設定されます。再起動された並行データフローの一部として処理された全てのレコードが、以前のチェックポイントされた状態の一部では無いことが保証されます。

注意: この機構がその保証を完全に満たすために、(メッセージキューあるいはブローカーのような)データストリーム ソースはストリームを定義された最新のポイントに巻き戻すことができる必要があります。FApache Kafka はこの能力を持っており、FlinkのKafkaへのコネクタはこの能力を利用しています。

注意: Flinkのチェックポイントは分散スナップショットを使って実現されているため、snapshotcheckpoint の単語を可換なものとして使います。

チェックポイント

Flinkの耐障害性機構の中央部分は分散データストリームとオペレータ状態の一貫性のあるスナップショットを引き出します。これらのスナップショットは障害時にシステムが元に戻れることができる一貫性のあるチェックポイントとして振る舞います。これらのスナップショットを引き出すためのFlinkの機構は"Lightweight Asynchronous Snapshots for Distributed Dataflows"の中で説明されます。分散スナップショットについては標準的なChandy-Lamport アルゴリズムに着想を得ており、特にFlinkの実行モデルに合わせられています。

バリア

Flinkの分散スナップショットの主な要素はstream barriersです。これらのbarrierはデータストリームに挿入され、データストリームの一部としてレコードと共に流れます。Barrierはレコードを追い越すことはなく、流れは厳密に並んでいます。A barrier separates the records in the data stream into the set of records that goes into the current snapshot, and the records that go into the next snapshot. Each barrier carries the ID of the snapshot whose records it pushed in front of it. バリアはストリームの流れを妨害しないため、とても軽いです。異なるスナップショットによる複数のバリアが同時にストリーム内にあることがありえます。これは同時に様々なスナップショットが起こるかも知れないことを意味します。

データストリーム内のチェックポイントのバリア

ストリームのバリアはストリームソースで並行するデータフローに挿入されます。The point where the barriers for snapshot n are injected (let’s call it Sn) is the position in the source stream up to which the snapshot covers the data. 例えばApache Kafkaでは、このポジションはパーティション内の最後のレコードのオフセットでしょう。このポジション Sncheckpoint coordinator (Flinkの JobManager)に送信されます。

バリアは下流に流れます。中間オペレータが入力ストリームの全てからスナップショット nについてのバリアを受け取った場合、それはスナップショット nについての自身のバリアを出力ストリームの全てに発行します。一度sinkオペレータ(ストリーミングDAGの終端)が入力ストリームの全てからバリア n を受け取ると、それはスナップショット n をチェックポイント コーディネーターに知らせます。全てのsinkがスナップショットを受け取ると、完了したとみなします。

スナップショットnが完了した場合、ソースからのSn より前からのレコード(およびそれらの子孫)はデータフローのトポロジ全体を通過しているため、これらはもう必要無いだろうことは明らかです。

複数の入力を持つオペレータでのデータストリームの割り当て

一つ以上の入力ストリームを受け取るオペレータはスナップショット バリア上の入力ストリームに割り当てる必要があります。上の図は以下のことを表しています:

  • オペレータが入力ストリームからスナップショット バリアnを受け取るとすぐに、他の入力からも同様にバリア n を受け取るまではそれ以上のレコードをストリームから処理することができません。そうでなければ、スナップショット n に属するレコードとスナップショット n+1に属するレコードを混ぜてしまうでしょう。
  • バリア n を報告するストリームは一時的に置いておかれます。これらのストリームから受け取ったレコードは処理されませんが、入力バッファに置かれます。
  • 一度最後のストリームがバリア nを受け取ると、オペレータは全ての待機中の外へ向かうレコードを放出します。そしてスナップショット n バリア自身を放出します。
  • その後、ストリームからのレコードの処理の前に入力バッファからのレコードを処理し、全ての入力ストリームからの処理中のレコードを再開します。

状態

オペレータが状態の形式を含む場合は、この状態は同様にスナップショットの一部分でなければなりません。Operator state comes in different forms:

  • User-defined state: This is state that is created and modified directly by the transformation functions (like map() or filter()). ユーザ定義の状態は、関数のjavaオブジェクト内の単純な変数、あるいは関数の連想キー/値 状態のどちらかがありえます (詳細は ストリーミングアプリケーションの状態を見てください)。
  • システムの状態: この状態はオペレータの計算の一部分であるデータバッファを参照します。この状態の代表的な例は window buffersです。これはウィンドウが評価され退去させられるまでウィンドウのためにレコードを収集(および集約)します。

オペレータは出力ストリームに放出する前に、入力ストリームから全てのスナップショットのバリアを受け取った時点の状態をスナップショットします。At that point, all updates to the state from records before the barriers will have been made, and no updates that depend on records from after the barriers have been applied. スナップショットの状態はもしかすると大きくなるかもしれないので、設定可能な state backendの中に格納されます。デフォルトでは、これはジョブマネージャーのメモリですが、真面目なセットアップの場合は(HDFSのような)信頼できる分散ストレージが設定されるべきです。状態が格納された後で、オペレータはチェックポイントを承認し、出力ストリームにスナップショットのバリアを放出し、進めます。

これで結果のスナップショットは以下を含みます:

  • 各並行ストリームデータソースについて、スナップショットが開始された時のストリーム内のオフセット/ポジション
  • 各オペレータについて、スナップショットの一部として格納された状態へのポインタ
チェックポイントの機構の説明図

確実に一回 vs. 少なくとも一回

整列の段階はストリーミング プログラムにレイテンシを追加するかもしれません。通常、この余分なレイテンシは2,3ミリ秒のオーダーですが、いくつかの例外的なレイテンシが著しく増加したケースを目撃しました。一貫して特に低いレンテンシ(2,3ミリ秒)を必要とするアプリケーションについては、Flinkはチェックポイント中のストリーム整列をスキップするように切り替えるスイッチを持っています。Checkpoint snapshots are still drawn as soon as an operator has seen the checkpoint barrier from each input.

整列がスキップされた場合、たとえチェックポイントnのためのいくつかのチェックポイントバリアが到着したとしても、オペレータは全ての入力の処理を続けます。そのようにして、チェックポイントnのための状態スナップショットが取られる前に、オペレータはチェックポイント n+1に所属する要素を処理します。それらのレコードはチェックポイントnの状態スナップショットの中に両方とも含まれるため、回復時にはそれらのレコードは重複して起こるでしょう。そしてチェックポイント nの後のデータの一部分として再現されるでしょう。

NOTE: Alignment happens only for operators with multiple predecessors (joins) as well as operators with multiple senders (after a stream repartitioning/shuffle). Because of that, dataflows with only embarrassingly parallel streaming operations (map(), flatMap(), filter(), …) actually give exactly once guarantees even in at least once mode.

Asynchronous State Snapshots

Note that the above described mechanism implies that operators stop processing input records while they are storing a snapshot of their state in the state backend. This synchronous state snapshot introduces a delay every time a snapshot is taken.

It is possible to let an operator continue processing while it stores its state snapshot, effectively letting the state snapshots happen asynchronously in the background. To do that, the operator must be able to produce a state object that should be stored in a way such that further modifications to the operator state do not affect that state object. An example for that are copy-on-write style data structures, such as used for example in RocksDB.

After receiving the checkpoint barriers on its inputs, the operator starts the asynchronous snapshot copying of its state. It immediately emits the barrier to its outputs and continues with the regular stream processing. Once the background copy process has completed, it acknowledges the checkpoint to the checkpoint coordinator (the JobManager). The checkpoint is now only complete after all sinks received the barriers and all stateful operators acknowledged their completed backup (which may be later than the barriers reaching the sinks).

See State Backends for details on the state snapshots.

回復

この機構での回復は率直です: 障害時には、Flinkは最新の完了したチェックポイント kを選択します。その後、システムは全体の分散データフローを再配備し、各オペレータにチェックポイント kの一部としてスナップショットされた状態を与えます。ソースはポジションSkからストリームの読み込みをするように設定されます。例えばApache Kafkaの場合、それはコンシューマーがオフセット Skから取り出しを開始するように伝えることを意味します。

状態が増分スナップショットされた場合、オペレータは最新の完全なスナップショットの状態から始め、そしてその状態に増分のスナップショットの系列を適用します。

Operator Snapshot Implementation

When operator snapshots are taken, there are two parts: the synchronous and the asynchronous parts.

Operators and state backends provide their snapshots as a Java FutureTask. That task contains the state where the synchronous part is completed and the asynchronous part is pending. The asynchronous part is then executed by a background thread for that checkpoint.

Operators that checkpoint purely synchronously return an already completed FutureTask. If an asynchronous operation needs to be performed, it is executed in the run() method of that FutureTask.

The tasks are cancelable, in order to release streams and other resource consuming handles.

TOP
inserted by FC2 system