This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
バージョン管理されたテーブル #
Flink SQLは、追加専用または更新のいずれかである進化する動的テーブルに対して動作します。 バージョン管理されたテーブルは各キーの過去の値を記憶する特別な型の更新テーブルを表します。
概念 #
動的テーブルは時間の経過に伴う関係を定義します。 多くの場合、特にメタデータを扱う場合、キーの古い値が変更されても無関係にはなりません。
Flink SQLはPRIMARY KEY
制約と時間属性を使って任意の動的テーブルに対してバージョン管理されたテーブルを定義できます。
Flinkでの主キー制約は、テーブルまたはビューの列または列のセットが一意であり、nullではないことを意味します。
更新/挿入テーブルの主キーセマンティクスは、特定のキーに関する具体化された変更(INSERT
/UPDATE
/DELETE
)の1つの行への変更の時間経過を表すことを意味します。upsertingテーブルの時間属性は、各変更がいつ発生したかを定義します。
まとめると、Flinkは行への変更を経時的に追跡し、そのキーの各値有効であった期間を維持できます。
テーブルが店内の様々な商品の値段を追跡するとします。
(changelog kind) update_time product_id product_name price
================= =========== ========== ============ =====
+(INSERT) 00:01:00 p_001 scooter 11.11
+(INSERT) 00:02:00 p_002 basketball 23.11
-(UPDATE_BEFORE) 12:00:00 p_001 scooter 11.11
+(UPDATE_AFTER) 12:00:00 p_001 scooter 12.99
-(UPDATE_BEFORE) 12:00:00 p_002 basketball 23.11
+(UPDATE_AFTER) 12:00:00 p_002 basketball 19.99
-(DELETE) 18:00:00 p_001 scooter 12.99
この一連の変更を考慮して、スクーターの価格が時間の経過とともにどのように変化するかを追跡します。
カタログに追加された時は、最初は00:01:00
で$11.11です。
その後、価格は12:00:00
に$12.99に上昇し、18:00:00
にカタログから削除されます。
様々な時点で様々な商品の価格をテーブルに問い合わせると、異なる結果が取得されます。10:00:00
に、テーブルには1つの価格のセットが表示されます。
update_time product_id product_name price
=========== ========== ============ =====
00:01:00 p_001 scooter 11.11
00:02:00 p_002 basketball 23.11
13:00:00,
では、別の価格セットが見つかります。
update_time product_id product_name price
=========== ========== ============ =====
12:00:00 p_001 scooter 12.99
12:00:00 p_002 basketball 19.99
バージョン管理されたテーブルソース #
バージョン管理されたテーブルは、基礎となるソースまたは形式が変更ログを直接定義するテーブルに対して、暗黙的に定義されます。
例には、upsert Kafkaソースとdebeziumとcanalのようなデータベース変更ログが含まれます。
上で説明したように、唯一の追加条件はCREATE
テーブル文にPRIMARY KEY
とイベント時間属性が含まれている必要があることです。
CREATE TABLE products (
product_id STRING,
product_name STRING,
price DECIMAL(32, 2),
update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
PRIMARY KEY (product_id) NOT ENFORCED,
WATERMARK FOR update_time AS update_time
) WITH (...);
バージョン管理されたテーブルビュー #
Flinkは基礎となるクエリに一意のキー制約とイベント時間属性が含まれている場合に、バージョン管理されたビューの定義もサポートします。追加レートの追加専用テーブルを想像してみてください。
CREATE TABLE currency_rates (
currency STRING,
rate DECIMAL(32, 10),
update_time TIMESTAMP(3),
WATERMARK FOR update_time AS update_time
) WITH (
'connector' = 'kafka',
'topic' = 'rates',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
);
テーブルcurrency_rates
には、通貨ごとに行が含まれています。USD —については、レートが変更されるたびに新しい行を受け取ります。
JSON
形式はネイティブの変更ログセマンティクスをサポートしないため、Flinkはこのテーブルを追加専用としてのみ読み取ることができます。
(changelog kind) update_time currency rate
================ ============= ========= ====
+(INSERT) 09:00:00 Yen 102
+(INSERT) 09:00:00 Euro 114
+(INSERT) 09:00:00 USD 1
+(INSERT) 11:15:00 Euro 119
+(INSERT) 11:45:00 Pounds 107
+(INSERT) 11:49:00 Pounds 108
Flinkは各行をテーブルへのINSERT
として解釈します。つまり、通過に対してPRIMARY KEY
を定義できないことを意味します。
ただし、このテーブルにはバージョン管理されたテーブルを定義するために必要な情報がすべて含まれていることは、クエリ開発者にとって明らかです。
Flinkは、重複排除クエリを定義することでこのテーブルをバージョン管理されたテーブルとして再解釈できます。これにより、推論された主キー(通貨)とイベント時間(update time)を含む順序付けされた変更ログストリームが生成されます。
-- Define a versioned view
CREATE VIEW versioned_rates AS
SELECT currency, rate, update_time -- (1) `update_time` keeps the event time
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY currency -- (2) the inferred unique key `currency` can be a primary key
ORDER BY update_time DESC) AS rownum
FROM currency_rates)
WHERE rownum = 1;
-- the view `versioned_rates` will produce a changelog as the following.
(changelog kind) update_time currency rate
================ ============= ========= ====
+(INSERT) 09:00:00 Yen 102
+(INSERT) 09:00:00 Euro 114
+(INSERT) 09:00:00 USD 1
+(UPDATE_AFTER) 11:15:00 Euro 119
+(INSERT) 11:45:00 Pounds 107
+(UPDATE_AFTER) 11:49:00 Pounds 108
Flinkには、このクエリを後続のクエリでバージョン管理されたテーブルへに効率的に変換する特別な最適化ステップがあります。 一般的に、次の形式のクエリの結果は、バージョン管理されたテーブルを生成します:
SELECT [column_list]
FROM (
SELECT [column_list],
ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
ORDER BY time_attr DESC) AS rownum
FROM table_name)
WHERE rownum = 1
パラメータの仕様:
ROW_NUMBER()
: 一意の、各行で連続する1から始まる数値を割り当てます。PARTITION BY col1[, col2...]
: パーティション列、つまり重複排他キーを指定します。これらの列は後続のバージョン管理されたテーブルの主キーを形成します。ORDER BY time_attr DESC
: 順序列を指定します。時間属性の必要があります。WHERE rownum = 1
: Therownum = 1
は、Flinkがこのクエリがバージョン管理されたテーブルを生成することを理解するために必要です。