Set up TaskManager Memory
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

TaskManagerのメモリのセットアップ #

TaskManagerはFlinkでユーザコードを実行します。 ニーズに合わせてメモリ使用量を設定すると、Flinkのリソースのフットプリントが大幅に削減され、ジョブの安定性が向上します。

さらに説明されているメモリ設定は、リリースバージョン1.10以降に適用可能です。Flinkを以前のバージョンからアップグレードする場合は、1.10リリースで多くの変更が導入されたため、移行ガイドを確認してください。

このメモリセットアップガイドは、TaskManagersにだけ関係します! TaskManagerのメモリコンポーネントは、JobManagerプロセスのメモリモデルと似ていますが、より洗練された構造を持っています。

合計メモリの設定 #

Flink JVM プロセスの合計プロセスメモリは、Flinkアプリケーションによって消費されるメモリ(total Flink memory)とJVMがプロセスを実行するために消費されるメモリで構成されます。合計Flinkメモリの消費には、JVMヒープ、管理メモリ(Flinkによって管理されます)、その他のダイレクト(あるいはネイティブ)メモリの使用量が含まれます。

Simple TaskManager Memory Model

クラスタを作成せずにFlinkをローカル(例えば、IDEから)実行する場合、メモリ設定オプションのサブセットのみが関連します。詳細については、ローカル実行を参照してください。

それ以外の場合、TaskManagersのメモリをセットアップする最も簡単な方法は、合計メモリを設定することです。 よりきめ細かい方法は、ここで詳しく説明されています。

残りのメモリコンポーネントはデフォルト値または追加で設定されたオプションに基づいて自動的に調整されます。 他のメモリコンポーネントの詳細については、次の章を参照してください。

ヒープメモリと管理メモリを設定 #

合計メモリの説明で前述したように、Flinkでメモリをセットアップするもう1つの方法は、タスクヒープ管理メモリの両方を明示的に指定することです。 これにより、Flinkのタスクで利用可能なJVMヒープとその管理メモリをより詳細に制御できるようになります。

残りのメモリコンポーネントはデフォルト値または追加で設定されたオプションに基づいて自動的に調整されます。 ここで、他のメモリコンポーネントについて詳しく説明します。

タスクヒープと管理メモリを明示的に設定した場合は、合計プロセスメモリtotal Flink memoryも設定しないことをお勧めします。そうしなければ、メモリ設定の競合が簡単に発生する可能性があります。

タスク(オペレータ)ヒープメモリ #

ユーザコードで一定量のJVMヒープを利用できることを保証したい場合は、タスクヒープメモリを明示的に設定できますtaskmanager.memory.task.heap.size)。 これはJVMヒープサイズに追加され、ユーザコードを実行するFlinkのオペレータ専用になります。

管理されるメモリ #

管理メモリはFlinkによって管理され、ネイティブメモリ(オフヒープ)として割り当てられます。次のワークロードは管理メモリを使います:

  • ストリーミングジョブはそれをRocksDB状態バックエンドに使えます。
  • ストリーミングジョブとバッチジョブの両方で、ソート、ハッシュテーブル、中間結果のキャッシュに使えます。
  • ストリーミングジョブとバッチジョブの両方で、それをPythonプロセスのユーザ定義関数を実行するために使えます。

管理メモリのサイズは、

Sizefractionも設定されている場合、Sizeはfractionを上書きします。 sizefractionも明示的に設定されていない場合、default fractionが使われます。

バックエンド用に設定する方法バッチジョブも参照してください。

重み付けConsumer #

ジョブに複数の型の管理メモリconsumersが含まれている場合、これらの型の間で管理メモリを共有する方法を制御することもできます。 設定オプションtaskmanager.memory.managed.consumer-weightsを使って、各型の重みを設定することができ、Flinkはそれに比例して管理メモリを予約します。 有効なconsumer型は次のとおりです:

  • OPERATOR: 組み込みアルゴリズム用。
  • STATE_BACKEND: ストリーミングのRocksDB状態バックエンド用。
  • PYTHON: Pythonプロセス用。

タオ知恵場、ストリーミングジョブがRocksDB状態バックエンドとPython UDFの両方を使い、consumer weightsがSTATE_BACKEND:70,PYTHON:30として設定されている場合、FlinkはRocksDB状態バックエンド用に合計管理メモリの70%を、Pythonプロセス用に30%を予約します。

ジョブにその型の管理メモリconsumerが含まれている場合のみ、Flinkは管理メモリを予約します。 例えば、ストリーミングジョブがヒープ状態バックエンドとPython UDFを使い、consumer weightsがSTATE_BACKEND:70,PYTHON:30として設定されている場合、ヒープ状態バックエンドは管理メモリを使わないため、FlinkはPythonプロセス用に全ての管理メモリを使います。

Flinkは、consumer weightに含まれていないconsumer型の管理メモリを予約しません。 欠落している型がジョブで実際に必要な場合、メモリ割り当てエラーが発生する場合があります。 デフォルトでは、全てのconsumer型が含まれます。 これは、重み付けが明示的に設定/上書きされている場合にのみ発生する可能性があります。

オフヒープメモリの設定(直接またはネイティブ) #

ユーザコードによって割り当てらえるオフヒープメモリはタスクのオフヒープメモリ(taskmanager.memory.task.off-heap.size)に考慮される必要があります。

フレームワークのオフヒープメモリを調整することもできます。 Flinkフレームワークがさらに多くのメモリを必要とすることが確実な場合にのみ、この値を変更してください。

FlinkはフレームワークオフヒープメモリタスクのオフヒープメモリをJVMの直接メモリ制限に組み込みます。JVMパラメータも参照してください。

注意ただし、ネイティブの非ダイレクトメモリの使用量はフレームワークのオフヒープメモリまたはタスクのオフヒープメモリの一部として考慮される可能性があるため、この場合はJVMのダイレクトメモリ制限が高くなります。

注意 ネットワークメモリもJVMのダイレクトメモリの一部ですが、Flinkによって管理され、設定されたサイズを超えないことが保証されています。したがって、ネットワークメモリのサイズ変更はこの状況では役に立ちません。

詳細なメモリモデルも参照してください。

詳細なメモリモデル #

Simple memory model

以下のテーブルは、全てのメモリコンポーネント、上記の表現、各コンポーネントのサイズに影響を与えるFlink設定のリファレンスを一覧表示します:

  コンポーネント     設定オプション     説明  
フレームワークヒープメモリ taskmanager.memory.framework.heap.size Flinkフレームワーク専用のJVMヒープメモリ(詳細オプション)
タスクヒープメモリ taskmanager.memory.task.heap.size オペレータやユーザコードを実行するためのFlinkアプリケーション専用のJVMヒープメモリ
管理メモリ taskmanager.memory.managed.size
taskmanager.memory.managed.fraction
Flinkによって管理されるネイティブメモリ。ソート、ハッシュテーブル、中間結果のキャッシュ、RocksDB状態バックエンド用に予約されています。
フレームワークオフヒープメモリ taskmanager.memory.framework.off-heap.size Flinkフレームワーク専用のオフヒープダイレクト(またはネイティブ)メモリ
タスクオフヒープメモリ taskmanager.memory.task.off-heap.size オペレータを実行するためのFlinkアプリケーション専用のオフヒープダイレクト(あるいはネイティブ)メモリ
ネットワークメモリ taskmanager.memory.network.min
taskmanager.memory.network.max
taskmanager.memory.network.fraction
タスク間のデータレコード交換のために予約されたダイレクトメモリ(例えば、ネットワーク経由の転送用のバッファリング)は、合計Flinkメモリ上限付き分割コンポーネントです。このメモリはネットワークバッファの割り当てに使われます。
JVMメタスペース taskmanager.memory.jvm-metaspace.size Flink JVMプロセスのメタスペースサイズ
JVMオーバーヘッド taskmanager.memory.jvm-overhead.min
taskmanager.memory.jvm-overhead.max
taskmanager.memory.jvm-overhead.fraction
他のJVMオーバーヘッド用に予約されているネイティブメモリ: 例えば、スレッドスタック、コードキャッシュ、ガベージコレクション空間など。 [プロセスメモリ全体]の(/docs/deployment/memory/mem_setup/#configure-total-memory)上限付き分割コンポーネントです。

ご覧のとおり、一部のメモリコンポーネントのサイズは、それぞれのオプションで簡単に設定できます。 他のコンポーネントは複数のオプションを使って調整できます。

フレームワークメモリ #

正当な理由がない限り、フレームワークメモリフレームワークオフヒープメモリを変更すべきではありません。 一部の内部データ構造またはオペレーションのためにFlinkがより多くのメモリを必要とすることが確実な場合のみ、それらを調整してください。 これは、高い並列度など、特定のデプロイメント環境やジョブ構造に関連している可能性があります。 さらに、HadoopなどのFlinkの依存関係は、徳手のセットアップでより多くのダイレクトあるいはネイティブメモリを消費する可能性があります。

注意 現時点では、Flinkはフレームワーク、タスクメモリのヒープバージョンもオフヒープバージョンも分離しません。 フレームワークとタスクメモリの分離は、将来のリリースでさらに最適化するために使われます。

ローカルでの実行 #

クラスタを作成せずに(例えばIDEから)Flinkを単一のJavaプログラムとしてマシーン上でローカルに起動する場合、以下を除く全てのコンポーネントは無視されます:

  メモリコンポーネント     関連オプション     ローカル実行のデフォルト値  
タスクヒープ taskmanager.memory.task.heap.size 無限
タスクオフヒープ taskmanager.memory.task.off-heap.size 無限
管理メモリ taskmanager.memory.managed.size 128MB
ネットワークメモリ taskmanager.memory.network.min
taskmanager.memory.network.max
64MB

上記の全てのコンポーネントは、ローカル実行のために明示的に設定できますが、設定しなければいけないというわけではありません。 設定されない場合、それらはデフォルト値に設定されます。ローカル実行モードの場合、タスクヒープメモリタスクオフヒープは無限(Long.MAX_VALUEバイト)と見なされ、管理メモリのデフォルト値は128MBのみです。

注意 この場合、タスクヒープサイズは実際のヒープサイズとは全く関係ありません。 これは、次のリリースでの将来の最適化に関連する可能性があります。開始されたローカルプロセスの実際のJVMヒープサイズはFlinkによtt制御されず、プロセスの開始方法によって異なります。 JVMヒープサイズを制御したい場合は、タオいうするJVM引数、例えば、-Xmx-Xmsを明示的に渡す必要があります。

inserted by FC2 system