This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
アプリケーションパラメータの処理 #
アプリケーションパラメータの処理 #
ほとんど全てのFlinkアプリケーション、バッチおよびストリーミングの両方は外部設定パラメータに依存します。 それらは (パスあるいはアドレスのような)入力および出力ソース、システムパラメータ(並行度、ランタイムの設定)、およびアプリケーション固有のパラメータ(一般的にユーザ関数内で使われます)を指定するために使われます。
Flink はこれらの問題を解決するための基本的なツールを提供するためにParameterTool
と呼ばれる簡単なユーティリティを提供します。
ここで説明されるParameterTool
を使う必要はないことに注意してください。Commons CLIやargparse4jのような他のフレームワークもFlinkでもうまく動作します。
ParameterTool
への設定値の取得
#
ParameterTool
は設定を読み込むための事前定義された静的なメソッドのセットを提供します。このツールは内部的にMap<String, String>
を想定しているため、独自の設定スタイルと統合するのは非常に簡単です。
.properties
ファイルから
#
以下のメソッドはPropertiesファイルを読み取り、key/valueペアを提供します:
String propertiesFilePath = "/home/sam/flink/myjob.properties";
ParameterTool parameters = ParameterTool.fromPropertiesFile(propertiesFilePath);
File propertiesFile = new File(propertiesFilePath);
ParameterTool parameters = ParameterTool.fromPropertiesFile(propertiesFile);
InputStream propertiesFileInputStream = new FileInputStream(file);
ParameterTool parameters = ParameterTool.fromPropertiesFile(propertiesFileInputStream);
コマンドライン引数から #
これにより、コマンドラインから--input hdfs:///mydata --elements 42
のような引数を取得できます。
public static void main(String[] args) {
ParameterTool parameters = ParameterTool.fromArgs(args);
// .. regular code ..
システムプロパティから #
JVMを起動する時に、システムプロパティをJVMに渡すことができます: -Dinput=hdfs:///mydata
。次のシステムプロパティからParameterTool
を初期化することもできます:
ParameterTool parameters = ParameterTool.fromSystemProperties();
Flinkプログラム内のパラメータの使用 #
これで、パラメータをどこかから取得できたので(上記を参照)、様々な方法でそれらを使えます。
ParameterTool
から直接
ParameterTool
自体に、値にアクセスするためのメソッドがあります。
ParameterTool parameters = // ...
parameters.getRequired("input");
parameters.get("output", "myDefaultValue");
parameters.getLong("expectedCount", -1L);
parameters.getNumberOfParameters();
// .. さらに多くのメソッドが利用可能です。
アプリケーションを送信するクライアントのmain()
メソッドで、これらのメソッドのメソッドの返り値を直接使えます。
例えば、このようにオペレータの並行度を設定できるでしょう:
ParameterTool parameters = ParameterTool.fromArgs(args);
int parallelism = parameters.get("mapParallelism", 2);
DataStream<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).setParallelism(parallelism);
ParameterTool
はシリアライズ化されるため、関数自体に渡すことができます:
ParameterTool parameters = ParameterTool.fromArgs(args);
DataStream<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer(parameters));
そして、コマンドラインから値を取得するために関数内でそれを使います。
グローバルにパラメータを登録する #
ExecutionConfig
にグローバルジョブパラメータとして登録されたパラメータは、JobManagerインタフェースとユーザが定義した全ての巻数から設定値としてアクセスできます。
グローバルにパラメータを登録する:
ParameterTool parameters = ParameterTool.fromArgs(args);
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters);
リッチなユーザ関数からそれらにアクセスする:
public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
ParameterTool parameters = (ParameterTool)
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
parameters.getRequired("input");
// .. do more ..