This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
データソース #
このページでは、FlinkのデータソースAPIとその背後にある概念とアーキテクチャーについて説明します。 FlinkでデータソースAPIがどのように機能するか、あるいは新しいデータソースを実装したい場合は、これをお読みください。
事前定義されたソースコネクタを探している場合は、コネクタドキュメントを確認してください。
データソースの概念 #
コアコンポーネント
データソースには3つのコアコンポーネントがあります: Splits、SplitEnumerator、SourceReader。
-
Splitは、ファイルやログ分割のような、ソースで消費されるデータの一部です。Splitsは、ソースが作業を分割し、データの読み取りを並列化する粒度です。
-
SourceReaderはSplitsを要求し、Splitで表されるファイルやログの分割を読み取るなどして、それらを処理します。SourceReadersは
SourceOperators
のTaskManager上で並行して実行され、イベント/レコードの並列ストリームを生成します。 -
SplitEnumeratorはSplitsを生成し、それらをSourceReadersに割り当てます。これは、Job Manager上で単一のインスタンスとして実行され、保留中のSplitsのバックログを維持し、それらをバランスのとれた方法でリーダーに割り当てる役割があります。
Source クラスは、上記の3つのコンポーネントを組み合わせるAPIエントリポイントです。
ストリーミングとバッチを横断した統合
データソースAPIは、無制限のストリーミングソースと制限付きのバッチソースの両方を統合された方法でサポートします。
両方のケースの違いは最小限です: 制限付き/バッチのケースでは、enumeratorはsplitの固定セットを生成し、各splitは必然的に有限になります。無制限のストリーミングの場合、2つのうち1つは真ではありません(splitは有限ではないか、enumeratorは新しいsplitを生成し続けます)。
例 #
以下は、ストリーミングとバッチの場合でデータソースのコンポーネントがどのようにやり取りするかを示す、いくつかの簡略化された概念的な例を示しています。
これはKafkaとFileソースの実装がどのように機能するかを正確には説明していないことに注意してください; 説明を目的として、各部分は簡略化されています。
制限付きファイルソース
ソースには、読み取るディレクトリのURI/Pathと、ファイルの解析方法を定義するFormatがあります。
- Splitはファイル、またはファイルのregionです(データ形式がファイルの分割をサポートしている場合)。
- SplitEnumeratorはシチエされたディレクトリパスにある全てのファイルを一覧表示します。Splitsを要求する次のリーダーにSplitsを割り当てます。全てのSplitsが割り当てられると、NoMoreSplitsでリクエストに応答します。
- SourceReaderはSplitを要求し、割り当てられたSplit(ファイルまたはファイルのregion)を読み込み、指定されたFormatを使って解析します。新しいSplitが取得されず、NoMoreSplitsメッセージを取得した場合は、終了します。
無制限のストリーミングファイルソース
SplitEnumeratorがNoMoreSplitsに応答しないことを除いて、このソースは上記と同じように動作し、定期的に指定されたURI/Pathの内容を一覧表示して新しいファイルを調べます。新しいファイルが見つかると、それらの新しいSplitsを生成し、利用可能なSourceReaderに割り当てることができます。
無制限のストリーミングKafkaソース
ソースにはKafkaトピック(またはトピックのリストまたはトピックの正規表現)と、レコードを解析するためのDeserializerがあります。
- SplitはKafkaトピックパーティションです。
- SplitEnumeratorはブローカーに接続して、購読トピックに関連する全てのトピックパーティションを一覧表示します。enumeratorはオプションでこの操作を繰り返して新しく追加されたトピック/パーティションを検出できます。
- SourceReaderはKafkaConsumerを使って割り当てられたSplits(トピックパーティション)を読み取り、指定されたDesirializerを使ってレコードを逆シリアライズします。splits (トピックパーティション)には終了が無いため、リーダーがデータの終了に到達することはありません。
制限のあるKafkaソース
各Split(トピックパーティション)に定義された終了オフセットがあることを除いて、上記と同じです。SourceReaderがSplitの終了オフセットに到達すると、Splitを終了します。全ての割り当てられたSplitが終了すると、SourceReaderが終了します。
Data Source API #
このセクションでは、FLIP-27で導入された新しいソースAPIの主要なインタフェースについて説明し、開発者にソース開発に関するヒントを提供します。
ソース #
Source APIは、以下のコンポーネントを作成するためのファクトリ形式のインタフェースです。
- Split Enumerator
- Source Reader
- Split Serializer
- Enumerator Checkpoint Serializer
これに加えて、ソースはソースの boundedness 属性を提供するため、FlinkはFlinkジョブを実行するための適切なモードを選択できます。
ソースのインスタンスは実行時にシリアライズされFlinkクラスタにアップロードされるため、ソースの実装はシリアライズ化可能である必要があります。
SplitEnumerator #
SplitEnumerator
はソースの"頭脳"であることが期待されます。SplitEnumerator
の一般的な実装では、以下のことを行います:
SourceReader
登録処理SourceReader
障害処理addSplitsBack()
メソッドはSourceReader
が失敗した時に呼び出されます。SplitEnumeratorは、失敗したSourceReader
によって通知されていないsplit割り当てを撤回する必要があります。
SourceEvent
処理SourceEvent
はSplitEnumerator
とSourceReader
の間で送信される独自のイベントです。実装はこの仕組みを利用して高度な調整を実行できます。
- Splitの検出と割り当て
SplitEnumerator
は、新しいsplitの検出、新しいSourceReader
の登録、SourceReader
の障害などを含む、様々なイベントに対応してsplitsを割り当てることができます。
SplitEnumerator
は、SplitEnumerator
の作成または復元時にSource
に提供される
SplitEnumeratorContext
を利用して上記の作業を実行できます。
SplitEnumeratorContext
により、SplitEnumerator
はリーダーの必要な情報を取得し、調整アクションを実行できるようになります。
Source
の実装はSplitEnumeratorContext
をSplitEnumerator
インスタンスに渡すことが期待されます。
SplitEnumerator
実装はメソッドが呼び出された時のみ調整アクションを実行することで事後対応的に適切に機能しますが、SplitEnumerator
実装はアクティブにアクションを実行したほうが良いです。例えば、SplitEnumerator
は定期的にsplit検出を実行し、新しいsplitをSourceReaders
に割り当てたほうが良いです。
そのような実装では、SplitEnumeratorContext
のcallAsync()
メソッドが便利な場合があります。以下のコードスニペットは、SplitEnumerator
実装が独自のスレッドを維持せずにこれをどう実現できるかを示しています。
class MySplitEnumerator implements SplitEnumerator<MySplit, MyCheckpoint> {
private final long DISCOVER_INTERVAL = 60_000L;
private final SplitEnumeratorContext<MySplit> enumContext ;
/** The Source creates instances of SplitEnumerator and provides the context. */
MySplitEnumerator(SplitEnumeratorContext<MySplit> enumContext) {
this.enumContext = enumContext;
}
/**
* A method to discover the splits.
*/
private List<MySplit> discoverSplits() {...}
@Override
public void start() {
...
enumContext.callAsync(this::discoverSplits, (splits, thrown) -> {
Map<Integer, List<MySplit>> assignments = new HashMap<>();
int parallelism = enumContext.currentParallelism();
for (MySplit split : splits) {
int owner = split.splitId().hashCode() % parallelism;
assignments.computeIfAbsent(owner, s -> new ArrayList<>()).add(split);
}
enumContext.assignSplits(new SplitsAssignment<>(assignments));
}, 0L, DISCOVER_INTERVAL);
...
}
...
}
また、Python APIではサポートされません。
SourceReader #
SourceReader は、タスクマネージャーで実行されるコンポーネントでSplitsからのレコードを消費します。
SourceReader
は、プルベースの消費インタフェースを公開します。Flinkタスクは、ループ内でpollNext(ReaderOutput)
を呼び出し続け、SourceReader
からレコードをポーリングします。pollNext(ReaderOutput)
メソッドの返り値は、ソースリーダーのステータスを示します。
MORE_AVAILABLE
- SourceReaderにはすぐに利用できるさらに多くのレコードがあります。NOTHING_AVAILABLE
- SourceReaderには現時点ではこれ以上のレコードはありませんが、将来的にはさらに多くのレコードがある可能性があります。END_OF_INPUT
- SourceReaderは全てのレコードを使い果たし、データの終わりに達しました。これは、SourceReaderを閉じることができることを意味します。
パフォーマンスを考慮して、ReaderOutput
がpollNext(ReaderOutput)
メソッドに提供されるため、SourceReader
は必要に応じて一回のpollNext()の呼び出しで複数のレコードを呼び出すことができます。例えば、外部システムがブロックの粒度で動作する場合があります。ブロックには複数のレコードを含めることができますが、ソースはブロックの境界でのみチェックポイントを作成できます。この場合、SourceReader
は一度に1つのブロック内の全てのレコードをReaderOutput
に発行できます。
**ただし、SourceReader
の実装は必要な場合を除き単一のpollNext(ReaderOutput)
呼び出しで複数のレコードを発行することは避けるべきです。**これはSourceReader
からポーリングしているタスクスレッドはイベントループで動作するため、ブロックできないからです。
SourceReader
の全ての状態は、snapshotState()
呼び出しで返されるSourceSplit
の中で維持される必要があります。これにより、必要に応じてSourceSplit
を他のSourceReaders
に再割り当てすることができます。
SourceReaderContext
はSourceReader
の作成時にSource
に提供されます。Source
がコンテキストをSourceReader
インスタンスに渡すことが期待されます。SourceReader
はSourceReaderContext
を介してSourceEvent
をSplitEnumerator
に送信できます。Source
の典型的なデザインパターンは、SourceReader
にローカル情報をグローバルな視点で決定を行うSplitEnumerator
に報告させます。
SourceReader
APIは、ユーザがsplitを手動で処理し、レコードを取得して引き渡すための独自のスレッドモデルを使えるようにする低レベルのAPIです。SourceReader
の実装を容易にするために、FlinkはSourceReader
の作成に必要な作業量を大幅に削減する
SourceReaderBase
クラスを提供します。
コネクタの開発者はSourceReader
を最初から作成するのではなくSourceReaderBase
を利用することを強くお勧めします。詳細については、Split Reader APIセクションをご覧ください。
ソースの使用 #
Source
からDataStream
を作成するには、Source
をStreamExecutionEnvironment
に渡す必要があります。例えば:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Source mySource = new MySource(...);
DataStream<Integer> stream = env.fromSource(
mySource,
WatermarkStrategy.noWatermarks(),
"MySourceName");
...
val env = StreamExecutionEnvironment.getExecutionEnvironment()
val mySource = new MySource(...)
val stream = env.fromSource(
mySource,
WatermarkStrategy.noWatermarks(),
"MySourceName")
...
env = StreamExecutionEnvironment.get_execution_environment()
my_source = ...
env.from_source(
my_source,
WatermarkStrategy.no_watermarks(),
"my_source_name")
Split Reader API #
コアのSourceReader
APIは完全に非同期であり、splitの非同期の読み込みを手動で管理する実装を必要とします。
ただし、実際には、ほとんどのソースは、クライアント(例えばKafkaConsumer
)での*poll()*呼び出しや分散ファイルシステム(HDFS、 S3、…)でのI/O操作のブロックなどの、ブロック操作を実行します。これを非同期ソースAPIと互換性を持たせるには、これらのブロッキング(同期)操作を別のスレッドで実行し、リーダーの非同期部分にデータを渡す必要があります。
SplitReader は、ファイル読み取り、Kafkaなどの単純な同期読み取り/ポーリングベースのソース実装のための高レベルAPIです。
コアはSourceReaderBase
クラスで、SplitReader
を受け取り、SplitReader
を実行するフェッチャースレッドを作成し、様々な消費スレッドモデルをサポートします。
SplitReader #
SplitReader
APIには次の3つのメソッドしかありません:
- RecordsWithSplitIds を返すブロッキングフェッチメソッド
- splitの変更を処理するための非ブロックメソッド。
- ブロッキングフェッチ操作を起動するための非ブロックの起動メソッド。
SplitReader
は外部のシステムからのレコードの読み込みのみに焦点を当てているため、SourceReader
に比べてかなり単純です。
詳細については、クラスのJavaドキュメントを確認してください。
SourceReaderBase #
SourceReader
実装が以下のことを行うことは非常に一般的です:
- 外部システムのsplitからブロック的な方法でフェッチするスレッドのプールを持ちます。
- 内部フェッチスレッドと
pollNext(ReaderOutput)
のような他のメソッドの呼び出しとの間の同期を処理します。 - ウォーターマークの位置合わせのためにsplitごとのウォーターマークを維持します。
- チェックポイントの各splitの状態を維持します。
新しいSourceReader
を書き込む作業を減らすために、Flinkは、SourceReader
の基本実装として機能する
SourceReaderBase
クラスを提供します。
SourceReaderBase
は上記全ての作業をそのまま実行します。新しいSourceReader
を書くには、SourceReader
実装をSourceReaderBase
から継承させ、幾つかのメソッドを埋め、高レベル
SplitReader
を実装します。
SplitFetcherManager #
SourceReaderBase
は、動作する
SplitFetcherManager
の動作に応じて、そのままでいくつかのスレッドモデルをサポートします。
SplitFetcherManager
はSplitReader
でそれぞれフェッチするSplitFetcher
のプールの作成と維持をサポートします。splitを各splitフェッチャーに割り当てる方法も決定します。
例として、以下に示すように、SplitFetcherManager
は個定数のスレッドを持つことができ、各スレッドはSourceReader
に割り当てられた幾つかのsplitから取得されます。
以下のコードスニペットは、このスレッドモデルを実装します。
/**
* A SplitFetcherManager that has a fixed size of split fetchers and assign splits
* to the split fetchers based on the hash code of split IDs.
*/
public class FixedSizeSplitFetcherManager<E, SplitT extends SourceSplit>
extends SplitFetcherManager<E, SplitT> {
private final int numFetchers;
public FixedSizeSplitFetcherManager(
int numFetchers,
FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
Supplier<SplitReader<E, SplitT>> splitReaderSupplier) {
super(elementsQueue, splitReaderSupplier);
this.numFetchers = numFetchers;
// Create numFetchers split fetchers.
for (int i = 0; i < numFetchers; i++) {
startFetcher(createSplitFetcher());
}
}
@Override
public void addSplits(List<SplitT> splitsToAdd) {
// Group splits by their owner fetchers.
Map<Integer, List<SplitT>> splitsByFetcherIndex = new HashMap<>();
splitsToAdd.forEach(split -> {
int ownerFetcherIndex = split.hashCode() % numFetchers;
splitsByFetcherIndex
.computeIfAbsent(ownerFetcherIndex, s -> new ArrayList<>())
.add(split);
});
// Assign the splits to their owner fetcher.
splitsByFetcherIndex.forEach((fetcherIndex, splitsForFetcher) -> {
fetchers.get(fetcherIndex).addSplits(splitsForFetcher);
});
}
}
また、Python APIではサポートされません。
このスレッドモデルを使うSourceReader
は以下のようにして作成できます:
public class FixedFetcherSizeSourceReader<E, T, SplitT extends SourceSplit, SplitStateT>
extends SourceReaderBase<E, T, SplitT, SplitStateT> {
public FixedFetcherSizeSourceReader(
FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
Supplier<SplitReader<E, SplitT>> splitFetcherSupplier,
RecordEmitter<E, T, SplitStateT> recordEmitter,
Configuration config,
SourceReaderContext context) {
super(
elementsQueue,
new FixedSizeSplitFetcherManager<>(
config.getInteger(SourceConfig.NUM_FETCHERS),
elementsQueue,
splitFetcherSupplier),
recordEmitter,
config,
context);
}
@Override
protected void onSplitFinished(Map<String, SplitStateT> finishedSplitIds) {
// Do something in the callback for the finished splits.
}
@Override
protected SplitStateT initializedState(SplitT split) {
...
}
@Override
protected SplitT toSplitType(String splitId, SplitStateT splitState) {
...
}
}
また、Python APIではサポートされません。
SourceReader
実装は、SplitFetcherManager
とSourceReaderBase
の上に簡単にそれらの独自のスレッドモデルを実装することもできます。
イベント時間とウォーターマーク #
イベント時間の割り当てとウォーターマーク生成は、データソースの一部として発生します。ソースリーダーから送信されるイベントストリームにはイベントタイムスタンプがあり、(ストリーミング実行中に)ウォーターマークを含みます。イベント時間とウォーターマークの概要については、タイムリーなストリーム処理を参照してください。
従来に基づいたアプリケーション
SourceFunction
は通常はstream.assignTimestampsAndWatermarks(WatermarkStrategy)
を介して、後の別のステップでタイムスタンプとウォーターマークを生成します。タイムスタンプは既に割り当てられていて以前のsplit対応のウォーターマークを上書きするため、この関数は新しいソースで使わないでください。
API #
WatermarkStrategy
は、DataStream APIでの作成中にソースに渡され、
TimestampAssigner
と
WatermarkGenerator
の両方を作成します。
environment.fromSource(
Source<OUT, ?, ?> source,
WatermarkStrategy<OUT> timestampsAndWatermarks,
String sourceName);
environment.from_source(
source: Source,
watermark_strategy: WatermarkStrategy,
source_name: str,
type_info: TypeInformation = None)
TimestampAssigner
とWatermarkGenerator
はReaderOutput
(またはSourceOutput
)の一部として透過的に実行されるため、ソースの実装者はタイムスタンプ抽出やウォーターマーク生成コードを実装する必要はありません。
イベントタイムスタンプ #
イベントタイムスタンプは次の2つのステップで割り当てられます:
-
SourceReaderは*、
SourceOutput.collect(event, timestamp)
を呼び出すことで、ソースレコードのタイムスタンプ*をイベントにアタッチすることができます. これは、Kafka、Kinesis、Pulsar、Pravegaなどレコードベースのデータソースにのみ関係します。 タイムスタンプのあるレコードに基づいていないソース(ファイルなど)には、ソースレコードのタイムスタンプがありません。 このステップはソースコネクタの実装の一部であり、ソースを使うアプリケーションによってパラメータ化されることはありません。 -
アプリケーションによって設定された
TimestampAssigner
が最終的なタイムスタンプを割り当てます。TimestampAssigner
はソースレコードのタイムスタンプとイベントを確認します。assignerはソースレコードのタイムスタンプを使うかイベントのフィールドにアクセスして、最終的なイベントのタイムスタンプを取得できます。
この2段階のアプローチにより、ユーザはソースシステムからのタイムスタンプと、イベントデータ内のタイムスタンプの両方をイベントタイムスタンプとして参照できます。
注意: ソースレコードのタイムスタンプの無いデータソース(ファイルなど)を使い、ソースレコードのタイムスタンプを最終的なイベントタイムスタンプとして使う場合、イベントはLONG_MIN
*(=-9,223,372,036,854,775,808)*に等しいデフォルトのタイムスタンプを取得します。
ウォーターマークの生成 #
ウォーターマークのgeneratorsはストリーミングの実行中のみアクティブになります。バッチ実行はウォーターマークGeneratorsを停止します; 以下で説明する全ての関連操作は事実上no-opになります。
データソースAPIは、ウォーターマークgeneratorsのsplitごとの個別の実行をサポートします。これにより、Flinkはsplitごとのイベント時間の進捗を個別に監視できます。これはイベント時間のスキューを適切に処理し、アプリケーション全体のイベント時間の進捗を妨げないようにするために重要です。
Split Reader APIを使ってソースコネクタを実装する場合、これは自動的に処理されます。Split Reader APIに基づくすべての実装には、すぐに使えるsplit対応のウォーターマークがあります。
低レベルSourceReader
APIの実装でsplit対応ウォーターマーク生成を使うには、実装でイベントを異なるsplitから異なるoutputに出力する必要があります: Split-local SourceOutputs。Split-localは、createOutputForSplit(splitId)
とreleaseOutputForSplit(splitId)
メソッドを介して、main
ReaderOutput
で作成およびリリースできます。詳細については、クラスとメソッドのJavaDocsを参照してください。
Splitレベルウォーターマーク配置 #
ソースオペレータのウォーターマークの配置はFlinkランタイムによって処理されますが、splitレベルのウォーターマークの配置を実現するには、ソースでSourceReader#pauseOrResumeSplits
とSplitReader#pauseOrResumeSplits
を追加で実装する必要があります。Splitレベルのウォーターマークの配置は、複数のsplitsがソースリーダーに割り当てられている場合に役立ちます。By default, these implementations will throw an UnsupportedOperationException
, pipeline.watermark-alignment.allow-unaligned-source-splits
is set to false, when there is more than one split assigned, and the split exceeds the watermark alignment threshold configured by the WatermarkStrategy
. SourceReaderBase
にはSourceReader#pauseOrResumeSplits
の実装が含まれているため、継承するソースはSplitReader#pauseOrResumeSplits
を実装するだけで済みます。実装のヒントの詳細についてはjavadocsを参照してください。