処理関数 (低レベル操作)

ProcessFunction

ProcessFunction は全て(非循環式の)ストリーミングアプリケーションのブロックの基本的な構築にアクセスすることができる、低レベルストリーム処理操作です。

  • イベント (ストリーム要素)
  • 状態 (耐障害性、一貫性、キー付けされたストリーム上のみ)
  • タイマー (イベント時間および処理時間。キー付けされたストリーム上のみ)

ProcessFunction はキー付けされた状態とタイマーへのアクセスを持つFlatMapFunctionのようなものだと考えることができます。入力ストリーム(s)の中で受信した各イベントごとに起動されることでイベントを処理します。

耐障害性の状態のtまえに、ProcessFunction はFlinkのキー付けされた状態へのアクセスを与え、他のstateful関数がキー付けされた状態にアクセスできるようにRuntimeContextにアクセス可能です。

タイマーによりアプリケーションは処理時間およびイベント時間での変化に反応することができます。関数processElement(...)への各呼び出しは、要素のイベント時間のタイムスタンプとTimerServiceへのアクセスを与えるContextオブジェクトを取得します。TimerService は将来のイベント/処理時間のインスタンのためのコールバックを登録するために使うことができます。タイマーの特定の時間になると、onTimer(...) メソッドが呼ばれます。During that call, all states are again scoped to the key with which the timer was created, allowing timers to manipulate keyed state.

注意 キー付けされた状態およびタイマーへアクセスしたい場合、キー付けされたストリーム上へProcessFunctionを適用する必要があります:

stream.keyBy(...).process(new MyProcessFunction())

低レベルのJoin

2つの入力上に低レベルのオペレーションを実現するために、申請はCoProcessFunctionを使うことができます。この関数は2つの異なる入力に制限され、2つの異なる入力空のレコードのためにprocessElement1(...) およびprocessElement2(...)への個々の呼び出しを得ます。

低レベルのjoinの実装は一般的にこのパターンに従います:

  • 1つの入力(あるいは両方)のための状態のオブジェクトを生成
  • 入力から要素を受け取る時に状態を更新
  • 他の入力から要素を受け取る時に状態を調査し、joinされた結果を生成

例えば、顧客データについての状態を維持しながら、顧客データを財務売買にjoinするかもしれません。順番がばらばらなイベントにも関わらず完全で決定論的なjoinを持つことに関心がある場合、顧客データのストリームのためのウォーターマークが売買の時間を過ぎた時に売買のためのjoinを評価および発行するためにタイマーを使うことができます。

以下の例はキー毎のカウントを維持し、キーのための更新無しに、1分すぎた(イベント時間)時にいつでもキー/カウントのペアを発行します。

  • カウント、キーおよび最後に修正されたタイムスタンプはValueStateに格納されます。これは暗黙のうちにキーによって範囲を付けられます。
  • 各レコードのために、ProcessFunctionはカウンターを増やし、最後に修正されたタイムスタンプを設定します。
  • The function also schedules a callback one minute into the future (in event time)
  • 各コールバック時に、それは格納されたカウントの最後に修正された時間に対するコールバックのイベント時間のタイムスタンプをチェックし、それらが合致した時にキー/カウントを発行します (つまりその分内に更新がもう発生しない)。

注意 この単純な例はセッションウィンドウを使って実装されているかもしれません。それが提供する基本的なパターンを悦明するために、ここではProcessFunctionを使います。

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
import org.apache.flink.util.Collector;


// the source data stream
DataStream<Tuple2<String, String>> stream = ...;

// apply the process function onto a keyed stream
DataStream<Tuple2<String, Long>> result = stream
    .keyBy(0)
    .process(new CountWithTimeoutFunction());

/**
 * The data type stored in the state
 */
public class CountWithTimestamp {

    public String key;
    public long count;
    public long lastModified;
}

/**
 * The implementation of the ProcessFunction that maintains the count and timeouts
 */
public class CountWithTimeoutFunction extends ProcessFunction<Tuple2<String, String>, Tuple2<String, Long>> {

    /** The state that is maintained by this process function */
    private ValueState<CountWithTimestamp> state;

    @Override
    public void open(Configuration parameters) throws Exception {
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
    }

    @Override
    public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {

        // retrieve the current count
        CountWithTimestamp current = state.value();
        if (current == null) {
            current = new CountWithTimestamp();
            current.key = value.f0;
        }

        // update the state's count
        current.count++;

        // set the state's timestamp to the record's assigned event time timestamp
        current.lastModified = ctx.timestamp();

        // write the state back
        state.update(current);

        // schedule the next timer 60 seconds from the current event time
        ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {

        // get the state for the key that scheduled the timer
        CountWithTimestamp result = state.value();

        // check if this is an outdated timer or the latest timer
        if (timestamp == result.lastModified + 60000) {
            // emit the state on timeout
            out.collect(new Tuple2<String, Long>(result.key, result.count));
        }
    }
}
import org.apache.flink.api.common.state.ValueState
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.ProcessFunction.Context
import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext
import org.apache.flink.util.Collector

// the source data stream
val stream: DataStream[Tuple2[String, String]] = ...

// apply the process function onto a keyed stream
val result: DataStream[Tuple2[String, Long]] = stream
  .keyBy(0)
  .process(new CountWithTimeoutFunction())

/**
  * The data type stored in the state
  */
case class CountWithTimestamp(key: String, count: Long, lastModified: Long)

/**
  * The implementation of the ProcessFunction that maintains the count and timeouts
  */
class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String, Long)] {

  /** The state that is maintained by this process function */
  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
    .getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))


  override def processElement(value: (String, String), ctx: Context, out: Collector[(String, Long)]): Unit = {
    // initialize or retrieve/update the state

    val current: CountWithTimestamp = state.value match {
      case null =>
        CountWithTimestamp(value._1, 1, ctx.timestamp)
      case CountWithTimestamp(key, count, lastModified) =>
        CountWithTimestamp(key, count + 1, ctx.timestamp)
    }

    // write the state back
    state.update(current)

    // schedule the next timer 60 seconds from the current event time
    ctx.timerService.registerEventTimeTimer(current.lastModified + 60000)
  }

  override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[(String, Long)]): Unit = {
    state.value match {
      case CountWithTimestamp(key, count, lastModified) if (timestamp == lastModified + 60000) =>
        out.collect((key, count))
      case _ =>
    }
  }
}

上に戻る

注意: Flink 1.4.0より前は、処理時間のタイマーから呼ばれた時に、ProcessFunction.onTimer() メソッドはイベント時間のタイムスタンプとして現在の処理時間を設定します。この居小津はとても繊細で、ユーザによって気づかれないかもしれません。まあ、それは処理時間のタイムスタンプが非決定論的でウォーターマークを使って並べられていないため、有害です。その上、この間違ったタイムスタンプに依存するユーザによって実装されたロジックは、おそらく意図せず不完全です。ですので、私たちはそれを修正することに決めました。1.4.0にアップグレードする時に、この間違ったイベント時間のタイムスタンプを使ったFlinkのジョブは失敗するでしょう。ユーザはそれらのジョブを正しいロジックに適用する必要があります。

TOP
inserted by FC2 system