This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
Temporal Table Function
一時的なテーブル関数 #
一時的なテーブル関数は、特定の時点での一時的なテーブルのバージョンへのアクセスを提供します。 一時的なテーブル内のデータにアクセスするいは、返されるテーブルのバージョンを決定する時間属性を渡す必要があります。 Flinkはテーブル関数のSQL構文を使ってそれを表現する方法を提供します。
バージョン管理されたテーブルと異なり、一時的なテーブル関数は追加専用ストリーム上でのみ定義できます; 変更ログ入力はサポートされていません。 さらに、一時的なテーブル関数は純粋なSQL DDLでは定義できません。
一時的なテーブル関数の定義 #
一時的なテーブル関数は、Table APIを使って追加専用ストリーム上に定義できます。 テーブルには1つ以上のキー列と、バージョン管理で使われる時間属性が登録されます。
一時的なテーブル関数として登録したい通貨レートの追加専用テーブルがあるとします。
SELECT * FROM currency_rates;
update_time currency rate
============= ========= ====
09:00:00 Yen 102
09:00:00 Euro 114
09:00:00 USD 1
11:15:00 Euro 119
11:49:00 Pounds 108
Table APIを使って、キーとしてcurrency
を使い、バージョン管理時間属性としてupdate_time
を使って、このストリームを登録できます。
TemporalTableFunction rates = tEnv
.from("currency_rates")
.createTemporalTableFunction("update_time", "currency");
tEnv.createTemporarySystemFunction("rates", rates);
rates = tEnv
.from("currency_rates")
.createTemporalTableFunction("update_time", "currency")
tEnv.createTemporarySystemFunction("rates", rates)
また、Python APIではサポートされません。
一時的なテーブル関数のjoin #
一度定義すると、一時的なテーブル関数は標準のテーブル関数として使われます。 追加専用テーブル(左側の入力/プローブ側)は、一時的なテーブル(右側の入力/ビルド側)と結合できます。つまり、時間の経過とともに変化し、その変化を追跡して特定の時点でのキーの値を取得するテーブルです。
顧客の注文を様々な通貨で追跡する追加専用テーブルorders
を考えてみましょう。
SELECT * FROM orders;
order_time amount currency
========== ====== =========
10:15 2 Euro
10:30 1 USD
10:32 50 Yen
10:52 3 Euro
11:04 5 USD
これらのテーブルを考慮して、注文を共通の通貨 — USDに変換したいとします。
SELECT
SUM(amount * rate) AS amount
FROM
orders,
LATERAL TABLE (rates(order_time))
WHERE
rates.currency = orders.currency
Table result = orders
.joinLateral(call("rates", $("o_proctime")), $("o_currency").isEqual($("r_currency")))
.select($("(o_amount").times($("r_rate")).sum().as("amount"));
val result = orders
.joinLateral($"rates(order_time)", $"orders.currency = rates.currency")
.select($"(o_amount * r_rate).sum as amount"))
また、Python APIではサポートされません。