Data Sources
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

データソース #

このページでは、FlinkのデータソースAPIとその背後にある概念とアーキテクチャーについて説明します。 FlinkでデータソースAPIがどのように機能するか、あるいは新しいデータソースを実装したい場合は、これをお読みください。

事前定義されたソースコネクタを探している場合は、コネクタドキュメントを確認してください。

データソースの概念 #

コアコンポーネント

データソースには3つのコアコンポーネントがあります: SplitsSplitEnumeratorSourceReader

  • Splitは、ファイルやログ分割のような、ソースで消費されるデータの一部です。Splitsは、ソースが作業を分割し、データの読み取りを並列化する粒度です。

  • SourceReaderSplitsを要求し、Splitで表されるファイルやログの分割を読み取るなどして、それらを処理します。SourceReadersSourceOperatorsのTaskManager上で並行して実行され、イベント/レコードの並列ストリームを生成します。

  • SplitEnumeratorSplitsを生成し、それらをSourceReadersに割り当てます。これは、Job Manager上で単一のインスタンスとして実行され、保留中のSplitsのバックログを維持し、それらをバランスのとれた方法でリーダーに割り当てる役割があります。

Source クラスは、上記の3つのコンポーネントを組み合わせるAPIエントリポイントです。

Illustration of SplitEnumerator and SourceReader interacting

ストリーミングとバッチを横断した統合

データソースAPIは、無制限のストリーミングソースと制限付きのバッチソースの両方を統合された方法でサポートします。

両方のケースの違いは最小限です: 制限付き/バッチのケースでは、enumeratorはsplitの固定セットを生成し、各splitは必然的に有限になります。無制限のストリーミングの場合、2つのうち1つは真ではありません(splitは有限ではないか、enumeratorは新しいsplitを生成し続けます)。

#

以下は、ストリーミングとバッチの場合でデータソースのコンポーネントがどのようにやり取りするかを示す、いくつかの簡略化された概念的な例を示しています。

これはKafkaとFileソースの実装がどのように機能するかを正確には説明していないことに注意してください; 説明を目的として、各部分は簡略化されています。

制限付きファイルソース

ソースには、読み取るディレクトリのURI/Pathと、ファイルの解析方法を定義するFormatがあります。

  • Splitはファイル、またはファイルのregionです(データ形式がファイルの分割をサポートしている場合)。
  • SplitEnumeratorはシチエされたディレクトリパスにある全てのファイルを一覧表示します。Splitsを要求する次のリーダーにSplitsを割り当てます。全てのSplitsが割り当てられると、NoMoreSplitsでリクエストに応答します。
  • SourceReaderはSplitを要求し、割り当てられたSplit(ファイルまたはファイルのregion)を読み込み、指定されたFormatを使って解析します。新しいSplitが取得されず、NoMoreSplitsメッセージを取得した場合は、終了します。

無制限のストリーミングファイルソース

SplitEnumeratorNoMoreSplitsに応答しないことを除いて、このソースは上記と同じように動作し、定期的に指定されたURI/Pathの内容を一覧表示して新しいファイルを調べます。新しいファイルが見つかると、それらの新しいSplitsを生成し、利用可能なSourceReaderに割り当てることができます。

無制限のストリーミングKafkaソース

ソースにはKafkaトピック(またはトピックのリストまたはトピックの正規表現)と、レコードを解析するためのDeserializerがあります。

  • SplitはKafkaトピックパーティションです。
  • SplitEnumeratorはブローカーに接続して、購読トピックに関連する全てのトピックパーティションを一覧表示します。enumeratorはオプションでこの操作を繰り返して新しく追加されたトピック/パーティションを検出できます。
  • SourceReaderはKafkaConsumerを使って割り当てられたSplits(トピックパーティション)を読み取り、指定されたDesirializerを使ってレコードを逆シリアライズします。splits (トピックパーティション)には終了が無いため、リーダーがデータの終了に到達することはありません。

制限のあるKafkaソース

各Split(トピックパーティション)に定義された終了オフセットがあることを除いて、上記と同じです。SourceReaderがSplitの終了オフセットに到達すると、Splitを終了します。全ての割り当てられたSplitが終了すると、SourceReaderが終了します。

Data Source API #

このセクションでは、FLIP-27で導入された新しいソースAPIの主要なインタフェースについて説明し、開発者にソース開発に関するヒントを提供します。

ソース #

Source APIは、以下のコンポーネントを作成するためのファクトリ形式のインタフェースです。

  • Split Enumerator
  • Source Reader
  • Split Serializer
  • Enumerator Checkpoint Serializer

これに加えて、ソースはソースの boundedness 属性を提供するため、FlinkはFlinkジョブを実行するための適切なモードを選択できます。

ソースのインスタンスは実行時にシリアライズされFlinkクラスタにアップロードされるため、ソースの実装はシリアライズ化可能である必要があります。

SplitEnumerator #

SplitEnumerator はソースの"頭脳"であることが期待されます。SplitEnumeratorの一般的な実装では、以下のことを行います:

  • SourceReader 登録処理
  • SourceReader 障害処理
    • addSplitsBack()メソッドはSourceReaderが失敗した時に呼び出されます。SplitEnumeratorは、失敗したSourceReaderによって通知されていないsplit割り当てを撤回する必要があります。
  • SourceEvent処理
    • SourceEventSplitEnumeratorSourceReaderの間で送信される独自のイベントです。実装はこの仕組みを利用して高度な調整を実行できます。
  • Splitの検出と割り当て
    • SplitEnumeratorは、新しいsplitの検出、新しいSourceReaderの登録、SourceReaderの障害などを含む、様々なイベントに対応してsplitsを割り当てることができます。

SplitEnumeratorは、SplitEnumeratorの作成または復元時にSourceに提供される SplitEnumeratorContext を利用して上記の作業を実行できます。 SplitEnumeratorContextにより、SplitEnumeratorはリーダーの必要な情報を取得し、調整アクションを実行できるようになります。 Sourceの実装はSplitEnumeratorContextSplitEnumeratorインスタンスに渡すことが期待されます。

SplitEnumerator実装はメソッドが呼び出された時のみ調整アクションを実行することで事後対応的に適切に機能しますが、SplitEnumerator実装はアクティブにアクションを実行したほうが良いです。例えば、SplitEnumeratorは定期的にsplit検出を実行し、新しいsplitをSourceReadersに割り当てたほうが良いです。 そのような実装では、SplitEnumeratorContextcallAsync()メソッドが便利な場合があります。以下のコードスニペットは、SplitEnumerator実装が独自のスレッドを維持せずにこれをどう実現できるかを示しています。

class MySplitEnumerator implements SplitEnumerator<MySplit, MyCheckpoint> {
    private final long DISCOVER_INTERVAL = 60_000L;

    private final SplitEnumeratorContext<MySplit> enumContext            ;

    /** The Source creates instances of SplitEnumerator and provides the context. */
    MySplitEnumerator(SplitEnumeratorContext<MySplit> enumContext) {
        this.enumContext = enumContext;
    }

    /**
     * A method to discover the splits.
     */
    private List<MySplit> discoverSplits() {...}
    
    @Override
    public void start() {
        ...
        enumContext.callAsync(this::discoverSplits, (splits, thrown) -> {
            Map<Integer, List<MySplit>> assignments = new HashMap<>();
            int parallelism = enumContext.currentParallelism();
            for (MySplit split : splits) {
                int owner = split.splitId().hashCode() % parallelism;
                assignments.computeIfAbsent(owner, s -> new ArrayList<>()).add(split);
            }
            enumContext.assignSplits(new SplitsAssignment<>(assignments));
        }, 0L, DISCOVER_INTERVAL);
        ...
    }
    ...
}
またPython APIではサポートされません

SourceReader #

SourceReader は、タスクマネージャーで実行されるコンポーネントでSplitsからのレコードを消費します。

SourceReaderは、プルベースの消費インタフェースを公開します。Flinkタスクは、ループ内でpollNext(ReaderOutput)を呼び出し続け、SourceReaderからレコードをポーリングします。pollNext(ReaderOutput)メソッドの返り値は、ソースリーダーのステータスを示します。

  • MORE_AVAILABLE - SourceReaderにはすぐに利用できるさらに多くのレコードがあります。
  • NOTHING_AVAILABLE - SourceReaderには現時点ではこれ以上のレコードはありませんが、将来的にはさらに多くのレコードがある可能性があります。
  • END_OF_INPUT - SourceReaderは全てのレコードを使い果たし、データの終わりに達しました。これは、SourceReaderを閉じることができることを意味します。

パフォーマンスを考慮して、ReaderOutputpollNext(ReaderOutput)メソッドに提供されるため、SourceReaderは必要に応じて一回のpollNext()の呼び出しで複数のレコードを呼び出すことができます。例えば、外部システムがブロックの粒度で動作する場合があります。ブロックには複数のレコードを含めることができますが、ソースはブロックの境界でのみチェックポイントを作成できます。この場合、SourceReaderは一度に1つのブロック内の全てのレコードをReaderOutputに発行できます。 **ただし、SourceReaderの実装は必要な場合を除き単一のpollNext(ReaderOutput)呼び出しで複数のレコードを発行することは避けるべきです。**これはSourceReaderからポーリングしているタスクスレッドはイベントループで動作するため、ブロックできないからです。

SourceReaderの全ての状態は、snapshotState()呼び出しで返されるSourceSplitの中で維持される必要があります。これにより、必要に応じてSourceSplitを他のSourceReadersに再割り当てすることができます。

SourceReaderContextSourceReaderの作成時にSourceに提供されます。SourceがコンテキストをSourceReaderインスタンスに渡すことが期待されます。SourceReaderSourceReaderContextを介してSourceEventSplitEnumeratorに送信できます。Sourceの典型的なデザインパターンは、SourceReaderにローカル情報をグローバルな視点で決定を行うSplitEnumeratorに報告させます。

SourceReader APIは、ユーザがsplitを手動で処理し、レコードを取得して引き渡すための独自のスレッドモデルを使えるようにする低レベルのAPIです。SourceReaderの実装を容易にするために、FlinkはSourceReaderの作成に必要な作業量を大幅に削減する SourceReaderBase クラスを提供します。 コネクタの開発者はSourceReaderを最初から作成するのではなくSourceReaderBaseを利用することを強くお勧めします。詳細については、Split Reader APIセクションをご覧ください。

ソースの使用 #

SourceからDataStreamを作成するには、SourceStreamExecutionEnvironmentに渡す必要があります。例えば:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Source mySource = new MySource(...);

DataStream<Integer> stream = env.fromSource(
        mySource,
        WatermarkStrategy.noWatermarks(),
        "MySourceName");
...
val env = StreamExecutionEnvironment.getExecutionEnvironment()

val mySource = new MySource(...)

val stream = env.fromSource(
      mySource,
      WatermarkStrategy.noWatermarks(),
      "MySourceName")
...
env = StreamExecutionEnvironment.get_execution_environment()

my_source = ...

env.from_source(
    my_source,
    WatermarkStrategy.no_watermarks(),
    "my_source_name")

Split Reader API #

コアのSourceReader APIは完全に非同期であり、splitの非同期の読み込みを手動で管理する実装を必要とします。 ただし、実際には、ほとんどのソースは、クライアント(例えばKafkaConsumer)での*poll()*呼び出しや分散ファイルシステム(HDFS、 S3、…)でのI/O操作のブロックなどの、ブロック操作を実行します。これを非同期ソースAPIと互換性を持たせるには、これらのブロッキング(同期)操作を別のスレッドで実行し、リーダーの非同期部分にデータを渡す必要があります。

SplitReader は、ファイル読み取り、Kafkaなどの単純な同期読み取り/ポーリングベースのソース実装のための高レベルAPIです。

コアはSourceReaderBaseクラスで、SplitReaderを受け取り、SplitReaderを実行するフェッチャースレッドを作成し、様々な消費スレッドモデルをサポートします。

SplitReader #

SplitReader APIには次の3つのメソッドしかありません:

  • RecordsWithSplitIds を返すブロッキングフェッチメソッド
  • splitの変更を処理するための非ブロックメソッド。
  • ブロッキングフェッチ操作を起動するための非ブロックの起動メソッド。

SplitReaderは外部のシステムからのレコードの読み込みのみに焦点を当てているため、SourceReaderに比べてかなり単純です。 詳細については、クラスのJavaドキュメントを確認してください。

SourceReaderBase #

SourceReader実装が以下のことを行うことは非常に一般的です:

  • 外部システムのsplitからブロック的な方法でフェッチするスレッドのプールを持ちます。
  • 内部フェッチスレッドとpollNext(ReaderOutput)のような他のメソッドの呼び出しとの間の同期を処理します。
  • ウォーターマークの位置合わせのためにsplitごとのウォーターマークを維持します。
  • チェックポイントの各splitの状態を維持します。

新しいSourceReaderを書き込む作業を減らすために、Flinkは、SourceReaderの基本実装として機能する SourceReaderBase クラスを提供します。 SourceReaderBaseは上記全ての作業をそのまま実行します。新しいSourceReaderを書くには、SourceReader実装をSourceReaderBaseから継承させ、幾つかのメソッドを埋め、高レベル SplitReader を実装します。

SplitFetcherManager #

SourceReaderBaseは、動作する SplitFetcherManager の動作に応じて、そのままでいくつかのスレッドモデルをサポートします。 SplitFetcherManagerSplitReaderでそれぞれフェッチするSplitFetcherのプールの作成と維持をサポートします。splitを各splitフェッチャーに割り当てる方法も決定します。

例として、以下に示すように、SplitFetcherManagerは個定数のスレッドを持つことができ、各スレッドはSourceReaderに割り当てられた幾つかのsplitから取得されます。

One fetcher per split threading model.

以下のコードスニペットは、このスレッドモデルを実装します。

/**
 * A SplitFetcherManager that has a fixed size of split fetchers and assign splits
 * to the split fetchers based on the hash code of split IDs.
 */
public class FixedSizeSplitFetcherManager<E, SplitT extends SourceSplit>
        extends SplitFetcherManager<E, SplitT> {
    private final int numFetchers;

    public FixedSizeSplitFetcherManager(
            int numFetchers,
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier) {
        super(elementsQueue, splitReaderSupplier);
        this.numFetchers = numFetchers;
        // Create numFetchers split fetchers.
        for (int i = 0; i < numFetchers; i++) {
            startFetcher(createSplitFetcher());
        }
    }

    @Override
    public void addSplits(List<SplitT> splitsToAdd) {
        // Group splits by their owner fetchers.
        Map<Integer, List<SplitT>> splitsByFetcherIndex = new HashMap<>();
        splitsToAdd.forEach(split -> {
            int ownerFetcherIndex = split.hashCode() % numFetchers;
            splitsByFetcherIndex
                    .computeIfAbsent(ownerFetcherIndex, s -> new ArrayList<>())
                    .add(split);
        });
        // Assign the splits to their owner fetcher.
        splitsByFetcherIndex.forEach((fetcherIndex, splitsForFetcher) -> {
            fetchers.get(fetcherIndex).addSplits(splitsForFetcher);
        });
    }
}
またPython APIではサポートされません

このスレッドモデルを使うSourceReaderは以下のようにして作成できます:

public class FixedFetcherSizeSourceReader<E, T, SplitT extends SourceSplit, SplitStateT>
        extends SourceReaderBase<E, T, SplitT, SplitStateT> {

    public FixedFetcherSizeSourceReader(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitFetcherSupplier,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        super(
                elementsQueue,
                new FixedSizeSplitFetcherManager<>(
                        config.getInteger(SourceConfig.NUM_FETCHERS),
                        elementsQueue,
                        splitFetcherSupplier),
                recordEmitter,
                config,
                context);
    }

    @Override
    protected void onSplitFinished(Map<String, SplitStateT> finishedSplitIds) {
        // Do something in the callback for the finished splits.
    }

    @Override
    protected SplitStateT initializedState(SplitT split) {
        ...
    }

    @Override
    protected SplitT toSplitType(String splitId, SplitStateT splitState) {
        ...
    }
}
またPython APIではサポートされません

SourceReader実装は、SplitFetcherManagerSourceReaderBaseの上に簡単にそれらの独自のスレッドモデルを実装することもできます。

イベント時間とウォーターマーク #

イベント時間の割り当てとウォーターマーク生成は、データソースの一部として発生します。ソースリーダーから送信されるイベントストリームにはイベントタイムスタンプがあり、(ストリーミング実行中に)ウォーターマークを含みます。イベント時間とウォーターマークの概要については、タイムリーなストリーム処理を参照してください。

従来に基づいたアプリケーション SourceFunction は通常はstream.assignTimestampsAndWatermarks(WatermarkStrategy)を介して、後の別のステップでタイムスタンプとウォーターマークを生成します。タイムスタンプは既に割り当てられていて以前のsplit対応のウォーターマークを上書きするため、この関数は新しいソースで使わないでください。

API #

WatermarkStrategyは、DataStream APIでの作成中にソースに渡され、 TimestampAssigner WatermarkGenerator の両方を作成します。

environment.fromSource(
    Source<OUT, ?, ?> source,
    WatermarkStrategy<OUT> timestampsAndWatermarks,
    String sourceName);
environment.from_source(
    source: Source,
    watermark_strategy: WatermarkStrategy,
    source_name: str,
    type_info: TypeInformation = None) 

TimestampAssignerWatermarkGeneratorReaderOutput(またはSourceOutput)の一部として透過的に実行されるため、ソースの実装者はタイムスタンプ抽出やウォーターマーク生成コードを実装する必要はありません。

イベントタイムスタンプ #

イベントタイムスタンプは次の2つのステップで割り当てられます:

  1. SourceReaderは*、SourceOutput.collect(event, timestamp)を呼び出すことで、ソースレコードのタイムスタンプ*をイベントにアタッチすることができます. これは、Kafka、Kinesis、Pulsar、Pravegaなどレコードベースのデータソースにのみ関係します。 タイムスタンプのあるレコードに基づいていないソース(ファイルなど)には、ソースレコードのタイムスタンプがありません。 このステップはソースコネクタの実装の一部であり、ソースを使うアプリケーションによってパラメータ化されることはありません。

  2. アプリケーションによって設定されたTimestampAssignerが最終的なタイムスタンプを割り当てます。 TimestampAssignerソースレコードのタイムスタンプとイベントを確認します。assignerはソースレコードのタイムスタンプを使うかイベントのフィールドにアクセスして、最終的なイベントのタイムスタンプを取得できます。

この2段階のアプローチにより、ユーザはソースシステムからのタイムスタンプと、イベントデータ内のタイムスタンプの両方をイベントタイムスタンプとして参照できます。

注意: ソースレコードのタイムスタンプの無いデータソース(ファイルなど)を使い、ソースレコードのタイムスタンプを最終的なイベントタイムスタンプとして使う場合、イベントはLONG_MIN *(=-9,223,372,036,854,775,808)*に等しいデフォルトのタイムスタンプを取得します。

ウォーターマークの生成 #

ウォーターマークのgeneratorsはストリーミングの実行中のみアクティブになります。バッチ実行はウォーターマークGeneratorsを停止します; 以下で説明する全ての関連操作は事実上no-opになります。

データソースAPIは、ウォーターマークgeneratorsのsplitごとの個別の実行をサポートします。これにより、Flinkはsplitごとのイベント時間の進捗を個別に監視できます。これはイベント時間のスキューを適切に処理し、アプリケーション全体のイベント時間の進捗を妨げないようにするために重要です。

Watermark Generation in a Source with two Splits.

Split Reader APIを使ってソースコネクタを実装する場合、これは自動的に処理されます。Split Reader APIに基づくすべての実装には、すぐに使えるsplit対応のウォーターマークがあります。

低レベルSourceReader APIの実装でsplit対応ウォーターマーク生成を使うには、実装でイベントを異なるsplitから異なるoutputに出力する必要があります: Split-local SourceOutputs。Split-localは、createOutputForSplit(splitId)releaseOutputForSplit(splitId)メソッドを介して、main ReaderOutput で作成およびリリースできます。詳細については、クラスとメソッドのJavaDocsを参照してください。

Splitレベルウォーターマーク配置 #

ソースオペレータのウォーターマークの配置はFlinkランタイムによって処理されますが、splitレベルのウォーターマークの配置を実現するには、ソースでSourceReader#pauseOrResumeSplitsSplitReader#pauseOrResumeSplitsを追加で実装する必要があります。Splitレベルのウォーターマークの配置は、複数のsplitsがソースリーダーに割り当てられている場合に役立ちます。By default, these implementations will throw an UnsupportedOperationException, pipeline.watermark-alignment.allow-unaligned-source-splits is set to false, when there is more than one split assigned, and the split exceeds the watermark alignment threshold configured by the WatermarkStrategy. SourceReaderBaseにはSourceReader#pauseOrResumeSplitsの実装が含まれているため、継承するソースはSplitReader#pauseOrResumeSplitsを実装するだけで済みます。実装のヒントの詳細についてはjavadocsを参照してください。

inserted by FC2 system