手短に言うと、この機能を使ってユーザはFlinkの外側からFlinkの管理されたパーティション化状態(状態との連携を見てください)をクエリすることができます。幾つかのシナリオについては、クエリ可能な状態は実際にはしばしばボトルネックになるキー-値ストアのような外部システムを使った分散オペレーション/トランザクションの必要を除外します。
MemoryStateBackend
あるいは FsStateBackend
は、値を扱う時に連携しないため、代わりに格納された値を使います。read-modify-write パターンは安全では無く、クエリ可能な状態サーバが同時実行の修正により失敗するかもしれません。RocksDBStateBackend
はこのような問題はありません。
状態をクエリ可能にするために、クエリ可能な状態サーバはまず最初にquery.server.enable
設定パラメータを true
に設定してグローバルに有効になる必要があります(現在のデフォルト)。そして、以下のどちらかを使って適切な状態がクエリ可能になる必要があります
QueryableStateStream
、シンクのような挙動をして、やってくる値をクエリ可能な状態として提供する便利なオブジェクト。あるいは、StateDescriptor#setQueryable(String queryableStateName)
、オペレータのキー付けされた状態をクエリ可能にします。以下の章はこれら2つのやり方の使い方を説明します。
KeyedStream
は以下のメソッドを使ってその値をクエリ可能な状態として提供するかもしれません:
// ValueState
QueryableStateStream asQueryableState(
String queryableStateName,
ValueStateDescriptor stateDescriptor)
// Shortcut for explicit ValueStateDescriptor variant
QueryableStateStream asQueryableState(String queryableStateName)
// FoldingState
QueryableStateStream asQueryableState(
String queryableStateName,
FoldingStateDescriptor stateDescriptor)
// ReducingState
QueryableStateStream asQueryableState(
String queryableStateName,
ReducingStateDescriptor stateDescriptor)
ListState
シンクは片付けされないかもしれない常に成長するリストになり、従って結果的にあまりの多くのメモリを消費するだろうため、クエリ可能なListStateシンクはありません。
A call to these methods returns a QueryableStateStream
, which cannot be further transformed and currently only holds the name as well as the value and key serializer for the queryable state stream. シンクと比較可能で、それ以上の変換は続けることができません。
内部的には QueryableStateStream
がクエリ可能な状態インスタンスを更新するために全てのやってくるレコードを使用するオペレータを解釈します。以下のようにプログラム内で、選択した状態の変化に応じてValueState#update(value)
あるいはAppendingState#add(value)
を使って、キー付けされたストリームの全てのレコードは状態インスタンスを更新するために使われます:
stream.keyBy(0).asQueryableState("query-name")
これはScala APIの flatMapWithState
のように振舞います。
オペレータの管理されたキー付けされた状態 (管理されたキー付けされた状態の使用を見てください)は、以下の例のように StateDescriptor#setQueryable(String queryableStateName)
を使って適切な状態ディスクリプタをクエリ可能にすることで、クエリ可能にすることができます:
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
"average", // the state name
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
descriptor.setQueryable("query-name"); // queryable state name
QueryableStateClient
ヘルパークラスが状態を内部的に提供するKvState
インスタンスに対してクエリのために使われるかもしれません。有効な JobManager
アドレスとポートを使ってセットアップし以下のように生成される必要があります:
final Configuration config = new Configuration();
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, queryAddress);
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, queryPort);
QueryableStateClient client = new QueryableStateClient(config);
クエリのメソッドはこれです:
Future<byte[]> getKvState(
JobID jobID,
String queryableStateName,
int keyHashCode,
byte[] serializedKeyAndNamespace)
このメソッドの呼び出しはIDjobID
を持つジョブのqueryableStateName
によって識別されるクエリ可能な状態インスタンスのためのシリアライズ化された状態値を結果的に持つFuture
を返します。keyHashCode
はObject.hashCode()
によって返されるキーのハッシュコードで、serializedKeyAndNamespace
はシリアライズ化されたキーと名前空間です。
QueryableStateClient#shutdown()
を使ってシャットダウンする必要があります。
現在の実装は提供するキー/名前空間と返される結果の両方についてシリアライズ化されたデータを使ってのみ動作するという意味でまだまだ低レベルです。ユーザ(あるいは幾つかのフォローアップユーティリティ)はこれのためにシリアライザをセットアップする責任があります。これのいいところは、クエリサービスはクラスをロードする問題などを心配する用事がないということです。
キー/名前空間と値のシリアライズ化のためのシリアライズ化のユーティリティがKvStateRequestSerializer
に含まれます。
以下の例はCountWindowAverage
の例 (管理されたキー付けされた状態の使用を見てください)をクエリ可能にしこの値をクエリする方法を示すことで、拡張します。
public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
private transient ValueState<Tuple2<Long /* count */, Long /* sum */>> sum;
@Override
public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
Tuple2<Long, Long> currentSum = sum.value();
currentSum.f0 += 1;
currentSum.f1 += input.f1;
sum.update(currentSum);
if (currentSum.f0 >= 2) {
out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
sum.clear();
}
}
@Override
public void open(Configuration config) {
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
"average", // the state name
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
descriptor.setQueryable("query-name");
sum = getRuntimeContext().getState(descriptor);
}
}
一旦ジョブの中で使用すると、このオペレータからジョブIDを扱うことができ、そしてキーの現在の状態をクエリすることができます:
final Configuration config = new Configuration();
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, queryAddress);
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, queryPort);
QueryableStateClient client = new QueryableStateClient(config);
final TypeSerializer<Long> keySerializer =
TypeInformation.of(new TypeHint<Long>() {}).createSerializer(new ExecutionConfig());
final TypeSerializer<Tuple2<Long, Long>> valueSerializer =
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}).createSerializer(new ExecutionConfig());
final byte[] serializedKey =
KvStateRequestSerializer.serializeKeyAndNamespace(
key, keySerializer,
VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);
Future<byte[]> serializedResult =
client.getKvState(jobId, "query-name", key.hashCode(), serializedKey);
// now wait for the result and return it
final FiniteDuration duration = new FiniteDuration(1, TimeUnit.SECONDS);
byte[] serializedValue = Await.result(serializedResult, duration);
Tuple2<Long, Long> value =
KvStateRequestSerializer.deserializeValue(serializedValue, valueSerializer);
以下の設定パラメータはクエリ可能な状態サーバとクライアントの挙動に影響を与えます。それらはQueryableStateOptions
の中で定義されます。
query.server.enable
: クエリ可能な状態サーバを開始するかどうかを示すためのフラグquery.server.port
: 内部的な KvStateServer
へバインドするポート(0 => 利用可能なランダムなポートを取り上げます)query.server.network-threads
: KvStateServer
のためのネットワーク(イベントループ)スレッドの数 (0 => #slots)query.server.query-threads
: KvStateServerHandler
のための非同期クエリスレッドの数 (0 => #slots)。QueryableStateClient
)query.client.network-threads
: KvStateClient
のためのネットワーク(イベントループ)スレッドの数 (0 => 利用可能なコアの数)query.client.lookup.num-retries
: 場所の調査が失敗した時の再試行の数query.client.lookup.retry-delay
: 場所の調査の失敗時の再試行の遅延 (ミリ秒)