Java Lambda Expressions
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

Java ラムダ式 #

Java 8 は、より高速でより明確なコーディングのために設計されたいくつかの新しい言語機能を導入しました。最も重要な機能であるいわゆる"Lambda Expressions"により、関数型プログラミングへの扉が開かれました。ラムダ式を使うと、追加の(匿名)クラスを宣言することなく、関数を簡単な方法で実装して渡すことができます。

FlinkはJava APIの全てのオペレータに対してラムダ式の使用をサポートしますが、ラムダ式でJavaジェネリックを使う場合は常に型情報を明示的に宣言する必要があります。

このドキュメントはラムダ式の使用方法を示し、現在の制限について説明します。Flink APIの概要については、DataSteam APIの概要を参照してください。

例と制限 #

以下の例は、ラムダ式を使って入力を2乗する単純なインラインmap()関数を実装する方法を示しています。 入力imap()関数の出力パラメータはJavaコンパイラによって推測されるため、宣言する必要はありません。

env.fromElements(1, 2, 3)
// returns the squared i
.map(i -> i*i)
.print();

OUTはジェネリックではなくIntegerであるため、FlinkはメソッドシグネチャOUT map(IN value)の実装から結果の型情報を自動的に抽出できます。

残念ながら、void flatMap(IN value, Collector<OUT> out)というシグネチャを持つ flatMap()などの関数は、Javaコンパイラによってvoid flatMap(IN value, Collector out)にコンパイルされます。これにより、Flinkは出力タイプの型情報を自動的に推測できなくなります。

Flinkはほとんどの場合、以下のような例外を投げます:

org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing.
    Javaジェネリックが関係する場合、多くの場合においてラムダ表現は自動型抽出に十分な情報を提供しません。
    簡単な回避策は、代わりに 'org.apache.flink.api.common.functions.FlatMapFunction' インタフェースを実装する(匿名の)クラスを使うことです。
    それ以外の場合は、型情報を使って型を明示的に指定する必要があります。

この場合、型情報は明示的に指定する必要があります。そうでない場合は、出力は非効率的なシリアライズ化に繋がる型Objectとして扱われます。

DataStream<Integer> input = env.fromElements(1, 2, 3);

// collector type must be declared
input.flatMap((Integer number, Collector<String> out) -> {
    StringBuilder builder = new StringBuilder();
    for(int i = 0; i < number; i++) {
        builder.append("a");
        out.collect(builder.toString());
    }
})
// provide type information explicitly
.returns(Types.STRING)
// prints "a", "a", "aa", "a", "aa", "aaa"
.print();

Similar problems occur when using a map() function with a generic return type. A method signature Tuple2<Integer, Integer> map(Integer value) is erasured to Tuple2 map(Integer value) in the example below.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;

env.fromElements(1, 2, 3)
    .map(i -> Tuple2.of(i, i))    // no information about fields of Tuple2
    .print();

一般的に、これらの問題は複数の方法で解決することができます:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;

// use the explicit ".returns(...)"
env.fromElements(1, 2, 3)
    .map(i -> Tuple2.of(i, i))
    .returns(Types.TUPLE(Types.INT, Types.INT))
    .print();

// use a class instead
env.fromElements(1, 2, 3)
    .map(new MyTuple2Mapper())
    .print();

public static class MyTuple2Mapper extends MapFunction<Integer, Tuple2<Integer, Integer>> {
    @Override
    public Tuple2<Integer, Integer> map(Integer i) {
        return Tuple2.of(i, i);
    }
}

// use an anonymous class instead
env.fromElements(1, 2, 3)
    .map(new MapFunction<Integer, Tuple2<Integer, Integer>> {
        @Override
        public Tuple2<Integer, Integer> map(Integer i) {
            return Tuple2.of(i, i);
        }
    })
    .print();

// or in this example use a tuple subclass instead
env.fromElements(1, 2, 3)
    .map(i -> new DoubleTuple(i, i))
    .print();

public static class DoubleTuple extends Tuple2<Integer, Integer> {
    public DoubleTuple(int f0, int f1) {
        this.f0 = f0;
        this.f1 = f1;
    }
}

Back to top

inserted by FC2 system