Apache Kafka SQL Connector #
Scan Source: Bounded Scan Source: Unbounded Sink: Streaming Append Mode
The Kafka connector allows for reading data from and writing data into Kafka topics.
Dependencies #
In order to use the Kafka connector the following dependencies are required for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.
Kafka version | Maven dependency | SQL Client JAR |
---|---|---|
universal | | Only available for stable releases. |
The Kafka connector is not part of the binary distribution. See here for how to link with it for cluster execution.
How to create a Kafka table #
The example below shows how to create a Kafka table:
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
)
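Available Metadata #
The following connector metadata can be accessed as metadata columns in a table definition.
The R/W column defines whether a metadata field is readable (R) and/or writable (W). Read-only columns must be declared VIRTUAL to exclude them during an INSERT INTO operation.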
Key | Data Type | Description | R/W |
---|---|---|---|
topic | STRING NOT NULL | Topic name of the Kafka record. | R |
partition | INT NOT NULL | Partition ID of the Kafka record. | R |
headers | MAP<STRING, BYTES> NOT NULL | Headers of the Kafka record as a map of raw bytes. | R/W |
leader-epoch | INT NULL | Leader epoch of the Kafka record, if available. | R |
offset | BIGINT NOT NULL | Offset of the Kafka record in the partition. | R |
timestamp | TIMESTAMP_LTZ(3) NOT NULL | Timestamp of the Kafka record. | R/W |
timestamp-type | STRING NOT NULL | Timestamp type of the Kafka record. Either "NoTimestampType", "CreateTime" (also set when writing metadata), or "LogAppendTime". | R |
The extended CREATE TABLE example below demonstrates the syntax for exposing these metadata fields:
CREATE TABLE KafkaTable (
`event_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
`partition` BIGINT METADATA VIRTUAL,
`offset` BIGINT METADATA VIRTUAL,
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
);
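As a usage sketch, the metadata columns declared above can be queried like ordinary columns, and the VIRTUAL ones are left out when writing. The table `UserBehaviorSource` is hypothetical and is assumed to provide a `ts` column of type TIMESTAMP_LTZ(3) plus matching payload columns.

```sql
-- Read Kafka metadata next to the payload columns.
-- `partition` and `offset` are quoted with backticks, as in the table definition.
SELECT `event_time`, `partition`, `offset`, `user_id`, `item_id`, `behavior`
FROM KafkaTable;

-- Write to the same table. The VIRTUAL columns (`partition`, `offset`) are
-- excluded from the sink schema, so only four columns are supplied.
-- UserBehaviorSource is a hypothetical table used for illustration only.
INSERT INTO KafkaTable
SELECT `ts` AS `event_time`, `user_id`, `item_id`, `behavior`
FROM UserBehaviorSource;
```

Because `event_time` maps to the writable `timestamp` metadata key, the value supplied in the INSERT becomes the timestamp of the written Kafka record.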
Format Metadata
The connector is able to expose metadata of the value format for reading. Format metadata keys are prefixed with 'value.'.
The following example shows how to access both Kafka and Debezium metadata fields:
CREATE TABLE KafkaTable (
`event_time` TIMESTAMP_LTZ(3) METADATA FROM 'value.source.timestamp' VIRTUAL, -- from Debezium format
`origin_table` STRING METADATA FROM 'value.source.table' VIRTUAL, -- from Debezium format
`partition_id` BIGINT METADATA FROM 'partition' VIRTUAL, -- from Kafka connector
`offset` BIGINT METADATA VIRTUAL, -- from Kafka connector
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'debezium-json'
);
Connector Options #
Option | Required | Forwarded | Default | Type | Description |
---|---|---|---|---|---|
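connector | required | no | (none) | String | Specify which connector to use; for Kafka use 'kafka'. |
topic | required for sink | yes | (none) | String | Topic name(s) to read data from when the table is used as a source. A topic list is also supported for sources by separating topics with semicolons, like 'topic-1;topic-2'. Note that only one of "topic-pattern" and "topic" can be specified for sources. When the table is used as a sink, the topic name is the topic to write data to. Note that topic lists are not supported for sinks. |
topic-pattern | optional | yes | (none) | String | The regular expression for a pattern of topic names to read from. All topics with names that match the specified regular expression will be subscribed by the consumer when the job starts running. Note that only one of "topic-pattern" and "topic" can be specified for sources. |
properties.bootstrap.servers | required | yes | (none) | String | Comma-separated list of Kafka brokers. |
properties.group.id | optional for source, not applicable for sink | yes | (none) | String | The ID of the consumer group for the Kafka source. If no group ID is specified, an automatically generated ID "KafkaSource-{tableIdentifier}" will be used. |
properties.* | optional | no | (none) | String | This can set and pass arbitrary Kafka configurations. The suffix names must match the configuration keys defined in the Kafka configuration documentation. Flink removes the "properties." key prefix and passes the transformed keys and values to the underlying KafkaClient. For example, you can disable automatic topic creation via 'properties.allow.auto.create.topics' = 'false'. However, some configurations cannot be set because Flink overrides them, e.g. 'key.deserializer' and 'value.deserializer'. |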
format | required | no | (none) | String | The format used to deserialize and serialize the value part of Kafka messages. Please refer to the formats page for more details and more format options. Note: Either this option or the 'value.format' option is required. |
key.format | optional | no | (none) | String | The format used to deserialize and serialize the key part of Kafka messages. Please refer to the formats page for more details and more format options. Attention: If a key format is defined, the 'key.fields' option is required as well. Otherwise the Kafka records will have an empty key. |
key.fields | optional | no | [] | List<String> | Defines an explicit list of physical columns from the table schema that configure the data type for the key format. By default, this list is empty and thus a key is undefined. The list should look like 'field1;field2'. |
key.fields-prefix | optional | no | (none) | String | Defines a custom prefix for all fields of the key format to avoid name clashes with fields of the value format. By default, the prefix is empty. If a custom prefix is defined, both the table schema and 'key.fields' will work with prefixed names. When constructing the data type of the key format, the prefix will be removed and the non-prefixed names will be used within the key format. Please note that this option requires 'value.fields-include' to be set to 'EXCEPT_KEY'. |
value.format | required | no | (none) | String | The format used to deserialize and serialize the value part of Kafka messages. Please refer to the formats page for more details and more format options. Note: Either this option or the 'format' option is required. |
value.fields-include | optional | no | ALL | Enum Possible values: [ALL, EXCEPT_KEY] | Defines a strategy for how to deal with key columns in the data type of the value format. By default, 'ALL' physical columns of the table schema will be included in the value format, which means that key columns appear in the data type for both the key and value format. |
scan.startup.mode | optional | yes | group-offsets | Enum | Startup mode for the Kafka consumer. Valid values are 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'. See the following Start Reading Position for more details. |
scan.startup.specific-offsets | optional | yes | (none) | String | Specify offsets for each partition in case of 'specific-offsets' startup mode, e.g. 'partition:0,offset:42;partition:1,offset:300'. |
scan.startup.timestamp-millis | optional | yes | (none) | Long | Start from the specified epoch timestamp (milliseconds), used in case of 'timestamp' startup mode. |
scan.bounded.mode | optional | | unbounded | Enum | Bounded mode for the Kafka consumer. Valid values are 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'. See the following Bounded Ending Position for more details. |
scan.bounded.specific-offsets | optional | yes | (none) | String | Specify offsets for each partition in case of 'specific-offsets' bounded mode, e.g. 'partition:0,offset:42;partition:1,offset:300'. If an offset for a partition is not provided, the connector will not consume from that partition. |
scan.bounded.timestamp-millis | optional | yes | (none) | Long | End at the specified epoch timestamp (milliseconds), used in case of 'timestamp' bounded mode. |
scan.topic-partition-discovery.interval | optional | yes | (none) | Duration | Interval for the consumer to periodically discover dynamically created Kafka topics and partitions. |
sink.partitioner | optional | yes | 'default' | String | Output partitioning from Flink's partitions into Kafka's partitions. Valid values include 'default' and 'fixed'; see Sink Partitioning for more details. |
sink.semantic | optional | no | at-least-once | String | Deprecated: Please use sink.delivery-guarantee. |
sink.delivery-guarantee | optional | no | at-least-once | String | Defines the delivery semantic for the Kafka sink. Valid enumerations are 'at-least-once', 'exactly-once' and 'none'. See Consistency guarantees for more details. |
sink.transactional-id-prefix | optional | yes | (none) | String | If the delivery guarantee is configured as 'exactly-once', this value must be set and is used as a prefix for the identifier of all opened Kafka transactions. |
sink.parallelism | optional | no | (none) | Integer | Defines the parallelism of the Kafka sink operator. By default, the parallelism is determined by the framework using the same parallelism as the upstream chained operator. |
Features #
Key and Value Formats #
Both the key and value part of a Kafka record can be serialized to and deserialized from raw bytes using one of the given formats.
Value Format
Since a key is optional in Kafka records, the following statement reads and writes records with a configured value format but without a key format. The 'format' option is a synonym for 'value.format'. All format options are prefixed with the format identifier.
CREATE TABLE KafkaTable (
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING
) WITH (
'connector' = 'kafka',
...
'format' = 'json',
'json.ignore-parse-errors' = 'true'
)
The value format will be configured with the following data type:
ROW<`user_id` BIGINT, `item_id` BIGINT, `behavior` STRING>
Key and Value Format
The following example shows how to specify and configure key and value formats. The format options are prefixed with either the 'key' or 'value' plus format identifier.
CREATE TABLE KafkaTable (
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING
) WITH (
'connector' = 'kafka',
...
'key.format' = 'json',
'key.json.ignore-parse-errors' = 'true',
'key.fields' = 'user_id;item_id',
'value.format' = 'json',
'value.json.fail-on-missing-field' = 'false',
'value.fields-include' = 'ALL'
)
The key format includes the fields listed in 'key.fields' (using ';' as the delimiter) in the same order. Thus, it will be configured with the following data type:
ROW<`user_id` BIGINT, `item_id` BIGINT>
Since the value format is configured with 'value.fields-include' = 'ALL', key fields will also end up in the value format’s data type:
ROW<`user_id` BIGINT, `item_id` BIGINT, `behavior` STRING>
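For contrast, the following sketch keeps the same columns but sets 'value.fields-include' to 'EXCEPT_KEY', so the key columns are written only to the record key. The table name `KafkaKeyOnlyTable` and the connection options are illustrative assumptions reused from the earlier examples on this page.

```sql
-- Hypothetical table: key columns are excluded from the value format.
CREATE TABLE KafkaKeyOnlyTable (
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'key.format' = 'json',
  'key.fields' = 'user_id;item_id',
  'value.format' = 'json',
  -- Keep user_id and item_id out of the value part of the record.
  'value.fields-include' = 'EXCEPT_KEY'
);
```

Under these assumptions the key format would be configured with ROW<`user_id` BIGINT, `item_id` BIGINT> and the value format with ROW<`behavior` STRING>.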
Overlapping Format Fields
The connector cannot split the table’s columns into key and value fields based on schema information if both key and value formats contain fields of the same name. The 'key.fields-prefix' option allows giving key columns a unique name in the table schema while keeping the original names when configuring the key format.
The following example shows a key and value format that both contain a version field:
CREATE TABLE KafkaTable (
`k_version` INT,
`k_user_id` BIGINT,
`k_item_id` BIGINT,
`version` INT,
`behavior` STRING
) WITH (
'connector' = 'kafka',
...
'key.format' = 'json',
'key.fields-prefix' = 'k_',
'key.fields' = 'k_version;k_user_id;k_item_id',
'value.format' = 'json',
'value.fields-include' = 'EXCEPT_KEY'
)
The value format must be configured in 'EXCEPT_KEY' mode. The formats will be configured with the following data types:
key format:
ROW<`version` INT, `user_id` BIGINT, `item_id` BIGINT>
value format:
ROW<`version` INT, `behavior` STRING>
Topic and Partition Discovery #
The config options topic and topic-pattern specify the topics or topic pattern to consume for a source. The config option topic accepts a topic list using a semicolon separator, like 'topic-1;topic-2'. The config option topic-pattern uses a regular expression to discover the matching topics. For example, if the topic-pattern is test-topic-[0-9], then all topics with names that match the specified regular expression (starting with test-topic- and ending with a single digit) will be subscribed by the consumer when the job starts running.
To allow the consumer to discover dynamically created topics after the job has started running, set a non-negative value for scan.topic-partition-discovery.interval. This allows the consumer to discover partitions of new topics with names that also match the specified pattern.
Please refer to Kafka DataStream Connector documentation for more about topic and partition discovery.
Note that topic list and topic pattern only work in sources. In sinks, Flink currently only supports a single topic.
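A minimal sketch of a source table that subscribes by pattern and periodically looks for new topics and partitions is shown below. The table name `KafkaPatternTable` and the 10-second interval are assumptions chosen for illustration; the connection options are reused from the earlier examples.

```sql
-- Hypothetical source table that subscribes to every topic matching the pattern.
CREATE TABLE KafkaPatternTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  -- Use 'topic-pattern' instead of 'topic'; only one of the two may be set.
  'topic-pattern' = 'test-topic-[0-9]',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  -- Assumed interval: check for newly created topics/partitions every 10 seconds.
  'scan.topic-partition-discovery.interval' = '10 s',
  'format' = 'csv'
);
```

Since topic patterns only work in sources, a table defined this way can be read from but is not suitable as an INSERT INTO target.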
Start Reading Position #
The config option scan.startup.mode specifies the startup mode for the Kafka consumer. The valid enumerations are:
- `group-offsets`: start from committed offsets in ZK / Kafka brokers of a specific consumer group.
- `earliest-offset`: start from the earliest offset possible.
- `latest-offset`: start from the latest offset.
- `timestamp`: start from user-supplied timestamp for each partition.
- `specific-offsets`: start from user-supplied specific offsets for each partition.
The default option value is group-offsets, which indicates to consume from the last committed offsets in ZK / Kafka brokers.
If timestamp is specified, another config option scan.startup.timestamp-millis is required to specify a specific startup timestamp in milliseconds since January 1, 1970 00:00:00.000 GMT.
If specific-offsets is specified, another config option scan.startup.specific-offsets is required to specify specific startup offsets for each partition, e.g. an option value partition:0,offset:42;partition:1,offset:300 indicates offset 42 for partition 0 and offset 300 for partition 1.
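As an illustration of the specific-offsets mode, the sketch below starts reading partition 0 at offset 42 and partition 1 at offset 300. The table name `KafkaOffsetsTable` is hypothetical; the remaining options are reused from the earlier examples.

```sql
-- Hypothetical source table that starts from explicit per-partition offsets.
CREATE TABLE KafkaOffsetsTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'specific-offsets',
  -- Required companion option for the 'specific-offsets' startup mode.
  'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300',
  'format' = 'csv'
);
```

For the timestamp mode, the two scan options would instead be 'scan.startup.mode' = 'timestamp' together with 'scan.startup.timestamp-millis' set to an epoch value in milliseconds.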
Bounded Ending Position #
The config option scan.bounded.mode specifies the bounded mode for the Kafka consumer. The valid enumerations are:
- `group-offsets`: bounded by committed offsets in ZooKeeper / Kafka brokers of a specific consumer group. This is evaluated at the start of consumption from a given partition.
- `latest-offset`: bounded by latest offsets. This is evaluated at the start of consumption from a given partition.
- `timestamp`: bounded by a user-supplied timestamp.
- `specific-offsets`: bounded by user-supplied specific offsets for each partition.
If the config option scan.bounded.mode is not set, the default is an unbounded table.
If timestamp is specified, another config option scan.bounded.timestamp-millis is required to specify a specific bounded timestamp in milliseconds since January 1, 1970 00:00:00.000 GMT.
If specific-offsets is specified, another config option scan.bounded.specific-offsets is required to specify specific bounded offsets for each partition, e.g. an option value partition:0,offset:42;partition:1,offset:300 indicates offset 42 for partition 0 and offset 300 for partition 1. If an offset for a partition is not provided, the connector will not consume from that partition.
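The following sketch combines a startup mode with a bounded mode so that the table reads from the earliest offset up to the latest offsets found when consumption of each partition starts. The table name `KafkaBoundedTable` is an assumption made for illustration.

```sql
-- Hypothetical bounded source: reads existing records and then finishes.
CREATE TABLE KafkaBoundedTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  -- Stop at the latest offsets, evaluated when consumption of a partition starts.
  'scan.bounded.mode' = 'latest-offset',
  'format' = 'csv'
);
```

A table bounded this way is a natural fit for one-off backfills, because a query over it can terminate instead of running indefinitely.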
CDC Changelog Source #
Flink natively supports Kafka as a CDC changelog source. If messages in a Kafka topic are change events captured from other databases using a CDC tool, you can use the corresponding Flink CDC format to interpret the messages as INSERT/UPDATE/DELETE statements into a Flink SQL table.
The changelog source is useful in many cases, such as synchronizing incremental data from databases to other systems, auditing logs, maintaining materialized views on databases, and performing temporal joins on the changing history of a database table.
Flink provides several CDC formats:
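For instance, with the debezium-json format that appears in the Format Metadata example on this page, a changelog source could be sketched as follows. The table name, topic name and columns are hypothetical and stand in for whatever the upstream CDC tool actually captures.

```sql
-- Hypothetical changelog source: each Kafka message is a Debezium change event
-- that Flink interprets as an INSERT, UPDATE or DELETE on this table.
CREATE TABLE ProductsChangelog (
  `id` BIGINT,
  `name` STRING,
  `weight` DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_binlog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-json'
);

-- A continuously updated aggregate over the changelog.
SELECT `name`, AVG(`weight`) AS avg_weight
FROM ProductsChangelog
GROUP BY `name`;
```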
Sink Partitioning #
The config option sink.partitioner specifies output partitioning from Flink’s partitions into Kafka’s partitions.
By default, Flink uses the Kafka default partitioner to partition records. It uses the sticky partition strategy for records with null keys and uses a murmur2 hash to compute the partition for a record with the key defined.
In order to control the routing of rows into partitions, a custom sink partitioner can be provided. The ‘fixed’ partitioner will write the records in the same Flink partition into the same Kafka partition, which could reduce the cost of the network connections.
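A sink table that uses the 'fixed' partitioner described above might look like the following sketch. The table name `KafkaFixedSink` is hypothetical, and the connection options are reused from the earlier examples.

```sql
-- Hypothetical sink table: records from the same Flink partition
-- are written into the same Kafka partition.
CREATE TABLE KafkaFixedSink (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'sink.partitioner' = 'fixed',
  'format' = 'csv'
);
```

Omitting 'sink.partitioner' falls back to the Kafka default partitioner, which is usually the better choice when records carry meaningful keys.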
Consistency guarantees #
By default, a Kafka sink ingests data with at-least-once guarantees into a Kafka topic if the query is executed with checkpointing enabled.
With Flink’s checkpointing enabled, the kafka connector can provide exactly-once delivery guarantees.
Besides enabling Flink’s checkpointing, you can also choose among three different modes of operation by passing the appropriate sink.delivery-guarantee option:
- `none`: Flink will not guarantee anything. Produced records can be lost or they can be duplicated.
- `at-least-once` (default setting): This guarantees that no records will be lost (although they can be duplicated).
- `exactly-once`: Kafka transactions will be used to provide exactly-once semantics. Whenever you write to Kafka using transactions, do not forget to set the desired isolation.level (read_committed or read_uncommitted, the latter being the default value) for any application consuming records from Kafka.
Please refer to Kafka documentation for more caveats about delivery guarantees.
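The sketch below shows one way these options could fit together: an exactly-once sink and a downstream table that only reads committed records. The table names, the transactional ID prefix and the checkpoint interval are assumptions for illustration, and the SET statement assumes the SQL Client is used to configure checkpointing.

```sql
-- Assumption: checkpointing is enabled via the SQL Client with a 10 s interval.
SET 'execution.checkpointing.interval' = '10 s';

-- Hypothetical exactly-once sink; the prefix is required in this mode.
CREATE TABLE KafkaExactlyOnceSink (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'sink.delivery-guarantee' = 'exactly-once',
  'sink.transactional-id-prefix' = 'user-behavior-sink',
  'format' = 'csv'
);

-- Hypothetical downstream reader that skips records of uncommitted transactions.
CREATE TABLE KafkaCommittedSource (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'properties.isolation.level' = 'read_committed',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
);
```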
Source Per-Partition Watermarks #
Flink supports emitting per-partition watermarks for Kafka. Watermarks are generated inside the Kafka consumer. The per-partition watermarks are merged in the same way as watermarks are merged during streaming shuffles. The output watermark of the source is determined by the minimum watermark among the partitions it reads. If some partitions in the topics are idle, the watermark generator will not advance. You can alleviate this problem by setting the 'table.exec.source.idle-timeout' option in the table configuration.
Please refer to Kafka watermark strategies for more details.
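As a sketch of how this could be configured, the statements below set an idle timeout and declare a watermark on the Kafka record timestamp. The 30-second timeout, the 5-second out-of-orderness bound and the table name `KafkaWatermarkTable` are illustrative assumptions, and the SET statement assumes the SQL Client.

```sql
-- Assumption: partitions without records for 30 seconds are treated as idle,
-- so they no longer hold back the source watermark.
SET 'table.exec.source.idle-timeout' = '30 s';

-- Hypothetical source table with a watermark on the Kafka record timestamp.
CREATE TABLE KafkaWatermarkTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
  -- Tolerate records that arrive up to 5 seconds out of order.
  WATERMARK FOR `ts` AS `ts` - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
);
```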
Security #
In order to enable security configurations including encryption and authentication, you just need to set up security configurations with the “properties.” prefix in table options. The code snippet below shows how to configure a Kafka table to use PLAIN as the SASL mechanism and provide a JAAS configuration:
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
...
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN',
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";'
)
For a more complex example, use SASL_SSL as the security protocol and use SCRAM-SHA-256 as SASL mechanism:
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
...
'properties.security.protocol' = 'SASL_SSL',
/* SSL configurations */
/* Configure the path of truststore (CA) provided by the server */
'properties.ssl.truststore.location' = '/path/to/kafka.client.truststore.jks',
'properties.ssl.truststore.password' = 'test1234',
/* Configure the path of keystore (private key) if client authentication is required */
'properties.ssl.keystore.location' = '/path/to/kafka.client.keystore.jks',
'properties.ssl.keystore.password' = 'test1234',
/* SASL configurations */
/* Set SASL mechanism as SCRAM-SHA-256 */
'properties.sasl.mechanism' = 'SCRAM-SHA-256',
/* Set JAAS configurations */
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";'
)
Please note that the class path of the login module in sasl.jaas.config might be different if you relocate Kafka client dependencies, so you may need to rewrite it with the actual class path of the module in the JAR. For example, if you are using the SQL client JAR, which has relocated Kafka client dependencies to org.apache.flink.kafka.shaded.org.apache.kafka, the path of the plain login module should be org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule instead.
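Applied to the PLAIN example above, the relocated class path would change only the JAAS option. The sketch below assumes the SQL client JAR is in use; the table name `KafkaShadedAuthTable`, the broker address and the credentials are placeholders.

```sql
-- Hypothetical table for use with the SQL client JAR, where the Kafka client
-- classes are relocated under org.apache.flink.kafka.shaded.
CREATE TABLE KafkaShadedAuthTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'PLAIN',
  -- Same login module as before, referenced by its relocated class path.
  'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";'
);
```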
For detailed explanations of security configurations, please refer to the “Security” section in Apache Kafka documentation.
Data Type Mapping #
Kafka stores message keys and values as bytes, so Kafka doesn’t have schema or data types. The Kafka messages are deserialized and serialized by formats, e.g. csv, json, avro. Thus, the data type mapping is determined by specific formats. Please refer to Formats pages for more details.