This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
TaskManagerのメモリのセットアップ #
TaskManagerはFlinkでユーザコードを実行します。 ニーズに合わせてメモリ使用量を設定すると、Flinkのリソースのフットプリントが大幅に削減され、ジョブの安定性が向上します。
さらに説明されているメモリ設定は、リリースバージョン1.10以降に適用可能です。Flinkを以前のバージョンからアップグレードする場合は、1.10リリースで多くの変更が導入されたため、移行ガイドを確認してください。
このメモリセットアップガイドは、TaskManagersにだけ関係します! TaskManagerのメモリコンポーネントは、JobManagerプロセスのメモリモデルと似ていますが、より洗練された構造を持っています。
合計メモリの設定 #
Flink JVM プロセスの合計プロセスメモリは、Flinkアプリケーションによって消費されるメモリ(total Flink memory)とJVMがプロセスを実行するために消費されるメモリで構成されます。合計Flinkメモリの消費には、JVMヒープ、管理メモリ(Flinkによって管理されます)、その他のダイレクト(あるいはネイティブ)メモリの使用量が含まれます。
クラスタを作成せずにFlinkをローカル(例えば、IDEから)実行する場合、メモリ設定オプションのサブセットのみが関連します。詳細については、ローカル実行を参照してください。
それ以外の場合、TaskManagersのメモリをセットアップする最も簡単な方法は、合計メモリを設定することです。 よりきめ細かい方法は、ここで詳しく説明されています。
残りのメモリコンポーネントはデフォルト値または追加で設定されたオプションに基づいて自動的に調整されます。 他のメモリコンポーネントの詳細については、次の章を参照してください。
ヒープメモリと管理メモリを設定 #
合計メモリの説明で前述したように、Flinkでメモリをセットアップするもう1つの方法は、タスクヒープと管理メモリの両方を明示的に指定することです。 これにより、Flinkのタスクで利用可能なJVMヒープとその管理メモリをより詳細に制御できるようになります。
残りのメモリコンポーネントはデフォルト値または追加で設定されたオプションに基づいて自動的に調整されます。 ここで、他のメモリコンポーネントについて詳しく説明します。
タスクヒープと管理メモリを明示的に設定した場合は、合計プロセスメモリもtotal Flink memoryも設定しないことをお勧めします。そうしなければ、メモリ設定の競合が簡単に発生する可能性があります。
タスク(オペレータ)ヒープメモリ #
ユーザコードで一定量のJVMヒープを利用できることを保証したい場合は、タスクヒープメモリを明示的に設定できますtaskmanager.memory.task.heap.size
)。
これはJVMヒープサイズに追加され、ユーザコードを実行するFlinkのオペレータ専用になります。
管理されるメモリ #
管理メモリはFlinkによって管理され、ネイティブメモリ(オフヒープ)として割り当てられます。次のワークロードは管理メモリを使います:
- ストリーミングジョブはそれをRocksDB状態バックエンドに使えます。
- ストリーミングジョブとバッチジョブの両方で、ソート、ハッシュテーブル、中間結果のキャッシュに使えます。
- ストリーミングジョブとバッチジョブの両方で、それをPythonプロセスのユーザ定義関数を実行するために使えます。
管理メモリのサイズは、
taskmanager.memory.managed.size
経由で明示的に設定されるか、taskmanager.memory.managed.fraction
を介して合計Flinkメモリの割合として計算されます。
Sizeもfractionも設定されている場合、Sizeはfractionを上書きします。 sizeもfractionも明示的に設定されていない場合、default fractionが使われます。
バックエンド用に設定する方法とバッチジョブも参照してください。
重み付けConsumer #
ジョブに複数の型の管理メモリconsumersが含まれている場合、これらの型の間で管理メモリを共有する方法を制御することもできます。
設定オプションtaskmanager.memory.managed.consumer-weights
を使って、各型の重みを設定することができ、Flinkはそれに比例して管理メモリを予約します。
有効なconsumer型は次のとおりです:
OPERATOR
: 組み込みアルゴリズム用。STATE_BACKEND
: ストリーミングのRocksDB状態バックエンド用。PYTHON
: Pythonプロセス用。
タオ知恵場、ストリーミングジョブがRocksDB状態バックエンドとPython UDFの両方を使い、consumer weightsがSTATE_BACKEND:70,PYTHON:30
として設定されている場合、FlinkはRocksDB状態バックエンド用に合計管理メモリの70%
を、Pythonプロセス用に30%
を予約します。
ジョブにその型の管理メモリconsumerが含まれている場合のみ、Flinkは管理メモリを予約します。
例えば、ストリーミングジョブがヒープ状態バックエンドとPython UDFを使い、consumer weightsがSTATE_BACKEND:70,PYTHON:30
として設定されている場合、ヒープ状態バックエンドは管理メモリを使わないため、FlinkはPythonプロセス用に全ての管理メモリを使います。
Flinkは、consumer weightに含まれていないconsumer型の管理メモリを予約しません。 欠落している型がジョブで実際に必要な場合、メモリ割り当てエラーが発生する場合があります。 デフォルトでは、全てのconsumer型が含まれます。 これは、重み付けが明示的に設定/上書きされている場合にのみ発生する可能性があります。
オフヒープメモリの設定(直接またはネイティブ) #
ユーザコードによって割り当てらえるオフヒープメモリはタスクのオフヒープメモリ(taskmanager.memory.task.off-heap.size
)に考慮される必要があります。
フレームワークのオフヒープメモリを調整することもできます。 Flinkフレームワークがさらに多くのメモリを必要とすることが確実な場合にのみ、この値を変更してください。
FlinkはフレームワークオフヒープメモリとタスクのオフヒープメモリをJVMの直接メモリ制限に組み込みます。JVMパラメータも参照してください。
注意ただし、ネイティブの非ダイレクトメモリの使用量はフレームワークのオフヒープメモリまたはタスクのオフヒープメモリの一部として考慮される可能性があるため、この場合はJVMのダイレクトメモリ制限が高くなります。
注意 ネットワークメモリもJVMのダイレクトメモリの一部ですが、Flinkによって管理され、設定されたサイズを超えないことが保証されています。したがって、ネットワークメモリのサイズ変更はこの状況では役に立ちません。
詳細なメモリモデルも参照してください。
詳細なメモリモデル #
以下のテーブルは、全てのメモリコンポーネント、上記の表現、各コンポーネントのサイズに影響を与えるFlink設定のリファレンスを一覧表示します:
ご覧のとおり、一部のメモリコンポーネントのサイズは、それぞれのオプションで簡単に設定できます。 他のコンポーネントは複数のオプションを使って調整できます。
フレームワークメモリ #
正当な理由がない限り、フレームワークメモリとフレームワークオフヒープメモリを変更すべきではありません。 一部の内部データ構造またはオペレーションのためにFlinkがより多くのメモリを必要とすることが確実な場合のみ、それらを調整してください。 これは、高い並列度など、特定のデプロイメント環境やジョブ構造に関連している可能性があります。 さらに、HadoopなどのFlinkの依存関係は、徳手のセットアップでより多くのダイレクトあるいはネイティブメモリを消費する可能性があります。
注意 現時点では、Flinkはフレームワーク、タスクメモリのヒープバージョンもオフヒープバージョンも分離しません。 フレームワークとタスクメモリの分離は、将来のリリースでさらに最適化するために使われます。
ローカルでの実行 #
クラスタを作成せずに(例えばIDEから)Flinkを単一のJavaプログラムとしてマシーン上でローカルに起動する場合、以下を除く全てのコンポーネントは無視されます:
メモリコンポーネント | 関連オプション | ローカル実行のデフォルト値 |
---|---|---|
タスクヒープ | taskmanager.memory.task.heap.size |
無限 |
タスクオフヒープ | taskmanager.memory.task.off-heap.size |
無限 |
管理メモリ | taskmanager.memory.managed.size |
128MB |
ネットワークメモリ | taskmanager.memory.network.min taskmanager.memory.network.max |
64MB |
上記の全てのコンポーネントは、ローカル実行のために明示的に設定できますが、設定しなければいけないというわけではありません。 設定されない場合、それらはデフォルト値に設定されます。ローカル実行モードの場合、タスクヒープメモリとタスクオフヒープは無限(Long.MAX_VALUEバイト)と見なされ、管理メモリのデフォルト値は128MBのみです。
注意 この場合、タスクヒープサイズは実際のヒープサイズとは全く関係ありません。 これは、次のリリースでの将来の最適化に関連する可能性があります。開始されたローカルプロセスの実際のJVMヒープサイズはFlinkによtt制御されず、プロセスの開始方法によって異なります。 JVMヒープサイズを制御したい場合は、タオいうするJVM引数、例えば、-Xmx、-Xmsを明示的に渡す必要があります。