ウィンドウ

ウィンドウは無限のストリームの処理の中核です。ウィンドウはストリームを有限のサイズの “buckets” に分割します。計算をこれに適用することができます。このドキュメントはFlink内でどのようにウィンドウが動作するのか、それが提供する機能によりどのようにプログラマが最大限に恩恵を受けることができるのかについて注目します。

ウィンドウ化されたFlinkプログラムの一般的な構造は以下で紹介します。最初の断片はキー付けされたストリームを参照し、一方で2つ目は非キー付けされたものです。お分かりのように、違いはキー付けされたストリームの keyBy(...) 呼び出しと、非キー付けされたストリームのための windowAll(...)になるwindow(...)です。These is also going to serve as a roadmap for the rest of the page.

キー付けされたウィンドウ

stream
       .keyBy(...)          <-  キー付け 隊 非キー付けウィンドウ
       .window(...)         <-  必須: "assigner"
      [.trigger(...)]       <-  任意: "trigger" (そうでなければデフォルトのトリガー)
      [.evictor(...)]       <-  任意: "evictor" (そうでなければ、非evictor)
      [.allowedLateness()]  <-  任意、そうでなければ0
       .reduce/fold/apply() <-  必須: "function"

非キー付けされたウィンドウ

stream
       .windowAll(...)      <-  必須: "assigner"
      [.trigger(...)]       <-  任意: "trigger" (そうでなければデフォルトのトリガー)
      [.evictor(...)]       <-  任意: "evictor" (そうでなければ、非evictor)
      [.allowedLateness()]  <-  任意、そうでなければ0
       .reduce/fold/apply() <-  必須: "function"

上記で角括弧 ([…]) 内のコマンドは任意です。これはFlinkを使って要望に最も一致するようにウィンドウ ロジックを多くの方法でカスタイズできることを明らかにします。

ウィンドウの寿命

一言で言うと、ウィンドウへ所属すべき最初の要素が到着するとすぐにウィンドウが作成されます。時間(イベントあるいは処理移管)が終了のタイムスタンプとユーザ定義の許される遅延 (許される遅延)を経た後で完全に削除されます。Flink は時間ベースのウィンドウのみ削除を保証し、他の型、例えば グローバルウィンドウ(ウィンドウのアサイナーを見てください)についてはそうではありません。例えば、5分ごとの非オーバーラップ(あるいはタンブル)ウィンドウを生成し、1分の許される遅延を持つイベント時間ベースのウィンドウストラテジを使うと、この間隔に分類されるタイムスタンプを持つ最初の要素が到着した時に、Flink は12:0012:05 の間隔のための新しいウィンドウを生成し、ウォーターマークが12:06 のタイムスタンプを経過した時にそれを削除するでしょう。

更に、各ウィンドウはそれに付属するTrigger (Triggersを見てください) と関数 (WindowFunction, ReduceFunction あるいは FoldFunction) (Window Functionsを見てください)を持つでしょう。関数はウィンドウの内容に適用される計算を含むでしょうが、一方で Trigger はウィンドウに関数が適応される準備ができたと見なされる条件を指定します。トリガーのポリシーは “ウィンドウ内の要素の数が4以上の時”、あるいは “ウォーターマークがウィンドウの終了を過ぎた時”のような何かかもしれません。トリガーはウィンドウの生成と削除の間のいつでもウィンドウの内容を消去することを決めることもできます。この場合の消去はウィンドウ内の要素を参照し、ウィンドウのメタデータを参照しません。このことは新しいデータはそのウィンドウにまだ追加することができることを意味します。

上のことはさておき、発火した後で関数が適用される前および/後で要素をウィンドウから削除することができるEvictor (Evictorsを見てください)を指定することができます。

以下で、上のコンポーネントのそれぞれについて詳細を調べます。任意のものに移動する前に、上の断片の必須部分(キー付け 対 非キー付けされたウィンドウ, ウィンドウのアサイナー および ウィンドウ関数を見てください)から始めます。

キー付け vs 非キー付けされたウィンドウ

まず最初に指定することは、ストリームがキー付けされているかそうでないかのどちらかです。これはウィンドウを定義する前に行う必要があります。keyBy(...) の使用は無限のストリームを論理的なキー付けされたストリームに分割します。もしkeyBy(...)が呼ばれると、ストリームはキー付けされません。

キー付けされたストリームの場合、やってくるイベントの全ての属性はキー (詳しくはここ)として使うことができます。それぞれの論理的なきー付けされたストリームは残りに依存せず処理することができるため、キー付けされたストリームを持つことにより、ウィンドウの計算は複数のタスクによって並行して実施することができるでしょう。同じキーを参照する全ての要素は同じ並行タスクに送信されるでしょう。

非キー付けされたストリームの場合、もtのストリームは複数の論理ストリームに分割され、全てのウィンドウのロジックは1つのタスク、つまり 並行度1で、実施されるでしょう。

ウィンドウのアサイナ

ストリームがキー付けあるいはそうでないかを指定した後は、次のステップは ウィンドウのアサイナーを決めることです。ウィンドウのアサイナーは要素がウィンドウにどのように割り当てられるかを定義します。window(...) (keyed ストリームのため) あるいは windowAll() (non-keyed ストリームのため) の呼び出しの中での選択のWindowAssignerを指定することで行われます。

WindowAssigner はそれぞれやってくる要素を1つ以上のウィンドウに割り当てる責任があります。Flinkは最も一般的な使い方、すなわち tumbling windows, sliding windows, session windows および global windowsのための事前定義されたウィンドウ アサイナーを付属します。WindowAssigner クラスを拡張することで独自のウィンドウ アサイナーを実装することができます。全ての組み込みの(グローバルウィンドウを除く)ウィンドウ アサイナーは要素を時間に基づくウィンドウに割り当てます。時間は、処理時間あるいはイベント時間のどちらかです。処理時間とイベント時間の違いとタイムスタンプとウォーターマークがどうやって生成されるかについて学ぶには、イベント時間の章を見てください。

以下では、Flinkの事前定義されたウィンドウアサイナーがどのように動作するか、データストリームプログラム内でどのように使われるかを示します。以下の図は各アサイナーの作用を可視化します。紫色の園はストリームの要素を表します。これは何らかのキー(この場合user 1, user 2user 3) によって分割されます。x-軸は時間の進捗を示します。

タンブリング ウィンドウ

タンブリング ウィンドウ アサイナーは各要素を指定されたウィンドウ サイズのウィンドウに割り当てます。タンブリング ウィンドウは固定のサイズを持ち、オーバーラップしません。例えば、もし5分のサイズのタンブリング ウィンドウを指定した場合、以下の図で示されるように各5分毎に現在のウィンドウが評価され、新しいウィンドウが開始されるでしょう。

以下のコードの断片はタンブリング ウィンドウを使う方法を示します。

DataStream<T> input = ...;

// tumbling event-time windows
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>);

// tumbling processing-time windows
input
    .keyBy(<key selector>)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>);

// daily tumbling event-time windows offset by -8 hours.
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .<windowed transformation>(<window function>);
val input: DataStream[T] = ...

// tumbling event-time windows
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>)

// tumbling processing-time windows
input
    .keyBy(<key selector>)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>)

// daily tumbling event-time windows offset by -8 hours.
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .<windowed transformation>(<window function>)

時間間隔はTime.milliseconds(x), Time.seconds(x), Time.minutes(x) などのうちの1つを使って指定することができます。

最後の例で示したように、タンブリング ウィンドウはウィンドウの割り当てを変更するために使うことができる任意のoffset パラメータも取ります。例えば、オフセット無しで1時間ごとのタンブリング ウィンドウは epochを使って割り当てられます。つまり、1:00:00.000 - 1:59:59.999, 2:00:00.000 - 2:59:59.999 などのようなウィンドウを得るでしょう。変更したい場合はオフセットを与えることができます。例えば、15分のオフセットを使って、1:15:00.000 - 2:14:59.999, 2:15:00.000 - 3:14:59.999 などを得るでしょう。オフセットの重要の使用例は、UTC-0以外のタイムゾーンにウィンドウを調整することです。例えば、中国ではTime.hours(-8)のオフセットを指定する必要があるでしょう。

スライディング ウィンドウ

スライディング ウィンドウ アサイナーは要素を固定の長さのウィンドウに割り当てます。タンブリング ウィンドウのアサイナーと似て、ウィンドウのサイズはwindow sizeパラメータによって設定されます。追加のwindow slide パラメータはスライディング ウィンドウがどれだけの頻度で開始されるかを制御します。従って、もしスライドがウィンドウサイズより小さい場合は、スライディング ウィンドウはオーバーラップするかもしれません。この場合、要素は複数のウィンドウに割り当てられます。

例えば、5分スライドする10分のサイズのウィンドウを持つことができます。これを使うと、以下の図で描写されるように最後の10分間で到着するイベントを含む各5分毎のウィンドウを得ます。

以下のコードの断片はスライディング ウィンドウを使う方法を示します。

DataStream<T> input = ...;

// sliding event-time windows
input
    .keyBy(<key selector>)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>);

// sliding processing-time windows
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>);

// sliding processing-time windows offset by -8 hours
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
    .<windowed transformation>(<window function>);
val input: DataStream[T] = ...

// sliding event-time windows
input
    .keyBy(<key selector>)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>)

// sliding processing-time windows
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>)

// sliding processing-time windows offset by -8 hours
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
    .<windowed transformation>(<window function>)

時間間隔はTime.milliseconds(x), Time.seconds(x), Time.minutes(x) などのうちの1つを使って指定することができます。

最後の例で示したように、スライディング ウィンドウはウィンドウの割り当てを変更するために使うことができる任意のoffset パラメータも取ります。例えば、オフセット無しで30分スライドする1時間ごとのウィンドウは epochを使って割り当てられます。つまり、1:00:00.000 - 1:59:59.999, 1:30:00.000 - 2:29:59.999 などのようなウィンドウを得るでしょう。変更したい場合はオフセットを与えることができます。例えば、15分のオフセットを使って、1:15:00.000 - 2:14:59.999, 1:45:00.000 - 2:44:59.999 などを得るでしょう。オフセットの重要の使用例は、UTC-0以外のタイムゾーンにウィンドウを調整することです。例えば、中国ではTime.hours(-8)のオフセットを指定する必要があるでしょう。

セッション ウィンドウ

セッション ウィンドウ アサイナーは活動のセッションによって要素をグループ化します。セッション ウィンドウは、tumbling windowssliding windowsとは対照的に、オーバーラップせず、固定の開始と終了時間を持ちません。代わりに、セッション ウィンドウはある期間要素を受け取らない時、つまり不活性の隙間が発生した時にセッションウィンドウを閉じます。セッション ウィンドウのアサイナーはどれだけの長さの不活性の期間が必要かを定義するsession gapを使って設定されます。この期間が過ぎると、現在のセッションは閉じられ、続く要素は新しいセッション ウィンドウに割り当てられます。

以下のコードの断片はセッション 宇lん同を使う方法を示します。

DataStream<T> input = ...;

// event-time session windows
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>);

// processing-time session windows
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>);
val input: DataStream[T] = ...

// event-time session windows
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>)

// processing-time session windows
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>)

時間間隔はTime.milliseconds(x), Time.seconds(x), Time.minutes(x) などのうちの1つを使って指定することができます。

注意 セッション ウィンドウは固定の開始および終了を持たないため、それらはタンブリングおよびスライディング ウィンドウと異なる評価をされます。内部的には、セッション ウィンドウ オペレータはそれぞれやってくるレコードについて新しいウィンドウを作成し、もしお互いが定義された隙間より近い場合にはウィンドウを1つにマージします。マージ可能にするには、セッションウィンドウオペレータは、 ReduceFunction あるいは WindowFunction (FoldFunctionはマージすることができません)のようなマージする Trigger およびマージするWindow Functionを必要とします。

グローバル ウィンドウ

グローバルウィンドウ アサイナーは同じキーを使って全ての要素を同じ1つのグローバルウィンドウに割り当てます。このウィンドウのスキーマはもし独自のtriggerも指定した場合のみ有用です。そうでなければ、グローバルウィンドウは集約された要素を処理することができる自然な終了を持たないため、計算が行われないでしょう。

以下のコードの断片はグローバルウィンドウを使う方法を示します。

DataStream<T> input = ...;

input
    .keyBy(<key selector>)
    .window(GlobalWindows.create())
    .<windowed transformation>(<window function>);
val input: DataStream[T] = ...

input
    .keyBy(<key selector>)
    .window(GlobalWindows.create())
    .<windowed transformation>(<window function>)

ウィンドウ関数

ウィンドウのアサイナーを定義した後で、これらのウィンドウのそれぞれの上で行いたい計算を指定する必要があります。これはウィンドウ関数の責務です。一度システムがウィンドウは処理をする準備ができたと決定すると、ウィンドウ関数は各(おそらくキー付けされている)ウィンドウの要素を処理するために使われます。 (ウィンドウの準備ができたとどうやってFlinkが決定するかについてはtriggers を見てください)。

ウィンドウ関数はReduceFunction, FoldFunction あるいは WindowFunctionの1つかもしれません。Flinkは要素が到着した時に各ウィンドウについて要素を逐次集約することができるため、最初の2つはより効率的に実行することができます(State Sizeの章をみてください)。WindowFunction は、ウィンドウに含まれる全ての要素についてのIterableと、要素が所属するウィンドウについての追加のメタ情報を取得します。

Flinkは関数を起動する前に内部的にウィンドウのために全ての要素をバッファする必要があるため、WindowFunctionを使ったウィンドウ化された変換は他のクラスのようには効率的に実行することができません。これは、WindowFunction と、ウィンドウ要素の逐次集約とWindowFunctionが受信する追加のウィンドウのメタデータの両方を取得するためにReduceFunction あるいは FoldFunction を組み合わせることで緩和することができます。これらの変種のそれぞれについての例を見てみましょう。

ReduceFunction

ReduceFunctionは、同じ型の出力要素を生成するために入力からの2つの要素がどのように組み合わされるかを指定します。Flinkは逐次ウィンドウの要素を集約するためにReduceFunction を使います。

ReduceFunction は以下のように定義し使うことができます:

DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce(new ReduceFunction<Tuple2<String, Long>> {
      public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
        return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
      }
    });
val input: DataStream[(String, Long)] = ...

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }

上の例はウィンドウ内の全ての要素についてタプルの2つ目のフィールドを合計します。

FoldFunction

FoldFunction はウィンドウの入力要素をどのように出力の要素の型と組み合わせるかを指定します。FoldFunction はウィンドウと現在の出力の値に追加される要素ごとに逐次呼び出されます。最初の要素は出力型の事前定義された初期値と組み合わされます。

FoldFunction は以下のように定義し使うことができます:

DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .fold("", new FoldFunction<Tuple2<String, Long>, String>> {
       public String fold(String acc, Tuple2<String, Long> value) {
         return acc + value.f1;
       }
    });
val input: DataStream[(String, Long)] = ...

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .fold("") { (acc, v) => acc + v._2 }

上の例は全ての入力のLong値を初期値が空のStringに追加します。

注意 fold() はセッション ウィンドウあるいは他のマージ可能なウィンドウと一緒に使うことができます。

WindowFunction - The Generic Case

WindowFunction はウィンドウの全ての要素を含むIterable を取得し、全てのウィンドウ関数で最も柔軟性を提供します。要素は逐次集約することができないが、ウィンドウは処理ができる準備ができたと見なされるまで内部的にバッファする必要があるため、これはパフォーマンスとリソースの消費を犠牲にして行われます。

WindowFunctionのシグネチャーは以下のように見えます:

public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {

  /**
   * Evaluates the window and outputs none or several elements.
   *
   * @param key The key for which this window is evaluated.
   * @param window The window that is being evaluated.
   * @param input The elements in the window being evaluated.
   * @param out A collector for emitting elements.
   *
   * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
   */
  void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}
trait WindowFunction[IN, OUT, KEY, W <: Window] extends Function with Serializable {

  /**
    * Evaluates the window and outputs none or several elements.
    *
    * @param key    The key for which this window is evaluated.
    * @param window The window that is being evaluated.
    * @param input  The elements in the window being evaluated.
    * @param out    A collector for emitting elements.
    * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
    */
  def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT])
}

WindowFunction は以下のように定義され使うことができます:

DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .apply(new MyWindowFunction());

/* ... */

public class MyWindowFunction implements WindowFunction<Tuple<String, Long>, String, String, TimeWindow> {

  void apply(String key, TimeWindow window, Iterable<Tuple<String, Long>> input, Collector<String> out) {
    long count = 0;
    for (Tuple<String, Long> in: input) {
      count++;
    }
    out.collect("Window: " + window + "count: " + count);
  }
}
val input: DataStream[(String, Long)] = ...

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .apply(new MyWindowFunction())

/* ... */

class MyWindowFunction extends WindowFunction[(String, Long), String, String, TimeWindow] {

  def apply(key: String, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): () = {
    var count = 0L
    for (in <- input) {
      count = count + 1
    }
    out.collect(s"Window $window count: $count")
  }
}

例はウィンドウ内の要素をカウントするWindowFunctionを示します。更に、ウィンドウ関数は出力にウィンドウについての情報を追加します。

注意 カウントのような単純な集約のためにWindowFunctionを使うことはとても非効率なことに注意してください。次の章は、逐次集約とWindowFunctionの追加された情報の両方を取得するために、どうやってReduceFunctionWindowFunctionと組み合わせることができるかを示します。

ProcessWindowFunction

WindowFunction を適切な場所で使うために、ProcessWindowFunctionを使うこともできます。これは、インタフェースによりウィンドウの評価が起こる場所でのコンテキストについてのより多くの情報をクエリすることができるという点を除いて、WindowFunctionにとても似ています。

これがProcessWindowFunction インタフェースです:

public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {

    /**
     * Evaluates the window and outputs none or several elements.
     *
     * @param key The key for which this window is evaluated.
     * @param context The context in which the window is being evaluated.
     * @param elements The elements in the window being evaluated.
     * @param out A collector for emitting elements.
     *
     * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
     */
    public abstract void process(
            KEY key,
            Context context,
            Iterable<IN> elements,
            Collector<OUT> out) throws Exception;

    /**
     * The context holding window metadata
     */
    public abstract class Context {
        /**
         * @return The window that is being evaluated.
         */
        public abstract W window();
    }
}
abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function {

  /**
    * Evaluates the window and outputs none or several elements.
    *
    * @param key      The key for which this window is evaluated.
    * @param context  The context in which the window is being evaluated.
    * @param elements The elements in the window being evaluated.
    * @param out      A collector for emitting elements.
    * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
    */
  @throws[Exception]
  def process(
      key: KEY,
      context: Context,
      elements: Iterable[IN],
      out: Collector[OUT])

  /**
    * The context holding window metadata
    */
  abstract class Context {
    /**
      * @return The window that is being evaluated.
      */
    def window: W
  }
}

このように使うことができます:

DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .process(new MyProcessWindowFunction());
val input: DataStream[(String, Long)] = ...

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .process(new MyProcessWindowFunction())

WindowFunction with Incremental Aggregation

WindowFunction はウィンドウ内に要素が来る時に逐次的に集約するために、ReduceFunction あるいは FoldFunction を組み合わせることができます。ウィンドウが閉じたときに、WindowFunction が集約した結果と共に提供されるでしょう。これによりWindowFunctionの追加のウィンドウメタ情報にアクセスしながら、逐次的にウィンドウを計算することができます。

注意 逐次ウィンドウ集約のためにWindowFunctionの代わりにProcessWindowFunctionを使うことができます。

FoldFunctionを使った逐次ウィンドウ集約

以下の例は、FoldFunctionをウィンドウ内のイベントの数を抽出しキーとウィンドウの終了時間も返すためにWindowFunctionと組み合わせる方法を示します。

DataStream<SensorReading> input = ...;

input
  .keyBy(<key selector>)
  .timeWindow(<window assigner>)
  .fold(new Tuple3<String, Long, Integer>("",0L, 0), new MyFoldFunction(), new MyWindowFunction())

// Function definitions

private static class MyFoldFunction
    implements FoldFunction<SensorReading, Tuple3<String, Long, Integer> > {

  public Tuple3<String, Long, Integer> fold(Tuple3<String, Long, Integer> acc, SensorReading s) {
      Integer cur = acc.getField(2);
      acc.setField(2, cur + 1);
      return acc;
  }
}

private static class MyWindowFunction
    implements WindowFunction<Tuple3<String, Long, Integer>, Tuple3<String, Long, Integer>, String, TimeWindow> {

  public void apply(String key,
                    TimeWindow window,
                    Iterable<Tuple3<String, Long, Integer>> counts,
                    Collector<Tuple3<String, Long, Integer>> out) {
    Integer count = counts.iterator().next().getField(2);
    out.collect(new Tuple3<String, Long, Integer>(key, window.getEnd(),count));
  }
}
val input: DataStream[SensorReading] = ...

input
 .keyBy(<key selector>)
 .timeWindow(<window assigner>)
 .fold (
    ("", 0L, 0),
    (acc: (String, Long, Int), r: SensorReading) => { ("", 0L, acc._3 + 1) },
    ( key: String,
      window: TimeWindow,
      counts: Iterable[(String, Long, Int)],
      out: Collector[(String, Long, Int)] ) =>
      {
        val count = counts.iterator.next()
        out.collect((key, window.getEnd, count._3))
      }
  )

ReduceFunctionを使った逐次ウィンドウ集約

以下の例は、逐次ReduceFunctionをウィンドウの開始時間と一緒にウィンドウ内での最小のイベントを返すためにWindowFunctionと組み合わせる方法を示します。

DataStream<SensorReading> input = ...;

input
  .keyBy(<key selector>)
  .timeWindow(<window assigner>)
  .reduce(new MyReduceFunction(), new MyWindowFunction());

// Function definitions

private static class MyReduceFunction implements ReduceFunction<SensorReading> {

  public SensorReading reduce(SensorReading r1, SensorReading r2) {
      return r1.value() > r2.value() ? r2 : r1;
  }
}

private static class MyWindowFunction
    implements WindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {

  public void apply(String key,
                    TimeWindow window,
                    Iterable<SensorReading> minReadings,
                    Collector<Tuple2<Long, SensorReading>> out) {
      SensorReading min = minReadings.iterator().next();
      out.collect(new Tuple2<Long, SensorReading>(window.getStart(), min));
  }
}
val input: DataStream[SensorReading] = ...

input
  .keyBy(<key selector>)
  .timeWindow(<window assigner>)
  .reduce(
    (r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1 },
    ( key: String,
      window: TimeWindow,
      minReadings: Iterable[SensorReading],
      out: Collector[(Long, SensorReading)] ) =>
      {
        val min = minReadings.iterator.next()
        out.collect((window.getStart, min))
      }
  )

トリガー

Trigger はウィンドウ(ウィンドウ アサイナーによって形成)がいつwindow functionによって処理される準備ができるかを決定します。各WindowAssigner はデフォルトの Triggerが付属します。もしデフォルトのトリガーが需要に合わない場合は、trigger(...)を使って独自のトリガーを指定することができます。

トリガーのインタフェースはTriggerが異なるイベントに反応するための5つのメソッドを持ちます:

  • onElement() メソッドはウィンドウに追加された各要素について呼ばれます。
  • onEventTime() メソッドは登録されたイベント時間のタイマーが起動したときに呼ばれます。
  • onProcessingTime() メソッドは登録された処理時間のタイマーが起動したときに呼ばれます。
  • onMerge() メソッドはstatefulトリガーに関係があり、それらの対応するウィンドウがマージされる時に2つのトリガーの状態をマージします。例えば セッション ウィンドウを使っている時。
  • 最後に、clear() メソッドは対応するウィンドウの削除時に必要なアクションを実施します。

上のメソッドについて知っておくべき2つのことがあります:

1)最初の3つは返ってくるTriggerResultによって発動イベントにどうやって振舞うかを決定します。アクションは以下のうちの1つです:

  • CONTINUE: 何もしない、
  • FIRE: 計算を起動する、
  • PURGE: ウィンドウ内の要素を削除する、そして
  • FIRE_AND_PURGE: 計算を発動しウィンドウ内の後のウィンドウ内の要素を削除する。

2)これらのメソッドのいずれも将来のアクションのために処理-あるいはイベント-時間のタイマーを登録することができます。

発火と消去

一旦トリガーがウィンドウが処理をする準備ができたと決定すると、それを起動します。つまりFIRE あるいは FIRE_AND_PURGEを返します。これはウィンドウオペレータが現在のウィンドウの結果を発行するシグナルです。WindowFunctionを持つウィンドウを仮定すると、全ての要素は (おそらくそれらをevictorに渡した後で)WindowFunctionに渡されます。FoldFunctionReduceFunctionを持つウィンドウは単純にそれらの貪欲に集約された結果を発行します。

When a trigger fires, it can either FIRE or FIRE_AND_PURGE. FIREはウィンドウの内容を保持しますが、FIRE_AND_PURGEは内容を削除します。デフォルトでは、事前実装されたトリガーはウィンドウの状態の消去無しに単純にFIREします。

注意 消去は単純にウィンドウの内容を削除し、ウィンドウについての一時的なメタ情報をそのままにし、トリガーの状態をそのままにするでしょう。

WindowAssignersのデフォルトのトリガ

WindowAssigner のデフォルトのTrigger は多くのユーザケースにとって適切です。例えば、全てのイベント時間のウィンドウのアサイナーはデフォルトのトリガーとしてEventTimeTriggerを持ちます。このトリガーは、ウォーターマークがウィンドウの終端を通ると単純に起動します。

注意 GlobalWindowのデフォルトのトリガーは決して起動しないNeverTriggerです。結果として、GlobalWindowを使う場合は独自のトリガーを常に定義する必要があります。

注意 trigger()を使ってトリガーを指定することで、WindowAssignerのデフォルトのトリガーを上書きします。例えば、TumblingEventTimeWindows についてCountTriggerを指定すると、時間の進捗に基づくウィンドウトリガーを二度と得ることはありませんが、カウントによるもののみ得るでしょう。今は、時間とカウントの両方に基づく反応が欲しい場合は、独自のトリガーを書く必要があります。

組み込みおよび独自のトリガ

Flinkは2,3の組み込みのトリガーを付属します。

  • (既に述べた) EventTimeTrigger はウォーターマークによって計測されるイベント時間の進捗に基づいて起動します。
  • ProcessingTimeTriggerは処理時間に基づいて起動します。
  • CountTriggerは一度ウィンドウ内の要素の数が指定された制限を超えると起動します。
  • PurgingTriggerは引数として他のトリガーを取り、それを削除されるものに変換します。

独自のトリガーを実装しなければならない場合、abstractTrigger クラスを調べる必要があります。APIはまだ開発中でFlinkの将来のバージョンでは変わるかも知れないことに注意してください。

エビクター

FlinkのウィンドウモデルはWindowAssignerTriggerに加えて、任意のEvictor を指定することができます。これはevictor(...)メソッドを使って行うことができます (この文章の最初で示されました)。evictor はトリガーが起動したで、ウィンドウ関数の適用の前 および/あるいは 後でウィンドウから要素を削除することができます。そうするためにEvictorインタフェースは2つのメソッドを持ちます:

/**
 * Optionally evicts elements. Called before windowing function.
 *
 * @param elements The elements currently in the pane.
 * @param size The current number of elements in the pane.
 * @param window The {@link Window}
 * @param evictorContext The context for the Evictor
 */
void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);

/**
 * Optionally evicts elements. Called after windowing function.
 *
 * @param elements The elements currently in the pane.
 * @param size The current number of elements in the pane.
 * @param window The {@link Window}
 * @param evictorContext The context for the Evictor
 */
void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);

evictBefore()はウィンドウ関数の前に適用される退去ロジックを含み、一方でevictAfter()はウィンドウ関数の後で適用されるものを含みます。ウィンドウ関数の適用前に退去された要素は、それによって処理されないでしょう。

Flink は3つの事前実装されたevictorを付属します。それらは:

  • CountEvictor: ウィンドウからのユーザ定義の数までの要素を保持し、ウィンドウのバッファの前から残っているものを破棄します。
  • DeltaEvictor: DeltaFunctionthresholdを取り、ウィンドウバッファ内と残っているもののそれぞれから最後の要素間のデルタを計算し、デルタが閾値以上のものを削除します。
  • TimeEvictor: 指定されたウィンドウについて引数としてミリ秒単位のintervalを取り、 要素内から最大のタイムスタンプmax_tsを見つけ、max_ts - intervalより小さいタイムスタンプを持つ全ての要素を削除します。

Default デフォルトでは、全ての事前実装されたevictorはウィンドウ関数の前にそれらのロジックを適用します。

注意 ウィンドウの全ての要素は計算の前にevictorを通る必要があるため、evictorの指定はいかなる事前集約も妨げます。

注意 Flinkはウィンドウ内の要素の順番について何も保証を提供しません。このことは、evictorがウィンドウの最初から要素を削除するかもしれないが、それらは最初あるいは最後に到着するものである必要は無いことを意味します。

許可された遅延

イベント時間 ウィンドウと連携する時、要素が遅れて、つまり Flinkがイベント時間の進捗を追跡するために使用するウォーターマークは要素が所属するウィンドウの終了のタイムスタンプを既に過ぎている、到着することがありえます。Flinkがイベント時間をどう扱うかのより多くの議論については、イベント時間 と特に遅れた要素 を見てください。

デフォルトでは、遅れた要素はウォーターマークがウィンドウの終了を過ぎた時に削ります。しかし、Flinkはウィンドウオペレータが最大の許される遅延を指定することを許可します。許される遅延は、それらが削られる前に要素はどれだけの時間遅延することができるかを指定します。デフォルトの値は0です。ウォーターマークがウィンドウの終了を超えた後に到着したが、ウィンドウの終了プラス許される遅延の前に到着した要素は、まだウィンドウに追加されます。使用されるトリガーに依存して、遅れたが削られない要素はウィンドウを再び起動するかもしれません。これはEventTimeTriggerの場合のことです。

これを動作するために、Flinkはそれらの許される遅延が期限切れになるまでウィンドウの状態を保持します。これがいったん起きると、ウィンドウのライフサイクルの章でも説明されるように、Flinkはウィンドウを削除し、状態を削除します。

デフォルト デフォルトで、許される遅延は0に設定されます。つまり、ウォーターマークの後で到着する要素は削られるでしょう。

以下のように許される遅延を指定することができます:

DataStream<T> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .allowedLateness(<time>)
    .<windowed transformation>(<window function>);
val input: DataStream[T] = ...

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .allowedLateness(<time>)
    .<windowed transformation>(<window function>)

注意 GlobalWindows ウィンドウ アサイナーを使う場合、グローバルウィンドウの終了のタイムスタンプはLong.MAX_VALUEのため、どのデータもそうとは見なされません。

遅延要素の考慮

許される遅延を0より大きく設定する場合、ウォーターマークがウィンドウの終了を過ぎた後で、内容と一緒にウィンドウが保持されます。この場合、起きれたが削られていない要素が到着した時に、ウィンドウについての他の起動を引き起こすかもしれません。ウィンドウの最初の起動のmain firingに対して、それらは遅れたイベントによって起動されるため、これらの起動はlate firingsと呼ばれます。セッションウィンドウの場合、遅れた起動は2つの既存のマージされていないウィンドウ間の隙間に“橋を架ける”かもしれないため、遅れた起動はウィンドウの更なるマージに繋がるかもしれません。

注意 late fireringで発行された要素は前の計算の更新された結果として扱われなければならないことに注意してください。つまり、データストリームは同じ計算の複数の結果を含むかもしれません。アプリケーションに依存して、これらの重複した結果を考慮あるいは1つにする必要があります。

便利な状態サイズの考慮

ウィンドウは(日、週、あるいは月のような)長い期間定義されるかもしれません、従ってとても大きな状態を集約します。ウィンドウ計算のストレージ要求の見積をする時に覚えておく2,3のルールがあります。

  1. Flinkは要素が所属するウィンドウごとに各要素の1つのコピーを作成します。こう考えると、タンブリングウィンドウは各要素ごとに1つのコピーを保持します(要素は後で削除されない限り確実に1つのウィンドウに所属します)。対称的に、スライディング ウィンドウはウィンドウ アサイナーの章で説明したように、各要素の幾つかを生成します。従って、サイズが1日でスライドが1秒のスライディング ウィンドウは良い考えかもしれません。

  2. FoldFunctionReduceFunction は、それらは貪欲に要素を集約しウィンドウごとに1つの値のみを格納するため、ストレージ要求を極めて削減することができます。対称的に、単純にWindowFunction を使うと全ての要素を集めることを必要とします。

  3. ウィンドウの全ての要素は計算の前にevictorを通る必要があるため、evictorの使用はいかなる事前集約も妨ぎます(Evictorsを見てください)。

TOP
inserted by FC2 system