Async I/O
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

外部データアクセスのための非同期 I/O #

このページでは、外部データストアとの非同期I/OのためのFlink APIの使用について説明します。 非同期またはイベント駆動型プログラミングに慣れていないユーザにとっては、Futures and event-driven programmingに関する記事が準備として役立つかもしれません。

注意: 非同期I/Oユーティリティの設計と実装についての詳細は、提案および設計ドキュメントFLIP-12: Asynchronous I/O Design and Implementationを参照してください。 新しい再試行サポートの詳細については、ドキュメントFLIP-232: Add Retry Support For Async I/O In DataStream APIを参照してください。

非同期I/O操作の必要性 #

外部システムとやり取りする時(例えば、データベースに保存されたデータを使ってストリームイベントを強化する時)、外部システムとの通信遅延がストリーミングアプリケーションの全体的な作業を占めないように注意する必要があります。

例えばMapFunctionで外部システム内のデータに単純にアクセスすることは、一般的に同期インタラクションを意味します: リクエストはデータベースに送信され、MapFunctionは応答が受信されるまで待ちます。多くの場合、この待機時間が関数の時間の大部分を占めます。

データベースとの非同期インタラクションは1つの並列関数インスタンスが多くのリクエストを同時に処理し、応答を同時に受信できることを意味します。こうすることで、待機時間は他のリクエストの送信と応答の受信と重なり合うことができます。少なくとも待機時間は複数のリクエストに償却されます。これにより、ほとんどの場合、ストリーミングスループットが大幅に向上します。

注意: MapFunctionをとても高い並列度にスケーリングするだけのスループットの改善は、ある場合にはうまくいきますが、通常とても高いリソースの代償に付きます: より多くの平行な MapFunction インスタンスはより多くのタスク、スレッド、Flink内部ネットワーク接続、データベースへのネットワーク接続、バッファおよび一般的な内部予約のオーバーヘッドを意味します。

必要条件 #

上のセクションで説明したように、データベース(あるいはキーバリューストア)への適切な非同期I/Oの実装には、非同期リクエストをサポートするデータベースへのクライアントが必要です。多くの人気のあるデータベースはそのようなクライアントを提供します。

そのようなクライアントが無い場合、複数のクライアントを生成し同期呼び出しをスレッドプールを使って処理することで、同期クライアントを制限付き同時クライアントに切り替えてみることができます。ただし、このやりかたは通常、適切な非同期クライアントよりも非効率的です。

非同期 I/O API #

Flinkの非同期I/O APIを使うと、ユーザはデータストリームで非同期リクエストクライアントを使えます。APIはデータストリームとの統合を処理し、処理順、イベント時間、耐障害性、再試行サポートなどを処理します。

目的のデータベース用の非同期クライアントがあると仮定すると、データベースに対する非同期I/Oをつかったストリーム変換を実装するには、次の3つの部分が必要です:

  • リクエストを発送するAsyncFunctionの実装
  • オペレーションの結果を取得して、それをResultFutureに渡すcallback
  • 再試行の有無に関わらず、DataStreamに対する非同期I/Oオペレーションを変換として適用

以下のコード例は基本的なパターンを説明します:

// This example implements the asynchronous request and callback with Futures that have the
// interface of Java 8's futures (which is the same one followed by Flink's Future)

/**
 * An implementation of the 'AsyncFunction' that sends requests and sets the callback.
 */
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {

    /** The database specific client that can issue concurrent requests with callbacks */
    private transient DatabaseClient client;

    @Override
    public void open(OpenContext openContext) throws Exception {
        client = new DatabaseClient(host, post, credentials);
    }

    @Override
    public void close() throws Exception {
        client.close();
    }

    @Override
    public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {

        // issue the asynchronous request, receive a future for result
        final Future<String> result = client.query(key);

        // set the callback to be executed once the request by the client is complete
        // the callback simply forwards the result to the result future
        CompletableFuture.supplyAsync(new Supplier<String>() {

            @Override
            public String get() {
                try {
                    return result.get();
                } catch (InterruptedException | ExecutionException e) {
                    // Normally handled explicitly.
                    return null;
                }
            }
        }).thenAccept( (String dbResult) -> {
            resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
        });
    }
}

// create the original stream
DataStream<String> stream = ...;

// apply the async I/O transformation without retry
DataStream<Tuple2<String, String>> resultStream =
    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);

// or apply the async I/O transformation with retry
// create an async retry strategy via utility class or a user defined strategy
AsyncRetryStrategy asyncRetryStrategy =
	new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(3, 100L) // maxAttempts=3, fixedDelay=100ms
		.ifResult(RetryPredicates.EMPTY_RESULT_PREDICATE)
		.ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
		.build();

// apply the async I/O transformation with retry
DataStream<Tuple2<String, String>> resultStream =
	AsyncDataStream.unorderedWaitWithRetry(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100, asyncRetryStrategy);
/**
 * An implementation of the 'AsyncFunction' that sends requests and sets the callback.
 */
class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {

    /** The database specific client that can issue concurrent requests with callbacks */
    lazy val client: DatabaseClient = new DatabaseClient(host, post, credentials)

    /** The context used for the future callbacks */
    implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())


    override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = {

        // issue the asynchronous request, receive a future for the result
        val resultFutureRequested: Future[String] = client.query(str)

        // set the callback to be executed once the request by the client is complete
        // the callback simply forwards the result to the result future
        resultFutureRequested.onSuccess {
            case result: String => resultFuture.complete(Iterable((str, result)))
        }
    }
}

// create the original stream
val stream: DataStream[String] = ...

// apply the async I/O transformation without retry
val resultStream: DataStream[(String, String)] =
    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100)

// apply the async I/O transformation with retry
// create an AsyncRetryStrategy
val asyncRetryStrategy: AsyncRetryStrategy[String] =
  new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(3, 100L) // maxAttempts=3, fixedDelay=100ms
    .ifResult(RetryPredicates.EMPTY_RESULT_PREDICATE)
    .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
    .build();

// apply the async I/O transformation with retry
val resultStream: DataStream[(String, String)] =
  AsyncDataStream.unorderedWaitWithRetry(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100, asyncRetryStrategy)

重要な注意: ResultFutureResultFuture.completeの最初の呼び出しで完了します。 後続の全てのcomplete呼び出しは無視されます。

次の3つのパラメータは非同期オペレーションを制御します:

  • Timeout: タイムアウトは、非同期操作が最終的に失敗したと見なされるまでの時間を定義します。再試行が有効な場合は、複数の再試行リクエストが含まれる場合があります。このパラメータはdead/failedリクエストを保護します。

  • Capacity: このパラメータは、同時に進行できる非同期リクエストの数を定義します。 非同期I/Oアプローチにより通常はスループットが大幅に向上しますが、ストリーミングアプリケーションではoperatorが依然としてボトルネックになる可能性があります。同時リクエストの数を制限すると、オペレータは保留中のリクエストのバックログが増え続けることを防ぎますが、容量が使い果たされるとバックプレッシャーがトリガーされます。

  • AsyncRetryStrategy: asyncRetryStrategyは、遅延再実行をトリガーする条件と遅延戦略を定義します。例えば、固定遅延、指数バックオフ遅延、カスタム実装など。

タイムアウト処理 #

非同期I/Oリクエストがタイムアウトすると、デフォルトでは例外が投げられジョブが再開されます。 タイムアウトを処理したい場合は、AsyncFunction#timeoutメソッドを上書きできます。 この入力レコードの処理が完了したことをFlinkに示すために、上書きする場合はResultFuture.complete()ResultFuture.completeExceptionally()を呼び出してください。タイムアウトが発生した時にレコードを出力したくない場合は、ResultFuture.complete(Collections.emptyList())を呼び出せます。

結果の整列 #

AsyncFunctionによって発行された同時リクエストは、どのリクエストが最初に終了したかに基づいて、未定義の順序で完了することがよくあります。 結果のレコードが発行された順番に制御するために、Flinkは以下の2つのモードを提供します:

  • Unordered: 結果レコードは非同期リクエストが終了するとすぐに発行されます。 ストリーム内のレコードの順番は非同期I/Oの前と後とで異なります。 基本的な時間の特性上、処理時間と一緒に使われた場合、このモードは最もレイテンシが低く、最もオーバーヘッドが低いです。 このモードでは、AsyncDataStream.unorderedWait(...)を使ってください。

  • Ordered: この場合、ストリームの順序が保持されます。結果レコードは非同期リクエストがトリガーされたのと同じ順序(オペレータの入力レコードの順序)で発行されます。これを実現するために、オペレータは先行する全てのレコードが発行(あるいはタイムアウト)されるまで、結果レコードをバッファします。 unordered modeと比較して、レコードまたは結果がチェックポイントされた状態で長期間保持されるため、通常、チェックポイント作成にある程度の余分な待ち時間とオーバーヘッドが生じます。 このモードにはAsyncDataStream.orderedWait(...)を使ってください。

イベントタイム #

ストリーミングアプリケーションがイベント時間で動作する場合、ウォーターマークは非同期I/Oオペレータによって正しく処理されます。そのことは同時に2つの順番モードについて以下のことを意味します:

  • Unordered: ウォーターマークはレコードを追い越しません。逆も同様です。つまり、ウォーターマークは順番の境界を確立します。 レコードは2つのウォーターマーク間でのみ順番無く発行されます。 あるウォーターマークの後で発生するレコードは、ウォーターマークが発行された後でのみ発行されるでしょう。 ウォーターマークが発行される前の入力からの全ての結果のレコードの後でのみ、今度はウォーターマークが発行されるでしょう。

    つまり、ウォーターマークが存在する場合、unorderedモードはorderedモードと同じ遅延と管理オーバーヘッドが一部発生します。オーバーヘッドの量はウォーターマークの頻度に依存します。

  • Ordered: レコード間の順序が維持されるのと同様に、ウォーターマークとレコードの順序も維持されます。処理時間での作業と比較して、オーバーヘッドの大きな変化はありません。

取り込み時間は自動的に生成されたウォーターマークを含むイベント時間の特殊なケースであることを思い出してください。

耐障害性の保証 #

非同期I/Oオペレータは完全に確実に1回の耐障害性の保証を提供します。実行中の非同期リクエストをチェックポイントに格納し、障害から回復する時にリクエストを復元/再トリガーします。

リトライサポート #

リトライサポートにより、ユーザのAsyncFunctionに対して透過的な非同期オペレータの組み込みの仕組みが導入されます。

  • AsyncRetryStrategy: AsyncRetryStrategyには再試行条件AsyncRetryPredicateと、再試行を続行するかどうかを決定するインタフェース、現在の試行回数に基づいた再試行間隔が含まれます。 トリガーの再試行条件が満たされた後、現在の試行回数が事前に設定した制限を超えたために再試行が中止されるか、タスクの終了時に再試行が強制的に終了される(この場合、システムは最終状態として実行結果または例外を取ります)可能性がある事に注意してください。

  • AsyncRetryPredicate: 再試行条件は実行結果または実行例外に基づいてトリガーできます。

実装のTips #

コールバック用のExecutor (またはScalaではExecutionContext)を持つFuturesを使った実装の場合は、コールバックは通常最小限の作業を行い、DirectExecutorがスレッド間のハンドオーバーの追加のオーバーヘッドを回避するため、DirectExecutorを使うことをお勧めします。通常コールバックは結果をResultFutureに渡すだけです。結果は出力バッファに追加されます。そこから、レコードの発行を含む重いロジックおよびチェックポイントの記帳とのやりとりがとにかく専用のスレッドプールで起こります。

DirectExecutororg.apache.flink.util.concurrent.Executors.directExecutor()またはcom.google.common.util.concurrent.MoreExecutors.directExecutor()を介して取得できます。

警告 #

AsyncFunctionはマルチスレッドとは呼ばれません

ここで明示的に指摘したい良くある混同は、AsyncFunctionはマルチスレッドの形式で呼ばれないということです。 たった一つのAsyncFunctionインスタンスが存在し、ストリームのそれぞれのパーティションの中の各レコードに対して順次呼ばれます。asyncInvoke(...)メソッドが早く返り、(クライアントによって)コールバック依存しない限り、適切な非同期I/Oは発生しません。

例えば、以下のパターンではasyncInvoke(...)関数がブロックされるため、非同期の動作が無効になります:

  • 結果が受け取られるまでlookup/queryメソッドがブロックの呼び出しをするデータベースクライアントの使用

  • asyncInvoke(...)メソッド内の非同期クライアントによって返されるfuture-typeオブジェクト上のブロック/待機

An AsyncFunction(AsyncWaitOperator)はジョブグラフのどこでも使えますが、SourceFunction/SourceStreamTaskに繋ぐことはできません。

再試行が有効な場合、より大きなキュー容量が必要になる可能性があります

新しい再試行機能により、より大きなキュー容量要件が発生する可能性があります。最大数は次のように概算できます:

inputRate * retryRate * avgRetryDuration

例えば、inputRate = 100 records/secのタスクの場合、要素の1%が平均1回の再試行をトリガーし、平均再試行時間が60秒の場合、追加のキュー容量要件は次のようになります:

100 records/sec * 1% * 60s = 60

つまり、ワークキューにさらに60の容量を追加しても、unordered出力モードではスループットに影響しない可能性があります。orderedモードの場合、先頭要素がキーポイントであり、未完了の状態が長くなるほど、処理遅延が長くなります。実際には、同じタイムアウト制約でより多くの再試行が行われる場合、再試行機能により先頭要素の不完全な時間が増加する可能性があります。

キュー容量が増加すると(バックプレッシャーを軽減する一般的な方法)、OOMのリスクが高まります。実際には、ListStateストレージの理論上の上限はInteger.MAX_VALUEのため、キュー容量の制限は同じですが、プロダクションであまりにキューを増やすことはできません。タスクの並列度を増やすことがより現実的な方法かもしれません。

Back to top

inserted by FC2 system