データ型 & シリアライズ化

Apache Flink handles data types and serialization in a unique way, containing its own type descriptors, generic type extraction, and type serialization framework. このドキュメントはその概念とそれらの背後にある根本的理由を説明します。

Flink tries to infer a lot of information about the data types that are exchanged and stored during the distributed computation. Think about it like a database that infers the schema of tables. In most cases, Flink infers all necessary information seamlessly by itself. Having the type information allows Flink to do some cool things:

  • Using POJOs types and grouping / joining / aggregating them by referring to field names (like dataSet.keyBy("username")). The type information allows Flink to check (for typos and type compatibility) early rather than failing later ar runtime.

  • The more Flink knows about data types, the better the serialization and data layout schemes are. That is quite important for the memory usage paradigm in Flink (work on serialized data inside/outside the heap where ever possible and make serialization very cheap).

  • Finally, it also spares users in the majority of cases from worrying about serialization frameworks and having to register types.

In general, the information about data types is needed during the pre-flight phase - that is, when the program’s calls on DataStream and DataSet are made, and before any call to execute(), print(), count(), or collect().

Most Frequent Issues

The most frequent issues where users need to interact with Flink’s data type handling are:

  • Registering subtypes: If the function signatures describe only the supertypes, but they actually use subtypes of those during execution, it may increase performance a lot to make Flink aware of these subtypes. For that, call .registerType(clazz) on the StreamExecutionEnvironment or ExecutionEnvironment for each subtype.

  • Registering custom serializers: Flink falls back to Kryo for the types that it does not handle transparently by itself. Not all types are seamlessly handled by Kryo (and thus by Flink). For example, many Google Guava collection types do not work well by default. The solution is to register additional serializers for the types that cause problems. Call .getConfig().addDefaultKryoSerializer(clazz, serializer) on the StreamExecutionEnvironment or ExecutionEnvironment. Additional Kryo serializers are available in many libraries. See Custom Serializers for more details on working with custom serializers.

  • Adding Type Hints: Sometimes, when Flink cannot infer the generic types despits all tricks, a user must pass a type hint. That is generally only necessary in the Java API. The Type Hints Section describes that in more detail.

  • Manually creating a TypeInformation: This may be necessary for some API calls where it is not possible for Flink to infer the data types due to Java’s generic type erasure. See Creating a TypeInformation or TypeSerializer for details.

クラス TypeInformation は全ての型のデスクリプタのための基本クラスです。それは型のいくつかの基本的なプロパティを明らかにし、シリアライザと型のための特殊な比較器を生成することができます。(Flinkでの比較器は順番の定義以上の事をすることに注意してください - それらは基本的にキーを扱うためのユーティリティです)

内部的には、Flinkは以下の型の区別を行います:

  • 基本型: 全てのJavaのプリミティブ型とボックス化の形式、加えて void, String, Date, BigDecimalBigInteger

  • プリミティブ配列とオブジェクトの配列

  • 複合型

    • Flink Java Tuples (part of the Flink Java API): max 25 fields, null fields not supported

    • Scala case classes (including Scala tuples): max 22 fields, null fields not supported

    • Row: tuples with arbitrary number of fields and support for null fields

    • POJOs: 特定のbean的なパターンに従うクラス

  • Auxiliary types (Option, Either, Lists, Maps, …)

  • ジェネリック型: これらはFlink自身ではなくKryoによってシリアライズ化されるでしょう。

POJOs は特に興味深いです。それらは複雑な型の生成とキーの定義内でのフィールド名の使用をサポートします: dataSet.join(another).where("name").equalTo("personName"). それらはランタイムにも透過的で、Flinkによってとても効率的に扱うことができます。

POJOタイプのためのルール

以下の条件が満たされる場合は、FlinkはPOJOタイプとしてデータの型を認識します(そして"名前での"フィールド参照を許可します:

  • クラスはpublicでスタンドアローン(非static内部クラスがありません)
  • クラスはpublicな引数を持たないコンストラクタを持ちます
  • All fields in the class (and all superclasses) are either public (and non-final) or have a public getter- and a setter- method that follows the Java beans naming conventions for getters and setters.

Creating a TypeInformation or TypeSerializer

To create a TypeInformation object for a type, use the language specific way:

Because Java generally erases generic type information, you need to pass the type to the TypeInformation construction:

For non-generic types, you can pass the Class:

TypeInformation<String> info = TypeInformation.of(String.class);

For generic types, you need to “capture” the generic type information via the TypeHint:

TypeInformation<Tuple2<String, Double>> info = TypeInformation.of(new TypeHint<Tuple2<String, Double>>(){});

Internally, this creates an anonymous subclass of the TypeHint that captures the generic information to preserve it until runtime.

In Scala, Flink uses macros that runs at compile time and captures all generic type information while it is still available.

// important: this import is needed to access the 'createTypeInformation' macro function
import org.apache.flink.streaming.api.scala._

val stringInfo: TypeInformation[String] = createTypeInformation[String]

val tupleInfo: TypeInformation[(String, Double)] = createTypeInformation[(String, Double)]

You can still use the same method as in Java as a fallback.

To create a TypeSerializer, simply call typeInfo.createSerializer(config) on the TypeInformation object.

The config parameter is of type ExecutionConfig and holds the information about the program’s registered custom serializers. Where ever possibly, try to pass the programs proper ExecutionConfig. You can usually obtain it from DataStream or DataSet via calling getExecutionConfig(). Inside functions (like MapFunction), you can get it by making the function a Rich Function and calling getRuntimeContext().getExecutionConfig().



Scala APIでの型情報

Scala type manifestsclass tagsを使ってランタイム型情報のための精巧な概念を持ちます。一般的に型とメソッドはジェネリック パラメータの型へのアクセスを持ちます - したがって ScalaプログラムはJavaプログラムがするような型の消去による影響を受けません。

さらに、ScalaはScalaマクロを使ってScalaコンパイラ内で独自のコードを実行することができます - FlinkのScala APIに対して書かれたScalaプログラムをコンパイルするときはいつでも何らかのFlinkのコードが実行されることを意味します。

パラメータタイプを見るためとコンパイルの間にすべてのユーザ関数の型を返すためにマクロを使います - 全ての型の情報が完全に利用可能なのはまさにこの時です。マクロの中で、関数の返り値の型(およびパラメータ型)のためにTypeInformationを生成し、それをオペレーションの一部にします。

No Implicit Value for Evidence Parameter Error

TypeInformation が生成されなかった場合、プログラムは "could not find implicit value for evidence parameter of type TypeInformation"から始まるエラーでコンパイルに失敗します。

TypeInformation を生成するコードがインポートされなかった場合によくある理由です。flink.api.scala パッケージ全体をインポートするようにしてください。

import org.apache.flink.api.scala._

他の一般的な原因はジェネリックメソッドです。これは以下の省で説明されるように修正することができます。

ジェネリックメソッド

以下の場合を考えます:

def selectFirst[T](input: DataSet[(T, _)]) : DataSet[T] = {
  input.map { v => v._1 }
}

val data : DataSet[(String, Long) = ...

val result = selectFirst(data)

For such generic methods, the data types of the function parameters and return type may not be the same for every call and are not known at the site where the method is defined. 上のコードは十分に明確なエビデンスが利用可能では無いために、エラーになるでしょう。

そのよな場合、型情報は発動した場所で生成され、メソッドに渡されなければなりません。Scala はそのために implicit parameters を提供します。

以下のコードはScalaに T のための型情報を関数に運ぶように伝えます。そして型情報はメソッドが定義された場所では無く、メソッドが発動された場所で生成されるでしょう。

def selectFirst[T : TypeInformation](input: DataSet[(T, _)]) : DataSet[T] = {
  input.map { v => v._1 }
}


Java APIでの型情報

In the general case, Java erases generic type information. Flink tries to reconstruct as much type information as possible via reflection, using the few bits that Java preserves (mainly function signatures and subclass information). This logic also contains some simple type inference for cases where the return type of a function depends on its input type:

public class AppendOne<T> extends MapFunction<T, Tuple2<T, Long>> {

    public Tuple2<T, Long> map(T value) {
        return new Tuple2<T, Long>(value, 1L);
    }
}

There are cases where Flink cannot reconstruct all generic type information. In that case, a user has to help out via type hints.

Java APIでの型ヒント

In cases where Flink cannot reconstruct the erased generic type information, the Java API offers so called type hints. The type hints tell the system the type of the data stream or data set produced by a function:

DataSet<SomeType> result = dataSet
    .map(new MyGenericNonInferrableFunction<Long, SomeType>())
        .returns(SomeType.class);

The returns statement specifies the produced type, in this case via a class. The hints support type definition via

  • クラス。パラメータ化されていない型(非ジェネリック)のため
  • TypeHints in the form of returns(new TypeHint<Tuple2<Integer, SomeType>>(){}). The TypeHint class can capture generic type information and preserve it for the runtime (via an anonymous subclass).

Java 8 lambdasのための型抽出

Type extraction for Java 8 lambdas works differently than for non-lambdas, because lambdas are not associated with an implementing class that extends the function interface.

現在のところ、Flinkはどのメソッドがlambdaを実装しているかを見つけだそうとし、パラメータの型と返り値の型を決定するためにJavaのジェネリックシグネチャーを使います。However, these signatures are not generated for lambdas by all compilers (as of writing this document only reliably by the Eclipse JDT compiler from 4.5 onwards).

POJO型のシリアライズ化

PojoTypeInformation はPOJOの中の全てのフィールドのためのシリアライザを生成しています。int, long, String などのような標準的な型はFlinkと同梱されているシリアライザによって処理されます。他の全ての型については、Kryoに頼っています。

Kryo が型を処理できない場合は、PojoTypeInfoにAvroを使ってPOJOをシリアライズ化するように頼むことができます。そうするには、以下のように呼び出す必要があります

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableForceAvro();

FlinkはAvroシリアライザを使ってAvroに生成されたPOJOを自動的にシリアライズ化することに注意してください。

Kryoシリアライザによって完全に POJO 型を扱いたい場合は、以下のように設定してください

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableForceKryo();

KryoがPOJOをシリアライズ化できない場合、以下のようにKryoに独自のシリアライザを追加することができます

env.getConfig().addDefaultKryoSerializer(Class type, Class extends Serializer> serializerClass)

これらのメソッドの異なる変異体が利用可能です。

Defining Type Information using a Factory

A type information factory allows for plugging-in user-defined type information into the Flink type system. You have to implement org.apache.flink.api.common.typeinfo.TypeInfoFactory to return your custom type information. The factory is called during the type extraction phase if the corresponding type has been annotated with the @org.apache.flink.api.common.typeinfo.TypeInfo annotation.

Type information factories can be used in both the Java and Scala API.

In a hierarchy of types the closest factory will be chosen while traversing upwards, however, a built-in factory has highest precedence. A factory has also higher precendence than Flink’s built-in types, therefore you should know what you are doing.

The following example shows how to annotate a custom type MyTuple and supply custom type information for it using a factory in Java.

The annotated custom type:

@TypeInfo(MyTupleTypeInfoFactory.class)
public class MyTuple<T0, T1> {
  public T0 myfield0;
  public T1 myfield1;
}

The factory supplying custom type information:

public class MyTupleTypeInfoFactory extends TypeInfoFactory<MyTuple> {

  @Override
  public TypeInformation<MyTuple> createTypeInfo(Type t, Map<String, TypeInformation> genericParameters) {
    return new MyTupleTypeInfo(genericParameters.get("T0"), genericParameters.get("T1"));
  }
}

The method createTypeInfo(Type, Map<String, TypeInformation>) creates type information for the type the factory is targeted for. The parameters provide additional information about the type itself as well as the type’s generic type parameters if available.

If your type contains generic parameters that might need to be derived from the input type of a Flink function, make sure to also implement org.apache.flink.api.common.typeinfo.TypeInformation#getGenericParameters for a bidirectional mapping of generic parameters to type information.

TOP
inserted by FC2 system