Overview
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

データ型 & シリアライズ化 #

Apache Flink handles data types and serialization in a unique way, containing its own type descriptors, generic type extraction, and type serialization framework. このドキュメントはその概念とそれらの背後にある根本的理由を説明します。

サポートされるデータ型 #

Flink places some restrictions on the type of elements that can be in a DataStream. The reason for this is that the system analyzes the types to determine efficient execution strategies.

There are seven different categories of data types:

  1. Java Tuples and Scala Case Classes
  2. Java POJOs
  3. Primitive Types
  4. Regular Classes
  5. Values
  6. Hadoop Writables
  7. Special Types

タプルと Case クラス #

タプルは様々な型を持つ固定数のフィールドを含む複合型です。 The Java API provides classes from Tuple1 up to Tuple25. Every field of a tuple can be an arbitrary Flink type including further tuples, resulting in nested tuples. Fields of a tuple can be accessed directly using the field’s name as tuple.f4, or using the generic getter method tuple.getField(int position). フィールドのインデックスは0から始まります。Note that this stands in contrast to the Scala tuples, but it is more consistent with Java’s general indexing.

DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements(
    new Tuple2<String, Integer>("hello", 1),
    new Tuple2<String, Integer>("world", 2));

wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() {
    @Override
    public Integer map(Tuple2<String, Integer> value) throws Exception {
        return value.f1;
    }
});

wordCounts.keyBy(value -> value.f0);

Scalaのcaseクラス (および caseクラスの特別な場合のScalaのタプル) は、様々な型を持つ固定数のフィールドを含む複合型です。Tuple fields are addressed by their 1-offset names such as _1 for the first field. Case クラスのフィールドはそれらの名前によってアクセスされます。

case class WordCount(word: String, count: Int)
val input = env.fromElements(
    WordCount("hello", 1),
    WordCount("world", 2)) // Case Class Data Set

input.keyBy(_.word)

val input2 = env.fromElements(("hello", 1), ("world", 2)) // Tuple2 Data Set

input2.keyBy(value => (value._1, value._2))

POJOs #

Java と Scala クラスはFlinkによって、もしそれらが以下の要求を満たす場合は、特別なPOJOデータ型として扱われます。

  • クラスがpublicでなければなりません。

  • 引数無しのpublicコンストラクタ(デフォルトのコンストラクタ)を持たなければなりません。

  • 全てのフィールドはそれぞれpublic、あるいはgetterおよびsetter関数によってアクセス可能でなければなりません。For a field called foo the getter and setter methods must be named getFoo() and setFoo().

  • フィールドの型は登録されたシリアライザによってサポートされていなければなりません。

POJOs are generally represented with a PojoTypeInfo and serialized with the PojoSerializer (using Kryo as configurable fallback). The exception is when the POJOs are actually Avro types (Avro Specific Records) or produced as “Avro Reflect Types”. In that case the POJO’s are represented by an AvroTypeInfo and serialized with the AvroSerializer. You can also register your own custom serializer if required; see Serialization for further information.

Flink はPOJO型の構造を解析します。つまり、POJOのフィールドについて確認します。結果的に、POJO型は一般的な型ようりも扱い易いです。更に、Flinkは一般的な型よりPOJOをもっと効率的に処理することができます。

You can test whether your class adheres to the POJO requirements via org.apache.flink.types.PojoTestUtils#assertSerializedAsPojo() from the flink-test-utils. If you additionally want to ensure that no field of the POJO will be serialized with Kryo, use assertSerializedAsPojoWithoutKryo() instead.

以下の例は2つのpublicフィールドを持つ単純なPOJOを示します。

public class WordWithCount {

    public String word;
    public int count;

    public WordWithCount() {}

    public WordWithCount(String word, int count) {
        this.word = word;
        this.count = count;
    }
}

DataStream<WordWithCount> wordCounts = env.fromElements(
    new WordWithCount("hello", 1),
    new WordWithCount("world", 2));

wordCounts.keyBy(value -> value.word);
class WordWithCount(var word: String, var count: Int) {
    def this() {
      this(null, -1)
    }
}

val input = env.fromElements(
    new WordWithCount("hello", 1),
    new WordWithCount("world", 2)) // Case Class Data Set

input.keyBy(_.word)

Primitive 型 #

Flink supports all Java and Scala primitive types such as Integer, String, and Double.

一般的なクラスの型 #

Flink はほとんどのJavaおよびScalaのクラスをサポートします (APIおよびカスタム)。 Restrictions apply to classes containing fields that cannot be serialized, like file pointers, I/O streams, or other native resources. Java Beansの慣習に従うクラスは一般的にうまく動作します。

POJO型として認識されないすべてのクラス (上記のPOJOの要求を見てください)は、一般的なクラス型としてFlinkによって扱われます。 Flink はこれらのデータ型をブラックボックスとして扱い、それらの内容(例えば、効率的なソート)にアクセスすることができません。General types are de/serialized using the serialization framework Kryo.

#

Value types describe their serialization and deserialization manually. Instead of going through a general purpose serialization framework, they provide custom code for those operations by means of implementing the org.apache.flink.types.Value interface with the methods read and write. Using a Value type is reasonable when general purpose serialization would be highly inefficient. An example would be a data type that implements a sparse vector of elements as an array. Knowing that the array is mostly zero, one can use a special encoding for the non-zero elements, while the general purpose serialization would simply write all array elements.

The org.apache.flink.types.CopyableValue interface supports manual internal cloning logic in a similar way.

Flink は基本的なデータ型に対応するあらかじめ定義されたValue型を同梱しています。(ByteValue, ShortValue, IntValue, LongValue, FloatValue, DoubleValue, StringValue, CharValue, BooleanValue). These Value types act as mutable variants of the basic data types: Their value can be altered, allowing programmers to reuse objects and take pressure off the garbage collector.

Hadoop Writables #

You can use types that implement the org.apache.hadoop.Writable interface. The serialization logic defined in the write()and readFields() methods will be used for serialization.

特別型 #

You can use special types, including Scala’s Either, Option, and Try. The Java API has its own custom implementation of Either. Similarly to Scala’s Either, it represents a value of two possible types, Left or Right. Either can be useful for error handling or operators that need to output two different types of records.

型の抹消 & 型の推論 #

Note: This Section is only relevant for Java.

Javaのコンパイラはコンパイルの後で一般型の情報のほとんどを捨てます。This is known as type erasure in Java. It means that at runtime, an instance of an object does not know its generic type any more. For example, instances of DataStream<String> and DataStream<Long> look the same to the JVM.

Flink requires type information at the time when it prepares the program for execution (when the main method of the program is called). The Flink Java API tries to reconstruct the type information that was thrown away in various ways and store it explicitly in the data sets and operators. You can retrieve the type via DataStream.getType(). The method returns an instance of TypeInformation, which is Flink’s internal way of representing types.

The type inference has its limits and needs the “cooperation” of the programmer in some cases. Examples for that are methods that create data sets from collections, such as StreamExecutionEnvironment.fromCollection(), where you can pass an argument that describes the type. But also generic functions like MapFunction<I, O> may need extra type information.

The ResultTypeQueryable interface can be implemented by input formats and functions to tell the API explicitly about their return type. The input types that the functions are invoked with can usually be inferred by the result types of the previous operations.

Back to top

Flinkでの型の扱い #

Flinkは分散計算の間に交換および格納されるデータ型についての多くの情報を推論しようとします。 テーブルのスキーマを推測するデータベースのようなものを考えてください。In most cases, Flink infers all necessary information seamlessly by itself. 型情報を持つことでFlinkは幾つかのクールなことを行うことができます:

  • Flinkがデータ型をより知ることにより、シリアライズ化およびデータのレイアウトのスキーマが良くなります。 That is quite important for the memory usage paradigm in Flink (work on serialized data inside/outside the heap where ever possible and make serialization very cheap).

  • 結果的にユーザが多くのクラスの中でシリアライズ化フレームワークを心配し、型を登録しなければならないことを手助けします。

In general, the information about data types is needed during the pre-flight phase - that is, when the program’s calls on DataStream are made, and before any call to execute(), print(), count(), or collect().

最もよくある問題 #

The most frequent issues where users need to interact with Flink’s data type handling are:

  • Registering subtypes: If the function signatures describe only the supertypes, but they actually use subtypes of those during execution, it may increase performance a lot to make Flink aware of these subtypes. For that, call .registerType(clazz) on the StreamExecutionEnvironment for each subtype.

  • Registering custom serializers: Flink falls back to Kryo for the types that it does not handle transparently by itself. 全ての型がKryoによって(従ってFlinkによって)シームレスに扱われるわけではありません。For example, many Google Guava collection types do not work well by default. 解決法は、問題を起こす型について追加のシリアライザを登録することです。 Call .getConfig().addDefaultKryoSerializer(clazz, serializer) on the StreamExecutionEnvironment. 追加のKryoシリアライザは多くのライブラリで利用可能です。See 3rd party serializer for more details on working with external serializers.

  • Adding Type Hints: Sometimes, when Flink cannot infer the generic types despite all tricks, a user must pass a type hint. That is generally only necessary in the Java API. The Type Hints Section describes that in more detail.

  • Manually creating a TypeInformation: This may be necessary for some API calls where it is not possible for Flink to infer the data types due to Java’s generic type erasure. See Creating a TypeInformation or TypeSerializer for details.

The class TypeInformation is the base class for all type descriptors. It reveals some basic properties of the type and can generate serializers and, in specializations, comparators for the types. (Note that comparators in Flink do much more than defining an order - they are basically the utility to handle keys)

内部的には、Flinkは以下の型の区別を行います:

  • Basic types: All Java primitives and their boxed form, plus void, String, Date, BigDecimal, and BigInteger.

  • プリミティブ配列とオブジェクトの配列

  • 複合型

    • Flink Java タプル (Flink Java APIの一部) : 最大20フィールド、nullフィールドはサポートされません

    • Scala case classes (including Scala tuples): null fields not supported

    • 行: 任意の数のフィールドを持つtupleとnullフィールドのためのサポート

    • POJOs: 特定のbean的なパターンに従うクラス

  • Auxiliary types (Option, Either, Lists, Maps, …)

  • ジェネリック型: これらはFlink自身ではなくKryoによってシリアライズ化されるでしょう。

POJOs are of particular interest, because they support the creation of complex types. それらはランタイムにも透過的で、Flinkによってとても効率的に扱うことができます。

POJOタイプのためのルール #

Flink recognizes a data type as a POJO type (and allows “by-name” field referencing) if the following conditions are fulfilled:

  • クラスはpublicでスタンドアローン(非static内部クラスがありません)
  • クラスはpublicな引数を持たないコンストラクタを持ちます
  • All non-static, non-transient fields in the class (and all superclasses) are either public (and non-final) or have a public getter- and a setter- method that follows the Java beans naming conventions for getters and setters.

Note that when a user-defined data type can’t be recognized as a POJO type, it must be processed as GenericType and serialized with Kryo.

TypeInformation あるいは TypeSerializerの生成 #

型のためのTypeInformation オブジェクトを作るには、言語固有の方法を使います:

Because Java generally erases generic type information, you need to pass the type to the TypeInformation construction:

非generic型については、Classを渡すことができます:

TypeInformation<String> info = TypeInformation.of(String.class);

For generic types, you need to “capture” the generic type information via the TypeHint:

TypeInformation<Tuple2<String, Double>> info = TypeInformation.of(new TypeHint<Tuple2<String, Double>>(){});

Internally, this creates an anonymous subclass of the TypeHint that captures the generic information to preserve it until runtime.

In Scala, Flink uses macros that runs at compile time and captures all generic type information while it is still available.

// important: this import is needed to access the 'createTypeInformation' macro function
import org.apache.flink.streaming.api.scala._

val stringInfo: TypeInformation[String] = createTypeInformation[String]

val tupleInfo: TypeInformation[(String, Double)] = createTypeInformation[(String, Double)]

Javaでは代替として同じメソッドをまだ使うことができます。

To create a TypeSerializer, simply call typeInfo.createSerializer(config) on the TypeInformation object.

The config parameter is of type ExecutionConfig and holds the information about the program’s registered custom serializers. Where ever possibly, try to pass the programs proper ExecutionConfig. You can usually obtain it from DataStream via calling getExecutionConfig(). Inside functions (like MapFunction), you can get it by making the function a Rich Function and calling getRuntimeContext().getExecutionConfig().



Scala APIでの型情報 #

Scala has very elaborate concepts for runtime type information though type manifests and class tags. In general, types and methods have access to the types of their generic parameters - thus, Scala programs do not suffer from type erasure as Java programs do.

In addition, Scala allows to run custom code in the Scala Compiler through Scala Macros - that means that some Flink code gets executed whenever you compile a Scala program written against Flink’s Scala API.

We use the Macros to look at the parameter types and return types of all user functions during compilation - that is the point in time when certainly all type information is perfectly available. Within the macro, we create a TypeInformation for the function’s return types (or parameter types) and make it part of the operation.

No Implicit Value for Evidence Parameter Error #

In the case where TypeInformation could not be created, programs fail to compile with an error stating “could not find implicit value for evidence parameter of type TypeInformation”.

TypeInformation を生成するコードがインポートされなかった場合によくある理由です。 flink.api.scala パッケージ全体をインポートするようにしてください。

import org.apache.flink.api.scala._

他の一般的な原因はジェネリックメソッドです。これは以下の省で説明されるように修正することができます。

ジェネリックメソッド #

以下の場合を考えます:

def selectFirst[T](input: DataStream[(T, _)]) : DataStream[T] = {
  input.map { v => v._1 }
}

val data : DataStream[(String, Long) = ...

val result = selectFirst(data)

For such generic methods, the data types of the function parameters and return type may not be the same for every call and are not known at the site where the method is defined. The code above will result in an error that not enough implicit evidence is available.

In such cases, the type information has to be generated at the invocation site and passed to the method. Scala offers implicit parameters for that.

The following code tells Scala to bring a type information for T into the function. The type information will then be generated at the sites where the method is invoked, rather than where the method is defined.

def selectFirst[T : TypeInformation](input: DataStream[(T, _)]) : DataStream[T] = {
  input.map { v => v._1 }
}


Java APIでの型情報 #

一般的な場合、Javaはgeneric型情報を削除します。Flink tries to reconstruct as much type information as possible via reflection, using the few bits that Java preserves (mainly function signatures and subclass information). このロジックは関数の返り値の型が入力型に依存するクラスのための単純な型推定も含みます。

public class AppendOne<T> implements MapFunction<T, Tuple2<T, Long>> {

    public Tuple2<T, Long> map(T value) {
        return new Tuple2<T, Long>(value, 1L);
    }
}

Flinkが全てのgeneric型情報を再構築できない場合があります。In that case, a user has to help out via type hints.

Java APIでの型ヒント #

In cases where Flink cannot reconstruct the erased generic type information, the Java API offers so called type hints. The type hints tell the system the type of the data stream or data set produced by a function:

DataStream<SomeType> result = stream
    .map(new MyGenericNonInferrableFunction<Long, SomeType>())
        .returns(SomeType.class);

The returns statement specifies the produced type, in this case via a class. The hints support type definition via

  • クラス。パラメータ化されていない型(非ジェネリック)のため
  • TypeHints in the form of returns(new TypeHint<Tuple2<Integer, SomeType>>(){}). The TypeHint class can capture generic type information and preserve it for the runtime (via an anonymous subclass).

Java 8 lambdasのための型抽出 #

Type extraction for Java 8 lambdas works differently than for non-lambdas, because lambdas are not associated with an implementing class that extends the function interface.

Currently, Flink tries to figure out which method implements the lambda and uses Java’s generic signatures to determine the parameter types and the return type. However, these signatures are not generated for lambdas by all compilers. If you observe unexpected behavior, manually specify the return type using the returns method.

POJO型のシリアライズ化 #

The PojoTypeInfo is creating serializers for all the fields inside the POJO. Standard types such as int, long, String etc. are handled by serializers we ship with Flink. For all other types, we fall back to Kryo.

If Kryo is not able to handle the type, you can ask the PojoTypeInfo to serialize the POJO using Avro. そうするには、以下のように呼び出す必要があります

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableForceAvro();

FlinkはAvroシリアライザを使ってAvroに生成されたPOJOを自動的にシリアライズ化することに注意してください。

If you want your entire POJO Type to be treated by the Kryo serializer, set

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableForceKryo();

KryoがPOJOをシリアライズ化できない場合、以下のようにKryoに独自のシリアライザを追加することができます

env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass);

これらのメソッドの異なる変異体が利用可能です。

Kryoフォールバックの無効化 #

プログラムは明示的にgeneric型のための代替としてKryoを使うことを避けたいことがあります。The most common one is wanting to ensure that all types are efficiently serialized either through Flink’s own serializers, or via user-defined custom serializers.

以下の設定は、データ型がKryoを使うことでいつも遭遇する例外を上げるでしょう:

env.getConfig().disableGenericTypes();

Factoryを使った型情報の定義 #

型情報ファクトリにより、ユーザ定義の型情報をFlink型システムにプラグインすることができます。 You have to implement org.apache.flink.api.common.typeinfo.TypeInfoFactory to return your custom type information. The factory is called during the type extraction phase if either the corresponding type or a POJO’s field using this type has been annotated with the @org.apache.flink.api.common.typeinfo.TypeInfo annotation.

型情報のfactoryはJavaとScala APIの両方で使うことができます。

In a hierarchy of types the closest factory will be chosen while traversing upwards, however, a built-in factory has highest precedence. A factory has also higher precedence than Flink’s built-in types, therefore you should know what you are doing.

The following example shows how to annotate a custom type MyTuple and supply custom type information for it using a factory in Java.

アノテートされた独自の型:

@TypeInfo(MyTupleTypeInfoFactory.class)
public class MyTuple<T0, T1> {
  public T0 myfield0;
  public T1 myfield1;
}

独自の型情報を提供するfactory:

public class MyTupleTypeInfoFactory extends TypeInfoFactory<MyTuple> {

  @Override
  public TypeInformation<MyTuple> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
    return new MyTupleTypeInfo(genericParameters.get("T0"), genericParameters.get("T1"));
  }
}

Instead of annotating the type itself, which may not be possible for third-party code, you can also annotate the usage of this type inside a valid Flink POJO like this:

public class MyPojo {
  public int id;

  @TypeInfo(MyTupleTypeInfoFactory.class)
  public MyTuple<Integer, String> tuple;
}

The method createTypeInfo(Type, Map<String, TypeInformation<?>>) creates type information for the type the factory is targeted for. The parameters provide additional information about the type itself as well as the type’s generic type parameters if available.

If your type contains generic parameters that might need to be derived from the input type of a Flink function, make sure to also implement org.apache.flink.api.common.typeinfo.TypeInformation#getGenericParameters for a bidirectional mapping of generic parameters to type information.

Back to top

inserted by FC2 system