JSON
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

Json形式 #

JSON形式を使うには、プロジェクトにFlink JSON依存関係を追加する必要があります:

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-json</artifactId>
	<version>1.19-SNAPSHOT</version>
	<scope>provided</scope>
</dependency>

PyFlinkユーザの場合、ジョブの中で直接使えます。

FlinkはJsonSerializationSchema/JsonDeserializationSchemaを介したJSONレコードの読み書きをサポートします。 これらはJacksonライブラリを利用し、POJOObjectNodeを含むがこれらに限定されない、Jacksonにサポートされる任意の型をサポートします。

JsonDeserializationSchemaは、DeserializationSchemaをサポートする任意のコネクタで使えます。

例えば、これをKafkaSourceで使ってPOJOを逆シリアライズする方法は次の通りです:

JsonDeserializationSchema<SomePojo> jsonFormat=new JsonDeserializationSchema<>(SomePojo.class);
KafkaSource<SomePojo> source=
    KafkaSource.<SomePojo>builder()
        .setValueOnlyDeserializer(jsonFormat)
        ...

JsonSerializationSchemaSerializationSchemaをサポートする任意のコネクタで使えます。

例えば、これをKafkaSinkで使ってPOJOをシリアライズする方法は次の通りです:

JsonSerializationSchema<SomePojo> jsonFormat=new JsonSerializationSchema<>();
KafkaSink<SomePojo> source  = 
    KafkaSink.<SomePojo>builder()
        .setRecordSerializer(
            new KafkaRecordSerializationSchemaBuilder<>()
                .setValueSerializationSchema(jsonFormat)
                ...

独自のマッパー #

どちらのスキーマにもSerializableSupplier<ObjectMapper>を受け入れるコンストラクタがあり、オブジェクトマッパーのファクトリとして振舞います。 このファクトリを使って、作成されたマッパーを完全に制御でき、様々なJackson機能を有効/無効にしたり、モジュールを登録してサポートされる型のセットを拡張したり、機能を追加したりできます。

JsonSerializationSchema<SomeClass> jsonFormat=new JsonSerializationSchema<>(
    () -> new ObjectMapper()
        .enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS))
        .registerModule(new ParameterNamesModule());

Python #

PyFlinkでは、JsonRowSerializationSchemaJsonRowDeserializationSchemaRow型の組み込みサポートです。 例えば、KafkaSourceKafkaSinkで使うには以下のようにします:

row_type_info = Types.ROW_NAMED(['name', 'age'], [Types.STRING(), Types.INT()])
json_format = JsonRowDeserializationSchema.builder().type_info(row_type_info).build()

source = KafkaSource.builder() \
    .set_value_only_deserializer(json_format) \
    .build()
row_type_info = Types.ROW_NAMED(['name', 'age'], [Types.STRING(), Types.INT()])
json_format = JsonRowSerializationSchema.builder().with_type_info(row_type_info).build()

sink = KafkaSink.builder() \
    .set_record_serializer(
        KafkaRecordSerializationSchema.builder()
            .set_topic('test')
            .set_value_serialization_schema(json_format)
            .build()
    ) \
    .build()
inserted by FC2 system