This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
ネットワークメモリチューニングガイド #
概要 #
Flinkの各レコードは、サブタスク間の通信の最小単位であるネットワークバッファ内の他のレコードと結合されて次のサブタスクに送信されます。一貫した高スループットを維持するために、Flinkは送信プロセスの入力側と出力側でネットワークバッファキュー(in-flightデータとも呼ばれます)を使います。
各サブタスクは、データの消費を待機する入力キューと、データを次のサブタスクへのデータ送信を待機する出力キューがあります。大量のin-flightデータがあるということは、Flinkはパイプラインでより高い、より回復量のあるスループットを提供できることを意味します。ただし、これによりチェックポイント時間が長くなります。
Flinkのチェックポイントは、全てのサブタスクが注入されたチェックポイントバリアを全て受け取った場合にのみ終了できます。aligned checkpointsでは、これらのチェックポイントバリアはネットワークバッファとともにジョブグラフ全体を移動します。in-flightデータが増えるほど、チェックポンとバリアの伝搬時間も長くなります。unaligned checkpointsでは、in-flightデータが増えるほど、キャプチャされた全てのin-flightデータがチェックポイントの一部として保持する必要があるため、チェックポイントのサイズが大きくなります。
バッファの膨張の仕組み #
以前は、in-flightデータの量を設定する唯一の方法は、バッファ量とバッファサイズの両方を指定することでした。ただし、理想的な値はデプロイメントごとに異なるため、選択が難しい場合がありました。deployment. Flink 1.14で追加されたバッファ膨張の仕組みは、in-flightデータの量を適切な値に調整することで、この問題に対処しようとします。
バッファ膨張機能はサブタスク(サブタスクが常にビジーであるシナリオ)の最大可能スループットを計算し、それらのin-flightデータの消費時間が設定された値と等しくなるようにin-flightデータの量を調整します。
バッファ膨張の仕組みは、プロパティtaskmanager.network.memory.buffer-debloat.enabled
をtrue
に設定することで有効にできます。
in-flightデータを消費する目標時間を設定するには、taskmanager.network.memory.buffer-debloat.target
をduration
に設定します。
ほとんどの場合において、膨張ターゲットのデフォルト値で十分です。
この機能は、過去のスループットデータを使って残りのin-flightデータを消費するのに必要な時間を予測します。予測が間違っている場合、膨張の仕組みは次の2つのいずれかの理由で失敗する可能性があります:
- 完全なスループットを提供するのに十分なバッファデータがありません。
- バッファされたin-flightデータが多すぎるため、alignedチェックポイントバリアの伝達時間やunalignedチェックポイントサイズに悪影響があります。
ジョブの負荷が変化する場合(つまり、受信レコードの急増、定期的なウィンドウ集計または結合の時刻など)、次の設定を調整する必要になる場合があります:
-
taskmanager.network.memory.buffer-debloat.period
- これは、バッファサイズの再計算間の最小期間です。期間が短いほど、膨張の仕組みの反応時間は速くなりますが、必要は計算のためのCPUオーバーヘッドが高くなります。 -
taskmanager.network.memory.buffer-debloat.samples
- これは、スループット測定値が平均化されるサンプル数を調整します。収集されるサンプルの頻度はtaskmanager.network.memory.buffer-debloat.period
で調整できます。サンプルが少ないほど、膨張の仕組みの反応時間は速くなりますが、スループットが急増または低下する可能性が高く、バッファ膨張の仕組みがin-flightデータの最適な量を誤って計算する可能性があります。 -
taskmanager.network.memory.buffer-debloat.threshold-percentages
- バッファサイズの頻繁な変更を防ぐための最適化(つまり、新しいサイズが古いサイズに比べてそれほど変わらない場合)。
詳細と追加のパラメータについては、設定のドキュメントを参照してください。
現在のバッファサイズの監視に使えるメトリクスは次の通りです:
estimatedTimeToConsumeBuffersMs
- 全ての入力チャンネルからのデータを消費する合計時間debloatedBufferSize
- 現在のバッファサイズ
制限事項 #
現在、バッファ膨張の仕組みによって自動的に処理されないケースがいくつかあります。
複数の入力と結合 #
現在、スループットの計算とバッファの膨張はサブタスクレベルで行われます。
サブタスクに複数の異なる入力がある場合、または結合された単一の入力がある場合、バッファの膨張により、スループットを維持するには、低スループットの入力にはバッファリングされたin-flightのデータが多すぎる可能性があり、一方高スループットの入力にはバッファが小さすぎる可能性があります。これは、異なる入力のスループットが大幅に異なる場合に特に顕著になる可能性があります。この機能をテストする時には、そのようなサブタスクに特別な注意を払うことをお勧めします。
バッファサイズとバッファ数 #
現在、バッファの膨張は、使われる最大バッファサイズのみが制限されます。実際のバッファサイズとバッファ数は変更されません。これは、膨張の仕組みはジョブのメモリ使用量を削減できないことを意味します。バッファの量またはサイズのいずれかを手動で減らす必要があります。
Furthermore, if you want to reduce the amount of buffered in-flight data below what buffer debloating currently allows, you might want to さらに、バッファされたin-flightデータの量を、現在バッファ膨張の仕組みで許可されている量よりも減らしたい場合は、バッファの数を手動で設定することをお勧めします。
高い並列度 #
現在、バッファ膨張の仕組みはデフォルトの設定を使った高並列度(200以上)では正しく実行されない可能性があります。
スループットの低下またはチェックポイント作成時間が予想よりも長いことが観測された場合、フローティングバッファ(taskmanager.network.memory.floating-buffers-per-gate
)をデフォルト値から少なくとも並列度に等しい数まで増やすことをお勧めします。
問題が発生する並列度の実際の値はジョブによって異なりますが、通常は数百を超えるはずです。
ネットワークバッファのライフサイクル #
Flinkには、出力ストリーム用と各入力ゲート用に1つずつ、複数のローカルバッファプールがあります。 各バッファプールの目標サイズは次の計算式で計算されます。
#channels * taskmanager.network.memory.buffers-per-channel + taskmanager.network.memory.floating-buffers-per-gate
バッファのサイズはtaskmanager.memory.segment-size
を設定することで設定できます。
入力ネットワークバッファ #
目標のバッファプールサイズに常に到達するとは限りません。 バッファを取得できない場合に、Flinkが失敗するかどうかを制御する閾値があります。 バッファの目標数のうち、この閾値を下回る部分は必須と見なされます。 残りがある場合は、オプションです。 必要なバッファを取得できないと、タスクが失敗します。 オプションのバッファが取得できない場合でもタスクは失敗しませんが、パフォーマンスが低下する可能性があります。
この閾値のデフォルト値は、ストリーミングワークロードの場合はInteger.MAX_VALUE
、バッチワークロードの場合は1000
です。
ユーザに正当な理由があり、自分が何をしているかよく分かっている場合を除き、この閾値を変更することはお勧めしません。
関連する設定オプションはtaskmanager.network.memory.read-buffer.required-per-gate.max
です。
一般的に、閾値を小さくすると"insufficient number of network buffers"例外が発生する可能性が低くなりますが、ワークロードのパフォーマンスが静かに下がる可能性があり、その逆も同様です。
出力ネットワークバッファ #
入力バッファプールと異なり、出力バッファプールは全てのサブパーティション間で共有するバッファのタイプが1つだけあります。
過度のデータスキューを避けるために、各サブパーティションのバッファ数はtaskmanager.network.memory.max-buffers-per-channel
設定で制限されます。
入力バッファプールと異なり、排他バッファおよびフローティングバッファの設定量は推奨値としてのみ扱われます。利用可能なバッファが十分にない場合、Flinkは出力サブバッファごとに1つの排他バッファと、ゼロ個のフローティングバッファのみを使って処理を進めることができます。
超過バッファ #
各出力サブタスクは最大taskmanager.network.memory.max-overdraft-buffers-per-gate
(デフォルトは5)の追加の超過バッファを要求することもできます。これらのバッファは、サブタスクがダウンストリームのサブタスクによってバックプレッシャーを受けており、サブタスクが現在行っている処理を完了するために複数のネットワークバッファを必要とする場合にのみ使われます。これは次のような状況で発生する可能性があります:
- 単一のネットワークバッファに収まらない非常に大きなレコードをシリアライズ化します。
- FlatMapのようなオペレータ。単一の入力レコードごとに多くの出力レコードを生成します。
- 定期的、またはいくつかのイベント(例えば
WindowOperator
のトリガーなど)への反応時に多数のレコードを出力するオペレータ。
このような状況で超過バッファが無いと、Flinkサブタスクスレッドがバックプレッシャーでブロックされ、例えばunalignedチェックポイントが完了できなくなります。これを軽減するために、超過バッファの概念が追加されました。これらの超過バッファは厳密にオプションであり、Flinkは通常のバッファのみを使って徐々に処理を進めることができます。つまり0
がtaskmanager.network.memory.max-overdraft-buffers-per-gate
の設定として許容されることを意味します。
この機能はパイプラインシャッフル
でのみ有効です。
in-flightバッファの数 #
排他バッファとフローティングバッファのデフォルト設定は、最大のスループットを得るのに十分なはずです。最小のin-flightデータを設定する必要がある倍は、排他バッファを0
に設定し、メモリセグメントサイズを減らすことができます。
バッファサイズの選択 #
バッファは、データ部分を次のサブタスクに送信する時のネットワークオーバーヘッドを最適化するためにレコードを収集します。次のサブタスクは、レコードを消費する前にレコードの全ての部分を受信する必要があります。
バッファサイズが小さすぎる場合、またはバッファのフラッシュ頻度(execution.buffer-timeout
設定パラメータ)が高すぎる場合、Flinkのランタイムでのバッファあたりのオーバーヘッドがレコードあたりのオーバーヘッドよりも大幅に大きいため、スループットが低下する可能性があります。
経験則として、実際のワークロードでネットワークのボトルネック(ダウンストリームオペレータのアイドリング、アップストリームのバックプレッシャー、出力バッファキューがフル、ダウンストリーム入力キューが空)が観測されない限り、バッファサイズやバッファタイムアウトを増やすことを検討することはお勧めしません。
バッファサイズが大きすぎると、次のような問題が発生する可能性があります:
- メモリ使用量が多い
- 巨大なチェックポイントデータ(unalingedチェックポイント)
- 長いチェックポイント時間(alignedチェックポイント)
- フラッシュされたバッファは部分的にのみ送信されるため、小さい
execution.buffer-timeout
で割り当てられたメモリを非効率に使用
バッファカウントの選択 #
バッファの数はtaskmanager.network.memory.buffers-per-channel
とtaskmanager.network.memory.floating-buffers-per-gate
設定で設定されます。
最高のスループットのために、排他バッファとフローティングバッファの数にデフォルト値を使うことをお勧めします(制限があるケースを除く)。in-flightデータの量が問題の原因となっている場合、バッファ膨張を有効にすることをお勧めします。
ネットワークバッファの数を手動で調整することができますが、次のことを考慮してください:
- 予想されるスループット(
bytes/second
単位)に応じてバッファの数を調整する必要があります。 クレジットの割り当てとバッファnお送信には時間が掛かります(2つのノード間で約2往復のメッセージ)。遅延はネットワークによっても異なります。
バッファの往復時間(正常なローカルネットワークで約1ms
)、バッファサイズ、予想されるスループットを使って、次の式でスループットを維持するために必要なバッファの数を計算できます:
number_of_buffers = expected_throughput * buffer_roundtrip / buffer_size
例えば、予想されるスループットが320MB/s
、ラウンドトリップレイテンシが1ms
、デフォルトのメモリセグメントサイズの場合、期待されるスループットを達成するために必要なアクティブに使用されるバッファの数は10です:
number_of_buffers = 320MB/s * 1ms / 32KB = 10
-
フローティングバッファの目的はデータスキューシナリオを処理することです。理想的には、そのチャンネルに属するフローティングバッファの数(デフォルト:8)と排他バッファ(デフォルト:2)の数がネットワークスループットを飽和させることができる必要があります。しかし、これは必ずしも実現可能でも必要であるわけでもありません。task managerの中の全てのサブタスクのうち1つのチャンネルだけが使われることは非常に稀です。
-
排他バッファの目的は柔軟なスループットを提供することです。1つのバッファが転送中の場合、もう1つのバッファは一杯になります。高スループットセットアップでは、排他バッファの数がFlinkが使うin-flightデータ量を定義する主な要素になります。
低スループットセットアップでバックプレッシャーが発生する場合は、排他バッファの数を減らすことを検討する必要があります。
概要 #
Flinkでのネットワークのメモリ設定チューニングは、バッファ膨張の仕組みを有効にすることで簡素化できます。それを調整する必要があるかもしれません。
これが機能しない場合は、バッファ膨張の仕組みを無効にし、メモリセグメントサイズとバッファの数を手動で設定することができます。この2番目のシナリオでは、次のことをお勧めします:
- 最大スループットのデフォルト値を使う
- メモリセグメントサイズや排他バッファの数を削減して、チェックポイントを高速化し、ネットワークスタックのメモリ消費を削減します