事前定義された、タイムスタンプのエクストラクタ/ウォーターマークのエミッタ

タイムスタンプとウォーターマークの処理で説明されたように、Flinkはプログラマーが独自のタイムスタンプを割り当て独自のウォーターマークを発行することができる抽象概念を提供します。より具体的には、ユースケースに依存して、AssignerWithPeriodicWatermarksAssignerWithPunctuatedWatermarks インタフェースのうちの1つを実装することでそうすることができます。簡単に言うと、1つ目は定期的にウォーターマークを発行し、一方で2つ目はやってくるレコードの何らかのプロパティに基づいてそれを行います。例えば、ストリーム内で特別な要素に遭遇するごとに。

そのようなタスクのためのプログラミングの努力をより簡単にするために、Flinkは事前実装された幾つかのタイムスタンプのアサイナーを付属しています。この章ではそれらのリストを提供します。それらの取り出してすぐ使える機能とは別として、それらの実装は独自の実装の例を提供します。

増加タイムスタンプを持つアサイナー

定期的なウォーターマークの生成の最も簡単な特別な実例は、指定されたソースタスクで見られるタイムスタンプが増加する順番で発生する場合です。そのような場合、以前のタイムスタンプは到着しないため、現在のタイムスタンプは常にウォーターマークとして振舞います。

並行データソースタスク毎にタイムスタンプが増加することだけが必要なことに注意してください。例えば、あるセットアップでKafkaのパーティションが1つの並行データソースインスタンスによって読まれる場合、タイムスタンプが各Kafkaパーティション内で増加することだけが必要です。Flinkのウォーターマークのマージの仕組みは、並行ストリームがシャッフル、統合、接続あるいは合併される時にいつでも現在のウォーターマークを生成するでしょう。

DataStream<MyEvent> stream = ...

DataStream<MyEvent> withTimestampsAndWatermarks =
    stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() {

        @Override
        public long extractAscendingTimestamp(MyEvent element) {
            return element.getCreationTime();
        }
});
val stream: DataStream[MyEvent] = ...

val withTimestampsAndWatermarks = stream.assignAscendingTimestamps( _.getCreationTime )

固定量の遅延が許されるアサイナー

定期的なウォーターマークの生成の他の例は、ウォーターマークがストリーム内で見つかる最大(イベント時間)タイムスタンプから一定時間遅れる場合です。この例はストリーム内で遭遇するかもしれない最大の遅延が前もって知られている場合のシナリオです。例えば、タイムスタンプを持つ要素を含む独自のソースの生成がテストのために一定の時間の間広がる場合です。これらの事例のために、Flinkは引数maxOutOfOrdernessを取るBoundedOutOfOrdernessTimestampExtractorを提供します。つまり、指定されたウィンドウの最終結果を計算する時に、要素が無視される前に許容される遅延の最大時間。遅延はt - t_wの結果に対応し、ここでt は要素の(イベント時間の)タイムスタンプで、t_w は以前のウォーターマークです。もし、lateness > 0 であれば要素は遅れたと見なされ、デフォルトでは対応するウィンドウのジョブの結果を計算する時に無視されます。遅延要素との連携についての詳しい情報は許可される遅延についてのドキュメントを見てください。

DataStream<MyEvent> stream = ...

DataStream<MyEvent> withTimestampsAndWatermarks =
    stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {

        @Override
        public long extractTimestamp(MyEvent element) {
            return element.getCreationTime();
        }
});
val stream: DataStream[MyEvent] = ...

val withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[MyEvent](Time.seconds(10))( _.getCreationTime ))

上に戻る

TOP
inserted by FC2 system