DataStream
操作からの結果のメインのストリームに加えて、任意の数のside出力結果ストリームを生成することもできます。結果ストリーム内のデータの型はメインストリーム内のデータの型と一致する必要は無く、異なるside出力の型は異なるかもしれません。通常はストリームをリプリケートし各ストリームから不要なデータをフィルタアウトする必要があるストリームデータを分割したい時は、この操作は有用かもしれません。
side出力を使う場合は、まずside出力ストリームを識別するために使われるだろうOutputTag
を定義する必要があります:
// this needs to be an anonymous inner class, so that we can analyze the type
OutputTag<String> outputTag = new OutputTag<String>("side-output") {};
val outputTag = OutputTag[String]("side-output")
side出力ストリームが含む要素の型に応じてどのようにOutputTag
が型付けされるかに注意してください。
side出力へのデータの発行は以下の関数から可能です:
データをOutputTag
を使って識別されるside出力に発行するために、上の関数内でユーザに公開されるContext
パラメータを使うことができます。以下はProcessFunction
からside出力データを発行する例です:
DataStream<Integer> input = ...;
final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
SingleOutputStreamOperator<Integer> mainDataStream = input
.process(new ProcessFunction<Integer, Integer>() {
@Override
public void processElement(
Integer value,
Context ctx,
Collector<Integer> out) throws Exception {
// emit data to regular output
out.collect(value);
// emit data to side output
ctx.output(outputTag, "sideout-" + String.valueOf(value));
}
});
val input: DataStream[Int] = ...
val outputTag = OutputTag[String]("side-output")
val mainDataStream = input
.process(new ProcessFunction[Int, Int] {
override def processElement(
value: Int,
ctx: ProcessFunction[Int, Int]#Context,
out: Collector[Int]): Unit = {
// emit data to regular output
out.collect(value)
// emit data to side output
ctx.output(outputTag, "sideout-" + String.valueOf(value))
}
})
side出力ストリームを扱うには、DataStream
操作の結果にgetSideOutput(OutputTag)
を使います。これはside出力ストリームの結果に型付けされたDataStream
を与えるでしょう:
final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
SingleOutputStreamOperator<Integer> mainDataStream = ...;
DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);
val outputTag = OutputTag[String]("side-output")
val mainDataStream = ...
val sideOutputStream: DataStream[String] = mainDataStream.getSideOutput(outputTag)