stateful 関数とオペレータは、より精巧な操作の全ての型について状態を重要な構築ブロックにしながら、個々の要素/イベントの処理を横断してデータを格納します。例えば:
状態を耐障害性のあるものにするためには、Flinkは状態に注意し、それをチェックポイントする必要があります。多くの場合において、Flinkはアプリケーションのための状態を管理することができます。このことはFlinkがアプリケーションがとても大きな状態を保持することができるようにメモリ管理(必要であれば多分ディスクに分割します)を扱うことを意味します。
このドキュメントはアプリケーションの開発時にFlinkの状態の抽象化を使う方法を説明します。
Flinkでの状態の2つの基本的な種類があります: キー付けされた状態
と オペレーターの状態
。
キー付けされた状態は常にキーに関連付けされ、KeyedStream
上の関数とオペレータ内でのみ使うことができます。
キー毎に確実に1つの状態の分割を使って、キー付けされた状態をパーション化された、あるいはシャードされたオペレータの状態と考えることができます。各キー付けされた状態は論理的に <parallel-operator-instance, key> のユニークな合成に制限されます。そして各キーはキー付けされたオペレータの確実に1つの並行インスタンスに“所属する”ため、これを単純に <operator, key> と考えることができます。
キー付けされた状態は更にキー グループと呼ばれるものに組織化されます。キーグループはFlinkがキー付けされた状態を再分配するアトミックな単位です; 定義された最大並行度と全く同じだけのキーグループがあります。実行の間、キー付けされたオペレータの各並行インスタンスは1つ以上のキーグループのためにキーを使って動作します。
オペレータの状態 (あるいは キー付けされていない状態)を使って、各オペレータの状態は1つの並行オペレータインスタンスに制限されます。KafkaソースコネクタはFlinkでのオペレータの状態を使うための良い動機付けの例です。このKafkaコンシューマの各並行インスタンスはオペレータの状態としてトピックパーティションとオフセットのマップを保持します。
並行度が変更された時に、オペレータの状態インタフェースは並行オペレータインスタンスの間で状態の再分配をサポートします。この再分配を行うための異なるスキーマがあるかもしれません; 現在のところ以下が定義されています:
キー付けされた状態 と オペレータの状態 は2つの形式で存在します: 管理 および 生。
管理された状態 は、内部ハッシュテーブルあるいはRocksDBのような、Flinkランタイムによって制御されるデータ構造で表現されます。例としては “ValueState”, “ListState”などです。Flinkのランタイムは状態を符号化し、それらをチェックポイントに書き込みます。
生の状態はオペレータが独自のデータ構造の中に保持する状態です。チェックポイントされる時に、それらはチェックポイント内にバイトの系列のみを書き込みます。Flinkは状態のデータ構造について何も知らず、生のバイトのみを見ます。
全てのデータストリームは管理された状態を使うことができますが、生の状態インタフェースはオペレータを実装する時のみ使うことができます。並行度が変更された時に管理された状態を使ってFlinkは自動的に状態を再分配することができ、より良いメモリ管理も行うため、(生の状態では無く)管理された状態を使うことをお勧めします。
The managed keyed state interface provides access to different types of state that are all scoped to the key of the current input element. このことは、状態のこの型はKeyedStream
上でのみ使うことができることを意味します。KeyedStreamはstream.keyBy(…)
を使って生成することができます。
ここで、まず利用可能な状態の異なる型を見てみましょう。そしてそれらがどうやってプログラム内で使うことができるかを見るつもりです。利用可能な状態のプリミティブは以下の通りです:
ValueState<T>
: これは更新され扱うことができる値を保持します (上で述べたように入力要素のキーに適用され、オペレーションが見る各キーについて1つの値が存在するでしょう)。値はupdate(T)
を使って設定することができ、T value()
を使って扱うことができます。
ListState<T>
: これは要素のリストを保持します。要素を追加することができ、現在格納されている全ての要素上でIterable
を扱うことができます。要素は add(T)
を使って追加され、Iterable はIterable<T> get()
を使って扱うことができます。
ReducingState<T>
: これは状態に追加された全ての値の集約を表す1つの値を維持します。インタフェースは ListState
と同じですが、add(T)
を使って追加された要素は指定されたReduceFunction
を使って1つの集合まで減らされます。
FoldingState<T>
: これは状態に追加された全ての値の集約を表す1つの値を維持します。ReducingState
とは対照的に、集約型は状態に追加された要素の型とは異なるかもしれません。インタフェースは ListState
と同じですが、add(T)
を使って追加された要素は指定されたFoldFunction
を使って1つの集合まで折りたたまれます。
状態の全ての型も現在アクティブなキー、つまり入力要素のキー、の状態をクリアするclear()
メソッドを持ちます。
注意 FoldingState
はFlinkの次のバージョンの1つで非推奨になり、将来的に完全に削除されるでしょう。より一般的な代替策が提供されるでしょう。
これらの状態オブジェクトは状態とやり取りするためのみに使われることを覚えておくことが重要です。状態は内部に格納しておくことは必要ではないですが、ディスクあるいはそれ以外のどこかにあるかもしれません。2つ目の覚えておくべきことは、状態から取得した値は入力要素のキーに依存するということです。つまり、ユーザ関数の1つの呼び出しで取得した値は、もし伴うキーが異なる場合は他の呼び出しにおける値と異なるかもしれません。
状態のハンドルを取得するために、StateDescriptor
を作成する必要があります。これは状態の名前 (後で見るように、いくつかの状態を作成することができ、それらは参照することができるようにユニークな名前を持ちます)、状態を保持する値の型、そしておそらくReduceFunction
のようなユーザ定義された関数を保持します。扱いたい状態がどの型かに依存して、ValueStateDescriptor
, ListStateDescriptor
, ReducingStateDescriptor
あるいは FoldingStateDescriptor
のいずれかを生成します。
状態はRuntimeContext
を使ってアクセスされます。つまりrich functions内でのみ可能です。それについての情報は ここ を見てください。しかし手短に例も見てみましょう。RichFunction
内で利用可能なRuntimeContext
は状態にアクセスするためのこれらのメソッドを持ちます:
ValueState<T> getState(ValueStateDescriptor<T>)
ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
ListState<T> getListState(ListStateDescriptor<T>)
FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)
This is an example FlatMapFunction
that shows how all of the parts fit together:
public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
/**
* The ValueState handle. The first field is the count, the second field a running sum.
*/
private transient ValueState<Tuple2<Long, Long>> sum;
@Override
public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
// access the state value
Tuple2<Long, Long> currentSum = sum.value();
// update the count
currentSum.f0 += 1;
// add the second field of the input value
currentSum.f1 += input.f1;
// update the state
sum.update(currentSum);
// if the count reaches 2, emit the average and clear the state
if (currentSum.f0 >= 2) {
out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
sum.clear();
}
}
@Override
public void open(Configuration config) {
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
"average", // the state name
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
sum = getRuntimeContext().getState(descriptor);
}
}
// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
.keyBy(0)
.flatMap(new CountWindowAverage())
.print();
// the printed output will be (1,4) and (1,5)
This example implements a poor man’s counting window. We key the tuples by the first field (in the example all have the same key 1
). 関数はValueState
内にカウントと実行中の合計を格納します。一旦カウントが2に達すると、平均を発行し、0
から再出発するように状態をクリアします。もし最初のフィールド内に異なる値を持つタプルを持っていた場合は、それぞれの異なる入力キーについて異なる状態の値を保持するだろうことに注意してください。
上で説明したインタフェースに加えて、Scala APIは KeyedStream
上の1つのValueState
を使って stateful map()
あるいは flatMap()
関数のためのショートカットを持ちます。ユーザ関数はOption
内のValueState
の現在の値を取得し、状態を更新するために使われるだろう更新された値を返すべきです。
val stream: DataStream[(String, Int)] = ...
val counts: DataStream[(String, Int)] = stream
.keyBy(_._1)
.mapWithState((in: (String, Int), count: Option[Int]) =>
count match {
case Some(c) => ( (in._1, c), Some(c + in._2) )
case None => ( (in._1, 0), Some(in._2) )
})
stateful 関数はもっと一般的なCheckpointedFunction
インタフェース、あるいはListCheckpointed<T extends Serializable>
インタフェースのどちらかを実装するかもしれません。
両方の場合で、キー付けされていない状態はシリアライズ可能なオブジェクトのList
であることが期待され、お互いに依存せず、従って再スケールジに再分配されることが望ましいです。別の言い方をすると、これらのオブジェクトはキー付けされていない状態が再区分される申し分のない粒度です。例として、もし並行度1でBufferingSink
のチェックポイントされた状態が(test1, 2)
および (test2, 2)
を含む場合、並行度が2に増加すると(test1, 2)
はタスク0に達し、一方で (test2, 2)
はタスク1に行くでしょう。
ListCheckpointed
インタフェースは2つのメソッドの実装を必要とします:
List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
void restoreState(List<T> state) throws Exception;
snapshotState()
上でオペレータはチェックポイントへのオブジェクトのリストを返すべきで、restoreState
は回復時にそのようなリストを扱うべきです。もし状態が再パーティション可能でなければ、snapshotState()
内でCollections.singletonList(MY_STATE)
を常に返すことができます。
CheckpointedFunction
インタフェースは以下の2つのメソッドの実装も必要とします:
void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;
チェックポイントが実行されなければならない時はいつでもsnapshotState()
が呼ばれます。ユーザ定義関数が初期化される時、関数が最初に初期化される時、以前のチェックポイントから実際に回復する時はいつでも対応する initializeState()
が呼ばれます。このように考えると、initializeState()
は状態の異なる型が初期化される場所だけでなく、状態の回復ロジックが含まれる場所でもあります。
This is an example of a function that uses CheckpointedFunction
, a stateful SinkFunction
that
uses state to buffer elements before sending them to the outside world:
public class BufferingSink
implements SinkFunction<Tuple2<String, Integer>>,
CheckpointedFunction,
CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
private final int threshold;
private transient ListState<Tuple2<String, Integer>> checkpointedState;
private List<Tuple2<String, Integer>> bufferedElements;
public BufferingSink(int threshold) {
this.threshold = threshold;
this.bufferedElements = new ArrayList<>();
}
@Override
public void invoke(Tuple2<String, Integer> value) throws Exception {
bufferedElements.add(value);
if (bufferedElements.size() == threshold) {
for (Tuple2<String, Integer> element: bufferedElements) {
// send it to the sink
}
bufferedElements.clear();
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
checkpointedState.clear();
for (Tuple2<String, Integer> element : bufferedElements) {
checkpointedState.add(element);
}
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
checkpointedState = context.getOperatorStateStore().
getSerializableListState("buffered-elements");
if (context.isRestored()) {
for (Tuple2<String, Integer> element : checkpointedState.get()) {
bufferedElements.add(element);
}
}
}
@Override
public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
// this is from the CheckpointedRestoring interface.
this.bufferedElements.addAll(state);
}
}
initializeState
メソッドは引数としてFunctionInitializationContext
を取ります。これはキー付けされていない状態 “containers” を所化するのに便利です。これらはチェックポイント時にキー付けされていないオブジェクトが格納される型ListState
のコンテナです。
this.checkpointedState = context.getOperatorStateStore().getSerializableListState("buffered-elements");
コンテナを初期化した後で、障害の後で回復しているかどうか調べるためにコンテキストのisRestored()
メソッドを使います。これが true
の場合、つまり 回復している場合、回復ロジックが適用されます。
修正されたBufferingSink
のコードの中で示されるように、状態の初期化の間に回復されたこのListState
はsnapshotState()
での詳細の利用のためにクラス変数に保持されます。There the ListState
is cleared of all objects included by the previous checkpoint, and is then filled with the new ones we want to checkpoint.
As a side note, the keyed state can also be initialized in the initializeState()
method. これは指定されたFunctionInitializationContext
を使って行うことができます。
stateful ソースは他のオペレータと比べて少しだけ注意が必要です。状態への更新を行い、コレクションをアトミックに出力(障害時/回復時に確実に1回のセマンティクスのために必要)するために、ユーザはソースのコンテキストからロックを取得する必要があります。
public static class CounterSource
extends RichParallelSourceFunction<Long>
implements ListCheckpointed<Long> {
/** current offset for exactly once semantics */
private Long offset;
/** flag for job cancellation */
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<Long> ctx) {
final Object lock = ctx.getCheckpointLock();
while (isRunning) {
// output and state update are atomic
synchronized (lock) {
ctx.collect(offset);
offset += 1;
}
}
}
@Override
public void cancel() {
isRunning = false;
}
@Override
public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) {
return Collections.singletonList(offset);
}
@Override
public void restoreState(List<Long> state) {
for (Long s : state)
offset = s;
}
}
外部と通信をするためにFlinkによってチェックポイントが完全にFlinkによって通知される場合、幾つかのオペレータは情報が必要かもしれません。この場合、org.apache.flink.runtime.state.CheckpointListener
インタフェースを見てください。