ProcessFunction is a low-level stream processing operation that gives access to the basic building blocks of all (acyclic) streaming applications.
ProcessFunction can be thought of as a FlatMapFunction with access to keyed state and timers. It handles events by being invoked for each event received in the input stream(s).
For fault-tolerant state, ProcessFunction gives access to Flink's keyed state, accessible via the RuntimeContext, similar to the way other stateful functions can access keyed state. Like all functions with keyed state, ProcessFunction needs to be applied onto a KeyedStream:
stream.keyBy("id").process(new MyProcessFunction())
Timers allow applications to react to changes in processing time and in event time. Every call to processElement(...) receives a Context object that gives access to the element's event-time timestamp and to the TimerService. The TimerService can be used to register callbacks for future event-time or processing-time instants. When a timer's time is reached, the onTimer(...) method is called. During that call, all states are again scoped to the key with which the timer was created, allowing timers to perform keyed state manipulation as well.
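As a rough illustration of the timer behaviour described above, the following plain-Java model (not Flink code) registers event-time timers per key and fires them once a watermark reaches their timestamp. The class KeyedTimerSketch, its methods, and the per-key counter are hypothetical names chosen for this sketch; the real TimerService and onTimer(...) are only imitated here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Plain-Java model of key-scoped event-time timers; hypothetical, not Flink API. */
final class KeyedTimerSketch {
    /** Pending timers: fire time -> keys that registered a timer for that time. */
    private final TreeMap<Long, TreeSet<String>> timers = new TreeMap<>();
    /** Stand-in for keyed state: how many timers have fired for each key. */
    private final Map<String, Integer> firedPerKey = new HashMap<>();

    /** Registers a timer for the given key; this model keeps one timer per key and time. */
    void registerEventTimeTimer(String key, long time) {
        timers.computeIfAbsent(time, t -> new TreeSet<>()).add(key);
    }

    /** Fires, in time order, every timer whose time is at or before the watermark. */
    List<String> advanceWatermark(long watermark) {
        List<String> calls = new ArrayList<>();
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            Map.Entry<Long, TreeSet<String>> due = timers.pollFirstEntry();
            for (String key : due.getValue()) {
                // The callback only touches the state of the key that set the timer.
                int fired = firedPerKey.merge(key, 1, Integer::sum);
                calls.add("onTimer(" + due.getKey() + ") key=" + key + " fired=" + fired);
            }
        }
        return calls;
    }

    public static void main(String[] args) {
        KeyedTimerSketch sketch = new KeyedTimerSketch();
        sketch.registerEventTimeTimer("a", 100L);
        sketch.registerEventTimeTimer("b", 50L);
        System.out.println(sketch.advanceWatermark(49L));  // nothing is due yet
        System.out.println(sketch.advanceWatermark(100L)); // b's timer fires, then a's
    }
}
```

The sorted map is a design shortcut: it makes "all timers up to the watermark" a simple loop over the smallest entries, which is enough to show the ordering and the key scoping without any runtime.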
To realize low-level operations on two inputs, applications can use CoProcessFunction. It relates to ProcessFunction in the same way that CoFlatMapFunction relates to FlatMapFunction: the function is bound to two different inputs and gets individual calls to processElement1(...) and processElement2(...) for records from the two different inputs.
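Implementing a low-level join typically follows this pattern: keep state for one input (or both), update that state as elements arrive on its input, and probe it to produce the joined result when elements arrive on the other input.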
For example, you might be joining customer data to financial trades, while keeping state for the customer data. If you care about having complete and deterministic joins in the face of out-of-order events, you can use a timer to evaluate and emit the join for a trade when the watermark for the customer data stream has passed the time of that trade.
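The customer/trade scenario above can be sketched in plain Java, without Flink, to show the shape of the logic: one input updates state, the other input is buffered, and a watermark triggers the join. Everything named here (LowLevelJoinSketch, onWatermark, the string-based records) is a hypothetical simplification; in a real job the two process methods would live in a CoProcessFunction and the buffering would use keyed state and timers.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of a two-input join driven by a watermark; hypothetical, not Flink API. */
final class LowLevelJoinSketch {
    /** State kept for input 1: latest customer name per customer id. */
    private final Map<String, String> customers = new HashMap<>();
    /** Trades from input 2 waiting for the watermark, ordered by event time. */
    private final TreeMap<Long, List<String[]>> pendingTrades = new TreeMap<>();

    /** Input 1: a customer record updates the state. */
    void processElement1(String customerId, String name) {
        customers.put(customerId, name);
    }

    /** Input 2: a trade is buffered until the watermark passes its event time. */
    void processElement2(String customerId, String symbol, long eventTime) {
        pendingTrades.computeIfAbsent(eventTime, t -> new ArrayList<>())
                .add(new String[] {customerId, symbol});
    }

    /** Emits the join for every buffered trade whose event time is at or before the watermark. */
    List<String> onWatermark(long watermark) {
        List<String> joined = new ArrayList<>();
        Map<Long, List<String[]>> due = pendingTrades.headMap(watermark, true);
        for (List<String[]> trades : due.values()) {
            for (String[] trade : trades) {
                joined.add(trade[1] + " -> " + customers.getOrDefault(trade[0], "unknown"));
            }
        }
        due.clear(); // headMap is a view, so this removes the emitted trades
        return joined;
    }

    public static void main(String[] args) {
        LowLevelJoinSketch join = new LowLevelJoinSketch();
        join.processElement2("c1", "ACME", 10L); // the trade arrives before the customer record
        join.processElement1("c1", "Alice");
        System.out.println(join.onWatermark(5L));  // trade time not yet passed
        System.out.println(join.onWatermark(10L)); // trade is joined with the customer state
    }
}
```

Deferring the join until the watermark is what makes the result independent of arrival order in this sketch: the trade above is joined correctly even though it was received first.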
The following example maintains counts per key, and emits a key/count pair whenever a minute passes (in event time) without an update for that key:
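- The count, key, and last-modification timestamp are stored in a ValueState, which is implicitly scoped by key.
- For each record, the ProcessFunction increments the counter and sets the last-modification timestamp.
- The function also schedules a callback one minute into the future (in event time).
- Upon each callback, it checks the callback's event-time timestamp against the last-modification time of the stored count and emits the key/count pair if they match (that is, no further update occurred during that minute).
Note: This simple example could have been implemented with session windows. We use ProcessFunction here to illustrate the basic pattern it provides.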
// Java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.RichProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
import org.apache.flink.util.Collector;

// the source data stream
DataStream<Tuple2<String, String>> stream = ...;

// apply the process function onto a keyed stream
DataStream<Tuple2<String, Long>> result = stream
    .keyBy(0)
    .process(new CountWithTimeoutFunction());

/**
 * The data type stored in the state
 */
public class CountWithTimestamp {
    public String key;
    public long count;
    public long lastModified;
}

/**
 * The implementation of the ProcessFunction that maintains the count and timeouts
 */
public class CountWithTimeoutFunction extends RichProcessFunction<Tuple2<String, String>, Tuple2<String, Long>> {

    /** The state that is maintained by this process function */
    private ValueState<CountWithTimestamp> state;

    @Override
    public void open(Configuration parameters) throws Exception {
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
    }

    @Override
    public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {
        // retrieve the current count, creating the state entry on the first record for this key
        CountWithTimestamp current = state.value();
        if (current == null) {
            current = new CountWithTimestamp();
            current.key = value.f0;
        }

        // update the state's count
        current.count++;

        // set the state's timestamp to the record's assigned event time timestamp
        current.lastModified = ctx.timestamp();

        // write the state back
        state.update(current);

        // schedule the next timer 60 seconds from the current event time
        ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {
        // get the state for the key that scheduled the timer
        CountWithTimestamp result = state.value();

        // the timer set by the latest update fires at lastModified + 60000;
        // any other timer is outdated and is ignored
        if (timestamp == result.lastModified + 60000) {
            // emit the state
            out.collect(new Tuple2<String, Long>(result.key, result.count));
        }
    }
}
// Scala
import org.apache.flink.api.common.state.ValueState
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.ProcessFunction.Context
import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// the source data stream
val stream: DataStream[(String, String)] = ...

// apply the process function onto a keyed stream
val result: DataStream[(String, Long)] = stream
  .keyBy(0)
  .process(new CountWithTimeoutFunction())

/**
 * The data type stored in the state
 */
case class CountWithTimestamp(key: String, count: Long, lastModified: Long)

/**
 * The implementation of the ProcessFunction that maintains the count and timeouts
 */
class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String, Long)] {

  /** The state that is maintained by this process function */
  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
    .getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))

  override def processElement(value: (String, String), ctx: Context, out: Collector[(String, Long)]): Unit = {
    // initialize or retrieve/update the state
    val current: CountWithTimestamp = state.value match {
      case null =>
        // first record for this key: take the key from the incoming element
        CountWithTimestamp(value._1, 1, ctx.timestamp)
      case CountWithTimestamp(key, count, _) =>
        CountWithTimestamp(key, count + 1, ctx.timestamp)
    }

    // write the state back
    state.update(current)

    // schedule the next timer 60 seconds from the current event time
    ctx.timerService.registerEventTimeTimer(current.lastModified + 60000)
  }

  override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[(String, Long)]): Unit = {
    state.value match {
      // the timer set by the latest update fires at lastModified + 60000
      case CountWithTimestamp(key, count, lastModified) if (timestamp == lastModified + 60000) =>
        out.collect((key, count))
      case _ => // outdated timer: a later update re-armed the timeout
    }
  }
}
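The timeout rule from the example description (emit a key/count pair once a minute of event time passes without an update for that key) can be traced with a small plain-Java model. This is only a sketch under simplifying assumptions: CountWithTimeoutSketch, advanceWatermark, and the in-memory maps are hypothetical stand-ins for Flink's keyed state and TimerService, and a timer is treated as current only if it was set by the latest update for its key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Plain-Java trace of the count-with-timeout logic; hypothetical, not Flink API. */
final class CountWithTimeoutSketch {
    static final long TIMEOUT_MS = 60_000L;

    /** Per-key state: index 0 holds the count, index 1 the last-modification time. */
    private final Map<String, long[]> state = new HashMap<>();
    /** Pending timers: fire time -> keys that registered a timer for that time. */
    private final TreeMap<Long, TreeSet<String>> timers = new TreeMap<>();

    /** Counts the record and schedules a timer one timeout after its event time. */
    void processElement(String key, long eventTime) {
        long[] s = state.computeIfAbsent(key, k -> new long[2]);
        s[0]++;
        s[1] = eventTime;
        timers.computeIfAbsent(eventTime + TIMEOUT_MS, t -> new TreeSet<>()).add(key);
    }

    /** Fires due timers and emits "key=count" only for timers set by the latest update. */
    List<String> advanceWatermark(long watermark) {
        List<String> emitted = new ArrayList<>();
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            Map.Entry<Long, TreeSet<String>> due = timers.pollFirstEntry();
            long fireTime = due.getKey();
            for (String key : due.getValue()) {
                long[] s = state.get(key);
                if (fireTime == s[1] + TIMEOUT_MS) {
                    emitted.add(key + "=" + s[0]);
                }
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        CountWithTimeoutSketch sketch = new CountWithTimeoutSketch();
        sketch.processElement("a", 0L);
        sketch.processElement("a", 30_000L);
        System.out.println(sketch.advanceWatermark(60_000L)); // prints []: the first timer is outdated
        System.out.println(sketch.advanceWatermark(90_000L)); // prints [a=2]: no update for a minute
    }
}
```

The trace shows why stale timers need no explicit cancellation in this pattern: each update leaves its old timer in place, and the callback simply ignores any timer that does not correspond to the latest modification.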