設定

シングルモードセットアップのFlinkについては、すぐに使える状態で、開始するためにデフォルトの設定を変更する必要はありません。

すぐに使える設定はデフォルトのJavaインストレーションを使うでしょう。使用するJavaランタイムを手動で上書きしたい場合は、環境変数JAVA_HOMEあるいはconf/flink-conf.yaml内で設定キーenv.java.homeを手動で設定することができます。

このページでは効率的(分散)インストレーションをセットアップするために代表的に必要な最も一般的なオプションをリスト化します。さらに全ての利用可能な設定パラメータの完全なリストをここでリスト化します。

全ての設定はconf/flink-conf.yaml内で行われます。これはkey: valueの形式を持つYAML キー 値のペア の平坦な集合が期待されます。

システムと実行スクリプトは開始時に設定をパースします。設定ファイルへの変更はFlinkのジョブマネージャーとタスクマネージャーの再起動を必要とします。

タスクマネージャーのための設定ファイルは異なるかも知れません。Flinkはクラスタ内の均一なマシーンを仮定しません。

一般的なオプション

  • env.java.home: 使用するJavaインストレーションへのパス (デフォルト: もしあれば、システムのデフォルトのJavaインストレーション)。もしスタートアップスクリプトが自動的にJavaホームディレクトリを解決するのに失敗した場合は指定する必要があります。特定のjavaインストレーションあるいはバージョンを指定することができます。このオプションが指定されなかった場合は、スタートアップスクリプトは$JAVA_HOME環境変数も評価します。

  • env.java.opts: 独自のJVMオプションを設定します。この値はFlinkのスタートスクリプト、ジョブマネージャーとタスクマネージャーの両方、およびFlinkのYARNクライアントによって尊重されます。これは異なるガベージコレクションの設定、あるいはリモートのデバッガーをFlinkのサービスを実行しているJVMに含めるために使うことができます。ジョブマネージャーあるいはタスクマネージャー固有のそれぞれのオプションのために、env.java.opts.jobmanagerenv.java.opts.taskmanagerを使います。

  • env.java.opts.jobmanager: ジョブマネージャー固有のJVMオプション。これらは通常のenv.java.optsに加えて使用されます。この設定オプションはYARNクライアントによって無視されます。

  • env.java.opts.taskmanager: タスクマネージャー固有のJVMオプション。これらは通常のenv.java.optsに加えて使用されます。この設定オプションはYARNクライアントによって無視されます。

  • jobmanager.rpc.address: ジョブマネージャーの外部アドレス。これは分散システムのマスター/コーディネータです (デフォルト: localhost)。注意: アドレス (ホスト名あるいはIP)はクライアントを含むすべてのノードからアクセス可能でなければなりません。

  • jobmanager.rpc.port: ジョブマネージャーのポート番号 (デフォルト: 6123)。

  • jobmanager.heap.mb: ジョブマネージャーのためのJVM ヒープサイズ (メガバイト単位)。(多くのオペレーターを持つ)とても大きなアプリケーションを実行しているか、それらの長い履歴を保持する場合は、ジョブマネージャーのヒープサイズを増加させるべきかも知れません。

  • taskmanager.heap.mb: タスクマネージャーのJVM ヒープサイズ (メガバイト単位)。これはsystemの平行ワーカーです。Hadoopとは対照的に、Flinkはオペレータ(例えばjoin, aggregat)とユーザ定義の関数(例えば Map, Reduce, CoGroup) をタスクマネージャー ( sorting/hashing/caching を含む)の中で実行します。つまりこの値はできるだけ大きくしなければなりません。もしクラスタが排他的にFlinkを実行する場合は、マシーンごとの利用可能な総メモリからオペレーティングシステムのメモリ(おそらく 1-2 GB) を引いたものが良い値です。YARN セットアップ上では、この値はタスクマネージャーのYARNコンテナのサイズに自動的に設定され、ある程度寛容できる値が引かれます。

  • taskmanager.numberOfTaskSlots: 1つのタスクマネージャーが実行できる、並行オペレータあるいはユーザ関数のインスタンスの数(デフォルト: 1)。値が1より大きい場合は、1つのシングルマネージャーは関数あるいはオペレータの複数のインスタンスを取ることができます。そのようにして、タスクマネージャーは複数のCPUコアを使うことができますが、同時に、利用可能なメモリは異なるオペレータあるいは関数のインスタンスの間で分配されます。この値は一般的にタスクマネージャーのマシーンが持つ物理CPUコアの数に比例します(例えば、コアの数に一致、あるいはコアの数の半分)。タスクスロットについてのより多くのこと

  • parallelism.default: 並行度が指定されていないプログラムのために使うデフォルトの並行度。(デフォルト: 1). 同時実行のジョブが無いセットアップについては、この値を NumTaskManagers * NumSlotsPerTaskManager にすることでシステムはプログラムの実行のために全ての利用可能な実行リソースを使うでしょう。注意: デフォルトの並行度はExecutionEnvironment上でsetParallelism(int parallelism)を呼び出すか、 -p <parallelism> をFlinkコマンドライン フロントエンに渡すことで上書きすることができます。一つの返還のためにオペレータ上でsetParallelism(int parallelism) を呼び出すことで上書きすることができます。並行度の詳細については 並行実行を見てください。

  • fs.default-scheme: 接続するために必要な権限を持つ、使用されるデフォルトのファイルシステムスキーマ。例えば、HDFSの場合はネームノードの host:ポート (必要であれば)。デフォルトでは、これはローカルファイルシステムを示すfile:///に設定されます。このことは、明示的なスキーマの定義 無しに、ユーザ定義ファイルを検索するtあめにローカルファイルシステムが使われることを意味します。他の例として、これがhdfs://localhost:9000/に設定された場合、/user/USERNAME/in.txtのような明示的なスキーマ定義を持たないユーザ定義のパスはhdfs://localhost:9000/user/USERNAME/in.txtに変換されるでしょう。他のスキーマがユーザ指定のURIの中で(明示的に)指定された場合のみ、このスキーマが使われます。

  • fs.hdfs.hadoopconf: Hadoopファイルシステム(HDFS)の設定ディレクトリの絶対パス (任意の値)。この値を設定することで、プログラムはファイルのURI内にネームノードのアドレスをポートを含まない短いURL (hdfs:///path/to/filesを使ってHDFSファイルを参照することができます。このオプションが無い場合、HDFSはアクセスすることができますが、hdfs://address:port/path/to/filesのような完全装飾URIを必要とします。このオプションはファイルライターにブロックサイズとりプリケーションファクターのためのHDFSのデフォルト値を取り出すようにもします。Flinkは指定されたディレクトリの"core-site.xml"と"hdfs-site.xml"ファイルを調べないでしょう。

より進んだオプション

管理されるメモリ

デフォルトで、Flinkは管理しているメモリについてtaskmanager.heap.mbを使って設定された総メモリの 0.7 の割合を割り当てます。管理されたメモリはFlinkがバッチオペレーションを効率よく実施するのを助けます。Flinkはオペレーションを実行するために使うことができるメモリの量を知っているため、それはOutOfMemoryExceptionを避けます。Flinkが管理外のメモリを実行する場合は、ディスク空間を利用します。管理されているメモリを利用することで、いくつかのオペレーションはデータをJavaオブジェクトに変換する必要が無く生データ上で直接実施することができます。全体として管理されたメモリはシステムの堅牢さとスピードを改善します。

管理メモリのためのデフォルトの割合はtaskmanager.memory.fraction パラメータを使って調整可能です。絶対値はtaskmanager.memory.sizeを使って設定されるかもしれません (割合パラメータを上書きます)。必要であれば、管理メモリはJVMヒープ外に割り当てられるかもしれません。これは大きなメモリサイズを持つセットアップ内でのパフォーマンスを改善するかも知れません。

  • taskmanager.memory.size: タスクマネージャーがソート、ハッシュテーブル、および中間結果をキャッシュするためにヒープ上あるいはヒープ外(taskmanager.memory.off-heapに依存します)に確保する総メモリ量(メガバイト単位)。指定しない(-1)場合、メモリマネージャーは taskmanager.memory.fractionによって指定されたタスクマネージャーJVMのサイズに関する固定した割合を取るでしょう。(デフォルト: -1)

  • taskmanager.memory.fraction: タスクマネージャーがソート、ハッシュテーブルおよび中間結果をキャッシュするために確保する、(taskmanager.heap.mbに関する)相対的なメモリ量。例えば、0.8の値はタスクマネージャーが内部的なデータバッファのためにメモリの80% (ヒープ上あるいはヒープ外はtaskmanager.memory.off-heapに依存します)を確保し、フリーメモリの残り20%をユーザ定義関数によって生成されるオブジェクトのためのタスクマネージャーのヒープのために確保することを意味します。(デフォルト: 0.7) taskmanager.memory.size が設定されない場合は、このパラメータは評価のみされます。

  • taskmanager.memory.off-heap: trueに設定した場合、タスクマネージャーはソート、ハッシュテーブル および 中間結果のキャッシュのために使われるメモリをJVMヒープ外に割り当てます。大量のメモリを持つセットアップについては、これはメモリ上で実行さえるオペレーションの効率を改善することができます (デフォルト: false)。

  • taskmanager.memory.segment-size: メモリマネージャーとネットワークスタックによって使われるメモリバッファのバイトサイズ (デフォルト: 32768 (= 32 KiBytes))。

  • taskmanager.memory.preallocate: true あるいは falseのどちらかを設定することができます。タスクマネージャーが全ての管理メモリを開始時に割り当てるかどうかを指定します。(デフォルト: false). taskmanager.memory.off-heaptrueに設定された場合、この設定も trueに設定されることをお勧めします。もしこの設定が false に設定された場合、起動されたfull GCによって設定済みのJVMパラメータ MaxDirectMemorySize に近づいた時のみ、割り当てられたoffheapメモリの掃除が起こります。注意: ストリーミングセットアップについては、コアの状態バックエンドが現在のところ管理されたメモリを使わないため、この値をfalse に設定することをとてもお勧めします。

メモリとパフォーマンスのデバッグ

これらのオプションは、out-of-memory-process kill あるいは例外のような、問題に関係するメモリ及びガベージコレクションについてFlinkアプリケーションをデバッグするのに便利です。

  • taskmanager.debug.memory.startLogThread: タスクマネージャーが定期的のメモリとガベージコレクション統計を記録するようにします。この統計には、現在のヒープ、オフヒープ、および他のメモリプール利用と、ヒープメモリプールによってガベージコレクションに費やされた時間が含まれます。

  • taskmanager.debug.memory.logIntervalMs: タスクマネージャーがメモリおよびガベージコレクション統計を記録する時間間隔(ミリ秒単位)。taskmanager.debug.memory.startLogThread がtrueに設定された場合にのみ効果があります。

Kerberosベースのセキュリティ

Flinkは以下のサービスのためにKerberos認証をサポートします:

  • HDFS、YARN あるいは HBaseのようなHadoopコンポーネント (バージョン 2.6.1 以上; それ以外の全てのバージョンにはFlinkジョブが不意に失敗するかもしれない致命的なバグがあります)
  • Kafka コネクタ (バージョン 0.9+ 以上)
  • Zookeeper

KerberosセキュリティのためのFlinkの設定は、以下のサブセクションで別途説明される3つの特徴を伴います。

1. Kerberos証明書を持つクラスタを提供 (つまり、キータブあるいはkinitを使ったチケット)

Kerberos証明書を持つクラスタを提供するために、FlinkはKerberosキータブファイルあるいは kinitで管理されるチケットキャッシュの使用をサポートします。

  • security.kerberos.login.use-ticket-cache: Kerberosチケットキャッシュから読み込むかどうかを指示する (デフォルト: true)。

  • security.kerberos.login.keytab: ユーザ証明書を含むKerberosキータブファイルへの絶対パス。

  • security.kerberos.login.principal: キータブに関連するKerberos プリンシパル名。

security.kerberos.login.keytabsecurity.kerberos.login.principal の両方が提供された値を持つ場合、キータブは認証のために使われるでしょう。チケットの期限切れ問題を避けるために、長期間のジョブについてはキータブを使うことが望ましいです。チケットキャッシュを使うことが好きな場合は、Hadoop移譲トークンの生存期間の増加について管理者に話してください。

チケットキャッシュを使った認証は、FlinkがスタンドアローンあるいはYARNに配備された時のみサポートされることに注意してください。

2. 必要に応じてコンポーネントとコネクタに利用可能なKerberos証明書を作成

Hadoopコンポーネントについては、Hadoopセキュリティが有効かどうか(core-site.xml)に依存してHDFS、HBase およびほかのHadoopコンポーネントに接続する時に設定されたKerberos証明書が使われるべきかどうかを自動的に検知するでしょう。

JAAS設定ファイルを使用するコネクタあるいはコンポーネントについては、以下の設定を使ってそれぞれについてJAAS login コンテキストを設定することで利用可能なKerberos証明書を作成します:

  • security.kerberos.login.contexts: Kerberos証明書を提供するためのloginコンテキストのカンマ区切りのリスト (例えば、ZooKeeper認証およびKafka認証のための証明書を使うためのClient,KafkaClient)。

これにより異なるコネクタあるいはコンポーネントのためのKerberos認証を依存せずに有効にすることができます。例えば、ZooKeeperのためのKerberosの使用を必要とせずにHadoopセキュリティを有効にすることができます。逆もまた同様です。

Java SE Documentationで説明される機構を使って静的なJAAS設定ファイルも提供するかもしれません。それらのエントリは上の設定オプションで生成されたものを上書きします。

3. Kerberos認証を使うためにコンポーネント および/または コネクタを設定

最終的に、Kerberos認証を使う必要性に応じて、Flinkプログラムあるいはコンポーネント内でコネクタを設定するようにしてください。

以下はKerberos認証のためのFlinkによって現在のところファーストクラスがサポートされるコネクタあるいはコンポーネントのリストです:

  • Kafka: KafkaコネクタをKerberos認証を使うように設定する時の詳細はここを見てください。

  • Zookeeper (for HA): ここで述べられるKerberosベースのセキュリティ設定と連携するためのZookeeperセキュリティ設定についての詳細はここを見てください。

Flinkセキュリティが内部的にどうやってKerberos認証をセットアップするかについての詳細は、ここを見てください。

その他

  • taskmanager.tmp.dirs: テンポラリ ファイルのためのディレクトリ、あるいはシステムのディレクトリのデリミタ(例えばLinux/Unixでの ‘:’ (コロン))によって区切られたディレクトリのリスト。複数のディレクトリが指定された場合は、一時ファイルはラウンドロビンの形式でディレクトリに渡って分散されるでしょう。I/O マネージャーコンポーネントは、ディレクトリごとに1つの読み込みと1つの書き込みスレッドをspawnするでしょう。A directory may be listed multiple times to have the I/O manager use multiple threads for it (for example if it is physically stored on a very fast disc or RAID) (DEFAULT: The system’s tmp dir).

  • taskmanager.log.path: タスクマネージャーのログファイルの場所を定義する設定パラメータ

  • jobmanager.web.address: JobManagerのwebインタフェース (デフォルト: anyLocalAddress())。

  • jobmanager.web.port: ジョブマネージャーのwebインタフェースのポート (デフォルト: 8081)。

  • jobmanager.web.tmpdir: この設定パラメータを使ってwebインタフェースによって使われるFlink webディレクトリを定義することができます。webインタフェースは静的ファイルをディレクトリにコピーするでしょう。また、上書きされない場合はアップロードされたジョブのjarはディレクトリ内に格納されます。デフォルトで、テンポラリディレクトリが使われます。

  • jobmanager.web.upload.dir: ジョブのjarをアップロードするためのディレクトリを定義する設定パラメータ。指定されない場合はjobmanager.web.tmpdirで指定されるディレクトリの下の動的なディレクトリが使われるでしょう。

  • fs.overwrite-files: ファイルの出力ライターがデフォルトで既存のファイルを上書きするかどうかを指定します。デフォルトで上書きするためにはtrueに設定し、そうでなければfalse です。(デフォルト: false)

  • fs.output.always-create-directory: 並行度1以上で実行中のFile writerが出力ファイルパスのためのディレクトリを作成し、異なる結果ファイル(並行writerタスクごとに1つ)をディレクトリ内に置きます。このオプションが trueに設定された場合、並行度1のライターはディレクトリも生成し、その中に1つの結果ファイルを置くでしょう。オプションがfalseに設定された場合、ライターはディレクトリを生成せずに出力パスに直接ファイルを生成します。(デフォルト: false)

  • taskmanager.network.numberOfBuffers: ネットワークスタックに利用可能なバッファの数。この数値はタスクマネージャーがどれだけ多くのストリーミングデータ交換チャネルを同時に持つことができ、チャネルがどれほど良くバッファされるかを決定します。ジョブが却下された場合、あるいはシステムが利用可能なバッファを十分に持っていないと警告された場合は、この値を増やしてください (デフォルト: 2048)。

  • state.backend: チェックポイントが有効な場合、オペレータの状態のチェックポイントが格納されるために使われるであろうバックエンド。サポートされるバックエンド:
    • jobmanager: インメモリの状態、ジョブマネージャー/ZooKeeperのメモリにバックアップします。最小の状態(Kafkaオフセット)あるいはテストおよびローカルデバッギングのためにのみ使われるべきです。
    • filesystem: 状態はタスクマネージャー上のメモリ内にあり、状態のスナップショットはファイルシステムに格納されます。例えば HDFS、S3、… のFlinkによってサポートされる全てのファイルシステムによってサポートされます。
  • state.backend.fs.checkpointdir: Flinkがサポートするファイルシステム内に格納されるチェックポイントのためのディレクトリ。注意: 状態のバックエンドはジョブマネージャーからアクセス可能でなければなりません。ローカルセットアップのためにはfile:// のみ使います。

  • state.backend.rocksdb.checkpointdir: RocksDBファイルを格納するためのローカルディレクトリ、あるいはシステムのディレクトリ デリミタ(例えば、Linux/Unix上での ':' (コロン)) によって区切られたディレクトリのリスト。(デフォルトの値は taskmanager.tmp.dirs)

  • state.checkpoints.dir: externalized checkpointsのメタデータのための対象ディレクトリ。

  • high-availability.zookeeper.storageDir: HAのために必要です。ジョブマネージャーのメタデータを格納するディレクトリ; これは状態バックエンドの中に保持され、この状態のポインタだけがZooKeeperに格納されます。チェックポイントのディレクトリと正しく同じで、ジョブマネージャーからアクセス可能でなければならず、ローカル配備のためだけに使われなければなりません。以前はこのキーはrecovery.zookeeper.storageDirという名前でした。

  • blob.storage.directory: TaskManager上で(ユーザのJARのような)blobを格納するためのディレクトリ。

  • blob.server.port: TaskManager上の(ユーザのJARを提供するような)blobサーバのポート定義。デフォルトでは、ポートは0に設定されます。これはオペレーティングシステムが短命ポートを選んだことを意味します。Flink はポートのリスト(“50100,50101”)、範囲(“50100-50200”) あるいはそれら両方の組み合わせも受け付けます。複数のジョブマネージャーが同じマシーン上で実行中の場合は、衝突を避けるためにポートの範囲を設定することをお勧めします。

  • blob.service.ssl.enabled: blob クライアント/サーバ 通信のためのsslを有効にするためのフラグ。グローバルなsslフラグ security.ssl.enabled がtrue (デフォルト: true) に設定されている時のみ適用可能です。

  • restart-strategy: デフォルト restart strategy はジョブのために何も再起動ストラテジが指定されていない場合に使われます。オプションは以下の通りです:
    • 固定遅延ストラテジ: fixed-delay.
    • 障害レートストラテジ: failure-rate.
    • 再起動無し: none

    Default value is none unless checkpointing is enabled for the job in which case the default is fixed-delay with Integer.MAX_VALUE restart attempts and 10s delay.

  • restart-strategy.fixed-delay.attempts: デフォルトの再起動ストラテジが"fixed-delay"に設定された場合に使われる再起動の試行の数。Default value is 1, unless “fixed-delay” was activated by enabling checkpoints, in which case the default is Integer.MAX_VALUE.

  • restart-strategy.fixed-delay.delay: デフォルトの再起動ストラテジが"fixed-delay"に設定された場合に使われる、再起動の試行間の遅延。Default value is the akka.ask.timeout, unless “fixed-delay” was activated by enabling checkpoints, in which case the default is 10s.

  • restart-strategy.failure-rate.max-failures-per-interval: "failure-rate"ストラテジでジョブが失敗するまでの、指定された時間内の再起動の最大数。デフォルト値は 1です。

  • restart-strategy.failure-rate.failure-rate-interval: "failure-rate"ストラテジでの失敗レートを計測するための時間間隔。デフォルト値は 1 秒です。

  • restart-strategy.failure-rate.delay: デフォルトの再起動ストラテジが"failure-rate"に設定された場合に使われる、再起動の試行間の遅延。デフォルト値はakka.ask.timeoutです。

完全なリファレンス

HDFS

これらのパラメータはFlinkによって使われるデフォルトのHDFSを設定します。HDFS設定を指定しないセットアップはHDFSファイルのフルパス (hdfs://address:port/path/to/files) を指定しなければなりません。ファイルもデフォルトのHDFSパラメータ (ブロックサイズ、リプリケーション ファクター)で書かれるでしょう。

  • fs.hdfs.hadoopconf: Hadoop設定ディレクトリへの絶対パス。systemは"core-site.xml"と"hdfs-site.xml"ファイルをそのディレクトリ内で探します (デフォルト: null)。

  • fs.hdfs.hdfsdefault: Hadoopの独自の設定ファイル"hdfs-default.xml"の絶対パス (デフォルト: null)。

  • fs.hdfs.hdfssite: Hadoopの独自の設定ファイル "hdfs-site.xml" の絶対パス (デフォルト: null)。

ジョブマネージャー と タスクマネージャー

以下のパラメータはFlinkのジョブマネージャーとタスクマネージャーを設定します。

  • jobmanager.rpc.address: ジョブマネージャーの外部アドレス。これは分散システムのマスター/コーディネータです (デフォルト: localhost)。注意: アドレス (ホスト名あるいはIP)はクライアントを含むすべてのノードからアクセス可能でなければなりません。

  • jobmanager.rpc.port: ジョブマネージャーのポート番号 (デフォルト: 6123)。

  • taskmanager.hostname: タスクマネージャーがバインドされているネットワークインタフェースのホスト名。デフォルトで、タスクマネージャーはジョブマネージャーと他のタスクマネージャーに接続することができるネットワークインタフェースを探します。ストラテジが何らかの理由で失敗した場合に、このオプションはホスト名を定義するために使うことができます。異なるタスクマネージャーはこのオプションのために異なる値を必要とするため、通常追加の共有されないタスクマネージャー固有の設定ファイルが指定されます。

  • taskmanager.rpc.port: タスクマネージャーの IPC ポート (デフォルト: 0、これはOSによって空いているポートが選択されます)。

  • taskmanager.data.port: データ交換操作のために使われるタスクマネージャーのポート (デフォルト: 0、これはOSによって空いているポートが選択されます)。

  • taskmanager.data.ssl.enabled: タスクマネージャーのデータ転送のためにSSLサポートを有効にします。グローバルなsslフラグ security.ssl.enabled がtrue (デフォルト: true) に設定されている時のみ適用可能です。

  • jobmanager.heap.mb: ジョブマネージャーのためのJVM のヒープサイズ(メガバイト単位) (デフォルト: 256)。

  • taskmanager.heap.mb: タスクマネージャーのJVM ヒープサイズ (メガバイト単位)。これはsystemの平行ワーカーです。Hadoopとは対照的に、Flinkはオペレータ(例えばjoin, aggregat)とユーザ定義の関数(例えば Map, Reduce, CoGroup) をタスクマネージャー ( sorting/hashing/caching を含む)の中で実行します。つまりこの値はできるだけ大きくしなければなりません。YARN セットアップ上では、この値はタスクマネージャーのYARNコンテナのサイズに自動的に設定され、ある程度寛容できる値が引かれます。

  • taskmanager.numberOfTaskSlots: 1つのタスクマネージャーが実行できる、並行オペレータあるいはユーザ関数のインスタンスの数(デフォルト: 1)。値が1より大きい場合は、1つのシングルマネージャーは関数あるいはオペレータの複数のインスタンスを取ることができます。そのようにして、タスクマネージャーは複数のCPUコアを使うことができますが、同時に、利用可能なメモリは異なるオペレータあるいは関数のインスタンスの間で分配されます。この値は一般的にタスクマネージャーのマシーンが持つ物理CPUコアの数に比例します(例えば、コアの数に一致、あるいはコアの数の半分)。

  • taskmanager.tmp.dirs: テンポラリ ファイルのためのディレクトリ、あるいはシステムのディレクトリのデリミタ(例えばLinux/Unixでの ‘:’ (コロン))によって区切られたディレクトリのリスト。複数のディレクトリが指定された場合は、一時ファイルはラウンドロビンの形式でディレクトリに渡って分散されるでしょう。I/O マネージャーコンポーネントは、ディレクトリごとに1つの読み込みと1つの書き込みスレッドをspawnするでしょう。I/Oマネージャーが複数のスレッドを使うように、ディレクトリは複数回リスト化されるかもしれません (例えば、とても速いディスクあるいはRAIDに格納されます) (デフォルト: システムのtmpディレクトリ)。

  • taskmanager.network.numberOfBuffers: ネットワークスタックに利用可能なバッファの数。この数値はタスクマネージャーがどれだけ多くのストリーミングデータ交換チャネルを同時に持つことができ、チャネルがどれほど良くバッファされるかを決定します。ジョブが却下された場合、あるいはシステムが利用可能なバッファを十分に持っていないと警告された場合は、この値を増やしてください (デフォルト: 2048)。

  • taskmanager.memory.size: タスクマネージャーがソート、ハッシュテーブル、中間結果をキャッシュするためにJVMのヒープ上に確保する総メモリ量(メガバイト単位)。指定しない(-1)場合、メモリマネージャーは taskmanager.memory.fraction によって指定された、JVMに利用可能なヒープメモリの固定の割合を取るでしょう。(デフォルト: -1)

  • taskmanager.memory.fraction: タスクマネージャーがソート、ハッシュテーブルおよび中間結果をキャッシュするために確保する、相対的なメモリ量。例えば、0.8の値はタスクマネージャーが内部的なデータバッファのためにJVMのヒープ空間の80%を確保し、JVMのヒープ空間の残りの20%をユーザ定義関数によって生成されるオブジェクトのために開けておくことを意味します。(デフォルト: 0.7) taskmanager.memory.size が設定されない場合は、このパラメータは評価のみされます。

  • taskmanager.debug.memory.startLogThread: タスクマネージャーが定期的のメモリとガベージコレクション統計を記録するようにします。この統計には、現在のヒープ、オフヒープ、および他のメモリプール利用と、ヒープメモリプールによってガベージコレクションに費やされた時間が含まれます。

  • taskmanager.debug.memory.logIntervalMs: タスクマネージャーがメモリおよびガベージコレクション統計を記録する時間間隔(ミリ秒単位)。taskmanager.debug.memory.startLogThread がtrueに設定された場合にのみ効果があります。

  • taskmanager.maxRegistrationDuration: タスクマネージャーの登録を間違うことができる最大回数を定義します。登録が成功しないまま持続期間を超えると、タスクマネージャーは終了します。最大登録持続期間は時間単位識別子 (ms/s/min/h/d) (たとえば “10 min”) を必要とします。(デフォルト: Inf)

  • taskmanager.initial-registration-pause: 2つの連続する登録の試行の間の初期の登録休止時間。休止時間が最大登録休止時間に達するまで、各登録の試行ごとの休止は2倍されます。初期の登録休止時間は時間単位識別子 (ms/s/min/h/d) (例えば “5 s”) を必要とします。(デフォルト: 500 ms)

  • taskmanager.max-registration-pause: 2つの連続する登録の試行の間の最大登録休止時間。最大登録休止時間は時間単位識別子 (ms/s/min/h/d) (例えば “5 s”) を必要とします。(デフォルト: 30 s)

  • taskmanager.refused-registration-pause: 接続に入る前にジョブマネージャーによって拒否された登録の後の休止時間。拒否された登録の休止時間は時間単位識別子 (ms/s/min/h/d) (例えば “5 s”) を必要とします。(デフォルト: 10 s)

  • taskmanager.jvm-exit-on-oom: タスク スレッドが OutOfMemoryErrorを投げる時にタスクマネージャーがすぐにJVMを終了するかどうかを示します (デフォルト: false)。

  • blob.fetch.retries: タスクマネージャーがジョブマネージャーから(JARファイルのような)BLOBをダウンロードする試行数 (デフォルト: 50)。

  • blob.fetch.num-concurrent: ジョブマネージャーが提供する(JARファイルダウンロードのような)BLOB同時取り込み数 (デフォルト: 50)。

  • blob.fetch.backlog: ジョブマネージャーが可能な(JARファイルダウンロードのような)キューされたBLOBの取り込み最大数 (デフォルト: 1000)。

  • task.cancellation-interval: 二つの連続するタスクの取り消しの試行間のミリ秒単位の間隔 (デフォルト: 30000)。

分散の調整 (via Akka)

  • akka.ask.timeout: 全ての機能で使われ、Akka呼び出しをブロックするタイムアウト。Flinkがタイムアウトによって失敗する場合、この値を増やしてみるべきです。タイムアウトは遅いマシーンあるいは混雑したネットワークによっても起こり得ます。タイムアウトの値は time-単位記述子 (ms/s/min/h/d) を必要とします (デフォルト: 10 s)。

  • akka.lookup.timeout: ジョブマネージャーを検索するために使われるタイムアウト。タイムアウトの値はtime-単位識別子 (ms/s/min/h/d) を含む必要があります (デフォルト: 10 s)。

  • akka.client.timeout: Flinkクラスタと通信する時に、Flinkクライアントによって使われるタイムアウト (例えば、CliFrontend, ClusterClient)。タイムアウトの値は時間単位識別子 (ms/s/min/h/d) を含む必要があります (デフォルト: 60 s)。

  • akka.framesize: ジョブマネージャーとタスクマネージャー間で送信されるメッセージの最大サイズ。メッセージがこの制限を超えたためにFlinkが失敗する場合は、増やす必要があります。メッセージサイズはsize-単位識別子を必要とします (デフォルト: 10485760b)。

  • akka.watch.heartbeat.interval: AkkaのDeathWatch機構が死亡したタスクマネージャーを検知するためのハートビート間隔。ハートビートメッセージが喪失あるいは遅延したために間違ってタスクマネージャーが死亡したとマークされた場合、この値を増やすべきです。AkkaのDeathWatchの詳細な説明は ここで見つけることができます (デフォルト: 10 s)。

  • akka.watch.heartbeat.pause: AkkaのDeatchWatch機構が受け入れられるハートビートの休止。低い値は不規則なハートビートを許可しません。AkkaのDeathWatchの詳細な説明は ここで見つけることができます (デフォルト: 60 s)。

  • akka.watch.threshold: DeathWatchの障害検知の閾値。高い値は死亡したTaskManagerの検知するまでの時間が長くなるのに対し、低い値はfalse positiveになりがちです。AkkaのDeatchWatchの詳細な説明はここで見つけることができます (デフォルト: 12)。

  • akka.transport.heartbeat.interval: Akkaの転送障害検知のためのハートビート間隔。FlinkはTCPを使うため、検知は必要ではありません。したがって、間隔をとても大きな値にすることで検知を無効にします。転送障害検知必要とする場合は、間隔を意味のある値に設定します。間隔の値はtime-単位識別子 (ms/s/min/h/d) を必要とします (デフォルト: 1000 s)。

  • akka.transport.heartbeat.pause: Akkaの転送障害検知が受け入れられるハートビートの休止。FlinkはTCPを使うため、検知は必要ではありません。したがって、休止をとても大きな値にすることで検知を無効にします。転送障害検知必要とする場合は、休止を意味のある値に設定します。休止の値は time-単位記述子 (ms/s/min/h/d) を必要とします (デフォルト: 6000 s)。

  • akka.transport.threshold: 転送障害検知の閾値。FlinkはTCPを使うため検知は必要ありません。したがって閾値は大きな値に設定されます (デフォルト: 300)。

  • akka.tcp.timeout: 全ての外向きの接続のタイムアウト遅いネットワークのためにタスクマネージャーへの接続に問題を経験した場合は、この値を増やすべきです (デフォルト: 20 s)。

  • akka.throughput: スレッドをプールに返す前にバッチ内で処理されるメッセージの数。低い値は公平なスケジューリングを意味し、一方で高い値は不公平という代償を払ってパフォーマンスを改善することができます (デフォルト: 15)。

  • akka.log.lifecycle.events: イベントのAkkaのリモートログをオンにします。デバッグ時には、この値を'true'に設定します (デフォルト: false)。

  • akka.startup-timeout: リモートコンポーネントの起動が失敗したと見なされるタイムアウト (デフォルト: akka.ask.timeout)。

  • akka.ssl.enabled: Akkaのリモート通信のためのSSLをオンにします。グローバルなsslフラグ security.ssl.enabled がtrue (デフォルト: true) に設定されている時のみ適用可能です。

SSL設定

  • security.ssl.enabled: 内部的なネットワーク通信のためのSSLをオンにします。これは異なる通信モジュール内で定義されたフラグによって任意で上書きすることができます (デフォルト: false)。

  • security.ssl.keystore: flinkのエンドポイントによってSSLキーと証明書のために使われるJavaキーストアファイル。

  • security.ssl.keystore-password: キーストアファイルを複合化するための秘密鍵。

  • security.ssl.key-password: キーストア内のサーバキーを複合化するための秘密鍵。

  • security.ssl.truststore: flinkのエンドポイントによってピアの証明書を検証するために使われる公的なCA証明書を含むトラストストア ファイル。

  • security.ssl.truststore-password: トラストストアを複合化するための秘密鍵。

  • security.ssl.protocol: ssl通信でサポートされるSSLプロトコルのバージョン (デフォルト: TLSv1.2)。

  • security.ssl.algorithms: サポートされる標準SSLアルゴリズムのカンマ区切りのリスト。詳しくはここを読んでください (デフォルト: TLS_RSA_WITH_AES_128_CBC_SHA)。

  • security.ssl.verify-hostname: sslハンドシェイク時のピアのホスト名系ションを有効にするためのフラグ (デフォルト: true)。

ネットワーク通信 (via Netty)

これらのパラメータは進んだチューニングを許可します。大きなクラスタ上で同時高スループットを実行する場合はデフォルトの値で十分です。

  • taskmanager.net.num-arenas: Netty アリーナの数 (デフォルト: taskmanager.numberOfTaskSlots)。

  • taskmanager.net.server.numThreads: Netty サーバのスレッドの数 (デフォルト: taskmanager.numberOfTaskSlots)。

  • taskmanager.net.client.numThreads: Netty クライアントスレッドの数 (デフォルト: taskmanager.numberOfTaskSlots)。

  • taskmanager.net.server.backlog: netty サーバの接続のバックログ。

  • taskmanager.net.client.connectTimeoutSec: Netty クライアント接続のタイムアウト (デフォルト: 120 seconds)。

  • taskmanager.net.sendReceiveBufferSize: Netty の送信および受信バッファサイズ。これのデフォルトはシステムのバッファサイズ (cat /proc/sys/net/ipv4/tcp_[rw]mem) で、最新のLinuxでは 4 MiBです。

  • taskmanager.net.transport: Netty 転送の型。“nio” あるいは “epoll” (デフォルト: nio)。

ジョブマネージャーのWebフロントエンド

  • jobmanager.web.port: 実行中のジョブの状態と終了したジョブの実行時間のブレークダウンを表示するジョブマネージャーのwebインタフェースのポート (デフォルト: 8081)。この値を-1に設定するとwebフロントエンドを無効にします。

  • jobmanager.web.history: ジョブマネージャーのwebフロントエンドのヒストリ内の最新のジョブの数 (デフォルト: 5)。

  • jobmanager.web.checkpoints.disable: チェックポイントの統計を無効にします (デフォルト: false)。

  • jobmanager.web.checkpoints.history: 記憶するチェックポイントの統計の数 (デフォルト: 10)。

  • jobmanager.web.backpressure.cleanup-interval: アクセスされない場合にキャッシュされた統計が掃除される時間 (デフォルト: 600000, 10 分)。

  • jobmanager.web.backpressure.refresh-interval: 利用可能な統計が非推奨になり更新される必要がある時間 (デフォルト: 60000, 1 分)。

  • jobmanager.web.backpressure.num-samples: バックプレッシャーを決定するために取られるスタックトレースの標本の数 (デフォルト: 100)。

  • jobmanager.web.backpressure.delay-between-samples: バックプレッシャーを決定するためのスタックトレースの標本間の遅延 (デフォルト: 50, 50 ms).

  • jobmanager.web.ssl.enabled: webフロントエンドへのhttpsアクセスを有効にします。グローバルなsslフラグ security.ssl.enabled がtrue (デフォルト: true) に設定されている時のみ適用可能です。

ファイルシステム

パラメータは結果ファイルを生成するタスクの挙動を定義します。

  • fs.default-scheme: 接続するために必要な権限を持つ、使用されるデフォルトのファイルシステムスキーマ。例えば、HDFSの場合はネームノードの host:ポート (必要であれば)。デフォルトでは、これはローカルファイルシステムを示すfile:///に設定されます。このことは、明示的なスキーマの定義 無しに、ユーザ定義ファイルを検索するtあめにローカルファイルシステムが使われることを意味します。他のスキーマが(明示的に)ユーザが渡すURI内で指定されない場合にのみこのスキーマが使われます。

  • fs.overwrite-files: ファイルの出力ライターがデフォルトで既存のファイルを上書きするかどうかを指定します。デフォルトで上書きするためにはtrueに設定し、そうでなければfalse です。(デフォルト: false)

  • fs.output.always-create-directory: 並行度1以上で実行中のFile writerが出力ファイルパスのためのディレクトリを作成し、異なる結果ファイル(並行writerタスクごとに1つ)をディレクトリ内に置きます。このオプションが trueに設定された場合、並行度1のライターはディレクトリも生成し、その中に1つの結果ファイルを置くでしょう。オプションがfalseに設定された場合、ライターはディレクトリを生成せずに出力パスに直接ファイルを生成します。(デフォルト: false)

コンパイラ/オプティマイザ

  • compiler.delimited-informat.max-line-samples: 区切られた入力のためにコンパイラによって取られる行標本の最大数。標本はレコードの数を推測するために使われます。この値は入力フォーマットのパラメータを持つ特定の入力のために上書きすることができます (デフォルト: 10)。

  • compiler.delimited-informat.min-line-samples: 区切られた入力のためにコンパイラによって取られる行の標本の最小数。標本はレコードの数を推測するために使われます。この値は入力フォーマットのパラメータを持つ特定の入力のために上書きすることができます (デフォルト: 2)。

  • compiler.delimited-informat.max-sample-len: 区切られた入力のためにコンパイラによって取られる行標本の最大長。1つの標本の長さがこの値を超えた場合(パーサーの設定間違いによって起こり得ます)、標本化は異常終了します。この値は入力フォーマットのパラメータを持つ特定の入力のために上書きすることができます (デフォルト: 2097152 ( = 2Mバイト))。

ランタイム アルゴリズム

  • taskmanager.runtime.hashjoin-bloom-filters: ハイブリッド ハッシュ ジョイン実装の中でbloomフィルタを有効/無効にするフラグ。ハッシュ ジョインがディスクにこぼれる場合(データセットが確保されたメモリのフラクションより大きい)、これらのbloomフィルタはいくらかのCPUサイクルを犠牲にしてこぼれるレコードの数を大きく減らします。(デフォルト: false)

  • taskmanager.runtime.max-fan: 外部マージ ジョインのための最大論理入力と、こぼれているハッシュテーブルのための論理出力。オペレータあたりのファイルハンドルの数を制限しますが、あまりに小さく設定された場合中間のマージ/パーティションを起こすかも知れません (デフォルト: 128)。

  • taskmanager.runtime.sort-spilling-threshold: このメモリバジェットの割合が一杯になった時に、ソートオペレーションはこぼし始めます (デフォルト: 0.8)。

リソース マネージャ

この章での設定キーは使用されているリソース管理フレームワーク (YARN, Mesos, Standalone, …) に依存しません。

  • resourcemanager.rpc.port: リソースマネージャーとの通信のための接続するためのネットワークポートを定義する設定パラメータ。同じActorSystemが使われるため、デフォルトではジョブマネージャーのポート。ポート範囲を定義するためにこの設定キーを使うことはできません。

YARN

  • containerized.heap-cutoff-ratio: (デフォルト 0.25) YARNによって開始されたコンテナから削除するヒープ空間のパーセンテージ。ユーザが各タスクマネージャーコンテナのためにある量のメモリ(例えば4GB)を要求した場合、JVMはヒープ外にもメモリを割り当てるため、この総量をJVMのための最大ヒープ空間として渡す (-Xmx 引数) ことはできません。YARN はリクエストしたよりも多くのメモリを使っているコンテナをkillすることにとても厳密です。したがって、安全マージンとしてリクエストされたヒープから15%のメモリを取り去ります。

  • containerized.heap-cutoff-min: (デフォルト 600 MB) リクエストされたヒープサイズから削除するメモリの最小量。

  • yarn.maximum-failed-containers (デフォルト: リクエストされたコンテナの数)。障害時にsystemが再割り当てをしようとするコンテナの最大数。

  • yarn.application-attempts (デフォルト: 1). 再起動するアプリケーションマスターの数。Flinkクラスタ全体が再起動し、YARNクライアントは接続を失うだろうことに注意してください。また、ジョブマネージャーのアドレスは変わり、JMの host:port を手動で設定する必要があるでしょう。このオプションは1に設定したままにすることをお勧めします。

  • yarn.heartbeat-delay (デフォルト: 5 秒)。リソースマネージャーとのハートビート間の時間。

  • yarn.properties-file.location (デフォルト: 一時ディレクトリ)。FlinkのジョブがYARNにサブミットされた場合、Flinkクライアントが手に入れることができるように、ジョブマネージャーのホストと利用可能な処理スロットの数がプロパティファイルに書き込まれます。この設定パラメータを使ってそのファイルのデフォルトの場所を変更することができます (for example for environments sharing a Flink installation between users)

  • yarn.containers.vcores YARNコンテナあたりの仮想コア(vcores) の数。デフォルトでは、vcoresの数は設定されていればタスクマネージャーあたりのスロットの数に設定されます。そうでなければ1に設置衛されます。

  • containerized.master.env.ENV_VAR1=value containerized.master.env.が頭につく設定値は、環境変数としてアプリケーションマスター/ジョブマネージャー プロセスに渡されるでしょう。例えば、LD_LIBRARY_PATHをアプリケーションマスタに環境変数として渡すには、以下を設定します:

    containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native"

  • containerized.taskmanager.env. 上の設定接頭語と似て、この接頭語はタスクマネージャープロセスのための独自の環境変数を設定することができます。

  • yarn.container-start-command-template: YARN上で開始する時にFlinkは以下のテンプレートを使用します: %java% %jvmmem% %jvmopts% %logging% %class% %args% %redirects%。この設定パラメータによりユーザは独自の設定 (JVMパスや引数などのような)を渡すことができます。ほとんどの場合において、env.java.opts の設定の使用で十分なことに注意してください。これは文字列内の %jvmopts% 変数です。

  • yarn.application-master.port (デフォルト: 0、OSが短命ポートとして選択します) この設定オプションを使って、ユーザはアプリケーションマスタ(とジョブマネージャー) RPCポートのためのポート、ポートの範囲、あるいはポートのリストを指定することができます。デフォルトでは、オペレーティングシステムが適切なポートを選択できるようにデフォルト値 (0)を使うことをお勧めします。特に、複数のAMが同じ物理ホスト上で動作している場合は、固定ポートの割り当てはAMの開始を妨げるでしょう。

    例えば、制限されたファイアウォールを持つ環境上でYARN上でFlinkを実行する場合、このオプションを使って許可されたポートの範囲を指定することができます。

  • yarn.tags Flink YARNアプリケーションへ適用するカンマ区切りのタグのリスト。

Mesos

  • mesos.initial-tasks: マスタが開始した時に起動する初期ワーカー (デフォルト: クラスタの起動時に指定されたワーカーの数)。

  • mesos.maximum-failed-tasks: クラスタが故障するまでの失敗したワーカーの最大数 (デフォルト: 初期ワーカーの数)。この機能を無効にするために -1 を設定することができます。

  • mesos.master: Mesos マスターのURL。値は以下の形式のうちの1つでなければなりません:
    • host:port
    • zk://host1:port1,host2:port2,.../path
    • zk://username:password@host1:port1,host2:port2,.../path
    • file:///path/to/file
  • mesos.failover-timeout: Mesosスケジューラのためのフェイルオーバー タイムアウトの秒数。実行中のタスクはこの後で自動的にシャットダウンされます (デフォルト: 600)。

  • mesos.resourcemanager.artifactserver.port: 使用するMesos アーティファクト サーバのポートを定義する設定パラメータ。ポートを0に設定するとOSが利用可能なポートを選択するでしょう。

  • mesos.resourcemanager.framework.name: Mesos のフレームワーク名 (デフォルト: Flink)

  • mesos.resourcemanager.framework.role: Mesos フレームワークのロール定義 (デフォルト: *)

  • mesos.resourcemanager.framework.principal: Mesos フレームワーク プリンシパル (デフォルト無し)

  • mesos.resourcemanager.framework.secret: Mesos フレームワークの秘密鍵 (デフォルト無し)

  • mesos.resourcemanager.framework.user: Mesos フレームワークのユーザ (デフォルト:””)

  • mesos.resourcemanager.artifactserver.ssl.enabled: Flinkアーティファクト サーバのためのSSLを有効にします (デフォルト: true)。暗号化を有効にするためにsecurity.ssl.enabledtrue に設定する必要があることに注意してください。

  • mesos.resourcemanager.tasks.mem: Mesosワーカーに割り当てるメモリのMB数 (デフォルト: 1024)

  • mesos.resourcemanager.tasks.cpus: Mesosワーカーに割り当てるCPU (デフォルト: 0.0)

  • mesos.resourcemanager.tasks.container.type: 使用するコンテナ化の型: “mesos” あるいは “docker” (デフォルト: mesos);

  • mesos.resourcemanager.tasks.container.image.name: コンテナに使用するイメージ名 (デフォルト無し)

  • high-availability.zookeeper.path.mesos-workers: Mesosワーカーの情報を持続するためのZooKeeperのルートパス。

高可用性 (HA)

  • high-availability: クラスタの実行のために使われる高可用性モードを定義します。現在のところ、Flinkは以下のモードをサポートします:
    • none (デフォルト): 高可用性無し。1つのジョブマネージャーを実行し、ジョブマネージャーの状態はチェックポイントされません。
    • zookeeper: 複数のジョブマネージャーの実行とジョブマネージャーの状態をチェックポイントをサポートします。ジョブマネージャーのグループの中で、ZooKeeperは1つのジョブマネージャーをクラスタの実行に責任ンがあるリーダーとして選出します。ジョブマネージャーが失敗した場合は、スタンドバイ ジョブマネージャーが新しいリーダーとして選出され、最後のチェックポイントのジョブマネージャーの状態が渡されます。'zookeeper' モードを使うには、high-availability.zookeeper.quorum 設定値も定義する必要があります。

以前はこのキーはrecovery.modeという名前で、デフォルトの値は standaloneでした。

ZooKeeperベースのHAモード

  • high-availability.zookeeper.quorum: 'zookeeper'のHAモードが選択された場合にZooKeeperクラスタに接続するために使われるZooKeeperの定員URLを定義します。以前はこのキーはrecovery.zookeeper.quorumという名前でした。

  • high-availability.zookeeper.path.root: (デフォルト v/flink) ZooKeeper HAモードが名前空間ディレクトリを作成する予定のルートディレクトリを定義します。以前はこのキーはrecovery.zookeeper.path.rootという名前でした。

  • high-availability.zookeeper.path.namespace: (デフォルト スタンドアローン クラスタモードでは/default_ns、あるいは YARN配下) ZooKeeper HAモードがznodeを作成する予定のルートディレクトリ以下のサブディレクトリを定義します。これにより同じZooKeeper上で複数のアプリケーションを隔離することができます。以前はこのキーは`recovery.zookeeper.path.namespace`という名前でした。

  • high-availability.zookeeper.path.latch: (デフォルト /leaderlatch) リーダーを選出するために使われるリーダーラッチのznodeを定義します。以前はこのキーはrecovery.zookeeper.path.latchという名前でした。

  • high-availability.zookeeper.path.leader: (デフォルト /leader) リーダーへのURLと現在のリーダーのセッションIDを含むリーダーのznodeを定義します。以前はこのキーはrecovery.zookeeper.path.leaderという名前でした。

  • high-availability.zookeeper.storageDir: ジョブマネージャーのメタデータが格納される予定の状態バックエンド内のディレクトリを定義します (Zookeeperはそれへのポインタのみを保持します)。HAのための要件以前はこのキーはrecovery.zookeeper.storageDirという名前でした。

  • high-availability.zookeeper.client.session-timeout: (デフォルト 60000) ZooKeeperのセッションについてのセッションのタイムアウトをmsで定義します。以前はこのキーはrecovery.zookeeper.client.session-timeoutという名前でした。

  • high-availability.zookeeper.client.connection-timeout: (デフォルト 15000) ZooKeeperについての接続タイムアウトをmsで定義します。以前はこのキーはrecovery.zookeeper.client.connection-timeoutという名前でした。

  • high-availability.zookeeper.client.retry-wait: (デフォルト 5000) 連続する試行間の休止をmsで定義します。以前はこのキーはrecovery.zookeeper.client.retry-waitという名前でした。

  • high-availability.zookeeper.client.max-retry-attempts: (デフォルト 3) クライアントが諦めるまでに試す接続の数を定義します。以前はこのキーはrecovery.zookeeper.client.max-retry-attemptsという名前でした。

  • high-availability.job.delay: (デフォルト akka.ask.timeout) マスターリカバリの状況の場合に、永続ジョブが回復されるまでの遅延を定義します。以前はこのキーはrecovery.job.delayという名前でした。

  • high-availability.zookeeper.client.acl: (デフォルト open) Defines the ACL (open creator) to be configured on ZK node. ZooKeeperサーバ設定が SASLAuthenticationProviderを使うようにマップされた “authProvider” プロパティを持ち、クラスタがセキュアモード (Kerberos)で動作するように設定されている場合、設定値を“creator”に設定することができます。ACL オプションは https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes に基づいています

ZooKeeper セキュリティ

  • zookeeper.sasl.disable: (デフォルト: true) SASLベースの認証が有効あるいは無効にされる必要があるかどうかを定義します。ZooKeeperクラスタがセキュアモード (Kerberos)で動いている場合に、設定値を “true” に設定することができます。

  • zookeeper.sasl.service-name: (デフォルト: zookeeper) ZooKeeperサーバが異なるサービス名 (デフォルト:”zookeeper”) を使って設定されている場合、この設定を使って提供することができます。クライアントとサーバ設定の間のサービス名の不一致は認証を失敗させるでしょう。

Kerberosベースのセキュリティ

  • security.kerberos.login.use-ticket-cache: Kerberosチケットキャッシュから読み込むかどうかを指示する (デフォルト: true)。

  • security.kerberos.login.keytab: ユーザ証明書を含むKerberosキータブファイルへの絶対パス。

  • security.kerberos.login.principal: キータブに関連するKerberos プリンシパル名。

  • security.kerberos.login.contexts: Kerberos証明書を提供するためのloginコンテキストのカンマ区切りのリスト (例えば、ZooKeeper認証およびKafka認証のための証明書を使うためのClient,KafkaClient)。

環境

  • env.log.dir: (デフォルトは Flinkのホームの下のlogディレクトリ) Flinkのログが保存されるディレクトリを定義します。絶対パスでなければなりません。

  • env.ssh.opts: JobManager, TaskManager および Zookeeper サービスを開始あるいは停止する時にSSHクライアントへ渡される追加のコマンドラインオプション (start-cluster.sh, stop-cluster.sh, start-zookeeper-quorum.sh, stop-zookeeper-quorum.sh)。

クエリ可能な状態

サーバ

  • query.server.enable: クエリ可能な状態を有効にする (デフォルト: true)。

  • query.server.port: クエリ可能な状態サーバを紐付けるためのポート (デフォルト: 0、ランダムなポートへ紐付けます)。

  • query.server.network-threads: クエリ可能な状態サーバのためのネットワーク (Nettyのイベントループ) スレッドの数 (デフォルト: 0、スロットの数を取り上げます)。

  • query.server.query-threads: クエリ可能な状態サーバのためのクエリスレッドの数 (デフォルト: 0、スロットの数を取り上げます)。

クライアント

  • query.client.network-threads: クエリ可能な状態クライアントのためのネットワーク (Nettyのイベントループ)の数 (デフォルト: 0Runtime.getRuntime().availableProcessors()によって帰る利用可能なコアの数を取り上げます)。

  • query.client.lookup.num-retries: 利用不可能なジョブマネージャーによりKvStateのルックアップが失敗する時の再試行の数 (デフォルト: 3)。

  • query.client.lookup.retry-delay: 利用不可能なジョブマネージャーによりKvStateのルックアップが失敗する時の再試行の遅延のミリ秒 (デフォルト: 1000)。

マトリックス

  • metrics.reporters: 名前付きのレポーターのリスト。つまり "foo,bar"。

  • metrics.reporter.<name>.<config>: <name>という名前のレポーターのための一般的な設定<config>

  • metrics.reporter.<name>.class: <name>という名前のレポーターが使うレポータークラス。

  • metrics.reporter.<name>.interval: <name>という名前のレポーターが使うレポーター間隔。

  • metrics.scope.jm: (デフォルト: <host>.jobmanager) ジョブマネージャーに作用する全てのメトリクスに適用される作用フォーマット文字列を定義します。

  • metrics.scope.jm.job: (デフォルト: <host>.jobmanager.<job_name>) ジョブマネージャー上のジョブに作用する全てのメトリクスに適用される作用フォーマット文字列を定義します。

  • metrics.scope.tm: (デフォルト: <host>.taskmanager.<tm_id>) タスクマネージャーに作用する全てのメトリクスに適用される作用フォーマット文字列を定義します。

  • metrics.scope.tm.job: (デフォルト: <host>.taskmanager.<tm_id>.<job_name>) タスクマネージャー上のジョブに作用する全てのメトリクスに適用される作用フォーマット文字列を定義します。

  • metrics.scope.tm.task: (Default: <host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>) タスクに作用する全てのメトリクスに適用される作用フォーマット文字列を定義します。

  • metrics.scope.tm.operator: (Default: <host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>) オペレータに作用する全てのメトリクスに適用される作用フォーマット文字列を定義します。

  • metrics.latency.history-size: (デフォルト: 128) 各オペレータで維持するための測定されたレイテンシの数を定義します

背景

ネットワーク バッファの設定

java.io.IOException: Insufficient number of network buffersを見たことがある場合は、ネットワークバッファの数を調節するために以下の公式を使ってください:

#slots-per-TM^2 * #TMs * 4

#slots per TMタスクマネージャーあたりのスロットの数で、#TMs はタスクマネージャーの総数です。

ネットワークバッファは通信層のための重要なリソースです。それらはネットワーク上で伝送をする前にレコードをバッファするために使われ、入ってくるデータをレコードに分離してアプリケーションにそれらを渡す前にバッファするために使われます。良いスループットを達成するには、十分な数のネットワークバッファが重要です。

一般的に、同時に開いておきたい各論理ネットワーク接続が専用のバッファを持つのに十分なバッファを持つようにタスクマネージャーを設定します。論理ネットワーク接続はネットワーク上で各データのpoint-to-point交換のために存在します。これは一般的に再パーティショニング、あるいはブロードキャストの段階(シャッフル フェーズ)で起こります。それらの中で、タスクマネージャー内の各並行タスクは全ての他の平行タスクとやり取りをすることができなければなりません。したがって、タスクマネージャー上で必要なバッファの数は、 total-degree-of-parallelism (number of targets) * intra-node-parallelism (number of sources in one task manager) * n です。ここで、n は同時にどれだけの数の再パーティション/ブロードキャスト ステップが活性化すると予測するかを定義する定数です。

intra-node-parallelism は一般的にコアの数で、4つ以上の再パーティションあるいはブロードキャスト チャネルが同時に起こることは滅多に無いため、その頻度は#slots-per-TM^2 * #TMs * 4に落ち着きます。

例えば20 8スロットのマシーンのクラスタをサポートするには、最適なスループットのためにおよそ5000のネットワークバッファを使うべきです。

各ネットワークバッファはデフォルトで30Kバイトのサイズです。上の例では、システムはネットワークバッファのためにおよそ 300Mバイトを割り当てるでしょう。

ネットワークバッファの数とサイズは以下のパラメータを使って設定することができます:

  • taskmanager.network.numberOfBuffers
  • taskmanager.memory.segment-size

一時的な I/O ディレクトリの設定

Flink はできる限りメインメモリ内で処理することを目指していますが、利用可能なメモリよりもっと多くのデータが処理される必要があることは珍しいことではありません。Flinkのランタイムはこれらの状況を扱うために一時的なデータをディスクに書き込むように設計されています。

taskmanager.tmp.dirs パラメータはFlinkが一時ファイルを書き込むディレクトリのリストを指定します。ディレクトリのパスは ':' (コロン文字)によって分割されている必要があります。Flinkは1つの一時ファイルを各設定されたディレクトリへ(あるいは、から)同時に書き込む(読み込む)でしょう。このように、パフォーマンスを改善するためにハードディスクのような複数の非依存I/Oデバイスへ一時的なI/Oを結果的に分散することができます。高速なI/Oデバイス(例えば、SSD, RAID, NAS)を利用するために、1つのディレクトリを複数回指定することができます。

taskmanager.tmp.dirsパラメータが明示的に指定されない場合は、FlinkはLinuxシステムでの/tmp のようなオペレーティングシステムの一時ディレクトリへ一時データを書き込みます。

タスクマネージャーの処理スロットの設定

Flinkはプログラムをサブタスクに分割し、それらのサブタスクを処理スロットにスケジュールすることで、並行して実行します。

各Flinkタスクマネージャーはクラスタ内の処理スロットを提供します。スロットの数は一般的にタスクマネージャーの利用可能なCPUコアの数に比例します。一般的な推奨として、taskmanager.numberOfTaskSlotsについての良いデフォルトは利用可能なCPUコアの数です。

Flinkアプリケーションを開始する場合、ユーザはジョブに使うスロットのデフォルトの数を提供することができます。したがってコマンドラインの値は-p (for parallelism) と呼ばれます。さらに、アプリケーション全体と個々のオペレータのためにプログラミングAPIの中でスロットの数を設定することができます。

TOP
inserted by FC2 system