Confluent Avro
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

Confluent Avroフォーマット #

Format: Serialization Schema Format: Deserialization Schema

Avro Schema Registry (avro-confluent)フォーマットにより、io.confluent.kafka.serializers.KafkaAvroSerializerでシリアライズ化されたレコードを読み取ることができ、次にio.confluent.kafka.serializers.KafkaAvroDeserializerで読み取ることができるレコードを書き込むことができます。

この形式でレコードを読み取る(逆シリアライズ化する)場合、Avroライタースキーマは、レコード内にエンコードされたスキーマバージョンIDに基づいて、設定されたConfluentスキーマレジストリからフェッチされ、リーダースキーマはテーブルスキーマから推論されます。

この形式でレコードを書き込む(シリアライズ化する)場合、Avroスキーマはテーブルスキーマから推論され、データと共にエンコードされるスキーマIDを取得するために使われます。検索は、avro-confluent.subjectで指定されたsubjectの下にある設定済みのConfluentスキーマレジストリで実行されます。

Avroスキーマレジストリ形式は、Apache Kafka SQLコネクタまたはUpsert Kafka SQLコネクタと合わせてのみ使えます。

依存 #

In order to use the Avro Schema Registry format the following dependencies are required for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.

Maven dependency SQL Client
Only available for stable releases.

Maven、SBT、Gradle、その他のビルド自動化ツールの場合は、https://packages.confluent.io/maven/にあるConfluentのMavenリポジトリがプロジェクトのビルドファイルで設定されることも確認してください。

Avro形式でテーブルを作成する方法 #

生のURF-8文字列をKafkaキーとして使い、スキーマレジストリにKafkaの値として登録されたAvroレコードを使うテーブルの例:

CREATE TABLE user_created (

  -- one column mapped to the Kafka raw UTF-8 key
  the_kafka_key STRING,
  
  -- a few columns mapped to the Avro fields of the Kafka value
  id STRING,
  name STRING,
  email STRING

) WITH (

  'connector' = 'kafka',
  'topic' = 'user_events_example1',
  'properties.bootstrap.servers' = 'localhost:9092',

  -- UTF-8 string as Kafka keys, using the 'the_kafka_key' table column
  'key.format' = 'raw',
  'key.fields' = 'the_kafka_key',

  'value.format' = 'avro-confluent',
  'value.avro-confluent.url' = 'http://localhost:8082',
  'value.fields-include' = 'EXCEPT_KEY'
)

以下のようにkafkaテーブルにデータを書き込むことができます:

INSERT INTO user_created
SELECT
  -- replicating the user id into a column mapped to the kafka key
  id as the_kafka_key,

  -- all values
  id, name, email
FROM some_table

Kafkaキーと値の両方がスキーマレジストリにAvroレコードとして登録されているテーブルの例:

CREATE TABLE user_created (
  
  -- one column mapped to the 'id' Avro field of the Kafka key
  kafka_key_id STRING,
  
  -- a few columns mapped to the Avro fields of the Kafka value
  id STRING,
  name STRING,
  email STRING
  
) WITH (

  'connector' = 'kafka',
  'topic' = 'user_events_example2',
  'properties.bootstrap.servers' = 'localhost:9092',

  -- Watch out: schema evolution in the context of a Kafka key is almost never backward nor
  -- forward compatible due to hash partitioning.
  'key.format' = 'avro-confluent',
  'key.avro-confluent.url' = 'http://localhost:8082',
  'key.fields' = 'kafka_key_id',

  -- In this example, we want the Avro types of both the Kafka key and value to contain the field 'id'
  -- => adding a prefix to the table column associated to the Kafka key field avoids clashes
  'key.fields-prefix' = 'kafka_key_',

  'value.format' = 'avro-confluent',
  'value.avro-confluent.url' = 'http://localhost:8082',
  'value.fields-include' = 'EXCEPT_KEY',
   
  -- subjects have a default value since Flink 1.13, though can be overridden:
  'key.avro-confluent.subject' = 'user_events_example2-key2',
  'value.avro-confluent.subject' = 'user_events_example2-value2'
)

スキーマレジストリにAvroレコードとして登録されているKafka値を含む、upsert-kafkaコネクタを使うテーブルの例:

CREATE TABLE user_created (
  
  -- one column mapped to the Kafka raw UTF-8 key
  kafka_key_id STRING,
  
  -- a few columns mapped to the Avro fields of the Kafka value
  id STRING,
  name STRING,
  email STRING,
  
  -- upsert-kafka connector requires a primary key to define the upsert behavior
  PRIMARY KEY (kafka_key_id) NOT ENFORCED

) WITH (

  'connector' = 'upsert-kafka',
  'topic' = 'user_events_example3',
  'properties.bootstrap.servers' = 'localhost:9092',

  -- UTF-8 string as Kafka keys
  -- We don't specify 'key.fields' in this case since it's dictated by the primary key of the table
  'key.format' = 'raw',
  
  -- In this example, we want the Avro types of both the Kafka key and value to contain the field 'id'
  -- => adding a prefix to the table column associated to the kafka key field avoids clashes
  'key.fields-prefix' = 'kafka_key_',

  'value.format' = 'avro-confluent',
  'value.avro-confluent.url' = 'http://localhost:8082',
  'value.fields-include' = 'EXCEPT_KEY'
)

フォーマットオプション #

オプション 必要条件 Forwarded デフォルト 種類 説明
形式
必須 no (none) 文字列 使用するフォーマットを指定します。これは'avro-confluent'でなければなりません。
avro-confluent.basic-auth.credentials-source
オプション はい (none) 文字列 スキーマレジストリのBasic auth credentialソース
avro-confluent.basic-auth.user-info
オプション はい (none) 文字列 スキーマレジストリのBasic auth user info
avro-confluent.bearer-auth.credentials-source
オプション はい (none) 文字列 スキーマレジストリのBearer auth credentialソース
avro-confluent.bearer-auth.token
オプション はい (none) 文字列 スキーマレジストリのBearer auth トークン
avro-confluent.properties
オプション はい (none) マップ 基礎となる隙間レジストリに転送されるプロパティマップ。これは、Flink設定オプションを介して正式に公開されていないオプションに便利です。ただし、Flinkオプションの方が優先されることに注意してください。
avro-confluent.ssl.keystore.location
オプション はい (none) 文字列 SSLキーストアの場所/ファイル
avro-confluent.ssl.keystore.password
オプション はい (none) 文字列 SSLキーストアのパスワード
avro-confluent.ssl.truststore.location
オプション はい (none) 文字列 SSL truststoreの場所/ファイル
avro-confluent.ssl.truststore.password
オプション はい (none) 文字列 SSL truststoreのパスワード
avro-confluent.schema
オプション no (none) 文字列 Confluentスキーマレジストリに格納されている、あるいは格納されるスキーマ。スキーマが指定されていない場合、Flinkはテーブルスキーマをavroスキーマに変換します。指定されたスキーマはテーブルのスキーマと一致する必要があります。
avro-confluent.subject
オプション はい (none) 文字列 シリアル化中にこの形式で使われるスキーマを登録するためのConfluentスキーマレジストリサブジェクト。デフォルトは、'kafka'と'upsert-kafka'コネクタは、この形式が値またはキー形式として使われる場合、デフォルトのサブジェクト名として'<topic_name>-value'または'<topic_name>-key'を使います。ただし、他のコネクタ(例えば'filesystem')の場合、シンクとして使う場合は、サブジェクトオプションが必要です。
avro-confluent.url
必須 はい (none) 文字列 ConfluentスキーマレジストリのスキーマURLを取得/登録するためのスキーマレジストリのURL。

データ型マッピング #

現在、Apache Flinkは常にテーブルスキーマを宇t勝手、逆シリアライズ化中にAvroリーダースキーマを導出し、シリアル化中にAvroライタースキーマを導出します。Avroスキーマの明示的な定義はまだサポートされません。 AvroとFlinkデータ型のマッピングについては、Apache Avroフォーマットを参照してください。

そこでリストされている型に加えて、Flinkはnull許容型の読み取り/書き込みをサポートします。Flinkは、null許容型をAvro union(something, null)にマップします。somethingはFlink型から変換されたAvro型です。

Avro型の詳細については、Avro Specificationを参照してください。

inserted by FC2 system