ウィンドウは無限のストリームの処理の中核です。ウィンドウはストリームを有限のサイズの “buckets” に分割します。計算をこれに適用することができます。このドキュメントはFlink内でどのようにウィンドウが動作するのか、それが提供する機能によりどのようにプログラマが最大限に恩恵を受けることができるのかについて注目します。
ウィンドウ化されたFlinkプログラムの一般的な構造は以下で紹介します。最初の断片はキー付けされたストリームを参照し、一方で2つ目は非キー付けされたものです。お分かりのように、違いはキー付けされたストリームの keyBy(...)
呼び出しと、非キー付けされたストリームのための windowAll(...)
になるwindow(...)
です。These is also going to serve as a roadmap for the rest of the page.
キー付けされたウィンドウ
stream
.keyBy(...) <- キー付け 隊 非キー付けウィンドウ
.window(...) <- 必須: "assigner"
[.trigger(...)] <- 任意: "trigger" (そうでなければデフォルトのトリガー)
[.evictor(...)] <- 任意: "evictor" (そうでなければ、非evictor)
[.allowedLateness()] <- 任意、そうでなければ0
.reduce/fold/apply() <- 必須: "function"
非キー付けされたウィンドウ
stream
.windowAll(...) <- 必須: "assigner"
[.trigger(...)] <- 任意: "trigger" (そうでなければデフォルトのトリガー)
[.evictor(...)] <- 任意: "evictor" (そうでなければ、非evictor)
[.allowedLateness()] <- 任意、そうでなければ0
.reduce/fold/apply() <- 必須: "function"
上記で角括弧 ([…]) 内のコマンドは任意です。これはFlinkを使って要望に最も一致するようにウィンドウ ロジックを多くの方法でカスタイズできることを明らかにします。
一言で言うと、ウィンドウへ所属すべき最初の要素が到着するとすぐにウィンドウが作成されます。時間(イベントあるいは処理移管)が終了のタイムスタンプとユーザ定義の許される遅延
(許される遅延)を経た後で完全に削除されます。Flink は時間ベースのウィンドウのみ削除を保証し、他の型、例えば グローバルウィンドウ(ウィンドウのアサイナーを見てください)についてはそうではありません。例えば、5分ごとの非オーバーラップ(あるいはタンブル)ウィンドウを生成し、1分の許される遅延を持つイベント時間ベースのウィンドウストラテジを使うと、この間隔に分類されるタイムスタンプを持つ最初の要素が到着した時に、Flink は12:00
と 12:05
の間隔のための新しいウィンドウを生成し、ウォーターマークが12:06
のタイムスタンプを経過した時にそれを削除するでしょう。
更に、各ウィンドウはそれに付属するTrigger
(Triggersを見てください) と関数 (WindowFunction
, ReduceFunction
あるいは FoldFunction
) (Window Functionsを見てください)を持つでしょう。関数はウィンドウの内容に適用される計算を含むでしょうが、一方で Trigger
はウィンドウに関数が適応される準備ができたと見なされる条件を指定します。トリガーのポリシーは “ウィンドウ内の要素の数が4以上の時”、あるいは “ウォーターマークがウィンドウの終了を過ぎた時”のような何かかもしれません。トリガーはウィンドウの生成と削除の間のいつでもウィンドウの内容を消去することを決めることもできます。この場合の消去はウィンドウ内の要素を参照し、ウィンドウのメタデータを参照しません。このことは新しいデータはそのウィンドウにまだ追加することができることを意味します。
上のことはさておき、発火した後で関数が適用される前および/後で要素をウィンドウから削除することができるEvictor
(Evictorsを見てください)を指定することができます。
以下で、上のコンポーネントのそれぞれについて詳細を調べます。任意のものに移動する前に、上の断片の必須部分(キー付け 対 非キー付けされたウィンドウ, ウィンドウのアサイナー および ウィンドウ関数を見てください)から始めます。
まず最初に指定することは、ストリームがキー付けされているかそうでないかのどちらかです。これはウィンドウを定義する前に行う必要があります。keyBy(...)
の使用は無限のストリームを論理的なキー付けされたストリームに分割します。もしkeyBy(...)
が呼ばれると、ストリームはキー付けされません。
キー付けされたストリームの場合、やってくるイベントの全ての属性はキー (詳しくはここ)として使うことができます。それぞれの論理的なきー付けされたストリームは残りに依存せず処理することができるため、キー付けされたストリームを持つことにより、ウィンドウの計算は複数のタスクによって並行して実施することができるでしょう。同じキーを参照する全ての要素は同じ並行タスクに送信されるでしょう。
非キー付けされたストリームの場合、もtのストリームは複数の論理ストリームに分割され、全てのウィンドウのロジックは1つのタスク、つまり 並行度1で、実施されるでしょう。
ストリームがキー付けあるいはそうでないかを指定した後は、次のステップは ウィンドウのアサイナーを決めることです。ウィンドウのアサイナーは要素がウィンドウにどのように割り当てられるかを定義します。window(...)
(keyed ストリームのため) あるいは windowAll()
(non-keyed ストリームのため) の呼び出しの中での選択のWindowAssigner
を指定することで行われます。
WindowAssigner
はそれぞれやってくる要素を1つ以上のウィンドウに割り当てる責任があります。Flinkは最も一般的な使い方、すなわち tumbling windows, sliding windows, session windows および global windowsのための事前定義されたウィンドウ アサイナーを付属します。WindowAssigner
クラスを拡張することで独自のウィンドウ アサイナーを実装することができます。全ての組み込みの(グローバルウィンドウを除く)ウィンドウ アサイナーは要素を時間に基づくウィンドウに割り当てます。時間は、処理時間あるいはイベント時間のどちらかです。処理時間とイベント時間の違いとタイムスタンプとウォーターマークがどうやって生成されるかについて学ぶには、イベント時間の章を見てください。
以下では、Flinkの事前定義されたウィンドウアサイナーがどのように動作するか、データストリームプログラム内でどのように使われるかを示します。以下の図は各アサイナーの作用を可視化します。紫色の園はストリームの要素を表します。これは何らかのキー(この場合user 1, user 2 と user 3) によって分割されます。x-軸は時間の進捗を示します。
タンブリング ウィンドウ アサイナーは各要素を指定されたウィンドウ サイズのウィンドウに割り当てます。タンブリング ウィンドウは固定のサイズを持ち、オーバーラップしません。例えば、もし5分のサイズのタンブリング ウィンドウを指定した場合、以下の図で示されるように各5分毎に現在のウィンドウが評価され、新しいウィンドウが開始されるでしょう。
以下のコードの断片はタンブリング ウィンドウを使う方法を示します。
DataStream<T> input = ...;
// tumbling event-time windows
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// tumbling processing-time windows
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// daily tumbling event-time windows offset by -8 hours.
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>);
val input: DataStream[T] = ...
// tumbling event-time windows
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>)
// tumbling processing-time windows
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>)
// daily tumbling event-time windows offset by -8 hours.
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>)
時間間隔はTime.milliseconds(x)
, Time.seconds(x)
, Time.minutes(x)
などのうちの1つを使って指定することができます。
最後の例で示したように、タンブリング ウィンドウはウィンドウの割り当てを変更するために使うことができる任意のoffset
パラメータも取ります。例えば、オフセット無しで1時間ごとのタンブリング ウィンドウは epochを使って割り当てられます。つまり、1:00:00.000 - 1:59:59.999
, 2:00:00.000 - 2:59:59.999
などのようなウィンドウを得るでしょう。変更したい場合はオフセットを与えることができます。例えば、15分のオフセットを使って、1:15:00.000 - 2:14:59.999
, 2:15:00.000 - 3:14:59.999
などを得るでしょう。オフセットの重要の使用例は、UTC-0以外のタイムゾーンにウィンドウを調整することです。例えば、中国ではTime.hours(-8)
のオフセットを指定する必要があるでしょう。
スライディング ウィンドウ アサイナーは要素を固定の長さのウィンドウに割り当てます。タンブリング ウィンドウのアサイナーと似て、ウィンドウのサイズはwindow sizeパラメータによって設定されます。追加のwindow slide パラメータはスライディング ウィンドウがどれだけの頻度で開始されるかを制御します。従って、もしスライドがウィンドウサイズより小さい場合は、スライディング ウィンドウはオーバーラップするかもしれません。この場合、要素は複数のウィンドウに割り当てられます。
例えば、5分スライドする10分のサイズのウィンドウを持つことができます。これを使うと、以下の図で描写されるように最後の10分間で到着するイベントを含む各5分毎のウィンドウを得ます。
以下のコードの断片はスライディング ウィンドウを使う方法を示します。
DataStream<T> input = ...;
// sliding event-time windows
input
.keyBy(<key selector>)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>);
// sliding processing-time windows
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>);
// sliding processing-time windows offset by -8 hours
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
.<windowed transformation>(<window function>);
val input: DataStream[T] = ...
// sliding event-time windows
input
.keyBy(<key selector>)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>)
// sliding processing-time windows
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>)
// sliding processing-time windows offset by -8 hours
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
.<windowed transformation>(<window function>)
時間間隔はTime.milliseconds(x)
, Time.seconds(x)
, Time.minutes(x)
などのうちの1つを使って指定することができます。
最後の例で示したように、スライディング ウィンドウはウィンドウの割り当てを変更するために使うことができる任意のoffset
パラメータも取ります。例えば、オフセット無しで30分スライドする1時間ごとのウィンドウは epochを使って割り当てられます。つまり、1:00:00.000 - 1:59:59.999
, 1:30:00.000 - 2:29:59.999
などのようなウィンドウを得るでしょう。変更したい場合はオフセットを与えることができます。例えば、15分のオフセットを使って、1:15:00.000 - 2:14:59.999
, 1:45:00.000 - 2:44:59.999
などを得るでしょう。オフセットの重要の使用例は、UTC-0以外のタイムゾーンにウィンドウを調整することです。例えば、中国ではTime.hours(-8)
のオフセットを指定する必要があるでしょう。
セッション ウィンドウ アサイナーは活動のセッションによって要素をグループ化します。セッション ウィンドウは、tumbling windows と sliding windowsとは対照的に、オーバーラップせず、固定の開始と終了時間を持ちません。代わりに、セッション ウィンドウはある期間要素を受け取らない時、つまり不活性の隙間が発生した時にセッションウィンドウを閉じます。セッション ウィンドウのアサイナーはどれだけの長さの不活性の期間が必要かを定義するsession gapを使って設定されます。この期間が過ぎると、現在のセッションは閉じられ、続く要素は新しいセッション ウィンドウに割り当てられます。
以下のコードの断片はセッション 宇lん同を使う方法を示します。
DataStream<T> input = ...;
// event-time session windows
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);
// processing-time session windows
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);
val input: DataStream[T] = ...
// event-time session windows
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>)
// processing-time session windows
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>)
時間間隔はTime.milliseconds(x)
, Time.seconds(x)
, Time.minutes(x)
などのうちの1つを使って指定することができます。
注意 セッション ウィンドウは固定の開始および終了を持たないため、それらはタンブリングおよびスライディング ウィンドウと異なる評価をされます。内部的には、セッション ウィンドウ オペレータはそれぞれやってくるレコードについて新しいウィンドウを作成し、もしお互いが定義された隙間より近い場合にはウィンドウを1つにマージします。マージ可能にするには、セッションウィンドウオペレータは、 ReduceFunction
あるいは WindowFunction
(FoldFunction
はマージすることができません)のようなマージする Trigger およびマージするWindow Functionを必要とします。
グローバルウィンドウ アサイナーは同じキーを使って全ての要素を同じ1つのグローバルウィンドウに割り当てます。このウィンドウのスキーマはもし独自のtriggerも指定した場合のみ有用です。そうでなければ、グローバルウィンドウは集約された要素を処理することができる自然な終了を持たないため、計算が行われないでしょう。
以下のコードの断片はグローバルウィンドウを使う方法を示します。
DataStream<T> input = ...;
input
.keyBy(<key selector>)
.window(GlobalWindows.create())
.<windowed transformation>(<window function>);
val input: DataStream[T] = ...
input
.keyBy(<key selector>)
.window(GlobalWindows.create())
.<windowed transformation>(<window function>)
ウィンドウのアサイナーを定義した後で、これらのウィンドウのそれぞれの上で行いたい計算を指定する必要があります。これはウィンドウ関数の責務です。一度システムがウィンドウは処理をする準備ができたと決定すると、ウィンドウ関数は各(おそらくキー付けされている)ウィンドウの要素を処理するために使われます。 (ウィンドウの準備ができたとどうやってFlinkが決定するかについてはtriggers を見てください)。
ウィンドウ関数はReduceFunction
, FoldFunction
あるいは WindowFunction
の1つかもしれません。Flinkは要素が到着した時に各ウィンドウについて要素を逐次集約することができるため、最初の2つはより効率的に実行することができます(State Sizeの章をみてください)。WindowFunction
は、ウィンドウに含まれる全ての要素についてのIterable
と、要素が所属するウィンドウについての追加のメタ情報を取得します。
Flinkは関数を起動する前に内部的にウィンドウのために全ての要素をバッファする必要があるため、WindowFunction
を使ったウィンドウ化された変換は他のクラスのようには効率的に実行することができません。これは、WindowFunction
と、ウィンドウ要素の逐次集約とWindowFunction
が受信する追加のウィンドウのメタデータの両方を取得するためにReduceFunction
あるいは FoldFunction
を組み合わせることで緩和することができます。これらの変種のそれぞれについての例を見てみましょう。
ReduceFunction
は、同じ型の出力要素を生成するために入力からの2つの要素がどのように組み合わされるかを指定します。Flinkは逐次ウィンドウの要素を集約するためにReduceFunction
を使います。
ReduceFunction
は以下のように定義し使うことができます:
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce(new ReduceFunction<Tuple2<String, Long>> {
public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
}
});
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }
上の例はウィンドウ内の全ての要素についてタプルの2つ目のフィールドを合計します。
FoldFunction
はウィンドウの入力要素をどのように出力の要素の型と組み合わせるかを指定します。FoldFunction
はウィンドウと現在の出力の値に追加される要素ごとに逐次呼び出されます。最初の要素は出力型の事前定義された初期値と組み合わされます。
FoldFunction
は以下のように定義し使うことができます:
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.fold("", new FoldFunction<Tuple2<String, Long>, String>> {
public String fold(String acc, Tuple2<String, Long> value) {
return acc + value.f1;
}
});
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.fold("") { (acc, v) => acc + v._2 }
上の例は全ての入力のLong
値を初期値が空のString
に追加します。
注意 fold()
はセッション ウィンドウあるいは他のマージ可能なウィンドウと一緒に使うことができます。
WindowFunction
はウィンドウの全ての要素を含むIterable
を取得し、全てのウィンドウ関数で最も柔軟性を提供します。要素は逐次集約することができないが、ウィンドウは処理ができる準備ができたと見なされるまで内部的にバッファする必要があるため、これはパフォーマンスとリソースの消費を犠牲にして行われます。
WindowFunction
のシグネチャーは以下のように見えます:
public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
/**
* Evaluates the window and outputs none or several elements.
*
* @param key The key for which this window is evaluated.
* @param window The window that is being evaluated.
* @param input The elements in the window being evaluated.
* @param out A collector for emitting elements.
*
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}
trait WindowFunction[IN, OUT, KEY, W <: Window] extends Function with Serializable {
/**
* Evaluates the window and outputs none or several elements.
*
* @param key The key for which this window is evaluated.
* @param window The window that is being evaluated.
* @param input The elements in the window being evaluated.
* @param out A collector for emitting elements.
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT])
}
WindowFunction
は以下のように定義され使うことができます:
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.apply(new MyWindowFunction());
/* ... */
public class MyWindowFunction implements WindowFunction<Tuple<String, Long>, String, String, TimeWindow> {
void apply(String key, TimeWindow window, Iterable<Tuple<String, Long>> input, Collector<String> out) {
long count = 0;
for (Tuple<String, Long> in: input) {
count++;
}
out.collect("Window: " + window + "count: " + count);
}
}
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.apply(new MyWindowFunction())
/* ... */
class MyWindowFunction extends WindowFunction[(String, Long), String, String, TimeWindow] {
def apply(key: String, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): () = {
var count = 0L
for (in <- input) {
count = count + 1
}
out.collect(s"Window $window count: $count")
}
}
例はウィンドウ内の要素をカウントするWindowFunction
を示します。更に、ウィンドウ関数は出力にウィンドウについての情報を追加します。
注意 カウントのような単純な集約のためにWindowFunction
を使うことはとても非効率なことに注意してください。次の章は、逐次集約とWindowFunction
の追加された情報の両方を取得するために、どうやってReduceFunction
をWindowFunction
と組み合わせることができるかを示します。
WindowFunction
を適切な場所で使うために、ProcessWindowFunction
を使うこともできます。これは、インタフェースによりウィンドウの評価が起こる場所でのコンテキストについてのより多くの情報をクエリすることができるという点を除いて、WindowFunction
にとても似ています。
これがProcessWindowFunction
インタフェースです:
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {
/**
* Evaluates the window and outputs none or several elements.
*
* @param key The key for which this window is evaluated.
* @param context The context in which the window is being evaluated.
* @param elements The elements in the window being evaluated.
* @param out A collector for emitting elements.
*
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
public abstract void process(
KEY key,
Context context,
Iterable<IN> elements,
Collector<OUT> out) throws Exception;
/**
* The context holding window metadata
*/
public abstract class Context {
/**
* @return The window that is being evaluated.
*/
public abstract W window();
}
}
abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function {
/**
* Evaluates the window and outputs none or several elements.
*
* @param key The key for which this window is evaluated.
* @param context The context in which the window is being evaluated.
* @param elements The elements in the window being evaluated.
* @param out A collector for emitting elements.
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
@throws[Exception]
def process(
key: KEY,
context: Context,
elements: Iterable[IN],
out: Collector[OUT])
/**
* The context holding window metadata
*/
abstract class Context {
/**
* @return The window that is being evaluated.
*/
def window: W
}
}
このように使うことができます:
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.process(new MyProcessWindowFunction());
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.process(new MyProcessWindowFunction())
WindowFunction
はウィンドウ内に要素が来る時に逐次的に集約するために、ReduceFunction
あるいは FoldFunction
を組み合わせることができます。ウィンドウが閉じたときに、WindowFunction
が集約した結果と共に提供されるでしょう。これによりWindowFunction
の追加のウィンドウメタ情報にアクセスしながら、逐次的にウィンドウを計算することができます。
注意 逐次ウィンドウ集約のためにWindowFunction
の代わりにProcessWindowFunction
を使うことができます。
以下の例は、FoldFunction
をウィンドウ内のイベントの数を抽出しキーとウィンドウの終了時間も返すためにWindowFunction
と組み合わせる方法を示します。
DataStream<SensorReading> input = ...;
input
.keyBy(<key selector>)
.timeWindow(<window assigner>)
.fold(new Tuple3<String, Long, Integer>("",0L, 0), new MyFoldFunction(), new MyWindowFunction())
// Function definitions
private static class MyFoldFunction
implements FoldFunction<SensorReading, Tuple3<String, Long, Integer> > {
public Tuple3<String, Long, Integer> fold(Tuple3<String, Long, Integer> acc, SensorReading s) {
Integer cur = acc.getField(2);
acc.setField(2, cur + 1);
return acc;
}
}
private static class MyWindowFunction
implements WindowFunction<Tuple3<String, Long, Integer>, Tuple3<String, Long, Integer>, String, TimeWindow> {
public void apply(String key,
TimeWindow window,
Iterable<Tuple3<String, Long, Integer>> counts,
Collector<Tuple3<String, Long, Integer>> out) {
Integer count = counts.iterator().next().getField(2);
out.collect(new Tuple3<String, Long, Integer>(key, window.getEnd(),count));
}
}
val input: DataStream[SensorReading] = ...
input
.keyBy(<key selector>)
.timeWindow(<window assigner>)
.fold (
("", 0L, 0),
(acc: (String, Long, Int), r: SensorReading) => { ("", 0L, acc._3 + 1) },
( key: String,
window: TimeWindow,
counts: Iterable[(String, Long, Int)],
out: Collector[(String, Long, Int)] ) =>
{
val count = counts.iterator.next()
out.collect((key, window.getEnd, count._3))
}
)
以下の例は、逐次ReduceFunction
をウィンドウの開始時間と一緒にウィンドウ内での最小のイベントを返すためにWindowFunction
と組み合わせる方法を示します。
DataStream<SensorReading> input = ...;
input
.keyBy(<key selector>)
.timeWindow(<window assigner>)
.reduce(new MyReduceFunction(), new MyWindowFunction());
// Function definitions
private static class MyReduceFunction implements ReduceFunction<SensorReading> {
public SensorReading reduce(SensorReading r1, SensorReading r2) {
return r1.value() > r2.value() ? r2 : r1;
}
}
private static class MyWindowFunction
implements WindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {
public void apply(String key,
TimeWindow window,
Iterable<SensorReading> minReadings,
Collector<Tuple2<Long, SensorReading>> out) {
SensorReading min = minReadings.iterator().next();
out.collect(new Tuple2<Long, SensorReading>(window.getStart(), min));
}
}
val input: DataStream[SensorReading] = ...
input
.keyBy(<key selector>)
.timeWindow(<window assigner>)
.reduce(
(r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1 },
( key: String,
window: TimeWindow,
minReadings: Iterable[SensorReading],
out: Collector[(Long, SensorReading)] ) =>
{
val min = minReadings.iterator.next()
out.collect((window.getStart, min))
}
)
Trigger
はウィンドウ(ウィンドウ アサイナーによって形成)がいつwindow functionによって処理される準備ができるかを決定します。各WindowAssigner
はデフォルトの Trigger
が付属します。もしデフォルトのトリガーが需要に合わない場合は、trigger(...)
を使って独自のトリガーを指定することができます。
トリガーのインタフェースはTrigger
が異なるイベントに反応するための5つのメソッドを持ちます:
onElement()
メソッドはウィンドウに追加された各要素について呼ばれます。onEventTime()
メソッドは登録されたイベント時間のタイマーが起動したときに呼ばれます。onProcessingTime()
メソッドは登録された処理時間のタイマーが起動したときに呼ばれます。onMerge()
メソッドはstatefulトリガーに関係があり、それらの対応するウィンドウがマージされる時に2つのトリガーの状態をマージします。例えば セッション ウィンドウを使っている時。clear()
メソッドは対応するウィンドウの削除時に必要なアクションを実施します。上のメソッドについて知っておくべき2つのことがあります:
1)最初の3つは返ってくるTriggerResult
によって発動イベントにどうやって振舞うかを決定します。アクションは以下のうちの1つです:
CONTINUE
: 何もしない、FIRE
: 計算を起動する、PURGE
: ウィンドウ内の要素を削除する、そしてFIRE_AND_PURGE
: 計算を発動しウィンドウ内の後のウィンドウ内の要素を削除する。2)これらのメソッドのいずれも将来のアクションのために処理-あるいはイベント-時間のタイマーを登録することができます。
一旦トリガーがウィンドウが処理をする準備ができたと決定すると、それを起動します。つまりFIRE
あるいは FIRE_AND_PURGE
を返します。これはウィンドウオペレータが現在のウィンドウの結果を発行するシグナルです。WindowFunction
を持つウィンドウを仮定すると、全ての要素は (おそらくそれらをevictorに渡した後で)WindowFunction
に渡されます。FoldFunction
のReduceFunction
を持つウィンドウは単純にそれらの貪欲に集約された結果を発行します。
When a trigger fires, it can either FIRE
or FIRE_AND_PURGE
. FIRE
はウィンドウの内容を保持しますが、FIRE_AND_PURGE
は内容を削除します。デフォルトでは、事前実装されたトリガーはウィンドウの状態の消去無しに単純にFIRE
します。
注意 消去は単純にウィンドウの内容を削除し、ウィンドウについての一時的なメタ情報をそのままにし、トリガーの状態をそのままにするでしょう。
WindowAssigner
のデフォルトのTrigger
は多くのユーザケースにとって適切です。例えば、全てのイベント時間のウィンドウのアサイナーはデフォルトのトリガーとしてEventTimeTrigger
を持ちます。このトリガーは、ウォーターマークがウィンドウの終端を通ると単純に起動します。
注意 GlobalWindow
のデフォルトのトリガーは決して起動しないNeverTrigger
です。結果として、GlobalWindow
を使う場合は独自のトリガーを常に定義する必要があります。
注意 trigger()
を使ってトリガーを指定することで、WindowAssigner
のデフォルトのトリガーを上書きします。例えば、TumblingEventTimeWindows
についてCountTrigger
を指定すると、時間の進捗に基づくウィンドウトリガーを二度と得ることはありませんが、カウントによるもののみ得るでしょう。今は、時間とカウントの両方に基づく反応が欲しい場合は、独自のトリガーを書く必要があります。
Flinkは2,3の組み込みのトリガーを付属します。
EventTimeTrigger
はウォーターマークによって計測されるイベント時間の進捗に基づいて起動します。ProcessingTimeTrigger
は処理時間に基づいて起動します。CountTrigger
は一度ウィンドウ内の要素の数が指定された制限を超えると起動します。PurgingTrigger
は引数として他のトリガーを取り、それを削除されるものに変換します。独自のトリガーを実装しなければならない場合、abstractTrigger クラスを調べる必要があります。APIはまだ開発中でFlinkの将来のバージョンでは変わるかも知れないことに注意してください。
FlinkのウィンドウモデルはWindowAssigner
と Trigger
に加えて、任意のEvictor
を指定することができます。これはevictor(...)
メソッドを使って行うことができます (この文章の最初で示されました)。evictor はトリガーが起動した後で、ウィンドウ関数の適用の前 および/あるいは 後でウィンドウから要素を削除することができます。そうするためにEvictor
インタフェースは2つのメソッドを持ちます:
/**
* Optionally evicts elements. Called before windowing function.
*
* @param elements The elements currently in the pane.
* @param size The current number of elements in the pane.
* @param window The {@link Window}
* @param evictorContext The context for the Evictor
*/
void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
/**
* Optionally evicts elements. Called after windowing function.
*
* @param elements The elements currently in the pane.
* @param size The current number of elements in the pane.
* @param window The {@link Window}
* @param evictorContext The context for the Evictor
*/
void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
evictBefore()
はウィンドウ関数の前に適用される退去ロジックを含み、一方でevictAfter()
はウィンドウ関数の後で適用されるものを含みます。ウィンドウ関数の適用前に退去された要素は、それによって処理されないでしょう。
Flink は3つの事前実装されたevictorを付属します。それらは:
CountEvictor
: ウィンドウからのユーザ定義の数までの要素を保持し、ウィンドウのバッファの前から残っているものを破棄します。DeltaEvictor
: DeltaFunction
と threshold
を取り、ウィンドウバッファ内と残っているもののそれぞれから最後の要素間のデルタを計算し、デルタが閾値以上のものを削除します。TimeEvictor
: 指定されたウィンドウについて引数としてミリ秒単位のinterval
を取り、 要素内から最大のタイムスタンプmax_ts
を見つけ、max_ts - interval
より小さいタイムスタンプを持つ全ての要素を削除します。Default デフォルトでは、全ての事前実装されたevictorはウィンドウ関数の前にそれらのロジックを適用します。
注意 ウィンドウの全ての要素は計算の前にevictorを通る必要があるため、evictorの指定はいかなる事前集約も妨げます。
注意 Flinkはウィンドウ内の要素の順番について何も保証を提供しません。このことは、evictorがウィンドウの最初から要素を削除するかもしれないが、それらは最初あるいは最後に到着するものである必要は無いことを意味します。
イベント時間 ウィンドウと連携する時、要素が遅れて、つまり Flinkがイベント時間の進捗を追跡するために使用するウォーターマークは要素が所属するウィンドウの終了のタイムスタンプを既に過ぎている、到着することがありえます。Flinkがイベント時間をどう扱うかのより多くの議論については、イベント時間 と特に遅れた要素 を見てください。
デフォルトでは、遅れた要素はウォーターマークがウィンドウの終了を過ぎた時に削ります。しかし、Flinkはウィンドウオペレータが最大の許される遅延を指定することを許可します。許される遅延は、それらが削られる前に要素はどれだけの時間遅延することができるかを指定します。デフォルトの値は0です。ウォーターマークがウィンドウの終了を超えた後に到着したが、ウィンドウの終了プラス許される遅延の前に到着した要素は、まだウィンドウに追加されます。使用されるトリガーに依存して、遅れたが削られない要素はウィンドウを再び起動するかもしれません。これはEventTimeTrigger
の場合のことです。
これを動作するために、Flinkはそれらの許される遅延が期限切れになるまでウィンドウの状態を保持します。これがいったん起きると、ウィンドウのライフサイクルの章でも説明されるように、Flinkはウィンドウを削除し、状態を削除します。
デフォルト デフォルトで、許される遅延は0
に設定されます。つまり、ウォーターマークの後で到着する要素は削られるでしょう。
以下のように許される遅延を指定することができます:
DataStream<T> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.<windowed transformation>(<window function>);
val input: DataStream[T] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.<windowed transformation>(<window function>)
注意 GlobalWindows
ウィンドウ アサイナーを使う場合、グローバルウィンドウの終了のタイムスタンプはLong.MAX_VALUE
のため、どのデータもそうとは見なされません。
許される遅延を0より大きく設定する場合、ウォーターマークがウィンドウの終了を過ぎた後で、内容と一緒にウィンドウが保持されます。この場合、起きれたが削られていない要素が到着した時に、ウィンドウについての他の起動を引き起こすかもしれません。ウィンドウの最初の起動のmain firing
に対して、それらは遅れたイベントによって起動されるため、これらの起動はlate firings
と呼ばれます。セッションウィンドウの場合、遅れた起動は2つの既存のマージされていないウィンドウ間の隙間に“橋を架ける”かもしれないため、遅れた起動はウィンドウの更なるマージに繋がるかもしれません。
注意 late fireringで発行された要素は前の計算の更新された結果として扱われなければならないことに注意してください。つまり、データストリームは同じ計算の複数の結果を含むかもしれません。アプリケーションに依存して、これらの重複した結果を考慮あるいは1つにする必要があります。
ウィンドウは(日、週、あるいは月のような)長い期間定義されるかもしれません、従ってとても大きな状態を集約します。ウィンドウ計算のストレージ要求の見積をする時に覚えておく2,3のルールがあります。
Flinkは要素が所属するウィンドウごとに各要素の1つのコピーを作成します。こう考えると、タンブリングウィンドウは各要素ごとに1つのコピーを保持します(要素は後で削除されない限り確実に1つのウィンドウに所属します)。対称的に、スライディング ウィンドウはウィンドウ アサイナーの章で説明したように、各要素の幾つかを生成します。従って、サイズが1日でスライドが1秒のスライディング ウィンドウは良い考えかもしれません。
FoldFunction
と ReduceFunction
は、それらは貪欲に要素を集約しウィンドウごとに1つの値のみを格納するため、ストレージ要求を極めて削減することができます。対称的に、単純にWindowFunction
を使うと全ての要素を集めることを必要とします。
ウィンドウの全ての要素は計算の前にevictorを通る必要があるため、evictorの使用はいかなる事前集約も妨ぎます(Evictorsを見てください)。