The Broadcast State Pattern
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

ブロードキャスト状態パターン #

このセクションでは、実際にブロードキャスト状態を使う方法について学習します。ステートフルストリーム処理の背後にある概念については、ステートフルストリーム処理を参照してください。

提供されるAPI #

提供されるAPIを示すために、その完全な機能を紹介する前に例から始めます。実行例として、様々な色や形のオブジェクトのストリームがあり、特定のパターン、例えば長方形の後に三角形が続くものを見つけたい場合を取り上げます。興味のあるパターンのセットは時間の経過とともに変わると仮定します。

この例では、最初のストリームにはのプロパティを持つItemが含まれます。もう1つのストリームには、Rulesが含まれます。

Itemsのストリームから初めて、同じ色のペアが必要なので、Colorキーを付けるだけです。これにより、同じ色の要素が同じ物理マシーン上に配置されるようになります。

// key the items by color
KeyedStream<Item, Color> colorPartitionedStream = itemStream
                        .keyBy(new KeySelector<Item, Color>(){...});
# key the items by color
color_partitioned_stream = item_stream.key_by(lambda item: ...)

Rulesに進むと、ルールを含むストリームは全てのダウンストリームタスクにブロードキャストされる必要があり、これらのタスクはルールをローカルに保存して、受信する全てのItemsに対して評価できるようにする必要があります。以下のスニペットは、i) ルールのストリームをブロードキャストし、ii) 指定されたMapStateDescriptorを使って、ルールが保存されるブロードキャスト状態を作成します。

// a map descriptor to store the name of the rule (string) and the rule itself.
MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(
			"RulesBroadcastState",
			BasicTypeInfo.STRING_TYPE_INFO,
			TypeInformation.of(new TypeHint<Rule>() {}));
		
// broadcast the rules and create the broadcast state
BroadcastStream<Rule> ruleBroadcastStream = ruleStream
                        .broadcast(ruleStateDescriptor);
# a map descriptor to store the name of the rule (string) and the rule (Python object) itself.
rule_state_descriptor = MapStateDescriptor("RuleBroadcastState", Types.STRING(), Types.PICKLED_BYTE_ARRAY())

# broadcast the rules and create the broadcast state
rule_broadcast_stream = rule_stream.broadcast(rule_state_descriptor)

最後に、Itemから受信する要素に対してRulesを評価するために、以下を行う必要があります:

  1. 2つのストリームに接続し、
  2. 一致を検知するロジックを指定します。

ストリーム(キー付きあるいはキー無し)をBroadcastStreamに接続するには、BroadcastStreamを引数としてconnect()呼び出しを使って非ブロードキャストストリームに接続します。これは、BroadcastConnectedStreamを返し、特別なタイプのCoProcessFunctionを使ってprocess()を呼び出すことができます。関数には、一致のロジックが含まれます。 関数の正確なタイプは非ブロードキャストストリームの型によって異なります。

  • keyedの場合、関数はKeyedBroadcastProcessFunctionです。
  • non-keyedの場合、関数はBroadcastProcessFunctionです。

非ブロードキャストストリームにキーが設定されているとすると、次のスニペットには上記の呼び出しが含まれます:

接続は、引数としてBroadcastStreamを使って非ブロードキャストストリーム上で呼び出す必要があります。
DataStream<String> output = colorPartitionedStream
                 .connect(ruleBroadcastStream)
                 .process(
                     
                     // type arguments in our KeyedBroadcastProcessFunction represent:
                     //   1. the key of the keyed stream
                     //   2. the type of elements in the non-broadcast side
                     //   3. the type of elements in the broadcast side
                     //   4. the type of the result, here a string
                     
                     new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
                         // my matching logic
                     }
                 );
class MyKeyedBroadcastProcessFunction(KeyedBroadcastProcessFunction):
    # my matching logic
    ...

output = color_partitioned_stream \
    .connect(rule_broadcast_stream) \
    .process(MyKeyedBroadcastProcessFunction())

BroadcastProcessFunctionとKeyedBroadcastProcessFunction #

CoProcessFunctionの場合と同様に、これらの関数には実装する2つのプロセスメソッドがあります; ブロードキャストストリーム内の受信要素の処理を担当するprocessBroadcastElement()と、非ブロードキャストストリームに使われるprocessElement()です。メソッドの完全なシグニチャーは次の通りです:

public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {

    public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;

    public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
}
public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> {

    public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;

    public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;

    public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception;
}
class BroadcastProcessFunction(BaseBroadcastProcessFunction, Generic[IN1, IN2, OUT]):

    @abstractmethod
    def process_element(value: IN1, ctx: ReadOnlyContext):
        pass
    
    @abstractmethod
    def process_broadcast_element(value: IN2, ctx: Context):
        pass
class KeyedBroadcastProcessFunction(BaseBrodcastProcessFunction, Generic[KEY, IN1, IN2, OUT]):

    @abstractmethod
    def process_element(value: IN1, ctx: ReadOnlyContext):
        pass
    
    @abstractmethod
    def process_broadcast_element(value: IN2, ctx: Context):
        pass
    
    def on_timer(timestamp: int, ctx: OnTimerContext):
        pass

最初に注意すべきことは、どちらの関数も、ブロードキャスト側の要素の処理用のprocessBroadcastElement()メソッドと、非ブロードキャスト側の要素用のprocessElement()の実装を必要とすることです。

2つのメソッドは、提供されるコンテキストが異なります。非ブロードキャスト側はReadOnlyContextを持ちますが、ブロードキャスト側はContextを持ちます。

これら両方のコンテキスト(次の列挙のctx)は、

  1. ブロードキャスト状態へのアクセスを許可します: ctx.getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)
  2. 要素のタイムスタンプをクエリできます: ctx.timestamp(),
  3. 現在のウォーターマークを取得します: ctx.currentWatermark()
  4. 現在の処理時間を取得します: ctx.currentProcessingTime()
  5. 要素を副出力に発行します: ctx.output(OutputTag<X> outputTag, X value)
  1. ブロードキャスト状態へのアクセスを許可します: ctx.get_broadcast_state(state_descriptor: MapStateDescriptor)
  2. 要素のタイムスタンプをクエリできます: ctx.timestamp(),
  3. 現在のウォーターマークを取得します: ctx.current_watermark()
  4. 現在の処理時間を取得します: ctx.current_processing_time()
  5. 要素を副出力に発行します: yield output_tag, value

getBroadcastState()stateDescriptorは上記の.broadcast(ruleStateDescriptor)のstateDescriptorと同一である必要があります。

違いは、それぞれがブロードキャスト状態に与えるアクセスのタイプにあります。ブロードキャスト側にはread-write accessが、非ブロードキャスト側にはread-only accessがあります (従って名前が付けられます)。この理由は、Flinkにはタスク間の通信がないためです。So, to guarantee that the contents in the Broadcast State are the same across all parallel instances of our operator, we give read-write access only to the broadcast side, which sees the same elements across all tasks, and we require the computation on each incoming element on that side to be identical across all tasks. このルールを無視すると、状態の一貫性の保証が破られ、一貫性のない結果になり、デバッグが困難になることがよくあります。

processBroadcastElement()に実装されたロジックは、全ての並列インスタンスに渡って同じ決定的な動作をする必要があります。

最後に、KeyedBroadcastProcessFunctionはキー付きストリームで動作しているため、BroadcastProcessFunctionでは利用できないいくつかの機能が公開されています。つまり:

  1. processElement()メソッドのReadOnlyContextはFlinkの基盤となるタイマーサービスへのアクセスを与えます。これによりイベントおよび/または処理時間を登録できます。When a timer fires, the onTimer() (shown above) is invoked with an OnTimerContext which exposes the same functionality as the ReadOnlyContext plus
  • the ability to ask if the timer that fired was an event or processing time one and
  • to query the key associated with the timer.
  1. processBroadcastElement()メソッドのContextにはメソッドapplyToKeyedState(StateDescriptor<S, VS> stateDescriptor, KeyedStateFunction<KS, S> function)が含まれています。これにより、KeyedStateFunctionを登録して、提供されたstateDescriptorに関連付けられた全てのキーの全ての状態に適用できるようになります。 apply_to_keyed_stateはPyFlinkではまだサポートされていないことに注意してください。

タイマーの登録は、KeyedBroadcastProcessFunctionprocessElement()でのみ可能です。 and only there. processBroadcastElement()メソッドでは、ブロードキャストされた要素に関連付けられたキーがないため、不可能です。
元の例に戻ると、KeyedBroadcastProcessFunctionは次のようになります:

new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {

    // store partial matches, i.e. first elements of the pair waiting for their second element
    // we keep a list as we may have many first elements waiting
    private final MapStateDescriptor<String, List<Item>> mapStateDesc =
        new MapStateDescriptor<>(
            "items",
            BasicTypeInfo.STRING_TYPE_INFO,
            new ListTypeInfo<>(Item.class));

    // identical to our ruleStateDescriptor above
    private final MapStateDescriptor<String, Rule> ruleStateDescriptor =
        new MapStateDescriptor<>(
            "RulesBroadcastState",
            BasicTypeInfo.STRING_TYPE_INFO,
            TypeInformation.of(new TypeHint<Rule>() {}));

    @Override
    public void processBroadcastElement(Rule value,
                                        Context ctx,
                                        Collector<String> out) throws Exception {
        ctx.getBroadcastState(ruleStateDescriptor).put(value.name, value);
    }

    @Override
    public void processElement(Item value,
                               ReadOnlyContext ctx,
                               Collector<String> out) throws Exception {

        final MapState<String, List<Item>> state = getRuntimeContext().getMapState(mapStateDesc);
        final Shape shape = value.getShape();
    
        for (Map.Entry<String, Rule> entry :
                ctx.getBroadcastState(ruleStateDescriptor).immutableEntries()) {
            final String ruleName = entry.getKey();
            final Rule rule = entry.getValue();
    
            List<Item> stored = state.get(ruleName);
            if (stored == null) {
                stored = new ArrayList<>();
            }
    
            if (shape == rule.second && !stored.isEmpty()) {
                for (Item i : stored) {
                    out.collect("MATCH: " + i + " - " + value);
                }
                stored.clear();
            }
    
            // there is no else{} to cover if rule.first == rule.second
            if (shape.equals(rule.first)) {
                stored.add(value);
            }
    
            if (stored.isEmpty()) {
                state.remove(ruleName);
            } else {
                state.put(ruleName, stored);
            }
        }
    }
}
class MyKeyedBroadcastProcessFunction(KeyedBroadcastProcessFunction):

    def __init__(self):
        self._map_state_desc = MapStateDescriptor("item", Types.STRING(), Types.LIST(Types.PICKLED_BYTE_ARRAY()))
        self._rule_state_desc = MapStateDescriptor("RulesBroadcastState", Types.STRING(), Types.PICKLED_BYTE_ARRAY())
        self._map_state = None
    
    def open(self, ctx: RuntimeContext):
        self._map_state = ctx.get_map_state(self._map_state_desc)
    
    def process_broadcast_element(value: Rule, ctx: KeyedBroadcastProcessFunction.Context):
        ctx.get_broadcast_state(self._rule_state_desc).put(value.name, value)
    
    def process_element(value: Item, ctx: KeyedBroadcastProcessFunction.ReadOnlyContext):
        shape = value.get_shape()

        for rule_name, rule in ctx.get_broadcast_state(self._rule_state_desc).items():

            stored = self._map_state.get(rule_name)
            if stored is None:
                stored = []
            
            if shape == rule.second and len(stored) > 0:
                for i in stored:
                    yield "MATCH: {} - {}".format(i, value)
                stored = []
            
            if shape == rule.first:
                stored.append(value)
            
            if len(stored) == 0:
                self._map_state.remove(rule_name)
            else:
                self._map_state.put(rule_name, stored)

重要な考慮事項 #

提供されるAPIについて説明した後で、このセクションではブロードキャスト状態を使う時に留意すべき重要な点に焦点を当てます。それらは:

  • タスク間の通信はありません: 前述したように、これが、(Keyed)-BroadcastProcessFunctionのブロードキャスト側のみがブロードキャスト状態の内容を変更できる理由です。さらに、ユーザは、全てのタスクが受信要素ごとに同じ方法でブロードキャスト状態の内容を変更することを確認する必要があります。そうしなければ、異なるタスクが異なる内容を持ち、一貫性のない結果が生じる可能性があります。

  • ブロードキャスト状態のイベントの順序はタスクによって異なる場合があります: ストリームの要素をブロードキャストすると全ての要素が(最終的には)全てのダウンストリームタスクに送信されることが保証されますが、要素は異なる順序で各タスクに到着する可能性があります。従って、各受信要素の状態更新は受信イベントの順序に依存してはなりません

  • 全てのタスクがブロードキャスト状態にチェックポイントを作成します: チェックポイントが発生すると、全てのタスクのブロードキャスト状態に同じ要素が含まれますが(チェックポイントバリアは要素を超えません)、1つだけではなく全てのタスクがブロードキャスト状態にチェックポイントを作成します。これはチェックポイントが設定された状態のサイズがp倍(=並列度)大きくなるという犠牲を伴いますが、リストア中に全てのタスクが同じファイルから読み取られることを回避する(したがってホットスポットを回避する)という設計上の決定です。Flinkは、復元/再スケーリング中に重複が無いおよび欠落データが無いことを保証します。 同じかそれ以下の並列度でリカバリする場合、各タスクはチェックポイントが設定された状態を読み取ります。スケールアップすると、各タスクは独自の状態を読み取り、残りのタスク(p_new-p_old)はラウンドロビン形式で前のタスクのチェックポイントを読み取ります。

  • RocksDB状態バックエンドが無い: ブロードキャスト状態は実行時にメモリ内に保持され、それに応じてメモリプロビジョニングを行う必要があります。これは全てのオペレータの状態に当てはまります。

Back to top

inserted by FC2 system