This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
Canal形式 #
Changelog-Data-Capture Format Format: Serialization Schema Format: Deserialization Schema
Canalは、MySQLから他のシステムに変換をリアルタイムにストリーミングできるCDC(Changelog Data Capture)ツールです。Canalは変更ログの統一フォーマットスキーマを提供し、JSONとprotobuf (protobufはCanalのデフォルト形式)を使ってメッセージのシリアライズ化をサポートします。
Flinkは、Canal JSONメッセージをFlink SQLシステムへのINSERT/UPDATE/DELETEメッセージとして解釈することをサポートしています。 これは次のような多くの場合にこの機能を利用するのに役立ちます
- データベースから他のシステムへの増分データの同期
- 監査ログ
- データベース上でのリアルタイムのマテリアライズドビュー
- データベーステーブルなどの変更履歴の一時結合。
Flinkは、Flink SQLのINSERT/UPDATE/DELETEメッセージをCanal JSONメッセージとしてエンコードし、Kafkaのようにストレージに出力することもサポートします。 ただし、現在FlinkはUPDATE_BEFOREとUPDATE_AFTERを単一のUPDATEメッセージに組み合わせることはできません。したがって、FlinkはUPDATE_BEFOREとUPDATE_AFTERを、DELETEとINSERT Canalメッセージとしてエンコードします。
注意: Canal protobufメッセージの解釈のサポートは、ロードマップにあります。
依存 #
In order to use the Canal format the following dependencies are required for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.
Maven dependency | SQL Client |
---|---|
|
Built-in |
注意: Canalをデプロイして変更ログをメッセージキューに同期する方法については、Canal documentationを参照してください。
Canalフォーマットの使い方 #
Canalは、変更ログの統一形式を提供します。次に、MySQL products
テーブルからキャプチャーされた更新オペレーションの簡単な例を示します:
{
"data": [
{
"id": "111",
"name": "scooter",
"description": "Big 2-wheel scooter",
"weight": "5.18"
}
],
"database": "inventory",
"es": 1589373560000,
"id": 9,
"isDdl": false,
"mysqlType": {
"id": "INTEGER",
"name": "VARCHAR(255)",
"description": "VARCHAR(512)",
"weight": "FLOAT"
},
"old": [
{
"weight": "5.15"
}
],
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": 4,
"name": 12,
"description": 12,
"weight": 7
},
"table": "products",
"ts": 1589373560798,
"type": "UPDATE"
}
注意: 各フィールドの意味については、Canalドキュメントを参照してください。
MySQL products
テーブルには4つのカラムがあります(id
、name
、description
、weight
)。上記のJSONメッセージは、products
テーブルの更新変更イベントであり、id = 111
の行のweight
値が、5.18
から5.15
に変更されます。
メッセージがKafkaトピックproducts_binlog
に同期されていると仮定すると、次のDDLを使ってこのトピックを消費し、変更イベントを解釈できます。
CREATE TABLE topic_products (
-- schema is totally the same to the MySQL "products" table
id BIGINT,
name STRING,
description STRING,
weight DECIMAL(10, 2)
) WITH (
'connector' = 'kafka',
'topic' = 'products_binlog',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json' -- using canal-json as the format
)
トピックをFlinkテーブルとして登録した後で、Canalメッセージを変更ログソースとして消費できます。
-- a real-time materialized view on the MySQL "products"
-- which calculates the latest average of weight for the same products
SELECT name, AVG(weight) FROM topic_products GROUP BY name;
-- synchronize all the data and incremental changes of MySQL "products" table to
-- Elasticsearch "products" index for future searching
INSERT INTO elasticsearch_products
SELECT * FROM topic_products;
利用可能なメタデータ #
次のフォーマットのメタデータは、テーブル定義の読み取り専用(VIRTUAL
)カラムとして公開できます。
フォーマットメタデータのフィールドは、対応するコネクタがフォーマットメタデータを転送する場合のみ利用できます。現在、Kafkaコネクタだけがその値の形式のメタデータフィールドを公開できます。
キー | データ型 | 説明 |
---|---|---|
database |
STRING NULL |
元のデータベース。利用可能であればCanalレコードのdatabase フィールドに対応します。
Canal record if available. |
table |
STRING NULL |
元のデータベーステーブル。利用可能であればCanalレコードのtable フィールドに対応します。
Canal record if available. |
sql-type |
MAP<STRING, INT> NULL |
様々なsqlタイプのマップ。利用可能であればCanalレコードのsqlType フィールドに対応します。
Canal record if available. |
pk-names |
ARRAY<STRING> NULL |
主キー名の配列。利用可能であればCanalレコードのpkNames フィールドに対応します。
Canal record if available. |
ingestion-timestamp |
TIMESTAMP_LTZ(3) NULL |
コネクタがイベントを処理した時のタイムスタンプ。Canalレコードのts フィールドに対応します。
field in the Canal record. |
次の例は、KakfaのCanalメタデータフィールドにアクセスする方法を示しています:
CREATE TABLE KafkaTable (
origin_database STRING METADATA FROM 'value.database' VIRTUAL,
origin_table STRING METADATA FROM 'value.table' VIRTUAL,
origin_sql_type MAP<STRING, INT> METADATA FROM 'value.sql-type' VIRTUAL,
origin_pk_names ARRAY<STRING> METADATA FROM 'value.pk-names' VIRTUAL,
origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
user_id BIGINT,
item_id BIGINT,
behavior STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'canal-json'
);
フォーマットオプション #
オプション | 必要条件 | デフォルト | 種類 | 説明 |
---|---|---|---|---|
形式 |
必須 | (none) | 文字列 | 使用する形式を指定します。ここでは'canal-json' にする必要があります。 |
canal-json.ignore-parse-errors |
オプション | false | 真偽値 | 解析エラーが発生したフィールドと行を失敗せずにスキップします。 エラーが発生した場合は、フィールドはnullに設定されます。 |
canal-json.timestamp-format.standard |
オプション | 'SQL' |
文字列 | 入力および出力のタイムスタンプの形式を指定します。現在サポートされる値は、'SQL' と'ISO-8601' です:
|
canal-json.map-null-key.mode |
オプション | 'FAIL' |
文字列 | マップデータのnullキーをシリアライズする際のモードを指定します。現在サポートされる値は'FAIL' 、'DROP' 、'LITERAL' です:
|
canal-json.map-null-key.literal |
オプション | 'null' | 文字列 | 'canal-json.map-null-key.mode' がLITERALの場合、nullキーを置き開ける文字列リテラルを指定します。 |
canal-json.encode.decimal-as-plain-number |
オプション | false | 真偽値 | 全ての小数を科学的な表記法ではなく単純な数値としてエンコードします。デフォルトでは、小数は科学的な表記法を使って記述できます。例えば、0.000000027 はデフォルトで2.7E-8 としてエンコードされ、このオプションをtrueに設定すると0.000000027 として書き込まれます。 |
canal-json.database.include |
オプション | (none) | 文字列 | An optional regular expression to only read the specific databases changelog rows by regular matching the "database" meta field in the Canal record. パターン文字列はJavaのPatternと互換性があります。 |
canal-json.table.include |
オプション | (none) | 文字列 | An optional regular expression to only read the specific tables changelog rows by regular matching the "table" meta field in the Canal record. パターン文字列はJavaのPatternと互換性があります。 |
警告 #
変更イベントの重複 #
通常の運用シナリオでは、Canalアプリケーションは全ての変更イベントを確実に1回配信します。。この状況でCanalが生成したイベントを消費する場合、Flinkは非常にうまく機能します。
ただし、フェイルオーバーが発生した倍、Canalアプリケーションは少なくとも1回配信で動作します。
つまり、異常な状況では、Canalは重複した変更イベントをメッセージキューに配信し、Flinkは重複したイベントを取得する可能性があります。
これにより、Flinkクエリで間違った結果や予期しない例外が発生する可能性があります。したがって、上部設定table.exec.source.cdc-events-duplicate
をtrue
に設定し、この状況ではソースにPRIMARY KEYを定義することをお勧めします。
フレームワークは追加のステートフルオペレーションを生成し、主キーを使って変更イベントの重複排除をし、正規化された変更ログストリームを生成します。
データ型マッピング #
現在、Canal形式はシリアライズ化とデシリアライズ化のためにJSONを使います。データ型マッピングの詳細については、JSONフォーマットドキュメント format documentationを参照してください。