API 移行ガイド

Flink 1.2 から変更された2,3のAPIがあります。変更のほとんどはそれら固有のドキュメントの中で説明されます。以下はAPIの変更の統一されたリストで、Flink 1.3へアップグレードする時の移設についての詳細へリンクしています。

TypeSerializer インタフェースの変更

これは状態について独自のTypeSerializerを実装するほとんどのユーザに関連するでしょう。

Flink 1.3から、セーブポイントの回復上のシリアライザの互換性に関連する2つの追加のメソッドが追加されました。これらのメソッドを実装する方法の詳細については シリアライザのアップグレードと互換性の扱いを見てください。

ProcessFunction は常に RichFunctionです

Flink 1.2では、ProcessFunction とその豪華版 RichProcessFunctionが導入されました。Flink 1.3から、RichProcessFunction が削除され、ProcessFunction は今では常にライフサイクル メソッドと実行コンテキストにアクセスするRichFunctionです。

Flink 1.3 のCEPライブラリはAPIのいくつかの変更に繋がる多くの新しい機能と一緒に出荷されます。詳細はCEP移設ドキュメントを見てください。

Flink 1.3では、ユーザが独自のログフレームワークを使えるように、主要なFlinkのアーティファクトは特定のログ依存から綺麗になりました。

Example and quickstart archtypes already have loggers specified and should not be affected. 他の独自のプロジェクトについては、ロガーの依存を追加するようにしてください。例えば、まヴぇnのpom.xmlで、以下を追加することができます:

<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.7</version>
</dependency>

<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>

State documentationで述べたように、Flinkは2つの状態を持ちます: keyednon-keyed 状態 (operator 状態とも呼ばれます)。両方の型がオペレータとユーザ定義の関数の両方で利用可能です。このドキュメントはFlink1.1関数のコードからFlink1.2への移設のプロセスの初めから終わりまで案内し、Flink1.1からの提携されたウィンドウ操作の非推奨に関連するFlink1.2で導入された幾つかの重要な内部的な変更を示します。

移設プロセスは2つの目的を提供するでしょう:

  1. 関数が再スケールのようなFlink1.2で導入された新しい機能を利用できる、

  2. 新しいFlink1.2ジョブが以前のFlink1.1によって生成されたセーブポイントから実行を再開できるようにする。

このガイドのステップに従った後で、実行中のジョブをFlink 1.1を使ってsavepoint を取りそれをFlink 1.2 に開始ポイントとして与えることでFlink 1.1 から Flink 1.2 に簡単に移設することができるでしょう。これによりFlink 1.2のジョブはFlink 1.1の前任者を中止したところから実行を開始することができるでしょう。

ユーザ関数の例

As running examples for the remainder of this document we will use the CountMapper and the BufferingSink functions. 最初の1つはkeyed状態を使った関数の例で、2つ目はnon-keyed 状態です。上で述べた2つの関数のFlink1.1でのコードは以下で紹介します:

public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

    private transient ValueState<Integer> counter;

    private final int numberElements;

    public CountMapper(int numberElements) {
        this.numberElements = numberElements;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        counter = getRuntimeContext().getState(
            new ValueStateDescriptor<>("counter", Integer.class, 0));
    }

    @Override
    public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
        int count = counter.value() + 1;
        counter.update(count);

        if (count % numberElements == 0) {
            out.collect(Tuple2.of(value.f0, count));
            counter.update(0); // reset to 0
        }
    }
}

public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
    Checkpointed<ArrayList<Tuple2<String, Integer>>> {

    private final int threshold;

    private ArrayList<Tuple2<String, Integer>> bufferedElements;

    BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() == threshold) {
            for (Tuple2<String, Integer> element: bufferedElements) {
	        // send it to the sink
	    }
	    bufferedElements.clear();
	}
    }

    @Override
    public ArrayList<Tuple2<String, Integer>> snapshotState(
        long checkpointId, long checkpointTimestamp) throws Exception {
	    return bufferedElements;
    }

    @Override
    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
        bufferedElements.addAll(state);
    }
}

CountMapper は形式(word, 1)のキーによってグループ化された入力ストリームを仮定するRichFlatMapFuctionです。関数はそれぞれやってくるキーについてのカウンタ(ValueState<Integer> counter) を保持し、もしある単語の出現数がユーザが指定した閾値を超えると、その単語自身と出現数を含むタプルを発行します。

BufferingSink は要素(もしかするとCountMapperの出力) を受け取るSinkFunction で、それらを最終的なシンクに発行する前にあるユーザ定義の閾値に達するまでバッファします。これは、データベースあるいは外部のストレージシステムへの多くの高くつく呼び出しを避ける一般的な方法です。耐障害性のある方法でバッファリングを行うために、バッファされた要素は定期的にチェックポイントされるリスト (bufferedElements) に保持されます。

State API の移設

Flink 1.2の新しい機能を利用するために、上のコードは新しい状態の抽象を使うように修正する必要があります。これらの変更を行った後で、ジョブの並行度を変更(スケールアップあるいはダウン)することができ、ジョブの新しいバージョンを前任者が中止したところから開始することができるでしょう。

Keyed State: Something to note before delving into the details of the migration process is that if your function has only keyed state, then the exact same code from Flink 1.1 also works for Flink 1.2 with full support for the new features and full backwards compatibility. 変更はコードの構成を良くするためだけに行われるかもしれませんが、これは単に形式の問題です。

上で述べたことを使って、この章の残りはキー付けの無い状態に注目します。

再スケーリングと新しい状態の抽象

最初の修正は古いCheckpointed<T extends Serializable> 状態インタフェースか新しいものへの移行です。Flink 1.2 では、ステートフルな関数はもっと一般的なCheckpointedFunction インタフェース、あるいはListCheckpointed<T extends Serializable>インタフェースのどちらかを実装することができます。それは意味論的に古いCheckpointedに近いです。

両方の場合で、キー付けされていない状態はシリアライズ可能なオブジェクトのListであることが期待され、お互いに依存せず、従って再スケールジに再分配されることが望ましいです。別の言い方をすると、これらのオブジェクトはキー付けされていない状態が再区分される申し分のない粒度です。例として、もし並行度1でBufferingSinkのチェックポイントされた状態が(test1, 2) および (test2, 2)を含む場合、並行度が2に増加すると(test1, 2) はタスク0に達し、一方で (test2, 2) はタスク1に行くでしょう。

キー付きされた状態およびキー付きされていない無い状態の両方の再スケーリングの背後の原則の詳細については、状態のドキュメントの中で見つけることができます。

ListCheckpointed

ListCheckpointed インタフェースは2つのメソッドの実装を必要とします:

List<T> snapshotState(long checkpointId, long timestamp) throws Exception;

void restoreState(List<T> state) throws Exception;

それらのセマンティクスは古いCheckpointed インタフェースでの対応物と同じです。唯一の違いは、今はsnapshotState() がチェックポイントするためにオブジェクトのリストを返す必要があり、上で述べたように、restoreState はリカバリ時にこのリストを処理する必要があります。状態が再分割可能では無い場合、snapshotState()の中でCollections.singletonList(MY_STATE)を常に返すかもしれません。BufferingSink のための更新されたコードが、以下に含まれます:

public class BufferingSinkListCheckpointed implements
        SinkFunction<Tuple2<String, Integer>>,
        ListCheckpointed<Tuple2<String, Integer>>,
        CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {

    private final int threshold;

    private transient ListState<Tuple2<String, Integer>> checkpointedState;

    private List<Tuple2<String, Integer>> bufferedElements;

    public BufferingSinkListCheckpointed(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value) throws Exception {
        this.bufferedElements.add(value);
        if (bufferedElements.size() == threshold) {
            for (Tuple2<String, Integer> element: bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear();
        }
    }

    @Override
    public List<Tuple2<String, Integer>> snapshotState(
            long checkpointId, long timestamp) throws Exception {
        return this.bufferedElements;
    }

    @Override
    public void restoreState(List<Tuple2<String, Integer>> state) throws Exception {
        if (!state.isEmpty()) {
            this.bufferedElements.addAll(state);
        }
    }

    @Override
    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
        // this is from the CheckpointedRestoring interface.
        this.bufferedElements.addAll(state);
    }
}

コード内で見られるように、更新された関数はCheckpointedRestoringインタフェースも実装します。これは後方互換性の理由のためで、詳細はこの章の最後で説明されるでしょう。

CheckpointedFunction

CheckpointedFunction インタフェースも2つのメソッドの実装を必要とします:

void snapshotState(FunctionSnapshotContext context) throws Exception;

void initializeState(FunctionInitializationContext context) throws Exception;

Flink 1.1でのように、snapshotState() はチェックポイントが行われる時にいつでも呼ばれますが、(restoreState()に対応する) initializeState()は障害から回復する場合のみではなく、ユーザ定義関数が初期化される時に毎回呼ばれます。こう考えると、initializeState()は状態の異なる型が初期化される唯一の場所ではなく、むしろ状態の回復ロジックが含まれる場所です。BufferingSinkのためのCheckpointedFunction インタフェースの実装が以下で示されます。

public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
        CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {

    private final int threshold;

    private transient ListState<Tuple2<String, Integer>> checkpointedState;

    private List<Tuple2<String, Integer>> bufferedElements;

    public BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() == threshold) {
            for (Tuple2<String, Integer> element: bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.clear();
        for (Tuple2<String, Integer> element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedState = context.getOperatorStateStore().
            getSerializableListState("buffered-elements");

        if (context.isRestored()) {
            for (Tuple2<String, Integer> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }

    @Override
    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
        // this is from the CheckpointedRestoring interface.
        this.bufferedElements.addAll(state);
    }
}

initializeState は引数としてFunctionInitializationContextを取ります。これはキー付けされていない状態“container”を初期化するために使われます。これはチェックポイント時にキー付けされていない状態のオブジェクトが格納される型ListStateのコンテナです:

this.checkpointedState = context.getOperatorStateStore().getSerializableListState("buffered-elements");

コンテナを初期化した後で、障害の後で回復しているかどうか調べるためにコンテキストのisRestored() メソッドを使います。これが trueの場合、つまり 回復している場合、回復ロジックが適用されます。

修正されたBufferingSinkのコードの中で示されるように、状態の初期化の間に回復されたこのListStatesnapshotState()での詳細の利用のためにクラス変数に保持されます。There the ListState is cleared of all objects included by the previous checkpoint, and is then filled with the new ones we want to checkpoint.

As a side note, the keyed state can also be initialized in the initializeState() method. これは、Flink 1.1の場合のRuntimeContextの代わりに、引数として渡されるFunctionInitializationContext を使って行うことができます。もしCheckpointedFunctionインタフェースがCountMapper の例で使われる場合、古いopen() メソッドは削除されるかも知れず、新しいsnapshotState()initializeState() メソッドは以下のように見えるでしょう:

public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
        implements CheckpointedFunction {

    private transient ValueState<Integer> counter;

    private final int numberElements;

    public CountMapper(int numberElements) {
        this.numberElements = numberElements;
    }

    @Override
    public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
        int count = counter.value() + 1;
        counter.update(count);

        if (count % numberElements == 0) {
            out.collect(Tuple2.of(value.f0, count));
            counter.update(0); // reset to 0
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // all managed, nothing to do.
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        counter = context.getKeyedStateStore().getState(
            new ValueStateDescriptor<>("counter", Integer.class, 0));
    }
}

Flink自身はチェックポイント時にキー付けされた状態によって管理されるスナップショットを管理するため、snapshotState() メソッドは空であることに注意してください。

これまでのところFlink 1.2 で導入された新しい機能を利用するために関数をどう修正するかを見てきました。The question that remains is “Can I make sure that my modified (Flink 1.2) job will start from where my already running job from Flink 1.1 stopped?”.

答えは、はい、です。そしてその方法はとても簡単です。キー付けされた状態については、何もする必要はありません。Flink はFlink 1.1からの状態の回復の面倒を見るでしょう。キー付けされていない状態については、上のコードで示されるように新しい関数はCheckpointedRestoring インタフェースを実装しなければなりません。interface, as shown in the code above. This has a single method, the familiar restoreState() from the old Checkpointed interface from Flink 1.1. BufferingSinkの修正されたコードで示されるように、restoreState() メソッドはその前任者と一致します。

揃えられた処理時間のウィンドウ オペレータ

Flink 1.1 で、かつ明確なイビクタあるいはトリガーを持たない処理時間上のオペレータの場合、キー付けされたストリーム上のtimeWindow()WindowOperatorの特別な型を実体化するでしょう。これはAggregatingProcessingTimeWindowOperator あるいはAccumulatingProcessingTimeWindowOperatorのどちらかかもしれません。これらの両方のオペレータは入力要素が順番に到着すると仮定するため、両方のオペレータはaligned ウィンドウ オペレータとして参照されます。要素はウィンドウ オペレータに到着した瞬間の柱時計の時間をタイムスタンプとして取得するため、処理時間での操作時にはこれは有効です。These operators were restricted to using the memory state backend, and had optimized data structures for storing the per-window elements which leveraged the in-order input element arrival.

Flink 1.2では、並んだウィンドウ オペレータは非推奨で、全てのウィンドウ操作は一般的な WindowOperatorを通過します。FlinkはFlink 1.1のセーブポイントでの並んだウィンドウ操作によって格納される状態を透過的に読み込み、それを一般的なWindowOperatorと互換性を持つ形式に変換し、一般的な WindowOperatorを使って実行を再開するため、この移設にはFlink 1.1のジョブのコードの変更は必要ありません。

注意 非推奨ですが、確実にこの目的のために導入された特別な WindowAssigners を使って、Flink 1.2 で並べられたウィンドウ操作をまだ使うことができます。これらのアサイナーはSlidingAlignedProcessingTimeWindowsTumblingAlignedProcessingTimeWindows アサイナーで、それぞれウィンドウをずらし、転がすためのものです。これらのオペレータを使っている間にFlink 1.1のセーブポイントから実行を再開する方法は無いため、並んだウィンドウを使うFlink 1.2のジョブは新しいジョブでなければなりません。

注意 並んだウィンドウのオペレータは非再スケールで、Flink 1.1 と互換性がありません

Flink 1.2で並んだウィンドウ操作を使うためのコードを以下に示します:

// for tumbling windows
DataStream<Tuple2<String, Integer>> window1 = source
	.keyBy(0)
	.window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS)))
	.apply(your-function)

// for sliding windows
DataStream<Tuple2<String, Integer>> window1 = source
	.keyBy(0)
	.window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
	.apply(your-function)
// for tumbling windows
val window1 = source
    .keyBy(0)
    .window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS)))
    .apply(your-function)

// for sliding windows
val window2 = source
    .keyBy(0)
    .window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
    .apply(your-function)

上に戻る

TOP
inserted by FC2 system