重要: Scalaに依存するMaven アーティファクトはScalaのメジャーバージョンが後ろに付きます。例えば、"2.10" あるいは "2.11"。プロジェクトwiki上のマイグレーションガイドに相談してください。

Type 抽出とシリアライズ化

Flink handles types in a unique way, containing its own type descriptors, generic type extraction, and type serialization framework. このドキュメントはその概念とそれらの背後にある根本的理由を説明します。

There are fundamental differences in the way that the Scala API and the Java API handle type information, so most of the issues described here relate only to one of the to APIs.

Flinkはどの型が入力されたかできるだけの情報を知ろうとし、可能な限りユーザ関数を残そうとします。This stands in contrast to the approach to just assuming nothing and letting the programming language and serialization framework handle all types dynamically.

  • To allow using POJOs and grouping/joining them by referring to field names, Flink needs the type information to make checks (for typos and type compatibility) before the job is executed.

  • The more we know, the better serialization and data layout schemes the compiler/optimizer can develop. これはFlinkでのメモリの利用パラダイムにとってとても重要です(ヒープ 内/外 のシリアライズ化されたデータで動作し、シリアライズ化はとても手軽です)。

  • For the upcoming logical programs (see roadmap draft) we need this to know the “schema” of functions.

  • Finally, it also spares users having to worry about serialization frameworks and having to register types at those frameworks.

クラス TypeInformation は全ての型のデスクリプタのための基本クラスです。それは型のいくつかの基本的なプロパティを明らかにし、シリアライザと型のための特殊な比較器を生成することができます。(Flinkでの比較器は順番の定義以上の事をすることに注意してください - それらは基本的にキーを扱うためのユーティリティです)

内部的には、Flinkは以下の型の区別を行います:

  • 基本型: 全てのJavaのプリミティブ型とボックス化の形式、加えて void, String, Date, BigDecimalBigInteger

  • プリミティブ配列とオブジェクトの配列

  • 複合型

    • Flink Java タプル (Flink Java APIの一部)

    • Scala case classes (Scala のタプルを含みます)

    • POJOs: 特定のbean的なパターンに従うクラス

  • Scala 補助型 (Option, Either, Lists, Maps, …)

  • ジェネリック型: これらはFlink自身ではなくKryoによってシリアライズ化されるでしょう。

POJOs は特に興味深いです。それらは複雑な型の生成とキーの定義内でのフィールド名の使用をサポートします: dataSet.join(another).where("name").equalTo("personName"). それらはランタイムにも透過的で、Flinkによってとても効率的に扱うことができます。

POJOタイプのためのルール

以下の条件が満たされる場合は、FlinkはPOJOタイプとしてデータの型を認識します(そして"名前での"フィールド参照を許可します:

  • クラスはpublicでスタンドアローン(非static内部クラスがありません)
  • クラスはpublicな引数を持たないコンストラクタを持ちます
  • クラス(そしてすべてのスーパークラス)内の全てのフィールドはpuclic、あるいはgetterとsetterのためのJava beansの名前変換に従うpublicなgetterとsetterを持ちます。

Scala APIでの型情報

Scala type manifestsclass tagsを使ってランタイム型情報のための精巧な概念を持ちます。一般的に型とメソッドはジェネリック パラメータの型へのアクセスを持ちます - したがって ScalaプログラムはJavaプログラムがするような型の消去による影響を受けません。

さらに、ScalaはScalaマクロを使ってScalaコンパイラ内で独自のコードを実行することができます - FlinkのScala APIに対して書かれたScalaプログラムをコンパイルするときはいつでも何らかのFlinkのコードが実行されることを意味します。

パラメータタイプを見るためとコンパイルの間にすべてのユーザ関数の型を返すためにマクロを使います - 全ての型の情報が完全に利用可能なのはまさにこの時です。マクロの中で、関数の返り値の型(およびパラメータ型)のためにTypeInformationを生成し、それをオペレーションの一部にします。

No Implicit Value for Evidence Parameter Error

TypeInformation が生成されなかった場合、プログラムは "could not find implicit value for evidence parameter of type TypeInformation"から始まるエラーでコンパイルに失敗します。

TypeInformation を生成するコードがインポートされなかった場合によくある理由です。flink.api.scala パッケージ全体をインポートするようにしてください。

import org.apache.flink.api.scala._

他の一般的な原因はジェネリックメソッドです。これは以下の省で説明されるように修正することができます。

ジェネリックメソッド

以下の場合を考えます:

def selectFirst[T](input: DataSet[(T, _)]) : DataSet[T] = {
  input.map { v => v._1 }
}

val data : DataSet[(String, Long) = ...

val result = selectFirst(data)

For such generic methods, the data types of the function parameters and return type may not be the same for every call and are not known at the site where the method is defined. 上のコードは十分に明確なエビデンスが利用可能では無いために、エラーになるでしょう。

そのよな場合、型情報は発動した場所で生成され、メソッドに渡されなければなりません。Scala はそのために implicit parameters を提供します。

以下のコードはScalaに T のための型情報を関数に運ぶように伝えます。そして型情報はメソッドが定義された場所では無く、メソッドが発動された場所で生成されるでしょう。

def selectFirst[T : TypeInformation](input: DataSet[(T, _)]) : DataSet[T] = {
  input.map { v => v._1 }
}

Java APIでの型情報

Java は一般的にジェネリック型情報を削除します。Only for subclasses of generic classes, the subclass stores the type to which the generic type variables bind.

Flinkは関数のジェネリックパラメータの型を見つけ出すためのユーザ関数を実装する(匿名)クラス上でリフレクションを使います。This logic also contains some simple type inference for cases where the return types of functions are dependent on input types, such as in the generic utility method below:

public class AppendOne<T> extends MapFunction<T, Tuple2<T, Long>> {

    public Tuple2<T, Long> map(T value) {
        return new Tuple2<T, Long>(value, 1L);
    }
}

Not in all cases can Flink figure out the data types of functions reliably in Java. ジェネリック ラムダ(Javaコミュニティと一緒にこれを解決しようとしています。以下を見て下さい)と推測できないジェネリック型変数にまだ問題があります。

Java APIでの型ヒント

Flinkが削除されたジェネリック型情報を再構築できない場合の手助けとして、Java APIはバージョン 0.9から型ヒントと呼ばれるものを提供します。型ヒントはシステムに関数で生成されたデータセットの型を知らせます。以下が例です:

DataSet<SomeType> result = dataSet
    .map(new MyGenericNonInferrableFunction<Long, SomeType>())
        .returns(SomeType.class);

The returns statement specifies the produced type, in this case via a class. ヒントは以下の型定義をサポートします

  • クラス。パラメータ化されていない型(非ジェネリック)のため
  • returns("Tuple2<Integer, my.SomeType>")の形の文字列。これはパースされ、TypeInformationに変換されます。
  • 直接 TypeInformation

Java 8 lambdasのための型抽出

Type extraction for Java 8 lambdas works differently than for non-lambdas, because lambdas are not associated with an implementing class that extends the function interface.

現在のところ、Flinkはどのメソッドがlambdaを実装しているかを見つけだそうとし、パラメータの型と返り値の型を決定するためにJavaのジェネリックシグネチャーを使います。However, these signatures are not generated for lambdas by all compilers (as of writing this document only reliably by the Eclipse JDT compiler 4.5 from Milestone 2 onwards)

Java Lambdaのための型情報の改善

One of the Flink committers (Timo Walther) has actually become active in the Eclipse JDT compiler community and in the OpenJDK community and submitted patches to the compiler to improve availability of type information available for Java 8 lambdas.

Eclipse JDT コンパイラはバージョン 4.5 M4の時点でこのサポートを追加しました。OpenJDKコンパイラでの機能についての議論は未解決のままです。

POJO型のシリアライズ化

PojoTypeInformation はPOJOの中の全てのフィールドのためのシリアライザを生成しています。int, long, String などのような標準的な型はFlinkと同梱されているシリアライザによって処理されます。他の全ての型については、Kryoに頼っています。

Kryo が型を処理できない場合は、PojoTypeInfoにAvroを使ってPOJOをシリアライズ化するように頼むことができます。そうするには、以下のように呼び出す必要があります

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableForceAvro();

FlinkはAvroシリアライザを使ってAvroに生成されたPOJOを自動的にシリアライズ化することに注意してください。

Kryoシリアライザによって完全に POJO 型を扱いたい場合は、以下のように設定してください

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableForceKryo();

KryoがPOJOをシリアライズ化できない場合、以下のようにKryoに独自のシリアライザを追加することができます

env.getConfig().addDefaultKryoSerializer(Class type, Class extends Serializer> serializerClass)

これらのメソッドの異なる変異体が利用可能です。

TOP
inserted by FC2 system