StreamExecutionEnvironment
はランタイムのためのジョブに固有の設定値を設定することができるExecutionConfig
を含みます。全てのジョブに影響するデフォルトを変更するには、設定を見てください。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ExecutionConfig executionConfig = env.getConfig();
val env = StreamExecutionEnvironment.getExecutionEnvironment
var executionConfig = env.getConfig
以下の設定オプションが利用可能です: (デフォルトはボールド)
enableClosureCleaner()
/ disableClosureCleaner()
. デフォルトでクロージャー クリーナーが有効です。クロージャー クリーナーはFlinkプログラム内の匿名関数の取り巻くクラスへの不必要な参照を削除します。With the closure cleaner disabled, it might happen that an anonymous user function is referencing the surrounding class, which is usually not Serializable. これはシリアライザによって例外に繋がるでしょう。
getParallelism()
/ setParallelism(int parallelism)
ジョブのためのデフォルトの並行度を設定する。
getMaxParallelism()
/ setMaxParallelism(int parallelism)
ジョブのためのデフォルトの最大並行度を設定する。この設定は並行度の最大度合を決定し、動的スケーリングの上限を指定します。
getNumberOfExecutionRetries()
/ setNumberOfExecutionRetries(int numberOfExecutionRetries)
失敗したタスクが再実行される回数を設定します。0の値は耐障害性を効率的に無効にします。-1
の値は(設定で定義された)システムのデフォルト値が使われるべきであることを示します。これは非推奨です。代わりにrestart strategiesを使ってください。
getExecutionRetryDelay()
/ setExecutionRetryDelay(long executionRetryDelay)
ジョブが失敗した後で再実行する前にシステムが待つ遅延をミリ秒で設定します。タスクマネージャー上で全てのタスクの停止が成功した後で延期が開始され、延期が過ぎた後でタスクは再実行されます。再実行を試行し、同じ問題で再びすぐに失敗する前に、(完全にはタイムアウトしていない壊れた接続のような)失敗に関係する特定のタイムアウトを明るみに出すために、再実行を遅延させるこのパラメータは有用です。このパラメータは1つ以上の再実行の場合にのみ効果があります。これは非推奨です。代わりにrestart strategies を使ってください。
getExecutionMode()
/ setExecutionMode()
. デフォルトの実行モードは PIPELINED です。プログラムを実行する実行モードを設定してください。実行モードはデータの交換がバッチあるいはパイプラインの方法で実施されるかどうかを定義します。
enableForceKryo()
/ disableForceKryo
. Kryo はデフォルトでは強制されません。POJOだと解析したとしてもPOJOSのためにKryoシリアライザを使うように GenericTypeInformation を強制します。これが好ましい場合もあります。例えば、Flinkの内部シリアライザがPOJOを適切に扱うのに失敗した場合。
enableForceAvro()
/ disableForceAvro()
. Avroはデフォルトでは強制されません。AvroPOJOをシリアライズ化するためにKryoの代わりにAvroシリアライザを使うように AvroTypeInformation を強制します。
enableObjectReuse()
/ disableObjectReuse()
デフォルトでは、オブジェクトはFlink内で再利用されません。オブジェクトの再利用モードを有効にすると、パフォーマンス改善のためにランタイムはユーザオブジェクトを再利用するでしょう。オペレータのユーザコード関数がこの挙動に気が付いていない場合は、これがバグに繋がることに注意してください。
enableSysoutLogging()
/ disableSysoutLogging()
ジョブマネージャーのステータスの更新はデフォルトでSystem.out
に出力されます。この設定によりこの挙動を無効にすることができます。
getGlobalJobParameters()
/ setGlobalJobParameters()
このメソッドによりユーザは独自のオブジェクトをジョブのためのグローバル設定として設定することができます。ExecutionConfig
は全てのユーザ定義関数の中でアクセス可能なため、これはジョブの中でグローバルに利用可能な設定を作るための簡単な方法です。
addDefaultKryoSerializer(Class> type, Serializer> serializer)
指定された type
のためのKryoシリアライザ インスタンスを登録します。
addDefaultKryoSerializer(Class> type, Class> serializerClass)
指定された type
のためのKryoシリアライザ クラスを登録します。
registerTypeWithKryoSerializer(Class> type, Serializer> serializer)
指定された型をKryoを使って登録し、それのためのシリアライザを指定します。Kryoを使って型を登録することで、型のシリアライズ化はより効率的になるでしょう。
registerKryoType(Class> type)
もし型がKryoを使ってシリアライズ化されることになる場合、タグ(整数のID)が確実に書かれるようにKryoで登録されるでしょう。もし型がKryoを使って登録されない場合、クラス名全体は各インスタンスを使ってシリアライズ化され、かなり高いI/Oのコストに繋がるでしょう。
registerPojoType(Class> type)
シリアライズ化スタックを使って指定された型を登録します。結局型がPOJOとしてシリアライズ化される場合、型はPOJOシリアライザを使って登録されるでしょう。もし型が結局Kryoを使ってシリアライズ化される場合、タグだけが確実に書き込まれるようにKryoで登録されるでしょう。もし型がKryoを使って登録されない場合、クラス名全体は各インスタンスを使ってシリアライズ化され、かなり高いI/Oのコストに繋がるでしょう。
registerKryoType()
を使って登録された型はFlinkのKryoシリアライザ インスタンスで利用できないことに注意してください。
disableAutoTypeRegistration()
デフォルトでは自動的な型登録が有効です。自動的な型登録はKryoおよびPOJOシリアライザを使ってユーザコードによって使われる(サブ タイプも含む)全ての型を登録します。
setTaskCancellationInterval(long interval)
実行中のタスクを中止しようとする連続の試行間で待つ間隔(ミリ秒)を設定します。タスクが中止される場合、もしタスクスレッドが一定期間内に中断されない場合はタスクのスレッド上で定期的にinterrupt()
を呼ぶ新しいスレッドが作成されます。このパラメータはinterrupt()
への連続する呼び出し間の時間を参照し、デフォルトで30000 ミリ秒か30 秒に設定されます。
getRuntimeContext()
メソッドを使ってRich*
関数内でアクセス可能なRuntimeContext
は、全てのユーザ定義関数の中で ExecutionConfig
にアクセスすることもできます。