Process Function (Low-level Operations)

ProcessFunction

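The ProcessFunction is a low-level stream processing operation that gives access to the basic building blocks of all (acyclic) streaming applications:

  • events (stream elements)
  • state (fault-tolerant, consistent)
  • timers (event time and processing time)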

The ProcessFunction can be thought of as a FlatMapFunction with access to keyed state and timers. It handles events by being invoked for each event received in the input stream(s).

For fault-tolerant state, the ProcessFunction gives access to Flink's keyed state, which is accessible via the RuntimeContext, similar to the way other stateful functions can access keyed state. Like all functions with keyed state, the ProcessFunction needs to be applied onto a KeyedStream:

stream.keyBy("id").process(new MyProcessFunction());
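To make "keyed state" concrete, here is a plain-Java sketch (it does not use Flink at all) of how a single state handle can transparently hold one value per key. The classes KeyedValueState and KeyedStateSketch are hypothetical illustrations, assuming the runtime sets the current key before each invocation of user code, which is roughly what applying the function to a KeyedStream arranges.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a value-state handle: one handle, many keys.
// This is NOT Flink's ValueState; it only illustrates key scoping.
class KeyedValueState<K, V> {
    private final Map<K, V> perKey = new HashMap<>();
    private K currentKey;

    // The "runtime" sets the key before invoking user code.
    void setCurrentKey(K key) { this.currentKey = key; }

    // User code only ever sees the value for the current key (null if unset).
    V value() { return perKey.get(currentKey); }

    void update(V value) { perKey.put(currentKey, value); }
}

public class KeyedStateSketch {
    public static void main(String[] args) {
        KeyedValueState<String, Long> count = new KeyedValueState<>();
        String[] ids = {"a", "b", "a", "a", "b"};
        for (String id : ids) {
            count.setCurrentKey(id);          // done for us by keyBy(...) in Flink
            Long current = count.value();     // per-key value, null on first access
            count.update(current == null ? 1L : current + 1);
        }
        count.setCurrentKey("a");
        System.out.println("a -> " + count.value()); // prints "a -> 3"
        count.setCurrentKey("b");
        System.out.println("b -> " + count.value()); // prints "b -> 2"
    }
}
```

The user code never mentions a key when reading or writing state; the key is implied by the record being processed.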

Timers allow applications to react to changes in processing time and in event time. Every call to the function processElement(...) receives a Context object, which gives access to the element's event time timestamp and to the TimerService. The TimerService can be used to register callbacks for future event-time or processing-time instants. When a timer's time is reached, the onTimer(...) method is called. During that call, all state is again scoped to the key with which the timer was created, which allows timers to manipulate keyed state as well.
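The following plain-Java sketch (again not Flink code) models event-time timers: pending timers sit in a queue ordered by time, and when the watermark advances, every timer at or before it fires with the current key reset to the key that registered it. TimerServiceSketch, its methods, and the fixed 1000 ms delay are hypothetical choices for illustration only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

public class TimerServiceSketch {
    // A pending timer remembers which key registered it.
    static final class Timer {
        final long timestamp;
        final String key;
        Timer(long timestamp, String key) { this.timestamp = timestamp; this.key = key; }
    }

    // Timers ordered by firing time.
    private final PriorityQueue<Timer> timers =
        new PriorityQueue<>((x, y) -> Long.compare(x.timestamp, y.timestamp));
    // Stand-in for keyed state: last event time seen per key.
    private final Map<String, Long> lastSeen = new HashMap<>();
    private String currentKey;
    final List<String> fired = new ArrayList<>();

    // Analogous to processElement: update keyed state, then register a timer
    // 1000 ms after the element's event time (arbitrary delay for illustration).
    void processElement(String key, long eventTime) {
        currentKey = key;
        lastSeen.put(currentKey, eventTime);
        timers.add(new Timer(eventTime + 1000, key));
    }

    // When the watermark advances, every timer at or before it fires in order.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.peek().timestamp <= watermark) {
            Timer t = timers.poll();
            currentKey = t.key; // state is re-scoped to the key that created the timer
            onTimer(t.timestamp);
        }
    }

    // Analogous to onTimer: reads the state of the timer's key.
    void onTimer(long timestamp) {
        fired.add(currentKey + "@" + timestamp + " last=" + lastSeen.get(currentKey));
    }

    public static void main(String[] args) {
        TimerServiceSketch s = new TimerServiceSketch();
        s.processElement("a", 100);
        s.processElement("b", 500);
        s.advanceWatermark(1200); // only a's timer (1100) is due
        s.advanceWatermark(2000); // now b's timer (1500) fires
        System.out.println(s.fired); // prints [a@1100 last=100, b@1500 last=500]
    }
}
```

Note that "b" was the last key processed, yet a's timer still reads a's state, because the key is restored from the timer itself.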

Low-level Joins

To realize low-level operations on two inputs, applications can use CoProcessFunction. It relates to ProcessFunction in the same way that CoFlatMapFunction relates to FlatMapFunction: the function is bound to two different inputs and gets individual calls to processElement1(...) and processElement2(...) for records from the two different inputs.

Implementing a low-level join typically follows this pattern:

  • Create a state object for one input (or both)
  • Update the state upon receiving elements from its input
  • Upon receiving elements from the other input, probe the state and produce the joined result
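The three steps above can be sketched in plain Java as a two-input object with processElement1 and processElement2 methods, mirroring the shape described for CoProcessFunction. This is not Flink code: Customer, Trade, and LowLevelJoinSketch are hypothetical, and a map keyed by customer id stands in for the per-key state Flink would provide if both inputs were keyed by that id.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LowLevelJoinSketch {
    static final class Customer {
        final String id;
        final String name;
        Customer(String id, String name) { this.id = id; this.name = name; }
    }

    static final class Trade {
        final String customerId;
        final long amount;
        Trade(String customerId, long amount) { this.customerId = customerId; this.amount = amount; }
    }

    // State kept for input 1 only (customers), keyed by customer id.
    private final Map<String, Customer> customers = new HashMap<>();
    final List<String> out = new ArrayList<>();

    // Input 1: update the state.
    void processElement1(Customer c) {
        customers.put(c.id, c);
    }

    // Input 2: probe the state and emit a joined result on a match.
    void processElement2(Trade t) {
        Customer c = customers.get(t.customerId);
        if (c != null) {
            out.add(c.name + ":" + t.amount);
        }
        // A trade with no known customer is simply dropped in this sketch;
        // a more complete join would buffer it in state as well.
    }

    public static void main(String[] args) {
        LowLevelJoinSketch j = new LowLevelJoinSketch();
        j.processElement2(new Trade("c1", 50));          // no customer yet: no output
        j.processElement1(new Customer("c1", "Alice"));
        j.processElement2(new Trade("c1", 100));
        System.out.println(j.out); // prints [Alice:100]
    }
}
```

The dropped first trade shows why this naive version is order-sensitive.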

For example, you might be joining customer data to financial trades, while keeping state for the customer data. If you care about having complete and deterministic joins in the face of out-of-order events, you can use a timer to evaluate and emit the join for a trade when the watermark for the customer data stream has passed the time of that trade.
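A minimal plain-Java sketch of the watermark-driven variant, for a single customer key: customer records are kept as versions ordered by event time, trades are buffered instead of joined immediately, and only when the customer stream's watermark passes a trade's time is that trade joined against the customer version valid at that time. WatermarkJoinSketch and its methods are hypothetical; the sketch assumes the customer watermark can be observed on its own, as the paragraph above describes, and it ignores customer records that arrive behind the watermark.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.TreeMap;

public class WatermarkJoinSketch {
    static final class Trade {
        final long time;
        final long amount;
        Trade(long time, long amount) { this.time = time; this.amount = amount; }
    }

    // Customer versions for ONE key, ordered by event time.
    private final TreeMap<Long, String> customerVersions = new TreeMap<>();
    // Trades waiting until the customer watermark passes their time.
    private final PriorityQueue<Trade> pending =
        new PriorityQueue<>((a, b) -> Long.compare(a.time, b.time));
    final List<String> out = new ArrayList<>();

    void onCustomer(long time, String name) {
        customerVersions.put(time, name);
    }

    // Buffer instead of joining; in Flink this is where a timer would be registered.
    void onTrade(Trade t) {
        pending.add(t);
    }

    // Every trade at or before the watermark can now be joined deterministically.
    void onCustomerWatermark(long watermark) {
        while (!pending.isEmpty() && pending.peek().time <= watermark) {
            Trade t = pending.poll();
            Map.Entry<Long, String> v = customerVersions.floorEntry(t.time);
            out.add((v == null ? "unknown" : v.getValue()) + ":" + t.amount);
        }
    }

    public static void main(String[] args) {
        WatermarkJoinSketch j = new WatermarkJoinSketch();
        j.onCustomer(10, "Alice v1");
        j.onTrade(new Trade(30, 100));
        j.onCustomer(20, "Alice v2");   // arrives after the trade, but is older
        j.onCustomerWatermark(25);      // trade at 30 is not yet safe to emit
        j.onCustomerWatermark(40);
        System.out.println(j.out);      // prints [Alice v2:100]
    }
}
```

Joining immediately would have paired the trade with "Alice v1"; waiting for the watermark yields the same answer regardless of arrival order.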

The following example maintains counts per key, and emits a key/count pair whenever a minute passes (in event time) without an update for that key:

  • The count, key, and last-modification-timestamp are stored in a ValueState, which is implicitly scoped by key.
  • For each record, the ProcessFunction increments the counter and sets the last-modification timestamp
  • The function also schedules a callback one minute into the future (in event time)
  • Upon each callback, it checks whether the callback's event-time timestamp equals the stored last-modification time plus one minute, and emits the key/count if so (i.e., no further update occurred during that minute)
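Before the Flink listings, the timeout logic itself can be traced in plain Java for a single key. This sketch (the class CountWithTimeoutTrace is hypothetical and does not use Flink) shows why every update registers a timer but only the one belonging to the latest update emits: older timers still fire, but fail the equality check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

public class CountWithTimeoutTrace {
    static final long TIMEOUT = 60_000; // one minute in milliseconds

    long count;
    long lastModified;
    final PriorityQueue<Long> timers = new PriorityQueue<>();
    final List<String> emitted = new ArrayList<>();

    // Per record: increment, remember the event time, schedule a timer.
    void processElement(long eventTime) {
        count++;
        lastModified = eventTime;
        timers.add(lastModified + TIMEOUT);
    }

    // Fire all timers up to the watermark; only a non-stale timer emits.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.peek() <= watermark) {
            long timestamp = timers.poll();
            if (timestamp == lastModified + TIMEOUT) {
                emitted.add("count=" + count + " at " + timestamp);
            }
        }
    }

    public static void main(String[] args) {
        CountWithTimeoutTrace t = new CountWithTimeoutTrace();
        t.processElement(0);          // timer at 60000
        t.processElement(30_000);     // timer at 90000; the 60000 timer is now stale
        t.advanceWatermark(100_000);
        System.out.println(t.emitted); // prints [count=2 at 90000]
    }
}
```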

Note: This simple example could have been implemented with session windows. We use ProcessFunction here to illustrate the basic pattern it provides.

Java

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
import org.apache.flink.util.Collector;


// the source data stream
DataStream<Tuple2<String, String>> stream = ...;

// apply the process function onto a keyed stream
DataStream<Tuple2<String, Long>> result = stream
    .keyBy(0)
    .process(new CountWithTimeoutFunction());

/**
 * The data type stored in the state
 */
public class CountWithTimestamp {

    public String key;
    public long count;
    public long lastModified;
}

/**
 * The implementation of the ProcessFunction that maintains the count and timeouts
 */
public class CountWithTimeoutFunction extends ProcessFunction<Tuple2<String, String>, Tuple2<String, Long>> {

    /** The state that is maintained by this process function */
    private ValueState<CountWithTimestamp> state;

    @Override
    public void open(Configuration parameters) throws Exception {
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
    }

    @Override
    public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {

        // retrieve the current count
        CountWithTimestamp current = state.value();
        if (current == null) {
            current = new CountWithTimestamp();
            current.key = value.f0;
        }

        // update the state's count
        current.count++;

        // set the state's timestamp to the record's assigned event time timestamp
        current.lastModified = ctx.timestamp();

        // write the state back
        state.update(current);

        // schedule the next timer 60 seconds from the current event time
        ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {

        // get the state for the key that scheduled the timer
        CountWithTimestamp result = state.value();

        // check if this is an outdated timer or the latest timer
        if (timestamp == result.lastModified + 60000) {
            // emit the state on timeout
            out.collect(new Tuple2<String, Long>(result.key, result.count));
        }
    }
}
Scala

import org.apache.flink.api.common.state.ValueState
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// the source data stream
val stream: DataStream[(String, String)] = ...

// apply the process function onto a keyed stream
val result: DataStream[(String, Long)] = stream
  .keyBy(0)
  .process(new CountWithTimeoutFunction())

/**
  * The data type stored in the state
  */
case class CountWithTimestamp(key: String, count: Long, lastModified: Long)

/**
  * The implementation of the ProcessFunction that maintains the count and timeouts
  */
class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String, Long)] {

  /** The state that is maintained by this process function */
  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
    .getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))


  override def processElement(
      value: (String, String),
      ctx: ProcessFunction[(String, String), (String, Long)]#Context,
      out: Collector[(String, Long)]): Unit = {
    // initialize or retrieve/update the state
    val current: CountWithTimestamp = state.value match {
      case null =>
        CountWithTimestamp(value._1, 1, ctx.timestamp)
      case CountWithTimestamp(key, count, _) =>
        CountWithTimestamp(key, count + 1, ctx.timestamp)
    }

    // write the state back
    state.update(current)

    // schedule the next timer 60 seconds from the current event time
    ctx.timerService.registerEventTimeTimer(current.lastModified + 60000)
  }

  override def onTimer(
      timestamp: Long,
      ctx: ProcessFunction[(String, String), (String, Long)]#OnTimerContext,
      out: Collector[(String, Long)]): Unit = {
    state.value match {
      // emit only if no update happened since this timer was registered
      case CountWithTimestamp(key, count, lastModified) if timestamp == lastModified + 60000 =>
        out.collect((key, count))
      case _ =>
    }
  }
}
