This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
Side Outputs
side出力 #
DataStream
操作からの結果のメインのストリームに加えて、任意の数のside出力結果ストリームを生成することもできます。結果ストリーム内のデータの型はメインストリーム内のデータの型と一致する必要は無く、異なるside出力の型は異なるかもしれません。通常はストリームをリプリケートし各ストリームから不要なデータをフィルタアウトする必要があるストリームデータを分割したい時は、この操作は有用かもしれません。
side出力を使う場合は、最初にside出力ストリームを識別するために使われるだろうOutputTag
を定義する必要があります:
// this needs to be an anonymous inner class, so that we can analyze the type
OutputTag<String> outputTag = new OutputTag<String>("side-output") {};
val outputTag = OutputTag[String]("side-output")
output_tag = OutputTag("side-output", Types.STRING())
side出力ストリームが含む要素の型に応じてどのようにOutputTag
が型付けされるかに注意してください。
side出力へのデータの発行は以下の関数から可能です:
- ProcessFunction
- KeyedProcessFunction
- CoProcessFunction
- KeyedCoProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
データをOutputTag
を使って識別されるside出力に発行するために、上の関数内でユーザに公開されるContext
パラメータを使うことができます。以下はProcessFunction
からside出力データを発行する例です:
DataStream<Integer> input = ...;
final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
SingleOutputStreamOperator<Integer> mainDataStream = input
.process(new ProcessFunction<Integer, Integer>() {
@Override
public void processElement(
Integer value,
Context ctx,
Collector<Integer> out) throws Exception {
// emit data to regular output
out.collect(value);
// emit data to side output
ctx.output(outputTag, "sideout-" + String.valueOf(value));
}
});
val input: DataStream[Int] = ...
val outputTag = OutputTag[String]("side-output")
val mainDataStream = input
.process(new ProcessFunction[Int, Int] {
override def processElement(
value: Int,
ctx: ProcessFunction[Int, Int]#Context,
out: Collector[Int]): Unit = {
// emit data to regular output
out.collect(value)
// emit data to side output
ctx.output(outputTag, "sideout-" + String.valueOf(value))
}
})
input = ... # type: DataStream
output_tag = OutputTag("side-output", Types.STRING())
class MyProcessFunction(ProcessFunction):
def process_element(self, value: int, ctx: ProcessFunction.Context):
# emit data to regular output
yield value
# emit data to side output
yield output_tag, "sideout-" + str(value)
main_data_stream = input \
.process(MyProcessFunction(), Types.INT())
side出力ストリームを扱うには、DataStream
操作の結果にgetSideOutput(OutputTag)
を使います。これにより、サイド出力ストリームの結果に型付けされたDataStream
が得られます:
final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
SingleOutputStreamOperator<Integer> mainDataStream = ...;
DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);
val outputTag = OutputTag[String]("side-output")
val mainDataStream = ...
val sideOutputStream: DataStream[String] = mainDataStream.getSideOutput(outputTag)
output_tag = OutputTag("side-output", Types.STRING())
main_data_stream = ... # type: DataStream
side_output_stream = main_data_stream.get_side_output(output_tag) # type: DataStream
注意 サイド出力を生成する場合、get_side_output(OutputTag)
をPython APIで呼び出す必要があります。そうしなければ、サイド出力ストリームの結果がメインストリームに予期せず出力され、データ型が異なる場合に失敗する可能性があります。