Maxwell Format #

Changelog-Data-Capture Format Format: Serialization Schema Format: Deserialization Schema

Maxwell is a CDC (Changelog Data Capture) tool that can stream changes in real-time from MySQL into Kafka, Kinesis and other streaming connectors. Maxwell provides a unified format schema for changelog and supports to serialize messages using JSON.

Flink supports to interpret Maxwell JSON messages as INSERT/UPDATE/DELETE messages into Flink SQL system.

  • データベースから他のシステムへの増分データの同期
  • 監査ログ
  • データベース上でのリアルタイムのマテリアライズドビュー
  • データベーステーブルなどの変更履歴の一時結合。

Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL as Maxwell JSON messages, and emit to external systems like Kafka. Therefore, Flink encodes UPDATE_BEFORE and UDPATE_AFTER as DELETE and INSERT Maxwell messages.

依存 #

In order to use the Maxwell format the following dependencies are required for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.

Maven dependency SQL Client

Note: please refer to Maxwell documentation about how to synchronize changelog to Kafka topics with Maxwell JSON.

How to use Maxwell format #

Maxwell provides a unified format for changelog, here is a simple example for an update operation captured from a MySQL products table in JSON format:

   "primary_key": [1, "2016-10-21 05:33:37.523000"],
   "primary_key_columns": ["id", "c"],
     "description":"Big 2-wheel scooter",

Note: please refer to Maxwell documentation about the meaning of each fields.

Assuming this messages is synchronized to Kafka topic products_binlog, then we can use the following DDL to consume this topic and interpret the change events.

CREATE TABLE topic_products (
  -- schema is totally the same to the MySQL "products" table
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
 'connector' = 'kafka',
 'topic' = 'products_binlog',
 'properties.bootstrap.servers' = 'localhost:9092',
 '' = 'testGroup',
 'format' = 'maxwell-json'

After registering the topic as a Flink table, then you can consume the Maxwell messages as a changelog source.

-- a real-time materialized view on the MySQL "products"
-- which calculate the latest average of weight for the same products
SELECT name, AVG(weight) FROM topic_products GROUP BY name;

-- synchronize all the data and incremental changes of MySQL "products" table to
-- Elasticsearch "products" index for future searching
INSERT INTO elasticsearch_products
SELECT * FROM topic_products;

利用可能なメタデータ #


キー データ型 説明
database STRING NULL The original database name. Corresponds to the database field in the Maxwell record if available.
table STRING NULL The original database table name. Corresponds to the table field in the Maxwell record if available.
primary-key-columns ARRAY<STRING> NULL 主キー名の配列。Corresponds to the primary_key_columns field in the Maxwell record if available.
ingestion-timestamp TIMESTAMP_LTZ(3) NULL The timestamp at which the connector processed the event. Corresponds to the ts field in the Maxwell record.

The following example shows how to access Maxwell metadata fields in Kafka:

  origin_database STRING METADATA FROM 'value.database' VIRTUAL,
  origin_table STRING METADATA FROM 'value.table' VIRTUAL,
  origin_primary_key_columns ARRAY<STRING> METADATA FROM 'value.primary-key-columns' VIRTUAL,
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  '' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'maxwell-json'

フォーマットオプション #

オプション 必要条件 デフォルト 種類 説明
必須 (none) 文字列 Specify what format to use, here should be 'maxwell-json'.
オプション false 真偽値 解析エラーが発生したフィールドと行を失敗せずにスキップします。 エラーが発生した場合は、フィールドはnullに設定されます。
オプション 'SQL' 文字列 入力および出力のタイムスタンプの形式を指定します。現在サポートされる値は、'SQL''ISO-8601'です:
  • オプション'SQL'は、入力タイムスタンプを"yyyy-MM-dd HH:mm:ss.s{precision}"形式で解析して出力します。例えば'2020-12-30 12:13:14.123'と出力タイムスタンプは同じ形式です。
  • Option 'ISO-8601'will parse input timestamp in "yyyy-MM-ddTHH:mm:ss.s{precision}" format, e.g '2020-12-30T12:13:14.123' and output timestamp in the same format.
オプション 'FAIL' 文字列 マップデータのnullキーをシリアライズする際のモードを指定します。現在サポートされる値は'FAIL''DROP''LITERAL'です:
  • オプション'FAIL'は、nullキーを持つマップ値に遭遇した場合に例外を投げます。
  • オプション'DROP'はマップデータのnullキーエントリを削除します。
  • オプション'LITERAL'はnullキーを文字列リテラルに置き換えます。The string literal is defined by option.
オプション 'null' 文字列 Specify string literal to replace null key when '' is LITERAL.
オプション false 真偽値 全ての小数を科学的な表記法ではなく単純な数値としてエンコードします。デフォルトでは、小数は科学的な表記法を使って記述できます。例えば、0.000000027はデフォルトで2.7E-8としてエンコードされ、このオプションをtrueに設定すると0.000000027として書き込まれます。

警告 #

変更イベントの重複 #

The Maxwell application allows to deliver every change event exactly-once. Flink works pretty well when consuming Maxwell produced events in this situation. If Maxwell application works in at-least-once delivery, it may deliver duplicate change events to Kafka and Flink will get the duplicate events. This may cause Flink query to get wrong results or unexpected exceptions. Thus, it is recommended to set job configuration table.exec.source.cdc-events-duplicate to true and define PRIMARY KEY on the source in this situation. Framework will generate an additional stateful operator, and use the primary key to deduplicate the change events and produce a normalized changelog stream.

データ型マッピング #

Currently, the Maxwell format uses JSON for serialization and deserialization. Please refer to JSON Format documentation for more details about the data type mapping.

