クエリ可能な状態

注意: クエリ可能な状態のためのクライアントAPIは現在のところ開発中の状態で、提供されたインタフェースの安定性について保証がありません。もうすぐやってくるFlinkのバージョンでクライアント側に破壊的なAPIの変更があるでしょう。

手短に言うと、この機能を使ってユーザはFlinkの外側からFlinkの管理されたパーティション化状態(状態との連携を見てください)をクエリすることができます。幾つかのシナリオについては、クエリ可能な状態は実際にはしばしばボトルネックになるキー-値ストアのような外部システムを使った分散オペレーション/トランザクションの必要を除外します。

注意: クエリ可能な状態オペレータを使った同期および潜在的にそのオペレーションをブロックするよりも同時実行のスレッドからキー付けされた状態をアクセスします。Javaヒープ空間を使ったどのような状態バックエンド、例えばMemoryStateBackend あるいは FsStateBackend は、値を扱う時に連携しないため、代わりに格納された値を使います。read-modify-write パターンは安全では無く、クエリ可能な状態サーバが同時実行の修正により失敗するかもしれません。RocksDBStateBackend はこのような問題はありません。

クエリ可能な状態の作成

状態をクエリ可能にするために、クエリ可能な状態サーバはまず最初にquery.server.enable設定パラメータを trueに設定してグローバルに有効になる必要があります(現在のデフォルト)。そして、以下のどちらかを使って適切な状態がクエリ可能になる必要があります

  • QueryableStateStream、シンクのような挙動をして、やってくる値をクエリ可能な状態として提供する便利なオブジェクト。あるいは、
  • StateDescriptor#setQueryable(String queryableStateName)、オペレータのキー付けされた状態をクエリ可能にします。

以下の章はこれら2つのやり方の使い方を説明します。

クエリ可能な状態ストリーム

KeyedStream は以下のメソッドを使ってその値をクエリ可能な状態として提供するかもしれません:

// ValueState
QueryableStateStream asQueryableState(
    String queryableStateName,
    ValueStateDescriptor stateDescriptor)

// Shortcut for explicit ValueStateDescriptor variant
QueryableStateStream asQueryableState(String queryableStateName)

// FoldingState
QueryableStateStream asQueryableState(
    String queryableStateName,
    FoldingStateDescriptor stateDescriptor)

// ReducingState
QueryableStateStream asQueryableState(
    String queryableStateName,
    ReducingStateDescriptor stateDescriptor)
注意: ListState シンクは片付けされないかもしれない常に成長するリストになり、従って結果的にあまりの多くのメモリを消費するだろうため、クエリ可能なListStateシンクはありません。

A call to these methods returns a QueryableStateStream, which cannot be further transformed and currently only holds the name as well as the value and key serializer for the queryable state stream. シンクと比較可能で、それ以上の変換は続けることができません。

内部的には QueryableStateStreamがクエリ可能な状態インスタンスを更新するために全てのやってくるレコードを使用するオペレータを解釈します。以下のようにプログラム内で、選択した状態の変化に応じてValueState#update(value)あるいはAppendingState#add(value)を使って、キー付けされたストリームの全てのレコードは状態インスタンスを更新するために使われます:

stream.keyBy(0).asQueryableState("query-name")

これはScala APIの flatMapWithStateのように振舞います。

管理されたキー付けされた状態

オペレータの管理されたキー付けされた状態 (管理されたキー付けされた状態の使用を見てください)は、以下の例のように StateDescriptor#setQueryable(String queryableStateName)を使って適切な状態ディスクリプタをクエリ可能にすることで、クエリ可能にすることができます:

ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
        new ValueStateDescriptor<>(
                "average", // the state name
                TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
descriptor.setQueryable("query-name"); // queryable state name
注意: `queryableStateName` パラメータは任意に選択され、クエリのためだけに使われるかもしれません。状態の独自の名前に一致する必要はありません。

クエリの状態

QueryableStateClient ヘルパークラスが状態を内部的に提供するKvStateインスタンスに対してクエリのために使われるかもしれません。有効な JobManager アドレスとポートを使ってセットアップし以下のように生成される必要があります:

final Configuration config = new Configuration();
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, queryAddress);
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, queryPort);

QueryableStateClient client = new QueryableStateClient(config);

クエリのメソッドはこれです:

Future<byte[]> getKvState(
    JobID jobID,
    String queryableStateName,
    int keyHashCode,
    byte[] serializedKeyAndNamespace)

このメソッドの呼び出しはIDjobIDを持つジョブのqueryableStateName によって識別されるクエリ可能な状態インスタンスのためのシリアライズ化された状態値を結果的に持つFutureを返します。keyHashCodeObject.hashCode()によって返されるキーのハッシュコードで、serializedKeyAndNamespaceはシリアライズ化されたキーと名前空間です。

注意: クライアントは非同期で複数のスレッドで共有することができます。リソースを解放するために使用しない倍井は、QueryableStateClient#shutdown()を使ってシャットダウンする必要があります。

現在の実装は提供するキー/名前空間と返される結果の両方についてシリアライズ化されたデータを使ってのみ動作するという意味でまだまだ低レベルです。ユーザ(あるいは幾つかのフォローアップユーティリティ)はこれのためにシリアライザをセットアップする責任があります。これのいいところは、クエリサービスはクラスをロードする問題などを心配する用事がないということです。

キー/名前空間と値のシリアライズ化のためのシリアライズ化のユーティリティがKvStateRequestSerializerに含まれます。

以下の例はCountWindowAverageの例 (管理されたキー付けされた状態の使用を見てください)をクエリ可能にしこの値をクエリする方法を示すことで、拡張します。

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    private transient ValueState<Tuple2<Long /* count */, Long /* sum */>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
        Tuple2<Long, Long> currentSum = sum.value();
        currentSum.f0 += 1;
        currentSum.f1 += input.f1;
        sum.update(currentSum);

        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
        descriptor.setQueryable("query-name");
        sum = getRuntimeContext().getState(descriptor);
    }
}

一旦ジョブの中で使用すると、このオペレータからジョブIDを扱うことができ、そしてキーの現在の状態をクエリすることができます:

final Configuration config = new Configuration();
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, queryAddress);
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, queryPort);

QueryableStateClient client = new QueryableStateClient(config);

final TypeSerializer<Long> keySerializer =
        TypeInformation.of(new TypeHint<Long>() {}).createSerializer(new ExecutionConfig());
final TypeSerializer<Tuple2<Long, Long>> valueSerializer =
        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}).createSerializer(new ExecutionConfig());

final byte[] serializedKey =
        KvStateRequestSerializer.serializeKeyAndNamespace(
                key, keySerializer,
                VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);

Future<byte[]> serializedResult =
        client.getKvState(jobId, "query-name", key.hashCode(), serializedKey);

// now wait for the result and return it
final FiniteDuration duration = new FiniteDuration(1, TimeUnit.SECONDS);
byte[] serializedValue = Await.result(serializedResult, duration);
Tuple2<Long, Long> value =
        KvStateRequestSerializer.deserializeValue(serializedValue, valueSerializer);

設定

以下の設定パラメータはクエリ可能な状態サーバとクライアントの挙動に影響を与えます。それらはQueryableStateOptionsの中で定義されます。

サーバ

  • query.server.enable: クエリ可能な状態サーバを開始するかどうかを示すためのフラグ
  • query.server.port: 内部的な KvStateServer へバインドするポート(0 => 利用可能なランダムなポートを取り上げます)
  • query.server.network-threads: KvStateServerのためのネットワーク(イベントループ)スレッドの数 (0 => #slots)
  • query.server.query-threads: KvStateServerHandlerのための非同期クエリスレッドの数 (0 => #slots)。

クライアント (QueryableStateClient)

  • query.client.network-threads: KvStateClientのためのネットワーク(イベントループ)スレッドの数 (0 => 利用可能なコアの数)
  • query.client.lookup.num-retries: 場所の調査が失敗した時の再試行の数
  • query.client.lookup.retry-delay: 場所の調査の失敗時の再試行の遅延 (ミリ秒)

制限事項

  • クエリ可能な状態のライフサイクルはジョブのライフサイクルに制限されます。例えば起動時にタスクはクエリ可能な状態を登録し、破棄時にそれを登録解除します。将来のバージョンでは、タスクが完了した後でクエリできるようにし、状態のリプリケーションを使って回復を高速化するために、これを切り離すことが望ましいです。
  • 利用可能な KvState についての通知は単純な連絡で起こります。将来には、これは照会と通知を使ってもっと頑丈になるように改良されるべきです。
  • サーバとクライアントはクエリのための統計を追跡し続けます。それらはどこにも公開されていないため、これらは現在のところデフォルトで無効です。メトリクスシステムによってこれらの数値を公開するために良いサポートがあればすぐに、統計を有効にする必要があります。
TOP
inserted by FC2 system