This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
Maxwell Format #
Changelog-Data-Capture Format Format: Serialization Schema Format: Deserialization Schema
Maxwell is a CDC (Changelog Data Capture) tool that can stream changes in real-time from MySQL into Kafka, Kinesis and other streaming connectors. Maxwell provides a unified format schema for changelog and supports to serialize messages using JSON.
Flink supports to interpret Maxwell JSON messages as INSERT/UPDATE/DELETE messages into Flink SQL system. これは次のような多くの場合にこの機能を利用するのに役立ちます
- データベースから他のシステムへの増分データの同期
- 監査ログ
- データベース上でのリアルタイムのマテリアライズドビュー
- データベーステーブルなどの変更履歴の一時結合。
Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL as Maxwell JSON messages, and emit to external systems like Kafka. ただし、現在FlinkはUPDATE_BEFOREとUPDATE_AFTERを単一のUPDATEメッセージに組み合わせることはできません。Therefore, Flink encodes UPDATE_BEFORE and UDPATE_AFTER as DELETE and INSERT Maxwell messages.
依存 #
In order to use the Maxwell format the following dependencies are required for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.
Maven dependency | SQL Client |
---|---|
|
Built-in |
Note: please refer to Maxwell documentation about how to synchronize changelog to Kafka topics with Maxwell JSON.
How to use Maxwell format #
Maxwell provides a unified format for changelog, here is a simple example for an update operation captured from a MySQL products
table in JSON format:
{
"database":"test",
"table":"e",
"type":"insert",
"ts":1477053217,
"xid":23396,
"commit":true,
"position":"master.000006:800911",
"server_id":23042,
"thread_id":108,
"primary_key": [1, "2016-10-21 05:33:37.523000"],
"primary_key_columns": ["id", "c"],
"data":{
"id":111,
"name":"scooter",
"description":"Big 2-wheel scooter",
"weight":5.15
},
"old":{
"weight":5.18,
}
}
Note: please refer to Maxwell documentation about the meaning of each fields.
MySQL products
テーブルには4つのカラムがあります(id
、name
、description
、weight
)。上記のJSONメッセージは、products
テーブルの更新変更イベントであり、id = 111
の行のweight
値が、5.18
から5.15
に変更されます。
Assuming this messages is synchronized to Kafka topic products_binlog
, then we can use the following DDL to consume this topic and interpret the change events.
CREATE TABLE topic_products (
-- schema is totally the same to the MySQL "products" table
id BIGINT,
name STRING,
description STRING,
weight DECIMAL(10, 2)
) WITH (
'connector' = 'kafka',
'topic' = 'products_binlog',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'maxwell-json'
)
After registering the topic as a Flink table, then you can consume the Maxwell messages as a changelog source.
-- a real-time materialized view on the MySQL "products"
-- which calculate the latest average of weight for the same products
SELECT name, AVG(weight) FROM topic_products GROUP BY name;
-- synchronize all the data and incremental changes of MySQL "products" table to
-- Elasticsearch "products" index for future searching
INSERT INTO elasticsearch_products
SELECT * FROM topic_products;
利用可能なメタデータ #
次のフォーマットのメタデータは、テーブル定義の読み取り専用(VIRTUAL
)カラムとして公開できます。
フォーマットメタデータのフィールドは、対応するコネクタがフォーマットメタデータを転送する場合のみ利用できます。現在、Kafkaコネクタだけがその値の形式のメタデータフィールドを公開できます。
キー | データ型 | 説明 |
---|---|---|
database |
STRING NULL |
元のデータベース。利用可能であればCanalレコードのdatabase フィールドに対応します。
Maxwell record if available. |
table |
STRING NULL |
元のデータベーステーブル。利用可能であればCanalレコードのtable フィールドに対応します。
Maxwell record if available. |
primary-key-columns |
ARRAY<STRING> NULL |
主キー名の配列。Corresponds to the primary_key_columns field in the
Maxwell record if available. |
ingestion-timestamp |
TIMESTAMP_LTZ(3) NULL |
コネクタがイベントを処理した時のタイムスタンプ。Canalレコードのts フィールドに対応します。
field in the Maxwell record. |
The following example shows how to access Maxwell metadata fields in Kafka:
CREATE TABLE KafkaTable (
origin_database STRING METADATA FROM 'value.database' VIRTUAL,
origin_table STRING METADATA FROM 'value.table' VIRTUAL,
origin_primary_key_columns ARRAY<STRING> METADATA FROM 'value.primary-key-columns' VIRTUAL,
origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
user_id BIGINT,
item_id BIGINT,
behavior STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'maxwell-json'
);
フォーマットオプション #
オプション | 必要条件 | デフォルト | 種類 | 説明 |
---|---|---|---|---|
形式 |
必須 | (none) | 文字列 | Specify what format to use, here should be 'maxwell-json' . |
maxwell-json.ignore-parse-errors |
オプション | false | 真偽値 | 解析エラーが発生したフィールドと行を失敗せずにスキップします。 エラーが発生した場合は、フィールドはnullに設定されます。 |
maxwell-json.timestamp-format.standard |
オプション | 'SQL' |
文字列 | 入力および出力のタイムスタンプの形式を指定します。現在サポートされる値は、'SQL' と'ISO-8601' です:
|
maxwell-json.map-null-key.mode |
オプション | 'FAIL' |
文字列 | マップデータのnullキーをシリアライズする際のモードを指定します。現在サポートされる値は'FAIL' 、'DROP' 、'LITERAL' です:
|
maxwell-json.map-null-key.literal |
オプション | 'null' | 文字列 | Specify string literal to replace null key when 'maxwell-json.map-null-key.mode' is LITERAL. |
maxwell-json.encode.decimal-as-plain-number |
オプション | false | 真偽値 | 全ての小数を科学的な表記法ではなく単純な数値としてエンコードします。デフォルトでは、小数は科学的な表記法を使って記述できます。例えば、0.000000027 はデフォルトで2.7E-8 としてエンコードされ、このオプションをtrueに設定すると0.000000027 として書き込まれます。 |
警告 #
変更イベントの重複 #
The Maxwell application allows to deliver every change event exactly-once. Flink works pretty well when consuming Maxwell produced events in this situation.
If Maxwell application works in at-least-once delivery, it may deliver duplicate change events to Kafka and Flink will get the duplicate events.
これにより、Flinkクエリで間違った結果や予期しない例外が発生する可能性があります。したがって、上部設定table.exec.source.cdc-events-duplicate
をtrue
に設定し、この状況ではソースにPRIMARY KEYを定義することをお勧めします。
フレームワークは追加のステートフルオペレーションを生成し、主キーを使って変更イベントの重複排除をし、正規化された変更ログストリームを生成します。
データ型マッピング #
Currently, the Maxwell format uses JSON for serialization and deserialization. Please refer to JSON Format documentation for more details about the data type mapping.