このページは、Flinkプログラマが頻繁に遭遇する問題のどうやって解決するかのベストプラクティスのコレクションを含んでいます。
ほとんど全てのFlinkアプリケーション、バッチおよびストリーミングの両方は外部設定パラメータに依存します。For example for specifying input and output sources (like paths or addresses), also system parameters (parallelism, runtime configuration) and application specific parameters (often used within the user functions).
バージョン 0.9から、これらの問題を解決するために少なくともいくつかの基本的なツールを提供するために、ParameterTool
と呼ばれる単純なユーティリティを提供しています。
ここで説明されるParameterTool
を使う必要がないことに注意してください。Commons CLI, argparse4j などのような他のフレームワークはFlinkともよく動作します。
ParameterTool
に渡すParameterTool
は設定を読み込むために事前に定義された静的なメソッドを提供します。ツールは内部的にMap<String, String>
を期待します。つまり、自身の設定形式と統合するのはとても簡単です。
.properties
ファイルから以下のメソッドはPropertiesファイルを読み込み、キー/値のペアを提供するでしょう。
String propertiesFile = "/home/sam/flink/myjob.properties";
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFile);
これはコマンドラインから--input hdfs:///mydata --elements 42
のような引数を取得することができます。
public static void main(String[] args) {
ParameterTool parameter = ParameterTool.fromArgs(args);
// .. 通常のコード ..
JVMを開始する時に、システムプロパティを渡すことができます: -Dinput=hdfs:///mydata
. これらのシステムプロパティからParameterTool
を初期化することもできます:
ParameterTool parameter = ParameterTool.fromSystemProperties();
これで、どこか(上を見てください)からパラメータを取得し、それらを様々な方法で使うことができます。
ParameterTool
から直接
ParameterTool
それ自身が値にアクセスするメソッドを持ちます。
ParameterTool parameters = // ...
parameter.getRequired("input");
parameter.get("output", "myDefaultValue");
parameter.getLong("expectedCount", -1L);
parameter.getNumberOfParameters()
// .. もっと多くの利用可能なメソッドがあります。
main()メソッドの中で、それらのメソッドの返り値を直接使うことができます(=アプリケーションをサブミットするクライアント)。例えば、このようにオペレータの並行度を設定できるでしょう:
ParameterTool parameters = ParameterTool.fromArgs(args);
int parallelism = parameters.get("mapParallelism", 2);
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).setParallelism(parallelism);
ParameterTool
はシリアライズ可能なため、それを関数自身に渡すことができます:
ParameterTool parameters = ParameterTool.fromArgs(args);
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer(parameters));
そして、コマンドラインから値を取得するために関数内でそれらを使います。
Configuration
オブジェクトとして1つの関数にそれを渡します以下の例はパラメータを設定
オブジェクトとしてユーザ定義の関数に渡す方法を示します。
ParameterTool parameters = ParameterTool.fromArgs(args);
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).withParameters(parameters.getConfiguration())
これで、Tokenizer
の中でオブジェクトはopen(Configuration conf)
メソッドの中でアクセス可能です:
public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void open(Configuration parameters) throws Exception {
parameters.getInteger("myInt", -1);
// .. do
ExecutionConfig
にグローバル ジョブ パラメータとして登録されたパラメータによって、JobManagerのwebインタフェースとユーザによって定義された全ての関数から設定値にアクセスすることができます。
グローバルにパラメータを登録する
ParameterTool parameters = ParameterTool.fromArgs(args);
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters);
リッチなユーザ関数からそれらにアクセスする:
public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
ParameterTool parameters = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
parameters.getRequired("input");
// .. do more ..
多くのフィールドを持つデータタイプのために、TupleX
の代わりにPOJOs (Plain old Java objects)を使うことをお勧めします。また、POJOsは大きなTuple
-タイプに名前を付けるために使うこともできます。
例
以下の使い方の代わりに:
Tuple11<String, String, ..., String> var = new ...;
大きなTupleタイプから拡張する独自のタイプを生成するほうがとても簡単です。
CustomType var = new ...;
public static class CustomType extends Tuple11<String, String, ..., String> {
// constructor matching super
}
注意: このチュートリアルは Flink 0.10以降に適用可能です。
Apache Flink はコード内でのログの抽象化として slf4j を使っています。ユーザは関数内で同様にsfl4jを使うことを勧められます。
Sfl4j は実行時にlog4j あるいは Logbackのような異なるログの実装を使うことができるコンパイル時のログインタフェースです。
Flink はデフォルトでLog4jに依存しています。このページではLogbackと一緒にFlinkを使う方法を説明します。Users reported that they were also able to set up centralized logging with Graylog using this tutorial.
コード内でロガーのインスタンスを取得するには、以下のコードを使います:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MyClass implements MapFunction {
private static final Logger LOG = LoggerFactory.getLogger(MyClass.class);
// ...
In all cases were classes are executed with a classpath created by a dependency manager such as Maven, Flink will pull log4j into the classpath.
従って、Flinkの依存からlog4jを除外する必要があるでしょう。以下の説明は、Flink クリックスタートで生成されたMavenプロジェクトを仮定します。
以下のようにプロジェクトのpom.xml
を変更します:
<dependencies>
<!-- Add the two required logback dependencies -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.1.3</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.1.3</version>
</dependency>
<!-- Add the log4j -> sfl4j (-> logback) bridge into the classpath
Hadoop is logging to log4j! -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.3-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.10</artifactId>
<version>1.3-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.10</artifactId>
<version>1.3-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
以下の変更は<dependencies>
セクション内で行われました:
log4j
依存を除外する: これによりMavenがFlinkのlog4jへの推移的な依存を無視します。slf4j-log4j12
アーティファクトを除外する: logbackバインドするためにslf4jを使おうとしているので、slf4jからlog4jへのバインドを削除しなければなりません。logback-core
と logback-classic
log4j-over-slf4j
のための依存を追加する。log4j-over-slf4j
はSlf4jインタフェースを使うためにLog4j APIを直接使用するレガシーなアプリケーションを許可するツールです。Flinkはログのために直接Log4jを使っているHaddopに依存します。従って、Log4jからログをLogbackに切り替えるSlf4jへの全てのロガーの呼び出しをリダイレクトする必要があります。pomファイルに追加している全ての新しいFlinkの依存へ除外を手動で追加しなければならないことに注意してください。
他の依存(非Flink)がlog4jバインドを引き込んでいないかを調べる必要もあるかも知れません。mvn dependency:tree
を使ってプロジェクトの依存を解析することができます。
このチュートリアルはYarn上、あるいはスタンドアローンクラスタとしてFlnkを実行する場合に適用可能です。
Flinkと一緒にLog4jの代わりにLogbackを使うためには、lib/
ディレクトリからlog4j-1.2.xx.jar
と sfl4j-log4j12-xxx.jar
を削除する必要があります。
次に、以下のjarファイルをlib/
フォルダに配置する必要があります:
logback-classic.jar
logback-core.jar
log4j-over-slf4j.jar
: Hadoop(これはLog4jを使っています)からのログの呼び出しをSlf4jへリダイレクトするために、このブリッジがクラスパス内にある必要があります。jobあたりのYARNクラスタを使う場合は、lib/
ディレクトリを明示的に設定しなければならないことに注意してください。
独自のログを使うYARN上にFlinkをサブミットするためのコマンド: ./bin/flink run -yt $FLINK_HOME/lib <... remaining arguments ...>