Hybrid Source
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

ハイブリッドソース #

HybridSourceは、具体的なsourcesのリストを含むソースです。 これは、異なるソースから入力を順次読み込んで単一の入力ストリームを生成するという問題を解決します。

例えば、ブートストラップのユースケースでは、Kafkaからの最新の無制限入力を続行する前に、S3から数日分の制限付き入力を読み取る必要がある場合があります。 アプリケーションを中断せずに協会付きファイル入力が終了すると、HybridSourceFileSourceからKafkaSourceに切り替わります。

HybridSourceが導入される前は、複数のソースを利用してトポロジを作成し、ユーザランドでスイッチングの仕組みを定義する必要がありました。これはおjペレーションの複雑さと非効率さに繋がりました。

HybridSourceを使うと、FlinkジョブグラフとDataStream APIの観点から、複数のソースが1つのソースとしても表示されます。

背景の詳細については、FLIP-150を参照してください。

コネクタを使うには、プロジェクトにflink-connector-baseの依存関係を追加する必要があいます:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>1.19-SNAPSHOT</version>
</dependency>

(通常、具体的なソースとの推移的な依存関係として発生します)

次のソースの開始位置 #

複数のソースをHybridSourceに配置するには、最後のソースを除くすべてのソースを制限付きにする必要があります。したがって、通常ソースには開始位置と終了位置を割り当てる必要があります。HybridSourceが制限付きの場合は最後のソースも制限付きであり、そうでなければ無制限です。 詳細は、特定のソースと外部ストレージシステムによって異なります。

ここでは、File/Kafkaの例に従って、最も基本的なシナリオを取り上げ、それからより複雑なシナリオを取り上げます。

グラフ構築時の固定の開始位置 #

例: 事前に決定された切り替え時間までファイルから読み取り、その後Kafkaから読み取りを続けます。 各ソースは事前に既知の範囲をカバーしているため、含まれるソースは直接使われているかのように事前に作成できます:

long switchTimestamp = ...; // derive from file input paths
FileSource<String> fileSource =
  FileSource.forRecordStreamFormat(new TextLineInputFormat(), Path.fromLocalFile(testDir)).build();
KafkaSource<String> kafkaSource =
          KafkaSource.<String>builder()
                  .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp + 1))
                  .build();
HybridSource<String> hybridSource =
          HybridSource.builder(fileSource)
                  .addSource(kafkaSource)
                  .build();
switch_timestamp = ... # derive from file input paths
file_source = FileSource \
    .for_record_stream_format(StreamFormat.text_line_format(), test_dir) \
    .build()
kafka_source = KafkaSource \
    .builder() \
    .set_bootstrap_servers('localhost:9092') \
    .set_group_id('MY_GROUP') \
    .set_topics('quickstart-events') \
    .set_value_only_deserializer(SimpleStringSchema()) \
    .set_starting_offsets(KafkaOffsetsInitializer.timestamp(switch_timestamp)) \
    .build()
hybrid_source = HybridSource.builder(file_source).add_source(kafka_source).build()

切り替え時の動的な開始位置 #

例: ファイルソースが非常に大きなバックログを読み取るため、次のソースで利用可能な保存期間よりも時間が掛かる可能性があります。 切り替えは、“current time - X"に発生する必要があります。このことは、切替時に次のソースの開始時間を設定することを必要とします。 ここで、SourceFactoryを実装してKafkaSourceの遅延構築のために前のenumeratorから終了位置を転送する必要があります。

enumeratorは終了のタイムスタンプの取得をサポートする必要があります。現在、これにはソースのカスタマイズが必要とするかもしれません。 FileSourceへの動的な終了位置のサポートの追加は、FLINK-23633で追跡されています。

FileSource<String> fileSource = CustomFileSource.readTillOneDayFromLatest();
HybridSource<String> hybridSource =
    HybridSource.<String, CustomFileSplitEnumerator>builder(fileSource)
        .addSource(
            switchContext -> {
              CustomFileSplitEnumerator previousEnumerator =
                  switchContext.getPreviousEnumerator();
              // how to get timestamp depends on specific enumerator
              long switchTimestamp = previousEnumerator.getEndTimestamp();
              KafkaSource<String> kafkaSource =
                  KafkaSource.<String>builder()
                      .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp + 1))
                      .build();
              return kafkaSource;
            },
            Boundedness.CONTINUOUS_UNBOUNDED)
        .build();
Still not supported in Python API.
inserted by FC2 system