Time Attributes
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

時間属性 #

Flinkは様々な時間の概念に基づいてデータを処理できます。

  • 処理時間は、それぞれのオペレーションを実行するマシンのシステム時間(エポック時間とも呼ばれます。JavaのSystem.currentTimeMillis())を指します。
  • イベント時間は各行に付加されたタイムスタンプに基づいたストリーミングデータの処理を指します。タイムスタンプはいつイベントが発生したかをエンコードできます。

Flinkでの時間の処理の詳細については、イベント時間とウォーターマークの紹介を参照してください。

時間属性の紹介 #

時間属性は各テーブルスキーマの一部にすることができます。 これらはテーブルをCREATE TABLE DDLDataStreamから作成する時に定義されます。 時間属性が定義されると、フィールドとして参照し、時間ベースのオペレーションで使うことができます。 時間属性が変更されず、単にクエリのある部分から別の部分に転送される限り、それは有効な時間属性のままです。 時間属性は通常のタイムスタンプと同様に動作し、計算のためにアクセスできます。 計算で使う場合、時間属性が具体化され、標準のタイムスタンプとして機能します。 ただし、通常のタイムスタンプを時間属性の代わりに使ったり、時間属性に変換したりすることはできません。

イベントタイム #

イベント時間によりテーブルプログラムが全てのレコードのタイムスタンプに基づいて結果を生成できるため、イベントの順序が狂っていたり遅れたりしても、一貫した結果を得られます。また、永続ストレージからレコードを読み取る時に、テーブルプログラムの結果の再生可能性も保証されます。

さらに、イベント時間により、バッチ環境とストリーミング環境の両方でテーブルプログラムの構文を統一できます。ストリーミング環境の時間属性はバッチ環境の行の通常の列にすることができます。

順不同のイベントを処理し、ストリーミングで定刻通りのイベントと遅延したイベントを区別するには、Flinkは各行のタイムスタンプを知る必要があり、またイベント時間で処理がこれまでどれくらい進んでいるかを定期的に示す必要もあります(いわゆるウォーターマーク経由)。

イベント時間属性はCREATEテーブルDDL、またはDataStreamからテーブルへの変換中に定義できます。

DDLでの定義 #

イベント時間はCREATEテーブルDDLのWATERMARK文を使って定義されます。ウォーターマーク文は既存のイベント時間フィールドにウォーターマーク生成式を定義し、イベント時間属性としてイベント時間フィールドをマークします。ウォーターマーク文とウォーターマーク戦略の詳細については、CREATE TABLE DDLを参照してください。

FlinkはTIMESTAMP列とTIMESTAMP_LTZ列でのイベント時間属性の定義をサポートします。 ソースのタイムスタンプデータが年-月-日-時-分-秒として表される場合、通常はタイムゾーン情報の無い文字列値です。例えば、2020-04-15 20:13:40.564。イベント時間属性をTIMESTAMP列として定義することをお勧めします::

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time TIMESTAMP(3),
  -- declare user_action_time as event time attribute and use 5 seconds delayed watermark strategy
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  ...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

ソースのタイムスタンプデータがエポック時間として表される場合、通常はlong値、例えば、1618989564564です。イベント時間属性をTIMESTAMP_LTZ列として定義することをお勧めします:

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  ts BIGINT,
  time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
  -- declare time_ltz as event time attribute and use 5 seconds delayed watermark strategy
  WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (
  ...
);

SELECT TUMBLE_START(time_ltz, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);

高度なウォーターマーク機能 #

以前のバージョンでは、ウォーターマークの多くの高度な機能(ウォーターマークの配置など)はデータストリームAPIを通じて簡単に使えましたが、sqlで使うのはそれほど簡単ではありませんでした。ですので、バージョン1.18でこれらの機能を拡張し、ユーザがsqlで使えるようにしました。

注意: SupportsWatermarkPushDownインタフェースを実装するソースコネクタ(例えば、Kafka、pulsar)だけがこれらの高度な機能を使えます。ソースがSupportsWatermarkPushDownインタフェースを実装しないが、タスクがこれらのパラメータで設定されている場合、タスクは通常通りに実行できますが、これらのパラメータは有効になりません。

これらの機能は全て動的なテーブルオプションまたは’OPTIONS’ヒントを使って設定できます。ユーザが動的テーブルオプションと’OPTIONS’ヒントの両方でこれらの機能を設定した場合、‘OPTIONS’ヒントが優先されます。ユーザが同じソーステーブルに対して複数の場所で’OPTIONS’ヒントを使う場合、最初のヒントが使われます。

I. ウォーターマーク発行戦略の設定 #

flinkでウォーターマークを発行するには2つの戦略があります:

  • on-periodic: 定期的にウォーターマークを発行。
  • on-event: イベントごとにウォーターマークを発行

DataStream APIでは、ユーザはWatermarkGeneratorインタフェース(Writing WatermarkGenerators)を介して発行戦略を選択できます。sqlタスクの場合、ウォーターマークはデフォルトで定期的に発行されます。デフォルトの周期は200msで、パラメータpipeline.auto-watermark-intervalで変更できます。イベントごとにウォーターマークを発行する必要がある場合、ソーステーブルで次のように設定できます:

-- configure in table options
CREATE TABLE user_actions (
  ...
  user_action_time TIMESTAMP(3),
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  'scan.watermark.emit.strategy'='on-event',
  ...
)

もちろん、OPTIONSヒントを使うこともできます:

-- use 'OPTIONS' hint
select ... from source_table /*+ OPTIONS('scan.watermark.emit.strategy'='on-periodic') */
II. ソーステーブルのアイドルタイムアウトの設定 #

ソーステーブルの分割/パーティション/シャードがしばらくの間イベントデータを送信しない場合、WatermarkGeneratorもウォーターマークを生成するための新しいデータを取得しないことを意味します。そのようなデータソースをアイドル入力またはアイドルソースと呼びます。この場合、ほかのパーティションがイベントデータを送信している場合に問題が起きます。ダウンストリームオペレータのウォーターマークが全てのアップストリーム並列データソースのウォーターマークの最小値を取得して計算され、アイドル状態の分割/パーティション/シャードが新しいウォーターマークを生成しないため、ダウンストリームのオペレータのウォーターマークは変更されません。あtだし、アイドルタイムアウトが設定された場合、タイムアウト内にイベントデータが送信されない場合は分割/パーティション/シャードがアイドルとしてマークされ、ダウンストリームはウォーターマークを計算する時にこのアイドルソースを無視します。

グローバルアイドルタイムは、table.exec.source.idle-timeoutパラメータを使ってsqlで定義でき、これはソーステーブルごとに有効になります。ただし、各ソーステーブルに異なるアイドルタイムアウトを設定したい場合は、次のようにパラメータscan.watermark.idle-timeoutによってソーステーブルに設定できます:

-- configure in table options
CREATE TABLE user_actions (
  ...
  user_action_time TIMESTAMP(3),
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  'scan.watermark.idle-timeout'='1min',
  ...
).

あるいは、OPTIONSヒントを使えます:

-- use 'OPTIONS' hint
select ... from source_table /*+ OPTIONS('scan.watermark.idle-timeout'='1min') */

ユーザがパラメータtable.exec.source.idle-timeoutとパラメータscan.watermark.idle-timeoutの両方を使ってソースアイドルタイムアウトを設定している場合、パラメータscan.watermark.idle-timeoutが優先されます。

III. ウォータマークの配置 #

データの分散やマシーンの付加などの様々な要因を受けて、消費率は、同じデータソースまたは異なるデータソースの異なる分割/パーティション/シャード間で異なる場合があります。ダウンストリームにステートフルなオペレータが幾つかある場合、これらのオペレータはより高速に消費するオペレータのためにより多くのデータを状態にキャッシュし、より低速に消費するオペレータを待つ必要があり、状態は非常に大きくなる可能性があります; 消費率が一貫していない場合さらに深刻にデータが不規則になり、ウィンドウの計算精度に影響を与える可能性があります。これらのシナリオは、ウォーターマークの配置の機能を使って、高速な分割/パーティション/シャードのウォーターマークが他の分割/パーティション/シャードと比較して急激に増加しないようにすることで回避できます。ウォーターマークの配置の機能は、異なるソース間でのデータ消費量の違いに応じてタスクのパフォーマンスに影響を与えることに注意してください。

ウォーターマーク配置は次のようにソーステーブルで設定できます

-- configure in table options
CREATE TABLE user_actions (
...
user_action_time TIMESTAMP(3),
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
'scan.watermark.alignment.group'='alignment-group-1',
'scan.watermark.alignment.max-drift'='1min',
'scan.watermark.alignment.update-interval'='1s',
...
).

もちろん、OPTIONSヒントを使い続けることもできます。

-- use 'OPTIONS' hint
select ... from source_table /*+ OPTIONS('scan.watermark.alignment.group'='alignment-group-1', 'scan.watermark.alignment.max-drift'='1min', 'scan. watermark.alignment.update-interval'='1s') */

3つのパラメータがあります:

  • scan.watermark.alignment.group 配置グループ名を設定します。同じグループ内のデータソースが配置されます。
  • scan.watermark.alignment.max-drift分割/パーティション/シャードに許可される配置時間からの偏差の最大を設定します。
  • scan.watermark.alignment.update-interval 配置時間を計算する頻度を設定します。必須ではありません。デフォルトは1秒です。
注意: コネクタは、FLIP-217に従って1.17以降のウォーターマーク配置機能を使うためにソース分割のウォーターマーク配置を実装する必要があります。ソースコネクタがFLIP-217を実装しない場合、タスクはエラーになり、ユーザはpipeline.watermark-alignment.allow-unaligned-source-splits: trueを設定してソース分割のウォーターマーク配置を無効にできます。分割数がソースオペレータの並列度と同じ場合のみウォーターマーク配置が適切に機能します。

DataStreamからTableへの変換 #

DataStreamをテーブルに変換する場合、スキーマ定義中にイベント時間属性を.rowtimeプロパティを使って定義できます。タイムスタンプとウォーターマークは、変換されるDataStreamに既に割り当てられている場合があります。DataStreamにはタイムゾーンの概念んがなく、全てのイベント時間値をUTCとして扱うため、変換中Flinkは常にrowtime属性をTIMESTAMP WITHOUT TIME ZONEとして算出します。

DataStreamTableに変換する場合、時間属性を定義する方法は2つあります。指定された.rowtimeフィールド名がDataStreamに存在するか同課に応じて、タイムスタンプは、(1) 新しい列として追加される、(2) 既存の列を置き換えます。

どちらの場合でも、イベント時間のタイムスタンプフィールドはDataStreamイベント時間のタイムスタンプの値を保持します。

// Option 1:

// extract timestamp and assign watermarks based on knowledge of the stream
DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

// declare an additional logical field as an event time attribute
Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").rowtime());


// Option 2:

// extract timestamp from first field, and assign watermarks based on knowledge of the stream
DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

// the first field has been used for timestamp extraction, and is no longer necessary
// replace first field with a logical event time attribute
Table table = tEnv.fromDataStream(stream, $("user_action_time").rowtime(), $("user_name"), $("data"));

// Usage:

WindowedTable windowedTable = table.window(Tumble
       .over(lit(10).minutes())
       .on($("user_action_time"))
       .as("userActionWindow"));
// Option 1:

// extract timestamp and assign watermarks based on knowledge of the stream
val stream: DataStream[(String, String)] = inputStream.assignTimestampsAndWatermarks(...)

// declare an additional logical field as an event time attribute
val table = tEnv.fromDataStream(stream, $"user_name", $"data", $"user_action_time".rowtime)


// Option 2:

// extract timestamp from first field, and assign watermarks based on knowledge of the stream
val stream: DataStream[(Long, String, String)] = inputStream.assignTimestampsAndWatermarks(...)

// the first field has been used for timestamp extraction, and is no longer necessary
// replace first field with a logical event time attribute
val table = tEnv.fromDataStream(stream, $"user_action_time".rowtime, $"user_name", $"data")

// Usage:

val windowedTable = table.window(Tumble over 10.minutes on $"user_action_time" as "userActionWindow")
# Option 1:

# extract timestamp and assign watermarks based on knowledge of the stream
stream = input_stream.assign_timestamps_and_watermarks(...)

table = t_env.from_data_stream(stream, col('user_name'), col('data'), col('user_action_time').rowtime)

# Option 2:

# extract timestamp from first field, and assign watermarks based on knowledge of the stream
stream = input_stream.assign_timestamps_and_watermarks(...)

# the first field has been used for timestamp extraction, and is no longer necessary
# replace first field with a logical event time attribute
table = t_env.from_data_stream(stream, col("user_action_time").rowtime, col('user_name'), col('data'))

# Usage:

table.window(Tumble.over(lit(10).minutes).on(col("user_action_time")).alias("userActionWindow"))

処理時間 #

処理時間により、テーブルプログラムはローカルマシーンの時間に基づいて結果を生成できます。これは時間のもっとも単純な概念ですが、非決定的な結果が生成されます。時間処理にはタイムスタンプの抽出やウォーターマークの生成は必要ありません。

処理時間属性を定義する方法は2つあります。

DDLでの定義 #

処理時間属性はシステムPROCTIME()関数を使ってCREATEテーブルDDLの計算列として定義され、関数の返り値の型はTIMESTAMP_LTZです。計算列の詳細については、CREATE TABLE DDLを参照してください。

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time AS PROCTIME() -- declare an additional field as a processing time attribute
) WITH (
  ...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

DataStreamからTableへの変換 #

処理時間属性はスキーマ定義中に.proctimeプロパティを使って定義されます。時間属性は、追加の論理フィールドによってのみ物理スキーマを拡張する必要があります。従って、スキーマ定義の最後でのみ定義できます。

DataStream<Tuple2<String, String>> stream = ...;

// declare an additional logical field as a processing time attribute
Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").proctime());

WindowedTable windowedTable = table.window(
        Tumble.over(lit(10).minutes())
            .on($("user_action_time"))
            .as("userActionWindow"));
val stream: DataStream[(String, String)] = ...

// declare an additional logical field as a processing time attribute
val table = tEnv.fromDataStream(stream, $"UserActionTimestamp", $"user_name", $"data", $"user_action_time".proctime)

val windowedTable = table.window(Tumble over 10.minutes on $"user_action_time" as "userActionWindow")
stream = ...

# declare an additional logical field as a processing time attribute
table = t_env.from_data_stream(stream, col("UserActionTimestamp"), col("user_name"), col("data"), col("user_action_time").proctime)

windowed_table = table.window(Tumble.over(lit(10).minutes).on(col("user_action_time")).alias("userActionWindow"))
inserted by FC2 system