重要: Scalaに依存するMaven アーティファクトはScalaのメジャーバージョンが後ろに付きます。例えば、"2.10" あるいは "2.11"。プロジェクトwiki上のマイグレーションガイドに相談してください。

最善の実践

このページは、Flinkプログラマが頻繁に遭遇する問題のどうやって解決するかのベストプラクティスのコレクションを含んでいます。

ほとんど全てのFlinkアプリケーション、バッチおよびストリーミングの両方は外部設定パラメータに依存します。For example for specifying input and output sources (like paths or addresses), also system parameters (parallelism, runtime configuration) and application specific parameters (often used within the user functions).

バージョン 0.9から、これらの問題を解決するために少なくともいくつかの基本的なツールを提供するために、ParameterToolと呼ばれる単純なユーティリティを提供しています。

ここで説明されるParameterTool を使う必要がないことに注意してください。Commons CLI, argparse4j などのような他のフレームワークはFlinkともよく動作します。

設定値をParameterToolに渡す

ParameterToolは設定を読み込むために事前に定義された静的なメソッドを提供します。ツールは内部的にMap<String, String>を期待します。つまり、自身の設定形式と統合するのはとても簡単です。

.properties ファイルから

以下のメソッドはPropertiesファイルを読み込み、キー/値のペアを提供するでしょう。

String propertiesFile = "/home/sam/flink/myjob.properties";
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFile);

コマンドライン引数から

これはコマンドラインから--input hdfs:///mydata --elements 42のような引数を取得することができます。

public static void main(String[] args) {
	ParameterTool parameter = ParameterTool.fromArgs(args);
	// .. 通常のコード ..

システムプロパティから

JVMを開始する時に、システムプロパティを渡すことができます: -Dinput=hdfs:///mydata. これらのシステムプロパティからParameterTool を初期化することもできます:

ParameterTool parameter = ParameterTool.fromSystemProperties();

Now that we’ve got the parameters from somewhere (see above) we can use them in various ways.

Directly from the ParameterTool

ParameterTool それ自身が値にアクセスするメソッドを持ちます。

ParameterTool parameters = // ...
parameter.getRequired("input");
parameter.get("output", "myDefaultValue");
parameter.getLong("expectedCount", -1L);
parameter.getNumberOfParameters()
// .. もっと多くの利用可能なメソッドがあります。

main()メソッドの中で、それらのメソッドの返り値を直接使うことができます(=アプリケーションをサブミットするクライアント)。例えば、このようにオペレータの並行度を設定できるでしょう:

ParameterTool parameters = ParameterTool.fromArgs(args);
int parallelism = parameters.get("mapParallelism", 2);
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).setParallelism(parallelism);

ParameterTool はシリアライズ可能なため、それを関数自身に渡すことができます:

ParameterTool parameters = ParameterTool.fromArgs(args);
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer(parameters));

そして、コマンドラインから値を取得するために関数内でそれらを使います。

Configurationオブジェクトとして1つの関数にそれを渡します

以下の例はパラメータを設定オブジェクトとしてユーザ定義の関数に渡す方法を示します。

ParameterTool parameters = ParameterTool.fromArgs(args);
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).withParameters(parameters.getConfiguration())

これで、Tokenizerの中でオブジェクトはopen(Configuration conf)メソッドの中でアクセス可能です:

public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
	@Override
	public void open(Configuration parameters) throws Exception {
		parameters.getInteger("myInt", -1);
		// .. do

グローバルにパラメータを登録する

ExecutionConfigグローバル ジョブ パラメータとして登録されたパラメータによって、JobManagerのwebインタフェースとユーザによって定義された全ての関数から設定値にアクセスすることができます。

グローバルにパラメータを登録する

ParameterTool parameters = ParameterTool.fromArgs(args);

// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters);

Access them in any rich user function:

public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {

	@Override
	public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
		ParameterTool parameters = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
		parameters.getRequired("input");
		// .. do more ..

大きなTupleXタイプに名前を付ける

多くのフィールドを持つデータタイプのために、TupleX の代わりにPOJOs (Plain old Java objects)を使うことをお勧めします。また、POJOsは大きなTuple-タイプに名前を付けるために使うこともできます。

以下の使い方の代わりに:

Tuple11<String, String, ..., String> var = new ...;

大きなTupleタイプから拡張する独自のタイプを生成するほうがとても簡単です。

CustomType var = new ...;

public static class CustomType extends Tuple11<String, String, ..., String> {
    // constructor matching super
}

Flinkのタイプのシリアライザによってシリアライズ化されない独自のタイプをFinkプログラム内で使う場合は、Flinkは一般的なKryoシリアライザの使用にフォールバックします。独自のシリアライザ、あるいはGoogle ProtobufあるいはKryoを使ったApache Triftのようなシリアライズ化システムを登録するかも知れません。そうするには、単にタイプクラスとFlinkプログラムのExecutionConfig内のシリアライザを登録します。

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// register the class of the serializer as serializer for a type
env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, MyCustomSerializer.class);

// register an instance as serializer for a type
MySerializer mySerializer = new MySerializer();
env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, mySerializer);

独自のシリアライザはKryoのシリアライザクラスを継承しなければならないことに注意してください。Google Protobuf あるいは Apache Thriftの場合は、これは以下のようにしてすでに行われています:

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// register the Google Protobuf serializer with Kryo
env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, ProtobufSerializer.class);

// register the serializer included with Apache Thrift as the standard serializer
// TBaseSerializer states it should be initialized as a default Kryo serializer
env.getConfig().addDefaultKryoSerializer(MyCustomType.class, TBaseSerializer.class);

上の例を動作するには、必要な依存をMavenプロジェクトファイル (pom.xml)に含める必要があります。依存のセクションの中で、Apache Thriftのために以下を追加します:

<dependency>
	<groupId>com.twitter</groupId>
	<artifactId>chill-thrift</artifactId>
	<version>0.5.2</version>
</dependency>
<!-- libthrift is required by chill-thrift -->
<dependency>
	<groupId>org.apache.thrift</groupId>
	<artifactId>libthrift</artifactId>
	<version>0.6.1</version>
	<exclusions>
		<exclusion>
			<groupId>javax.servlet</groupId>
			<artifactId>servlet-api</artifactId>
		</exclusion>
		<exclusion>
			<groupId>org.apache.httpcomponents</groupId>
			<artifactId>httpclient</artifactId>
		</exclusion>
	</exclusions>
</dependency>

Google Protobuf のためには、以下のMaven依存を必要とします:

<dependency>
	<groupId>com.twitter</groupId>
	<artifactId>chill-protobuf</artifactId>
	<version>0.5.2</version>
</dependency>
<!-- We need protobuf for chill-protobuf -->
<dependency>
	<groupId>com.google.protobuf</groupId>
	<artifactId>protobuf-java</artifactId>
	<version>2.5.0</version>
</dependency>

必要に応じて両方のライブラリのバージョンを調節してください。

Log4jの代わりにLogbackを使う

注意: このチュートリアルは Flink 0.10以降に適用可能です。

Apache Flink はコード内でのログの抽象化として slf4j を使っています。ユーザは関数内で同様にsfl4jを使うことを勧められます。

Sfl4j は実行時にlog4j あるいは Logbackのような異なるログの実装を使うことができるコンパイル時のログインタフェースです。

Flink はデフォルトでLog4jに依存しています。このページではLogbackと一緒にFlinkを使う方法を説明します。Users reported that they were also able to set up centralized logging with Graylog using this tutorial.

コード内でロガーのインスタンスを取得するには、以下のコードを使います:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyClass implements MapFunction {
	private static final Logger LOG = LoggerFactory.getLogger(MyClass.class);
	// ...

In all cases were classes are executed with a classpath created by a dependency manager such as Maven, Flink will pull log4j into the classpath.

従って、Flinkの依存からlog4jを除外する必要があるでしょう。以下の説明は、Flink クリックスタートで生成されたMavenプロジェクトを仮定します。

以下のようにプロジェクトのpom.xmlを変更します:

<dependencies>
	<!-- Add the two required logback dependencies -->
	<dependency>
		<groupId>ch.qos.logback</groupId>
		<artifactId>logback-core</artifactId>
		<version>1.1.3</version>
	</dependency>
	<dependency>
		<groupId>ch.qos.logback</groupId>
		<artifactId>logback-classic</artifactId>
		<version>1.1.3</version>
	</dependency>

	<!-- Add the log4j -> sfl4j (-> logback) bridge into the classpath
	 Hadoop is logging to log4j! -->
	<dependency>
		<groupId>org.slf4j</groupId>
		<artifactId>log4j-over-slf4j</artifactId>
		<version>1.7.7</version>
	</dependency>

	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-java</artifactId>
		<version>1.1-SNAPSHOT</version>
		<exclusions>
			<exclusion>
				<groupId>log4j</groupId>
				<artifactId>*</artifactId>
			</exclusion>
			<exclusion>
				<groupId>org.slf4j</groupId>
				<artifactId>slf4j-log4j12</artifactId>
			</exclusion>
		</exclusions>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-streaming-java_2.10</artifactId>
		<version>1.1-SNAPSHOT</version>
		<exclusions>
			<exclusion>
				<groupId>log4j</groupId>
				<artifactId>*</artifactId>
			</exclusion>
			<exclusion>
				<groupId>org.slf4j</groupId>
				<artifactId>slf4j-log4j12</artifactId>
			</exclusion>
		</exclusions>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-clients_2.10</artifactId>
		<version>1.1-SNAPSHOT</version>
		<exclusions>
			<exclusion>
				<groupId>log4j</groupId>
				<artifactId>*</artifactId>
			</exclusion>
			<exclusion>
				<groupId>org.slf4j</groupId>
				<artifactId>slf4j-log4j12</artifactId>
			</exclusion>
		</exclusions>
	</dependency>
</dependencies>

以下の変更は<dependencies> セクション内で行われました:

  • 全てのFlinkの依存から全てのlog4j 依存を除外する: これによりMavenがFlinkのlog4jへの推移的な依存を無視します。
  • Flinkの依存からslf4j-log4j12 アーティファクトを除外する: logbackバインドするためにslf4jを使おうとしているので、slf4jからlog4jへのバインドを削除しなければなりません。
  • Logback依存を追加する: logback-corelogback-classic
  • log4j-over-slf4jのための依存を追加する。log4j-over-slf4j はSlf4jインタフェースを使うためにLog4j APIを直接使用するレガシーなアプリケーションを許可するツールです。Flinkはログのために直接Log4jを使っているHaddopに依存します。従って、Log4jからログをLogbackに切り替えるSlf4jへの全てのロガーの呼び出しをリダイレクトする必要があります。

pomファイルに追加している全ての新しいFlinkの依存へ除外を手動で追加しなければならないことに注意してください。

他の依存(非Flink)がlog4jバインドを引き込んでいないかを調べる必要もあるかも知れません。mvn dependency:treeを使ってプロジェクトの依存を解析することができます。

このチュートリアルはYarn上、あるいはスタンドアローンクラスタとしてFlnkを実行する場合に適用可能です。

Flinkと一緒にLog4jの代わりにLogbackを使うためには、lib/ディレクトリからlog4j-1.2.xx.jarsfl4j-log4j12-xxx.jar を削除する必要があります。

次に、以下のjarファイルをlib/フォルダに配置する必要があります:

  • logback-classic.jar
  • logback-core.jar
  • log4j-over-slf4j.jar: Hadoop(これはLog4jを使っています)からのログの呼び出しをSlf4jへリダイレクトするために、このブリッジがクラスパス内にある必要があります。

jobあたりのYARNクラスタを使う場合は、lib/ディレクトリを明示的に設定しなければならないことに注意してください。

独自のログを使うYARN上にFlinkをサブミットするためのコマンド: ./bin/flink run -yt $FLINK_HOME/lib <... remaining arguments ...>

TOP
inserted by FC2 system