side出力

DataStream 操作からの結果のメインのストリームに加えて、任意の数のside出力結果ストリームを生成することもできます。結果ストリーム内のデータの型はメインストリーム内のデータの型と一致する必要は無く、異なるside出力の型は異なるかもしれません。通常はストリームをリプリケートし各ストリームから不要なデータをフィルタアウトする必要があるストリームデータを分割したい時は、この操作は有用かもしれません。

side出力を使う場合は、まずside出力ストリームを識別するために使われるだろうOutputTag を定義する必要があります:

// this needs to be an anonymous inner class, so that we can analyze the type
OutputTag<String> outputTag = new OutputTag<String>("side-output") {};
val outputTag = OutputTag[String]("side-output")

side出力ストリームが含む要素の型に応じてどのようにOutputTagが型付けされるかに注意してください。

side出力へのデータの発行は以下の関数から可能です:

データをOutputTagを使って識別されるside出力に発行するために、上の関数内でユーザに公開されるContext パラメータを使うことができます。以下はProcessFunctionからside出力データを発行する例です:

DataStream<Integer> input = ...;

final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};

SingleOutputStreamOperator<Integer> mainDataStream = input
  .process(new ProcessFunction<Integer, Integer>() {

      @Override
      public void processElement(
          Integer value,
          Context ctx,
          Collector<Integer> out) throws Exception {
        // emit data to regular output
        out.collect(value);

        // emit data to side output
        ctx.output(outputTag, "sideout-" + String.valueOf(value));
      }
    });
val input: DataStream[Int] = ...
val outputTag = OutputTag[String]("side-output")

val mainDataStream = input
  .process(new ProcessFunction[Int, Int] {
    override def processElement(
        value: Int,
        ctx: ProcessFunction[Int, Int]#Context,
        out: Collector[Int]): Unit = {
      // emit data to regular output
      out.collect(value)

      // emit data to side output
      ctx.output(outputTag, "sideout-" + String.valueOf(value))
    }
  })

side出力ストリームを扱うには、DataStream操作の結果にgetSideOutput(OutputTag)を使います。これはside出力ストリームの結果に型付けされたDataStreamを与えるでしょう:

final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};

SingleOutputStreamOperator<Integer> mainDataStream = ...;

DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);
val outputTag = OutputTag[String]("side-output")

val mainDataStream = ...

val sideOutputStream: DataStream[String] = mainDataStream.getSideOutput(outputTag)

上に戻る

TOP
inserted by FC2 system