並行実行

この章はプログラムの並行実行をFlink内でどのように設定することができるかを説明します。Flinkプログラムは複数のタスク(変換/オペレータ、データソース、およびシンク)からできています。タスクは実行のために幾つかの並行インスタンスに分割され、各並行インタウンスはタスクの入力データのサブセットを処理します。タスクの並行インスタンスの数は並行度 と呼ばれます。

セーブポイントを使いたい場合は、最大の並行度(あるいはmax parallelism)を設定することも考慮すべきです。セーブポイントから回復する時、特定のオペレータあるいはプログラム全体の並行度を変更することができ、この設定は並行度の上限を指定します。Flinkは内部的に状態をキー-グループに分割し、パフォーマンスを決定する+Infのキー-グループの数を持てないため、これは必須です。

並行度の設定

タスクの並行度はFlink内で異なるレベルで指定することができます:

オペレータ レベル

個々のオペレータ、データソース、あるいはシンクの並行度は、そのsetParallelism()メソッドを呼び出すことで定義することができます。例えば、以下のようにです:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = text
    .flatMap(new LineSplitter())
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1).setParallelism(5);

wordCounts.print();

env.execute("Word Count Example");
val env = StreamExecutionEnvironment.getExecutionEnvironment

val text = [...]
val wordCounts = text
    .flatMap{ _.split(" ") map { (_, 1) } }
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1).setParallelism(5)
wordCounts.print()

env.execute("Word Count Example")

実行環境レベル

ここで述べたように、Flinkのプログラムは実行環境のコンテキスト内で実行されます。実行環境はそれを実行する全てのオペレータ、データソース、およびデータシンクのためのデフォルトの並列度を定義します。実行環境の並行度はオペレータの並行度を明示的に設定することで上書きすることができます。

実行環境のデフォルトの並行度はsetParallelism() メソッドを呼び出すことで指定することができます。全てのオペレータ、データソース、およびデータシンクを並行度3で実行するには、実行環境のデフォルトの並行度を以下のように設定します:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);

DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = [...]
wordCounts.print();

env.execute("Word Count Example");
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(3)

val text = [...]
val wordCounts = text
    .flatMap{ _.split(" ") map { (_, 1) } }
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1)
wordCounts.print()

env.execute("Word Count Example")

クライアント レベル

ジョブをFlinkにサブミットする時に並行度をクライアントで設定することができます。クライアントはJavaあるいはScalaプログラムのどちらかです。そのようなクライアントの例の1つがFlinkのコマンドライン インタフェース (CLI) です。

CLIクライアントについては、並行度のパラメータは-pを使って指定することができます。例えば:

./bin/flink run -p 10 ../examples/*WordCount-java*.jar

Java/Scala プログラム内で、並行度は以下のように設定されます:

try {
    PackagedProgram program = new PackagedProgram(file, args);
    InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123");
    Configuration config = new Configuration();

    Client client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader());

    // set the parallelism to 10 here
    client.run(program, 10, true);

} catch (ProgramInvocationException e) {
    e.printStackTrace();
}
try {
    PackagedProgram program = new PackagedProgram(file, args)
    InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123")
    Configuration config = new Configuration()

    Client client = new Client(jobManagerAddress, new Configuration(), program.getUserCodeClassLoader())

    // set the parallelism to 10 here
    client.run(program, 10, true)

} catch {
    case e: Exception => e.printStackTrace
}

システム レベル

全ての実行環境のためのシステム全体のデフォルトの並行度は./conf/flink-conf.yaml内のparallelism.default プロパティを設定することで定義することができます。詳細は設定 ドキュメントを見てください。

最大の並行度の設定

最大の並行度は(クライアントレベルおよびシステムレベルを除いて)並行度も設定できる箇所で設定することができます。setParallelism() を呼び出す代わりに、最大並行度を設定するためにsetMaxParallelism()を呼びます。

最大並行度のデフォルトの設定は、下限が127で上限が32768で、およそoperatorParallelism + (operatorParallelism / 2)です。

注意 幾つかの状態のバックエンドはキー-グループの数でスケールする内部的なデータ構造を維持する必要があるため、最大並行度をとても大きな値に設定するとパフォーマンスに有害かもしれません (これは再スケール可能な状態のための内部的な実装の仕組みです)。

上に戻る

TOP
inserted by FC2 system