Joining
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

結合 #

ウィンドウ結合 #

ウィンドウ結合は、胸中のキーを共有し、同じウィンドウ内にある2つのストリームの要素を結合します。これらのウィンドウはwindow assignerを使って定義でき、両方のストリームの要素で評価されます。

次に、両側の要素がユーザ定義のJoinFunctionまたはFlatJoinFunctionに渡され、そこでユーザは結合条件を満たす結果を出力できます:

一般的な使い方は以下のように要約できます:

stream.join(otherStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(<WindowAssigner>)
    .apply(<JoinFunction>);

セマンティクスの関するいくつかの注意:

  • 2つのストリームの要素のペアごとの組み合わせの作成は、内部結合のように動作します。つまり、結合するもう一方のストリームの対応する要素がない場合、一方のストリームの要素は出力されません。
  • 結合される要素は、それぞれのウィンドウ内にまだ存在する最大のタイムスタンプをタイムスタンプとして持ちます。例えば、境界として[5, 10)を持つウィンドウでは、結合された要素のタイムスタンプが9になります。

次のセクションでは、幾つかの例示的なシナリオを使って、様々な種類のウィンドウ結合がどのように動作するかについて概要を説明します。

タンブリングウィンドウ結合 #

タンブリングウィンドウ結合を実行する場合、共通のキーと共通のタンブリングウィンドウを持つ全ての要素がペアごとの組み合わせとして結合され、JoinFunctionまたはFlatJoinFunctionに渡されます。これは内部結合のように動作するため、タンブリングウィンドウ内にもう一方のストリームの要素を持たないストリームの要素は発行されません!

図に示すように、2ミリ秒のサイズでタンブリングウィンドウを定義します。その結果[0,1], [2,3], ...という形式のウィンドウが生成されます。この画像は、JoinFunctionに渡される各ウィンドウ内の全ての要素のペアごとの組み合わせを示しています。タンブリングウィンドウ[6,7]では、オレンジ色の要素⑥と⑦と結合する要素が緑色のストリームに存在しないため、内も出力されないことに注意してください。

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
 
...

DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;

orangeStream.join(greenStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
    .apply (new JoinFunction<Integer, Integer, String> (){
        @Override
        public String join(Integer first, Integer second) {
            return first + "," + second;
        }
    });
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

...

val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...

orangeStream.join(greenStream)
    .where(elem => /* select key */)
    .equalTo(elem => /* select key */)
    .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
    .apply { (e1, e2) => e1 + "," + e2 }

スライディングウィンドウ結合 #

スライディングウィンドウ結合を実行する場合、共通のキーと共通のスライディングウィンドウを持つ全ての要素がペアごとの組み合わせとして結合され、JoinFunctionまたはFlatJoinFunctionに渡されます。現在のスライディングウィンドウ内で、一方のストリームの要素が他方のストリームの要素を持たないものは、出力されません。一部の要素は1つのスライディングウィンドウでは結合されていますが、別のスライディングウィンドウでは結合されていない場合があることに注意してください。

この例では、2ミリ秒のサイズのスライディングウィンドウを使い、それを1ミリ秒ずつスライドさせます。その結果、スライディングウィンドウ[-1, 0],[0,1],[1,2],[2,3], …になります。x軸の下の結合要素は、各スライディングウィンドウのJoinFunctionに渡される要素です。ここでは、例えばウィンドウ[2,3]でオレンジ色の②が緑色の③とどのように結合されているかも分かります。ただし、ウィンドウ[1,2]内の何も結合されません。

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

...

DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;

orangeStream.join(greenStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */))
    .apply (new JoinFunction<Integer, Integer, String> (){
        @Override
        public String join(Integer first, Integer second) {
            return first + "," + second;
        }
    });
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

...

val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...

orangeStream.join(greenStream)
    .where(elem => /* select key */)
    .equalTo(elem => /* select key */)
    .window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */))
    .apply { (e1, e2) => e1 + "," + e2 }

セッションウィンドウ結合 #

セッションウィンドウ結合を実行する場合、_“combined”_がセッション基準を満たす同じキーを持つ全ての要素が結合され、JoinFunctionまたはFlatJoinFunctionに渡されます。これも内部結合を実行するため、1つのストリームの要素のみを含むセッションウィンドウがある場合、何も出力されません!

ここでは、各セッションが少なくとも1ミリ秒のギャップで分割されるセッションウィンドウ結合を定義します。セッションは3つあり、最初の2つのセッションでは、両方のストリームから結合された要素がJoinFunctionに渡されます。3番目のセッションでは、緑のストリームに要素がないため、⑧と⑨は結合されません!

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
 
...

DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;

orangeStream.join(greenStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
    .apply (new JoinFunction<Integer, Integer, String> (){
        @Override
        public String join(Integer first, Integer second) {
            return first + "," + second;
        }
    });
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time

...

val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...

orangeStream.join(greenStream)
    .where(elem => /* select key */)
    .equalTo(elem => /* select key */)
    .window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
    .apply { (e1, e2) => e1 + "," + e2 }

インターバル結合 #

インターバル結合は2つのストリーム(ここではAとBと呼びます)の要素を共通のキーで結合します。ストリームBの要素には、ストリームAの要素のタイムスタンプと相対的な時間間隔にあるタイムスタンプがあります。

これは、より形式的に次のように表現することもできます b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound]あるいは a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

ここで、aとbは共通キーを共有するAとBの要素です。下限が常に上限より小さいか等しい限り、下限と上限は両方とも負または正のいずれかになります。現在、インターバル結合は内部結合のみを実行します。

要素のペアがProcessJoinFunctionに渡されると、2つの要素のうち大きいほうのタイムスタンプ(ProcessJoinFunction.Context経由でアクセスできます)が割り合てられます。

現在、インターバル結合はイベント時間のみをサポートします。

上の例では、2つのストリーム’orange’と’green’を、下限が-2ミリ秒、上限を+1ミリ秒で結合します。デフォルトでは、これらの境界は包括的ですが、.lowerBoundExclusive().upperBoundExclusive()を適用して動作を変更できます。

もう一度、より正式な表記法を使うと、三角形で示されているように

orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound

変換されます。

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;

...

DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;

orangeStream
    .keyBy(<KeySelector>)
    .intervalJoin(greenStream.keyBy(<KeySelector>))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process (new ProcessJoinFunction<Integer, Integer, String>(){

        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            out.collect(left + "," + right);
        }
    });
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.windowing.time.Time

...

val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...

orangeStream
    .keyBy(elem => /* select key */)
    .intervalJoin(greenStream.keyBy(elem => /* select key */))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process(new ProcessJoinFunction[Integer, Integer, String] {
        override def processElement(left: Integer, right: Integer, ctx: ProcessJoinFunction[Integer, Integer, String]#Context, out: Collector[String]): Unit = {
            out.collect(left + "," + right)
        }
    })

Back to top

inserted by FC2 system