This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
ハイブリッドソース #
HybridSource
は、具体的なsourcesのリストを含むソースです。
これは、異なるソースから入力を順次読み込んで単一の入力ストリームを生成するという問題を解決します。
例えば、ブートストラップのユースケースでは、Kafkaからの最新の無制限入力を続行する前に、S3から数日分の制限付き入力を読み取る必要がある場合があります。
アプリケーションを中断せずに協会付きファイル入力が終了すると、HybridSource
はFileSource
からKafkaSource
に切り替わります。
HybridSource
が導入される前は、複数のソースを利用してトポロジを作成し、ユーザランドでスイッチングの仕組みを定義する必要がありました。これはおjペレーションの複雑さと非効率さに繋がりました。
HybridSource
を使うと、FlinkジョブグラフとDataStream
APIの観点から、複数のソースが1つのソースとしても表示されます。
背景の詳細については、FLIP-150を参照してください。
コネクタを使うには、プロジェクトにflink-connector-base
の依存関係を追加する必要があいます:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>1.19-SNAPSHOT</version>
</dependency>
(通常、具体的なソースとの推移的な依存関係として発生します)
次のソースの開始位置 #
複数のソースをHybridSource
に配置するには、最後のソースを除くすべてのソースを制限付きにする必要があります。したがって、通常ソースには開始位置と終了位置を割り当てる必要があります。HybridSource
が制限付きの場合は最後のソースも制限付きであり、そうでなければ無制限です。
詳細は、特定のソースと外部ストレージシステムによって異なります。
ここでは、File/Kafkaの例に従って、最も基本的なシナリオを取り上げ、それからより複雑なシナリオを取り上げます。
グラフ構築時の固定の開始位置 #
例: 事前に決定された切り替え時間までファイルから読み取り、その後Kafkaから読み取りを続けます。 各ソースは事前に既知の範囲をカバーしているため、含まれるソースは直接使われているかのように事前に作成できます:
long switchTimestamp = ...; // derive from file input paths
FileSource<String> fileSource =
FileSource.forRecordStreamFormat(new TextLineInputFormat(), Path.fromLocalFile(testDir)).build();
KafkaSource<String> kafkaSource =
KafkaSource.<String>builder()
.setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp + 1))
.build();
HybridSource<String> hybridSource =
HybridSource.builder(fileSource)
.addSource(kafkaSource)
.build();
switch_timestamp = ... # derive from file input paths
file_source = FileSource \
.for_record_stream_format(StreamFormat.text_line_format(), test_dir) \
.build()
kafka_source = KafkaSource \
.builder() \
.set_bootstrap_servers('localhost:9092') \
.set_group_id('MY_GROUP') \
.set_topics('quickstart-events') \
.set_value_only_deserializer(SimpleStringSchema()) \
.set_starting_offsets(KafkaOffsetsInitializer.timestamp(switch_timestamp)) \
.build()
hybrid_source = HybridSource.builder(file_source).add_source(kafka_source).build()
切り替え時の動的な開始位置 #
例: ファイルソースが非常に大きなバックログを読み取るため、次のソースで利用可能な保存期間よりも時間が掛かる可能性があります。
切り替えは、“current time - X"に発生する必要があります。このことは、切替時に次のソースの開始時間を設定することを必要とします。
ここで、SourceFactory
を実装してKafkaSource
の遅延構築のために前のenumeratorから終了位置を転送する必要があります。
enumeratorは終了のタイムスタンプの取得をサポートする必要があります。現在、これにはソースのカスタマイズが必要とするかもしれません。
FileSource
への動的な終了位置のサポートの追加は、FLINK-23633で追跡されています。
FileSource<String> fileSource = CustomFileSource.readTillOneDayFromLatest();
HybridSource<String> hybridSource =
HybridSource.<String, CustomFileSplitEnumerator>builder(fileSource)
.addSource(
switchContext -> {
CustomFileSplitEnumerator previousEnumerator =
switchContext.getPreviousEnumerator();
// how to get timestamp depends on specific enumerator
long switchTimestamp = previousEnumerator.getEndTimestamp();
KafkaSource<String> kafkaSource =
KafkaSource.<String>builder()
.setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp + 1))
.build();
return kafkaSource;
},
Boundedness.CONTINUOUS_UNBOUNDED)
.build();