状態との連動

stateful 関数とオペレータは、より精巧な操作の全ての型について状態を重要な構築ブロックにしながら、個々の要素/イベントの処理を横断してデータを格納します。例えば:

  • アプリケーションがあるイベントのパターンを検索する時、状態はこれまで遭遇したイベントのシーケンスを格納するでしょう。
  • 1秒ごとにイベントを集約する場合、状態は延期された集約を保持します。
  • データポイントのストリーム上で機械学習のモデルを訓練する場合、状態はモデルパラメータの現在のバージョンを保持します。

状態を耐障害性のあるものにするためには、Flinkは状態に注意し、それをチェックポイントする必要があります。多くの場合において、Flinkはアプリケーションのための状態を管理することができます。このことはFlinkがアプリケーションがとても大きな状態を保持することができるようにメモリ管理(必要であれば多分ディスクに分割します)を扱うことを意味します。

このドキュメントはアプリケーションの開発時にFlinkの状態の抽象化を使う方法を説明します。

キー付けされた状態とオペレータの状態

Flinkでの状態の2つの基本的な種類があります: キー付けされた状態オペレーターの状態

キー付けされた状態

キー付けされた状態は常にキーに関連付けされ、KeyedStream上の関数とオペレータ内でのみ使うことができます。

キー毎に確実に1つの状態の分割を使って、キー付けされた状態をパーション化された、あるいはシャードされたオペレータの状態と考えることができます。各キー付けされた状態は論理的に <parallel-operator-instance, key> のユニークな合成に制限されます。そして各キーはキー付けされたオペレータの確実に1つの並行インスタンスに“所属する”ため、これを単純に <operator, key> と考えることができます。

キー付けされた状態は更にキー グループと呼ばれるものに組織化されます。キーグループはFlinkがキー付けされた状態を再分配するアトミックな単位です; 定義された最大並行度と全く同じだけのキーグループがあります。実行の間、キー付けされたオペレータの各並行インスタンスは1つ以上のキーグループのためにキーを使って動作します。

オペレータの状態

オペレータの状態 (あるいは キー付けされていない状態)を使って、各オペレータの状態は1つの並行オペレータインスタンスに制限されます。KafkaソースコネクタはFlinkでのオペレータの状態を使うための良い動機付けの例です。このKafkaコンシューマの各並行インスタンスはオペレータの状態としてトピックパーティションとオフセットのマップを保持します。

並行度が変更された時に、オペレータの状態インタフェースは並行オペレータインスタンスの間で状態の再分配をサポートします。この再分配を行うための異なるスキーマがあるかもしれません; 現在のところ以下が定義されています:

  • リスト形式の再分配: 各オペレータは状態の要素のリストを返します。状態全体は論理的に全てのリストの連結です。再開/再分配 時に、並行オペレータがあるため、リストは結果的に多くの部分リストに分割されます。各オペレータは部分リストを取得します。これは空、あるいは1つ以上の要素を含むかもしれません。

生および管理された状態

キー付けされた状態オペレータの状態 は2つの形式で存在します: 管理 および

管理された状態 は、内部ハッシュテーブルあるいはRocksDBのような、Flinkランタイムによって制御されるデータ構造で表現されます。例としては “ValueState”, “ListState”などです。Flinkのランタイムは状態を符号化し、それらをチェックポイントに書き込みます。

生の状態はオペレータが独自のデータ構造の中に保持する状態です。チェックポイントされる時に、それらはチェックポイント内にバイトの系列のみを書き込みます。Flinkは状態のデータ構造について何も知らず、生のバイトのみを見ます。

全てのデータストリームは管理された状態を使うことができますが、生の状態インタフェースはオペレータを実装する時のみ使うことができます。並行度が変更された時に管理された状態を使ってFlinkは自動的に状態を再分配することができ、より良いメモリ管理も行うため、(生の状態では無く)管理された状態を使うことをお勧めします。

管理されたキー付けされた状態の使用

The managed keyed state interface provides access to different types of state that are all scoped to the key of the current input element. このことは、状態のこの型はKeyedStream上でのみ使うことができることを意味します。KeyedStreamはstream.keyBy(…)を使って生成することができます。

ここで、まず利用可能な状態の異なる型を見てみましょう。そしてそれらがどうやってプログラム内で使うことができるかを見るつもりです。利用可能な状態のプリミティブは以下の通りです:

  • ValueState<T>: これは更新され扱うことができる値を保持します (上で述べたように入力要素のキーに適用され、オペレーションが見る各キーについて1つの値が存在するでしょう)。値はupdate(T) を使って設定することができ、T value()を使って扱うことができます。

  • ListState<T>: これは要素のリストを保持します。要素を追加することができ、現在格納されている全ての要素上でIterable を扱うことができます。要素は add(T)を使って追加され、Iterable はIterable<T> get()を使って扱うことができます。

  • ReducingState<T>: これは状態に追加された全ての値の集約を表す1つの値を維持します。インタフェースは ListState と同じですが、add(T)を使って追加された要素は指定されたReduceFunctionを使って1つの集合まで減らされます。

  • FoldingState<T>: これは状態に追加された全ての値の集約を表す1つの値を維持します。ReducingStateとは対照的に、集約型は状態に追加された要素の型とは異なるかもしれません。インタフェースは ListState と同じですが、add(T)を使って追加された要素は指定されたFoldFunctionを使って1つの集合まで折りたたまれます。

状態の全ての型も現在アクティブなキー、つまり入力要素のキー、の状態をクリアするclear()メソッドを持ちます。

注意 FoldingState はFlinkの次のバージョンの1つで非推奨になり、将来的に完全に削除されるでしょう。より一般的な代替策が提供されるでしょう。

これらの状態オブジェクトは状態とやり取りするためのみに使われることを覚えておくことが重要です。状態は内部に格納しておくことは必要ではないですが、ディスクあるいはそれ以外のどこかにあるかもしれません。2つ目の覚えておくべきことは、状態から取得した値は入力要素のキーに依存するということです。つまり、ユーザ関数の1つの呼び出しで取得した値は、もし伴うキーが異なる場合は他の呼び出しにおける値と異なるかもしれません。

状態のハンドルを取得するために、StateDescriptorを作成する必要があります。これは状態の名前 (後で見るように、いくつかの状態を作成することができ、それらは参照することができるようにユニークな名前を持ちます)、状態を保持する値の型、そしておそらくReduceFunctionのようなユーザ定義された関数を保持します。扱いたい状態がどの型かに依存して、ValueStateDescriptor, ListStateDescriptor, ReducingStateDescriptor あるいは FoldingStateDescriptorのいずれかを生成します。

状態はRuntimeContextを使ってアクセスされます。つまりrich functions内でのみ可能です。それについての情報は ここ を見てください。しかし手短に例も見てみましょう。RichFunction内で利用可能なRuntimeContext は状態にアクセスするためのこれらのメソッドを持ちます:

  • ValueState<T> getState(ValueStateDescriptor<T>)
  • ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
  • ListState<T> getListState(ListStateDescriptor<T>)
  • FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)

This is an example FlatMapFunction that shows how all of the parts fit together:

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    /**
     * The ValueState handle. The first field is the count, the second field a running sum.
     */
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {

        // access the state value
        Tuple2<Long, Long> currentSum = sum.value();

        // update the count
        currentSum.f0 += 1;

        // add the second field of the input value
        currentSum.f1 += input.f1;

        // update the state
        sum.update(currentSum);

        // if the count reaches 2, emit the average and clear the state
        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }
}

// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
        .keyBy(0)
        .flatMap(new CountWindowAverage())
        .print();

// the printed output will be (1,4) and (1,5)

This example implements a poor man’s counting window. We key the tuples by the first field (in the example all have the same key 1). 関数はValueState内にカウントと実行中の合計を格納します。一旦カウントが2に達すると、平均を発行し、0から再出発するように状態をクリアします。もし最初のフィールド内に異なる値を持つタプルを持っていた場合は、それぞれの異なる入力キーについて異なる状態の値を保持するだろうことに注意してください。

Scala データストリーム APIでの状態

上で説明したインタフェースに加えて、Scala APIは KeyedStream上の1つのValueStateを使って stateful map() あるいは flatMap() 関数のためのショートカットを持ちます。ユーザ関数はOption内のValueStateの現在の値を取得し、状態を更新するために使われるだろう更新された値を返すべきです。

val stream: DataStream[(String, Int)] = ...

val counts: DataStream[(String, Int)] = stream
  .keyBy(_._1)
  .mapWithState((in: (String, Int), count: Option[Int]) =>
    count match {
      case Some(c) => ( (in._1, c), Some(c + in._2) )
      case None => ( (in._1, 0), Some(in._2) )
    })

管理されたオペレータの状態の使用

stateful 関数はもっと一般的なCheckpointedFunction インタフェース、あるいはListCheckpointed<T extends Serializable> インタフェースのどちらかを実装するかもしれません。

両方の場合で、キー付けされていない状態はシリアライズ可能なオブジェクトのListであることが期待され、お互いに依存せず、従って再スケールジに再分配されることが望ましいです。別の言い方をすると、これらのオブジェクトはキー付けされていない状態が再区分される申し分のない粒度です。例として、もし並行度1でBufferingSinkのチェックポイントされた状態が(test1, 2) および (test2, 2)を含む場合、並行度が2に増加すると(test1, 2) はタスク0に達し、一方で (test2, 2) はタスク1に行くでしょう。

ListCheckpointed

ListCheckpointed インタフェースは2つのメソッドの実装を必要とします:

List<T> snapshotState(long checkpointId, long timestamp) throws Exception;

void restoreState(List<T> state) throws Exception;

snapshotState()上でオペレータはチェックポイントへのオブジェクトのリストを返すべきで、restoreState は回復時にそのようなリストを扱うべきです。もし状態が再パーティション可能でなければ、snapshotState()内でCollections.singletonList(MY_STATE)を常に返すことができます。

CheckpointedFunction

CheckpointedFunction インタフェースは以下の2つのメソッドの実装も必要とします:

void snapshotState(FunctionSnapshotContext context) throws Exception;

void initializeState(FunctionInitializationContext context) throws Exception;

チェックポイントが実行されなければならない時はいつでもsnapshotState() が呼ばれます。ユーザ定義関数が初期化される時、関数が最初に初期化される時、以前のチェックポイントから実際に回復する時はいつでも対応する initializeState() が呼ばれます。このように考えると、initializeState() は状態の異なる型が初期化される場所だけでなく、状態の回復ロジックが含まれる場所でもあります。

This is an example of a function that uses CheckpointedFunction, a stateful SinkFunction that uses state to buffer elements before sending them to the outside world:

public class BufferingSink
        implements SinkFunction<Tuple2<String, Integer>>,
                   CheckpointedFunction,
                   CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {

    private final int threshold;

    private transient ListState<Tuple2<String, Integer>> checkpointedState;

    private List<Tuple2<String, Integer>> bufferedElements;

    public BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() == threshold) {
            for (Tuple2<String, Integer> element: bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.clear();
        for (Tuple2<String, Integer> element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedState = context.getOperatorStateStore().
            getSerializableListState("buffered-elements");

        if (context.isRestored()) {
            for (Tuple2<String, Integer> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }

    @Override
    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
        // this is from the CheckpointedRestoring interface.
        this.bufferedElements.addAll(state);
    }
}

initializeState メソッドは引数としてFunctionInitializationContextを取ります。これはキー付けされていない状態 “containers” を所化するのに便利です。これらはチェックポイント時にキー付けされていないオブジェクトが格納される型ListStateのコンテナです。

this.checkpointedState = context.getOperatorStateStore().getSerializableListState("buffered-elements");

コンテナを初期化した後で、障害の後で回復しているかどうか調べるためにコンテキストのisRestored() メソッドを使います。これが trueの場合、つまり 回復している場合、回復ロジックが適用されます。

修正されたBufferingSinkのコードの中で示されるように、状態の初期化の間に回復されたこのListStatesnapshotState()での詳細の利用のためにクラス変数に保持されます。There the ListState is cleared of all objects included by the previous checkpoint, and is then filled with the new ones we want to checkpoint.

As a side note, the keyed state can also be initialized in the initializeState() method. これは指定されたFunctionInitializationContextを使って行うことができます。

stateful ソース関数

stateful ソースは他のオペレータと比べて少しだけ注意が必要です。状態への更新を行い、コレクションをアトミックに出力(障害時/回復時に確実に1回のセマンティクスのために必要)するために、ユーザはソースのコンテキストからロックを取得する必要があります。

public static class CounterSource
        extends RichParallelSourceFunction<Long>
        implements ListCheckpointed<Long> {

    /**  current offset for exactly once semantics */
    private Long offset;

    /** flag for job cancellation */
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Long> ctx) {
        final Object lock = ctx.getCheckpointLock();

        while (isRunning) {
            // output and state update are atomic
            synchronized (lock) {
                ctx.collect(offset);
                offset += 1;
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) {
        return Collections.singletonList(offset);
    }

    @Override
    public void restoreState(List<Long> state) {
        for (Long s : state)
            offset = s;
    }
}

外部と通信をするためにFlinkによってチェックポイントが完全にFlinkによって通知される場合、幾つかのオペレータは情報が必要かもしれません。この場合、org.apache.flink.runtime.state.CheckpointListener インタフェースを見てください。

TOP
inserted by FC2 system