This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
Confluent Avroフォーマット #
Format: Serialization Schema Format: Deserialization Schema
Avro Schema Registry (avro-confluent
)フォーマットにより、io.confluent.kafka.serializers.KafkaAvroSerializer
でシリアライズ化されたレコードを読み取ることができ、次にio.confluent.kafka.serializers.KafkaAvroDeserializer
で読み取ることができるレコードを書き込むことができます。
この形式でレコードを読み取る(逆シリアライズ化する)場合、Avroライタースキーマは、レコード内にエンコードされたスキーマバージョンIDに基づいて、設定されたConfluentスキーマレジストリからフェッチされ、リーダースキーマはテーブルスキーマから推論されます。
この形式でレコードを書き込む(シリアライズ化する)場合、Avroスキーマはテーブルスキーマから推論され、データと共にエンコードされるスキーマIDを取得するために使われます。検索は、avro-confluent.subject
で指定されたsubjectの下にある設定済みのConfluentスキーマレジストリで実行されます。
Avroスキーマレジストリ形式は、Apache Kafka SQLコネクタまたはUpsert Kafka SQLコネクタと合わせてのみ使えます。
依存 #
In order to use the Avro Schema Registry format the following dependencies are required for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.
Maven dependency | SQL Client |
---|---|
|
Only available for stable releases. |
Maven、SBT、Gradle、その他のビルド自動化ツールの場合は、https://packages.confluent.io/maven/
にあるConfluentのMavenリポジトリがプロジェクトのビルドファイルで設定されることも確認してください。
Avro形式でテーブルを作成する方法 #
生のURF-8文字列をKafkaキーとして使い、スキーマレジストリにKafkaの値として登録されたAvroレコードを使うテーブルの例:
CREATE TABLE user_created (
-- one column mapped to the Kafka raw UTF-8 key
the_kafka_key STRING,
-- a few columns mapped to the Avro fields of the Kafka value
id STRING,
name STRING,
email STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_events_example1',
'properties.bootstrap.servers' = 'localhost:9092',
-- UTF-8 string as Kafka keys, using the 'the_kafka_key' table column
'key.format' = 'raw',
'key.fields' = 'the_kafka_key',
'value.format' = 'avro-confluent',
'value.avro-confluent.url' = 'http://localhost:8082',
'value.fields-include' = 'EXCEPT_KEY'
)
以下のようにkafkaテーブルにデータを書き込むことができます:
INSERT INTO user_created
SELECT
-- replicating the user id into a column mapped to the kafka key
id as the_kafka_key,
-- all values
id, name, email
FROM some_table
Kafkaキーと値の両方がスキーマレジストリにAvroレコードとして登録されているテーブルの例:
CREATE TABLE user_created (
-- one column mapped to the 'id' Avro field of the Kafka key
kafka_key_id STRING,
-- a few columns mapped to the Avro fields of the Kafka value
id STRING,
name STRING,
email STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_events_example2',
'properties.bootstrap.servers' = 'localhost:9092',
-- Watch out: schema evolution in the context of a Kafka key is almost never backward nor
-- forward compatible due to hash partitioning.
'key.format' = 'avro-confluent',
'key.avro-confluent.url' = 'http://localhost:8082',
'key.fields' = 'kafka_key_id',
-- In this example, we want the Avro types of both the Kafka key and value to contain the field 'id'
-- => adding a prefix to the table column associated to the Kafka key field avoids clashes
'key.fields-prefix' = 'kafka_key_',
'value.format' = 'avro-confluent',
'value.avro-confluent.url' = 'http://localhost:8082',
'value.fields-include' = 'EXCEPT_KEY',
-- subjects have a default value since Flink 1.13, though can be overridden:
'key.avro-confluent.subject' = 'user_events_example2-key2',
'value.avro-confluent.subject' = 'user_events_example2-value2'
)
スキーマレジストリにAvroレコードとして登録されているKafka値を含む、upsert-kafkaコネクタを使うテーブルの例:
CREATE TABLE user_created (
-- one column mapped to the Kafka raw UTF-8 key
kafka_key_id STRING,
-- a few columns mapped to the Avro fields of the Kafka value
id STRING,
name STRING,
email STRING,
-- upsert-kafka connector requires a primary key to define the upsert behavior
PRIMARY KEY (kafka_key_id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'user_events_example3',
'properties.bootstrap.servers' = 'localhost:9092',
-- UTF-8 string as Kafka keys
-- We don't specify 'key.fields' in this case since it's dictated by the primary key of the table
'key.format' = 'raw',
-- In this example, we want the Avro types of both the Kafka key and value to contain the field 'id'
-- => adding a prefix to the table column associated to the kafka key field avoids clashes
'key.fields-prefix' = 'kafka_key_',
'value.format' = 'avro-confluent',
'value.avro-confluent.url' = 'http://localhost:8082',
'value.fields-include' = 'EXCEPT_KEY'
)
フォーマットオプション #
オプション | 必要条件 | Forwarded | デフォルト | 種類 | 説明 |
---|---|---|---|---|---|
形式 |
必須 | no | (none) | 文字列 | 使用するフォーマットを指定します。これは'avro-confluent' でなければなりません。 |
avro-confluent.basic-auth.credentials-source |
オプション | はい | (none) | 文字列 | スキーマレジストリのBasic auth credentialソース |
avro-confluent.basic-auth.user-info |
オプション | はい | (none) | 文字列 | スキーマレジストリのBasic auth user info |
avro-confluent.bearer-auth.credentials-source |
オプション | はい | (none) | 文字列 | スキーマレジストリのBearer auth credentialソース |
avro-confluent.bearer-auth.token |
オプション | はい | (none) | 文字列 | スキーマレジストリのBearer auth トークン |
avro-confluent.properties |
オプション | はい | (none) | マップ | 基礎となる隙間レジストリに転送されるプロパティマップ。これは、Flink設定オプションを介して正式に公開されていないオプションに便利です。ただし、Flinkオプションの方が優先されることに注意してください。 |
avro-confluent.ssl.keystore.location |
オプション | はい | (none) | 文字列 | SSLキーストアの場所/ファイル |
avro-confluent.ssl.keystore.password |
オプション | はい | (none) | 文字列 | SSLキーストアのパスワード |
avro-confluent.ssl.truststore.location |
オプション | はい | (none) | 文字列 | SSL truststoreの場所/ファイル |
avro-confluent.ssl.truststore.password |
オプション | はい | (none) | 文字列 | SSL truststoreのパスワード |
avro-confluent.schema |
オプション | no | (none) | 文字列 | Confluentスキーマレジストリに格納されている、あるいは格納されるスキーマ。スキーマが指定されていない場合、Flinkはテーブルスキーマをavroスキーマに変換します。指定されたスキーマはテーブルのスキーマと一致する必要があります。 |
avro-confluent.subject |
オプション | はい | (none) | 文字列 | シリアル化中にこの形式で使われるスキーマを登録するためのConfluentスキーマレジストリサブジェクト。デフォルトは、'kafka'と'upsert-kafka'コネクタは、この形式が値またはキー形式として使われる場合、デフォルトのサブジェクト名として'<topic_name>-value'または'<topic_name>-key'を使います。ただし、他のコネクタ(例えば'filesystem')の場合、シンクとして使う場合は、サブジェクトオプションが必要です。 |
avro-confluent.url |
必須 | はい | (none) | 文字列 | ConfluentスキーマレジストリのスキーマURLを取得/登録するためのスキーマレジストリのURL。 |
データ型マッピング #
現在、Apache Flinkは常にテーブルスキーマを宇t勝手、逆シリアライズ化中にAvroリーダースキーマを導出し、シリアル化中にAvroライタースキーマを導出します。Avroスキーマの明示的な定義はまだサポートされません。 AvroとFlinkデータ型のマッピングについては、Apache Avroフォーマットを参照してください。
そこでリストされている型に加えて、Flinkはnull許容型の読み取り/書き込みをサポートします。Flinkは、null許容型をAvro union(something, null)
にマップします。something
はFlink型から変換されたAvro型です。
Avro型の詳細については、Avro Specificationを参照してください。