データ型 & シリアライズ化

Apache Flink はデータの型およびシリアライズ化を、独自の型のディスクリプタ、generic型の抽出、そして型のシリアライズ化フレームワークを含むユニークな方法で扱います。このドキュメントはその概念とそれらの背後にある根本的理由を説明します。

Flinkは分散計算の間に交換および格納されるデータ型についての多くの情報を推論しようとします。テーブルのスキーマを推測するデータベースのようなものを考えてください。ほとんどの場合、Flinkは自身でシームレスに全ての必要な情報を推論します。型情報を持つことでFlinkは幾つかのクールなことを行うことができます:

  • (dataSet.keyBy("username")のような)フィールド名を参照することで、POJO型およびそれらの gouping / joining / aggregatingを使う。型情報によりFlinkは後で実行時に失敗するより早く(タイポおよび型の互換性について)調査することができます。

  • Flinkがデータ型をより知ることにより、シリアライズ化およびデータのレイアウトのスキーマが良くなります。これはFlinkでのメモリの利用パラダイムにとってとても重要です(これまで可能なヒープ 内/外 のシリアライズ化されたデータで動作し、シリアライズ化はとても手軽です)。

  • 結果的にユーザが多くのクラスの中でシリアライズ化フレームワークを心配し、型を登録しなければならないことを手助けします。

一般的にデータ型の情報はpre-flight フェーズで必要です - つまり、DataStream および DataSet のプログラマの呼び出しが行われる時、そしてexecute(), print(), count() あるいは collect()の呼び出しの前。

最もよくある問題

ユーザがFlinkのデータ型の処理とやり取りをしなければならない最もよくある問題:

  • subtypeの登録: 関数の署名がsupertypeのみを記述するがそれらは実行時には実際にはsubtypeを使う場合、Flinkのこれらのsubtypeに気づくパフォーマンスが大きく増えるかもしれません。そのため、それぞれのsubtypeのためにStreamExecutionEnvironment あるいは ExecutionEnvironment上で.registerType(clazz)を呼びます。

  • 独自のシリアライザの登録: Flink は自身で透過的に型を扱わない型についてKryoに戻ります。全ての型がKryoによって(従ってFlinkによって)シームレスに扱われるわけではありません。例えば、多くの Google Guava コレクション型はデフォルトでは良く動作しません。解決法は、問題を起こす型について追加のシリアライザを登録することです。StreamExecutionEnvironment または ExecutionEnvironment 上で .getConfig().addDefaultKryoSerializer(clazz, serializer)を呼びます。追加のKryoシリアライザは多くのライブラリで利用可能です。独自のシリアライザと連携する詳細については独自のシリアライザを見てください。

  • 型ヒントの追加: 全ての策略にも関わらず、Flinkがgeneric型を推測できない場合、ユーザが型ヒントを渡さなければならないことが時々あります。一般的にJava APIでのみ必要です。型ヒントの選択 はそれをもっと詳細に説明します。

  • TypeInformationの手動での生成: これはFlinkがJavaのgeneric型の消去によりデータ型を推測できない時にいくつかのAPIの呼び出しで必要かもしれません。詳細はTypeInformation または TypeSerializerの生成を見てください。

クラス TypeInformation は全ての型のデスクリプタのための基本クラスです。それは型のいくつかの基本的なプロパティを明らかにし、シリアライザと型のための特殊な比較器を生成することができます。(Flinkでの比較器は順番の定義以上の事をすることに注意してください - それらは基本的にキーを扱うためのユーティリティです)

内部的には、Flinkは以下の型の区別を行います:

  • 基本型: 全てのJavaのプリミティブ型とボックス化の形式、加えて void, String, Date, BigDecimalBigInteger

  • プリミティブ配列とオブジェクトの配列

  • 複合型

    • Flink Java タプル (Flink Java APIの一部) : 最大20フィールド、nullフィールドはサポートされません

    • Scala case classes (Scala tuplesを含む): 最大 22 フィールド、nullフィールドはサポートされません

    • 行: 任意の数のフィールドを持つtupleとnullフィールドのためのサポート

    • POJOs: 特定のbean的なパターンに従うクラス

  • 補助型 (Option, Either, Lists, Maps, …)

  • ジェネリック型: これらはFlink自身ではなくKryoによってシリアライズ化されるでしょう。

POJOs は特に興味深いです。それらは複雑な型の生成とキーの定義内でのフィールド名の使用をサポートします: dataSet.join(another).where("name").equalTo("personName"). それらはランタイムにも透過的で、Flinkによってとても効率的に扱うことができます。

POJOタイプのためのルール

以下の条件が満たされる場合は、FlinkはPOJOタイプとしてデータの型を認識します(そして"名前での"フィールド参照を許可します:

  • クラスはpublicでスタンドアローン(非static内部クラスがありません)
  • クラスはpublicな引数を持たないコンストラクタを持ちます
  • クラス(そして全てのsuperclass)内の全ての非静的、非一時的なフィールドは、getterおよびsetterのためのJava beansの命名規則に従うpublic(そして非final)あるいはpublicのgetter- および setter- のいずれかのメソッドを持ちます。

ユーザ定義のデータ型がPOJO型として認識できない場合、GenericType およびKryoを使ってシリアライズ化されたものとして処理されなければならないことに注意してください。

TypeInformation あるいは TypeSerializerの生成

型のためのTypeInformation オブジェクトを作るには、言語固有の方法を使います:

Javaは一般的にgeneric型情報を消すため、型をTypeInformation コンストラクションに渡す必要があります:

非generic型については、Classを渡すことができます:

TypeInformation<String> info = TypeInformation.of(String.class);

generic型については、TypeHintを使ってgeneric型情報を“capture” する必要があります:

TypeInformation<Tuple2<String, Double>> info = TypeInformation.of(new TypeHint<Tuple2<String, Double>>(){});

内部的には、これは実行時までTypeHintを保持するためにgeneric情報をcaptureするTypeHintの匿名子クラスを生成します。

Scalaでは、Flinkはコンパイル時に実行するmacrosを使用し、まだ利用可能なうちに全てのgeneric型情報をcaptureします。

// important: this import is needed to access the 'createTypeInformation' macro function
import org.apache.flink.streaming.api.scala._

val stringInfo: TypeInformation[String] = createTypeInformation[String]

val tupleInfo: TypeInformation[(String, Double)] = createTypeInformation[(String, Double)]

Javaでは代替として同じメソッドをまだ使うことができます。

TypeSerializerを生成するためには、単純にTypeInformation オブジェクト上でtypeInfo.createSerializer(config)を呼びます。

config パラメータは型ExecutionConfigで、プログラマの登録した独自のシリアライザについての情報を保持します。Where ever possibly, try to pass the programs proper ExecutionConfig. You can usually obtain it from DataStream or DataSet via calling getExecutionConfig(). (MapFunctionのような)関数の中で、関数Rich Function を生成しgetRuntimeContext().getExecutionConfig()を呼ぶことでそれを取得することができます。



Scala APIでの型情報

Scala type manifestsclass tagsを使ってランタイム型情報のための精巧な概念を持ちます。一般的に型とメソッドはジェネリック パラメータの型へのアクセスを持ちます - したがって ScalaプログラムはJavaプログラムがするような型の消去による影響を受けません。

さらに、ScalaはScalaマクロを使ってScalaコンパイラ内で独自のコードを実行することができます - FlinkのScala APIに対して書かれたScalaプログラムをコンパイルするときはいつでも何らかのFlinkのコードが実行されることを意味します。

パラメータタイプを見るためとコンパイルの間にすべてのユーザ関数の型を返すためにマクロを使います - 全ての型の情報が完全に利用可能なのはまさにこの時です。マクロの中で、関数の返り値の型(およびパラメータ型)のためにTypeInformationを生成し、それをオペレーションの一部にします。

No Implicit Value for Evidence Parameter Error

TypeInformation が生成されなかった場合、プログラムは "could not find implicit value for evidence parameter of type TypeInformation"から始まるエラーでコンパイルに失敗します。

TypeInformation を生成するコードがインポートされなかった場合によくある理由です。flink.api.scala パッケージ全体をインポートするようにしてください。

import org.apache.flink.api.scala._

他の一般的な原因はジェネリックメソッドです。これは以下の省で説明されるように修正することができます。

ジェネリックメソッド

以下の場合を考えます:

def selectFirst[T](input: DataSet[(T, _)]) : DataSet[T] = {
  input.map { v => v._1 }
}

val data : DataSet[(String, Long) = ...

val result = selectFirst(data)

For such generic methods, the data types of the function parameters and return type may not be the same for every call and are not known at the site where the method is defined. 上のコードは十分に明確なエビデンスが利用可能では無いために、エラーになるでしょう。

そのよな場合、型情報は発動した場所で生成され、メソッドに渡されなければなりません。Scala はそのために implicit parameters を提供します。

以下のコードはScalaに T のための型情報を関数に運ぶように伝えます。そして型情報はメソッドが定義された場所では無く、メソッドが発動された場所で生成されるでしょう。

def selectFirst[T : TypeInformation](input: DataSet[(T, _)]) : DataSet[T] = {
  input.map { v => v._1 }
}


Java APIでの型情報

一般的な場合、Javaはgeneric型情報を削除します。Javaが保持する少しのbit(主に関数のシグニチャーと子クラス情報)を使って、Flinkはreflectionを使って可能な限り型情報をできるだけ再構築しようとします。このロジックは関数の返り値の型が入力型に依存するクラスのための単純な型推定も含みます。

public class AppendOne<T> extends MapFunction<T, Tuple2<T, Long>> {

    public Tuple2<T, Long> map(T value) {
        return new Tuple2<T, Long>(value, 1L);
    }
}

Flinkが全てのgeneric型情報を再構築できない場合があります。この場合、ユーザは型ヒントを使って手伝わなければなりません。

Java APIでの型ヒント

Flinkが削除されたgeneric型情報を再構築できない場合、Java APIはいわゆる型ヒントを提供します。型ヒントはシステムに関数によって生成されたデータストリームあるいはデータセットの型を伝えます:

DataSet<SomeType> result = dataSet
    .map(new MyGenericNonInferrableFunction<Long, SomeType>())
        .returns(SomeType.class);

returns文は、この場合クラスを使って、生成された型を指定します。ヒントは以下による型定義をサポートします

  • クラス。パラメータ化されていない型(非ジェネリック)のため
  • returns(new TypeHint<Tuple2<Integer, SomeType>>(){})の形式のTypeHint。TypeHintクラスはgeneric型情報をcaptureすることができ、(匿名サブクラスを使って)実行時のためにそれを保持します。

Java 8 lambdasのための型抽出

Java 8 のラムダのための型抽出は、ラムダは関数インタフェースを拡張する実装クラスと関連しないため、非ラムダのためのものとは異なって動作します。

現在のところ、Flinkはどのメソッドがlambdaを実装しているかを見つけだそうとし、パラメータの型と返り値の型を決定するためにJavaのジェネリックシグネチャーを使います。しかし、これらのシグネチャーは全てのコンパイラによってラムダのために生成されません (このドキュメントを書いている時点では、4.5より先からのEclipse JDTコンパイラによってのみ信頼できます)。

POJO型のシリアライズ化

PojoTypeInformation はPOJOの中の全てのフィールドのためのシリアライザを生成しています。int, long, String などのような標準的な型はFlinkと同梱されているシリアライザによって処理されます。他の全ての型については、Kryoに頼っています。

Kryo が型を処理できない場合は、PojoTypeInfoにAvroを使ってPOJOをシリアライズ化するように頼むことができます。そうするには、以下のように呼び出す必要があります

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableForceAvro();

FlinkはAvroシリアライザを使ってAvroに生成されたPOJOを自動的にシリアライズ化することに注意してください。

Kryoシリアライザによって完全に POJO 型を扱いたい場合は、以下のように設定してください

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableForceKryo();

KryoがPOJOをシリアライズ化できない場合、以下のようにKryoに独自のシリアライザを追加することができます

env.getConfig().addDefaultKryoSerializer(Class type, Class extends Serializer> serializerClass)

これらのメソッドの異なる変異体が利用可能です。

Kryoフォールバックの無効化

プログラムは明示的にgeneric型のための代替としてKryoを使うことを避けたいことがあります。最も一般的なのは、全ての型がFlinkの独自のシリアライザあるいはユーザ定義の独自のシリアライザのどちらかを使って、効率的にシリアライズ化されるようにしたい場合です。

以下の設定は、データ型がKryoを使うことでいつも遭遇する例外を上げるでしょう:

env.getConfig().disableGenericTypes();

Factoryを使った型情報の定義

A type information factory allows for plugging-in user-defined type information into the Flink type system. 独自の型情報を返すためにorg.apache.flink.api.common.typeinfo.TypeInfoFactory を実装する必要があります。対応する型が@org.apache.flink.api.common.typeinfo.TypeInfo アノテーションを使ってアノテートされた場合、型抽出のフェーズの間にfactoryが呼ばれます。

型情報のfactoryはJavaとScala APIの両方で使うことができます。

In a hierarchy of types the closest factory will be chosen while traversing upwards, however, a built-in factory has highest precedence. factoryはFlinkの組み込みの型よりも優先度も高いため、何をしているかを知るべきです。

以下の例は独自の型MyTuple をアノテートし、Javaでfactoryを使って独自の型情報を提供する方法を示します。

アノテートされた独自の型:

@TypeInfo(MyTupleTypeInfoFactory.class)
public class MyTuple<T0, T1> {
  public T0 myfield0;
  public T1 myfield1;
}

独自の型情報を提供するfactory:

public class MyTupleTypeInfoFactory extends TypeInfoFactory<MyTuple> {

  @Override
  public TypeInformation<MyTuple> createTypeInfo(Type t, Map<String, TypeInformation> genericParameters) {
    return new MyTupleTypeInfo(genericParameters.get("T0"), genericParameters.get("T1"));
  }
}

createTypeInfo(Type, Map<String, TypeInformation>)メソッドはfatoryが目的とする型のための型情報を生成します。パラメータは型自身と、可能であれば型のgeneric型パラメータについての追加の情報を提供します。

型がFlink関数の入力型から取り出す必要があるかもしれないgenericパラメータを含む場合、genericパラメータから型情報への双方向のマッピングのためにorg.apache.flink.api.common.typeinfo.TypeInformation#getGenericParametersも実装するようにしてください。

上に戻る

TOP
inserted by FC2 system