設定

For single-node setups Flink is ready to go out of the box and you don’t need to change the default configuration to get started.

すぐに使える設定はデフォルトのJavaインストレーションを使うでしょう。使用するJavaランタイムを手動で上書きしたい場合は、環境変数JAVA_HOMEあるいはconf/flink-conf.yaml内で設定キーenv.java.homeを手動で設定することができます。

このページでは効率的(分散)インストレーションをセットアップするために代表的に必要な最も一般的なオプションをリスト化します。さらに全ての利用可能な設定パラメータの完全なリストをここでリスト化します。

全ての設定はconf/flink-conf.yaml内で行われます。これはkey: valueの形式を持つYAML キー 値のペア の平坦な集合が期待されます。

システムと実行スクリプトは開始時に設定をパースします。設定ファイルへの変更はFlinkのジョブマネージャーとタスクマネージャーの再起動を必要とします。

タスクマネージャーのための設定ファイルは異なるかも知れません。Flinkはクラスタ内の均一なマシーンを仮定しません。

一般的なオプション

  • env.java.home: 使用するJavaインストレーションへのパス (デフォルト: もしあれば、システムのデフォルトのJavaインストレーション)。もしスタートアップスクリプトが自動的にJavaホームディレクトリを解決するのに失敗した場合は指定する必要があります。特定のjavaインストレーションあるいはバージョンを指定することができます。このオプションが指定されなかった場合は、スタートアップスクリプトは$JAVA_HOME環境変数も評価します。

  • env.java.opts: 独自のJVMオプションを設定します。この値はFlinkのスタートスクリプト、ジョブマネージャーとタスクマネージャーの両方、およびFlinkのYARNクライアントによって尊重されます。これは異なるガベージコレクションの設定、あるいはリモートのデバッガーをFlinkのサービスを実行しているJVMに含めるために使うことができます。ジョブマネージャーあるいはタスクマネージャー固有のそれぞれのオプションのために、env.java.opts.jobmanagerenv.java.opts.taskmanagerを使います。

  • env.java.opts.jobmanager: ジョブマネージャー固有のJVMオプション。これらは通常のenv.java.optsに加えて使用されます。この設定オプションはYARNクライアントによって無視されます。

  • env.java.opts.taskmanager: タスクマネージャー固有のJVMオプション。これらは通常のenv.java.optsに加えて使用されます。この設定オプションはYARNクライアントによって無視されます。

  • jobmanager.rpc.address: The external address of the JobManager, which is the master/coordinator of the distributed system (DEFAULT: localhost). Note: The address (host name or IP) should be accessible by all nodes including the client.

  • jobmanager.rpc.port: ジョブマネージャーのポート番号 (デフォルト: 6123)。

  • jobmanager.heap.mb: ジョブマネージャーのためのJVM ヒープサイズ (メガバイト単位)。(多くのオペレーターを持つ)とても大きなアプリケーションを実行しているか、それらの長い履歴を保持する場合は、ジョブマネージャーのヒープサイズを増加させるべきかも知れません。

  • taskmanager.heap.mb: タスクマネージャーのJVM ヒープサイズ (メガバイト単位)。これはsystemの平行ワーカーです。Hadoopとは対照的に、Flinkはオペレータ(例えばjoin, aggregat)とユーザ定義の関数(例えば Map, Reduce, CoGroup) をタスクマネージャー ( sorting/hashing/caching を含む)の中で実行します。つまりこの値はできるだけ大きくしなければなりません。もしクラスタが排他的にFlinkを実行する場合は、マシーンごとの利用可能な総メモリからオペレーティングシステムのメモリ(おそらく 1-2 GB) を引いたものが良い値です。YARN セットアップ上では、この値はタスクマネージャーのYARNコンテナのサイズに自動的に設定され、ある程度涵養できる値が引かれます。

  • taskmanager.numberOfTaskSlots: 1つのタスクマネージャーが実行できる、並行オペレータあるいはユーザ関数のインスタンスの数(デフォルト: 1)。値が1より大きい場合は、1つのシングルマネージャーは関数あるいはオペレータの複数のインスタンスを取ることができます。そのようにして、タスクマネージャーは複数のCPUコアを使うことができますが、同時に、利用可能なメモリは異なるオペレータあるいは関数のインスタンスの間で分配されます。この値は一般的にタスクマネージャーのマシーンが持つ物理CPUコアの数に比例します(例えば、コアの数に一致、あるいはコアの数の半分)。タスクスロットについてのより多くのこと

  • parallelism.default: 並行度が指定されていないプログラムのために使うデフォルトの並行度。(デフォルト: 1). 同時実行のジョブが無いセットアップについては、この値を NumTaskManagers * NumSlotsPerTaskManager にすることでシステムはプログラムの実行のために全ての利用可能な実行リソースを使うでしょう。注意: デフォルトの並行度はExecutionEnvironment上でsetParallelism(int parallelism)を呼び出すか、 -p <parallelism> をFlinkコマンドライン フロントエンに渡すことで上書きすることができます。一つの返還のためにオペレータ上でsetParallelism(int parallelism) を呼び出すことで上書きすることができます。See Parallel Execution for more information about parallelism.

  • fs.default-scheme: 接続するために必要な権限を持つ、使用されるデフォルトのファイルシステムスキーマ。例えば、HDFSの場合はネームノードの host:ポート (必要であれば)。デフォルトでは、これはローカルファイルシステムを示すfile:///に設定されます。このことは、明示的なスキーマの定義 無しに、ユーザ定義ファイルを検索するtあめにローカルファイルシステムが使われることを意味します。他の例として、これがhdfs://localhost:9000/に設定された場合、/user/USERNAME/in.txtのような明示的なスキーマ定義を持たないユーザ定義のパスはhdfs://localhost:9000/user/USERNAME/in.txtに変換されるでしょう。他のスキーマがユーザ指定のURIの中で(明示的に)指定された場合のみ、このスキーマが使われます。

  • fs.hdfs.hadoopconf: Hadoopファイルシステム(HDFS)の設定ディレクトリの絶対パス (任意の値)。この値を設定することで、プログラムはファイルのURI内にネームノードのアドレスをポートを含まない短いURL (hdfs:///path/to/filesを使ってHDFSファイルを参照することができます。このオプションが無い場合、HDFSはアクセスすることができますが、hdfs://address:port/path/to/filesのような完全装飾URIを必要とします。このオプションはファイルライターにブロックサイズとりプリケーションファクターのためのHDFSのデフォルト値を取り出すようにもします。Flinkは指定されたディレクトリの"core-site.xml"と"hdfs-site.xml"ファイルを調べないでしょう。

より進んだオプション

管理されるメモリ

デフォルトで、Flinkは管理しているメモリについてtaskmanager.heap.mbを使って設定された総メモリの 0.7 の割合を割り当てます。管理されたメモリはFlinkがバッチオペレーションを効率よく実施するのを助けます。Flinkはオペレーションを実行するために使うことができるメモリの量を知っているため、それはOutOfMemoryExceptionを避けます。Flinkが管理外のメモリを実行する場合は、ディスク空間を利用します。管理されているメモリを利用することで、いくつかのオペレーションはデータをJavaオブジェクトに変換する必要が無く生データ上で直接実施することができます。全体として管理されたメモリはシステムの堅牢さとスピードを改善します。

管理メモリのためのデフォルトの割合はtaskmanager.memory.fraction パラメータを使って調整可能です。絶対値はtaskmanager.memory.sizeを使って設定されるかもしれません (割合パラメータを上書きます)。必要であれば、管理メモリはJVMヒープ外に割り当てられるかもしれません。これは大きなメモリサイズを持つセットアップ内でのパフォーマンスを改善するかも知れません。

  • taskmanager.memory.size: タスクマネージャーがソート、ハッシュテーブル、および中間結果をキャッシュするためにヒープ上あるいはヒープ外(taskmanager.memory.off-heapに依存します)に確保する総メモリ量(メガバイト単位)。指定しない(-1)場合、メモリマネージャーは taskmanager.memory.fractionによって指定されたタスクマネージャーJVMのサイズに関する固定した割合を取るでしょう。(デフォルト: -1)

  • taskmanager.memory.fraction: タスクマネージャーがソート、ハッシュテーブルおよび中間結果をキャッシュするために確保する、(taskmanager.heap.mbに関する)相対的なメモリ量。例えば、0.8の値はタスクマネージャーが内部的なデータバッファのためにメモリの80% (ヒープ上あるいはヒープ外はtaskmanager.memory.off-heapに依存します)を確保し、フリーメモリの残り20%をユーザ定義関数によって生成されるオブジェクトのためのタスクマネージャーのヒープのために確保することを意味します。(デフォルト: 0.7) taskmanager.memory.size が設定されない場合は、このパラメータは評価のみされます。

  • taskmanager.memory.off-heap: trueに設定した場合、タスクマネージャーはソート、ハッシュテーブル および 中間結果のキャッシュのために使われるメモリをJVMヒープ外に割り当てます。大量のメモリを持つセットアップについては、これはメモリ上で実行さえるオペレーションの効率を改善することができます (デフォルト: false)。

  • taskmanager.memory.segment-size: メモリマネージャーとネットワークスタックによって使われるメモリバッファのバイトサイズ (デフォルト: 32768 (= 32 KiBytes))。

  • taskmanager.memory.preallocate: true あるいは falseのどちらかを設定することができます。タスクマネージャーが全ての管理メモリを開始時に割り当てるかどうかを指定します。(DEFAULT: false). When taskmanager.memory.off-heap is set to true, then it is advised that this configuration is also set to true. If this configuration is set to false cleaning up of the allocated offheap memory happens only when the configured JVM parameter MaxDirectMemorySize is reached by triggering a full GC. Note: For streaming setups, we highly recommend to set this value to false as the core state backends currently do not use the managed memory.

メモリとパフォーマンスのデバッグ

これらのオプションは、out-of-memory-process kill あるいは例外のような、問題に関係するメモリ及びガベージコレクションについてFlinkアプリケーションをデバッグするのに便利です。

  • taskmanager.debug.memory.startLogThread: タスクマネージャーが定期的のメモリとガベージコレクション統計を記録するようにします。この統計には、現在のヒープ、オフヒープ、および他のメモリプール利用と、ヒープメモリプールによってガベージコレクションに費やされた時間が含まれます。

  • taskmanager.debug.memory.logIntervalMs: タスクマネージャーがメモリおよびガベージコレクション統計を記録する時間間隔(ミリ秒単位)。taskmanager.debug.memory.startLogThread がtrueに設定された場合にのみ効果があります。

Kerberos-based Security

Flink supports Kerberos authentication for the following services:

  • Hadoop Components, such as HDFS, YARN, or HBase (version 2.6.1 and above; all other versions have critical bugs which might fail the Flink job unexpectedly).
  • Kafka Connectors (version 0.9+ and above).
  • Zookeeper

Configuring Flink for Kerberos security involves three aspects, explained separately in the following sub-sections.

1. Providing the cluster with a Kerberos credential (i.e. a keytab or a ticket via kinit)

To provide the cluster with a Kerberos credential, Flink supports using a Kerberos keytab file or ticket caches managed by kinit.

  • security.kerberos.login.use-ticket-cache: Indicates whether to read from your Kerberos ticket cache (default: true).

  • security.kerberos.login.keytab: Absolute path to a Kerberos keytab file that contains the user credentials.

  • security.kerberos.login.principal: Kerberos principal name associated with the keytab.

If both security.kerberos.login.keytab and security.kerberos.login.principal have values provided, keytabs will be used for authentication. It is preferable to use keytabs for long-running jobs, to avoid ticket expiration issues. If you prefer to use the ticket cache, talk to your administrator about increasing the Hadoop delegation token lifetime.

Note that authentication using ticket caches is only supported when deploying Flink as a standalone cluster or on YARN.

2. Making the Kerberos credential available to components and connectors as needed

For Hadoop components, Flink will automatically detect if the configured Kerberos credentials should be used when connecting to HDFS, HBase, and other Hadoop components depending on whether Hadoop security is enabled (in core-site.xml).

For any connector or component that uses a JAAS configuration file, make the Kerberos credentials available to them by configuring JAAS login contexts for each one respectively, using the following configuration:

  • security.kerberos.login.contexts: A comma-separated list of login contexts to provide the Kerberos credentials to (for example, Client,KafkaClient to use the credentials for ZooKeeper authentication and for Kafka authentication).

This allows enabling Kerberos authentication for different connectors or components independently. For example, you can enable Hadoop security without necessitating the use of Kerberos for ZooKeeper, or vice versa.

You may also provide a static JAAS configuration file using the mechanisms described in the Java SE Documentation, whose entries will override those produced by the above configuration option.

3. Configuring the component and/or connector to use Kerberos authentication

Finally, be sure to configure the connector within your Flink program or component as necessary to use Kerberos authentication.

Below is a list of currently first-class supported connectors or components by Flink for Kerberos authentication:

  • Kafka: see here for details on configuring the Kafka connector to use Kerberos authentication.

  • Zookeeper (for HA): see here for details on Zookeeper security configuration to work with the Kerberos-based security configurations mentioned here.

For more information on how Flink security internally setups Kerberos authentication, please see here.

その他

  • taskmanager.tmp.dirs: The directory for temporary files, or a list of directories separated by the system’s directory delimiter (for example ‘:’ (colon) on Linux/Unix). 複数のディレクトリが指定された場合は、一時ファイルはラウンドロビンの形式でディレクトリに渡って分散されるでしょう。I/O マネージャーコンポーネントは、ディレクトリごとに1つの読み込みと1つの書き込みスレッドをspawnするでしょう。A directory may be listed multiple times to have the I/O manager use multiple threads for it (for example if it is physically stored on a very fast disc or RAID) (DEFAULT: The system’s tmp dir).

  • taskmanager.log.path: タスクマネージャーのログファイルの場所を定義する設定パラメータ

  • jobmanager.web.address: Address of the JobManager’s web interface (DEFAULT: anyLocalAddress()).

  • jobmanager.web.port: ジョブマネージャーのwebインタフェースのポート (デフォルト: 8081)。

  • jobmanager.web.tmpdir: この設定パラメータを使ってwebインタフェースによって使われるFlink webディレクトリを定義することができます。webインタフェースは静的ファイルをディレクトリにコピーするでしょう。Also uploaded job jars are stored in the directory if not overridden. デフォルトで、テンポラリディレクトリが使われます。

  • jobmanager.web.upload.dir: The config parameter defining the directory for uploading the job jars. If not specified a dynamic directory will be used under the directory specified by jobmanager.web.tmpdir.

  • fs.overwrite-files: ファイルの出力ライターがデフォルトで既存のファイルを上書きするかどうかを指定します。デフォルトで上書きするためにはtrueに設定し、そうでなければfalse です。(デフォルト: false)

  • fs.output.always-create-directory: 並行度1以上で実行中のFile writerが出力ファイルパスのためのディレクトリを作成し、異なる結果ファイル(並行writerタスクごとに1つ)をディレクトリ内に置きます。このオプションが trueに設定された場合、並行度1のライターはディレクトリも生成し、その中に1つの結果ファイルを置くでしょう。オプションがfalseに設定された場合、ライターはディレクトリを生成せずに出力パスに直接ファイルを生成します。(デフォルト: false)

  • taskmanager.network.numberOfBuffers: ネットワークスタックに利用可能なバッファの数。この数値はタスクマネージャーがどれだけ多くのストリーミングデータ交換チャネルを同時に持つことができ、チャネルがどれほど良くバッファされるかを決定します。ジョブが却下された場合、あるいはシステムが利用可能なバッファを十分に持っていないと警告された場合は、この値を増やしてください (デフォルト: 2048)。

  • state.backend: チェックポイントが有効な場合、オペレータの状態のチェックポイントが格納されるために使われるであろうバックエンド。サポートされるバックエンド:
    • jobmanager: インメモリの状態、ジョブマネージャー/ZooKeeperのメモリにバックアップします。最小の状態(Kafkaオフセット)あるいはテストおよびローカルデバッギングのためにのみ使われるべきです。
    • filesystem: 状態はタスクマネージャー上のメモリ内にあり、状態のスナップショットはファイルシステムに格納されます。例えば HDFS、S3、… のFlinkによってサポートされる全てのファイルシステムによってサポートされます。
  • state.backend.fs.checkpointdir: Flinkがサポートするファイルシステム内に格納されるチェックポイントのためのディレクトリ。注意: 状態のバックエンドはジョブマネージャーからアクセス可能でなければなりません。ローカルセットアップのためにはfile:// のみ使います。

  • state.backend.rocksdb.checkpointdir: The local directory for storing RocksDB files, or a list of directories separated by the systems directory delimiter (for example ‘:’ (colon) on Linux/Unix). (DEFAULT value is taskmanager.tmp.dirs)

  • state.checkpoints.dir: The target directory for meta data of externalized checkpoints.

  • high-availability.zookeeper.storageDir: Required for HA. ジョブマネージャーのメタデータを格納するディレクトリ; これは状態バックエンドの中に保持され、この状態のポインタだけがZooKeeperに格納されます。チェックポイントのディレクトリと正しく同じで、ジョブマネージャーからアクセス可能でなければならず、ローカル配備のためだけに使われなければなりません。Previously this key was named recovery.zookeeper.storageDir.

  • blob.storage.directory: Directory for storing blobs (such as user JARs) on the TaskManagers.

  • blob.server.port: Port definition for the blob server (serving user JARs) on the TaskManagers. デフォルトでは、ポートは0に設定されます。これはオペレーティングシステムが短命ポートを選んだことを意味します。Flink はポートのリスト(“50100,50101”)、範囲(“50100-50200”) あるいはそれら両方の組み合わせも受け付けます。複数のジョブマネージャーが同じマシーン上で実行中の場合は、衝突を避けるためにポートの範囲を設定することをお勧めします。

  • blob.service.ssl.enabled: Flag to enable ssl for the blob client/server communication. This is applicable only when the global ssl flag security.ssl.enabled is set to true (DEFAULT: true).

  • restart-strategy: Default restart strategy to use in case no restart strategy has been specified for the job. オプションは以下の通りです:
    • fixed delay strategy: fixed-delay.
    • failure rate strategy: failure-rate.
    • no restarts: none

    Default value is none unless checkpointing is enabled for the job in which case the default is fixed-delay with Integer.MAX_VALUE restart attempts and 10s delay.

  • restart-strategy.fixed-delay.attempts: デフォルトの再起動ストラテジが"fixed-delay"に設定された場合に使われる再起動の試行の数。Default value is 1, unless “fixed-delay” was activated by enabling checkpoints, in which case the default is Integer.MAX_VALUE.

  • restart-strategy.fixed-delay.delay: デフォルトの再起動ストラテジが"fixed-delay"に設定された場合に使われる、再起動の試行間の遅延。Default value is the akka.ask.timeout, unless “fixed-delay” was activated by enabling checkpoints, in which case the default is 10s.

  • restart-strategy.failure-rate.max-failures-per-interval: "failure-rate"ストラテジでジョブが失敗するまでの、指定された時間内の再起動の最大数。デフォルト値は 1です。

  • restart-strategy.failure-rate.failure-rate-interval: "failure-rate"ストラテジでの失敗レートを計測するための時間間隔。デフォルト値は 1 秒です。

  • restart-strategy.failure-rate.delay: デフォルトの再起動ストラテジが"failure-rate"に設定された場合に使われる、再起動の試行間の遅延。デフォルト値はakka.ask.timeoutです。

完全なリファレンス

HDFS

これらのパラメータはFlinkによって使われるデフォルトのHDFSを設定します。Setups that do not specify a HDFS configuration have to specify the full path to HDFS files (hdfs://address:port/path/to/files) Files will also be written with default HDFS parameters (block size, replication factor).

  • fs.hdfs.hadoopconf: Hadoop設定ディレクトリへの絶対パス。systemは"core-site.xml"と"hdfs-site.xml"ファイルをそのディレクトリ内で探します (デフォルト: null)。

  • fs.hdfs.hdfsdefault: Hadoopの独自の設定ファイル"hdfs-default.xml"の絶対パス (デフォルト: null)。

  • fs.hdfs.hdfssite: Hadoopの独自の設定ファイル "hdfs-site.xml" の絶対パス (デフォルト: null)。

ジョブマネージャー と タスクマネージャー

以下のパラメータはFlinkのジョブマネージャーとタスクマネージャーを設定します。

  • jobmanager.rpc.address: The external address of the JobManager, which is the master/coordinator of the distributed system (DEFAULT: localhost). Note: The address (host name or IP) should be accessible by all nodes including the client.

  • jobmanager.rpc.port: ジョブマネージャーのポート番号 (デフォルト: 6123)。

  • taskmanager.hostname: タスクマネージャーがバインドされているネットワークインタフェースのホスト名。デフォルトで、タスクマネージャーはジョブマネージャーと他のタスクマネージャーに接続することができるネットワークインタフェースを探します。ストラテジが何らかの理由で失敗した場合に、このオプションはホスト名を定義するために使うことができます。異なるタスクマネージャーはこのオプションのために異なる値を必要とするため、通常追加の共有されないタスクマネージャー固有の設定ファイルが指定されます。

  • taskmanager.rpc.port: タスクマネージャーの IPC ポート (デフォルト: 0、これはOSによって空いているポートが選択されます)。

  • taskmanager.data.port: データ交換操作のために使われるタスクマネージャーのポート (デフォルト: 0、これはOSによって空いているポートが選択されます)。

  • taskmanager.data.ssl.enabled: Enable SSL support for the taskmanager data transport. This is applicable only when the global ssl flag security.ssl.enabled is set to true (DEFAULT: true)

  • jobmanager.heap.mb: ジョブマネージャーのためのJVM のヒープサイズ(メガバイト単位) (デフォルト: 256)。

  • taskmanager.heap.mb: タスクマネージャーのJVM ヒープサイズ (メガバイト単位)。これはsystemの平行ワーカーです。Hadoopとは対照的に、Flinkはオペレータ(例えばjoin, aggregat)とユーザ定義の関数(例えば Map, Reduce, CoGroup) をタスクマネージャー ( sorting/hashing/caching を含む)の中で実行します。つまりこの値はできるだけ大きくしなければなりません。YARN セットアップ上では、この値はタスクマネージャーのYARNコンテナのサイズに自動的に設定され、ある程度涵養できる値が引かれます。

  • taskmanager.numberOfTaskSlots: 1つのタスクマネージャーが実行できる、並行オペレータあるいはユーザ関数のインスタンスの数(デフォルト: 1)。値が1より大きい場合は、1つのシングルマネージャーは関数あるいはオペレータの複数のインスタンスを取ることができます。そのようにして、タスクマネージャーは複数のCPUコアを使うことができますが、同時に、利用可能なメモリは異なるオペレータあるいは関数のインスタンスの間で分配されます。この値は一般的にタスクマネージャーのマシーンが持つ物理CPUコアの数に比例します(例えば、コアの数に一致、あるいはコアの数の半分)。

  • taskmanager.tmp.dirs: The directory for temporary files, or a list of directories separated by the system’s directory delimiter (for example ‘:’ (colon) on Linux/Unix). 複数のディレクトリが指定された場合は、一時ファイルはラウンドロビンの形式でディレクトリに渡って分散されるでしょう。I/O マネージャーコンポーネントは、ディレクトリごとに1つの読み込みと1つの書き込みスレッドをspawnするでしょう。A directory may be listed multiple times to have the I/O manager use multiple threads for it (for example if it is physically stored on a very fast disc or RAID) (DEFAULT: The system’s tmp dir).

  • taskmanager.network.numberOfBuffers: ネットワークスタックに利用可能なバッファの数。この数値はタスクマネージャーがどれだけ多くのストリーミングデータ交換チャネルを同時に持つことができ、チャネルがどれほど良くバッファされるかを決定します。ジョブが却下された場合、あるいはシステムが利用可能なバッファを十分に持っていないと警告された場合は、この値を増やしてください (デフォルト: 2048)。

  • taskmanager.memory.size: タスクマネージャーがソート、ハッシュテーブル、中間結果をキャッシュするためにJVMのヒープ上に確保する総メモリ量(メガバイト単位)。指定しない(-1)場合、メモリマネージャーは taskmanager.memory.fraction によって指定された、JVMに利用可能なヒープメモリの固定の割合を取るでしょう。(デフォルト: -1)

  • taskmanager.memory.fraction: タスクマネージャーがソート、ハッシュテーブルおよび中間結果をキャッシュするために確保する、相対的なメモリ量。例えば、0.8の値はタスクマネージャーが内部的なデータバッファのためにJVMのヒープ空間の80%を確保し、JVMのヒープ空間の残りの20%をユーザ定義関数によって生成されるオブジェクトのために開けておくことを意味します。(デフォルト: 0.7) taskmanager.memory.size が設定されない場合は、このパラメータは評価のみされます。

  • taskmanager.debug.memory.startLogThread: タスクマネージャーが定期的のメモリとガベージコレクション統計を記録するようにします。この統計には、現在のヒープ、オフヒープ、および他のメモリプール利用と、ヒープメモリプールによってガベージコレクションに費やされた時間が含まれます。

  • taskmanager.debug.memory.logIntervalMs: タスクマネージャーがメモリおよびガベージコレクション統計を記録する時間間隔(ミリ秒単位)。taskmanager.debug.memory.startLogThread がtrueに設定された場合にのみ効果があります。

  • taskmanager.maxRegistrationDuration: Defines the maximum time it can take for the TaskManager registration. If the duration is exceeded without a successful registration, then the TaskManager terminates. The max registration duration requires a time unit specifier (ms/s/min/h/d) (e.g. “10 min”). (DEFAULT: Inf)

  • taskmanager.initial-registration-pause: The initial registration pause between two consecutive registration attempts. The pause is doubled for each new registration attempt until it reaches the maximum registration pause. The initial registration pause requires a time unit specifier (ms/s/min/h/d) (e.g. “5 s”). (DEFAULT: 500 ms)

  • taskmanager.max-registration-pause: The maximum registration pause between two consecutive registration attempts. The max registration pause requires a time unit specifier (ms/s/min/h/d) (e.g. “5 s”). (DEFAULT: 30 s)

  • taskmanager.refused-registration-pause: The pause after a registration has been refused by the job manager before retrying to connect. The refused registration pause requires a time unit specifier (ms/s/min/h/d) (e.g. “5 s”). (DEFAULT: 10 s)

  • taskmanager.jvm-exit-on-oom: Indicates that the TaskManager should immediately terminate the JVM if the task thread throws an OutOfMemoryError (DEFAULT: false).

  • blob.fetch.retries: タスクマネージャーがジョブマネージャーから(JARファイルのような)BLOBをダウンロードする試行数 (デフォルト: 50)。

  • blob.fetch.num-concurrent: ジョブマネージャーが提供する(JARファイルダウンロードのような)BLOB同時取り込み数 (デフォルト: 50)。

  • blob.fetch.backlog: ジョブマネージャーが可能な(JARファイルダウンロードのような)キューされたBLOBの取り込み最大数 (デフォルト: 1000)。

  • task.cancellation-interval: 二つの連続するタスクの取り消しの試行間のミリ秒単位の間隔 (デフォルト: 30000)。

分散の調整 (via Akka)

  • akka.ask.timeout: Timeout used for all futures and blocking Akka calls. Flinkがタイムアウトによって失敗する場合、この値を増やしてみるべきです。タイムアウトは遅いマシーンあるいは混雑したネットワークによっても起こり得ます。タイムアウトの値は time-単位記述子 (ms/s/min/h/d) を必要とします (デフォルト: 10 s)。

  • akka.lookup.timeout: ジョブマネージャーを検索するために使われるタイムアウト。タイムアウトの値はtime-単位識別子 (ms/s/min/h/d) を含む必要があります (デフォルト: 10 s)。

  • akka.client.timeout: Timeout used by Flink clients (e.g. CliFrontend, ClusterClient) when communicating with the Flink cluster. The timeout value has to contain a time-unit specifier (ms/s/min/h/d) (DEFAULT: 60 s).

  • akka.framesize: ジョブマネージャーとタスクマネージャー間で送信されるメッセージの最大サイズ。メッセージがこの制限を超えたためにFlinkが失敗する場合は、増やす必要があります。メッセージサイズはsize-単位識別子を必要とします (デフォルト: 10485760b)。

  • akka.watch.heartbeat.interval: AkkaのDeathWatch機構が死亡したタスクマネージャーを検知するためのハートビート間隔。ハートビートメッセージが喪失あるいは遅延したために間違ってタスクマネージャーが死亡したとマークされた場合、この値を増やすべきです。A thorough description of Akka’s DeathWatch can be found here (DEFAULT: 10 s).

  • akka.watch.heartbeat.pause: AkkaのDeatchWatch機構が受け入れられるハートビートの休止。低い値は不規則なハートビートを許可しません。A thorough description of Akka’s DeathWatch can be found here (DEFAULT: 60 s).

  • akka.watch.threshold: DeathWatchの障害検知の閾値。高い値は死亡したTaskManagerの検知するまでの時間が長くなるのに対し、低い値はfalse positiveになりがちです。AkkaのDeatchWatchの詳細な説明はここで見つけることができます (デフォルト: 12)。

  • akka.transport.heartbeat.interval: Akkaの転送障害検知のためのハートビート間隔。FlinkはTCPを使うため、検知は必要ではありません。したがって、間隔をとても大きな値にすることで検知を無効にします。転送障害検知必要とする場合は、間隔を意味のある値に設定します。間隔の値はtime-単位識別子 (ms/s/min/h/d) を必要とします (デフォルト: 1000 s)。

  • akka.transport.heartbeat.pause: Akkaの転送障害検知が受け入れられるハートビートの休止。FlinkはTCPを使うため、検知は必要ではありません。したがって、休止をとても大きな値にすることで検知を無効にします。転送障害検知必要とする場合は、休止を意味のある値に設定します。休止の値は time-単位記述子 (ms/s/min/h/d) を必要とします (デフォルト: 6000 s)。

  • akka.transport.threshold: 転送障害検知の閾値。FlinkはTCPを使うため検知は必要ありません。したがって閾値は大きな値に設定されます (デフォルト: 300)。

  • akka.tcp.timeout: 全ての外向きの接続のタイムアウトIf you should experience problems with connecting to a TaskManager due to a slow network, you should increase this value (DEFAULT: 20 s).

  • akka.throughput: スレッドをプールに返す前にバッチ内で処理されるメッセージの数。Low values denote a fair scheduling whereas high values can increase the performance at the cost of unfairness (DEFAULT: 15).

  • akka.log.lifecycle.events: イベントのAkkaのリモートログをオンにします。デバッグ時には、この値を'true'に設定します (デフォルト: false)。

  • akka.startup-timeout: リモートコンポーネントの起動が失敗したと見なされるタイムアウト (デフォルト: akka.ask.timeout)。

  • akka.ssl.enabled: Turns on SSL for Akka’s remote communication. This is applicable only when the global ssl flag security.ssl.enabled is set to true (DEFAULT: true).

SSL Settings

  • security.ssl.enabled: Turns on SSL for internal network communication. This can be optionally overridden by flags defined in different transport modules (DEFAULT: false).

  • security.ssl.keystore: The Java keystore file to be used by the flink endpoint for its SSL Key and Certificate.

  • security.ssl.keystore-password: The secret to decrypt the keystore file.

  • security.ssl.key-password: The secret to decrypt the server key in the keystore.

  • security.ssl.truststore: The truststore file containing the public CA certificates to be used by flink endpoints to verify the peer’s certificate.

  • security.ssl.truststore-password: The secret to decrypt the truststore.

  • security.ssl.protocol: The SSL protocol version to be supported for the ssl transport (DEFAULT: TLSv1.2).

  • security.ssl.algorithms: The comma separated list of standard SSL algorithms to be supported. Read more here (DEFAULT: TLS_RSA_WITH_AES_128_CBC_SHA).

  • security.ssl.verify-hostname: Flag to enable peer’s hostname verification during ssl handshake (DEFAULT: true).

Network communication (via Netty)

These parameters allow for advanced tuning. The default values are sufficient when running concurrent high-throughput jobs on a large cluster.

  • taskmanager.net.num-arenas: The number of Netty arenas (DEFAULT: taskmanager.numberOfTaskSlots).

  • taskmanager.net.server.numThreads: The number of Netty server threads (DEFAULT: taskmanager.numberOfTaskSlots).

  • taskmanager.net.client.numThreads: The number of Netty client threads (DEFAULT: taskmanager.numberOfTaskSlots).

  • taskmanager.net.server.backlog: The netty server connection backlog.

  • taskmanager.net.client.connectTimeoutSec: The Netty client connection timeout (DEFAULT: 120 seconds).

  • taskmanager.net.sendReceiveBufferSize: The Netty send and receive buffer size. This defaults to the system buffer size (cat /proc/sys/net/ipv4/tcp_[rw]mem) and is 4 MiB in modern Linux.

  • taskmanager.net.transport: The Netty transport type, either “nio” or “epoll” (DEFAULT: nio).

ジョブマネージャーのWebフロントエンド

  • jobmanager.web.port: 実行中のジョブの状態と終了したジョブの実行時間のブレークダウンを表示するジョブマネージャーのwebインタフェースのポート (デフォルト: 8081)。この値を-1に設定するとwebフロントエンドを無効にします。

  • jobmanager.web.history: ジョブマネージャーのwebフロントエンドのヒストリ内の最新のジョブの数 (デフォルト: 5)。

  • jobmanager.web.checkpoints.disable: チェックポイントの統計を無効にします (デフォルト: false)。

  • jobmanager.web.checkpoints.history: 記憶するチェックポイントの統計の数 (デフォルト: 10)。

  • jobmanager.web.backpressure.cleanup-interval: アクセスされない場合にキャッシュされた統計が掃除される時間 (デフォルト: 600000, 10 分)。

  • jobmanager.web.backpressure.refresh-interval: 利用可能な統計が非推奨になり更新される必要がある時間 (デフォルト: 60000, 1 分)。

  • jobmanager.web.backpressure.num-samples: バックプレッシャーを決定するために取られるスタックトレースの標本の数 (デフォルト: 100)。

  • jobmanager.web.backpressure.delay-between-samples: バックプレッシャーを決定するためのスタックトレースの標本間の遅延 (デフォルト: 50, 50 ms).

  • jobmanager.web.ssl.enabled: Enable https access to the web frontend. This is applicable only when the global ssl flag security.ssl.enabled is set to true (DEFAULT: true).

ファイルシステム

パラメータは結果ファイルを生成するタスクの挙動を定義します。

  • fs.default-scheme: 接続するために必要な権限を持つ、使用されるデフォルトのファイルシステムスキーマ。例えば、HDFSの場合はネームノードの host:ポート (必要であれば)。デフォルトでは、これはローカルファイルシステムを示すfile:///に設定されます。このことは、明示的なスキーマの定義 無しに、ユーザ定義ファイルを検索するtあめにローカルファイルシステムが使われることを意味します。他のスキーマが(明示的に)ユーザが渡すURI内で指定されない場合にのみこのスキーマが使われます。

  • fs.overwrite-files: ファイルの出力ライターがデフォルトで既存のファイルを上書きするかどうかを指定します。デフォルトで上書きするためにはtrueに設定し、そうでなければfalse です。(デフォルト: false)

  • fs.output.always-create-directory: 並行度1以上で実行中のFile writerが出力ファイルパスのためのディレクトリを作成し、異なる結果ファイル(並行writerタスクごとに1つ)をディレクトリ内に置きます。このオプションが trueに設定された場合、並行度1のライターはディレクトリも生成し、その中に1つの結果ファイルを置くでしょう。オプションがfalseに設定された場合、ライターはディレクトリを生成せずに出力パスに直接ファイルを生成します。(デフォルト: false)

コンパイラ/オプティマイザ

  • compiler.delimited-informat.max-line-samples: 区切られた入力のためにコンパイラによって取られる行標本の最大数。標本はレコードの数を推測するために使われます。この値は入力フォーマットのパラメータを持つ特定の入力のために上書きすることができます (デフォルト: 10)。

  • compiler.delimited-informat.min-line-samples: 区切られた入力のためにコンパイラによって取られる行の標本の最小数。標本はレコードの数を推測するために使われます。この値は入力フォーマットのパラメータを持つ特定の入力のために上書きすることができます (デフォルト: 2)。

  • compiler.delimited-informat.max-sample-len: 区切られた入力のためにコンパイラによって取られる行標本の最大長。1つの標本の長さがこの値を超えた場合(パーサーの設定間違いによって起こり得ます)、標本化は異常終了します。この値は入力フォーマットのパラメータを持つ特定の入力のために上書きすることができます (デフォルト: 2097152 ( = 2Mバイト))。

ランタイム アルゴリズム

  • taskmanager.runtime.hashjoin-bloom-filters: ハイブリッド ハッシュ ジョイン実装の中でbloomフィルタを有効/無効にするフラグ。ハッシュ ジョインがディスクにこぼれる場合(データセットが確保されたメモリのフラクションより大きい)、これらのbloomフィルタはいくらかのCPUサイクルを犠牲にしてこぼれるレコードの数を大きく減らします。(デフォルト: false)

  • taskmanager.runtime.max-fan: 外部マージ ジョインのための最大論理入力と、こぼれているハッシュテーブルのための論理出力。オペレータあたりのファイルハンドルの数を制限しますが、あまりに小さく設定された場合中間のマージ/パーティションを起こすかも知れません (デフォルト: 128)。

  • taskmanager.runtime.sort-spilling-threshold: このメモリバジェットの割合が一杯になった時に、ソートオペレーションはこぼし始めます (デフォルト: 0.8)。

リソース マネージャ

この章での設定キーは使用されているリソース管理フレームワーク (YARN, Mesos, Standalone, …) に依存しません。

  • resourcemanager.rpc.port: リソースマネージャーとの通信のための接続するためのネットワークポートを定義する設定パラメータ。同じActorSystemが使われるため、デフォルトではジョブマネージャーのポート。ポート範囲を定義するためにこの設定キーを使うことはできません。

YARN

  • containerized.heap-cutoff-ratio: (Default 0.25) Percentage of heap space to remove from containers started by YARN. ユーザが各タスクマネージャーコンテナのためにある量のメモリ(例えば4GB)を要求した場合、JVMはヒープ外にもメモリを割り当てるため、この総量をJVMのための最大ヒープ空間として渡す (-Xmx 引数) ことはできません。YARN はリクエストしたよりも多くのメモリを使っているコンテナをkillすることにとても厳密です。したがって、安全マージンとしてリクエストされたヒープから15%のメモリを取り去ります。

  • containerized.heap-cutoff-min: (Default 600 MB) Minimum amount of memory to cut off the requested heap size.

  • yarn.maximum-failed-containers (デフォルト: リクエストされたコンテナの数)。障害時にsystemが再割り当てをしようとするコンテナの最大数。

  • yarn.application-attempts (デフォルト: 1). 再起動するアプリケーションマスターの数。Flinkクラスタ全体が再起動し、YARNクライアントは接続を失うだろうことに注意してください。また、ジョブマネージャーのアドレスは変わり、JMの host:port を手動で設定する必要があるでしょう。このオプションは1に設定したままにすることをお勧めします。

  • yarn.heartbeat-delay (デフォルト: 5 秒)。リソースマネージャーとのハートビート間の時間。

  • yarn.properties-file.location (デフォルト: 一時ディレクトリ)。FlinkのジョブがYARNにサブミットされた場合、Flinkクライアントが手に入れることができるように、ジョブマネージャーのホストと利用可能な処理スロットの数がプロパティファイルに書き込まれます。この設定パラメータを使ってそのファイルのデフォルトの場所を変更することができます (for example for environments sharing a Flink installation between users)

  • yarn.containers.vcores YARNコンテナあたりの仮想コア(vcores) の数。デフォルトでは、vcoresの数は設定されていればタスクマネージャーあたりのスロットの数に設定されます。そうでなければ1に設置衛されます。

  • containerized.master.env.ENV_VAR1=value Configuration values prefixed with containerized.master.env. will be passed as environment variables to the ApplicationMaster/JobManager process. 例えば、LD_LIBRARY_PATHをアプリケーションマスタに環境変数として渡すには、以下を設定します:

    containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native"

  • containerized.taskmanager.env. Similar to the configuration prefix about, this prefix allows setting custom environment variables for the TaskManager processes.

  • yarn.container-start-command-template: Flink uses the following template when starting on YARN: %java% %jvmmem% %jvmopts% %logging% %class% %args% %redirects%. This configuration parameter allows users to pass custom settings (such as JVM paths, arguments etc.). Note that in most cases, it is sufficient to use the env.java.opts setting, which is the %jvmopts% variable in the String.

  • yarn.application-master.port (デフォルト: 0、OSが短命ポートとして選択します) この設定オプションを使って、ユーザはアプリケーションマスタ(とジョブマネージャー) RPCポートのためのポート、ポートの範囲、あるいはポートのリストを指定することができます。デフォルトでは、オペレーティングシステムが適切なポートを選択できるようにデフォルト値 (0)を使うことをお勧めします。特に、複数のAMが同じ物理ホスト上で動作している場合は、固定ポートの割り当てはAMの開始を妨げるでしょう。

    例えば、制限されたファイアウォールを持つ環境上でYARN上でFlinkを実行する場合、このオプションを使って許可されたポートの範囲を指定することができます。

  • yarn.tags A comma-separated list of tags to apply to the Flink YARN application.

Mesos

  • mesos.initial-tasks: The initial workers to bring up when the master starts (DEFAULT: The number of workers specified at cluster startup).

  • mesos.maximum-failed-tasks: The maximum number of failed workers before the cluster fails (DEFAULT: Number of initial workers). May be set to -1 to disable this feature.

  • mesos.master: The Mesos master URL. The value should be in one of the following forms:
    • host:port
    • zk://host1:port1,host2:port2,.../path
    • zk://username:password@host1:port1,host2:port2,.../path
    • file:///path/to/file
  • mesos.failover-timeout: The failover timeout in seconds for the Mesos scheduler, after which running tasks are automatically shut down (DEFAULT: 600).

  • mesos.resourcemanager.artifactserver.port:The config parameter defining the Mesos artifact server port to use. Setting the port to 0 will let the OS choose an available port.

  • mesos.resourcemanager.framework.name: Mesos framework name (DEFAULT: Flink)

  • mesos.resourcemanager.framework.role: Mesos framework role definition (DEFAULT: *)

  • mesos.resourcemanager.framework.principal: Mesos framework principal (NO DEFAULT)

  • mesos.resourcemanager.framework.secret: Mesos framework secret (NO DEFAULT)

  • mesos.resourcemanager.framework.user: Mesos framework user (DEFAULT:””)

  • mesos.resourcemanager.artifactserver.ssl.enabled: Enables SSL for the Flink artifact server (DEFAULT: true). Note that security.ssl.enabled also needs to be set to true encryption to enable encryption.

  • mesos.resourcemanager.tasks.mem: Memory to assign to the Mesos workers in MB (DEFAULT: 1024)

  • mesos.resourcemanager.tasks.cpus: CPUs to assign to the Mesos workers (DEFAULT: 0.0)

  • mesos.resourcemanager.tasks.container.type: Type of the containerization used: “mesos” or “docker” (DEFAULT: mesos);

  • mesos.resourcemanager.tasks.container.image.name: Image name to use for the container (NO DEFAULT)

  • high-availability.zookeeper.path.mesos-workers: The ZooKeeper root path for persisting the Mesos worker information.

高可用性 (HA)

  • high-availability: Defines the high availability mode used for the cluster execution. Currently, Flink supports the following modes:
    • none (default): No high availability. A single JobManager runs and no JobManager state is checkpointed.
    • zookeeper: Supports the execution of multiple JobManagers and JobManager state checkpointing. ジョブマネージャーのグループの中で、ZooKeeperは1つのジョブマネージャーをクラスタの実行に責任ンがあるリーダーとして選出します。ジョブマネージャーが失敗した場合は、スタンドバイ ジョブマネージャーが新しいリーダーとして選出され、最後のチェックポイントのジョブマネージャーの状態が渡されます。In order to use the ‘zookeeper’ mode, it is mandatory to also define the high-availability.zookeeper.quorum configuration value.

Previously this key was named recovery.mode and the default value was standalone.

ZooKeeper-based HA Mode

  • high-availability.zookeeper.quorum: Defines the ZooKeeper quorum URL which is used to connect to the ZooKeeper cluster when the ‘zookeeper’ HA mode is selected. Previously this key was named recovery.zookeeper.quorum.

  • high-availability.zookeeper.path.root: (Default /flink) Defines the root dir under which the ZooKeeper HA mode will create namespace directories. Previously this ket was named recovery.zookeeper.path.root.

  • high-availability.zookeeper.path.namespace: (Default /default_ns in standalone cluster mode, or the under YARN) Defines the subdirectory under the root dir where the ZooKeeper HA mode will create znodes. This allows to isolate multiple applications on the same ZooKeeper. Previously this key was named `recovery.zookeeper.path.namespace`.

  • high-availability.zookeeper.path.latch: (Default /leaderlatch) Defines the znode of the leader latch which is used to elect the leader. Previously this key was named recovery.zookeeper.path.latch.

  • high-availability.zookeeper.path.leader: (Default /leader) Defines the znode of the leader which contains the URL to the leader and the current leader session ID. Previously this key was named recovery.zookeeper.path.leader.

  • high-availability.zookeeper.storageDir: Defines the directory in the state backend where the JobManager metadata will be stored (ZooKeeper only keeps pointers to it). HAのための要件Previously this key was named recovery.zookeeper.storageDir.

  • high-availability.zookeeper.client.session-timeout: (Default 60000) Defines the session timeout for the ZooKeeper session in ms. Previously this key was named recovery.zookeeper.client.session-timeout

  • high-availability.zookeeper.client.connection-timeout: (Default 15000) Defines the connection timeout for ZooKeeper in ms. Previously this key was named recovery.zookeeper.client.connection-timeout.

  • high-availability.zookeeper.client.retry-wait: (Default 5000) Defines the pause between consecutive retries in ms. Previously this key was named recovery.zookeeper.client.retry-wait.

  • high-availability.zookeeper.client.max-retry-attempts: (Default 3) Defines the number of connection retries before the client gives up. Previously this key was named recovery.zookeeper.client.max-retry-attempts.

  • high-availability.job.delay: (Default akka.ask.timeout) Defines the delay before persisted jobs are recovered in case of a master recovery situation. Previously this key was named recovery.job.delay.

  • high-availability.zookeeper.client.acl: (Default open) Defines the ACL (open creator) to be configured on ZK node. The configuration value can be set to “creator” if the ZooKeeper server configuration has the “authProvider” property mapped to use SASLAuthenticationProvider and the cluster is configured to run in secure mode (Kerberos). The ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes

ZooKeeper Security

  • zookeeper.sasl.disable: (Default: true) Defines if SASL based authentication needs to be enabled or disabled. The configuration value can be set to “true” if ZooKeeper cluster is running in secure mode (Kerberos).

  • zookeeper.sasl.service-name: (Default: zookeeper) If the ZooKeeper server is configured with a different service name (default:”zookeeper”) then it can be supplied using this configuration. A mismatch in service name between client and server configuration will cause the authentication to fail.

Kerberos-based Security

  • security.kerberos.login.use-ticket-cache: Indicates whether to read from your Kerberos ticket cache (default: true).

  • security.kerberos.login.keytab: Absolute path to a Kerberos keytab file that contains the user credentials.

  • security.kerberos.login.principal: Kerberos principal name associated with the keytab.

  • security.kerberos.login.contexts: A comma-separated list of login contexts to provide the Kerberos credentials to (for example, Client,KafkaClient to use the credentials for ZooKeeper authentication and for Kafka authentication).

環境

  • env.log.dir: (デフォルトは Flinkのホームの下のlogディレクトリ) Flinkのログが保存されるディレクトリを定義します。絶対パスでなければなりません。

  • env.ssh.opts: Additional command line options passed to SSH clients when starting or stopping JobManager, TaskManager, and Zookeeper services (start-cluster.sh, stop-cluster.sh, start-zookeeper-quorum.sh, stop-zookeeper-quorum.sh).

クエリ可能な状態

サーバ

  • query.server.enable: Enable queryable state (Default: true).

  • query.server.port: Port to bind queryable state server to (Default: 0, binds to random port).

  • query.server.network-threads: Number of network (Netty’s event loop) Threads for queryable state server (Default: 0, picks number of slots).

  • query.server.query-threads: Number of query Threads for queryable state server (Default: 0, picks number of slots).

Client

  • query.client.network-threads: Number of network (Netty’s event loop) Threads for queryable state client (Default: 0, picks number of available cores as returned by Runtime.getRuntime().availableProcessors()).

  • query.client.lookup.num-retries: Number of retries on KvState lookup failure due to unavailable JobManager (Default: 3).

  • query.client.lookup.retry-delay: Retry delay in milliseconds on KvState lookup failure due to unavailable JobManager (Default: 1000).

マトリックス

  • metrics.reporters: 名前付きのレポーターのリスト。つまり "foo,bar"。

  • metrics.reporter.<name>.<config>: <name>という名前のレポーターのための一般的な設定<config>

  • metrics.reporter.<name>.class: <name>という名前のレポーターが使うレポータークラス。

  • metrics.reporter.<name>.interval: <name>という名前のレポーターが使うレポーター間隔。

  • metrics.scope.jm: (デフォルト: <host>.jobmanager) ジョブマネージャーに作用する全てのメトリクスに適用される作用フォーマット文字列を定義します。

  • metrics.scope.jm.job: (デフォルト: <host>.jobmanager.<job_name>) ジョブマネージャー上のジョブに作用する全てのメトリクスに適用される作用フォーマット文字列を定義します。

  • metrics.scope.tm: (デフォルト: <host>.taskmanager.<tm_id>) タスクマネージャーに作用する全てのメトリクスに適用される作用フォーマット文字列を定義します。

  • metrics.scope.tm.job: (デフォルト: <host>.taskmanager.<tm_id>.<job_name>) タスクマネージャー上のジョブに作用する全てのメトリクスに適用される作用フォーマット文字列を定義します。

  • metrics.scope.tm.task: (Default: <host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>) タスクに作用する全てのメトリクスに適用される作用フォーマット文字列を定義します。

  • metrics.scope.tm.operator: (Default: <host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>) Defines the scope format string that is applied to all metrics scoped to an operator.

  • metrics.latency.history-size: (Default: 128) Defines the number of measured latencies to maintain at each operator

背景

ネットワーク バッファの設定

java.io.IOException: Insufficient number of network buffersを見たことがある場合は、ネットワークバッファの数を調節するために以下の公式を使ってください:

#slots-per-TM^2 * #TMs * 4

#slots per TMタスクマネージャーあたりのスロットの数で、#TMs はタスクマネージャーの総数です。

ネットワークバッファは通信層のための重要なリソースです。それらはネットワーク上で伝送をする前にレコードをバッファするために使われ、入ってくるデータをレコードに分離してアプリケーションにそれらを渡す前にバッファするために使われます。良いスループットを達成するには、十分な数のネットワークバッファが重要です。

一般的に、同時に開いておきたい各論理ネットワーク接続が専用のバッファを持つのに十分なバッファを持つようにタスクマネージャーを設定します。論理ネットワーク接続はネットワーク上で各データのpoint-to-point交換のために存在します。これは一般的に再パーティショニング、あるいはブロードキャストの段階(シャッフル フェーズ)で起こります。それらの中で、タスクマネージャー内の各並行タスクは全ての他の平行タスクとやり取りをすることができなければなりません。したがって、タスクマネージャー上で必要なバッファの数は、 total-degree-of-parallelism (number of targets) * intra-node-parallelism (number of sources in one task manager) * n です。ここで、n は同時にどれだけの数の再パーティション/ブロードキャスト ステップが活性化すると予測するかを定義する定数です。

intra-node-parallelism は一般的にコアの数で、4つ以上の再パーティションあるいはブロードキャスト チャネルが同時に起こることは滅多に無いため、その頻度は#slots-per-TM^2 * #TMs * 4に落ち着きます。

例えば20 8スロットのマシーンのクラスタをサポートするには、最適なスループットのためにおよそ5000のネットワークバッファを使うべきです。

各ネットワークバッファはデフォルトで30Kバイトのサイズです。上の例では、システムはネットワークバッファのためにおよそ 300Mバイトを割り当てるでしょう。

ネットワークバッファの数とサイズは以下のパラメータを使って設定することができます:

  • taskmanager.network.numberOfBuffers
  • taskmanager.memory.segment-size

一時的な I/O ディレクトリの設定

Flink はできる限りメインメモリ内で処理することを目指していますが、利用可能なメモリよりもっと多くのデータが処理される必要があることは珍しいことではありません。Flinkのランタイムはこれらの状況を扱うために一時的なデータをディスクに書き込むように設計されています。

taskmanager.tmp.dirs パラメータはFlinkが一時ファイルを書き込むディレクトリのリストを指定します。ディレクトリのパスは ':' (コロン文字)によって分割されている必要があります。Flinkは1つの一時ファイルを各設定されたディレクトリへ(あるいは、から)同時に書き込む(読み込む)でしょう。このように、パフォーマンスを改善するためにハードディスクのような複数の非依存I/Oデバイスへ一時的なI/Oを結果的に分散することができます。高速なI/Oデバイス(例えば、SSD, RAID, NAS)を利用するために、1つのディレクトリを複数回指定することができます。

taskmanager.tmp.dirsパラメータが明示的に指定されない場合は、FlinkはLinuxシステムでの/tmp のようなオペレーティングシステムの一時ディレクトリへ一時データを書き込みます。

タスクマネージャーの処理スロットの設定

Flinkはプログラムをサブタスクに分割し、それらのサブタスクを処理スロットにスケジュールすることで、並行して実行します。

各Flinkタスクマネージャーはクラスタ内の処理スロットを提供します。スロットの数は一般的にタスクマネージャーの利用可能なCPUコアの数に比例します。一般的な推奨として、taskmanager.numberOfTaskSlotsについての良いデフォルトは利用可能なCPUコアの数です。

Flinkアプリケーションを開始する場合、ユーザはジョブに使うスロットのデフォルトの数を提供することができます。したがってコマンドラインの値は-p (for parallelism) と呼ばれます。In addition, it is possible to set the number of slots in the programming APIs for the whole application and for individual operators.

TOP
inserted by FC2 system