Determinism in Continuous Queries
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

連続クエリにおける決定性 #

この記事の内容は次の通りです:

  1. 決定性とは何ですか?
  2. 全てのバッチ処理は決定的ですか?
    • 非決定的な結果を伴うバッチクエリの2つの例
    • バッチ処理における非決定性
  3. ストリーミング処理での決定性
    • ストリーミングにおける非決定性
    • ストリーミングにおける非決定的な更新
    • ストリーミングクエリにおける非決定的な更新の影響を排除する方法

1. 決定性とは何ですか? #

SQL標準の決定性の説明を引用: ‘同じ入力値を使って繰り返した時に、そのオペレーションが確実に同じ結果を計算する場合、そのオペレーションは決定性があります’。

2. 全てのバッチ処理は決定性がありますか? #

古典的なバッチシナリオでは、特定の制限されたデータセットに対して同じクエリを繰り返し実行すると、一貫した結果が得られます。これは決定性の最も直截的な理解です。

ただし、実際には2つのクエリ例を見ると、バッチプロセスでも同じクエリが常に一貫した結果を返すわけではありません。

2.1非決定性の結果を伴うバッチクエリの2つの例 #

例えば、新しく作成されたwebsiteのクリックログのテーブルがあります:

CREATE TABLE clicks (
    uid VARCHAR(128),
    cTime TIMESTAMP(3),
    url VARCHAR(256)
)

Some new data were written:

+------+---------------------+------------+
|  uid |               cTime |        url |
+------+---------------------+------------+
| Mary | 2022-08-22 12:00:01 |      /home |
|  Bob | 2022-08-22 12:00:01 |      /home |
| Mary | 2022-08-22 12:00:05 | /prod?id=1 |
|  Liz | 2022-08-22 12:01:00 |      /home |
| Mary | 2022-08-22 12:01:30 |      /cart |
|  Bob | 2022-08-22 12:01:35 | /prod?id=3 |
+------+---------------------+------------+
  1. クエリ1は、ログテーブルに時間フィルタを適用し、過去2分間のログをフィルタで除外したいとします:
SELECT * FROM clicks
WHERE cTime BETWEEN TIMESTAMPADD(MINUTE, -2, CURRENT_TIMESTAMP) AND CURRENT_TIMESTAMP;

クエリが'2022-08-22 12:02:00’に送信されると、テーブル内の6行全てが返され、1分後の'2022-08-22 12:03:00’に再度実行されると、3つの項目のみが返されました:

+------+---------------------+------------+
|  uid |               cTime |        url |
+------+---------------------+------------+
|  Liz | 2022-08-22 12:01:00 |      /home |
| Mary | 2022-08-22 12:01:30 |      /cart |
|  Bob | 2022-08-22 12:01:35 | /prod?id=3 |
+------+---------------------+------------+
  1. クエリ2は返される各レコードに一意の識別子を追加したいとします(clicksテーブルには主キーがないためです)
SELECT UUID() AS uuid, * FROM clicks LIMIT 3;

このクエリの実行を続けて2回実行すると、各行に異なるuuid識別子が市政されます

-- first execution
+--------------------------------+------+---------------------+------------+
|                           uuid |  uid |               cTime |        url |
+--------------------------------+------+---------------------+------------+
| aaaa4894-16d4-44d0-a763-03f... | Mary | 2022-08-22 12:00:01 |      /home |
| ed26fd46-960e-4228-aaf2-0aa... |  Bob | 2022-08-22 12:00:01 |      /home |
| 1886afc7-dfc6-4b20-a881-b0e... | Mary | 2022-08-22 12:00:05 | /prod?id=1 |
+--------------------------------+------+---------------------+------------+

-- second execution
+--------------------------------+------+---------------------+------------+
|                           uuid |  uid |               cTime |        url |
+--------------------------------+------+---------------------+------------+
| 95f7301f-bcf2-4b6f-9cf3-1ea... | Mary | 2022-08-22 12:00:01 |      /home |
| 63301e2d-d180-4089-876f-683... |  Bob | 2022-08-22 12:00:01 |      /home |
| f24456d3-e942-43d1-a00f-fdb... | Mary | 2022-08-22 12:00:05 | /prod?id=1 |
+--------------------------------+------+---------------------+------------+

2.2バッチ処理の非決定性 #

バッチ処理での非決定性は、上記の2つのクエリ例のように、主に非決定性関数によって引き起こされます。ここで、組み込み関数CURRENT_TIMESTAMPUUID()は実際にバッチ処理では動作が異なります。クエリ例を続けます:

SELECT CURRENT_TIMESTAMP, * FROM clicks;

CURRENT_TIMESTAMPは返される全てのレコードで同じ値です

+-------------------------+------+---------------------+------------+
|       CURRENT_TIMESTAMP |  uid |               cTime |        url |
+-------------------------+------+---------------------+------------+
| 2022-08-23 17:25:46.831 | Mary | 2022-08-22 12:00:01 |      /home |
| 2022-08-23 17:25:46.831 |  Bob | 2022-08-22 12:00:01 |      /home |
| 2022-08-23 17:25:46.831 | Mary | 2022-08-22 12:00:05 | /prod?id=1 |
| 2022-08-23 17:25:46.831 |  Liz | 2022-08-22 12:01:00 |      /home |
| 2022-08-23 17:25:46.831 | Mary | 2022-08-22 12:01:30 |      /cart |
| 2022-08-23 17:25:46.831 |  Bob | 2022-08-22 12:01:35 | /prod?id=3 |
+-------------------------+------+---------------------+------------+

この違いは、FlinkがApache Calciteから関数の定義を継承していることによるもので、決定性関数(非決定性関数)と動的関数(動的関数、組み込み動的関数は主に一時的な関数です)以外の2種類の関数があります。 非決定性関数は実行時に実行されます(クラスタではレコードごとに評価されます)。一方動的関数はクエリ計画の生成時のみ対応する値を決定します(実行時には実行されず、異なる時間では異なる値が取得されますが、同じ実行では同じ値が取得されます)。 詳細については、システム(組み込み)関数の決定性を参照してください。

3. ストリーミング処理での決定性 #

ストリーミングとバッチの主な違いはデータが無制限であることです。Flink SQLは、ストリーミング処理を動的テーブルに対する連続クエリとして抽象化します。 したがって、バッチクエリの例での動的関数は、ストリーミング処理の非決定性関数と同等です(論理的には、ベーステーブル内の全ての変更がクエリの実行をトリガーします)。 この例のclicksログテーブルが、継続的に書き込まれるKafkaトピックからのものである場合、ストリームモードでの同じクエリは時間の経過とともに変化するCURRENT_TIMESTAMPを返します。

SELECT CURRENT_TIMESTAMP, * FROM clicks;

e.g,

+-------------------------+------+---------------------+------------+
|       CURRENT_TIMESTAMP |  uid |               cTime |        url |
+-------------------------+------+---------------------+------------+
| 2022-08-23 17:25:46.831 | Mary | 2022-08-22 12:00:01 |      /home |
| 2022-08-23 17:25:47.001 |  Bob | 2022-08-22 12:00:01 |      /home |
| 2022-08-23 17:25:47.310 | Mary | 2022-08-22 12:00:05 | /prod?id=1 |
+-------------------------+------+---------------------+------------+

3.1ストリーミングでの非決定 #

非決定性関数に加えて、非決定性を生み出す可能性があるその他の要因は次の通りです:

  1. ソースコネクタの非決定的な逆読み込み
  2. 処理時間に基づくクエリ
  3. TTLに基づく状態データの削除

ソースコネクタの非決定的な逆読み込み #

Flink SQLの場合、ユーザデータ自体は保存されないため、提供される決定性は計算のみに限定されます(したがって、ストリーミングモードで管理される内部状態とユーザデータ自体を区別する必要があります)。そのため、決定的な逆読み込みを提供できないソースコネクタの実装は、入力データが非決定的になり、非決定的な結果が生じる可能性があります。 一般的な例としては、同じオフセットからの複数の読み取りに対する一貫性のないデータや保持時間のせいで存在しなくなったデータのリクエストがあります(例えば、Kafkaトピックの設定されたttlを超えてリクエストされたデータ)。

処理時間に基づいたクエリ #

イベント時間と異なり、処理時間はマシーンのローカルタイムに基づいており、この処理には決定性がありません。時間属性に依存する関連するオペレーションには、ウィンドウ集計, Interval Join, Temporal Joinなどがあります。 もう1つの一般的なオペレーションはLookup Joinです。これは意味論的には処理時間に基づくTemporal Joinに似ており、アクセスされる外部データが時間経過とともに変化する場合に非決定性が生じます。

TTLに基づいた内部状態データの削除 #

ストリーミング処理の無制限の性質により、Regular JoinGroup Aggregation (non-windowed aggregation)のようなオペレーションで長時間実行されるストリーミングクエリによって維持される内部状態データは継続的に大きくなる可能性があります。 内部状態データをクリーンアップするために状態TTLを設定することは、多くの場合必要な妥協ですが、これにより計算結果が非決定的になる可能性もあります。

非決定性がクエリごとに及ぼす影響は異なりますが、一部のクエリでは非決定的な結果が生成されるだけです9クエリは正常に動作しますが、複数回実行すると一貫した結果が得られません)。一方で、一部のクエリでは、間違った結果または実行時エラーのようなより深刻な影響が生じる可能性があります。 後者の主な理由は’非決定的な更新’です。

3.2ストリーミングでの非決定的な更新 #

Flink SQLは、動的テーブルに対する継続クエリの抽象化に基づいた完全なインクリメント更新の仕組みを実装します。インクリメントメッセージを生成する必要がある全てのオペレーションは、完全な内部状態データを保持し、クエリパイプライン全体(ソースオペレータからシンクオペレータまでの完全なDAGを含む)のオペレーションは、オペレータ間の更新メッセージの正しい配送の保証に依存していて、エラーに繋がる非決定性によって壊れる可能性があります。

‘非決定性の更新’(NDU)とは何ですか? 更新メッセージ(changelog)には以下の種類のメッセージタイプが含まれる場合があります: Insert (I)、Delete (D)、Update_Before (UB)、Update_After (UA)。挿入専用のchangelogパイプラインではNDUの問題はありません。 更新メッセージ(Iに加えて少なくとも1つのメッセージ D, UB, UAを含む)がある場合、メッセージの更新キー(changelogの主キーと見なすことができます)は、クエリから推定されます:

  • 更新キーが推定できる場合、パイプラインのオペレータは更新キーによって内部状態を維持します。
  • when the update key cannot be deduced (it is possible that the primary key is not defined in the CDC source table or Sink table, or some operations cannot be deduced from the semantics of the query). 内部状態を保持する全てのオペレータは、完全な行を介して更新(D/UB/UA)メッセージを処理でき、主キーが定義されていない場合シンクノードはリトラクトモードで動作し、削除オペレーションは完全な行によって実行されます。

したがって、行ごとの更新モードでは、状態を保持する必要があるオペレータによって受信される全ての更新メッセージは非決定性カラム値によって干渉されることはありません。それ以外の場合は計算エラーを引き起こすNDU問題が発生します。 更新目瀬sージを含むクエリパイプラインで更新キーを取得できない場合、以下の3つの点がNDU問題の最も重要な原因となります:

  1. 非決定性関数(スカラー関数、table関数、集計関数、組み込み関数、独自の関数を含む)
  2. LookupJoin on an evolving source
  3. CDC sourceは、メータデータ(システムカラム、エンティティ行自体には属しません)を運びます。

注意: TTLに基づいて内部状態データをクリーニングすることによって発生する例外は、ランタイムの耐障害性処理戦略として別途説明します(FLINK-24666)。

3.3ストリーミングでの非決定性更新の影響を排除する方法 #

ストリーミングクエリでのNDU問題は通常直感的ではなく、複雑なクエリの小さな変更から問題のリスクが発生する可能性があります。

1.16以降、Flink SQL (FLINK-27849)には実験的なNDU処理の仕組み’table.optimizer.non-deterministic-update.strategy’が導入されました。TRY_RESOLVEモードが有効な場合、ストリーミングにNDU問題があるかどうか確認し、まだ要因がある場合はLookup Join(内部実体化が追加されます)によって生成されたNDU問題の除去を試みます。上記のポイント1または3では、自動的に削除することはできませんん。Flink SQLは、非決定性の導入を避けるためにSQLを調整するようにユーザに促す詳細なエラーメッセージを表示します(実体化によってもたらされるオペレータの高コストと複雑性を考慮すると、対応する自動解決の仕組みはまだサポートされていません)。

最善の実践 #

  1. ストリーミングクエリを実行する前にTRY_RESOLVEモードを有効にします。クエリに解決できないNDU問題があることを確認したら、問題を事前に回避するためにエラープロンプトに従ってSQLを変更してください。

FLINK-27639による実際のケース:

INSERT INTO t_join_sink
SELECT o.order_id, o.order_name, l.logistics_id, l.logistics_target, l.logistics_source, now()
FROM t_order AS o
LEFT JOIN t_logistics AS l ON ord.order_id=logistics.order_id

デフォルトで生成される実行計画はランタイムエラーで実行されます。TRY_RESOLVEモードが有効な場合、次のエラーが発生します:

org.apache.flink.table.api.TableException: The column(s): logistics_time(generated by non-deterministic function: NOW ) can not satisfy the determinism requirement for correctly processing update message(changelogMode contains not only insert ‘I’).... これらの非決定性カラムを削除するか、決定性カラムにすることを検討してください。

related rel plan:
Calc(select=[CAST(order_id AS INTEGER) AS order_id, order_name, logistics_id, logistics_target, logistics_source, CAST(NOW() AS TIMESTAMP(6)) AS logistics_time], changelogMode=[I,UB,UA,D], upsertKeys=[])
+- Join(joinType=[LeftOuterJoin], where=[=(order_id, order_id0)], select=[order_id, order_name, logistics_id, logistics_target, logistics_source, order_id0], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[HasUniqueKey], changelogMode=[I,UB,UA,D], upsertKeys=[])
   :- Exchange(distribution=[hash[order_id]], changelogMode=[I,UB,UA,D], upsertKeys=[{0}])
   :  +- TableSourceScan(table=[[default_catalog, default_database, t_order, project=[order_id, order_name], metadata=[]]], fields=[order_id, order_name], changelogMode=[I,UB,UA,D], upsertKeys=[{0}])
   +- Exchange(distribution=[hash[order_id]], changelogMode=[I,UB,UA,D], upsertKeys=[])
      +- TableSourceScan(table=[[default_catalog, default_database, t_logistics, project=[logistics_id, logistics_target, logistics_source, order_id], metadata=[]]], fields=[logistics_id, logistics_target, logistics_source, order_id], changelogMode=[I,UB,UA,D], upsertKeys=[{0}])

エラープロンプトに従って、now()関数を削除するか、代わりに他の決定性関数を使います(あるいは、orderテーブルのtimeフィールドを使います)。上記のNDU問題を解決し、自国時エラーを回避します。

  1. Lookup Joinを使う場合は、主キー(存在する場合)を宣言してください。主キー定義を含むソーステーブルを検査すると、多くの場合Flink SQLに更新キーの導出が避けられ、高い実行化コストが節約されます。

以下の2つの例は、ユーザがソーステーブルのルックアップのために主キーを宣言することが推奨される理由を示しています:

insert into sink_with_pk
select t1.a, t1.b, t2.c
from (
  select *, proctime() proctime from cdc
) t1 
join dim_with_pk for system_time as of t1.proctime as t2
   on t1.a = t2.a
   
-- plan: the upsertKey of left stream is reserved when lookup table with a pk definition and use it as lookup key, so that the high cost materialization can be omitted.
Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, c])
+- Calc(select=[a, b, c])
   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[InnerJoin], lookup=[a=a], select=[a, b, a, c])
      +- DropUpdateBefore
         +- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, b], metadata=[]]], fields=[a, b])   
insert into sink_with_pk
select t1.a, t1.b, t2.c
from (
  select *, proctime() proctime from cdc
) t1 
join dim_without_pk for system_time as of t1.proctime as t2
   on t1.a = t2.a

-- execution plan when `TRY_RESOLVE` is enabled(may encounter errors at runtime when `TRY_RESOLVE` mode is not enabled):
Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, c], upsertMaterialize=[true])
+- Calc(select=[a, b, c])
   +- LookupJoin(table=[default_catalog.default_database.dim_without_pk], joinType=[InnerJoin], lookup=[a=a], select=[a, b, a, c], upsertMaterialize=[true])
      +- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, b], metadata=[]]], fields=[a, b])

TRY_RESOLVEが有効な場合、2番目のケースはマテリアル化を追加することで解決できますが、コストが非常に高く、主キーを使ったマテリアル化と比較して、コストのかかるマテリアル化が2つ多くなります。

  1. Lookup Joinでアクセスされるlookupソーステーブルが静的な場合、TRY_RESOLVEモードが有効にならない場合があります。Lookup Joinが静的lookupソーステーブルにアクセスする場合は、最初にTRY_RESOLVEモードをオンにして、他にNDU問題がないことを確認し、不要な具体化のオーバーヘッドを避けるためにIGNOREモードを復元します。 注意: ここでは、lookupソーステーブルが純粋に静的で更新されていないことを確認する必要があります。そうでない場合、IGNOREモードは安全ではありません。

Back to top

inserted by FC2 system