ProcessFunction
は全て(非循環式の)ストリーミングアプリケーションのブロックの基本的な構築にアクセスすることができる、低レベルストリーム処理操作です。
ProcessFunction
はキー付けされた状態とタイマーへのアクセスを持つFlatMapFunction
のようなものだと考えることができます。入力ストリーム(s)の中で受信した各イベントごとに起動されることでイベントを処理します。
耐障害性の状態のために、他のステートフル機能がキー付けされた状態にアクセスできるのに似た方法で、ProcessFunction
にFlinkのキー付けされた状態、RuntimeContext
を経由してアクセス可能、にアクセスさせます。キー付けされた状態を持つ全ての関数と似て、ProcessFunction
はKeyedStream
へ適用される必要があります: java stream.keyBy("id").process(new MyProcessFunction())
タイマーによりアプリケーションは処理時間およびイベント時間での変化に反応することができます。関数processElement(...)
の各呼び出しは、要素のイベント時間のタイムスタンプとTimerServiceへのアクセスを持つContext
オブジェクトを取ります。TimerService
は将来のイベント/処理時間のインスタンスのためのコールバックを登録するために使うことができます。タイマーの特定の時間になると、onTimer(...)
メソッドが呼ばれます。呼び出しの間、タイマーがキー付けされた操作も実施できるようにしながら、全ての状態はタイマーが生成された時のキーに再びスコープされます。
2つの入力上に低レベルのオペレーションを実現するために、申請はCoProcessFunction
を使うことができます。CoFlatMapFunction
が FlatMapFunction
に関連するのと同じ方法で ProcessFunction
に関連します: 関数は2つの異なる入力に縛られ、2つの異なる入力から各レコードについてprocessElement1(...)
と processElement2(...)
へのそれぞれの呼び出しを取得します。
低レベルのjoinの実装は一般的にこのパターンに従います:
例えば、顧客データについての状態を維持しながら、顧客データを財務売買にjoinするかもしれません。順番がばらばらなイベントにも関わらず完全で決定論的なjoinを持つことに関心がある場合、顧客データのストリームのためのウォーターマークが売買の時間を過ぎた時に売買のためのjoinを評価および発行するためにタイマーを使うことができます。
以下の例はキー毎のカウントを維持し、キーのための更新無しに、1分すぎた(イベント時間)時にいつでもキー/カウントのペアを発行します。
ValueState
に格納されます。これは暗黙のうちにキーによって範囲を付けられます。ProcessFunction
はカウンターを増やし、最後に修正されたタイムスタンプを設定します。Note: This simple example could have been implemented with session windows. それが提供する基本的なパターンを悦明するために、ここではProcessFunction
を使います。
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.RichProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
import org.apache.flink.util.Collector;
// the source data stream
DataStream<Tuple2<String, String>> stream = ...;
// apply the process function onto a keyed stream
DataStream<Tuple2<String, Long>> result = stream
.keyBy(0)
.process(new CountWithTimeoutFunction());
/**
* The data type stored in the state
*/
public class CountWithTimestamp {
public String key;
public long count;
public long lastModified;
}
/**
* The implementation of the ProcessFunction that maintains the count and timeouts
*/
public class CountWithTimeoutFunction extends RichProcessFunction<Tuple2<String, String>, Tuple2<String, Long>> {
/** The state that is maintained by this process function */
private ValueState<CountWithTimestamp> state;
@Override
public void open(Configuration parameters) throws Exception {
state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
}
@Override
public void processElement(Tuple2<String, Long> value, Context ctx, Collector<Tuple2<String, Long>> out)
throws Exception {
// retrieve the current count
CountWithTimestamp current = state.value();
if (current == null) {
current = new CountWithTimestamp();
current.key = value.f0;
}
// update the state's count
current.count++;
// set the state's timestamp to the record's assigned event time timestamp
current.lastModified = ctx.timestamp();
// write the state back
state.update(current);
// schedule the next timer 60 seconds from the current event time
ctx.timerService().registerEventTimeTimer(current.timestamp + 60000);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out)
throws Exception {
// get the state for the key that scheduled the timer
CountWithTimestamp result = state.value();
// check if this is an outdated timer or the latest timer
if (timestamp == result.lastModified) {
// emit the state
out.collect(new Tuple2<String, Long>(result.key, result.count));
}
}
}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
import org.apache.flink.util.Collector;
// the source data stream
DataStream<Tuple2<String, String>> stream = ...;
// apply the process function onto a keyed stream
DataStream<Tuple2<String, Long>> result = stream
.keyBy(0)
.process(new CountWithTimeoutFunction());
/**
* The data type stored in the state
*/
case class CountWithTimestamp(key: String, count: Long, lastModified: Long)
/**
* The implementation of the ProcessFunction that maintains the count and timeouts
*/
class TimeoutStateFunction extends ProcessFunction[(String, Long), (String, Long)] {
/** The state that is maintained by this process function */
lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext()
.getState(new ValueStateDescriptor<>("myState", clasOf[CountWithTimestamp]))
override def processElement(value: (String, Long), ctx: Context, out: Collector[(String, Long)]): Unit = {
// initialize or retrieve/update the state
val current: CountWithTimestamp = state.value match {
case null =>
CountWithTimestamp(key, 1, ctx.timestamp)
case CountWithTimestamp(key, count, time) =>
CountWithTimestamp(key, count + 1, ctx.timestamp)
}
// write the state back
state.update(current)
// schedule the next timer 60 seconds from the current event time
ctx.timerService.registerEventTimeTimer(current.timestamp + 60000)
}
override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[(String, Long)]): Unit = {
state.value match {
case CountWithTimestamp(key, count, lastModified) if (lastModified == timestamp) =>
out.collect((key, count))
case _ =>
}
}
}