Side Outputs
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

side出力 #

DataStream操作からの結果のメインのストリームに加えて、任意の数のside出力結果ストリームを生成することもできます。結果ストリーム内のデータの型はメインストリーム内のデータの型と一致する必要は無く、異なるside出力の型は異なるかもしれません。通常はストリームをリプリケートし各ストリームから不要なデータをフィルタアウトする必要があるストリームデータを分割したい時は、この操作は有用かもしれません。

side出力を使う場合は、最初にside出力ストリームを識別するために使われるだろうOutputTagを定義する必要があります:

// this needs to be an anonymous inner class, so that we can analyze the type
OutputTag<String> outputTag = new OutputTag<String>("side-output") {};
val outputTag = OutputTag[String]("side-output")
output_tag = OutputTag("side-output", Types.STRING())

side出力ストリームが含む要素の型に応じてどのようにOutputTagが型付けされるかに注意してください。

side出力へのデータの発行は以下の関数から可能です:

データをOutputTagを使って識別されるside出力に発行するために、上の関数内でユーザに公開されるContextパラメータを使うことができます。以下はProcessFunctionからside出力データを発行する例です:

DataStream<Integer> input = ...;

final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};

SingleOutputStreamOperator<Integer> mainDataStream = input
  .process(new ProcessFunction<Integer, Integer>() {

      @Override
      public void processElement(
          Integer value,
          Context ctx,
          Collector<Integer> out) throws Exception {
        // emit data to regular output
        out.collect(value);

        // emit data to side output
        ctx.output(outputTag, "sideout-" + String.valueOf(value));
      }
    });
val input: DataStream[Int] = ...
val outputTag = OutputTag[String]("side-output")

val mainDataStream = input
  .process(new ProcessFunction[Int, Int] {
    override def processElement(
        value: Int,
        ctx: ProcessFunction[Int, Int]#Context,
        out: Collector[Int]): Unit = {
      // emit data to regular output
      out.collect(value)

      // emit data to side output
      ctx.output(outputTag, "sideout-" + String.valueOf(value))
    }
  })
input = ...  # type: DataStream
output_tag = OutputTag("side-output", Types.STRING())

class MyProcessFunction(ProcessFunction):

    def process_element(self, value: int, ctx: ProcessFunction.Context):
        # emit data to regular output
        yield value

        # emit data to side output
        yield output_tag, "sideout-" + str(value)


main_data_stream = input \
    .process(MyProcessFunction(), Types.INT())

side出力ストリームを扱うには、DataStream操作の結果にgetSideOutput(OutputTag)を使います。これにより、サイド出力ストリームの結果に型付けされたDataStreamが得られます:

final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};

SingleOutputStreamOperator<Integer> mainDataStream = ...;

DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);
val outputTag = OutputTag[String]("side-output")

val mainDataStream = ...

val sideOutputStream: DataStream[String] = mainDataStream.getSideOutput(outputTag)
output_tag = OutputTag("side-output", Types.STRING())

main_data_stream = ...  # type: DataStream

side_output_stream = main_data_stream.get_side_output(output_tag)  # type: DataStream

注意 サイド出力を生成する場合、get_side_output(OutputTag)をPython APIで呼び出す必要があります。そうしなければ、サイド出力ストリームの結果がメインストリームに予期せず出力され、データ型が異なる場合に失敗する可能性があります。

Back to top

inserted by FC2 system