Versioned Tables
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

バージョン管理されたテーブル #

Flink SQLは、追加専用または更新のいずれかである進化する動的テーブルに対して動作します。 バージョン管理されたテーブルは各キーの過去の値を記憶する特別な型の更新テーブルを表します。

概念 #

動的テーブルは時間の経過に伴う関係を定義します。 多くの場合、特にメタデータを扱う場合、キーの古い値が変更されても無関係にはなりません。

Flink SQLはPRIMARY KEY制約と時間属性を使って任意の動的テーブルに対してバージョン管理されたテーブルを定義できます。

Flinkでの主キー制約は、テーブルまたはビューの列または列のセットが一意であり、nullではないことを意味します。 更新/挿入テーブルの主キーセマンティクスは、特定のキーに関する具体化された変更(INSERT/UPDATE/DELETE)の1つの行への変更の時間経過を表すことを意味します。upsertingテーブルの時間属性は、各変更がいつ発生したかを定義します。

まとめると、Flinkは行への変更を経時的に追跡し、そのキーの各値有効であった期間を維持できます。

テーブルが店内の様々な商品の値段を追跡するとします。

(changelog kind)  update_time  product_id product_name price
================= ===========  ========== ============ ===== 
+(INSERT)         00:01:00     p_001      scooter      11.11
+(INSERT)         00:02:00     p_002      basketball   23.11
-(UPDATE_BEFORE)  12:00:00     p_001      scooter      11.11
+(UPDATE_AFTER)   12:00:00     p_001      scooter      12.99
-(UPDATE_BEFORE)  12:00:00     p_002      basketball   23.11 
+(UPDATE_AFTER)   12:00:00     p_002      basketball   19.99
-(DELETE)         18:00:00     p_001      scooter      12.99 

この一連の変更を考慮して、スクーターの価格が時間の経過とともにどのように変化するかを追跡します。 カタログに追加された時は、最初は00:01:00で$11.11です。 その後、価格は12:00:00に$12.99に上昇し、18:00:00にカタログから削除されます。

様々な時点で様々な商品の価格をテーブルに問い合わせると、異なる結果が取得されます。10:00:00に、テーブルには1つの価格のセットが表示されます。

update_time  product_id product_name price
===========  ========== ============ ===== 
00:01:00     p_001      scooter      11.11
00:02:00     p_002      basketball   23.11

13:00:00,では、別の価格セットが見つかります。

update_time  product_id product_name price
===========  ========== ============ ===== 
12:00:00     p_001      scooter      12.99
12:00:00     p_002      basketball   19.99

バージョン管理されたテーブルソース #

バージョン管理されたテーブルは、基礎となるソースまたは形式が変更ログを直接定義するテーブルに対して、暗黙的に定義されます。 例には、upsert Kafkaソースとdebeziumcanalのようなデータベース変更ログが含まれます。 上で説明したように、唯一の追加条件はCREATEテーブル文にPRIMARY KEYとイベント時間属性が含まれている必要があることです。

CREATE TABLE products (
	product_id    STRING,
	product_name  STRING,
	price         DECIMAL(32, 2),
	update_time   TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
	PRIMARY KEY (product_id) NOT ENFORCED,
	WATERMARK FOR update_time AS update_time
) WITH (...);

バージョン管理されたテーブルビュー #

Flinkは基礎となるクエリに一意のキー制約とイベント時間属性が含まれている場合に、バージョン管理されたビューの定義もサポートします。追加レートの追加専用テーブルを想像してみてください。

CREATE TABLE currency_rates (
	currency      STRING,
	rate          DECIMAL(32, 10),
	update_time   TIMESTAMP(3),
	WATERMARK FOR update_time AS update_time
) WITH (
	'connector' = 'kafka',
	'topic'	    = 'rates',
	'properties.bootstrap.servers' = 'localhost:9092',
	'format'    = 'json'
);

テーブルcurrency_ratesには、通貨ごとに行が含まれています。USD —については、レートが変更されるたびに新しい行を受け取ります。 JSON形式はネイティブの変更ログセマンティクスをサポートしないため、Flinkはこのテーブルを追加専用としてのみ読み取ることができます。

(changelog kind) update_time   currency   rate
================ ============= =========  ====
+(INSERT)        09:00:00      Yen        102
+(INSERT)        09:00:00      Euro       114
+(INSERT)        09:00:00      USD        1
+(INSERT)        11:15:00      Euro       119
+(INSERT)        11:45:00      Pounds     107
+(INSERT)        11:49:00      Pounds     108

Flinkは各行をテーブルへのINSERTとして解釈します。つまり、通過に対してPRIMARY KEYを定義できないことを意味します。 ただし、このテーブルにはバージョン管理されたテーブルを定義するために必要な情報がすべて含まれていることは、クエリ開発者にとって明らかです。 Flinkは、重複排除クエリを定義することでこのテーブルをバージョン管理されたテーブルとして再解釈できます。これにより、推論された主キー(通貨)とイベント時間(update time)を含む順序付けされた変更ログストリームが生成されます。

-- Define a versioned view
CREATE VIEW versioned_rates AS              
SELECT currency, rate, update_time              -- (1) `update_time` keeps the event time
  FROM (
      SELECT *,
      ROW_NUMBER() OVER (PARTITION BY currency  -- (2) the inferred unique key `currency` can be a primary key
         ORDER BY update_time DESC) AS rownum 
      FROM currency_rates)
WHERE rownum = 1; 

-- the view `versioned_rates` will produce a changelog as the following.
(changelog kind) update_time currency   rate
================ ============= =========  ====
+(INSERT)        09:00:00      Yen        102
+(INSERT)        09:00:00      Euro       114
+(INSERT)        09:00:00      USD        1
+(UPDATE_AFTER)  11:15:00      Euro       119
+(INSERT)        11:45:00      Pounds     107
+(UPDATE_AFTER)  11:49:00      Pounds     108

Flinkには、このクエリを後続のクエリでバージョン管理されたテーブルへに効率的に変換する特別な最適化ステップがあります。 一般的に、次の形式のクエリの結果は、バージョン管理されたテーブルを生成します:

SELECT [column_list]
FROM (
   SELECT [column_list],
     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
       ORDER BY time_attr DESC) AS rownum
   FROM table_name)
WHERE rownum = 1

パラメータの仕様:

  • ROW_NUMBER(): 一意の、各行で連続する1から始まる数値を割り当てます。
  • PARTITION BY col1[, col2...]: パーティション列、つまり重複排他キーを指定します。これらの列は後続のバージョン管理されたテーブルの主キーを形成します。
  • ORDER BY time_attr DESC: 順序列を指定します。時間属性の必要があります。
  • WHERE rownum = 1: The rownum = 1は、Flinkがこのクエリがバージョン管理されたテーブルを生成することを理解するために必要です。

Back to top

inserted by FC2 system