Execution Configuration
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

実行設定 #

StreamExecutionEnvironment<x2/はランタイムのためのジョブに固有の設定値を設定することができるExecutionConfig`を含みます。 全てのジョブに影響するデフォルトを変更するには、設定を見てください。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ExecutionConfig executionConfig = env.getConfig();
val env = StreamExecutionEnvironment.getExecutionEnvironment
var executionConfig = env.getConfig
env = StreamExecutionEnvironment.get_execution_environment()
execution_config = env.get_config()

以下の設定オプションが利用可能です: (デフォルトはボールド)

  • setClosureCleanerLevel(). closure cleanerレベルはデフォルトでClosureCleanerLevel.RECURSIVEに設定されます。クロージャークリーナーはFlinkプログラム内の匿名関数の取り巻くクラスへの不必要な参照を削除します。 クロージャークリーナーを無効にすることで、通常はシリアライズ化できない匿名ユーザ関数が取り巻くクラスを参照することが起きるかもしれません。これはシリアライザによって例外に繋がるでしょう。設定は以下の通りです: NONE: クロージャーのクリーナーを完全に無効化します。TOP_LEVEL: フィールドを再帰することなく、トップレベルのクラスだけを削除します。RECURSIVE: 全てのフィールドを再帰的に削除します。

  • getParallelism() / setParallelism(int parallelism) ジョブのためのデフォルトの並行度を設定します。

  • getMaxParallelism() / setMaxParallelism(int parallelism) ジョブのためのデフォルトの最大並行度を設定します。この設定は並行度の最大度合を決定し、動的スケーリングの上限を指定します。

  • getNumberOfExecutionRetries() / setNumberOfExecutionRetries(int numberOfExecutionRetries) 失敗したタスクが再実行される回数を設定します。0の値は耐障害性を効率的に無効にします。-1の値は(設定で定義された)システムのデフォルト値が使われるべきであることを示します。非推奨です。代わりにrestart strategiesを使ってください。

  • getExecutionRetryDelay() / setExecutionRetryDelay(long executionRetryDelay) ジョブが失敗した後で再実行する前にシステムが待つ遅延をミリ秒で設定します。タスクマネージャー上で全てのタスクの停止が成功した後で延期が開始され、延期が過ぎた後でタスクは再実行されます。再実行を試行し、同じ問題で再びすぐに失敗する前に、(完全にはタイムアウトしていない壊れた接続のような)失敗に関係する特定のタイムアウトを明るみに出すために、再実行を遅延させるこのパラメータは有用です。このパラメータは1つ以上の再実行の場合にのみ効果があります。非推奨です。代わりにrestart strategiesを使ってください。

  • getExecutionMode() / setExecutionMode(). デフォルトの実行モードは PIPELINED です。プログラムを実行する実行モードを設定してください。実行モードはデータの交換がバッチあるいはパイプラインの方法で実施されるかどうかを定義します。

  • enableForceKryo() / disableForceKryo. Kryo はデフォルトでは強制されません。POJOだと解析したとしてもPOJOのためにKryoシリアライザを使うように GenericTypeInformation を強制します。これが好ましい場合もあります。例えば、Flinkの内部シリアライザがPOJOを適切に扱うのに失敗した場合。

  • enableForceAvro() / disableForceAvro(). Avroはデフォルトでは強制されません。AvroPOJOをシリアライズ化するためにKryoの代わりにAvroシリアライザを使うように AvroTypeInfo を強制します。

  • enableObjectReuse() / disableObjectReuse() デフォルトでは、オブジェクトはFlink内で再利用されません。オブジェクトの再利用モードを有効にすると、パフォーマンス改善のためにランタイムはユーザオブジェクトを再利用するでしょう。オペレータのユーザコード関数がこの挙動に気が付いていない場合は、これがバグに繋がることに注意してください。

  • getGlobalJobParameters() / setGlobalJobParameters() このメソッドによりユーザは独自のオブジェクトをジョブのためのグローバル設定として設定することができます。ExecutionConfigは全てのユーザ定義関数の中でアクセス可能なため、これはジョブの中でグローバルに利用可能な設定を作るための簡単な方法です。

  • addDefaultKryoSerializer(Class<?> type, Serializer<?> serializer) 指定されたtypeのためのKryoシリアライザインスタンスを登録します。

  • addDefaultKryoSerializer(Class<?> type, Class<?extends Serializer<?>> serializerClass) 指定されたtypeのためのKryoシリアライザ クラスを登録します。

  • registerTypeWithKryoSerializer(Class<?> type, Serializer<?> serializer) 指定された型をKryoを使って登録し、それのためのシリアライザを指定します。Kryoを使って型を登録することで、型のシリアライズ化はより効率的になるでしょう。

  • registerKryoType(Class<?> type) もし型がKryoを使ってシリアライズ化されることになる場合、タグ(整数のID)が確実に書かれるようにKryoで登録されるでしょう。もし型がKryoを使って登録されない場合、クラス名全体は各インスタンスを使ってシリアライズ化され、かなり高いI/Oのコストに繋がるでしょう。

  • registerPojoType(Class<?> type) シリアライズ化スタックを使って指定された型を登録します。結局型がPOJOとしてシリアライズ化される場合、型はPOJOシリアライザを使って登録されるでしょう。もし型が結局Kryoを使ってシリアライズ化される場合、タグだけが確実に書き込まれるようにKryoで登録されるでしょう。もし型がKryoを使って登録されない場合、クラス名全体は各インスタンスを使ってシリアライズ化され、かなり高いI/Oのコストに繋がるでしょう。

registerKryoType()を使って登録された型はFlinkのPOJOシリアライザインスタンスで利用できないことに注意してください。

  • disableAutoTypeRegistration() デフォルトでは自動的な型登録が有効です。自動的な型登録はKryoおよびPOJOシリアライザを使ってユーザコードによって使われる(サブ タイプも含む)全ての型を登録します。

  • setTaskCancellationInterval(long interval) 実行中のタスクを中止しようとする連続の試行間で待つ間隔(ミリ秒)を設定します。タスクが中止される場合、もしタスクスレッドが一定期間内に中断されない場合はタスクのスレッド上で定期的にinterrupt()を呼ぶ新しいスレッドが作成されます。このパラメータはinterrupt()への連続する呼び出し間の時間を参照し、デフォルトで30000ミリ秒か30秒に設定されます。

getRuntimeContext()メソッドを使ってRich*関数内でアクセス可能なRuntimeContextは、全てのユーザ定義関数の中で ExecutionConfigにアクセスすることもできます。

Back to top

inserted by FC2 system