This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
Json形式 #
JSON形式を使うには、プロジェクトにFlink JSON依存関係を追加する必要があります:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>1.19-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
PyFlinkユーザの場合、ジョブの中で直接使えます。
FlinkはJsonSerializationSchema/JsonDeserializationSchema
を介したJSONレコードの読み書きをサポートします。
これらはJacksonライブラリを利用し、POJO
とObjectNode
を含むがこれらに限定されない、Jacksonにサポートされる任意の型をサポートします。
JsonDeserializationSchema
は、DeserializationSchema
をサポートする任意のコネクタで使えます。
例えば、これをKafkaSource
で使ってPOJO
を逆シリアライズする方法は次の通りです:
JsonDeserializationSchema<SomePojo> jsonFormat=new JsonDeserializationSchema<>(SomePojo.class);
KafkaSource<SomePojo> source=
KafkaSource.<SomePojo>builder()
.setValueOnlyDeserializer(jsonFormat)
...
JsonSerializationSchema
はSerializationSchema
をサポートする任意のコネクタで使えます。
例えば、これをKafkaSink
で使ってPOJO
をシリアライズする方法は次の通りです:
JsonSerializationSchema<SomePojo> jsonFormat=new JsonSerializationSchema<>();
KafkaSink<SomePojo> source =
KafkaSink.<SomePojo>builder()
.setRecordSerializer(
new KafkaRecordSerializationSchemaBuilder<>()
.setValueSerializationSchema(jsonFormat)
...
独自のマッパー #
どちらのスキーマにもSerializableSupplier<ObjectMapper>
を受け入れるコンストラクタがあり、オブジェクトマッパーのファクトリとして振舞います。
このファクトリを使って、作成されたマッパーを完全に制御でき、様々なJackson機能を有効/無効にしたり、モジュールを登録してサポートされる型のセットを拡張したり、機能を追加したりできます。
JsonSerializationSchema<SomeClass> jsonFormat=new JsonSerializationSchema<>(
() -> new ObjectMapper()
.enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS))
.registerModule(new ParameterNamesModule());
Python #
PyFlinkでは、JsonRowSerializationSchema
とJsonRowDeserializationSchema
はRow
型の組み込みサポートです。
例えば、KafkaSource
とKafkaSink
で使うには以下のようにします:
row_type_info = Types.ROW_NAMED(['name', 'age'], [Types.STRING(), Types.INT()])
json_format = JsonRowDeserializationSchema.builder().type_info(row_type_info).build()
source = KafkaSource.builder() \
.set_value_only_deserializer(json_format) \
.build()
row_type_info = Types.ROW_NAMED(['name', 'age'], [Types.STRING(), Types.INT()])
json_format = JsonRowSerializationSchema.builder().with_type_info(row_type_info).build()
sink = KafkaSink.builder() \
.set_record_serializer(
KafkaRecordSerializationSchema.builder()
.set_topic('test')
.set_value_serialization_schema(json_format)
.build()
) \
.build()