Working with State
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

状態との連動 #

このセクションでは、ステートフルプログラムを書くためにFlinkが提供するAPIについて学習します。ステートフルストリーム処理の背後にある概念については、ステートフルストリーム処理をご覧ください。

キー付きのDataStream #

キー付き状態を使いたい場合は、まず状態(およびストリーム自体のレコード)を分割するために使うキーをDataStreamで指定する必要があります。Java APIのkeyBy(KeySelector)またはPython APIのkey_by(KeySelector)を使ってキーを指定できます。 これによりKeyedStreamが生成され、キー付き状態を使う操作が可能になります。

キーセレクタ関数は入力として1つのレコードを受け取り、そのレコードのキーを返します。キーは任意のタイプにすることができ、決定論的な計算から導出される必要があります

Flinkのデータモデルはキー-値ペアに基づいていません。従って、データセットの型を物理的にキーと値にまとめる必要はありません。キーは“virtual”です: それらはグループ化のオペレータを導くために実際のデータ上で関数として定義されています。

以下の例は、単にオブジェクトのフィールドを返すkey selector関数の例を示します。

// some ordinary POJO
public class WC {
  public String word;
  public int count;

  public String getWord() { return word; }
}
DataStream<WC> words = // [...]
KeyedStream<WC> keyed = words
  .keyBy(WC::getWord);
// some ordinary case class
case class WC(word: String, count: Int)
val words: DataStream[WC] = // [...]
val keyed = words.keyBy( _.word )
words = # type: DataStream[Row]
keyed = words.key_by(lambda row: row[0])

タプルキーと式キー #

Flinkにはキーを定義する2つの代替方法もあります: Java/Scala APIのタプルキーと式キーです(Python APIではまだサポートされていません)。これにより、オブジェクトのフィールドを選択するためにタプルフィールドのインデックスまたは式を使ってキーを指定できます。現在これらを使うことをはお勧めしませんが、DataStreamのJavadocを参照してそれらについて学ぶことができます。KeySelector関数を使うほうが断然優れています: Javaラムダを使うと使いやすく、実行時のオーバーヘッドが潜在的に少なくなります。 function is strictly superior: with Java lambdas they are easy to use and they have potentially less overhead at runtime.

Back to top

キー付き状態の使用 #

キー付き状態のインタフェースは現在の入力要素のキーを全てスコープとする様々なタイプの状態へのアクセスを提供します。これは、このタイプの状態はKeyedStreamでのみ使えることを意味します。これはJava/Scala APIではstream.keyBy(…)で、Python APIではstream.key_by(…)で作成できます。

ここでは、まず利用可能な状態の様々なタイプの状態を見てから、それらをプログラムでどのように使うかを見ていきます。利用可能な状態のプリミティブは以下の通りです:

  • ValueState: これは更新され取得できる値を保持します (上で述べたように入力要素のキーに適用され、オペレーションが見る各キーについて1つの値が存在するでしょう)。値あhupdate(T)を使って設定でき、T value()を使って取得できます。

  • ListState<T>: これは要素のリストを保持します。要素を追加し、現在格納されている全ての要素に対してIterableを取得できます。要素はadd(T)またはaddAll(List<T>)を使って追加され、IterableはIterable<T> get()を使って取得できます。update(List<T>)を使って既存のリストを上書きすることもできます。

  • ReducingState<T>: これは、状態に追加された全ての値の集計を表す単一の値を保持します。 added to the state. インタフェースはListStateに似ていますが、add(T)を使って追加された要素は、指定されたReduceFunctionを使って集約に還元されます。

  • AggregatingState<IN, OUT>: これは、状態に追加された全ての値の集約を表す単一の値を保持します。ReducingStateと対比的に、集約タイプは状態に追加された要素のタイプとは異なる場合があります。インタフェースはListStateと同じですが、add(IN)を使って追加された要素は指定されたAggregateFunctionを使って集約されます。

  • MapState<UK, UV>: マッピングのリストを保持します。キーと値のペアを状態に入れて、現在保存されているマッピングのIterableを取得できます。マッピングはput(UK, UV)またはputAll(Map<UK, UV>)を使って追加されます。ユーザーキーと関連付けられた値はget(UK)を使って取得できます。マッピング、キー、値のiterableビューは、それぞれentries()keys()values()を使って取得できます。 isEmpty()を使って、このマップにキーと値のマッピングが含まれているかどうかを確認することもできます。

状態の全てのタイプには、現在のアクティブなキー、つまり入力要素のキーの状態をクリアするメソッドclear()もあります。

これらの状態オブジェクトは状態とのインタフェースとしてのみ使われることに留意することが必要です。状態は内部に格納しておくことは必要ではないですが、ディスクあるいはそれ以外のどこかにあるかもしれません。 2番目に留意すべきことは、状態から取得する値は入力要素のキーに依存するということです。したがって、関連するキーが異なる場合、ユーザ関数の1回の呼び出しで取得する値は、別の呼び出しでの値を異なる可能性があります。

状態ハンドルを取得するには、StateDescriptorを作成する必要があります。これは状態の名前 (後で見るように、いくつかの状態を作成することができ、それらは参照することができるようにユニークな名前を持ちます)、状態を保持する値の型、そしておそらくReduceFunctionのようなユーザ定義された関数を保持します。取得する状態のタイプに応じて、ValueStateDescriptorListStateDescriptorAggregatingStateDescriptorReducingStateDescriptorMapStateDescriptorのいずれかを作成します。

状態へのアクセスには、RuntimeContextを使うため、rich functionsでのみ可能です。 詳細については、こちらereを参照してください。ただし、すぐに例も示します。RichFunctionで利用できるRuntimeContextには状態にアクセスするための次のメソッドがあります:

  • ValueState<T> getState(ValueStateDescriptor<T>)
  • ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
  • ListState<T> getListState(ListStateDescriptor<T>)
  • AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
  • MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)

これは、全ての部分がどのように組み合わされるかを示すFlatMapFunctionの例です:

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    /**
     * The ValueState handle. The first field is the count, the second field a running sum.
     */
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {

        // access the state value
        Tuple2<Long, Long> currentSum = sum.value();

        // update the count
        currentSum.f0 += 1;

        // add the second field of the input value
        currentSum.f1 += input.f1;

        // update the state
        sum.update(currentSum);

        // if the count reaches 2, emit the average and clear the state
        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }
}

// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
        .keyBy(value -> value.f0)
        .flatMap(new CountWindowAverage())
        .print();

// the printed output will be (1,4) and (1,5)
class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {

  private var sum: ValueState[(Long, Long)] = _

  override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {

    // access the state value
    val tmpCurrentSum = sum.value

    // If it hasn't been used before, it will be null
    val currentSum = if (tmpCurrentSum != null) {
      tmpCurrentSum
    } else {
      (0L, 0L)
    }

    // update the count
    val newSum = (currentSum._1 + 1, currentSum._2 + input._2)

    // update the state
    sum.update(newSum)

    // if the count reaches 2, emit the average and clear the state
    if (newSum._1 >= 2) {
      out.collect((input._1, newSum._2 / newSum._1))
      sum.clear()
    }
  }

  override def open(parameters: Configuration): Unit = {
    sum = getRuntimeContext.getState(
      new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
    )
  }
}


object ExampleCountWindowAverage extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  env.fromCollection(List(
    (1L, 3L),
    (1L, 5L),
    (1L, 7L),
    (1L, 4L),
    (1L, 2L)
  )).keyBy(_._1)
    .flatMap(new CountWindowAverage())
    .print()
  // the printed output will be (1,4) and (1,5)

  env.execute("ExampleKeyedState")
}
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, FlatMapFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor

class CountWindowAverage(FlatMapFunction):

    def __init__(self):
        self.sum = None

    def open(self, runtime_context: RuntimeContext):
        descriptor = ValueStateDescriptor(
            "average",  # the state name
            Types.PICKLED_BYTE_ARRAY()  # type information
        )
        self.sum = runtime_context.get_state(descriptor)

    def flat_map(self, value):
        # access the state value
        current_sum = self.sum.value()
        if current_sum is None:
            current_sum = (0, 0)

        # update the count
        current_sum = (current_sum[0] + 1, current_sum[1] + value[1])

        # update the state
        self.sum.update(current_sum)

        # if the count reaches 2, emit the average and clear the state
        if current_sum[0] >= 2:
            self.sum.clear()
            yield value[0], int(current_sum[1] / current_sum[0])


env = StreamExecutionEnvironment.get_execution_environment()
env.from_collection([(1, 3), (1, 5), (1, 7), (1, 4), (1, 2)]) \
    .key_by(lambda row: row[0]) \
    .flat_map(CountWindowAverage()) \
    .print()

env.execute()

# the printed output will be (1,4) and (1,5)

この例では、不十分なカウンティングウィンドウを実装します。最初のフィールドによってタプルにキーを設定します(この例では、全てに同じキー1を持ちます)。この関数はカウントと累計をValueStateに保存します。カウントが2に達すると、平均を出力して状態をクリアし、0からやり直します。もし最初のフィールド内に異なる値を持つタプルを持っていた場合は、それぞれの異なる入力キーについて異なる状態の値を保持するだろうことに注意してください。

状態の生存時間(TTL) #

time-to-live (TTL)は任意のタイプのキー状態に割り当てることができます。TTLが設定されており、状態の値が有効期限切れの場合、格納された値は以下で詳しく説明するベストエフォートベースでクリーンアップされます。

全ての状態コレクションタイプはエントリごとのTTLをサポートします。これは、リスト要素とマップエントリは独立して期限切れになることを意味します。

状態のTTLを使うには、最初にStateTtlConfig設定オブジェクトを構築する必要があります。次に設定を渡すことで、任意の状態のdescriptorのTTL機能を有効にできます。

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
    
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.time.Time

val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build
    
val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
stateDescriptor.enableTimeToLive(ttlConfig)
from pyflink.common.time import Time
from pyflink.common.typeinfo import Types
from pyflink.datastream.state import ValueStateDescriptor, StateTtlConfig

ttl_config = StateTtlConfig \
  .new_builder(Time.seconds(1)) \
  .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite) \
  .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired) \
  .build()

state_descriptor = ValueStateDescriptor("text state", Types.STRING())
state_descriptor.enable_time_to_live(ttl_config)

設定には、考量すべきいくつかのオプションがあります:

newBuilderメソッドの最初のパラメータは必須であり、有効期限の値です。

更新タイプは状態TTLが更新されるタイミングを設定します(デフォルトではOnCreateAndWriteです):

  • StateTtlConfig.UpdateType.OnCreateAndWrite - 作成および書き込みアクセスのみ

  • StateTtlConfig.UpdateType.OnReadAndWrite - 読み取りアクセスの時も

    (注意: 状態の可視性をStateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUpにした場合、同時に状態読み取りキャッシュが無効になり、PyFlinkでp邪フォーマンスがいくらか低下します。

状態の可視性は、期限切れの値がまだクリーンアップされていない場合に、読み取りアクセス時に期限切れの値を返すかどうかを設定します(デフォルトはNeverReturnExpired):

  • StateTtlConfig.StateVisibility.NeverReturnExpired - 期限切れの値は返されません

    (注意: 状態のread/writekキャッシュが無効になり、PyFlinkでパフォーマンスがいくらか低下します)

  • StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp - まだ利用可能であれば返されます

NeverReturnExpiredの場合、期限切れの状態は、削除する必要がある場合でも、あたかも存在しないかのうように動作します。このオプションは、TTLの後に現ミッツにデータを読み取りアクセスできなくする必要があるユースケースに役立ちます。

別のオプションReturnExpiredIfNotCleanedUpにより、クリーンアップ前に期限切れの状態を返すことができます。

注意:

  • 状態バックエンドは、ユーザ値とともに最終更新のタイムスタンプを保存します。つまりこの機能を有効にすると、状態ストレージの消費量が増加します。 ヒープ状態バックエンドは、ユーザ状態オブジェクトへの参照を含む追加のJavaオブジェクトとプリミティブなLong値をメモリに格納します。RocksDB状態バックエンドは、保存された値、リストエントリ、マップエントリごとに8倍とを追加します。

  • 現在、処理時間を参照するTTLのみがサポートされています。

  • TTL無しで以前に設定された状態をTTL対応descriptorを使って回復しようとしたり、その逆を試みようとすると、互換性のエラーとStateMigrationExceptionが発生します。

  • TTL設定はチェックポイントやセーブポイントの一部ではなく、現在の実行中のジョブでFlinkがどう扱うかの設定です。

  • TTLを短い値から長い値に調整してチェックポイント状態を回復することはお勧めできません。潜在的なデータエラーが発生する可能性があります。

  • TTLを使ったマップ状態は、現在のところ、ユーザ値のシリアライザがnull値を処理できる場合のみnullユーザ値をサポートします。 シリアライザがnull値をサポートしない場合、シリアライズ化された形式で追加のバイトを犠牲にしてNullableSerializerでラップできます。

  • TTLが有効な設定では、実際には非推奨になっているStateDescriptordefaultValueは効果が無くなります。これは、セマンティクスをより明確にし、状態のnullまたは期限切れの場合にユーザがデフォルト値を手動で管理できるようにすることを目的としています。

期限切れの状態のクリーンアップ #

デフォるtおでは、期限切れの値は読み取り時に明示的に削除され、設定sらえた状態バックエンドでサポートされている場合は、バックグランドで定期的にガベージコレクトされます。バックグラウンドクリーンアップはStateTtlConfigで無効にできます:

import org.apache.flink.api.common.state.StateTtlConfig;
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .disableCleanupInBackground()
    .build();
import org.apache.flink.api.common.state.StateTtlConfig
val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .disableCleanupInBackground
    .build
from pyflink.common.time import Time
from pyflink.datastream.state import StateTtlConfig

ttl_config = StateTtlConfig \
  .new_builder(Time.seconds(1)) \
  .disable_cleanup_in_background() \
  .build()

バックグラウンドでの特殊なクリーンアップをよりきめ細かく制御する場合は、以下で示すように個々に設定できます。 現在のところ、ヒープ状態バックエンドはインクリメンタルクリーンアップに依存しており、RocksDBバックエンドはバックエンドクリーンアップに圧縮フィルタを使います。

完全あスナップショットでのクリーンアップ #

さらに、完全な状態のスナップショットを取った時点でクリーンアップをアクティブにするとサイズが削減されます。現在の実装では、ローカル状態はクリーンアップされませんが、以前のスナップショットから復元する場合は、削除された期限切れ状態は含まれません。 It can be configured in StateTtlConfig:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupFullSnapshot()
    .build();
import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.time.Time

val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupFullSnapshot
    .build
from pyflink.common.time import Time
from pyflink.datastream.state import StateTtlConfig

ttl_config = StateTtlConfig \
  .new_builder(Time.seconds(1)) \
  .cleanup_full_snapshot() \
  .build()

このオプションはRocksDB状態バックエンドのincrementalチェックポイントには適用されません。

既存のジョブの場合、個のクリーンアップ戦略は、例えばセーブポイントから再起動後に、StateTtlConfigでいつでも有効または無効にできます。
インクリメンタルクリーンアップ #

別のオプションは、一部の状態エントリのクリーンアップを段階的にトリガーすることです。 トリガーは、各状態アクセスまたは各レコード処理からのコールバックにすることができます。 個のクリーンアップ戦略が特定の状態に対してアクティブである場合、ストレージバックエンドはそのすべてのエントリに渡ってこの状態の遅延グローバルイテレータを保持します。 インクリメンタルクリーンアップがトリガーされる旅に、イテレータは進められます。 通過した状態エントリがチェックされ、期限切れのものはクリーンアップされます。

この機能はStateTtlConfigで設定できます:

import org.apache.flink.api.common.state.StateTtlConfig;
 StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupIncrementally(10, true)
    .build();
import org.apache.flink.api.common.state.StateTtlConfig
val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupIncrementally(10, true)
    .build
from pyflink.common.time import Time
from pyflink.datastream.state import StateTtlConfig

ttl_config = StateTtlConfig \
  .new_builder(Time.seconds(1)) \
  .cleanup_incrementally(10, True) \
  .build()

この戦略には2つのパラメータがあります。1つ目は各クリーンアップトリガーごとのチェックされた状態エントリの数です。 各状態アクセスごとに常にトリガーされます。 2つ目は各レコードの処理ごとにクリーンアップを追加でトリガーするかどうかを定義します。 ヒープバックエンドのデフォルトのバックグラウンドクリーンアップは、レコード処理ごとのクリーンアップ無しに5つのエントリがチェックされます。

注意:

  • 状態へのアクセスがは発生しない場合か、レコードが処理されない場合、期限切れの状態が残ります。
  • インクリメンタルクリーンアップに時間が掛かると、レコード処理のレイテンシが増加します。
  • 現時点では、インクリメンタルクリーンアップjはヒープ状態バックエンドに対してのみ実装されています。RocksDBに設定しても効果はありません。
  • ヒープ状態バックエンドが同期スナップショットで使われている場合、グローバルイテレータは同時変更をサポートしていない特定の実装のため、イテレート中に全てのキーのコピーを補助します。 この機能を有効にすると、メモリの消費が増加します。同期スナップショットにはこの問題はありません。
  • 既存のジョブの場合、個のクリーンアップ戦略は、例えばセーブポイントから再起動後に、StateTtlConfigでいつでも有効または無効にできます。
RocksDB圧縮中のクリーンアップ #

RocksDB状態バックエンドが使われている場合、FlinkはバックグランドクリーンアップのためにFlink固有の圧縮フィルタが呼び出されます。 RocksDBは定期的に非同期圧縮を実行して、 状態の更新をマージし、ストレージを削減します。 Flinkのコンパクションフィルタは状態エントリの有効期限タイムスタンプをTTLでチェックし、期限切れの値を除外します。

この機能はStateTtlConfigで設定できます:

import org.apache.flink.api.common.state.StateTtlConfig;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupInRocksdbCompactFilter(1000, Time.hours(1))
    .build();
import org.apache.flink.api.common.state.StateTtlConfig

val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupInRocksdbCompactFilter(1000, Time.hours(1))
    .build
from pyflink.common.time import Time
from pyflink.datastream.state import StateTtlConfig

ttl_config = StateTtlConfig \
  .new_builder(Time.seconds(1)) \
  .cleanup_in_rocksdb_compact_filter(1000, Time.hours(1)) \
  .build()

RocksDBのcompaction圧縮フィルタは、一定数の状態エントリを処理した後、有効期限の確認に使われる現在のタイムスタンプを毎回Flinkからクエリします。 これを変更して、独自の値をStateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries)メソッドに渡せます。 タイムスタンプをより頻繁に更新するとクリーンアップ速度が向上しますが、ネイティブコードからJNI呼び出しが使われるため、圧縮パフォーマンスが低下します。 RocksDBバックエンドのデフォルトのバックグランドクリーンアップは1000エントリが処理されるたびに現在のタイムスタンプをクエリします。

定期的な圧縮は、特にほとんどアクセスされない状態エントリの場合、期限切れの状態エントリのクリーンアップを高速化できます。 この値より古いファイルは圧縮のために選択され、以前と同じレベルに再書き込みされます。 ファイルが定期的に圧縮フィルタを通過するようにします。 これを変更してカスタム値をStateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries, Time periodicCompactionTime)メソッドに渡すことができます。 定期的な圧縮のデフォルト値は30日です。 0に設定して定期的な圧縮をオフにしたり、小さな値を設定して期限切れの状態エントリのクリーンアップを高速化することもできますが、より多くの圧縮がトリガーされます。

FlinkCompactionFilterのデバッグレベルをアクティブにすることで、RocksDBフィルタのネイティブコードからデバッグログをアクティブにできます。

log4j.logger.org.rocksdb.FlinkCompactionFilter=DEBUG

注意:

  • 圧縮中にTTLフィルタを呼び出すと速度が低下します。 TTLフィルタは、最終アクセスのタイムスタンプを解析し、圧縮されるキーごとに保存された全ての状態エントリの有効期限を確認する必要があります。 コレクション状態タイプ(リストまたはマップ)の場合、チェックは格納された要素ごとにも呼び出されます。
  • この機能が非固定バイト数の要素を持つリスト状態で使われる場合、ネイティブTTLフィルタは少なくとも最初の要素の有効期限が切れている各状態エントリごとに、JNI経由で要素のFlink Javaタイプシリアライザを追加で呼び出して次の有効期限切れ要素のオフセットを決める必要があります。
  • 既存のジョブの場合、個のクリーンアップ戦略は、例えばセーブポイントから再起動後に、StateTtlConfigでいつでも有効または無効にできます。
  • 定期的な圧縮はTTLが有効な場合のみ機能します。

Scala データストリーム APIでの状態 #

上記のインタフェースに加えて、Scala APIにはKeyedStream上の単一のValueStateを持つステートフルなmap()またはflatMap()関数のショートカットがあります。ユーザ関数はOptionValueStateの現在の値を取得し、状態を更新するために使われる更新された値を返す必要があります。

val stream: DataStream[(String, Int)] = ...

val counts: DataStream[(String, Int)] = stream
  .keyBy(_._1)
  .mapWithState((in: (String, Int), count: Option[Int]) =>
    count match {
      case Some(c) => ( (in._1, c), Some(c + in._2) )
      case None => ( (in._1, 0), Some(in._2) )
    })

オペレータの状態 #

オペレータの状態 (あるいは non-keyed state)は、1つの並列オペレータインスタンスにバインドされた状態です。Kafka Connectorは、Flinkでのオペレータ状態の使用を動機づける良い例です。Kafkaコンシューマの各並列インスタンスは、オペレータ状態としてトピックパーティションとオフセットのマップを保持します。

オペレータ状態インタフェースは、並列度が変更された場合の並列オペレータインスタンス間の状態の再配布をサポートします。この再配布を行うには様々なスキームがあります。

一般的なステートフルなFlinkアプリケーションでは、オペレータ状態は必要ありません。これは主に、ソース/シンクの実装や状態を分割するためのキーが無いシナリオで使われる特殊な状態です。

注意: オペレータ状態はPython DataStream APIではまだサポートされません。

ブロードキャスト状態 #

Broadcast StateOperator Stateの特別なタイプです。これは、1つのストリームのレコードを全てのダウンストリームタスクにブロードキャストする必要があるユースケースをサポートするために導入され、全てのサブタスク間で同じ状態を維持するために使われます。この状態は、2番目のストリームのレコードの処理中にアクセスできます。ブロードキャスト状態が自然に適合する例として、次のようなものがあります。別のストリームからの全ての要素に対して評価したい一連のルールを含む低スループットのストリームを考えます。上記のタイプのユースケースを念頭に置くと、ブロードキャスト状態は次の点に他のオペレータ状態とは異なります:

  1. マップ形式を持ち
  2. 入力としてbroadcastedストリームとnon-broadcastedストリームを持つ特定のオペレータのみが利用可能で、
  3. このようなオペレータは異なる名前を持つ複数のブロードキャスト状態を持てます。

Back to top

オペレータ状態の使用 #

オペレータ状態を使うには、ステートフル関数でCheckpointedFunctionインタフェースを実装します。

CheckpointedFunction #

CheckpointedFunctionインタフェースは様々な再配布スキーマを持つ非キー付け状態へのアクセスを提供します。これには2つのメソッドの実装が必要です:

void snapshotState(FunctionSnapshotContext context) throws Exception;

void initializeState(FunctionInitializationContext context) throws Exception;

チェックポイントを実行する必要がある場合は常にsnapshotState()が呼ばれます。対応するinitializeState()は、関数が最初に初期化される時、または関数が以前のチェックポイントから実際に回復する時など、ユーザ定義関数が初期化されるたびに呼び出されます。これを考慮すると、initializeState()は様々なタイプの状態が初期化される場所であるだけではなく、状態回復ロジックが含まれる場所でもあります。

現在、リスト形式のオペレータ状態がサポートされます。状態は、お互いに独立したシリアライズ化可能なオブジェクトのListであることが期待されるため、再スケーリング時に再配布できます。言い換えると、これらオブジェクトは、キー無しの状態を再配布できる最も細かい粒度です。状態アクセス方法に応じて、次の再配布スキームが定義されます:

  • Even-split redistribution: 各オペレータは状態要素のリストを返します。状態全体は論理的に全てのリストを連結したものです。再開/再分配 時に、並行オペレータがあるため、リストは結果的に多くの部分リストに分割されます。 各オペレータは部分リストを取得します。これは空、あるいは1つ以上の要素を含むかもしれません。 例として、並列度1のオペレータのチェックポイントが設定された状態に要素element1element2が含まれている場合、並列度を2に増やすと、element1はオペレータインスタンス0になり、一方element2はオペレータインスタンス1になる可能性があります。

  • Union redistribution: 各オペレータは状態要素のリストを返します。状態全体は論理的に全てのリストを連結したものです。回復/再配布字に、各オペレータは状態要素の完全なリストを取得します。リストのカーディナリティが高い可能性がある場合は、この機能を使わないでください。チェックポイントメタデータは各リストエントリへのオフセットを格納しするため、RPCフレームサイズやメモリ不足エラーが発生する可能性があります。

以下は、要素を外部に送信する前にそれらをバッファするためにCheckpointedFunctionを使うステートフルSinkFunctionの例です。これは、基本的な均等分割再配布リストの状態を示しています:

public class BufferingSink
        implements SinkFunction<Tuple2<String, Integer>>,
                   CheckpointedFunction {

    private final int threshold;

    private transient ListState<Tuple2<String, Integer>> checkpointedState;

    private List<Tuple2<String, Integer>> bufferedElements;

    public BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value, Context contex) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() >= threshold) {
            for (Tuple2<String, Integer> element: bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.update(bufferedElements);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<String, Integer>> descriptor =
            new ListStateDescriptor<>(
                "buffered-elements",
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        if (context.isRestored()) {
            for (Tuple2<String, Integer> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}
class BufferingSink(threshold: Int = 0)
  extends SinkFunction[(String, Int)]
    with CheckpointedFunction {

  @transient
  private var checkpointedState: ListState[(String, Int)] = _

  private val bufferedElements = ListBuffer[(String, Int)]()

  override def invoke(value: (String, Int), context: Context): Unit = {
    bufferedElements += value
    if (bufferedElements.size >= threshold) {
      for (element <- bufferedElements) {
        // send it to the sink
      }
      bufferedElements.clear()
    }
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    checkpointedState.update(bufferedElements.asJava)
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    val descriptor = new ListStateDescriptor[(String, Int)](
      "buffered-elements",
      TypeInformation.of(new TypeHint[(String, Int)]() {})
    )

    checkpointedState = context.getOperatorStateStore.getListState(descriptor)

    if(context.isRestored) {
      for(element <- checkpointedState.get().asScala) {
        bufferedElements += element
      }
    }
  }

}

initializeStateメソッドは引数としてFunctionInitializationContextを取ります。これはキーの無い状態"containers"を初期化するために使われます。これらはタイプListStateのコンテナで、キー無しの状態オブジェクトがチェックポイント時に保存されます。

キー付き状態と同様に、状態名と状態が保持する値の型についての情報方を含むStateDescriptorを塚tって状態がどのように初期化されるかに注目してください。

ListStateDescriptor<Tuple2<String, Integer>> descriptor =
    new ListStateDescriptor<>(
        "buffered-elements",
        TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

checkpointedState = context.getOperatorStateStore().getListState(descriptor);
val descriptor = new ListStateDescriptor[(String, Long)](
    "buffered-elements",
    TypeInformation.of(new TypeHint[(String, Long)]() {})
)

checkpointedState = context.getOperatorStateStore.getListState(descriptor)
状態アクセスメソッドの命名規則には、その再配布パターンとそれに続く状態構造が含まれています。例えば、回復時にunion redistribution schemeでリスト状態を使うには、getUnionListState(descriptor)を使って状態にアクセスします。 メソッド名に再配布パターンが含まれていない場合、例えば getListState(descriptor)、単に基本的な均等分割再配布スキームが使われることを意味します。

コンテナを初期化した後、コンテキストのisRestored()メソッドを使って障害後に回復しているかどうかを確認します。trueの場合、つまり回復している場合、回復ロジックが適用されます。

変更されたBufferingSinkのコードで示されているように、状態の初期化中に回復されたこのListStateは、snapshotState()で将来使えるようにクラス変数に保持されます。ここでListStateから前のチェックポイントに含まれていた全てのオブジェクトがクリアされ、次にチェックポイントを作成する新しいオブジェクトが埋め込まれます。

補足として、キー付き状態はinitializeState()メソッドで初期化することもできます。これは、提供されているFunctionInitializationContextを使って行えます。

stateful ソース関数 #

stateful ソースは他のオペレータと比べて少しだけ注意が必要です。 状態と出力コレクションの更新をアトミックに行うため(失敗/回復時に確実に1回のセマンティクスに必要)、ユーザはソースのコンテキストからロックを取得する必要があります。

public static class CounterSource
        extends RichParallelSourceFunction<Long>
        implements CheckpointedFunction {

    /**  current offset for exactly once semantics */
    private Long offset = 0L;

    /** flag for job cancellation */
    private volatile boolean isRunning = true;
    
    /** Our state object. */
    private ListState<Long> state;

    @Override
    public void run(SourceContext<Long> ctx) {
        final Object lock = ctx.getCheckpointLock();

        while (isRunning) {
            // output and state update are atomic
            synchronized (lock) {
                ctx.collect(offset);
                offset += 1;
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        state = context.getOperatorStateStore().getListState(new ListStateDescriptor<>(
                "state",
                LongSerializer.INSTANCE));
                
        // restore any state that we might already have to our fields, initialize state
        // is also called in case of restore.
        for (Long l : state.get()) {
            offset = l;
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        state.update(Collections.singletonList(offset));
    }
}
class CounterSource
       extends RichParallelSourceFunction[Long]
       with CheckpointedFunction {

  @volatile
  private var isRunning = true

  private var offset = 0L
  private var state: ListState[Long] = _

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    val lock = ctx.getCheckpointLock

    while (isRunning) {
      // output and state update are atomic
      lock.synchronized({
        ctx.collect(offset)

        offset += 1
      })
    }
  }

  override def cancel(): Unit = isRunning = false
  
  override def initializeState(context: FunctionInitializationContext): Unit = {
    state = context.getOperatorStateStore.getListState(
      new ListStateDescriptor[Long]("state", classOf[Long]))

    for (l <- state.get().asScala) {
      offset = l
    }
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    state.update(java.util.Collections.singletonList(offset))
  }
}

外部と通信をするためにFlinkによってチェックポイントが完全にFlinkによって通知される場合、幾つかのオペレータは情報が必要かもしれません。この場合、org.apache.flink.api.common.state.CheckpointListenerインタフェースを参照してください。

Back to top

inserted by FC2 system