外部データアクセスのための非同期 I/O

このページは外部データストアを使った非同期I/OのためのFlinkのAPIの使用を説明します。非同期あるいはイベント駆動プログラミングに詳しくないユーザは、機能とイベント駆動プログラミングについての論文が事前に有用かも知れません。

注意: 非同期I/Oユーティリティの設計と実装についての詳細は、提案と設計のドキュメント FLIP-12: 非同期 I/O の設計と実装 で見つかるかもしれません。

非同期I/O操作の必要性

外部システムと影響しあう場合(例えば、データベース内に格納されたデータを使ってストリームイベントを増やす場合)、外部システムとの通信遅延がストリーミングアプリケーションの全体の仕事を占めないように注意する必要があります。

例えばMapFunction内で、外部システム内のデータへ無邪気にアクセスすることは、同期相互作用を意味します: リクエストはデータベースに送信され、MapFunction は応答が受け取られるまで待ちます。多くの場合において、この待機は関数の時間の大多数を占めます。

データベースとの非同期な相互影響は、1の並行度の関数インスタンスが同時に多くのリクエストを処理することができ、応答を同時に受け取ることができることを意味します。つまり、待機時間は他のリクエストの送信と応答の受信と重なり合うことができます。少なくとも待機時間は複数のリクエストに償却されます。これはほとんどの場合において、かなり高いストリーミング スループットに繋がります。

注意: MapFunction をとても高い並行度にスケーリングするだけのスループットの改善は、ある場合にはうまくいきますが、通常とても高いリソースの代償に付きます: より多くの平行な MapFunction インスタンスはより多くのタスク、スレッド、Flink内部ネットワーク接続、データベースへのネットワーク接続、バッファ および 一般的な内部予約の記帳を意味します。

必要条件

上の章で説明されたように、データベースへの適切な非同期(あるいはキー/値 ストア)の実装は、非同期のリクエストをサポートするデータベースへのクライアントを必要とします。多くの人気のあるデータベースはそのようなクライアントを提供します。

そのようなクライアントが無い場合、複数のクライアントと生成し同期呼び出しをスレッドプールを使って処理することで、非同期クライアントを制限された同時クライアントに切り替えてみることができます。しかし、このやり方は通常適切な非同期クライアントよりも効率が悪いです。

非同期 I/O API

Flinkの非同期 I/O API を使って、データストリームを持つ非同期リクエストクライアント使うことができます。API はデータストリームとの統合、処理順を調整、イベント時間、耐障害性などを処理します。

目的のデータベースのための非同期クライアントを持つと仮定すると、データベースに対する非同期I/Oを使ったストリーム返還を実装するために3つの部分が必要です:

  • リクエストを発送するAsyncFunction の実装
  • オペレーションの結果を取り、それをResultFutureに渡すcallback
  • データストリーム上の非同期I/O操作を変換として適用

以下のコード例は基本的なパターンを説明します:

// This example implements the asynchronous request and callback with Futures that have the
// interface of Java 8's futures (which is the same one followed by Flink's Future)

/**
 * An implementation of the 'AsyncFunction' that sends requests and sets the callback.
 */
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {

    /** The database specific client that can issue concurrent requests with callbacks */
    private transient DatabaseClient client;

    @Override
    public void open(Configuration parameters) throws Exception {
        client = new DatabaseClient(host, post, credentials);
    }

    @Override
    public void close() throws Exception {
        client.close();
    }

    @Override
    public void asyncInvoke(final String str, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {

        // issue the asynchronous request, receive a future for result
        Future<String> resultFuture = client.query(str);

        // set the callback to be executed once the request by the client is complete
        // the callback simply forwards the result to the result future
        resultFuture.thenAccept( (String result) -> {

            resultFuture.complete(Collections.singleton(new Tuple2<>(str, result)));
         
        });
    }
}

// create the original stream
DataStream<String> stream = ...;

// apply the async I/O transformation
DataStream<Tuple2<String, String>> resultStream =
    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
/**
 * An implementation of the 'AsyncFunction' that sends requests and sets the callback.
 */
class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {

    /** The database specific client that can issue concurrent requests with callbacks */
    lazy val client: DatabaseClient = new DatabaseClient(host, post, credentials)

    /** The context used for the future callbacks */
    implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())


    override def asyncInvoke(str: String, resultFutre: ResultFuture[(String, String)]): Unit = {

        // issue the asynchronous request, receive a future for the result
        val resultFuture: Future[String] = client.query(str)

        // set the callback to be executed once the request by the client is complete
        // the callback simply forwards the result to the result future
        resultFuture.onSuccess {
            case result: String => resultFuture.complete(Iterable((str, result)));
        }
    }
}

// create the original stream
val stream: DataStream[String] = ...

// apply the async I/O transformation
val resultStream: DataStream[(String, String)] =
    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100)

重要な注意: ResultFutureResultFuture.completeの最初の呼び出しを使って完了されます。全てのサブリクエストのcomplete 呼び出しは無視されます。

以下の2つのパラメータは非同期オペレーションを制御します:

  • タイムアウト: タイムアウトは非同期リクエストが失敗したと見なされるまでどれだけ長く掛かるかを定義します。このパラメータはdead/failedリクエストを保護します。

  • 容量: このパラメータはどれだけ多くの非同期リクエストが同時に進行中になることができるかを定義します。たとえ非同期I/Oのやり方が一般的により良いスループットに繋がるとしても、オペレータはまだストリーミングアプリケーションでのボトルネックになりえます。同時に発生するリクエストの数を制限することで、オペレータはこれまで成長している延期されたリクエストのバックログを収集しなくなるでしょう。しかしそれはいったん容量が使い尽くされるとバックプレッシャーを引き起こすでしょう。

結果の整列

AsyncFunctionによって発行される同時に発生するリクエストは、どのリクエストが最初に終了したかに基づいて、しばしば何らかの非定義の順番で完了します。結果のレコードが発行された順番に制御するために、Flinkは以下の2つのモードを提供します:

  • Unordered: 結果のレコードは非同期リクエストが完了するとすぐに発行されます。ストリーム内のレコードの順番は非同期I/Oの前と後とで異なります。基本的な時間の特性上、processing timeと一緒に使われた場合、このモードは最もレイテンシが低く、最もオーバーヘッドが低いです。このモードのためには AsyncDataStream.unorderedWait(...) を使います。

  • Ordered: この場合、ストリームの順番は保持されます。結果のレコードは非同期リクエストが起動されたものと同じ順番(入力レコードのオペレータの順番)で発行されます。これを行うために、全ての処理中のレコードが発行(あるいはタイムアウト)されるまで、オペレータは結果のレコードをバッファします。レコードあるいは結果はチェックポイントされた状態の中でunorderedモードに比較して長期間維持されるため、これは通常余分なレイテンシとチェックポイント時の幾らかのおオーバーヘッドを導入します。このモードのためには、AsyncDataStream.orderedWait(...) を使います。

イベントタイム

ストリーミングアプリケーションがevent timeと連携する場合、ウォーターマークは非同期I/Oオペレータによって正しく処理されるでしょう。そのことは同時に2つの順番モードについて以下のことを意味します:

  • Unordered: ウォーターマークはレコードに追いつかず、逆も同様です。ウォーターマークが順番の境界を確立することを意味します。レコードは2つのウォーターマーク間でのみ順番無く発行されます。あるウォーターマークの後で発生するレコードは、ウォーターマークが発行された後でのみ発行されるでしょう。ウォーターマークが発行される前の入力からの全ての結果のレコードの後でのみ、今度はウォーターマークが発行されるでしょう。

    ウォーターマークの前で、unordered モードがorderedと同じようになんらかの同じレイテンシと管理のオーバーヘッドを導入することを意味します。オーバーヘッドの量はウォーターマークの頻度に依存します。

  • Ordered: レコード間の順番が保持されるのと同じように、レコードのウォーターマークの順番が保持されます。processing timeとの連携に比べて、オーバーヘッドにはそれほど大きな変更はありません。

Ingestion Time は、ソースの処理時間に基づく自動的に生成されたウォーターマークを使ったevent time の特別な場合であることを思い出してください。

耐障害性の保証

非同期I/Oオペレータは完全に確実に1回の耐障害性の保証を提供します。チェックポイント内の実行中の非同期リクエストのレコードを格納し、障害時の回復時にリクエストを回復/再起動します。

実装のTips

コールバックは一般的に最小の仕事を行い、DirectExecutorは追加のスレッド-トゥ-スレッドの手渡しオーバーヘッドを避けるため、コールバックのためのExecutor (あるいはScalaでの ExecutionContext)を持つFutures を使った実装については、DirectExecutorを使うことを提案します。コールバックは一般的に結果をResultFutureに手渡しするだけです。これは出力バッファに追加します。そこから、レコードの発行を含む重いロジックおよびチェックポイントの記帳とのやりとりがとにかく専用のスレッドプールで起こります。

DirectExecutororg.apache.flink.runtime.concurrent.Executors.directExecutor() あるいは com.google.common.util.concurrent.MoreExecutors.directExecutor()を使って取得されるかもしれません。

警告

AsyncFunction は Multi-Threaded とは呼ばれません。

ここで明示的に指摘したい良くある混同は、AsyncFunctionはマルチスレッドの形式で呼ばれないということです。たった一つのAsyncFunctionが存在し、それはストリームのそれぞれの部分の中の各レコードについて順次呼ばれます。asyncInvoke(...) メソッドが早く返り、(クライアントによって)コールバックに頼らない限り、それは適切な非同期I/Oにはならないでしょう。

例えば、以下のパターンはasyncInvoke(...) 関数をブロックすることになり、従って非同期の挙動を無効にします:

  • 結果が受け取られるまでlookup/queryメソッドがブロックの呼び出しをするデータベースクライアントの使用

  • asyncInvoke(...)メソッド内の非同期クライアントによって返されるfuture-typeオブジェクト上のブロック/待機

上に戻る

TOP
inserted by FC2 system