タイムスタンプ/watermarkの生成

この章はイベント時間で実行するプログラムに関連します。イベント時間, 処理時間および取り込み時間の紹介については、イベント時間の紹介を参照してください。

イベント時間と連携するために、ストリーミングプログラムは結果的に時間指標を設定する必要があります。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

増加タイムスタンプ

イベント時間と連携するために、Flinkはイベントの タイムスタンプを知る必要があります。ストリーム内の各要素は割り当てられたイベントのタイムスタンプを持つ必要があることを意味します。これは通常は要素内の何らかのフィールドからタイムスタンプをアクセス/取り出しすることで行われます。

タイムスタンプの割り当てはウォーターマークの生成と協力して行われます。ウォーターマークの生成はシステムにイベント時の進捗を伝えます。

タイムスタンプの割り当てとウォーターマークの生成には2つの方法があります:

  1. データストリーム ソース内で直接
  2. タイムスタンプ アサイナー / ウォーターマーク ジェネレータ を使って: Flinkではタイムスタンプ アサイナーはウォーターマークも発行されるように定義します

注意 タイムスタンプとウォーターマークの両方ともJava epoch 1970-01-01T00:00:00Z からのミリ秒で指定されます。

タイムスタンプとウォーターマークを持つソース関数

ストリームソースは生成した要素に直接タイムスタンプも割り当て、ウォーターマークも発行することができます。これが行われる時は、タイムスタンプ アサイナーは必要ありません。もしタイムスタンプが使われる場合、ソースから提供されるタイムスタンプおよびウォーターマークの全てが上書きされるでしょう。

タイムスタンプをソースの中で直接要素に割り当てるには、ソースはSourceContext上でcollectWithTimestamp(...)メソッドを使う必要があります。ウォーターマークを生成するには、ソースはemitWatermark(Watermark) 関数を呼ぶ必要があります。

以下は、タイムスタンプを割り当てウォーターマークを生成する(non-checkpointed)ソースの簡単な例です:

@Override
public void run(SourceContext<MyType> ctx) throws Exception {
	while (/* condition */) {
		MyType next = getNext();
		ctx.collectWithTimestamp(next, next.getEventTimestamp());

		if (next.hasWatermarkTime()) {
			ctx.emitWatermark(new Watermark(next.getWatermarkTime()));
		}
	}
}
override def run(ctx: SourceContext[MyType]): Unit = {
	while (/* condition */) {
		val next: MyType = getNext()
		ctx.collectWithTimestamp(next, next.eventTimestamp)

		if (next.hasWatermarkTime) {
			ctx.emitWatermark(new Watermark(next.getWatermarkTime))
		}
	}
}

タイムスタンプ アイサイナー / ウォーターマーク ジェネレーター

タイムスタンプ アサイナーはストリームを取り、タイムスタンプが付いた要素とウォーターマークを持つ新しいストリームを生成します。もし元のストリームがタイムスタンプ および/あるいは ウォーターマークを既に持っていた場合は、タイムスタンプ アサイナーはそれらを上書きするでしょう。

タイムスタンプ アサイナーは通常データソースの後ですぐに指定されますが、厳密にはそうすることは必須ではありません。例えば、一般的なパターンはタイムスタンプ アサイナーの前にパース(MapFunction) およびフィルター (FilterFunction) をします。どの場合でも、タイムスタンプ アサイナーは(最初のウィンドウ オペレーションのような)イベント時の最初のオペレーションの前に指定される必要があります。特別な場合として、Kafkaをストリーミング ジョブのソースとして使う場合、Flinkはソース(あるいはコンシューマ)自身の中での タイムスタンプ アサイナー / ウォーターマーク エミッター の仕様を許可します。そうする方法の詳しい情報については、Kafka コネクタ ドキュメントの中で見つかります。

注意: この章の助言として、独自のタイムスタンプ エクストラクタ/ウォーターマーク エミッターを生成するためにプログラマが実装しなければならない主要なインタフェースを表します。Flinkに同梱される事前実装されたエクストラクタを見るには、事前定義されたタイムスタンプ エクストラクタ / ウォーターマーク エミッター ページを参照してください。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<MyEvent> stream = env.readFile(
        myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
        FilePathFilter.createDefaultFilter(), typeInfo);

DataStream<MyEvent> withTimestampsAndWatermarks = stream
        .filter( event -> event.severity() == WARNING )
        .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());

withTimestampsAndWatermarks
        .keyBy( (event) -> event.getGroup() )
        .timeWindow(Time.seconds(10))
        .reduce( (a, b) -> a.add(b) )
        .addSink(...);
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val stream: DataStream[MyEvent] = env.readFile(
         myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
         FilePathFilter.createDefaultFilter());

val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
        .filter( _.severity == WARNING )
        .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())

withTimestampsAndWatermarks
        .keyBy( _.getGroup )
        .timeWindow(Time.seconds(10))
        .reduce( (a, b) => a.add(b) )
        .addSink(...)

定期的なウォーターマークを使って

AssignerWithPeriodicWatermarks はタイムスタンプを割り当て、ウォーターマークを定期的に生成します (ストリーム要素に依存するか、処理時間に純粋に基づくかもしれません)。

ウォーターマークが生成される間隔 (各n ミリ秒) はExecutionConfig.setAutoWatermarkInterval(...)を使って定義されます。アサイナーの getCurrentWatermark() メソッドは毎回呼ばれ、返ってきたウォーターマークが非nullおよび以前のウォーターマークより大きい場合は新しいウォーターマークが発行されるでしょう。

以下は、定期的なウォーターマークの生成を持つタイムスタンプ アサイナーの2つの簡単な例です。

/**
 * This generator generates watermarks assuming that elements arrive out of order,
 * but only to a certain degree. The latest elements for a certain timestamp t will arrive
 * at most n milliseconds after the earliest elements for timestamp t.
 */
public class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks<MyEvent> {

    private final long maxOutOfOrderness = 3500; // 3.5 seconds

    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long timestamp = element.getCreationTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}

/**
 * This generator generates watermarks that are lagging behind processing time by a fixed amount.
 * It assumes that elements arrive in Flink after a bounded delay.
 */
public class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks<MyEvent> {

	private final long maxTimeLag = 5000; // 5 seconds

	@Override
	public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
		return element.getCreationTime();
	}

	@Override
	public Watermark getCurrentWatermark() {
		// return the watermark as current time minus the maximum time lag
		return new Watermark(System.currentTimeMillis() - maxTimeLag);
	}
}
/**
 * This generator generates watermarks assuming that elements arrive out of order,
 * but only to a certain degree. The latest elements for a certain timestamp t will arrive
 * at most n milliseconds after the earliest elements for timestamp t.
 */
class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {

    val maxOutOfOrderness = 3500L; // 3.5 seconds

    var currentMaxTimestamp: Long;

    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
        val timestamp = element.getCreationTime()
        currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
        timestamp;
    }

    override def getCurrentWatermark(): Watermark = {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}

/**
 * This generator generates watermarks that are lagging behind processing time by a fixed amount.
 * It assumes that elements arrive in Flink after a bounded delay.
 */
class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {

    val maxTimeLag = 5000L; // 5 seconds

    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
        element.getCreationTime
    }

    override def getCurrentWatermark(): Watermark = {
        // return the watermark as current time minus the maximum time lag
        new Watermark(System.currentTimeMillis() - maxTimeLag)
    }
}

中断されたウォーターマークを使って

あるイベントが新しいウォーターマークが生成されたかもしれないことを示す時にいつでもウォーターマークを生成するには、AssignerWithPunctuatedWatermarksを使ってください。このクラスのために Flink はまず最初に要素にタイムスタンプを割り当てるために extractTimestamp(...) メソッドを呼び、その要素上ですぐにcheckAndGetNextWatermark(...)メソッドを呼びます。

checkAndGetNextWatermark(...) メソッドはextractTimestamp(...) メソッド内で割り当てられたタイムスタンプが渡され、ウォーターマークを生成したいかどうかを決定することができます。checkAndGetNextWatermark(...) メソッドが非nullのウォーターマークを返し、そのウォーターマークが最近の以前のウォーターマークより大きい場合はいつでも、新しいウォーターマークが発行されるでしょう。

public class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks<MyEvent> {

	@Override
	public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
		return element.getCreationTime();
	}

	@Override
	public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
		return lastElement.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
	}
}
class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] {

	override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
		element.getCreationTime
	}

	override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = {
		if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null
	}
}

注意: 各1つのイベント上でウォーターマークが生成されるかもしれません。しかし、各ウォーターマークはいくつかのダウンストリームの計算を引き起こすため、過剰なウォーターマークはパフォーマンスを低下させます。

Kafkaパーティション毎のタイムスタンプ

データソースとしてApache Kafka を使う場合は、各Kafkaパーティションは1つのイベント時間パターンを持つかもしれません(増加するタイムスタンプあるいは有限な順番のばらばらさ)。しかし、Kafkaからストリームを消費する場合、パーティションからイベントを交互配置しパーティションごとのパターンを破壊しながら複数のパーティションがしばしば並行して消費します (これはKafkaのコンシューマクライアントが動作する生来の方法です)。

この場合、FlinkのKafka-partition-aware ウォーターマーク 生成を使うことができます。この機能を使って、ウォーターマークはKafkaコンシューマ内、Kafkaパーティション毎で生成され、パーティション毎のウォーターマークはストリーム シャッフル上でウォーターマークがマージされるのと同じ方法でマージされます。

例えば、もしイベントのタイムスタンプがKafkaパーティション毎に厳密に増加する場合、ascending timestamps watermark generatorを使ったパーティション毎のウォーターマークの生成はウォーターマーク全体の中で完全になるでしょう。

下の図はどのようにper-Kafka-partition watermark 生成を使うか、およびその場合のストリーミング データフローを通じてどのようにウォーターマークが広がるかを示します。

FlinkKafkaConsumer09<MyType> kafkaSource = new FlinkKafkaConsumer09<>("myTopic", schema, props);
kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyType>() {

    @Override
    public long extractAscendingTimestamp(MyType element) {
        return element.eventTimestamp();
    }
});

DataStream<MyType> stream = env.addSource(kafkaSource);
val kafkaSource = new FlinkKafkaConsumer09[MyType]("myTopic", schema, props)
kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[MyType] {
    def extractAscendingTimestamp(element: MyType): Long = element.eventTimestamp
})

val stream: DataStream[MyType] = env.addSource(kafkaSource)

Kafkaパーティションを考慮したウォーターマークの生成

上に戻る

TOP
inserted by FC2 system