Java 8 は高速化およびコードの綺麗さを目的とする幾つかの新しい言語機能を導入します。“ラムダ表現”と言われる最も重要な機能を使って、Java 8は関数型プログラミングへの扉を開けます。ラムダ表現により追加の(匿名の)クラス宣言をする必要無しに素直な方法で関数を実装および渡すことができます。
Flinkの最新のバージョンはJava APIの全てのオペレータについてラムダ表現の使用をサポートします。このドキュメントはラムダ表現の使い方と現在の制限について説明します。Flink APIの一般的な導入については、プログラミング ガイドを参照してください。
以下の例はラムダ表現を使って入力を2乗する単純なインライン map()
関数を実装する方法を説明します。入力 i
とmap()
関数の出力パラメータの型は、Java 8のコンパイラが推測するため、宣言する必要がありません。
env.fromElements(1, 2, 3)
// returns the squared i
.map(i -> i*i)
.print();
次の2つの例は出力のためにCollector
を使う関数の異なる実装を示します。flatMap()
のような関数は型セーフのためにCollector
について出力型(この場合String
)が定義される必要があります。もしCollector
の型が周りのコンテキストから推測でいない場合、手動でラムダ表現のパラメータのリスト内で宣言される必要があります。そうでなければ、出力は望ましくない挙動に繋がるかもしれない型Object
として扱われるでしょう。
DataSet<Integer> input = env.fromElements(1, 2, 3);
// collector type must be declared
input.flatMap((Integer number, Collector<String> out) -> {
StringBuilder builder = new StringBuilder();
for(int i = 0; i < number; i++) {
builder.append("a");
out.collect(builder.toString());
}
})
// returns (on separate lines) "a", "a", "aa", "a", "aa", "aaa"
.print();
DataSet<Integer> input = env.fromElements(1, 2, 3);
// collector type must not be declared, it is inferred from the type of the dataset
DataSet<String> manyALetters = input.flatMap((number, out) -> {
StringBuilder builder = new StringBuilder();
for(int i = 0; i < number; i++) {
builder.append("a");
out.collect(builder.toString());
}
});
// returns (on separate lines) "a", "a", "aa", "a", "aa", "aaa"
manyALetters.print();
The following code demonstrates a word count which makes extensive use of Lambda Expressions.
DataSet<String> input = env.fromElements("Please count", "the words", "but not this");
// filter out strings that contain "not"
input.filter(line -> !line.contains("not"))
// split each line by space
.map(line -> line.split(" "))
// emit a pair <word,1> for each array element
.flatMap((String[] wordArray, Collector<Tuple2<String, Integer>> out)
-> Arrays.stream(wordArray).forEach(t -> out.collect(new Tuple2<>(t, 1)))
)
// group and sum up
.groupBy(0).sum(1)
// print
.print();
現在のところ、Eclipse Luna 4.4.2 (以上)に含まれるEclipse JDTコンパイラでコンパイルされた場合のみ、Flinkはラムダ表現を含むジョブを完全にサポートします。
Eclipse JDT コンパイラだけがラムダ表現のタイプセーフ機能全体を使うために必要なジェネリック型情報を維持します。OpenJDKおよびOracle JDKのjavac
のような他のコンパイラはラムダ表現に関係する全てのジェネリックパラメータを捨てます。このことは、ラムダ関数の入力あるいは出力パラメータとして宣言されるTuple2<String, Integer>
あるいは Collector<String>
のような型は、コンパイルされた.class
ファイルの中でのTuple2
あるいは Collector
に切り詰められるだろうことを意味します。
ラムダ表現を含むFlinkジョブをJDTコンパイラを使ってコンパイルする方法は次の章でカバーされるでしょう。
しかし、関数がCollector
あるいは Iterable
を持たずかつ 関数が Integer
, Long
, String
, MyOwnClass
(ジェネリックでは無い型!)のようなパラメータ化されない型を処理する場合のみ、ラムダ表現を持つmap()
あるいは filter()
のような関数をEclipse JDTコンパイラ以外の Java 8 コンパイラで実装することは可能です。
Eclipse IDEを使っている場合は、幾つかの設定ステップの後で、IDE内で問題無くFlinkのコードを実行およびデバッグすることができます。デフォルトでは Eclipse IDE はJavaのソースをEclipse JDTコンパイラを使ってコンパイルします。次の章は Eclipse IDE を設定する方法を説明します。
IntelliJ IDEA のような異なるIDEを使っているか、ジョブをクラスタ上で実行するためにMavenを使ってJarファイルをパッケージ化したい場合は、プロジェクトの pom.xml
ファイルを修正し、Mavenを使ってプログラムをビルドする必要があります。クイック スタート は新しいプロジェクトあるいはリファレンスとして使うことができる事前設定済みのMavenプロジェクトを含みます。ラムダ表現と一緒に Java 8 を使いたい場合は、生成されたクイックスタートの pom.xml
の言及されている行をコメントから外してください。
もう一つの方法として、以下の行をMavenの pom.xml
ファイルに挿入することができます。Maven はコンパイルのために Eclipse JDT コンパイラを使うでしょう。
<!-- put these lines under "project/build/pluginManagement/plugins" of your pom.xml -->
<plugin>
<!-- Use compiler plugin with tycho as the adapter to the JDT compiler. -->
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
<compilerId>jdt</compilerId>
</configuration>
<dependencies>
<!-- This dependency provides the implementation of compiler "jdt": -->
<dependency>
<groupId>org.eclipse.tycho</groupId>
<artifactId>tycho-compiler-jdt</artifactId>
<version>0.21.0</version>
</dependency>
</dependencies>
</plugin>
開発のために Eclipse を使っている場合は、m2e プラグインが上で挿入した行について文句を言い、無効なものとして pom.xml
をマークするかもしれません。そうであれば、以下の行を pom.xml
に挿入してください。
<!-- put these lines under "project/build/pluginManagement/plugins/plugin[groupId="org.eclipse.m2e", artifactId="lifecycle-mapping"]/configuration/lifecycleMappingMetadata/pluginExecutions" of your pom.xml -->
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<versionRange>[3.1,)</versionRange>
<goals>
<goal>testCompile</goal>
<goal>compile</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore></ignore>
</action>
</pluginExecution>
まず最初に、Eclipse IDEの現在のバージョン(4.4.2以降)を実行しているようにしてください。また、Eclipse IDE内で Java 8 Runtime Environment (JRE) をインストールしているようにしてください (Window
-> Preferences
-> Java
-> Installed JREs
)。
Eclipse プロジェクトを 作成/インポート する。
Mavenを使っている場合は、maven-compiler-plugin
のための pom.xml
内でJavaバージョンを変更する必要もあります。そうでなければ、プロジェクトの JRE System Library
セクションを右クリックし、ラムダ表現をサポートする Java 8 JRE (以上) に切り替えるためにProperties
を開いてください。
Eclipse JDT コンパイラは.class
ファイル内に型情報を格納するために特別なコンパイラ フラグを必要とします。{project directoy}/.settings/org.eclipse.jdt.core.prefs
の JDT設定ファイルを好みのテキストエディタで開き、以下の行を追加します:
org.eclipse.jdt.core.compiler.codegen.lambda.genericSignature=generate
まだやっていない場合は、以下のプロパティのJavaのバージョンも 1.8
(以降)に変更します:
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
org.eclipse.jdt.core.compiler.compliance=1.8
org.eclipse.jdt.core.compiler.source=1.8
ファイルを保存した後で、Eclipse IDE内で完全なプロジェクトの再読み込みを行います。
Mavenを使っている場合は、Eclipse プロジェクトを右クリックし Maven
-> Update Project...
を選択します。
以下のFlinkプログラムが例外無しで実行する場合、全てを正しく設定できています:
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.fromElements(1, 2, 3).map((in) -> new Tuple1<String>(" " + in)).print();
env.execute();