Flinkのタイプのシリアライザによってシリアライズ化されない独自のタイプをFinkプログラム内で使う場合は、Flinkは一般的なKryoシリアライザの使用にフォールバックします。独自のシリアライザ、あるいはGoogle ProtobufあるいはKryoを使ったApache Triftのようなシリアライズ化システムを登録するかも知れません。そうするには、単にタイプクラスとFlinkプログラムのExecutionConfig
内のシリアライザを登録します。
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// register the class of the serializer as serializer for a type
env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, MyCustomSerializer.class);
// register an instance as serializer for a type
MySerializer mySerializer = new MySerializer();
env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, mySerializer);
独自のシリアライザはKryoのシリアライザクラスを継承しなければならないことに注意してください。Google Protobuf あるいは Apache Thriftの場合は、これは以下のようにしてすでに行われています:
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// register the Google Protobuf serializer with Kryo
env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, ProtobufSerializer.class);
// register the serializer included with Apache Thrift as the standard serializer
// TBaseSerializer states it should be initialized as a default Kryo serializer
env.getConfig().addDefaultKryoSerializer(MyCustomType.class, TBaseSerializer.class);
上の例を動作するには、必要な依存をMavenプロジェクトファイル (pom.xml)に含める必要があります。依存のセクションの中で、Apache Thriftのために以下を追加します:
<dependency>
<groupId>com.twitter</groupId>
<artifactId>chill-thrift</artifactId>
<version>0.5.2</version>
</dependency>
<!-- libthrift is required by chill-thrift -->
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>0.6.1</version>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</exclusion>
</exclusions>
</dependency>
Google Protobuf のためには、以下のMaven依存を必要とします:
<dependency>
<groupId>com.twitter</groupId>
<artifactId>chill-protobuf</artifactId>
<version>0.5.2</version>
</dependency>
<!-- We need protobuf for chill-protobuf -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.5.0</version>
</dependency>
必要に応じて両方のライブラリのバージョンを調節してください。