Generating Watermarks
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

ウォーターマークの生成 #

このセクションでは、イベント時間タイムスタンプとウォーターマークを操作するためにFlinkが提供するAPIについて学習します。イベント時間処理時間取り込み時間の概要については、イベント時間の概要を参照してください。

ウォーターマーク戦略の概要 #

イベント時間と連携するために、Flinkはイベントのタイムスタンプを知る必要があります。ストリーム内の各要素は割り当てられたイベントのタイムスタンプを持つ必要があることを意味します。これは通常、TimestampAssignerを使って要素内の一部のフィールドにアクセスしてタイムスタンプを抽出することによって行われます。

タイムスタンプの割り当てはウォーターマークの生成と連動して行われます。ウォーターマークの生成はシステムにイベント時の進捗を伝えます。WatermarkGeneratorを指定して設定できます。

Flink APIはTimestampAssignerWatermarkGeneratorの両方を含むWatermarkStrategyを想定しています。WatermarkStrategyでは多くの一般的な戦略が静的メソッドとしてすぐに利用できますが、必要であればユーザは独自の戦略を作ることもできます。

完全を期すためのインタフェースは次の通りです:

public interface WatermarkStrategy<T>
    extends TimestampAssignerSupplier<T>,
            WatermarkGeneratorSupplier<T>{

    /**
     * Instantiates a {@link TimestampAssigner} for assigning timestamps according to this
     * strategy.
     */
    @Override
    TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);

    /**
     * Instantiates a WatermarkGenerator that generates watermarks according to this strategy.
     */
    @Override
    WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}

前述したように、通常はこのインタフェースを自分で実装せずに一般的なウォーターマーク戦略用のWatermarkStrategyの静的ヘルパーメソッド、または独自のTimestampAssignerWatermarkGeneratorとバンドルします。 例えば、タイムスタンプアサイナーとして境界あり順不同のウォーターマークとラムダ関数を使うには、次のように使います:

WatermarkStrategy
        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withTimestampAssigner((event, timestamp) -> event.f0);
WatermarkStrategy
  .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20))
  .withTimestampAssigner(new SerializableTimestampAssigner[(Long, String)] {
    override def extractTimestamp(element: (Long, String), recordTimestamp: Long): Long = element._1
  })
class FirstElementTimestampAssigner(TimestampAssigner):
   
    def extract_timestamp(self, value, record_timestamp):
        return value[0]


WatermarkStrategy \
    .for_bounded_out_of_orderness(Duration.of_seconds(20)) \
    .with_timestamp_assigner(FirstElementTimestampAssigner())

TimestampAssignerの指定はオプションで、ほとんどの場合実際には指定する必要はありません。例えば、KafkaまたはKinesisを使う場合、Kafka/Kinesisのレコードから直接タイムスタンプを取得します。

WatermarkGeneratorインタフェースについては、後のWriting WatermarkGeneratorsで説明します。

注意: タイムスタンプとウォーターマークはどちらもJava epoch 1970-01-01T00:00:00Zからのミリ秒として指定されます。

ウォーターマーク戦略の使用 #

Flinkアプリケーションには、WatermarkStrategyを使える場所が二か所あります。1) ソース上で直接、2) 非ソース操作後

最初のオプションは、ソースがウォーターマークのロジックのシャード/パーティション/分割に関する知識を活用できるため、推奨されます。通常、ソースはより細かいレベルでウォーターマークを追跡できるようになり、ソースによって生成される全体的なウォーターマークがより正確になります。WatermarkStrategyをソース上で指定することは、通常ソース固有のインタフェースを使う必要があることを意味します / これがKafkaコネクタでどのように機能するのか、パーティションごとのウォーターマークがそこでどのように機能するかについては、ウォーターマーク戦略とKafkaコネクタを参照してください。

2番目のオプション(任意の操作後のWatermarkStrategyの設定)は、ソースに戦略を直接設定できない場合のみ使ってください:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<MyEvent> stream = env.readFile(
        myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
        FilePathFilter.createDefaultFilter(), typeInfo);

DataStream<MyEvent> withTimestampsAndWatermarks = stream
        .filter( event -> event.severity() == WARNING )
        .assignTimestampsAndWatermarks(<watermark strategy>);

withTimestampsAndWatermarks
        .keyBy( (event) -> event.getGroup() )
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .reduce( (a, b) -> a.add(b) )
        .addSink(...);
val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream: DataStream[MyEvent] = env.readFile(
         myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
         FilePathFilter.createDefaultFilter())

val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
        .filter( _.severity == WARNING )
        .assignTimestampsAndWatermarks(<watermark strategy>)

withTimestampsAndWatermarks
        .keyBy( _.getGroup )
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .reduce( (a, b) => a.add(b) )
        .addSink(...)
env = StreamExecutionEnvironment.get_execution_environment()

# currently read_file is not supported in PyFlink
stream = env \
    .read_text_file(my_file_path, charset) \
    .map(lambda s: MyEvent.from_string(s))

with_timestamp_and_watermarks = stream \
    .filter(lambda e: e.severity() == WARNING) \
    .assign_timestamp_and_watermarks(<watermark strategy>)

with_timestamp_and_watermarks \
    .key_by(lambda e: e.get_group()) \
    .window(TumblingEventTimeWindows.of(Time.seconds(10))) \
    .reduce(lambda a, b: a.add(b)) \
    .add_sink(...)

この方法でWatermarkStrategyを使うと、ストリームを取得し、タイムスタンプ付きの要素とウォーターマークを含む新しいストリームが生成されます。元のストリームにすでにタイムスタンプやウォーターマークがあった場合、タイムスタンプアサイナーはそれを上書きします。

アイドル状態のソースの処理 #

入力スプリット/パーティション/シャードの1つがしばらくの間イベントを伝送しない場合、これはWatermarkGeneratorがウォーターマークの基礎となる新しい情報を取得できないことも意味します。これをアイドル入力またはアイドルソースと呼びます。 一部のパーティションがまだイベントを保持している可能性があるため、これは問題です。この場合、ウォーターマークは、全ての異なる並列ウォーターマークの最小値として計算されるため、抑制されます。

これに対処するには、アイドル状態を検出し、入力をアイドルとしてマークするWatermarkStrategyを使えます。WatermarkStrategyはこれを行うための便利なヘルパーを提供します:

WatermarkStrategy
        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withIdleness(Duration.ofMinutes(1));
WatermarkStrategy
  .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20))
  .withIdleness(Duration.ofMinutes(1))
WatermarkStrategy \
    .for_bounded_out_of_orderness(Duration.of_seconds(20)) \
    .with_idleness(Duration.of_minutes(1))

ウォーターマークの配置 #

前の段落では、スプリット/パーティション/シャードまたはソースがアイドル状態であり、ウォーターマークの増加が停止する可能性がある状況について説明しました。On the other side of the spectrum, a split/partition/shard or source may process records very fast and in turn increase its watermark relatively faster than the others. これ自体は問題ありません。ただし、一部のデータを送信するためにウォーターマークを使っているダウンストリームオペレータにとって、それは実際に問題になる可能性があります。

この場合、アイドル状態のソースとは対照的に、そのようなダウンストリームオペレータ(集計上のウィンドウ結合など)が進行する可能性があります。ただし、そのようなオペレータは、全ての入力からの最小限のウォーターマークが遅延入力によって抑制されるため、高速な入力からの過剰な量のデータをバッファする必要がある場合があります。All records emitted by the fast input will hence have to be buffered in the said downstream operator state, which can lead into uncontrollable growth of the operator’s state.

In order to address the issue, you can enable watermark alignment, which will make sure no sources/splits/shards/partitions increase their watermarks too far ahead of the rest. 全てのソースを個別に位置合わせできます:

WatermarkStrategy
        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withWatermarkAlignment("alignment-group-1", Duration.ofSeconds(20), Duration.ofSeconds(1));
WatermarkStrategy
  .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20))
  .withWatermarkAlignment("alignment-group-1", Duration.ofSeconds(20), Duration.ofSeconds(1))
WatermarkStrategy \
    .for_bounded_out_of_orderness(Duration.of_seconds(20)) \
    .with_watermark_alignment("alignment-group-1", Duration.of_seconds(20), Duration.of_seconds(1))
注意: FLIP-27ソース用だけのウォーターマークの位置合わせを有効にできます。 sources. 従来のもの、あるいはソースの後にDataStream#assignTimestampsAndWatermarksを介して適用された場合は動作しません。

アラインメントを有効にする場合、ソースがどのグループに所属するかをFlinkに指示する必要があります。You do that by providing a label (e.g. alignment-group-1) which bind together all sources that share it. さらに、そのグループに属する全てのソースに渡る現在の最小ウォーターマークから最大ドリフトを判別する必要があります。3番目のパラメータは現在の最大ウォーターマークを更新する頻度を記述します。頻繁な更新の欠点は、TMとJMの間でやり取りされるRPCメッセージが増加することです。 travelling between TMs and the JM.

位置合わせを行うために、Flinkは、あまりに遠くの未来のウォーターマークを生成したソース/タスクからの消費を停止します。その間、他のソース/タスクからレコードを読み取り続け、結合されたウォーターマークを前方に移動し、より高速にブロックを解除できます。

注意: Flink 1.17の時点で、分割レベルのウォーターマークの配置はFLIP-27ソースフレームワークでサポートされています。 ソースコネクタは、スプリット/パーティション/シャードを同じタスクに配置できるように、スプリットを再開および一時停止するためのインタフェースを実装する必要があります。一時停止インタフェースと再開インタフェースの詳細については、Source APIをご覧ください。

1.15.xから1.16.xの間のFlinkバージョンからアップグレードする場合は、pipeline.watermark-alignment.allow-unaligned-source-splitsをtrueに設定することで、分割レベルの配置を無効にできます。さらに、ソースが分割レベル配置をサポートしているかどうかを、実行時にUnsupportedOperationExceptionを投げるかどうかを確認するか、javadocを読むことで判断できます。この場合、致命的な例外を避けるために、分割レベルのウォーターマークの位置合わせを無効にすることが望ましいです。

フラグをtrueに設定すると、スプリット/シャード/パーティションの数がソースオペレータの並列度に等しい場合にのみウォーターマークの位置合わせが適切に動作します。これにより、全てのサブタスクに単一の作業単位が割り当てられることになります。一方、異なるペースでウォーターマークを生成し同じタスクに割り当てられる2つのKafkaパーティションがある場合、ウォーターマークは期待通りに動作しない可能性があります。幸いなことに、最悪の場合でも、基本的な位置合わせのパフォーマンスは位置合わせを全く行わない場合よりも悪化することはありません。

さらに、Flinkは同じソースや異なるソースのタスク間での位置合わせもサポートします。これは、異なる速度でウォーターマークを生成する2つの異なるソース(Kafkaとファイルなど)がある場合に便利です。

WatermarkGeneratorsの書き方 #

TimestampAssignerはイベントからフィールドを抽出する簡単な関数であるため、詳細に検討する必要はありません。一方、WatermarkGeneratorの書き方はもう少し複雑なため、次の2つのセクションでそのやり方を見ていきます。これは、WatermarkGeneratorインタフェースです:

/**
 * The {@code WatermarkGenerator} generates watermarks either based on events or
 * periodically (in a fixed interval).
 *
 * <p><b>Note:</b> This WatermarkGenerator subsumes the previous distinction between the
 * {@code AssignerWithPunctuatedWatermarks} and the {@code AssignerWithPeriodicWatermarks}.
 */
@Public
public interface WatermarkGenerator<T> {

    /**
     * Called for every event, allows the watermark generator to examine
     * and remember the event timestamps, or to emit a watermark based on
     * the event itself.
     */
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    /**
     * Called periodically, and might emit a new watermark, or not.
     *
     * <p>The interval in which this method is called and Watermarks
     * are generated depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
     */
    void onPeriodicEmit(WatermarkOutput output);
}

ウォーターマーク生成には、periodicpunctuatedの2つの異なる形式があります。

periodic generatorは通常、onEvent()を介して受信イベントを監視し、フレームワークがonPeriodicEmit()を呼び出す時にウォーターマークを発行します。

puncutated generatorはonEvent()内のイベントを調べ、ストリーム内でウォーターマーク情報を運ぶ特別なmarker eventsまたはpunctuationsを待ちます。これらのイベントのいずれかを検出すると、すぐにウォーターマークを発行します。 通常、punctuated generatorsはonPeriodicEmit()からウォーターマークを発行しません。

次に、各スタイルのジェネレータを実装する方法を見ていきます。

Periodic WatermarkGeneratorの書き方 #

periodic generatorはストリームイベントを監視し、定期的にウォーターマークを生成します(おそらくストリーム要素に応じて、または純粋に処理時間に応じて)。

ウォーターマークが生成される間隔(nミリ秒ごと)は、ExecutionConfig.setAutoWatermarkInterval(...)によって定義されます。generatorsのonPeriodicEmit()メソッドは毎回呼び出され、返されたウォーターマークがnullでなく、前のウォーターマークよりも大きい場合は、新しいウォーターマークが発行されます。

ここでは、定期的なウォーターマーク生成を使う、ウォーターマークジェネレータの2つの簡単な例を示します。Flinkには、以下に示すBoundedOutOfOrdernessWatermarksと同様に動作するWatermarkGeneratorが付属していることに注意してください。その使い方については、ここで読むことができます。

/**
 * This generator generates watermarks assuming that elements arrive out of order,
 * but only to a certain degree. The latest elements for a certain timestamp t will arrive
 * at most n milliseconds after the earliest elements for timestamp t.
 */
public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<MyEvent> {

    private final long maxOutOfOrderness = 3500; // 3.5 seconds

    private long currentMaxTimestamp;

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // emit the watermark as current highest timestamp minus the out-of-orderness bound
        output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
    }

}

/**
 * This generator generates watermarks that are lagging behind processing time
 * by a fixed amount. It assumes that elements arrive in Flink after a bounded delay.
 */
public class TimeLagWatermarkGenerator implements WatermarkGenerator<MyEvent> {

    private final long maxTimeLag = 5000; // 5 seconds

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        // don't need to do anything because we work on processing time
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));
    }
}
/**
 * This generator generates watermarks assuming that elements arrive out of order,
 * but only to a certain degree. The latest elements for a certain timestamp t will arrive
 * at most n milliseconds after the earliest elements for timestamp t.
 */
class BoundedOutOfOrdernessGenerator extends WatermarkGenerator[MyEvent] {

    val maxOutOfOrderness = 3500L // 3.5 seconds

    var currentMaxTimestamp: Long = _

    override def onEvent(element: MyEvent, eventTimestamp: Long, output: WatermarkOutput): Unit = {
        currentMaxTimestamp = max(eventTimestamp, currentMaxTimestamp)
    }

    override def onPeriodicEmit(output: WatermarkOutput): Unit = {
        // emit the watermark as current highest timestamp minus the out-of-orderness bound
        output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1))
    }
}

/**
 * This generator generates watermarks that are lagging behind processing
 * time by a fixed amount. It assumes that elements arrive in Flink after
 * a bounded delay.
 */
class TimeLagWatermarkGenerator extends WatermarkGenerator[MyEvent] {

    val maxTimeLag = 5000L // 5 seconds

    override def onEvent(element: MyEvent, eventTimestamp: Long, output: WatermarkOutput): Unit = {
        // don't need to do anything because we work on processing time
    }

    override def onPeriodicEmit(output: WatermarkOutput): Unit = {
        output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag))
    }
}
またPython APIではサポートされません

Punctuated WatermarkGeneratorの書き方 #

punctuated watermark generatorは、イベントのストリームを監視し、ウォーターマーク情報を運ぶ特別な要素を検出するたびに、ウォーターマークを発行します。

これは、イベントが特定のマーカーを運ぶことを示すたびにウォーターマークを発行するpunctuated generatorを実装する方法です。

public class PunctuatedAssigner implements WatermarkGenerator<MyEvent> {

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        if (event.hasWatermarkMarker()) {
            output.emitWatermark(new Watermark(event.getWatermarkTimestamp()));
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // don't need to do anything because we emit in reaction to events above
    }
}
class PunctuatedAssigner extends WatermarkGenerator[MyEvent] {

    override def onEvent(element: MyEvent, eventTimestamp: Long): Unit = {
        if (event.hasWatermarkMarker()) {
            output.emitWatermark(new Watermark(event.getWatermarkTimestamp()))
        }
    }

    override def onPeriodicEmit(): Unit = {
        // don't need to do anything because we emit in reaction to events above
    }
}
またPython APIではサポートされません
注意: イベントごとにウォーターマークを生成することができます。しかし、各ウォーターマークはいくつかのダウンストリームの計算を引き起こすため、過剰なウォーターマークはパフォーマンスを低下させます。

ウォーターマーク戦略とKafkaコネクタ #

データソースとして、Apache Kafkaを使う場合、各Kafkaパーティションには単純なイベント時間パターン(昇順のタイムスタンプ、または有限の順不同)が含まれる場合があります。しかし、Kafkaからストリームを消費する場合、パーティションからイベントを交互配置しパーティションごとのパターンを破壊しながら複数のパーティションがしばしば並行して消費します (これはKafkaのコンシューマクライアントが動作する生来の方法です)。

この場合、FlinkのKafka-partition対応ウォーターマーク生成を使うことができます。 この機能を使って、ウォーターマークはKafkaコンシューマ内でKafkaパーティションごとに生成され、パーティション毎のウォーターマークはストリームシャッフル上でウォーターマークがマージされるのと同じ方法でマージされます。

例えば、イベントのタイムスタンプがKafkaパーティションごとに厳密に昇順である場合、ascending timestamps watermark generatorを使ってパーティションごとにウォータマークを生成すると、全体的に完璧なウォーターマークが生成されます。この例では、TimestampAssignerを提供していないことに注してください。代わりにKafkaレコード自体のタイムスタンプが使われます。

以下の図は、per-Kafka-partition watermark generationの使い方と、その場合のストリーミングデータフローを通じてウォーターマークがどのように伝搬するかを示しています。

KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("my-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

DataStream<String> stream = env.fromSource(
    kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)), "mySource");
val kafkaSource: KafkaSource[String] = KafkaSource.builder[String]()
    .setBootstrapServers("brokers")
    .setTopics("my-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema)
    .build()

val stream = env.fromSource(
    kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)), "mySource")
kafka_source = KafkaSource.builder()
    .set_bootstrap_servers(brokers)
    .set_topics("my-topic")
    .set_group_id("my-group")
    .set_starting_offsets(KafkaOffsetsInitializer.earliest())
    .set_value_only_deserializer(SimpleStringSchema())
    .build()

stream = env.from_source(
    source=kafka_source,
    watermark_strategy=WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(20)),
    source_name="kafka_source")
Generating Watermarks with awareness for Kafka-partitions

オペレータがウォーターマークを処理する方法 #

原則として、オペレータは与えらえたウォーターマークをダウンストリームに転送する前に、ウォーターマークを完全に処理する必要があります。例えば、WindowOperatorは最初に起動されるべき全てのウィンドウを評価し、ウォーターマークによってトリガーされた全ての出力を生成した後にのみウォーターマーク自体が送信されます。言い換えると、ウォーターマークの発生によって生成された全ての要素は、ウォーターマークの前に発行されます。

同じルールがTwoInputStreamOperatorにも適用されます。ただし、この場合、オペレータの現在のウォーターマークは、その両方の入力の最小値として定義されます。

この動作の詳細は、OneInputStreamOperator#processWatermarkTwoInputStreamOperator#processWatermark1TwoInputStreamOperator#processWatermark2メソッドの実装によって定義されます。

非推奨のAssignerWithPeriodicWatermarksとAssignerWithPunctuatedWatermarks #

WatermarkStrategyTimestampAssignerWatermarkGeneratorの現在の抽象化を導入する前に、FlinkはAssignerWithPeriodicWatermarksAssignerWithPunctuatedWatermarksを使っていました。これらは引き続きAPIに表示されますが、新しいインタフェースは関心事が明確に分離され、ウォーターマーク生成のperiodicとpunctuated形式が統一されるため、新しいインタフェースを使うことをお勧めします。

Back to top

inserted by FC2 system