This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
動的テーブル #
SQL - とTable API - は、リアルタイムのデータ処理のための柔軟で強力な機能を提供します。 このページでは、リレーショナルの概念がどのようにストリーミングに優雅に変換されるかを説明し、Flinkが無制限のストリームでも同じセマンティクスを実現できるかについて説明します。
データストリームのリレーショナルクエリ #
以下の表は、入力データ、自国、出力結果に対する従来のリレーショナル代数とストリーム処理を比較しています。
リレーショナル代数 / SQL | ストリーム処理 |
---|---|
リレーショナル(またはテーブル)は、タプルの制限された(複数の)セットです。 | ストリームはタプルの無限のシーケンスです。 |
バッチデータ(リレーショナルデータベースのテーブルなど)に対して実行されるクエリは、完全な入力データにアクセスできます。 | ストリーミングクエリは、開始時に全てのデータにアクセスできないため、データがストリーミングされるまで"待つ"必要があります。 |
バッチクエリは固定サイズの結果を生成した後に終了します。 | ストリーミングクエリは受信したレコードに基づいて結果を継続的に更新し、完了することはありません。 |
これらの違いに関わらず、リレーショナルクエリとSQLはストリームを処理するための強力なツールセットを提供します。高度なデータベースシステムはマテリアル化されたビューと呼ばれる機能を提供します。マテリアル化されたビューは、通常の仮想ビューと同様にSQLクエリとして定義されます。仮想ビューとは対照的にマテリアル化されたビューはクエリ結果をキャッシュするため、アクセス時にクエリを評価する必要はありません。キャッシュに関する一般的な課題は、キャッシュが古い結果を提供しないようにすることです。マテリアル化されたビューは、その定義クエリのベーステーブルが変更されると廃止されます。Eager Viewメンテナンスは、ベーステーブルが更新されるとすぐにマテリアル化ビューを更新する手法です。
以下の点を考慮すると、eager viewメンテナンスとSQLクエリの関係が明らかになります:
- データベーステーブルは、
INSERT
、UPDATE
、DELETE
DMLステートメントのストリームから生成されるデータベーステーブルは、変更ログストリームと呼ばれることが多々あります。 - マテリアル化ビューはSQLクエリとして定義されます。ビューを更新するには、クエリでビューの基本リレーションの変更ログストリームを継続的に処理する必要があります。
- マテリアル化ビューはストリーミングSQLクエリの結果です。
これらの点を念頭に置いて、次のセクションで動的テーブルの概念を紹介します。
動的テーブル & 継続的なクエリ #
動的テーブルは、FlinkのテーブルAPIとストリーミングデータのSQLサポートの主要な概念です。バッチデータを表す静的なテーブルとは対照的に、動的テーブルは時間の経過とともに変化します。ただし、静的バッチテーブルと同様に、システムは動的テーブルに対してクエリを実行できます。動的テーブルをクエリすると、継続的クエリが生成されます。継続的なクエリは決して終了せず、動的な結果、つまり他の動的なテーブルが生成されます。クエリは(動的)入力テーブルの変更を反映するために、(動的)結果テーブルを継続的に更新します。基本的に、動的テーブルに対する継続的なクエリは、マテリアル化ビューを定義するクエリと非常によく似ています。
継続的なクエリの出力は、入力テーブルのスナップショットに対してバッチモードで実行された同じクエリの結果と常に意味論的に同等であることに注意することが重要です。
以下の図はストリーム、動的テーブル、連続的なクエリの関係を視覚化したものです。
- ストリームは動的なテーブルに変換されます。
- 継続的なクエリは動的テーブルで評価され、新しい動的なテーブルが生成されます。
- 結果の動的なテーブルはストリームに再度変換されます。
動的なテーブルは何よりも論理的な概念です。動的なテーブルはクエリの実行中に必ずしも(完全に)実体化されるわけではありません。
以下では、次のスキーマを持つクリックイベントのストリームを使った動的なテーブルと継続的なクエリの概念について説明します。
CREATE TABLE clicks (
user VARCHAR, -- the name of the user
url VARCHAR, -- the URL that was accessed by the user
cTime TIMESTAMP(3) -- the time when the URL was accessed
) WITH (...);
ストリームのテーブルの定義 #
リレーショナルクエリを使ってストリームを処理するには、ストリームをTable
に変換する必要があります。概念的には、ストリームの各レコードは結果のテーブルに対するINSERT
変更として解釈されます。INSERT
のみの変更ログストリームからテーブルを作成しています。
以下の図は、クリックイベント(左側)のストリームがどのようにテーブル(右側)に変換されるかを視覚化しています。クリックストリームのレコードが挿入されると、結果のテーブルは継続的に増加します。
ストリームで定義されたテーブルは内部的に実体化されないことに注意してください。
継続的なクエリ #
継続的なクエリは動的なテーブルで評価され、結果として新しい動的なテーブルを生成します。バッチクエリとは対照的に、連続クエリは終了せず、入力テーブルの更新に従って結果テーブルを更新します。連続的なクエリはどの時点でも、入力テーブルのスナップショットに対してバッチモードで実行された同じクエリの結果と意味論的に同等です。
以下では、クリックイベントのストリームで定義されたclicks
テーブルに対する2つのクエリの例を示します。
最初のクエリは単純なGROUP-BY COUNT
集計クエリです。clicks
テーブルをuser
フィールドでグループ化し、訪問されたURLの数をカウントします。以下の図は、clicks
テーブルが追加の行で更新されるにつれて、クエリが時間の経過とともにどのように評価されるかを示しています。
クエリが開始されると、clicks
テーブル(左側)は空です。クエリは最初の行が挿入された時に結果テーブルを計算します。最初の行[Mary, ./home]
が到着すると、結果テーブル(右側、上部)は1つの行[Mary, 1]
で構成されます。2番目の行[Bob, ./cart]
がclicks
テーブルに挿入されると、クエリは結果テーブルを更新し、新しい行[Bob, 1]
を挿入します。3行目の[Mary, ./prod?id=1]
は、[Mary, 1]
が[Mary, 2]
に更新される、すでに計算された結果の行の更新を生成します。最後に、4行目の行がclicks
テーブルに追加される時に、クエリは3行目[Liz, 1]
を結果テーブルに挿入します。
2番目のクエリは最初のクエリに似ていますが、URLの数を数える前に、user
属性に加えてclicks
テーブルを時間ごとのタンブリングウィンドウ上でもグループ化します。(特別な[時間属性]/docs/dev/table/concepts/time_attributes/に基づくウィンドウなどの時間ベースの計算は後で説明します)。繰り返しますが、この図は動的テーブルの変化する性質を視覚化するために、異なる時点の入力と出力を示しています。
前と同様に、入力テーブルclicks
は左側に表示されます。クエリは継続的に結果を1時間ごとに計算し、結果テーブルを更新します。クリックテーブルには、12:00:00
から12:59:59
までのタイムスタンプ(cTime
)を持つ4つの行が含まれています。クエリはこの入力から2つの結果行(user
ごとに1つ)を計算し、それらを結果テーブルに追加します。13:00:00
から13:59:59
までの次のウィンドウについては、clicks
テーブルには3つの行が含まれており、その結果、結果テーブルにさらに2つの行が追加されます。時間経過とともに、さらに多くの行がclicks
に追加され、結果テーブルが更新されます。
更新クエリと追加クエリ #
2つのクエリの例は非常に似ているように見えますが(どちらもグルプ化されたカウントの集計を計算します)、次の1つの重要な点で異なります:
- 最初のクエリは、以前に発行された結果を更新します。つまり、結果テーブルの変更ログストリームは
INSERT
とUPDATE
の変更のみで構成されます。 - 2つ目のクエリは結果のテーブルにのみ追加します。つまり、結果テーブルの変更ログストリームは
INSERT
の変更のみで構成されます。
クエリが追加専用テーブルを生成するか、更新されたテーブルを生成するかには、いくつかの言外の意味があります:
- 更新を変更するクエリは通常、より多くの状態を維持する必要があります(以下のセクションを参照してください)。
- 追加専用テーブルのストリームへの変換は、更新されたテーブルの変換とは異なります(テーブルからストリームへの変換セクションを参照してください)。
クエリの制限 #
全てではありませんが、意味論的に有効なクエリの多くはストリーム上の連続クエリとして評価できます。一部のクエリは、維持する必要がある状態のサイズが大きいため、または更新の計算にコストがかかりすぎるため、計算コストが高すぎます。
- 状態サイズ: 連続クエリは無制限のストリームで評価され、多くの場合数週間または数か月にわたって実行されることが想定されています。したがって、連続クエリで処理されるデータの総量は非常に大きくなる可能性があります。以前に出力された結果を更新する必要があるクエリは、それらを更新するために出力された全ての行を保持する必要があります。例えば、最初のクエリの例では、各ユーザのURLカウントを保持してカウントを増やし、入力テーブルが新しい行を受け取った時に新しい結果を送信する必要があります。登録ユーザのみが追跡される場合、維持するカウント数はそれほど多くない可能性があります。ただし、未登録ユーザが一意のユーザ名を割り当てられると、維持するカウント数は時間の経過とともに増加し、結果的にクエリが失敗する可能性があります。
SELECT user, COUNT(url)
FROM clicks
GROUP BY user;
- 更新の計算: 一部のクエリでは、入力レコードが1つだけ追加または更新された場合でも、出力された結果行の大部分を再計算して更新する必要があります。そのようなクエリは、連続クエリとして実行するのにはあまり適していません。例としては、最後のクリックの時間に基づいて各ユーザの
RANK
を計算する次のクエリがあります。clicks
テーブルが新しい行を受け取るとすぐにユーザのlastAction
が更新され、新しいランクが計算されます。ただし、2つの行は同じランクを持つことはできないため、下位のランクの行も全て更新する必要があります。
SELECT user, RANK() OVER (ORDER BY lastAction)
FROM (
SELECT user, MAX(cTime) AS lastAction FROM clicks GROUP BY user
);
クエリ設定ページでは、連続クエリの実行を制御するパラメータについて説明します。一部のパラメータを使うと、維持される状態のサイズを犠牲にして結果の精度を得ることができます。
テーブルからストリームへの変換 #
動的テーブルは、通常のデータベーステーブルと同様に、INSERT
、UPDATE
、DELETE
の変更によって継続的に変更できます。常に更新される1行のテーブル、UPDATE
やDELETE
の変更がない挿入専用テーブル、またはその中間のテーブルが考えられます。
動的テーブルとストリームに変換したり、それを外部システムに書き込むが会い、これらの変更はエンコードをされる必要があります。FlinkのTable APIとSQLは、動的テーブルの変更をエンコードする3つの方法をサポートします:
-
追加専用ストリーム:
INSERT
の変更によってのみ変更される動的テーブルは、挿入された行を発行することでストリームに変更できます。 -
ストリームの取り消し: 取り消しストリームは、メッセージの追加とメッセージの取り消しの2種類のメッセージを含むストリームです。動的テーブルは、
INSERT
変更を追加メッセージとして、DELETE
変更を取り消しメッセージとして、UPDATE
変更を更新された(前の)行に対する取り消しメッセージと更新中の(新しい)行に対する追加のメッセージとして、エンコーディングすることで取り消しストリームに変換されます。次の図は、動的テーブルの取り消しストリームへの変換を視覚化しています。
- Upsertストリーム: upsertストリームは、upsertメッセージとdeleteメッセージの2つの種類のストリームです。upsertストリームに変換される動的テーブルには、(おそらく複合)ユニークキーが必要です。一意のキーを持つ動的テーブルは、
INSERT
とUPDATE
の変更をupsertメッセージとしてエンコード、DELETE
の変更をdeleteメッセージとしてエンコードすることで、ストリームに変換されます。オペレータを消費するストリームは、メッセージを正しく適用するために、ユニークなキー属性を認識している必要があります。取り消しストリームとの主な違いは、UPDATE
の変更が1つのメッセージでエンコードされるため、より効率的であることです。次の図は、動的テーブルのupsertストリームへの変換を視覚化しています。
動的テーブルをDataStream
に変換するAPIについては、共通概念のページで説明されています。動的テーブルをDataStream
に変換する場合は、追加と取り消しストリームのみがサポートされていることに注意してください。動的テーブルを外部システムに出力するためのTableSink
インタフェースについては、TableSourcesとTableSinksのページで説明されています。