Handling Application Parameters
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

アプリケーションパラメータの処理 #

アプリケーションパラメータの処理 #

ほとんど全てのFlinkアプリケーション、バッチおよびストリーミングの両方は外部設定パラメータに依存します。 それらは (パスあるいはアドレスのような)入力および出力ソース、システムパラメータ(並行度、ランタイムの設定)、およびアプリケーション固有のパラメータ(一般的にユーザ関数内で使われます)を指定するために使われます。

Flink はこれらの問題を解決するための基本的なツールを提供するためにParameterTool と呼ばれる簡単なユーティリティを提供します。 ここで説明されるParameterToolを使う必要はないことに注意してください。Commons CLIargparse4jのような他のフレームワークもFlinkでもうまく動作します。

ParameterToolへの設定値の取得 #

ParameterToolは設定を読み込むための事前定義された静的なメソッドのセットを提供します。このツールは内部的にMap<String, String>を想定しているため、独自の設定スタイルと統合するのは非常に簡単です。

.propertiesファイルから #

以下のメソッドはPropertiesファイルを読み取り、key/valueペアを提供します:

String propertiesFilePath = "/home/sam/flink/myjob.properties";
ParameterTool parameters = ParameterTool.fromPropertiesFile(propertiesFilePath);

File propertiesFile = new File(propertiesFilePath);
ParameterTool parameters = ParameterTool.fromPropertiesFile(propertiesFile);

InputStream propertiesFileInputStream = new FileInputStream(file);
ParameterTool parameters = ParameterTool.fromPropertiesFile(propertiesFileInputStream);

コマンドライン引数から #

これにより、コマンドラインから--input hdfs:///mydata --elements 42のような引数を取得できます。

public static void main(String[] args) {
    ParameterTool parameters = ParameterTool.fromArgs(args);
    // .. regular code ..

システムプロパティから #

JVMを起動する時に、システムプロパティをJVMに渡すことができます: -Dinput=hdfs:///mydata。次のシステムプロパティからParameterToolを初期化することもできます:

ParameterTool parameters = ParameterTool.fromSystemProperties();

Flinkプログラム内のパラメータの使用 #

これで、パラメータをどこかから取得できたので(上記を参照)、様々な方法でそれらを使えます。

ParameterToolから直接

ParameterTool自体に、値にアクセスするためのメソッドがあります。

ParameterTool parameters = // ...
parameters.getRequired("input");
parameters.get("output", "myDefaultValue");
parameters.getLong("expectedCount", -1L);
parameters.getNumberOfParameters();
// .. さらに多くのメソッドが利用可能です。

アプリケーションを送信するクライアントのmain()メソッドで、これらのメソッドのメソッドの返り値を直接使えます。 例えば、このようにオペレータの並行度を設定できるでしょう:

ParameterTool parameters = ParameterTool.fromArgs(args);
int parallelism = parameters.get("mapParallelism", 2);
DataStream<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).setParallelism(parallelism);

ParameterToolはシリアライズ化されるため、関数自体に渡すことができます:

ParameterTool parameters = ParameterTool.fromArgs(args);
DataStream<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer(parameters));

そして、コマンドラインから値を取得するために関数内でそれを使います。

グローバルにパラメータを登録する #

ExecutionConfigにグローバルジョブパラメータとして登録されたパラメータは、JobManagerインタフェースとユーザが定義した全ての巻数から設定値としてアクセスできます。

グローバルにパラメータを登録する:

ParameterTool parameters = ParameterTool.fromArgs(args);

// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters);

リッチなユーザ関数からそれらにアクセスする:

public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
	ParameterTool parameters = (ParameterTool)
	    getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
	parameters.getRequired("input");
	// .. do more ..

Back to top

inserted by FC2 system