This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
実行モード (Batch/Streaming) #
DataStream APIは、ユースケースの要件やジョブの特性に応じて選択できる様々なランタイム実行モードをサポートします。
DataStream APIには"classic"実行動作があり、STREAMING
実行モードと呼ばれます。これは、継続的なインクリメンタル処理を必要とし、無期限にオンラインに留まることが予想される無制限のジョブに使う必要があります。
さらに、BATCH
実行モードと呼ばれるバッチ形式の実行モードがあります。
execution mode. これは、MapReduceのようなバッチ処理フレームワークを連想させる方法でジョブを実行します。これは、既知の固定入力があり、継続的に実行されない制限のあるジョブに使う必要があります。
Apache Flinkのストリーム処理とバッチ処理への統合アプローチは、設定された実行モードに関係なく、制限された入力に対して実行されるDataStream アプリケーションが同じ最終結果を生成することを意味します。ここでの最終の意味に注意することが重要です: STREAMING
モードで実行されているジョブは逐次的な更新を生成する可能性があります。データベースでのUPSERTを考えてください)。一方で、BATCH
ジョブは最後に最終結果を1つだけ生成します。正しく解釈された場合、最終結果は同じになりますが、そこに到達する方法は異なる場合があります。
BATCH
実行を有効にすることで、入力が制限されていることがわかっている場合にのみ実行できる追加の最適化をFlinkが適用できるようになります。例えば、より効率的なタスクスケジューリングと障害回復動作を可能にする様々なシャッフル実装に加えて、様々なjoin/aggregation戦略を使えます。以下で、実行動作の詳細をいくつか説明します。
BATCH実行モードをいつ使えますか?使うべきですか? #
BATCH
実行モードは_有限_のジョブ/Flinkプログラムにのみ使えます。境界性は、データソースからの全ての入力が実行前に既知であるかどうか、または新しいデータが潜在的に無期限に現れるかどうかを示すデータソースのプロパティです。一方、ジョブは全てのソースが制限されている場合は制限され、そうでない場合は制限されません。
一方、STREAMING
実行モードは、制限付きジョブと制限無しジョブの両方に使えます。
経験則として、プログラムが制限されている場合は効率が向上するため、BATCH
実行モードを使うべきです。プログラムが無制限の場合は、連続するデータストリームを処理できるのはSTREAMING
実行モードだけというのが一般的なため、このモードを使うべきです。
1つの明らかな例外としては、制限付きジョブを使ってジョブの状態をブートストラップし、それを制限無しのジョブで使う場合があります。例えば、STREAMING
モードを使って制限付きジョブを実行し、セーブポイントを取得し、そのセーブポイントを制限無しジョブで復元します。これは非常に特殊な使用例であり、BATCH
実行ジョブの追加出力としてセーブポイントの生成を許可した場合、これがすぐに廃止される可能性があります。
STREAMING
モードを使って制限付きジョブを実行するもう1つのケースは、最終的に無制限のソースで実行されるコードのテストを書く場合です。このような場合、テストには制限付きソースを使う方が自然な場合があります。
BATCH実行モードの設定 #
実行モードはexecution.runtime-mode
設定を使って設定できます。
可能な値は3つです:
STREAMING
: 従来のDataStream実行モード(デフォルト)BATCH
: DataStream APIでのバッチ形式の実行AUTOMATIC
: ソースの境界に基づいて、システムに決定させます
これは、bin/flink run ...
のコマンドラインパラメータを使って設定するか、StreamExecutionEnvironment
を作成/設定sるうときにプログラムで設定できます。
コマンドラインから実行モードを設定する方法は次の通りです:
$ bin/flink run -Dexecution.runtime-mode=BATCH <jarFile>
この例は、コードで実行モードを設定する方法を示しています:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
userにはプルグラム内でランタイムモードを設定せず、代わりにアプリケーションの送信時にコマンドラインを使って設定することをお勧めします。アプリケーションコードを設定フリーに保つことで、同じアプリケーションをどの実行モードでも実行できるため、柔軟性が高まります。
Executionの挙動 #
このセクションでは、BATCH
実行モードの実行動作の概要を説明し、STREAMING
実行モードと対比します。詳細については、この機能を導入したFLIPを参照してください:
FLIP-134とFLIP-140。
タスクスケジューリングとネットワークシャッフル #
Flinkのジョブは、データフローグラフ内で相互に接続された様々なオペレーションで構成されます。システムは、様々なプロセス/マシーン(TaskManager)でこれらのオペレーションの実行をスケジュールする方法と、それらの間でデータをシャッフル(送信)する方法を決定します。
複数のオペレーション/オペレータはchainingと呼ばれる機能を使って複数のオペレーション/オペレータを繋ぐことができます。 Flinkがスケジューリングの単位と見なす1つ以上の(連鎖した)オペレータのグループは_タスク_と呼ばれます。 しばしば、_サブタスク_という用語が、複数のTaskManagerで並列して実行されているタスクの個々の伊那スタンスを指すために使われますが、ここでは_タスク_という用語のみを使います。
タスクスケジューリングとネットワークシャッフルは、BATCH
とSTREAMING
実行モードでは動作が異なります。その主な理由は、入力データがBATCH
実行モードで制限されていることが分かっているためです。これにより、Flinkはより効率的にデータ構造とアルゴリズムを使うことができます。
この例を使って、タスクのスケジューリングとネットワーク転送の違いを説明します:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = env.fromElements(...);
source.name("source")
.map(...).name("map1")
.map(...).name("map2")
.rebalance()
.map(...).name("map3")
.map(...).name("map4")
.keyBy((value) -> value)
.map(...).name("map5")
.map(...).name("map6")
.sinkTo(...).name("sink");
map()
、flatMap()
、filter()
のようなオペレーション間の1対1の接続パターンを暗黙的に示すオペレーションは、データを次のオペレーションに単純に転送できるため、これらのオペレーションを繋げることができます。これは、Flinkは通常それらの間にネットワークシャッフルを挿入しないことを意味します。
一方、keyBy()
やrebalance()
のようなオペレーションは、タスクの異なる並列インスタンス間でデータをシャッフルする必要があります。これはネットワークシャッフルを引き起こします。
上の例では、Flinkは次のようにオペレーションをタスクとしてグループ化します:
- Task1:
source
,map1
, andmap2
- Task2:
map3
,map4
- Task3:
map5
,map6
, andsink
また、タスク1とタスク2、タスク2とタスク3の間でもネットワークシャッフルが行われます。 これはそのジョブを視覚的に表現したものです:
STREAMING実行モード #
STREAMING
実行モードでは、全てのタスクが常にオンライン/実行されている必要があります。これにより、Flinkはパイプライン全体を通して新しいレコードを即座に処理できるようになり、これが継続的かつ低遅延のストリーム処理に必要としているものです。これは、ジョブに割り当てらえるTaskManagerに、全てのタスクを同時に実行するのに十分なリソースが必要であることも意味します。
ネットワークシャッフルは_パイプライン化_されます。つまり、ネットワーク層でバッファリングが行われ、レコードがすぐにダウンストリームのタスクに送信されます。繰り返しますが、これが必要なのは、データの連続ストリームを処理する場合、タスク(またはタスクのパイプライン)間でデータが実体化できる自然な時点(時間)が存在しないためです。これは、以下で説明するように、中間結果を具体化できるという点でBATCH
実行モードとは対照的です。
BATCH実行モード #
BATCH
実行モードでは、ジョブのタスクを段階に分解して次々に実行できます。これができるのは、入力が制限されていて、Flinkがパイプラインの1つのステージを完全に処理してから次の段階に進むことができるからです。上の例では、ジョブにはシャッフルバリアで区切られた3つのタスクに対応する3つのステージがあります。
STREAMING
モードで上で説明したように、レコードをダウンストリームにすぐに送信するのではなく、段階的に処理するためにFlinkはタスクの中間結果を一時的なものではないストレージに具体化する必要があり、これによりダウンストリームのタスクはアップストリームのタスクが既にオフラインになった後でも読み取ることができます。
これにより処理のレイテンシが増加しますが、他の興味深い特性も伴います。1つは、これによりFlinkは、障害が発生した場合にジョブ全体を再起動するのではなく、最新の利用可能な結果をバックトラックできるようになります。もう1つの副作用は、システムがタスクを次々に順番に実行できるため、BATCH
ジョブはより少ないリソース(TaskManagerの利用可能なスロットの点で)で実行できることです。
TaskManagersは少なくともダウンストリームのタスクが中間結果を消費しない限り、中間結果を保持します。(技術的には、これらはコンシューマのパイプライン領域が出力を生成するまで保持されます。)その後、障害の場合に以前の結果に前述のバックトラッキングを可能にするために、スペースが許す限り保持されます。
状態バックエンド/状態 #
STREAMING
モードでは、FlinkはStateBackendを使って、状態の保存方法とチェックポイントの動作方法を制御します。
BATCH
モードでは、設定された状態バックエンドは無視されます。代わりにキーのオペレーションは(ソートを使って)キーごとにグループ化され、キーの全てのレコードが順番に処理されます。これにより、同時に1つのキーのみの状態を保持することができます。特定のキーの状態は、次のキーに進むときに破棄されます。
next key.
これに関する背景情報は、FLIP-140を参照してください。
処理の順序 #
オペレータまたはユーザ定義関数(UDF)でレコードが処理される順序は、BATCH
とSTREAMING
実行とで異なる場合があります。
STREAMING
モードでは、ユーザ定義関数は受信レコードの順序についていかなる仮定も行うべきではありません。
データは到着するとすぐに処理されます。
BATCH
実行モードでは、Flinkが順序を保証するいくつかのオペレーションがあります。
この順序は、特定のタスクのスケジューリング、ネットワークシャッフル、状態バックエンド(上記を参照)の副作用である場合もあれば、システムによる意識的な選択である場合もあります
区別できる入力には、次の3つの一般的なタイプがあります:
- broadcast input: ブロードキャストストリームからの入力(Broadcast Stateも参照してください)
- regular input: ブロードキャストでもキーでもない入力
- keyed input:
KeyedStream
からの入力
複数の入力のタイプを消費する関数、オペレータは、次の順序で処理します:
- 最初にブロードキャスト入力が処理されます
- 次に通常の入力が処理されます
- 最後にキーの入力が処理されます
複数の通常入力またはブロードキャスト入力から消費する関数 — CoProcessFunction
など — Flinkはそのタイプの入力からのデータを任意の順序で処理する権利を持ちます。
複数のキーの入力から消費する関数 — KeyedCoProcessFunction
など — Flinkは次のキーの入力の前に全てのキー入力から1つのキーの全てレコードを処理します。
イベント時間/ウォーターマーク #
イベント時間のサポートに関しては、Flinkのストリーミングランタイムは、イベントの時間が順不同である可能性がある、_つまり_タイムスタンプt
のイベントは、タイムスタンプt+1
の後に来る可能性があるという悲観的な仮定に基づいて構築されています。このため、システムは、特定のタイムスタンプT
に対してタイムスタンプt < T
を持つ要素が将来存在しないことを確定できません。この順不同による最終結果への影響を償却するため、システムの実用的にする一方で、STREAMING
モードでFLinkはWatermarksと呼ばれるヒューリスティックを使います。
タイムスタンプT
のウォーターマークは、タイムスタンプt < T
の要素が後に続かないことを示します。
BATCH
モードでは、入力データセットが事前に分かっているため、少なくとも要素をタイムスタンプで並べ替えて時間順に処理できるようなヒューリスティックは必要ありません。ストリーミングに詳しい読者の場合、BATCH
では“perfect watermarks”を想定できます。
上記を考慮すると、BATCH
モードでは、各キーに関連付けられた入力の最後、または入力ストリームがキー付けされていない場合の入力の最後に、MAX_WATERMARK
が必要になるだけです。このスキームに基づいて、登録されている全てのタイマーは、時間の終了に起動され、ユーザ定義のWatermarkAssigners
やWatermarkGenerators
は無視されます。ただし、レコードにタイムスタンプを割り当てるためにTimestampAssigner
が引き続き使われるため、WatermarkStrategy
を指定することは依然として重要です。
処理時間 #
処理時間は、レコードが処理されるマシーンの実時間です。レコードが処理されている特定のインスタンスで処理されます。この定義に基づくと、処理時間に基つく計算の結果は再現できないことが分かります。これは、同じレコードが2度処理されると、2つの異なるタイムスタンプが付与されるからです。
上記にも関わらず、STREAMING
モードでの処理時間の使用は便利な場合があります。その理由はストリーミングパイプラインが無制限の入力をリアルタイムで取り込むことが多いため、イベント時間と処理時間の間に相関関係があるという事実に関係しています。
さらに、上記の理由により、STREAMING
モードでの1h
のイベント時間は、処理時間または実時間のほぼ1h
になることがよくあります。So
したがって、処理時間を使って、期待される結果についてのヒントを与える初期の(不完全な)起動に使えます。
この相関関係は、入力データセットが静的で事前に分かっているバッチの世界では存在しません。これを考慮して、BATCH
モードでは、userが現在の処理時間をリクエストし、処理時間タイマーを登録できるようになりますが、イベント時間の場合と同様に、全てのタイマーは入力の最後に起動されます。
概念的には、ジョブの実行中に処理時間が進まず、入力全体が処理されると時間の終了まで早送りされると想像できます。 input is processed.
障害回復 #
STREAMING
実行モードでは、Flinkは障害回復のためにチェックポイントを使います。
これと、その設定方法に関する実践的なドキュメントについては、チェックポイントのドキュメントをご覧ください。状態スナップショットによる耐障害性に関するより入門的なセクションもあり、より高いレベルで概念を説明しています。
障害回復のためのチェックポイントの特徴の1つは、障害が発生した場合にFlinkが実行中の全てのタスクをチェックポイントから再起動することです。これは、(以下で説明するように)BATCH
モードで行う必要があることよりもコストが高くなる可能性があります。ジョブがそれを許すならBATCH
実行モードを使うべきである理由の一つです。
BATCH
実行モードでは、Flinkは中間結果がまだ利用可能な以前の処理処理ステージにバックトラックしようとします。場合によっては、失敗したタスク(あるいはグラフ内の先行タスク)のみを再起動する必要があるため、チェックポイントから全てのタスクを再起動する場合と比較して、ジョブの処理効率と全体の処理時間を向上させることができます。
重要な考慮事項 #
従来のSTREAMING
実行モードと比較して、BATCH
モードでは一部の動作が期待通りに動作しない可能性があります。一部の機能は動作が若干異なったり、サポートされていなかったりします。
BATCHモードでの動作の変更:
- reduce()やsum()のような"Rolling"オペレーションは、
STREAMING
モードで投薬する全ての新しいレコードに対して逐次的な更新を発行します。BATCH
モードでは、これらのオペレーションは"rolling"しません。最終結果のみを発行します。
BATCHモードでは、以下のことがサポートされません:
- チェックポイントと、チェックポイントに依存するオペレーションは機能しません。
独自のオペレータは注意して実装する必要があります。そうしなければ正しく動作しない可能性があります。詳細については、以下の追加の説明を参照してください。
チェックポイント #
上記で説明したように、バッチプログラムの障害回復はチェックポイントを使いません。
チェックポイントが無いため、
CheckpointListener
のような特定の機能、結果としてKafkaEXACTLY_ONCEモードやFile Sink
のOnCheckpointRollingPolicyは動作しません。BATCH
モードで動作するトランザクションシンクが必要な場合は、FLIP-143で提案されているUnified Sink APIを使うことを確認してください。
全てのstate primitivesは引き続き使えます。それは単に障害回復に使われる仕組みが異なるからです。
独自のオペレータの書き方 #
注意: 独自のオペレータはApache Flinkの高度な使用パターンです。ほとんどのユースケースでは、代わりに(キー付き)プロセス関数を使うことを検討してください。
独自のオペレータを作成する時は、BATCH
実行モードの前提条件を覚えておくことが重要です。そうしなければ、STREAMING
モードでは問題なく動作するオペレータが、BATCH
モードでは誤った結果を生成する可能性があります。オペレータは特定のキーにスコープされることはありません。つまり、オペレータはFlinkが利用しようとしているBATCH
処理の一部のプロパティを見ます。
まず第一に、オペレータ内で最後に表示されたウォーターマークをキャッシュしないでください。
BATCH
モードでは、レコードをキーごとに処理します。その結果、ウォーターマークは各キーの間でMAX_VALUE
からMIN_VALUE
に切り替わります。ウォーターマークがオペレータ内で常に昇順であると仮定しないでください。おなじ理由で、タイマーは最初にキーの順序で起動し、次に各キー内のタイムスタンプの順序で起動します。また、キーを手動で変更するオペレーションはサポートされていません。