データストリーミングの耐障害性

このドキュメントはストリーミングデータフローのためのFlinkの耐障害性機構について説明します。

はじめに

Apache Flink はデータストリーミングアプリケーションの状態を一貫性を持って回復するための耐障害性機構を提供します。この機構は、障害の目前であってもプログラムの状態がデータストリームから最終的に確実に一回各レコードを反映することを保証します。保証を少なくとも一回 (以下で説明します)にダウングレードするための切り替えがあることに注意してください。

耐障害性の機構は連続して分散ストリームデータフローのスナップショットを引き出します。小さな状態を持つストリーミングアプリケーションについては、これらのスナップショットはとても軽量でパフォーマンスに大きな影響無く頻繁に取ることができます。ストリーミングアプリケーションの状態は(マスターノードあるいはHDFSのような)設定可能な場所に格納されます。

(マシーン、ネットワーク、あるいはソフトウェア障害による)プログラムの障害の場合は、Flinkは分散ストリーミングデータフローを停止します。そしてシステムはオペレータを再起動し、それらを最後に成功したチェックポイントに再設定します。入力ストリームは状態のスナップショットの場所に再設定されます。再起動された並行データフローの一部として処理された全てのレコードが、以前にチェックポイントされた状態の一部では無いことが保証されます。

注意: デフォルトでは、チェックポイントは無効です。チェックポイントを有効および設定する方法の詳細についてはチェックポイントを見てください。

注意: この機構がその保証を完全に満たすために、(メッセージキューあるいはブローカーのような)データストリーム ソースはストリームを定義された最新のポイントに巻き戻すことができる必要があります。FApache Kafka はこの能力を持っており、FlinkのKafkaへのコネクタはこの能力を利用しています。Flinkのコネクタによって提供される保証についての詳しい情報はデータソースおよびシンクの耐障害性の保証 を見てください。

注意: Flinkのチェックポイントは分散スナップショットを使って実現されているため、snapshotcheckpoint の単語を可換なものとして使います。

チェックポイント

Flinkの耐障害性機構の中央部分は分散データストリームとオペレータ状態の一貫性のあるスナップショットを引き出します。これらのスナップショットは障害時にシステムが元に戻れることができる一貫性のあるチェックポイントとして振る舞います。これらのスナップショットを引き出すためのFlinkの機構は"Lightweight Asynchronous Snapshots for Distributed Dataflows"の中で説明されます。分散スナップショットについては標準的なChandy-Lamport アルゴリズムに着想を得ており、特にFlinkの実行モデルに合わせられています。

バリア

Flinkの分散スナップショットの主な要素はstream barriersです。これらのbarrierはデータストリームに挿入され、データストリームの一部としてレコードと共に流れます。Barrierはレコードを追い越すことはなく、流れは厳密に並んでいます。境界はデータストリーム内のレコードを現在のスナップショットに入れるレコードのセットと、次のスナップショットに入れるレコードに分割します。各境界は前に入れたレコードのスナップショットのIDを運びます。バリアはストリームの流れを妨害しないため、とても軽いです。異なるスナップショットによる複数のバリアが同時にストリーム内にあることがありえます。これは同時に様々なスナップショットが起こるかも知れないことを意味します。

データストリーム内のチェックポイントのバリア

ストリームのバリアはストリームソースで並行するデータフローに挿入されます。スナップショットn のための境界が挿入される場所(それをSnと呼びましょう)は、スナップショットがデータをカバーするまでのソースストリーム内の場所です。例えばApache Kafkaでは、このポジションはパーティション内の最後のレコードのオフセットでしょう。このポジション Sncheckpoint coordinator (Flinkの JobManager)に送信されます。

バリアは下流に流れます。中間のオペレータが入力ストリームの全てからスナップショットn のための境界を受け取った時に、それはスナップショットnのための境界を出力ストリーム内の全てに発行します。一度sinkオペレータ(ストリーミングDAGの終端)が入力ストリームの全てからバリア n を受け取ると、それはスナップショット n をチェックポイント コーディネーターに知らせます。全てのシンクがスナップショットを通知した後で、完了したと見なされます。

一旦スナップショットn が完了すると、その時点でこれらのレコード(およびそれらの後続のレコード)はデータフロートポロジ全体を経験するだろうため、ジョブは二度とソースにSnより前のレコードを要求しないでしょう。

複数の入力を持つオペレータでのデータストリームの割り当て

一つ以上の入力ストリームを受け取るオペレータはスナップショット バリア上の入力ストリームに割り当てる必要があります。上の図は以下のことを表しています:

  • オペレータが入力ストリームからスナップショット バリアnを受け取るとすぐに、他の入力からも同様にバリア n を受け取るまではそれ以上のレコードをストリームから処理することができません。そうでなければ、スナップショット n に属するレコードとスナップショット n+1に属するレコードを混ぜてしまうでしょう。
  • バリア n を報告するストリームは一時的に置いておかれます。これらのストリームから受け取ったレコードは処理されませんが、入力バッファに置かれます。
  • 一度最後のストリームがバリア nを受け取ると、オペレータは全ての待機中の外へ向かうレコードを放出します。そしてスナップショット n バリア自身を放出します。
  • その後、ストリームからのレコードの処理の前に入力バッファからのレコードを処理し、全ての入力ストリームからの処理中のレコードを再開します。

状態

オペレータが状態の形式を含む場合は、この状態は同様にスナップショットの一部分でなければなりません。オペレータの状態は異なる形式でやってきます:

  • ユーザ定義の状態: これは(map() あるいは filter()のような)変換関数によって直接作成および変更された状態です。詳細はストリーミングアプリケーションでの状態を見てください。
  • システムの状態: この状態はオペレータの計算の一部分であるデータバッファを参照します。この状態の代表的な例は window buffersです。これはウィンドウが評価され退去させられるまでウィンドウのためにレコードを収集(および集約)します。

オペレータは出力ストリームに放出する前に、入力ストリームから全てのスナップショットのバリアを受け取った時点の状態をスナップショットします。At that point, all updates to the state from records before the barriers will have been made, and no updates that depend on records from after the barriers have been applied. スナップショットの状態はもしかすると大きくなるかもしれないので、設定可能な state backendの中に格納されます。デフォルトでは、これはジョブマネージャーのメモリですが、プロダクションでの利用には(HDFSのような)信頼できる分散ストレージが設定されるべきです。状態が格納された後で、オペレータはチェックポイントを承認し、出力ストリームにスナップショットのバリアを放出し、進めます。

これで結果のスナップショットは以下を含みます:

  • 各並行ストリームデータソースについて、スナップショットが開始された時のストリーム内のオフセット/ポジション
  • 各オペレータについて、スナップショットの一部として格納された状態へのポインタ
チェックポイントの機構の説明図

確実に一回 vs. 少なくとも一回

整列の段階はストリーミング プログラムにレイテンシを追加するかもしれません。通常、この余分なレイテンシは2,3ミリ秒のオーダーですが、いくつかの例外的なレイテンシが著しく増加したケースを目撃しました。一貫して特に低いレンテンシ(2,3ミリ秒)を必要とするアプリケーションについては、Flinkはチェックポイント中のストリーム整列をスキップするように切り替えるスイッチを持っています。チェックポイントのスナップショットはまだオペレータが各入力からチェックポイントの境界を見るとすぐに取られます。

整列がスキップされた場合、たとえチェックポイントnのためのいくつかのチェックポイントバリアが到着したとしても、オペレータは全ての入力の処理を続けます。そのようにして、チェックポイントnのための状態スナップショットが取られる前に、オペレータはチェックポイント n+1に所属する要素を処理します。それらのレコードはチェックポイントnの状態スナップショットの中に両方とも含まれるため、回復時にはそれらのレコードは重複して起こるでしょう。そしてチェックポイント nの後のデータの一部分として再現されるでしょう。

注意: 割り当ては複数の前任者を持つオペレータ(join)と複数の送信者(ストリームの再分割/シャッフル)を持つオペレータのためにのみ起こります。そのため、実際には、まごつかせる並行ストリーミング オペレーションmap(), flatMap(), filter(), …) のみを持つデータフローが少なくとも1回のモードの中でさえ確実に1回の補償を与えます。

非同期状態のスナップショット

上で説明された仕組みは、state backend内に状態のスナップショットを格納する間はオペレータが入力レコードの処理を停止することを意味します。この同期状態のスナップショットはスナップショットが取られるたびに遅延を導入します。

効果的に状態のスナップショットが非同期にバックグラウンドで起こしながら、状態をスナップショットに格納する間にオペレータが処理を続けるようにすることが可能です。そうするには、オペレータの状態への更なる操作が状態のオブジェクトに影響しない方法で格納しなければならない状態オブジェクトをオペレータが生成できるようにしなければなりません。例えば、RocksDBで使われるようなcopy-on-write データ構造はこの挙動をします。

入力上でのチェックポイントの境界を受け取った後で、オペレータは状態のコピーの非同期のスナップショットを開始します。それはすぐに出力に境界を発行し、通常のストリーム処理と一緒に続きます。一旦バックグラウンドのコピー処理が完了すると、チェックポイントのコーディネータ(JobManager)にチェックポイントの通知をします。チェックポイントは全てのシンクが境界の受信を完了し、全てのステートフルのオペレータがバックアップを完了した(これは境界がシンクに到着した後でしょう)後でのみ完了します。

状態のスナップショットの詳細については状態のバックエンドを見てください。

回復

この機構での回復は率直です: 障害時には、Flinkは最新の完了したチェックポイント kを選択します。その後、システムは全体の分散データフローを再配備し、各オペレータにチェックポイント kの一部としてスナップショットされた状態を与えます。ソースはポジションSkからストリームの読み込みをするように設定されます。例えばApache Kafkaの場合、それはコンシューマーがオフセット Skから取り出しを開始するように伝えることを意味します。

状態が増分スナップショットされた場合、オペレータは最新の完全なスナップショットの状態から始め、そしてその状態に増分のスナップショットの系列を適用します。

詳しい情報は再起動戦略 を見てください。

オペレータ スナップショットの実装

オペレータのスナップショットが取られる時に2つの部分があります: 同期非同期の部分です。

オペレータと状態のバックエンドはスナップショットを Java のFutureTaskとして提供します。タスクは同期部分が完了し非同期 部分が起ころうとしている時の状態を含みます。そして非同期部分はチェックポイントのためのバックグランドスレッドによって実行されます。

Operators that checkpoint purely synchronously return an already completed FutureTask. 非同期オペレーションが実施されなければならない場合、FutureTaskrun()メソッド内で実行されます。

タスクがキャンセル可能です。そのためストリームおよびハンドルを消費している他のリソースを解放することができます。

上に戻る

TOP
inserted by FC2 system