Canal
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

Canal形式 #

Changelog-Data-Capture Format Format: Serialization Schema Format: Deserialization Schema

Canalは、MySQLから他のシステムに変換をリアルタイムにストリーミングできるCDC(Changelog Data Capture)ツールです。Canalは変更ログの統一フォーマットスキーマを提供し、JSONとprotobuf (protobufはCanalのデフォルト形式)を使ってメッセージのシリアライズ化をサポートします。

Flinkは、Canal JSONメッセージをFlink SQLシステムへのINSERT/UPDATE/DELETEメッセージとして解釈することをサポートしています。 これは次のような多くの場合にこの機能を利用するのに役立ちます

  • データベースから他のシステムへの増分データの同期
  • 監査ログ
  • データベース上でのリアルタイムのマテリアライズドビュー
  • データベーステーブルなどの変更履歴の一時結合。

Flinkは、Flink SQLのINSERT/UPDATE/DELETEメッセージをCanal JSONメッセージとしてエンコードし、Kafkaのようにストレージに出力することもサポートします。 ただし、現在FlinkはUPDATE_BEFOREとUPDATE_AFTERを単一のUPDATEメッセージに組み合わせることはできません。したがって、FlinkはUPDATE_BEFOREとUPDATE_AFTERを、DELETEとINSERT Canalメッセージとしてエンコードします。

注意: Canal protobufメッセージの解釈のサポートは、ロードマップにあります。

依存 #

In order to use the Canal format the following dependencies are required for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.

Maven dependency SQL Client
Built-in

注意: Canalをデプロイして変更ログをメッセージキューに同期する方法については、Canal documentationを参照してください。

Canalフォーマットの使い方 #

Canalは、変更ログの統一形式を提供します。次に、MySQL productsテーブルからキャプチャーされた更新オペレーションの簡単な例を示します:

{
  "data": [
    {
      "id": "111",
      "name": "scooter",
      "description": "Big 2-wheel scooter",
      "weight": "5.18"
    }
  ],
  "database": "inventory",
  "es": 1589373560000,
  "id": 9,
  "isDdl": false,
  "mysqlType": {
    "id": "INTEGER",
    "name": "VARCHAR(255)",
    "description": "VARCHAR(512)",
    "weight": "FLOAT"
  },
  "old": [
    {
      "weight": "5.15"
    }
  ],
  "pkNames": [
    "id"
  ],
  "sql": "",
  "sqlType": {
    "id": 4,
    "name": 12,
    "description": 12,
    "weight": 7
  },
  "table": "products",
  "ts": 1589373560798,
  "type": "UPDATE"
}

注意: 各フィールドの意味については、Canalドキュメントを参照してください。

MySQL productsテーブルには4つのカラムがあります(idnamedescriptionweight)。上記のJSONメッセージは、productsテーブルの更新変更イベントであり、id = 111の行のweight値が、5.18から5.15に変更されます。 メッセージがKafkaトピックproducts_binlogに同期されていると仮定すると、次のDDLを使ってこのトピックを消費し、変更イベントを解釈できます。

CREATE TABLE topic_products (
  -- schema is totally the same to the MySQL "products" table
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
 'connector' = 'kafka',
 'topic' = 'products_binlog',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json'  -- using canal-json as the format
)

トピックをFlinkテーブルとして登録した後で、Canalメッセージを変更ログソースとして消費できます。

-- a real-time materialized view on the MySQL "products"
-- which calculates the latest average of weight for the same products
SELECT name, AVG(weight) FROM topic_products GROUP BY name;

-- synchronize all the data and incremental changes of MySQL "products" table to
-- Elasticsearch "products" index for future searching
INSERT INTO elasticsearch_products
SELECT * FROM topic_products;

利用可能なメタデータ #

次のフォーマットのメタデータは、テーブル定義の読み取り専用(VIRTUAL)カラムとして公開できます。

フォーマットメタデータのフィールドは、対応するコネクタがフォーマットメタデータを転送する場合のみ利用できます。現在、Kafkaコネクタだけがその値の形式のメタデータフィールドを公開できます。
キー データ型 説明
database STRING NULL 元のデータベース。利用可能であればCanalレコードのdatabaseフィールドに対応します。 Canal record if available.
table STRING NULL 元のデータベーステーブル。利用可能であればCanalレコードのtableフィールドに対応します。 Canal record if available.
sql-type MAP<STRING, INT> NULL 様々なsqlタイプのマップ。利用可能であればCanalレコードのsqlTypeフィールドに対応します。 Canal record if available.
pk-names ARRAY<STRING> NULL 主キー名の配列。利用可能であればCanalレコードのpkNamesフィールドに対応します。 Canal record if available.
ingestion-timestamp TIMESTAMP_LTZ(3) NULL コネクタがイベントを処理した時のタイムスタンプ。Canalレコードのtsフィールドに対応します。 field in the Canal record.

次の例は、KakfaのCanalメタデータフィールドにアクセスする方法を示しています:

CREATE TABLE KafkaTable (
  origin_database STRING METADATA FROM 'value.database' VIRTUAL,
  origin_table STRING METADATA FROM 'value.table' VIRTUAL,
  origin_sql_type MAP<STRING, INT> METADATA FROM 'value.sql-type' VIRTUAL,
  origin_pk_names ARRAY<STRING> METADATA FROM 'value.pk-names' VIRTUAL,
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'canal-json'
);

フォーマットオプション #

オプション 必要条件 デフォルト 種類 説明
形式
必須 (none) 文字列 使用する形式を指定します。ここでは'canal-json'にする必要があります。
canal-json.ignore-parse-errors
オプション false 真偽値 解析エラーが発生したフィールドと行を失敗せずにスキップします。 エラーが発生した場合は、フィールドはnullに設定されます。
canal-json.timestamp-format.standard
オプション 'SQL' 文字列 入力および出力のタイムスタンプの形式を指定します。現在サポートされる値は、'SQL''ISO-8601'です:
  • オプション'SQL'は、入力タイムスタンプを"yyyy-MM-dd HH:mm:ss.s{precision}"形式で解析して出力します。例えば'2020-12-30 12:13:14.123'と出力タイムスタンプは同じ形式です。
  • オプション'ISO-8601'は、入力タイムスタンプを"yyyy-MM-ddTHH:mm:ss.s{precision}"形式で解析して出力します。例えば'2020-12-30T12:13:14.123'と同じ形式の出力タイムスタンプ。
canal-json.map-null-key.mode
オプション 'FAIL' 文字列 マップデータのnullキーをシリアライズする際のモードを指定します。現在サポートされる値は'FAIL''DROP''LITERAL'です:
  • オプション'FAIL'は、nullキーを持つマップ値に遭遇した場合に例外を投げます。
  • オプション'DROP'はマップデータのnullキーエントリを削除します。
  • オプション'LITERAL'はnullキーを文字列リテラルに置き換えます。文字列のリテラルはcanal-json.map-null-key.literalオプションによって定義されます。
canal-json.map-null-key.literal
オプション 'null' 文字列 'canal-json.map-null-key.mode'がLITERALの場合、nullキーを置き開ける文字列リテラルを指定します。
canal-json.encode.decimal-as-plain-number
オプション false 真偽値 全ての小数を科学的な表記法ではなく単純な数値としてエンコードします。デフォルトでは、小数は科学的な表記法を使って記述できます。例えば、0.000000027はデフォルトで2.7E-8としてエンコードされ、このオプションをtrueに設定すると0.000000027として書き込まれます。
canal-json.database.include
オプション (none) 文字列 An optional regular expression to only read the specific databases changelog rows by regular matching the "database" meta field in the Canal record. パターン文字列はJavaのPatternと互換性があります。
canal-json.table.include
オプション (none) 文字列 An optional regular expression to only read the specific tables changelog rows by regular matching the "table" meta field in the Canal record. パターン文字列はJavaのPatternと互換性があります。

警告 #

変更イベントの重複 #

通常の運用シナリオでは、Canalアプリケーションは全ての変更イベントを確実に1回配信します。。この状況でCanalが生成したイベントを消費する場合、Flinkは非常にうまく機能します。 ただし、フェイルオーバーが発生した倍、Canalアプリケーションは少なくとも1回配信で動作します。 つまり、異常な状況では、Canalは重複した変更イベントをメッセージキューに配信し、Flinkは重複したイベントを取得する可能性があります。 これにより、Flinkクエリで間違った結果や予期しない例外が発生する可能性があります。したがって、上部設定table.exec.source.cdc-events-duplicatetrueに設定し、この状況ではソースにPRIMARY KEYを定義することをお勧めします。 フレームワークは追加のステートフルオペレーションを生成し、主キーを使って変更イベントの重複排除をし、正規化された変更ログストリームを生成します。

データ型マッピング #

現在、Canal形式はシリアライズ化とデシリアライズ化のためにJSONを使います。データ型マッピングの詳細については、JSONフォーマットドキュメント format documentationを参照してください。

inserted by FC2 system