This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
ウィンドウ #
ウィンドウは無限のストリームの処理の中核です。ウィンドウはストリームを有限サイズの"buckets"に分割し、そのパケットに対して計算を適用できます。 over which we can apply computations. このドキュメントはFlink内でどのようにウィンドウが動作するのか、それが提供する機能によりどのようにプログラマが最大限に恩恵を受けることができるのかについて注目します。
ウィンドウ化されたFlinkプログラムの一般的な構造は以下で紹介します。最初のスニペットはkeyedストリームを参照し、2番目のスニペットはキー無しストリームを参照します。ご覧の通り、唯一の違いはキー付きストリーム用のkeyBy(...)
呼び出しと、キー無しストリーム用
のwindowAll(...)
になるwindow(...)
です。これは、このページののkリの部分のロードマップとしても機能します。
キー付きウィンドウ
キー無しウィンドウ
上記では、各カッコ([…])内のコマンドはオプションです。これにより、Flinkを使うとニーズに最適になるようにウィンドウ処理ロジックを様々な方法でカスタマイズできることが分かります。
注意: Evictor
はPython DataStream APIではまだサポートされません。
ウィンドウの寿命 #
簡単に言うと、ウィンドウは、このウィンドウに所属すべき最初の要素が到着するとすぐに作成され、時間(イベントまたは処理時間)が終了のタイムスタンプにユーザ定義の許容遅延
(Allowed Latenessを参照)を加えた時間がすぎると、完全に削除されます。Flink は時間ベースのウィンドウのみ削除を保証し、他の型、例えばグローバルウィンドウ( ウィンドウのアサイナーを参照)については保証しません。例えば、5分ごとの非重複(またはタンブリング)ウィンドウを作成し、許容遅延が1分であるイベント時間ベースのウィンドウ戦略を使う場合、Flinkは、タイムスタンプが12:00
から12:05
までにある最初の要素が来た時に新しいウィンドウを作成し、ウォーターマークが12:06
のタイムスタンプを過ぎると、そのウィンドウを削除します。
timestamp.
さらに、各ウィンドウにはそれに付属するTrigger
(Triggersを参照)と関数(ProcessWindowFunction
、ReduceFunction
、AggregateFunction
)があります(Window Functions)。関数にはウィンドウの内容に適用される計算が含まれ、Trigger
はウィンドウが関数を適用する準備ができていると見なされる条件を指定します。トリガーとなるポリシーは、“ウィンドウ内の要素数の数が4を超える時"や"ウォーターマークがウィンドウの端を通過した時"などです。トリガーはウィンドウの生成と削除の間にいつでもウィンドウの内容をパージすることを決定できます。この場合のパージはウィンドウ内の要素のみを参照し、ウィンドウのメタデータを参照しません。このことは新しいデータはそのウィンドウにまだ追加することができることを意味します。
上記とは別に、トリガーされた後で、関数が適用される前/後にウィンドウから要素を削除できるEvictor
(Evictorsを参照)を指定できます。
以下で、上のコンポーネントのそれぞれについて詳細を調べます。オプションのものに移る前に、上記のスニペットの必要な部分から始めます(Keyed vs Non-Keyed Windows、Window Assigners、Window Functionsを参照)。
キー付け vs 非キー付けされたウィンドウ #
まず最初に指定することは、ストリームがキー付けされているかそうでないかのどちらかです。これはウィンドウを定義する前に行う必要があります。
keyBy(...)
を使うと、無限ストリームを論理キー付きストリームに分割します。keyBy(...)
が呼ばれない場合は、ストリームにはキーが指定されませ。
キー付きストリームの場合、受信イベントの任意属性をキーとして使えます(詳細はここを参照してください)。キー付きストリームを使うと、各論理キー付きストリームを独立して処理できるため、ウィンドウ計算を複数のタスクで並行して実行できます。同じキーを参照する全ての要素は同じ並行タスクに送信されるでしょう。
キー無しストリームの場合、元のストリームは複数の論理ストリームに分割されず、全てのウィンドウロジックはシングルタスク、つまり並列度1で実行されます。
ウィンドウのアサイナ #
ストリームにキーが指定されているかどうかを指定したら、次のステップはウィンドウアサイナを定義することです。
ウィンドウのアサイナーは要素がウィンドウにどのように割り当てられるかを定義します。これを行うには、window(...)
(キー付きストリームの場合)またはwindowAll()
(キー無しストリームの場合)呼び出しで選択したWindowAssigner
を指定します。
WindowAssigner
には受信した各要素を1つ以上のウィンドウに割り当てる役割を果たします。Flinkには、最も一般的な使い方、つまり、タンブリングウィンドウ、スライディングウィンドウ、セッションウィンドウ、グローバルウィンドウのための事前定義されたウィンドウアサイナが付属します。WindowAssigner
クラスを拡張することで、独自のウィンドウアサイナを実装することもできます。All built-in window assigners (except the global
全ての組み込みウィンドウアサイナ(グローバルウィンドウを除く)は、時間に基づいて要素をウィンドウに割り当てます。時間は処理時間またはイベント時間のどちらかです。処理時間とイベント時間の違い、タイムスタンプとウォーターマークがどのように生成されるかについては、イベント時間に関するセクションをご覧ください。
時間ベースのウィンドウには開始タイムスタンプ(端を含む)と終了タイムスタンプ(端を含まない)があります。
コードでは、Flinkは時間ベースのウィンドウを操作する時にTimeWindow
を使います。これには、開始タイムスタンプと終了タイムスタンプがあり、指定されたウィンドウで許可されている最大のタイムスタンプを返すmaxTimestamp()
メソッドもあります。
以下では、Flinkの事前定義されたウィンドウアサイナがどのように動作するか、DataStreamプログラムでどのように使われるかを示します。以下の図は各アサイナーの作用を可視化します。紫色の円はストリームの要素を表します。これは何からのキー(この場合user 1、user 2、user 3)によって分割されます。 x-軸は時間の進捗を示します。
タンブリング ウィンドウ #
A タンブリングウィンドウアサイナは、各要素を指定されたウィンドウサイズに割り当てます。 タンブリング ウィンドウは固定のサイズを持ち、オーバーラップしません。例えば、5分のサイズのタンブリングウィンドウを指定した場合、以下の図で示されるように現在のウィンドウが評価され、5分ごとに新しいウィンドウが開始されます。
以下のコードの断片はタンブリング ウィンドウを使う方法を示します。
DataStream<T> input = ...;
// tumbling event-time windows
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// tumbling processing-time windows
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// daily tumbling event-time windows offset by -8 hours.
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>);
val input: DataStream[T] = ...
// tumbling event-time windows
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>)
// tumbling processing-time windows
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>)
// daily tumbling event-time windows offset by -8 hours.
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>)
input = ... # type: DataStream
# tumbling event-time windows
input \
.key_by(<key selector>) \
.window(TumblingEventTimeWindows.of(Time.seconds(5))) \
.<windowed transformation>(<window function>)
# tumbling processing-time windows
input \
.key_by(<key selector>) \
.window(TumblingProcessingTimeWindows.of(Time.seconds(5))) \
.<windowed transformation>(<window function>)
# daily tumbling event-time windows offset by -8 hours.
input \
.key_by(<key selector>) \
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) \
.<windowed transformation>(<window function>)
時間間隔は、Time.milliseconds(x)
、Time.seconds(x)
、Time.minutes(x)
などのいずれかを使って指定できます。
最後の例で示したように、タンブリングウィンドウアサイナは、ウィンドウの配置を変更するために使われるオプション offset
パラメータも受け取ります。例えば、オフセットが無い場合、時間ごとのタンブリングウィンドウはエポックに合わせて配置されます。つまり、1:00:00.000 - 1:59:59.999
、2:00:00.000 - 2:59:59.999
などのようなウィンドウが得られます。変更したい場合はオフセットを設定できます。
that you can give an offset. オフセットを15分にすると、例えば、1:15:00.000 - 2:14:59.999
、2:15:00.000 - 3:14:59.999
などが得られます。
オフセットの重要の使用例は、UTC-0以外のタイムゾーンにウィンドウを調整することです。
例えば、中国では、Time.hours(-8)
のオフセットを指定する必要があります。
スライディング ウィンドウ #
スライディングウィンドウアサイナは、要素を固定長のウィンドウに割り当てます。タンブリングウィンドウと同様に、ウィンドウのサイズはwindow sizeパラメータによって設定されます。 追加のwindow slideパラメータはスライディングウィンドウを開始する頻度を制御します。従って、スライドがウィンドウサイズより小さい場合、スライディングウィンドウが重なる可能性があります。。この場合、要素は複数のウィンドウに割り当てられます。
例えば、5分スライドする10分のサイズのウィンドウを持つことができます。これにより、次の図に示すように、過去10分間に到着したイベントを含むウィンドウを5分ごとに取得できます。
以下のコードの断片はスライディング ウィンドウを使う方法を示します。
DataStream<T> input = ...;
// sliding event-time windows
input
.keyBy(<key selector>)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>);
// sliding processing-time windows
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>);
// sliding processing-time windows offset by -8 hours
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
.<windowed transformation>(<window function>);
val input: DataStream[T] = ...
// sliding event-time windows
input
.keyBy(<key selector>)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>)
// sliding processing-time windows
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>)
// sliding processing-time windows offset by -8 hours
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
.<windowed transformation>(<window function>)
input = ... # type: DataStream
# sliding event-time windows
input \
.key_by(<key selector>) \
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) \
.<windowed transformation>(<window function>)
# sliding processing-time windows
input \
.key_by(<key selector>) \
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) \
.<windowed transformation>(<window function>)
# sliding processing-time windows offset by -8 hours
input \
.key_by(<key selector>) \
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8))) \
.<windowed transformation>(<window function>)
時間間隔は、Time.milliseconds(x)
、Time.seconds(x)
、Time.minutes(x)
などのいずれかを使って指定できます。
最後の例で示すように、スライディングウィンドウアサイナはウィンドウの配置を変更するために使えるオプションのoffset
パラメータも受け取ります。例えば、オフセット無しの場合、30分スライドする時間ごとのウィンドウはエポックを使って配置されます。つまり、1:00:00.000 - 1:59:59.999
、1:30:00.000 - 2:29:59.999
などのようなウィンドウを取得できます。変更したい場合はオフセットを与えることができます。オフセットを15分にすると、例えば、1:15:00.000 - 2:14:59.999
、1:45:00.000 - 2:44:59.999
などが得られます。
オフセットの重要の使用例は、UTC-0以外のタイムゾーンにウィンドウを調整することです。
例えば、中国では、Time.hours(-8)
のオフセットを指定する必要があります。
セッション ウィンドウ #
セッションウィンドウアサイナは、アクティビティのセッションごとに要素をグループ化します。タンブリングウィンドウやスライディングウィンドウとは対照的に、セッションウィンドウは重ならず、開始時間と終了時間が固定されていません。代わりに、セッションウィンドウは一定期間、つまり非アクティブなギャップが発生した場合に、ウィンドウが閉じられます。セッションウィンドウアサイナは、静的なセッションギャップまたは非アクティブな機関の長さを定義するセッションギャップエクストラクタ関数を使って設置できます。この期間が終了すると、現在のセッションが終了し、後続の要素が新しいセッションウィンドウに割り当てられます。
以下のコードの断片はセッションウィンドウを使う方法を示します。
DataStream<T> input = ...;
// event-time session windows with static gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);
// event-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withDynamicGap((element) -> {
// determine and return session gap
}))
.<windowed transformation>(<window function>);
// processing-time session windows with static gap
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);
// processing-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
// determine and return session gap
}))
.<windowed transformation>(<window function>);
val input: DataStream[T] = ...
// event-time session windows with static gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>)
// event-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
override def extract(element: String): Long = {
// determine and return session gap
}
}))
.<windowed transformation>(<window function>)
// processing-time session windows with static gap
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>)
// processing-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
override def extract(element: String): Long = {
// determine and return session gap
}
}))
.<windowed transformation>(<window function>)
input = ... # type: DataStream
class MySessionWindowTimeGapExtractor(SessionWindowTimeGapExtractor):
def extract(self, element: tuple) -> int:
# determine and return session gap
# event-time session windows with static gap
input \
.key_by(<key selector>) \
.window(EventTimeSessionWindows.with_gap(Time.minutes(10))) \
.<windowed transformation>(<window function>)
# event-time session windows with dynamic gap
input \
.key_by(<key selector>) \
.window(EventTimeSessionWindows.with_dynamic_gap(MySessionWindowTimeGapExtractor())) \
.<windowed transformation>(<window function>)
# processing-time session windows with static gap
input \
.key_by(<key selector>) \
.window(ProcessingTimeSessionWindows.with_gap(Time.minutes(10))) \
.<windowed transformation>(<window function>)
# processing-time session windows with dynamic gap
input \
.key_by(<key selector>) \
.window(DynamicProcessingTimeSessionWindows.with_dynamic_gap(MySessionWindowTimeGapExtractor())) \
.<windowed transformation>(<window function>)
性的ギャップは、Time.milliseconds(x)
、Time.seconds(x)
、Time.minutes(x)
のいずれかを使って指定できます。
動的ギャップはSessionWindowTimeGapExtractor
インタフェースを実装することで指定されます。
セッションウィンドウには固定の開始と終了がないため、タンブリングウィンドウやスライディングウィンドウとは異なる方法で評価されます。内部的には、セッションウィンドウオペレータは到着するレコードごとに新しいウィンドウを作成し、もしお互いが定義された隙間より近い場合にはウィンドウを1つにマージします。 マージ可能にするために、セッションウィンドウはマージTriggerと、ReduceFunction
、AggregateFunction
、ProcessWindowFunction
のようなマージWindow Functionが必要です。。
グローバル ウィンドウ #
グローバルウィンドウアサイナは、同じキーを持つ全ての要素を同じ1つのグローバルウィンドウに割り当てます。 このウィンドウスキームは、独自のtriggerも指定した場合のみ役立ちます。それ以外の場合、グローバルウィンドウには集約された要素を処理できる自然な終了が無いため、計算が実行されません。
以下のコードの断片はグローバルウィンドウを使う方法を示します。
DataStream<T> input = ...;
input
.keyBy(<key selector>)
.window(GlobalWindows.create())
.<windowed transformation>(<window function>);
val input: DataStream[T] = ...
input
.keyBy(<key selector>)
.window(GlobalWindows.create())
.<windowed transformation>(<window function>)
input = ... # type: DataStream
input \
.key_by(<key selector>) \
.window(GlobalWindows.create()) \
.<windowed transformation>(<window function>)
ウィンドウ関数 #
ウィンドウのアサイナーを定義した後で、これらの各ウィンドウで行いたい計算を指定する必要があります。これはwindow functionの役割です。ウィンドウが処理をする準備ができたとシステムが判断すると、ウィンドウ関数は各(おそらくキー付けされている)ウィンドウの要素を処理するために使われます。 (ウィンドウの準備ができたとどうやってFlinkが決定するかについては triggersを参照してください)。
ウィンドウ関数は、ReduceFunction
、AggregateFunction
、ProcessWindowFunction
のいずれかです。Flinkは要素が到着した時に各ウィンドウについて要素を逐次集約することができるため、最初の2つはより効率的に実行することができます(State Sizeの章を参照してください)。ProcessWindowFunction
はウィンドウに含まれる全ての要素のIterable
と、要素が属するウィンドウに関する追加のメタ情報を取得します。
ProcessWindowFunction
を使ったウィンドウ変換は、Flinkが関数を呼び出す前にウィンドウの全ての要素を内部でバッファリングする必要があるため、他のケースほど効率的に実行できません。
これは、ProcessWindowFunction
をReduceFunction
またはAggregateFunction
と組み合わせて、ウィンドウ要素とProcessWindowFunction
追加のウィンドウのメタデータの両方を取得することで軽減できます。これらの変種のそれぞれについての例を見てみましょう。
ReduceFunction #
ReduceFunction
は、入力からの2つの要素を組み合わせて同じタイプの出力要素を生成する方法を指定します。FlinkはReduceFunction
を使って段階的にウィンドウの要素を集計します。。
ReduceFunction
は次のようにして定義して使えます:
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce(new ReduceFunction<Tuple2<String, Long>>() {
public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
}
});
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }
input = ... # type: DataStream
input \
.key_by(<key selector>) \
.window(<window assigner>) \
.reduce(lambda v1, v2: (v1[0], v1[1] + v2[1]),
output_type=Types.TUPLE([Types.STRING(), Types.LONG()]))
上の例はウィンドウ内の全ての要素についてタプルの2つ目のフィールドを合計します。
AggregateFunction #
AggregateFunction
は、3つのタイプがあるReduceFunction
の一般化されたバージョンです: 入力タイプ(IN
)、蓄積タイプ (ACC
)、出力タイプ (OUT
)。入力タイプは入力ストリーム内の要素のタイプであり、AggregateFunction
には1つの入力要素をアキュムレータに追加するメソッドがあります。インタフェースには初期アキュムレータを作成するためのメソッドもあります。2つnおアキュムレータを1つのアキュムレータにマージし、アキュムレータから出力(OUT
)を抽出します。以下の例でこれがどのように機能するかを見てみましょう。
ReduceFunction
と同様に、Flinkはウィンドウの入力要素が到着するたびに段階的に集計します。
AggregateFunction
はいかのように定義して使えます:
/**
* The accumulator is used to keep a running sum and a count. The {@code getResult} method
* computes the average.
*/
private static class AverageAggregate
implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
@Override
public Tuple2<Long, Long> createAccumulator() {
return new Tuple2<>(0L, 0L);
}
@Override
public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
}
@Override
public Double getResult(Tuple2<Long, Long> accumulator) {
return ((double) accumulator.f0) / accumulator.f1;
}
@Override
public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
}
}
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(new AverageAggregate());
/**
* The accumulator is used to keep a running sum and a count. The [getResult] method
* computes the average.
*/
class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
override def createAccumulator() = (0L, 0L)
override def add(value: (String, Long), accumulator: (Long, Long)) =
(accumulator._1 + value._2, accumulator._2 + 1L)
override def getResult(accumulator: (Long, Long)) = accumulator._1 / accumulator._2
override def merge(a: (Long, Long), b: (Long, Long)) =
(a._1 + b._1, a._2 + b._2)
}
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(new AverageAggregate)
class AverageAggregate(AggregateFunction):
def create_accumulator(self) -> Tuple[int, int]:
return 0, 0
def add(self, value: Tuple[str, int], accumulator: Tuple[int, int]) -> Tuple[int, int]:
return accumulator[0] + value[1], accumulator[1] + 1
def get_result(self, accumulator: Tuple[int, int]) -> float:
return accumulator[0] / accumulator[1]
def merge(self, a: Tuple[int, int], b: Tuple[int, int]) -> Tuple[int, int]:
return a[0] + b[0], a[1] + b[1]
input = ... # type: DataStream
input \
.key_by(<key selector>) \
.window(<window assigner>) \
.aggregate(AverageAggregate(),
accumulator_type=Types.TUPLE([Types.LONG(), Types.LONG()]),
output_type=Types.DOUBLE())
上の例は、ウィンドウの要素の2番目のフィールドの平均を計算します。
ProcessWindowFunction #
ProcessWindowFunctionはウィンドウの全ての要素を含むIterableと、時間と状態の情報にアクセスできるContextオブジェクトを取得します。これにより他のウィンドウ関数よりも高い柔軟性が得られます。これにはパフォーマンスとリソースの消費を犠牲になります。要素は段階的に集計できませんが、代わりにウィンドウが処理の準備ができたと見なされるまで内部的にバッファする必要があります。
ProcessWindowFunction
のシグネチャーは以下のようになります:
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {
/**
* Evaluates the window and outputs none or several elements.
*
* @param key The key for which this window is evaluated.
* @param context The context in which the window is being evaluated.
* @param elements The elements in the window being evaluated.
* @param out A collector for emitting elements.
*
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
public abstract void process(
KEY key,
Context context,
Iterable<IN> elements,
Collector<OUT> out) throws Exception;
/**
* Deletes any state in the {@code Context} when the Window expires (the watermark passes its
* {@code maxTimestamp} + {@code allowedLateness}).
*
* @param context The context to which the window is being evaluated
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
public void clear(Context context) throws Exception {}
/**
* The context holding window metadata.
*/
public abstract class Context implements java.io.Serializable {
/**
* Returns the window that is being evaluated.
*/
public abstract W window();
/** Returns the current processing time. */
public abstract long currentProcessingTime();
/** Returns the current event-time watermark. */
public abstract long currentWatermark();
/**
* State accessor for per-key and per-window state.
*
* <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up
* by implementing {@link ProcessWindowFunction#clear(Context)}.
*/
public abstract KeyedStateStore windowState();
/**
* State accessor for per-key global state.
*/
public abstract KeyedStateStore globalState();
}
}
abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function {
/**
* Evaluates the window and outputs none or several elements.
*
* @param key The key for which this window is evaluated.
* @param context The context in which the window is being evaluated.
* @param elements The elements in the window being evaluated.
* @param out A collector for emitting elements.
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
def process(
key: KEY,
context: Context,
elements: Iterable[IN],
out: Collector[OUT])
/**
* Deletes any state in the [[Context]] when the Window expires
* (the watermark passes its `maxTimestamp` + `allowedLateness`).
*
* @param context The context to which the window is being evaluated
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
@throws[Exception]
def clear(context: Context) {}
/**
* The context holding window metadata
*/
abstract class Context {
/**
* Returns the window that is being evaluated.
*/
def window: W
/**
* Returns the current processing time.
*/
def currentProcessingTime: Long
/**
* Returns the current event-time watermark.
*/
def currentWatermark: Long
/**
* State accessor for per-key and per-window state.
*/
def windowState: KeyedStateStore
/**
* State accessor for per-key global state.
*/
def globalState: KeyedStateStore
}
}
class ProcessWindowFunction(Function, Generic[IN, OUT, KEY, W]):
@abstractmethod
def process(self,
key: KEY,
context: 'ProcessWindowFunction.Context',
elements: Iterable[IN]) -> Iterable[OUT]:
"""
Evaluates the window and outputs none or several elements.
:param key: The key for which this window is evaluated.
:param context: The context in which the window is being evaluated.
:param elements: The elements in the window being evaluated.
:return: The iterable object which produces the elements to emit.
"""
pass
@abstractmethod
def clear(self, context: 'ProcessWindowFunction.Context') -> None:
"""
Deletes any state in the :class:`Context` when the Window expires (the watermark passes its
max_timestamp + allowed_lateness).
:param context: The context to which the window is being evaluated.
"""
pass
class Context(ABC, Generic[W2]):
"""
The context holding window metadata.
"""
@abstractmethod
def window(self) -> W2:
"""
:return: The window that is being evaluated.
"""
pass
@abstractmethod
def current_processing_time(self) -> int:
"""
:return: The current processing time.
"""
pass
@abstractmethod
def current_watermark(self) -> int:
"""
:return: The current event-time watermark.
"""
pass
@abstractmethod
def window_state(self) -> KeyedStateStore:
"""
State accessor for per-key and per-window state.
.. note::
If you use per-window state you have to ensure that you clean it up by implementing
:func:`~ProcessWindowFunction.clear`.
:return: The :class:`KeyedStateStore` used to access per-key and per-window states.
"""
pass
@abstractmethod
def global_state(self) -> KeyedStateStore:
"""
State accessor for per-key global state.
"""
pass
key
パラメータは、keyBy()
呼び出しに指定されたKeySelector
経由で抽出されるキーです。タプルインデックスキーまたは文字列フィールド参照の場合、このキーのタイプは常にTuple
であり、キーフィールドを抽出するには、それを正しいサイズのタプルに手動でキャストする必要があります。
ProcessWindowFunction
は次のようにして定義して使えます:
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(t -> t.f0)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.process(new MyProcessWindowFunction());
/* ... */
public class MyProcessWindowFunction
extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {
@Override
public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
long count = 0;
for (Tuple2<String, Long> in: input) {
count++;
}
out.collect("Window: " + context.window() + "count: " + count);
}
}
val input: DataStream[(String, Long)] = ...
input
.keyBy(_._1)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.process(new MyProcessWindowFunction())
/* ... */
class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {
def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]) = {
var count = 0L
for (in <- input) {
count = count + 1
}
out.collect(s"Window ${context.window} count: $count")
}
}
input = ... # type: DataStream
input \
.key_by(lambda v: v[0]) \
.window(TumblingEventTimeWindows.of(Time.minutes(5))) \
.process(MyProcessWindowFunction())
# ...
class MyProcessWindowFunction(ProcessWindowFunction):
def process(self, key: str, context: ProcessWindowFunction.Context,
elements: Iterable[Tuple[str, int]]) -> Iterable[str]:
count = 0
for _ in elements:
count += 1
yield "Window: {} count: {}".format(context.window(), count)
この例は、ウィンドウ内の要素をカウントするProcessWindowFunction
を示します。更に、ウィンドウ関数は出力にウィンドウについての情報を追加します。
カウントのような単純な集計にProcessWindowFunction
を使うことは非常に非効率なことに注意してください。次のセクションでは、ReduceFunction
またはAggregateFunction
をProcessWindowFunction
と組み合わせて、増分集計とProcessWindowFunction
の追加情報の両方を取得する方法を示します。
増加集約するProcessWindowFunction #
ProcessWindowFunction
はReduceFunction
またはAggregateFunction
と組み合わせて、要素がウィンドウに到達した時に段階的に集計することができます。
ウィンドウが閉じられると、ProcessWindowFunction
に集約結果が提供されます。
これによりProcessWindowFunction
の追加のウィンドウメタ情報にアクセスしながらウィンドウを増分的に計算できるようになります。
増分ウィンドウ集計には、ProcessWindowFunction
の代わりに従来のWindowFunction
を使うこともできます。
ReduceFunctionを使った逐次ウィンドウ集約 #
次の例は、増分ReduceFunction
をProcessWindowFunction
と組み合わせて、ウィンドウ内の最小のイベントとウィンドウの開始時間を返す方法を示しています。
DataStream<SensorReading> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce(new MyReduceFunction(), new MyProcessWindowFunction());
// Function definitions
private static class MyReduceFunction implements ReduceFunction<SensorReading> {
public SensorReading reduce(SensorReading r1, SensorReading r2) {
return r1.value() > r2.value() ? r2 : r1;
}
}
private static class MyProcessWindowFunction
extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {
public void process(String key,
Context context,
Iterable<SensorReading> minReadings,
Collector<Tuple2<Long, SensorReading>> out) {
SensorReading min = minReadings.iterator().next();
out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));
}
}
val input: DataStream[SensorReading] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce(
(r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1 },
( key: String,
context: ProcessWindowFunction[_, _, _, TimeWindow]#Context,
minReadings: Iterable[SensorReading],
out: Collector[(Long, SensorReading)] ) =>
{
val min = minReadings.iterator.next()
out.collect((context.window.getStart, min))
}
)
input = ... # type: DataStream
input \
.key_by(<key selector>) \
.window(<window assigner>) \
.reduce(lambda r1, r2: r2 if r1.value > r2.value else r1,
window_function=MyProcessWindowFunction(),
output_type=Types.TUPLE([Types.STRING(), Types.LONG()]))
# Function definition
class MyProcessWindowFunction(ProcessWindowFunction):
def process(self, key: str, context: ProcessWindowFunction.Context,
min_readings: Iterable[SensorReading]) -> Iterable[Tuple[int, SensorReading]]:
min = next(iter(min_readings))
yield context.window().start, min
AggregateFunctionを使った増分ウィンドウ集計 #
以下の例は、AggregateFunction
とProcessWindowFunction
を組み合わせて平均を計算し、平均とともにキーとウィンドウも出力します。
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(new AverageAggregate(), new MyProcessWindowFunction());
// Function definitions
/**
* The accumulator is used to keep a running sum and a count. The {@code getResult} method
* computes the average.
*/
private static class AverageAggregate
implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
@Override
public Tuple2<Long, Long> createAccumulator() {
return new Tuple2<>(0L, 0L);
}
@Override
public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
}
@Override
public Double getResult(Tuple2<Long, Long> accumulator) {
return ((double) accumulator.f0) / accumulator.f1;
}
@Override
public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
}
}
private static class MyProcessWindowFunction
extends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> {
public void process(String key,
Context context,
Iterable<Double> averages,
Collector<Tuple2<String, Double>> out) {
Double average = averages.iterator().next();
out.collect(new Tuple2<>(key, average));
}
}
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(new AverageAggregate(), new MyProcessWindowFunction())
// Function definitions
/**
* The accumulator is used to keep a running sum and a count. The [getResult] method
* computes the average.
*/
class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
override def createAccumulator() = (0L, 0L)
override def add(value: (String, Long), accumulator: (Long, Long)) =
(accumulator._1 + value._2, accumulator._2 + 1L)
override def getResult(accumulator: (Long, Long)) = accumulator._1 / accumulator._2
override def merge(a: (Long, Long), b: (Long, Long)) =
(a._1 + b._1, a._2 + b._2)
}
class MyProcessWindowFunction extends ProcessWindowFunction[Double, (String, Double), String, TimeWindow] {
def process(key: String, context: Context, averages: Iterable[Double], out: Collector[(String, Double)]) = {
val average = averages.iterator.next()
out.collect((key, average))
}
}
input = ... # type: DataStream
input
.key_by(<key selector>) \
.window(<window assigner>) \
.aggregate(AverageAggregate(),
window_function=MyProcessWindowFunction(),
accumulator_type=Types.TUPLE([Types.LONG(), Types.LONG()]),
output_type=Types.TUPLE([Types.STRING(), Types.DOUBLE()]))
# Function definitions
class AverageAggregate(AggregateFunction):
"""
The accumulator is used to keep a running sum and a count. The :func:`get_result` method
computes the average.
"""
def create_accumulator(self) -> Tuple[int, int]:
return 0, 0
def add(self, value: Tuple[str, int], accumulator: Tuple[int, int]) -> Tuple[int, int]:
return accumulator[0] + value[1], accumulator[1] + 1
def get_result(self, accumulator: Tuple[int, int]) -> float:
return accumulator[0] / accumulator[1]
def merge(self, a: Tuple[int, int], b: Tuple[int, int]) -> Tuple[int, int]:
return a[0] + b[0], a[1] + b[1]
class MyProcessWindowFunction(ProcessWindowFunction):
def process(self, key: str, context: ProcessWindowFunction.Context,
averages: Iterable[float]) -> Iterable[Tuple[str, float]]:
average = next(iter(averages))
yield key, average
ProcessWindowFunction内のウィンドウ単位の状態の使用 #
ProcessWindowFunction
は、(リッチ関数と同様に)キー付き状態にアクセスするだけでなく、関数が現在処理しているウィンドウをスコープとするキー付き状態も使えます。この文脈では、per-windowの状態が参照しているウィンドウが何であるかを理解することは重要です。
様々な"windows"が関係します:
- ウィンドウ操作を指定する時に定義されたウィンドウ: これは1時間のタンブリングウィンドウまたは1時間ずつスライドする2時間のスライディングウィンドウです。
- 特定のキーに対して定義されたウィンドウの実際のインスタンス: これはuser-id xyzの12:00から13:00までの時間ウィンドウです。これはウィンドウ定義に基づいており、ジョブが現在処理しているキーの数とイベントが該当するタイムスロットに基づいて多数のウィンドウが存在します。
Per-windowの状態は、これら2つのうち後者に関連付けられています。つまり、1000件の異なるキーのイベントを処理し、それら全てのイベントが現在*[12:00, 13:00)*時間ウィンドウに分類される場合、それぞれに独自のキー付きのウィンドウごとの状態の1000個のウィンドウインスタンスが存在することになります。
process()
呼び出しが受け取るContext
オブジェクトには、2種類の状態へのアクセスを許可する2つのメソッドがあります:
globalState()
、ウィンドウにスコープされないキー付き状態へのアクセスを許可します。windowState()
、ウィンドウにスコープするキー付き状態へのアクセスを許可します。
この機能は、遅れて到着するデータに対する遅延起動がある場合や、投機的な早期起動を行う独自のトリガーがある場合に発生する可能性があるなど、同じウィンドウで複数の起動が予想される場合に役立ちます。このような場合、以前の起動についての情報や、ウィンドウごとの状態の起動数を格納できます。。
ウィンドウ状態を使う場合、ウィンドウがクリアされた時に状態をクリーンアップすることも重要です。これは、clear()
メソッドで発生します。
WindowFunction (レガシー) #
ProcessWindowFunction
が使われる場所では、WindowFunction
も使えます。これは、ProcessWindowFunction
の古いバージョンであり、提供されるコンテキスト情報が少なく、ウィンドウごとのキー付き状態などのいくつかの高度な機能がありません。このインタフェースは、ある時点で廃止される予定です。
at some point.
WindowFunction
のシグネチャーは次のようになります:
public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
/**
* Evaluates the window and outputs none or several elements.
*
* @param key The key for which this window is evaluated.
* @param window The window that is being evaluated.
* @param input The elements in the window being evaluated.
* @param out A collector for emitting elements.
*
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}
trait WindowFunction[IN, OUT, KEY, W <: Window] extends Function with Serializable {
/**
* Evaluates the window and outputs none or several elements.
*
* @param key The key for which this window is evaluated.
* @param window The window that is being evaluated.
* @param input The elements in the window being evaluated.
* @param out A collector for emitting elements.
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT])
}
class WindowFunction(Function, Generic[IN, OUT, KEY, W]):
@abstractmethod
def apply(self, key: KEY, window: W, inputs: Iterable[IN]) -> Iterable[OUT]:
"""
Evaluates the window and outputs none or several elements.
:param key: The key for which this window is evaluated.
:param window: The window that is being evaluated.
:param inputs: The elements in the window being evaluated.
"""
pass
このように使うことができます:
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.apply(new MyWindowFunction());
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.apply(new MyWindowFunction())
input = ... # type: DataStream
input \
.key_by(<key selector>) \
.window(<window assigner>) \
.apply(MyWindowFunction())
トリガー #
トリガー
は、ウィンドウ(window assignerによって形成される)がwindow functionによって処理される準備ができたかどうかを決定します。各WindowAssigner
には、デフォルトのTrigger
が付属しています。
デフォルトのトリガーがニーズに合わない場合は、trigger(...)
を使ってカスタムトリガーを指定できます。
トリガーインタフェースには、Trigger
が様々なイベントに反応できるようにする5つのメソッドがあります:
onElement()
メソッドは、ウィンドウに追加される要素gごとに呼び出されます。onEventTime()
メソッドは、登録されたイベント時間タイマーが起動した時に呼ばれます。onProcessingTime()
メソッドは、登録された処理時間タイマーが起動した時に呼ばれます。onMerge()
メソッドはステートフルトリガーに関連しており、対応するウィンドウがマージされる時、例えばセッションウィンドウを使う場合)に、2つのトリガーの状態をマージします。- 最後に、
clear()
メソッドは対応するウィンドウの削除時に必要なアクションを実行します。
上のメソッドについて知っておくべき2つのことがあります:
- 最初の3つは、
TriggerResult
を返すことで、呼び出しイベントにどう作用するかを決定します。アクションは以下のうちの1つです:
CONTINUE
: 何もしません。FIRE
: 計算をトリガーします。PURGE
: ウィンドウ内の要素をクリアし、FIRE_AND_PURGE
: 計算をトリガーし、その後ウィンドウ内の要素をクリアします。
- これらのメソッドのいずれも将来のアクションのために処理時間あるいはイベント時間のタイマーを登録することができます。
発火と消去 #
トリガーはウィンドウの処理準備が整っていると判断すると、それを起動します。つまり、FIRE
かFIRE_AND_PURGE
を返します。これは、ウィンドウオペレータが現在のウィンドウの結果を出力するためのシグナルです。
to emit the result of the current window. ProcessWindowFunction
を持つウィンドウを考えると、全ての要素は (おそらく、evictorに渡した後に)ProcessWindowFunction
に渡されます。
ReduceFunction
またはAggregateFunction
を持つウィンドウは、単純にそれらの貪欲に集計された結果を出力します。
トリガーが起動されると、FIRE
またはFIRE_AND_PURGE
のいずれかが行われます。FIRE
はウィンドウの内容を保持しますが、FIRE_AND_PURGE
は内容を削除します。
デフォルトでは、事前実装されたトリガーはウィンドウの状態をパージせずに、単純にFIRE
を実行します。
パージは単純にウィンドウの内容を削除し、ウィンドウに関する潜在的なメタデータやトリガーの状態はそのまま残します。
WindowAssignersのデフォルトのトリガ #
WindowAssigner
のデフォルトのTrigger
は、多くのユースケースにとって適切です。例えば、全てのイベント時間アサイナは、デフォルトのトリガーとしてEventTimeTrigger
があります。
default trigger. このトリガーは、ウォーターマークがウィンドウの終端を通ると単純に起動します。
GlobalWindow
のデフォルトのトリガーは、決して起動しないNeverTrigger
です。従って、GlobalWindow
を使う場合は、常に独自のトリガーを定義する必要があります。
trigger()
を使ってトリガーを指定すると、WindowAssigner
のデフォルトのトリガーが上書きされます。例えば、TumblingEventTimeWindows
にCountTrigger
を指定すると、時間お経過に基づいてウィンドウが起動されるのではなく、カウントによってのみ起動されます。現時点では、時間とカウントの両方に基づいて反応したい場合は、独自のカスタムトリガーを書く必要があります。
組み込みおよび独自のトリガ #
Flinkは2,3の組み込みのトリガーを付属します。
- (既に述べた)
EventTimeTrigger
は、ウォーターマークによって測定されたイベント時間の進捗に基づいて起動されます。 ProcessingTimeTrigger
は処理時間に基づいて起動されます。CountTrigger
はウィンドウ内の要素の数が指定された制限を超えると起動されます。PurgingTrigger
は引数として別のトリガーを受け取り、それをパージトリガーに変換します。
独自のトリガーを実装する必要がある場合は、abstractクラス Trigger を確認してください。 APIはまだ開発中でFlinkの将来のバージョンでは変わるかも知れないことに注意してください。
エビクター #
Flinkのウィンドウモデルにより、WindowAssigner
とTrigger
に加えてオプションのEvictor
を指定できます。
これは、evictor(...)
メソッド(このドキュメントの冒頭で示しました)を使って行われます。evictorには、トリガーの起動後およびウィンドウ関数の適用前および/または後にウィンドウから要素を削除する機能があります。
これを行うために、Evictor
インタフェースには次の2つのメソッドがあります:
/**
* Optionally evicts elements. Called before windowing function.
*
* @param elements The elements currently in the pane.
* @param size The current number of elements in the pane.
* @param window The {@link Window}
* @param evictorContext The context for the Evictor
*/
void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
/**
* Optionally evicts elements. Called after windowing function.
*
* @param elements The elements currently in the pane.
* @param size The current number of elements in the pane.
* @param window The {@link Window}
* @param evictorContext The context for the Evictor
*/
void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
evictBefore()
にはウィンドウ関数の前に適用されるevictionロジックが含まれていて、evictAfter()
にはウィンドウ関数の後に適用されるevictionロジックが含まれています。ウィンドウ関数の適用前にevictされた要素は、ウィンドウ関数によって処理されません。
Flinkには3つの事前実装されたevictorが付属します。それらは:
CountEvictor
: userが指定した要素数をウィンドウに保持し、残りの要素をウィンドウバッファの先頭から破棄します。DeltaEvictor
:DeltaFunction
とthreshold
を受け取り、ウィンドウバッファ内の最後の要素と残りの各要素の間の差分を計算し、閾値以上の差分を持つものを削除します。TimeEvictor
: 指定されたウィンドウについて引数としてinterval
をミリ秒単位で受け取り、要素内から最大のタイムスタンプmax_ts
を見つけて、max_ts - interval
より小さいタイムスタンプを持つ全ての要素を削除します。
デフォルトでは、事前実装された全てのevictorは、ウィンドウ関数の前にロジックを適用します。
ウィンドウの全ての要素は計算を適用する前にevictorを通る必要があるため、evictorを指定すると事前集計が防止されます。 これはevictorを持つウィンドウは非常に多くの状態を作成することを意味します。
注意: Evictor
はPython DataStream APIではまだサポートされません。
Flinkはウィンドウ内の要素の順序については保証しません。This implies that although an evictor may remove elements from the beginning of the window, these are not necessarily the ones that arrive first or last.
これは、evictorはウィンドウの先頭から要素を削除することはできるが、それらが最初または最後に到着する要素である必要ではないことを意味します。 #
イベント時間ウィンドウで使う場合、要素が遅れて到着する可能性があります。つまりイベント時間の寸効状況を追跡するためにFlinkが使うウォーターマークは既に要素が所属するウィンドウの終了タイムスタンプを過ぎています。Flinkがイベント時間をどう扱うについての詳細な説明については、イベント時間と特に遅い要素を参照してください。
デフォルトでは、遅れた要素はウォーターマークがウィンドウの終了を過ぎた時に削ります。ただし、Flinkでは、ウィンドウオペレータの最大許容遅延を指定できます。許される遅延は、それらが削られる前に要素はどれだけの時間遅延することができるかを指定します。デフォルトの値は0です。
ウォーターマークがウィンドウの終了を超えた後に到着したが、ウィンドウの終了プラス許される遅延の前に到着した要素は、まだウィンドウに追加されます。使用されるトリガーによっては、遅れたが削られなかった要素はウィンドウを再び起動する場合があります。これは、EventTimeTrigger
の場合に当てはまります。
これを動作するために、Flinkはそれらの許される遅延が期限切れになるまでウィンドウの状態を保持します。これが発生すると、ウィンドウのライフサイクルの章でも説明されるように、Flinkはウィンドウを削除し、状態を削除します。
デフォルトでは、許容遅延は0
に設定されています。つまり、ウォーターマークの後で到着する要素は削られるでしょう。
以下のように許される遅延を指定することができます:
DataStream<T> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.<windowed transformation>(<window function>);
val input: DataStream[T] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.<windowed transformation>(<window function>)
input = ... # type: DataStream
input \
.key_by(<key selector>) \
.window(<window assigner>) \
.allowed_lateness(<time>) \
.<windowed transformation>(<window function>)
GlobalWindows
ウィンドウアサイナーを使う場合、グローバルウィンドウの終了タイムスタンプがLong.MAX_VALUE
であるため、データが遅れているとは見なされません。
side outputとしての遅延データの取得 #
Flinkのサイド出力機能を使うと、遅延して破棄されたデータのストリームを取得できます。
まず、ウィンドウ化されたストリームでsideOutputLateData(OutputTag)
を使って遅延データを取得することを指定する必要があります。そして、ウィンドウ化されたオペレーションの結果のストリームのサイド出路y九ストリームを取得できます:
final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};
DataStream<T> input = ...;
SingleOutputStreamOperator<T> result = input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.sideOutputLateData(lateOutputTag)
.<windowed transformation>(<window function>);
DataStream<T> lateStream = result.getSideOutput(lateOutputTag);
val lateOutputTag = OutputTag[T]("late-data")
val input: DataStream[T] = ...
val result = input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.sideOutputLateData(lateOutputTag)
.<windowed transformation>(<window function>)
val lateStream = result.getSideOutput(lateOutputTag)
late_output_tag = OutputTag("late-data", type_info)
input = ... # type: DataStream
result = input \
.key_by(<key selector>) \
.window(<window assigner>) \
.allowed_lateness(<time>) \
.side_output_late_data(late_output_tag) \
.<windowed transformation>(<window function>)
late_stream = result.get_side_output(late_output_tag)
遅延要素の考慮 #
許容遅延を0より大きく指定すると、ウォーターマークがウィンドウの終了を過ぎた後でもウィンドウとその内容が保持されます。このような場合、遅れたが削れれていない要素が到着すると、ウィンドウの別のトリガーが起動される可能性があります。ウィンドウの最初の起動のメイン起動
に対して、それらは遅れたイベントによって起動されるため、これらの起動は遅延起動
と呼ばれます。セッションウィンドウの場合、遅れた起動は2つの既存のマージされていないウィンドウ間の隙間を“埋める”可能性ああるため、遅延起動はウィンドウの更なるマージに繋がる可能性があります。
遅延起動によって発行された要素は、前の計算の更新された結果として扱われる必要があります。つまり、データストリームには同じ計算の複数の結果が含まれます。アプリケーションに依存して、これらの重複した結果を考慮あるいは1つにする必要があります。
ウィンドウの結果との連携 #
ウィンドウオペレーションの結果もDataStream
になります。ウィンドウオペレーションに関する情報は結果の要素には保持されないため、ウィンドウに関するメタ情報を保持したい場合は、ProcessWindowFunction
の結果要素にその情報を手動でエンコードする必要があります。結果要素に設定される唯一の関連情報は要素のtimestampです。ウィンドウの終了タイムスタンプは排他的なため、これは処理されたウィンドウの最大許容タイムスタンプ、end timestamp - 1に設定されます。これは、イベント時間ウィンドウと処理時間ウィンドウの両方に当てはまることに注意してください。例えば、ウィンドウオペレーションの後で、要素には常にタイムスタンプがありますが、これはイベント時間のタイムスタンプまたは処理時間のタイムスタンプである可能性があります。処理時間ウィンドウの場合、これは特別な影響を及ぼしませんが、イベント時間ウィンドウの場合、ウォーターマークとウィンドウの相互作用と合わせて同じウィンドウサイズで連続したウィンドウ操作が可能になります。これについては後でウォーターマークとウィンドウがどのように相互作用するかを見てから説明します。
ウォーターマークとウィンドウのやり取り #
このセクションに進む前に、イベント時間とウォーターマークに関するセクションをご覧ください。
ウォーターマークがウィンドウオペレーションに到着すると、これは次の2つのことをトリガーします:
- ウォーターマークは、最大タイムスタンプ(end-timestamp - 1)が新しいウォーターマークより小さい全てのウィンドウの計算をトリガーします
- ウォーターマークは(そのまま)ダウンストリームオペレーションに転送されます
直感的には、ウォーターマークは、そのウォーターマークを受信すると、ダウンストリームオペレーションで遅れていると見なされる全てのウィンドウを"フラッシュ"します。
連続するウィンドウ化操作 #
前述したように、ウィンドウ処理されたタイムスタンプが計算される方法とウォーターマークがウィンドウと相互作用する方法により、連続するウィンドウ処理を繋ぎ合わせることができます。これは、異なるキーを使いながら同じアップストリームウィンドウの要素を同じダウンストリームウィンドウに配置したい場合に、2つの連続したウィンドウオペレーションを自国する場合に便利です。次の例を考えてみましょう:
DataStream<Integer> input = ...;
DataStream<Integer> resultsPerKey = input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.reduce(new Summer());
DataStream<Integer> globalResults = resultsPerKey
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new TopKWindowFunction());
val input: DataStream[Int] = ...
val resultsPerKey = input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.reduce(new Summer())
val globalResults = resultsPerKey
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new TopKWindowFunction())
input = ... # type: DataStream
results_per_key = input \
.key_by(<key selector>) \
.window(TumblingEventTimeWindows.of(Time.seconds(5))) \
.reduce(Summer())
global_results = results_per_key \
.window_all(TumblingProcessingTimeWindows.of(Time.seconds(5))) \
.process(TopKWindowFunction())
この例では、最初のオペレーションからのタイムウィンドウ[0, 5)
の結果は、後続のウィンドウオペレーションのタイムウィンドウ[0, 5)
にもなります。これにより、キーごとの合計を計算し、2番目のオペレーションで同じウィンドウ内の上位k個の要素を計算できます。
便利な状態サイズの考慮 #
ウィンドウは(日、週、あるいは月のような)長い期間定義されるかもしれません、従ってとても大きな状態を集約します。ウィンドウ計算のストレージ要求の見積をする時に覚えておく2,3のルールがあります。
-
Flinkは要素が所属するウィンドウごとに各要素の1つのコピーを作成します。こう考えると、タンブリングウィンドウは各要素ごとに1つのコピーを保持します(要素は後で削除されない限り確実に1つのウィンドウに所属します)。対称的に、スライディングウィンドウはウィンドウアサイナーの章で説明したように、各要素の幾つかを生成します。従って、サイズが1日でスライドが1秒のスライディング ウィンドウは良い考えかもしれません。
-
ReduceFunction
とAggregateFunction
は、要素を積極的に集約し、ウィンドウごとに1つの値を保存するため、ストレージ要件を大幅に削減できます。対照的に、ProcessWindowFunction
を使うだけでは、全ての要素を蓄積するする必要があります。 -
Evictor
を使うと、計算を適用する前にウィンドウの全ての要素をevictorを介して渡す必要があるため、事前集計が防止されます( Evictorsを参照してください)。