設定

シングルモードセットアップのFlinkについては、すぐに使える状態で、開始するためにデフォルトの設定を変更する必要はありません。

すぐに使える設定はデフォルトのJavaインストレーションを使うでしょう。使用するJavaランタイムを手動で上書きしたい場合は、環境変数JAVA_HOMEあるいはconf/flink-conf.yaml内で設定キーenv.java.homeを手動で設定することができます。

このページでは効率的(分散)インストレーションをセットアップするために代表的に必要な最も一般的なオプションをリスト化します。さらに全ての利用可能な設定パラメータの完全なリストをここでリスト化します。

全ての設定はconf/flink-conf.yaml内で行われます。これはkey: valueの形式を持つYAML キー 値のペア の平坦な集合が期待されます。

システムと実行スクリプトは開始時に設定をパースします。設定ファイルへの変更はFlinkのジョブマネージャーとタスクマネージャーの再起動を必要とします。

タスクマネージャーのための設定ファイルは異なるかも知れません。Flinkはクラスタ内の均一なマシーンを仮定しません。

一般的なオプション

  • env.java.home: 使用するJavaインストレーションへのパス (デフォルト: もしあれば、システムのデフォルトのJavaインストレーション)。もしスタートアップスクリプトが自動的にJavaホームディレクトリを解決するのに失敗した場合は指定する必要があります。特定のjavaインストレーションあるいはバージョンを指定することができます。このオプションが指定されなかった場合は、スタートアップスクリプトは$JAVA_HOME環境変数も評価します。

  • env.java.opts: 独自のJVMオプションを設定します。この値はFlinkのスタートスクリプト、ジョブマネージャーとタスクマネージャーの両方、およびFlinkのYARNクライアントによって尊重されます。これは異なるガベージコレクションの設定、あるいはリモートのデバッガーをFlinkのサービスを実行しているJVMに含めるために使うことができます。Enclosing options in double quotes delays parameter substitution allowing access to variables from Flink’s startup scripts. ジョブマネージャーあるいはタスクマネージャー固有のそれぞれのオプションのために、env.java.opts.jobmanagerenv.java.opts.taskmanagerを使います。

  • env.java.opts.jobmanager: ジョブマネージャー固有のJVMオプション。これらは通常のenv.java.optsに加えて使用されます。

  • env.java.opts.taskmanager: タスクマネージャー固有のJVMオプション。これらは通常のenv.java.optsに加えて使用されます。

  • jobmanager.rpc.address: ジョブマネージャーの外部アドレス。これは分散システムのマスター/コーディネータです (デフォルト: localhost)。注意: アドレス (ホスト名あるいはIP)はクライアントを含むすべてのノードからアクセス可能でなければなりません。

  • jobmanager.rpc.port: ジョブマネージャーのポート番号 (デフォルト: 6123)。

  • jobmanager.heap.mb: ジョブマネージャーのためのJVM ヒープサイズ (メガバイト単位)。(多くのオペレーターを持つ)とても大きなアプリケーションを実行しているか、それらの長い履歴を保持する場合は、ジョブマネージャーのヒープサイズを増加させるべきかも知れません。

  • taskmanager.heap.mb: タスクマネージャーのJVM ヒープサイズ (メガバイト単位)。これはsystemの平行ワーカーです。Hadoopとは対照的に、Flinkはオペレータ(例えばjoin, aggregat)とユーザ定義の関数(例えば Map, Reduce, CoGroup) をタスクマネージャー ( sorting/hashing/caching を含む)の中で実行します。つまりこの値はできるだけ大きくしなければなりません。もしクラスタが排他的にFlinkを実行する場合は、マシーンごとの利用可能な総メモリからオペレーティングシステムのメモリ(おそらく 1-2 GB) を引いたものが良い値です。YARN セットアップ上では、この値はタスクマネージャーのYARNコンテナのサイズに自動的に設定され、ある程度寛容できる値が引かれます。

  • taskmanager.numberOfTaskSlots: 1つのタスクマネージャーが実行できる、並行オペレータあるいはユーザ関数のインスタンスの数(デフォルト: 1)。値が1より大きい場合は、1つのシングルマネージャーは関数あるいはオペレータの複数のインスタンスを取ることができます。そのようにして、タスクマネージャーは複数のCPUコアを使うことができますが、同時に、利用可能なメモリは異なるオペレータあるいは関数のインスタンスの間で分配されます。この値は一般的にタスクマネージャーのマシーンが持つ物理CPUコアの数に比例します(例えば、コアの数に一致、あるいはコアの数の半分)。タスクスロットについてのより多くのこと

  • parallelism.default: 並行度が指定されていないプログラムのために使うデフォルトの並行度。(デフォルト: 1). 同時実行のジョブが無いセットアップについては、この値を NumTaskManagers * NumSlotsPerTaskManager にすることでシステムはプログラムの実行のために全ての利用可能な実行リソースを使うでしょう。注意: デフォルトの並行度はExecutionEnvironment上でsetParallelism(int parallelism)を呼び出すか、 -p <parallelism> をFlinkコマンドライン フロントエンに渡すことで上書きすることができます。一つの返還のためにオペレータ上でsetParallelism(int parallelism) を呼び出すことで上書きすることができます。並行度の詳細については 並行実行を見てください。

  • fs.default-scheme: 接続するために必要な権限を持つ、使用されるデフォルトのファイルシステムスキーマ。例えば、HDFSの場合はネームノードの host:ポート (必要であれば)。デフォルトでは、これはローカルファイルシステムを示すfile:///に設定されます。このことは、明示的なスキーマの定義 無しに、ユーザ定義ファイルを検索するtあめにローカルファイルシステムが使われることを意味します。他の例として、これがhdfs://localhost:9000/に設定された場合、/user/USERNAME/in.txtのような明示的なスキーマ定義を持たないユーザ定義のパスはhdfs://localhost:9000/user/USERNAME/in.txtに変換されるでしょう。他のスキーマがユーザ指定のURIの中で(明示的に)指定された場合のみ、このスキーマが使われます。

  • classloader.resolve-order: Whether Flink should use a child-first ClassLoader when loading user-code classes or a parent-first ClassLoader. Can be one of parent-first or child-first. (default: child-first)

  • classloader.parent-first-patterns: A (semicolon-separated) list of patterns that specifies which classes should always be resolved through the parent ClassLoader first. A pattern is a simple prefix that is checked against the fully qualified class name. By default, this is set to java.;org.apache.flink.;javax.annotation;org.slf4j;org.apache.log4j;org.apache.logging.log4j;ch.qos.logback. If you want to change this setting you have to make sure to also include the default patterns in your list of patterns if you want to keep that default behaviour.

より進んだオプション

Compute

  • taskmanager.compute.numa: When enabled a TaskManager is started on each NUMA node for each worker listed in conf/slaves (DEFAULT: false). Note: only supported when deploying Flink as a standalone cluster.

管理されるメモリ

By default, Flink allocates a fraction of 0.7 of the free memory (total memory configured via taskmanager.heap.mb minus memory used for network buffers) for its managed memory. 管理されたメモリはFlinkがバッチオペレーションを効率よく実施するのを助けます。Flinkはオペレーションを実行するために使うことができるメモリの量を知っているため、それはOutOfMemoryExceptionを避けます。Flinkが管理外のメモリを実行する場合は、ディスク空間を利用します。管理されているメモリを利用することで、いくつかのオペレーションはデータをJavaオブジェクトに変換する必要が無く生データ上で直接実施することができます。全体として管理されたメモリはシステムの堅牢さとスピードを改善します。

管理メモリのためのデフォルトの割合はtaskmanager.memory.fraction パラメータを使って調整可能です。絶対値はtaskmanager.memory.sizeを使って設定されるかもしれません (割合パラメータを上書きます)。必要であれば、管理メモリはJVMヒープ外に割り当てられるかもしれません。これは大きなメモリサイズを持つセットアップ内でのパフォーマンスを改善するかも知れません。

  • taskmanager.memory.size: タスクマネージャーがソート、ハッシュテーブル、および中間結果をキャッシュするためにヒープ上あるいはヒープ外(taskmanager.memory.off-heapに依存します)に確保する総メモリ量(メガバイト単位)。指定しない(-1)場合、メモリマネージャーは taskmanager.memory.fractionによって指定されたタスクマネージャーJVMのサイズに関する固定した割合を取るでしょう。(デフォルト: -1)

  • taskmanager.memory.fraction: The relative amount of memory (with respect to taskmanager.heap.mb, after subtracting the amount of memory used by network buffers) that the task manager reserves for sorting, hash tables, and caching of intermediate results. 例えば、0.8の値はタスクマネージャーが内部的なデータバッファのためにメモリの80% (ヒープ上あるいはヒープ外はtaskmanager.memory.off-heapに依存します)を確保し、フリーメモリの残り20%をユーザ定義関数によって生成されるオブジェクトのためのタスクマネージャーのヒープのために確保することを意味します。(デフォルト: 0.7) taskmanager.memory.size が設定されない場合は、このパラメータは評価のみされます。

  • taskmanager.memory.off-heap: trueに設定した場合、タスクマネージャーはソート、ハッシュテーブル および 中間結果のキャッシュのために使われるメモリをJVMヒープ外に割り当てます。大量のメモリを持つセットアップについては、これはメモリ上で実行さえるオペレーションの効率を改善することができます (デフォルト: false)。

  • taskmanager.memory.segment-size: メモリマネージャーとネットワークスタックによって使われるメモリバッファのバイトサイズ (デフォルト: 32768 (= 32 KiBytes))。

  • taskmanager.memory.preallocate: true あるいは falseのどちらかを設定することができます。タスクマネージャーが全ての管理メモリを開始時に割り当てるかどうかを指定します。(デフォルト: false). taskmanager.memory.off-heaptrueに設定された場合、この設定も trueに設定されることをお勧めします。もしこの設定が false に設定された場合、起動されたfull GCによって設定済みのJVMパラメータ MaxDirectMemorySize に近づいた時のみ、割り当てられたoffheapメモリの掃除が起こります。注意: ストリーミングセットアップについては、コアの状態バックエンドが現在のところ管理されたメモリを使わないため、この値をfalse に設定することをとてもお勧めします。

メモリとパフォーマンスのデバッグ

これらのオプションは、out-of-memory-process kill あるいは例外のような、問題に関係するメモリ及びガベージコレクションについてFlinkアプリケーションをデバッグするのに便利です。

  • taskmanager.debug.memory.startLogThread: タスクマネージャーが定期的のメモリとガベージコレクション統計を記録するようにします。この統計には、現在のヒープ、オフヒープ、および他のメモリプール利用と、ヒープメモリプールによってガベージコレクションに費やされた時間が含まれます。

  • taskmanager.debug.memory.logIntervalMs: タスクマネージャーがメモリおよびガベージコレクション統計を記録する時間間隔(ミリ秒単位)。taskmanager.debug.memory.startLogThread がtrueに設定された場合にのみ効果があります。

Kerberosベースのセキュリティ

Flinkは以下のサービスのためにKerberos認証をサポートします:

  • HDFS、YARN あるいは HBaseのようなHadoopコンポーネント (バージョン 2.6.1 以上; それ以外の全てのバージョンにはFlinkジョブが不意に失敗するかもしれない致命的なバグがあります)
  • Kafka コネクタ (バージョン 0.9+ 以上)
  • Zookeeper

KerberosセキュリティのためのFlinkの設定は、以下のサブセクションで別途説明される3つの特徴を伴います。

1. Kerberos証明書を持つクラスタを提供 (つまり、キータブあるいはkinitを使ったチケット)

Kerberos証明書を持つクラスタを提供するために、FlinkはKerberosキータブファイルあるいは kinitで管理されるチケットキャッシュの使用をサポートします。

  • security.kerberos.login.use-ticket-cache: Kerberosチケットキャッシュから読み込むかどうかを指示する (デフォルト: true)。

  • security.kerberos.login.keytab: ユーザ証明書を含むKerberosキータブファイルへの絶対パス。

  • security.kerberos.login.principal: キータブに関連するKerberos プリンシパル名。

security.kerberos.login.keytabsecurity.kerberos.login.principal の両方が提供された値を持つ場合、キータブは認証のために使われるでしょう。チケットの期限切れ問題を避けるために、長期間のジョブについてはキータブを使うことが望ましいです。チケットキャッシュを使うことが好きな場合は、Hadoop移譲トークンの生存期間の増加について管理者に話してください。

チケットキャッシュを使った認証は、FlinkがスタンドアローンあるいはYARNに配備された時のみサポートされることに注意してください。

2. 必要に応じてコンポーネントとコネクタに利用可能なKerberos証明書を作成

Hadoopコンポーネントについては、Hadoopセキュリティが有効かどうか(core-site.xml)に依存してHDFS、HBase およびほかのHadoopコンポーネントに接続する時に設定されたKerberos証明書が使われるべきかどうかを自動的に検知するでしょう。

JAAS設定ファイルを使用するコネクタあるいはコンポーネントについては、以下の設定を使ってそれぞれについてJAAS login コンテキストを設定することで利用可能なKerberos証明書を作成します:

  • security.kerberos.login.contexts: Kerberos証明書を提供するためのloginコンテキストのカンマ区切りのリスト (例えば、ZooKeeper認証およびKafka認証のための証明書を使うためのClient,KafkaClient)。

これにより異なるコネクタあるいはコンポーネントのためのKerberos認証を依存せずに有効にすることができます。例えば、ZooKeeperのためのKerberosの使用を必要とせずにHadoopセキュリティを有効にすることができます。逆もまた同様です。

Java SE Documentationで説明される機構を使って静的なJAAS設定ファイルも提供するかもしれません。それらのエントリは上の設定オプションで生成されたものを上書きします。

3. Kerberos認証を使うためにコンポーネント および/または コネクタを設定

最終的に、Kerberos認証を使う必要性に応じて、Flinkプログラムあるいはコンポーネント内でコネクタを設定するようにしてください。

以下はKerberos認証のためのFlinkによって現在のところファーストクラスがサポートされるコネクタあるいはコンポーネントのリストです:

  • Kafka: KafkaコネクタをKerberos認証を使うように設定する時の詳細はここを見てください。

  • Zookeeper (for HA): ここで述べられるKerberosベースのセキュリティ設定と連携するためのZookeeperセキュリティ設定についての詳細はここを見てください。

Flinkセキュリティが内部的にどうやってKerberos認証をセットアップするかについての詳細は、ここを見てください。

その他

  • taskmanager.tmp.dirs: テンポラリ ファイルのためのディレクトリ、あるいはシステムのディレクトリのデリミタ(例えばLinux/Unixでの ‘:’ (コロン))によって区切られたディレクトリのリスト。複数のディレクトリが指定された場合は、一時ファイルはラウンドロビンの形式でディレクトリに渡って分散されるでしょう。I/O マネージャーコンポーネントは、ディレクトリごとに1つの読み込みと1つの書き込みスレッドをspawnするでしょう。I/Oマネージャーが複数のスレッドを使うように、ディレクトリは複数回リスト化されるかもしれません (例えば、とても速いディスクあるいはRAIDに格納されます) (デフォルト: システムのtmpディレクトリ)。

  • taskmanager.log.path: タスクマネージャーのログファイルの場所を定義する設定パラメータ

  • jobmanager.web.address: JobManagerのwebインタフェース (デフォルト: anyLocalAddress())。

  • jobmanager.web.port: ジョブマネージャーのwebインタフェースのポート (デフォルト: 8081)。

  • jobmanager.web.tmpdir: この設定パラメータを使ってwebインタフェースによって使われるFlink webディレクトリを定義することができます。webインタフェースは静的ファイルをディレクトリにコピーするでしょう。また、上書きされない場合はアップロードされたジョブのjarはディレクトリ内に格納されます。デフォルトで、テンポラリディレクトリが使われます。

  • jobmanager.web.upload.dir: ジョブのjarをアップロードするためのディレクトリを定義する設定パラメータ。指定されない場合はjobmanager.web.tmpdirで指定されるディレクトリの下の動的なディレクトリが使われるでしょう。

  • fs.overwrite-files: ファイルの出力ライターがデフォルトで既存のファイルを上書きするかどうかを指定します。デフォルトで上書きするためにはtrueに設定し、そうでなければfalse です。(デフォルト: false)

  • fs.output.always-create-directory: 並行度1以上で実行中のFile writerが出力ファイルパスのためのディレクトリを作成し、異なる結果ファイル(並行writerタスクごとに1つ)をディレクトリ内に置きます。このオプションが trueに設定された場合、並行度1のライターはディレクトリも生成し、その中に1つの結果ファイルを置くでしょう。オプションがfalseに設定された場合、ライターはディレクトリを生成せずに出力パスに直接ファイルを生成します。(デフォルト: false)

  • taskmanager.network.memory.fraction: Fraction of JVM memory to use for network buffers. This determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value or the min/max values below. (DEFAULT: 0.1)

  • taskmanager.network.memory.min: Minimum memory size for network buffers in bytes (DEFAULT: 64 MB)

  • taskmanager.network.memory.max: Maximum memory size for network buffers in bytes (DEFAULT: 1 GB)

  • state.backend: チェックポイントが有効な場合、オペレータの状態のチェックポイントが格納されるために使われるであろうバックエンド。サポートされるバックエンド:
    • jobmanager: インメモリの状態、ジョブマネージャー/ZooKeeperのメモリにバックアップします。最小の状態(Kafkaオフセット)あるいはテストおよびローカルデバッギングのためにのみ使われるべきです。
    • filesystem: 状態はタスクマネージャー上のメモリ内にあり、状態のスナップショットはファイルシステムに格納されます。例えば HDFS、S3、… のFlinkによってサポートされる全てのファイルシステムによってサポートされます。
  • state.backend.fs.checkpointdir: Flinkがサポートするファイルシステム内に格納されるチェックポイントのためのディレクトリ。注意: 状態のバックエンドはジョブマネージャーからアクセス可能でなければなりません。ローカルセットアップのためにはfile:// のみ使います。

  • state.backend.rocksdb.checkpointdir: RocksDBファイルを格納するためのローカルディレクトリ、あるいはシステムのディレクトリ デリミタ(例えば、Linux/Unix上での ':' (コロン)) によって区切られたディレクトリのリスト。(デフォルトの値は taskmanager.tmp.dirs)

  • state.checkpoints.dir: externalized checkpointsのメタデータのための対象ディレクトリ。

  • state.checkpoints.num-retained: The number of completed checkpoint instances to retain. Having more than one allows recovery fallback to an earlier checkpoints if the latest checkpoint is corrupt. (Default: 1)

  • high-availability.zookeeper.storageDir: HAのために必要です。ジョブマネージャーのメタデータを格納するディレクトリ; これは状態バックエンドの中に保持され、この状態のポインタだけがZooKeeperに格納されます。チェックポイントのディレクトリと正しく同じで、ジョブマネージャーからアクセス可能でなければならず、ローカル配備のためだけに使われなければなりません。以前はこのキーはrecovery.zookeeper.storageDirという名前でした。

  • blob.storage.directory: TaskManager上で(ユーザのJARのような)blobを格納するためのディレクトリ。

  • blob.service.cleanup.interval: Cleanup interval (in seconds) of transient blobs at server and caches as well as permanent blobs at the caches (DEFAULT: 1 hour). Whenever a job is not referenced at the cache anymore, we set a TTL for its permanent blob files and let the periodic cleanup task (executed every blob.service.cleanup.interval seconds) remove them after this TTL has passed. We do the same for transient blob files at both server and caches but immediately after accessing them, i.e. an put or get operation. This means that a blob will be retained at most 2 * blob.service.cleanup.interval seconds after not being referenced anymore (permanent blobs) or their last access (transient blobs). For permanent blobs, this means that a recovery still has the chance to use existing files rather downloading them again.

  • blob.server.port: TaskManager上の(ユーザのJARを提供するような)blobサーバのポート定義。デフォルトでは、ポートは0に設定されます。これはオペレーティングシステムが短命ポートを選んだことを意味します。Flink はポートのリスト(“50100,50101”)、範囲(“50100-50200”) あるいはそれら両方の組み合わせも受け付けます。複数のジョブマネージャーが同じマシーン上で実行中の場合は、衝突を避けるためにポートの範囲を設定することをお勧めします。

  • blob.service.ssl.enabled: blob クライアント/サーバ 通信のためのsslを有効にするためのフラグ。グローバルなsslフラグ security.ssl.enabled がtrue (デフォルト: true) に設定されている時のみ適用可能です。

  • restart-strategy: デフォルト restart strategy はジョブのために何も再起動ストラテジが指定されていない場合に使われます。オプションは以下の通りです:
    • 固定遅延ストラテジ: fixed-delay.
    • 障害レートストラテジ: failure-rate.
    • 再起動無し: none

    Default value is none unless checkpointing is enabled for the job in which case the default is fixed-delay with Integer.MAX_VALUE restart attempts and 10s delay.

  • restart-strategy.fixed-delay.attempts: デフォルトの再起動ストラテジが"fixed-delay"に設定された場合に使われる再起動の試行の数。Default value is 1, unless “fixed-delay” was activated by enabling checkpoints, in which case the default is Integer.MAX_VALUE.

  • restart-strategy.fixed-delay.delay: デフォルトの再起動ストラテジが"fixed-delay"に設定された場合に使われる、再起動の試行間の遅延。(default: 1 s)

  • restart-strategy.failure-rate.max-failures-per-interval: "failure-rate"ストラテジでジョブが失敗するまでの、指定された時間内の再起動の最大数。デフォルト値は 1です。

  • restart-strategy.failure-rate.failure-rate-interval: "failure-rate"ストラテジでの失敗レートを計測するための時間間隔。デフォルト値は 1 秒です。

  • restart-strategy.failure-rate.delay: デフォルトの再起動ストラテジが"failure-rate"に設定された場合に使われる、再起動の試行間の遅延。デフォルト値はakka.ask.timeoutです。

完全なリファレンス

HDFS

Note: These keys are deprecated and it is recommended to configure the Hadoop path with the environment variable *HADOOP_CONF_DIR* instead.

これらのパラメータはFlinkによって使われるデフォルトのHDFSを設定します。HDFS設定を指定しないセットアップはHDFSファイルのフルパス (hdfs://address:port/path/to/files) を指定しなければなりません。ファイルもデフォルトのHDFSパラメータ (ブロックサイズ、リプリケーション ファクター)で書かれるでしょう。

  • fs.hdfs.hadoopconf: Hadoopファイルシステム(HDFS)の設定ディレクトリの絶対パス (任意の値)。この値を設定することで、プログラムはファイルのURI内にネームノードのアドレスをポートを含まない短いURL (hdfs:///path/to/filesを使ってHDFSファイルを参照することができます。このオプションが無い場合、HDFSはアクセスすることができますが、hdfs://address:port/path/to/filesのような完全装飾URIを必要とします。このオプションはファイルライターにブロックサイズとりプリケーションファクターのためのHDFSのデフォルト値を取り出すようにもします。Flinkは指定されたディレクトリの"core-site.xml"と"hdfs-site.xml"ファイルを調べないでしょう。

  • fs.hdfs.hdfsdefault: Hadoopの独自の設定ファイル"hdfs-default.xml"の絶対パス (デフォルト: null)。

  • fs.hdfs.hdfssite: Hadoopの独自の設定ファイル "hdfs-site.xml" の絶対パス (デフォルト: null)。

ジョブマネージャー と タスクマネージャー

以下のパラメータはFlinkのジョブマネージャーとタスクマネージャーを設定します。

  • jobmanager.rpc.address: ジョブマネージャーの外部アドレス。これは分散システムのマスター/コーディネータです (デフォルト: localhost)。注意: アドレス (ホスト名あるいはIP)はクライアントを含むすべてのノードからアクセス可能でなければなりません。

  • jobmanager.rpc.port: ジョブマネージャーのポート番号 (デフォルト: 6123)。

  • taskmanager.hostname: タスクマネージャーがバインドされているネットワークインタフェースのホスト名。デフォルトで、タスクマネージャーはジョブマネージャーと他のタスクマネージャーに接続することができるネットワークインタフェースを探します。ストラテジが何らかの理由で失敗した場合に、このオプションはホスト名を定義するために使うことができます。異なるタスクマネージャーはこのオプションのために異なる値を必要とするため、通常追加の共有されないタスクマネージャー固有の設定ファイルが指定されます。

  • taskmanager.rpc.port: タスクマネージャーの IPC ポート (デフォルト: 0、これはOSによって空いているポートが選択されます)。Flink はポートのリスト(“50100,50101”)、範囲(“50100-50200”) あるいはそれら両方の組み合わせも受け付けます。It is recommended to set a range of ports to avoid collisions when multiple TaskManagers are running on the same machine.

  • taskmanager.data.port: データ交換操作のために使われるタスクマネージャーのポート (デフォルト: 0、これはOSによって空いているポートが選択されます)。

  • taskmanager.data.ssl.enabled: タスクマネージャーのデータ転送のためにSSLサポートを有効にします。グローバルなsslフラグ security.ssl.enabled がtrue (デフォルト: true) に設定されている時のみ適用可能です。

  • jobmanager.heap.mb: ジョブマネージャーのためのJVM のヒープサイズ(メガバイト単位) (デフォルト: 256)。

  • taskmanager.heap.mb: タスクマネージャーのJVM ヒープサイズ (メガバイト単位)。これはsystemの平行ワーカーです。Hadoopとは対照的に、Flinkはオペレータ(例えばjoin, aggregat)とユーザ定義の関数(例えば Map, Reduce, CoGroup) をタスクマネージャー ( sorting/hashing/caching を含む)の中で実行します。つまりこの値はできるだけ大きくしなければなりません。YARN セットアップ上では、この値はタスクマネージャーのYARNコンテナのサイズに自動的に設定され、ある程度寛容できる値が引かれます。

  • taskmanager.numberOfTaskSlots: 1つのタスクマネージャーが実行できる、並行オペレータあるいはユーザ関数のインスタンスの数(デフォルト: 1)。値が1より大きい場合は、1つのシングルマネージャーは関数あるいはオペレータの複数のインスタンスを取ることができます。そのようにして、タスクマネージャーは複数のCPUコアを使うことができますが、同時に、利用可能なメモリは異なるオペレータあるいは関数のインスタンスの間で分配されます。この値は一般的にタスクマネージャーのマシーンが持つ物理CPUコアの数に比例します(例えば、コアの数に一致、あるいはコアの数の半分)。

  • taskmanager.tmp.dirs: テンポラリ ファイルのためのディレクトリ、あるいはシステムのディレクトリのデリミタ(例えばLinux/Unixでの ‘:’ (コロン))によって区切られたディレクトリのリスト。複数のディレクトリが指定された場合は、一時ファイルはラウンドロビンの形式でディレクトリに渡って分散されるでしょう。I/O マネージャーコンポーネントは、ディレクトリごとに1つの読み込みと1つの書き込みスレッドをspawnするでしょう。I/Oマネージャーが複数のスレッドを使うように、ディレクトリは複数回リスト化されるかもしれません (例えば、とても速いディスクあるいはRAIDに格納されます) (デフォルト: システムのtmpディレクトリ)。

  • taskmanager.network.memory.fraction: Fraction of JVM memory to use for network buffers. This determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value or the min/max values below. Also note, that taskmanager.network.memory.min and taskmanager.network.memory.max may override this fraction. (DEFAULT: 0.1)

  • taskmanager.network.memory.min: Minimum memory size for network buffers in bytes (DEFAULT: 64 MB). Previously, this was determined from taskmanager.network.numberOfBuffers and taskmanager.memory.segment-size.

  • taskmanager.network.memory.max: Maximum memory size for network buffers in bytes (DEFAULT: 1 GB). Previously, this was determined from taskmanager.network.numberOfBuffers and taskmanager.memory.segment-size.

  • taskmanager.network.numberOfBuffers (deprecated, replaced by the three parameters above): The number of buffers available to the network stack. この数値はタスクマネージャーがどれだけ多くのストリーミングデータ交換チャネルを同時に持つことができ、チャネルがどれほど良くバッファされるかを決定します。ジョブが却下された場合、あるいはシステムが利用可能なバッファを十分に持っていないと警告された場合は、この値を増やしてください (デフォルト: 2048)。If set, it will be mapped to taskmanager.network.memory.min and taskmanager.network.memory.max based on taskmanager.memory.segment-size.

  • taskmanager.memory.size: タスクマネージャーがソート、ハッシュテーブル、中間結果をキャッシュするためにJVMのヒープ上に確保する総メモリ量(メガバイト単位)。指定しない(-1)場合、メモリマネージャーは taskmanager.memory.fraction によって指定された、JVMに利用可能なヒープメモリの固定の割合を取るでしょう。(デフォルト: -1)

  • taskmanager.memory.fraction: The relative amount of memory (with respect to taskmanager.heap.mb, after subtracting the amount of memory used by network buffers) that the task manager reserves for sorting, hash tables, and caching of intermediate results. 例えば、0.8の値はタスクマネージャーが内部的なデータバッファのためにメモリの80% (ヒープ上あるいはヒープ外はtaskmanager.memory.off-heapに依存します)を確保し、フリーメモリの残り20%をユーザ定義関数によって生成されるオブジェクトのためのタスクマネージャーのヒープのために確保することを意味します。(デフォルト: 0.7) taskmanager.memory.size が設定されない場合は、このパラメータは評価のみされます。

  • taskmanager.debug.memory.startLogThread: タスクマネージャーが定期的のメモリとガベージコレクション統計を記録するようにします。この統計には、現在のヒープ、オフヒープ、および他のメモリプール利用と、ヒープメモリプールによってガベージコレクションに費やされた時間が含まれます。

  • taskmanager.debug.memory.logIntervalMs: タスクマネージャーがメモリおよびガベージコレクション統計を記録する時間間隔(ミリ秒単位)。taskmanager.debug.memory.startLogThread がtrueに設定された場合にのみ効果があります。

  • taskmanager.maxRegistrationDuration: タスクマネージャーの登録を間違うことができる最大回数を定義します。登録が成功しないまま持続期間を超えると、タスクマネージャーは終了します。最大登録持続期間は時間単位識別子 (ms/s/min/h/d) (たとえば “10 min”) を必要とします。(デフォルト: Inf)

  • taskmanager.initial-registration-pause: 2つの連続する登録の試行の間の初期の登録休止時間。休止時間が最大登録休止時間に達するまで、各登録の試行ごとの休止は2倍されます。初期の登録休止時間は時間単位識別子 (ms/s/min/h/d) (例えば “5 s”) を必要とします。(デフォルト: 500 ms)

  • taskmanager.max-registration-pause: 2つの連続する登録の試行の間の最大登録休止時間。最大登録休止時間は時間単位識別子 (ms/s/min/h/d) (例えば “5 s”) を必要とします。(デフォルト: 30 s)

  • taskmanager.refused-registration-pause: 接続に入る前にジョブマネージャーによって拒否された登録の後の休止時間。拒否された登録の休止時間は時間単位識別子 (ms/s/min/h/d) (例えば “5 s”) を必要とします。(デフォルト: 10 s)

  • taskmanager.jvm-exit-on-oom: タスク スレッドが OutOfMemoryErrorを投げる時にタスクマネージャーがすぐにJVMを終了するかどうかを示します (デフォルト: false)。

  • blob.fetch.retries: タスクマネージャーがジョブマネージャーから(JARファイルのような)BLOBをダウンロードする試行数 (デフォルト: 50)。

  • blob.fetch.num-concurrent: ジョブマネージャーが提供する(JARファイルダウンロードのような)BLOB同時取り込み数 (デフォルト: 50)。

  • blob.fetch.backlog: ジョブマネージャーが可能な(JARファイルダウンロードのような)キューされたBLOBの取り込み最大数 (デフォルト: 1000)。

  • task.cancellation-interval: 二つの連続するタスクの取り消しの試行間のミリ秒単位の間隔 (デフォルト: 30000)。

  • taskmanager.exit-on-fatal-akka-error: Whether the TaskManager shall be terminated in case of a fatal Akka error (quarantining event). (DEFAULT: false)

  • jobmanager.tdd.offload.minsize: Maximum size of the TaskDeploymentDescriptor’s serialized task and job information to still transmit them via RPC. Larger blobs may be offloaded to the BLOB server. (DEFAULT: 1 KiB).

分散の調整 (via Akka)

  • akka.ask.timeout: 全ての機能で使われ、Akka呼び出しをブロックするタイムアウト。Flinkがタイムアウトによって失敗する場合、この値を増やしてみるべきです。タイムアウトは遅いマシーンあるいは混雑したネットワークによっても起こり得ます。タイムアウトの値は time-単位記述子 (ms/s/min/h/d) を必要とします (デフォルト: 10 s)。

  • akka.lookup.timeout: ジョブマネージャーを検索するために使われるタイムアウト。タイムアウトの値はtime-単位識別子 (ms/s/min/h/d) を含む必要があります (デフォルト: 10 s)。

  • akka.client.timeout: Flinkクラスタと通信する時に、Flinkクライアントによって使われるタイムアウト (例えば、CliFrontend, ClusterClient)。タイムアウトの値は時間単位識別子 (ms/s/min/h/d) を含む必要があります (デフォルト: 60 s)。

  • akka.framesize: ジョブマネージャーとタスクマネージャー間で送信されるメッセージの最大サイズ。メッセージがこの制限を超えたためにFlinkが失敗する場合は、増やす必要があります。メッセージサイズはsize-単位識別子を必要とします (デフォルト: 10485760b)。

  • akka.watch.heartbeat.interval: AkkaのDeathWatch機構が死亡したタスクマネージャーを検知するためのハートビート間隔。If TaskManagers are wrongly marked dead because of lost or delayed heartbeat messages, then you should decrease this value or increase akka.watch.heartbeat.pause. A thorough description of Akka’s DeathWatch can be found here (DEFAULT: 10 s).

  • akka.watch.heartbeat.pause: AkkaのDeatchWatch機構が受け入れられるハートビートの休止。A low value does not allow an irregular heartbeat. If TaskManagers are wrongly marked dead because of lost or delayed heartbeat messages, then you should increase this value or decrease akka.watch.heartbeat.interval. Higher value increases the time to detect a dead TaskManager. A thorough description of Akka’s DeathWatch can be found here (DEFAULT: 60 s).

  • akka.watch.threshold: DeathWatchの障害検知の閾値。高い値は死亡したTaskManagerの検知するまでの時間が長くなるのに対し、低い値はfalse positiveになりがちです。AkkaのDeatchWatchの詳細な説明はここで見つけることができます (デフォルト: 12)。

  • akka.transport.heartbeat.interval: Akkaの転送障害検知のためのハートビート間隔。FlinkはTCPを使うため、検知は必要ではありません。したがって、間隔をとても大きな値にすることで検知を無効にします。転送障害検知必要とする場合は、間隔を意味のある値に設定します。間隔の値はtime-単位識別子 (ms/s/min/h/d) を必要とします (デフォルト: 1000 s)。

  • akka.transport.heartbeat.pause: Akkaの転送障害検知が受け入れられるハートビートの休止。FlinkはTCPを使うため、検知は必要ではありません。したがって、休止をとても大きな値にすることで検知を無効にします。転送障害検知必要とする場合は、休止を意味のある値に設定します。休止の値は time-単位記述子 (ms/s/min/h/d) を必要とします (デフォルト: 6000 s)。

  • akka.transport.threshold: 転送障害検知の閾値。FlinkはTCPを使うため検知は必要ありません。したがって閾値は大きな値に設定されます (デフォルト: 300)。

  • akka.tcp.timeout: 全ての外向きの接続のタイムアウト遅いネットワークのためにタスクマネージャーへの接続に問題を経験した場合は、この値を増やすべきです (デフォルト: 20 s)。

  • akka.throughput: スレッドをプールに返す前にバッチ内で処理されるメッセージの数。低い値は公平なスケジューリングを意味し、一方で高い値は不公平という代償を払ってパフォーマンスを改善することができます (デフォルト: 15)。

  • akka.log.lifecycle.events: イベントのAkkaのリモートログをオンにします。デバッグ時には、この値を'true'に設定します (デフォルト: false)。

  • akka.startup-timeout: リモートコンポーネントの起動が失敗したと見なされるタイムアウト (デフォルト: akka.ask.timeout)。

  • akka.ssl.enabled: Akkaのリモート通信のためのSSLをオンにします。グローバルなsslフラグ security.ssl.enabled がtrue (デフォルト: true) に設定されている時のみ適用可能です。

SSL設定

  • security.ssl.enabled: 内部的なネットワーク通信のためのSSLをオンにします。これは異なる通信モジュール内で定義されたフラグによって任意で上書きすることができます (デフォルト: false)。

  • security.ssl.keystore: flinkのエンドポイントによってSSLキーと証明書のために使われるJavaキーストアファイル。

  • security.ssl.keystore-password: キーストアファイルを複合化するための秘密鍵。

  • security.ssl.key-password: キーストア内のサーバキーを複合化するための秘密鍵。

  • security.ssl.truststore: flinkのエンドポイントによってピアの証明書を検証するために使われる公的なCA証明書を含むトラストストア ファイル。

  • security.ssl.truststore-password: トラストストアを複合化するための秘密鍵。

  • security.ssl.protocol: ssl通信でサポートされるSSLプロトコルのバージョン (デフォルト: TLSv1.2)。Note that it doesn’t support comma separated list.

  • security.ssl.algorithms: サポートされる標準SSLアルゴリズムのカンマ区切りのリスト。詳しくはここを読んでください (デフォルト: TLS_RSA_WITH_AES_128_CBC_SHA)。

  • security.ssl.verify-hostname: sslハンドシェイク時のピアのホスト名系ションを有効にするためのフラグ (デフォルト: true)。

ネットワーク通信 (via Netty)

これらのパラメータは進んだチューニングを許可します。大きなクラスタ上で同時高スループットを実行する場合はデフォルトの値で十分です。

  • taskmanager.net.num-arenas: Netty アリーナの数 (デフォルト: taskmanager.numberOfTaskSlots)。

  • taskmanager.net.server.numThreads: Netty サーバのスレッドの数 (デフォルト: taskmanager.numberOfTaskSlots)。

  • taskmanager.net.client.numThreads: Netty クライアントスレッドの数 (デフォルト: taskmanager.numberOfTaskSlots)。

  • taskmanager.net.server.backlog: netty サーバの接続のバックログ。

  • taskmanager.net.client.connectTimeoutSec: Netty クライアント接続のタイムアウト (デフォルト: 120 seconds)。

  • taskmanager.net.sendReceiveBufferSize: Netty の送信および受信バッファサイズ。これのデフォルトはシステムのバッファサイズ (cat /proc/sys/net/ipv4/tcp_[rw]mem) で、最新のLinuxでは 4 MiBです。

  • taskmanager.net.transport: Netty 転送の型。“nio” あるいは “epoll” (デフォルト: nio)。

Web Frontend

  • web.port: Port of the web interface that displays status of running jobs and execution time breakdowns of finished jobs (DEFAULT: 8081). この値を-1に設定するとwebフロントエンドを無効にします。

  • web.history: The number of latest jobs that the web front-end in its history (DEFAULT: 5).

  • web.checkpoints.disable: Disables checkpoint statistics (DEFAULT: false).

  • web.checkpoints.history: Number of checkpoint statistics to remember (DEFAULT: 10).

  • web.backpressure.cleanup-interval: Time after which cached stats are cleaned up if not accessed (DEFAULT: 600000, 10 mins).

  • web.backpressure.refresh-interval: Time after which available stats are deprecated and need to be refreshed (DEFAULT: 60000, 1 min).

  • web.backpressure.num-samples: Number of stack trace samples to take to determine back pressure (DEFAULT: 100).

  • web.backpressure.delay-between-samples: Delay between stack trace samples to determine back pressure (DEFAULT: 50, 50 ms).

  • web.ssl.enabled: Enable https access to the web frontend. グローバルなsslフラグ security.ssl.enabled がtrue (デフォルト: true) に設定されている時のみ適用可能です。

  • web.access-control-allow-origin: Enable custom access control parameter for allow origin header, default is *.

  • web.timeout: Timeout for asynchronous operation executed by the web frontend in milliseconds (DEFAULT: 10000, 10 s)

ファイルシステム

パラメータは結果ファイルを生成するタスクの挙動を定義します。

  • fs.default-scheme: 接続するために必要な権限を持つ、使用されるデフォルトのファイルシステムスキーマ。例えば、HDFSの場合はネームノードの host:ポート (必要であれば)。デフォルトでは、これはローカルファイルシステムを示すfile:///に設定されます。このことは、明示的なスキーマの定義 無しに、ユーザ定義ファイルを検索するtあめにローカルファイルシステムが使われることを意味します。他のスキーマが(明示的に)ユーザが渡すURI内で指定されない場合にのみこのスキーマが使われます。

  • fs.overwrite-files: ファイルの出力ライターがデフォルトで既存のファイルを上書きするかどうかを指定します。デフォルトで上書きするためにはtrueに設定し、そうでなければfalse です。(デフォルト: false)

  • fs.output.always-create-directory: 並行度1以上で実行中のFile writerが出力ファイルパスのためのディレクトリを作成し、異なる結果ファイル(並行writerタスクごとに1つ)をディレクトリ内に置きます。このオプションが trueに設定された場合、並行度1のライターはディレクトリも生成し、その中に1つの結果ファイルを置くでしょう。オプションがfalseに設定された場合、ライターはディレクトリを生成せずに出力パスに直接ファイルを生成します。(デフォルト: false)

コンパイラ/オプティマイザ

  • compiler.delimited-informat.max-line-samples: 区切られた入力のためにコンパイラによって取られる行標本の最大数。標本はレコードの数を推測するために使われます。この値は入力フォーマットのパラメータを持つ特定の入力のために上書きすることができます (デフォルト: 10)。

  • compiler.delimited-informat.min-line-samples: 区切られた入力のためにコンパイラによって取られる行の標本の最小数。標本はレコードの数を推測するために使われます。この値は入力フォーマットのパラメータを持つ特定の入力のために上書きすることができます (デフォルト: 2)。

  • compiler.delimited-informat.max-sample-len: 区切られた入力のためにコンパイラによって取られる行標本の最大長。1つの標本の長さがこの値を超えた場合(パーサーの設定間違いによって起こり得ます)、標本化は異常終了します。この値は入力フォーマットのパラメータを持つ特定の入力のために上書きすることができます (デフォルト: 2097152 ( = 2Mバイト))。

ランタイム アルゴリズム

  • taskmanager.runtime.hashjoin-bloom-filters: ハイブリッド ハッシュ ジョイン実装の中でbloomフィルタを有効/無効にするフラグ。ハッシュ ジョインがディスクにこぼれる場合(データセットが確保されたメモリのフラクションより大きい)、これらのbloomフィルタはいくらかのCPUサイクルを犠牲にしてこぼれるレコードの数を大きく減らします。(デフォルト: false)

  • taskmanager.runtime.max-fan: 外部マージ ジョインのための最大論理入力と、こぼれているハッシュテーブルのための論理出力。オペレータあたりのファイルハンドルの数を制限しますが、あまりに小さく設定された場合中間のマージ/パーティションを起こすかも知れません (デフォルト: 128)。

  • taskmanager.runtime.sort-spilling-threshold: このメモリバジェットの割合が一杯になった時に、ソートオペレーションはこぼし始めます (デフォルト: 0.8)。

リソース マネージャ

この章での設定キーは使用されているリソース管理フレームワーク (YARN, Mesos, Standalone, …) に依存しません。

  • resourcemanager.rpc.port: リソースマネージャーとの通信のための接続するためのネットワークポートを定義する設定パラメータ。同じActorSystemが使われるため、デフォルトではジョブマネージャーのポート。ポート範囲を定義するためにこの設定キーを使うことはできません。

YARN

  • containerized.heap-cutoff-ratio: (デフォルト 0.25) YARNによって開始されたコンテナから削除するヒープ空間のパーセンテージ。ユーザが各タスクマネージャーコンテナのためにある量のメモリ(例えば4GB)を要求した場合、JVMはヒープ外にもメモリを割り当てるため、この総量をJVMのための最大ヒープ空間として渡す (-Xmx 引数) ことはできません。YARN はリクエストしたよりも多くのメモリを使っているコンテナをkillすることにとても厳密です。Therefore, we remove this fraction of the memory from the requested heap as a safety margin and add it to the memory used off-heap.

  • containerized.heap-cutoff-min: (デフォルト 600 MB) リクエストされたヒープサイズから削除するメモリの最小量。

  • yarn.maximum-failed-containers (デフォルト: リクエストされたコンテナの数)。障害時にsystemが再割り当てをしようとするコンテナの最大数。

  • yarn.application-attempts (デフォルト: 1). 再起動するアプリケーションマスターの数。Flinkクラスタ全体が再起動し、YARNクライアントは接続を失うだろうことに注意してください。また、ジョブマネージャーのアドレスは変わり、JMの host:port を手動で設定する必要があるでしょう。このオプションは1に設定したままにすることをお勧めします。

  • yarn.heartbeat-delay (デフォルト: 5 秒)。リソースマネージャーとのハートビート間の時間。

  • yarn.properties-file.location (デフォルト: 一時ディレクトリ)。FlinkのジョブがYARNにサブミットされた場合、Flinkクライアントが手に入れることができるように、ジョブマネージャーのホストと利用可能な処理スロットの数がプロパティファイルに書き込まれます。この設定パラメータを使ってそのファイルのデフォルトの場所を変更することができます (for example for environments sharing a Flink installation between users)

  • yarn.containers.vcores YARNコンテナあたりの仮想コア(vcores) の数。デフォルトでは、vcoresの数は設定されていればタスクマネージャーあたりのスロットの数に設定されます。そうでなければ1に設置衛されます。

  • containerized.master.env.ENV_VAR1=value containerized.master.env.が頭につく設定値は、環境変数としてアプリケーションマスター/ジョブマネージャー プロセスに渡されるでしょう。例えば、LD_LIBRARY_PATHをアプリケーションマスタに環境変数として渡すには、以下を設定します:

    containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native"

  • containerized.taskmanager.env. 上の設定接頭語と似て、この接頭語はタスクマネージャープロセスのための独自の環境変数を設定することができます。

  • yarn.container-start-command-template: YARN上で開始する時にFlinkは以下のテンプレートを使用します: %java% %jvmmem% %jvmopts% %logging% %class% %args% %redirects%。この設定パラメータによりユーザは独自の設定 (JVMパスや引数などのような)を渡すことができます。ほとんどの場合において、env.java.opts の設定の使用で十分なことに注意してください。これは文字列内の %jvmopts% 変数です。

  • yarn.application-master.port (デフォルト: 0、OSが短命ポートとして選択します) この設定オプションを使って、ユーザはアプリケーションマスタ(とジョブマネージャー) RPCポートのためのポート、ポートの範囲、あるいはポートのリストを指定することができます。デフォルトでは、オペレーティングシステムが適切なポートを選択できるようにデフォルト値 (0)を使うことをお勧めします。特に、複数のAMが同じ物理ホスト上で動作している場合は、固定ポートの割り当てはAMの開始を妨げるでしょう。

    例えば、制限されたファイアウォールを持つ環境上でYARN上でFlinkを実行する場合、このオプションを使って許可されたポートの範囲を指定することができます。

  • yarn.tags Flink YARNアプリケーションへ適用するカンマ区切りのタグのリスト。

  • yarn.per-job-cluster.include-user-jar (Default: ORDER) Control whether and how the user-jar is included in the system class path for per-job clusters. Setting this parameter to DISABLED causes the jar to be included in the user class path instead. Setting this parameter to one of FIRST, LAST or ORDER causes the jar to be included in the system class path, with the jar either being placed at the beginning of the class path (FIRST), at the end (LAST), or based on the lexicographic order (ORDER).

Mesos

  • mesos.initial-tasks: マスタが開始した時に起動する初期ワーカー (デフォルト: クラスタの起動時に指定されたワーカーの数)。

  • mesos.constraints.hard.hostattribute: Constraints for task placement on mesos (DEFAULT: None).

  • mesos.maximum-failed-tasks: クラスタが故障するまでの失敗したワーカーの最大数 (デフォルト: 初期ワーカーの数)。この機能を無効にするために -1 を設定することができます。

  • mesos.master: Mesos マスターのURL。値は以下の形式のうちの1つでなければなりません:
    • host:port
    • zk://host1:port1,host2:port2,.../path
    • zk://username:password@host1:port1,host2:port2,.../path
    • file:///path/to/file
  • mesos.failover-timeout: Mesosスケジューラのためのフェイルオーバー タイムアウトの秒数。実行中のタスクはこの後で自動的にシャットダウンされます (デフォルト: 600)。

  • mesos.resourcemanager.artifactserver.port: 使用するMesos アーティファクト サーバのポートを定義する設定パラメータ。ポートを0に設定するとOSが利用可能なポートを選択するでしょう。

  • mesos.resourcemanager.framework.name: Mesos のフレームワーク名 (デフォルト: Flink)

  • mesos.resourcemanager.framework.role: Mesos フレームワークのロール定義 (デフォルト: *)

  • mesos.resourcemanager.framework.principal: Mesos フレームワーク プリンシパル (デフォルト無し)

  • mesos.resourcemanager.framework.secret: Mesos フレームワークの秘密鍵 (デフォルト無し)

  • mesos.resourcemanager.framework.user: Mesos フレームワークのユーザ (デフォルト:””)

  • mesos.resourcemanager.artifactserver.ssl.enabled: Flinkアーティファクト サーバのためのSSLを有効にします (デフォルト: true)。暗号化を有効にするためにsecurity.ssl.enabledtrue に設定する必要があることに注意してください。

  • mesos.resourcemanager.tasks.mem: Mesosワーカーに割り当てるメモリのMB数 (デフォルト: 1024)

  • mesos.resourcemanager.tasks.cpus: Mesosワーカーに割り当てるCPU (デフォルト: 0.0)

  • mesos.resourcemanager.tasks.container.type: 使用するコンテナ化の型: “mesos” あるいは “docker” (デフォルト: mesos);

  • mesos.resourcemanager.tasks.container.image.name: コンテナに使用するイメージ名 (デフォルト無し)

  • mesos.resourcemanager.tasks.container.volumes: A comma separated list of [host_path:]container_path[:RO|RW]. This allows for mounting additional volumes into your container. (NO DEFAULT)

  • high-availability.zookeeper.path.mesos-workers: Mesosワーカーの情報を持続するためのZooKeeperのルートパス。

高可用性 (HA)

  • high-availability: クラスタの実行のために使われる高可用性モードを定義します。現在のところ、Flinkは以下のモードをサポートします:
    • none (デフォルト): 高可用性無し。1つのジョブマネージャーを実行し、ジョブマネージャーの状態はチェックポイントされません。
    • zookeeper: 複数のジョブマネージャーの実行とジョブマネージャーの状態をチェックポイントをサポートします。ジョブマネージャーのグループの中で、ZooKeeperは1つのジョブマネージャーをクラスタの実行に責任ンがあるリーダーとして選出します。ジョブマネージャーが失敗した場合は、スタンドバイ ジョブマネージャーが新しいリーダーとして選出され、最後のチェックポイントのジョブマネージャーの状態が渡されます。'zookeeper' モードを使うには、high-availability.zookeeper.quorum 設定値も定義する必要があります。
  • high-availability.cluster-id: (Default /default_ns in standalone cluster mode, or the under YARN) Defines the subdirectory under the root dir where the ZooKeeper HA mode will create znodes. これにより同じZooKeeper上で複数のアプリケーションを隔離することができます。Previously this key was named `recovery.zookeeper.path.namespace` and `high-availability.zookeeper.path.namespace`.

以前はこのキーはrecovery.modeという名前で、デフォルトの値は standaloneでした。

ZooKeeperベースのHAモード

  • high-availability.zookeeper.quorum: 'zookeeper'のHAモードが選択された場合にZooKeeperクラスタに接続するために使われるZooKeeperの定員URLを定義します。以前はこのキーはrecovery.zookeeper.quorumという名前でした。

  • high-availability.zookeeper.path.root: (デフォルト v/flink) ZooKeeper HAモードが名前空間ディレクトリを作成する予定のルートディレクトリを定義します。以前はこのキーはrecovery.zookeeper.path.rootという名前でした。

  • high-availability.zookeeper.path.latch: (デフォルト /leaderlatch) リーダーを選出するために使われるリーダーラッチのznodeを定義します。以前はこのキーはrecovery.zookeeper.path.latchという名前でした。

  • high-availability.zookeeper.path.leader: (デフォルト /leader) リーダーへのURLと現在のリーダーのセッションIDを含むリーダーのznodeを定義します。以前はこのキーはrecovery.zookeeper.path.leaderという名前でした。

  • high-availability.storageDir: Defines the directory in the state backend where the JobManager metadata will be stored (ZooKeeper only keeps pointers to it). HAのための要件Previously this key was named recovery.zookeeper.storageDir and high-availability.zookeeper.storageDir.

  • high-availability.zookeeper.client.session-timeout: (デフォルト 60000) ZooKeeperのセッションについてのセッションのタイムアウトをmsで定義します。以前はこのキーはrecovery.zookeeper.client.session-timeoutという名前でした。

  • high-availability.zookeeper.client.connection-timeout: (デフォルト 15000) ZooKeeperについての接続タイムアウトをmsで定義します。以前はこのキーはrecovery.zookeeper.client.connection-timeoutという名前でした。

  • high-availability.zookeeper.client.retry-wait: (デフォルト 5000) 連続する試行間の休止をmsで定義します。以前はこのキーはrecovery.zookeeper.client.retry-waitという名前でした。

  • high-availability.zookeeper.client.max-retry-attempts: (デフォルト 3) クライアントが諦めるまでに試す接続の数を定義します。以前はこのキーはrecovery.zookeeper.client.max-retry-attemptsという名前でした。

  • high-availability.job.delay: (デフォルト akka.ask.timeout) マスターリカバリの状況の場合に、永続ジョブが回復されるまでの遅延を定義します。以前はこのキーはrecovery.job.delayという名前でした。

  • high-availability.zookeeper.client.acl: (Default open) Defines the ACL (open|creator) to be configured on ZK node. ZooKeeperサーバ設定が SASLAuthenticationProviderを使うようにマップされた “authProvider” プロパティを持ち、クラスタがセキュアモード (Kerberos)で動作するように設定されている場合、設定値を“creator”に設定することができます。ACL オプションは https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes に基づいています

ZooKeeper セキュリティ

  • zookeeper.sasl.disable: (デフォルト: true) SASLベースの認証が有効あるいは無効にされる必要があるかどうかを定義します。ZooKeeperクラスタがセキュアモード (Kerberos)で動いている場合に、設定値を “true” に設定することができます。

  • zookeeper.sasl.service-name: (デフォルト: zookeeper) ZooKeeperサーバが異なるサービス名 (デフォルト:”zookeeper”) を使って設定されている場合、この設定を使って提供することができます。クライアントとサーバ設定の間のサービス名の不一致は認証を失敗させるでしょう。

Kerberosベースのセキュリティ

  • security.kerberos.login.use-ticket-cache: Kerberosチケットキャッシュから読み込むかどうかを指示する (デフォルト: true)。

  • security.kerberos.login.keytab: ユーザ証明書を含むKerberosキータブファイルへの絶対パス。

  • security.kerberos.login.principal: キータブに関連するKerberos プリンシパル名。

  • security.kerberos.login.contexts: Kerberos証明書を提供するためのloginコンテキストのカンマ区切りのリスト (例えば、ZooKeeper認証およびKafka認証のための証明書を使うためのClient,KafkaClient)。

環境

  • env.log.dir: (デフォルトは Flinkのホームの下のlogディレクトリ) Flinkのログが保存されるディレクトリを定義します。絶対パスでなければなりません。

  • env.log.max: (Default: 5) The maximum number of old log files to keep.

  • env.ssh.opts: JobManager, TaskManager および Zookeeper サービスを開始あるいは停止する時にSSHクライアントへ渡される追加のコマンドラインオプション (start-cluster.sh, stop-cluster.sh, start-zookeeper-quorum.sh, stop-zookeeper-quorum.sh)。

クエリ可能な状態

サーバ

  • query.server.enable: クエリ可能な状態を有効にする (デフォルト: true)。

  • query.server.port: クエリ可能な状態サーバを紐付けるためのポート (デフォルト: 0、ランダムなポートへ紐付けます)。

  • query.server.network-threads: クエリ可能な状態サーバのためのネットワーク (Nettyのイベントループ) スレッドの数 (デフォルト: 0、スロットの数を取り上げます)。

  • query.server.query-threads: クエリ可能な状態サーバのためのクエリスレッドの数 (デフォルト: 0、スロットの数を取り上げます)。

クライアント

  • query.client.network-threads: クエリ可能な状態クライアントのためのネットワーク (Nettyのイベントループ)の数 (デフォルト: 0Runtime.getRuntime().availableProcessors()によって帰る利用可能なコアの数を取り上げます)。

  • query.client.lookup.num-retries: 利用不可能なジョブマネージャーによりKvStateのルックアップが失敗する時の再試行の数 (デフォルト: 3)。

  • query.client.lookup.retry-delay: 利用不可能なジョブマネージャーによりKvStateのルックアップが失敗する時の再試行の遅延のミリ秒 (デフォルト: 1000)。

マトリックス

  • metrics.reporters: 名前付きのレポーターのリスト。つまり "foo,bar"。

  • metrics.reporter.<name>.<config>: <name>という名前のレポーターのための一般的な設定<config>

  • metrics.reporter.<name>.class: <name>という名前のレポーターが使うレポータークラス。

  • metrics.reporter.<name>.interval: <name>という名前のレポーターが使うレポーター間隔。

  • metrics.scope.jm: (デフォルト: <host>.jobmanager) ジョブマネージャーに作用する全てのメトリクスに適用される作用フォーマット文字列を定義します。

  • metrics.scope.jm.job: (デフォルト: <host>.jobmanager.<job_name>) ジョブマネージャー上のジョブに作用する全てのメトリクスに適用される作用フォーマット文字列を定義します。

  • metrics.scope.tm: (デフォルト: <host>.taskmanager.<tm_id>) タスクマネージャーに作用する全てのメトリクスに適用される作用フォーマット文字列を定義します。

  • metrics.scope.tm.job: (デフォルト: <host>.taskmanager.<tm_id>.<job_name>) タスクマネージャー上のジョブに作用する全てのメトリクスに適用される作用フォーマット文字列を定義します。

  • metrics.scope.task: (Default: <host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>) Defines the scope format string that is applied to all metrics scoped to a task.

  • metrics.scope.operator: (Default: <host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>) Defines the scope format string that is applied to all metrics scoped to an operator.

  • metrics.latency.history-size: (デフォルト: 128) 各オペレータで維持するための測定されたレイテンシの数を定義します

ヒストリーサーバ

You have to configure jobmanager.archive.fs.dir in order to archive terminated jobs and add it to the list of monitored directories via historyserver.archive.fs.dir if you want to display them via the HistoryServer’s web frontend.

  • jobmanager.archive.fs.dir: Directory to upload information about terminated jobs to. You have to add this directory to the list of monitored directories of the history server via historyserver.archive.fs.dir.

  • historyserver.archive.fs.dir: Comma separated list of directories to fetch archived jobs from. The history server will monitor these directories for archived jobs. You can configure the JobManager to archive jobs to a directory via jobmanager.archive.fs.dir.

  • historyserver.archive.fs.refresh-interval: Interval in milliseconds for refreshing the archived job directories (DEFAULT: 10000).

  • historyserver.web.tmpdir: This configuration parameter allows defining the Flink web directory to be used by the history server web interface. The web interface will copy its static files into the directory (DEFAULT: local system temporary directory).

  • historyserver.web.address: Address of the HistoryServer’s web interface (DEFAULT: anyLocalAddress()).

  • historyserver.web.port: Port of the HistoryServers’s web interface (DEFAULT: 8082).

  • historyserver.web.ssl.enabled: Enable HTTPs access to the HistoryServer web frontend. This is applicable only when the global SSL flag security.ssl.enabled is set to true (DEFAULT: false).

背景

ネットワーク バッファの設定

If you ever see the Exception java.io.IOException: Insufficient number of network buffers, you need to adapt the amount of memory used for network buffers in order for your program to run on your task managers.

ネットワークバッファは通信層のための重要なリソースです。それらはネットワーク上で伝送をする前にレコードをバッファするために使われ、入ってくるデータをレコードに分離してアプリケーションにそれらを渡す前にバッファするために使われます。良いスループットを達成するには、十分な数のネットワークバッファが重要です。

Since Flink 1.3, you may follow the idiom "more is better" without any penalty on the latency (we prevent excessive buffering in each outgoing and incoming channel, i.e. *buffer bloat*, by limiting the actual number of buffers used by each channel).

一般的に、同時に開いておきたい各論理ネットワーク接続が専用のバッファを持つのに十分なバッファを持つようにタスクマネージャーを設定します。A logical network connection exists for each point-to-point exchange of data over the network, which typically happens at repartitioning or broadcasting steps (shuffle phase). それらの中で、タスクマネージャー内の各並行タスクは全ての他の平行タスクとやり取りをすることができなければなりません。

Note: Since Flink 1.5, network buffers will always be allocated off-heap, i.e. outside of the JVM heap, irrespective of the value of taskmanager.memory.off-heap. This way, we can pass these buffers directly to the underlying network stack layers.

Setting Memory Fractions

Previously, the number of network buffers was set manually which became a quite error-prone task (see below). Since Flink 1.3, it is possible to define a fraction of memory that is being used for network buffers with the following configuration parameters:

  • taskmanager.network.memory.fraction: Fraction of JVM memory to use for network buffers (DEFAULT: 0.1),
  • taskmanager.network.memory.min: Minimum memory size for network buffers in bytes (DEFAULT: 64 MB),
  • taskmanager.network.memory.max: Maximum memory size for network buffers in bytes (DEFAULT: 1 GB), and
  • taskmanager.memory.segment-size: Size of memory buffers used by the memory manager and the network stack in bytes (DEFAULT: 32768 (= 32 KiBytes)).

Setting the Number of Network Buffers directly

Note: This way of configuring the amount of memory used for network buffers is deprecated. Please consider using the method above by defining a fraction of memory to use.

The required number of buffers on a task manager is total-degree-of-parallelism (number of targets) * intra-node-parallelism (number of sources in one task manager) * n with n being a constant that defines how many repartitioning-/broadcasting steps you expect to be active at the same time. Since the intra-node-parallelism is typically the number of cores, and more than 4 repartitioning or broadcasting channels are rarely active in parallel, it frequently boils down to

#slots-per-TM^2 * #TMs * 4

#slots per TMタスクマネージャーあたりのスロットの数で、#TMs はタスクマネージャーの総数です。

To support, for example, a cluster of 20 8-slot machines, you should use roughly 5000 network buffers for optimal throughput.

各ネットワークバッファはデフォルトで30Kバイトのサイズです。In the example above, the system would thus allocate roughly 300 MiBytes for network buffers.

ネットワークバッファの数とサイズは以下のパラメータを使って設定することができます:

  • taskmanager.network.numberOfBuffers
  • taskmanager.memory.segment-size

一時的な I/O ディレクトリの設定

Flink はできる限りメインメモリ内で処理することを目指していますが、利用可能なメモリよりもっと多くのデータが処理される必要があることは珍しいことではありません。Flinkのランタイムはこれらの状況を扱うために一時的なデータをディスクに書き込むように設計されています。

taskmanager.tmp.dirs パラメータはFlinkが一時ファイルを書き込むディレクトリのリストを指定します。ディレクトリのパスは ':' (コロン文字)によって分割されている必要があります。Flinkは1つの一時ファイルを各設定されたディレクトリへ(あるいは、から)同時に書き込む(読み込む)でしょう。このように、パフォーマンスを改善するためにハードディスクのような複数の非依存I/Oデバイスへ一時的なI/Oを結果的に分散することができます。高速なI/Oデバイス(例えば、SSD, RAID, NAS)を利用するために、1つのディレクトリを複数回指定することができます。

taskmanager.tmp.dirsパラメータが明示的に指定されない場合は、FlinkはLinuxシステムでの/tmp のようなオペレーティングシステムの一時ディレクトリへ一時データを書き込みます。

タスクマネージャーの処理スロットの設定

Flinkはプログラムをサブタスクに分割し、それらのサブタスクを処理スロットにスケジュールすることで、並行して実行します。

各Flinkタスクマネージャーはクラスタ内の処理スロットを提供します。スロットの数は一般的にタスクマネージャーの利用可能なCPUコアの数に比例します。一般的な推奨として、taskmanager.numberOfTaskSlotsについての良いデフォルトは利用可能なCPUコアの数です。

Flinkアプリケーションを開始する場合、ユーザはジョブに使うスロットのデフォルトの数を提供することができます。したがってコマンドラインの値は-p (for parallelism) と呼ばれます。さらに、アプリケーション全体と個々のオペレータのためにプログラミングAPIの中でスロットの数を設定することができます。

上に戻る

TOP
inserted by FC2 system