FlinkCEP - Flinkのための複雑なイベント処理

FlinkCEP はFlink上に実装された Complex Event Processing (CEP) ライブラリです。データ内で何が重要なのかを理解する機会を与えながら、イベントの終わりの無いストリーム内でのイベントのパターンを検出することができます。

このページはFlink CEP内で利用可能なAPIの呼び出しを説明します。イベントのシーケンスを合致する時に検知および振舞うを紹介する前に、パターン APIを皮切りに紹介します。これによりストリーム内で検知したいパターンを指定することができます。それから、CEPライブラリがイベント時間で遅延を扱う時の仮定と、古いFlinkバージョンからFlink-1.3へどうやってジョブを移設するかを紹介します。

開始

すぐに飛び込みたい場合は、Flink プログラムをセットアップし、FlinkCEP の依存をプロジェクトのpom.xmlに追加します。

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-cep_2.11</artifactId>
  <version>1.5-SNAPSHOT</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-cep-scala_2.11</artifactId>
  <version>1.5-SNAPSHOT</version>
</dependency>

情報 FlinkCEP はバイナリの配布物の一部ではありません。クラスタ実行のためにそれとどうやってリンクするかをここで見ます。

これで、パターンAPIを使って初めてのCEPプログラムを書き始めることができます。

注意 FlinkCEPはイベントの比較とマッチングに使うため、パターンマッチングを適用したいDataStream内のイベントは適切なequals()hashCode() メソッドを実装する必要があります。

DataStream<Event> input = ...

Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(
        new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) {
                return event.getId() == 42;
            }
        }
    ).next("middle").subtype(SubEvent.class).where(
        new SimpleCondition<Event>() {
            @Override
            public boolean filter(SubEvent subEvent) {
                return subEvent.getVolume() >= 10.0;
            }
        }
    ).followedBy("end").where(
         new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) {
                return event.getName().equals("end");
            }
         }
    );

PatternStream<Event> patternStream = CEP.pattern(input, pattern);

DataStream<Alert> result = patternStream.select(
    new PatternSelectFunction<Event, Alert> {
        @Override
        public Alert select(Map<String, List<Event>> pattern) throws Exception {
            return createAlertFrom(pattern);
        }
    }
});
val input: DataStream[Event] = ...

val pattern = Pattern.begin("start").where(_.getId == 42)
  .next("middle").subtype(classOf[SubEvent]).where(_.getVolume >= 10.0)
  .followedBy("end").where(_.getName == "end")

val patternStream = CEP.pattern(input, pattern)

val result: DataStream[Alert] = patternStream.select(createAlert(_))

パターンAPI

パターンAPIにより入力ストリームから抽出したい複雑なパターンのシーケンスを定義することができます。

それぞれの複雑なパターンのシーケンスは複数の簡単なパターンから成ります。つまり、同じプロパティを持つ個々のイベントの探すパターン。これから、これらの簡単なパターンを patternsと呼び、私たちがストリーム内で探している最終の複雑なパターンのシーケンスをパターン シーケンスと呼びます。パターンシーケンスはそのようなパターンのグラフとして見えます。1つのパターンから次への移行はユーザ固有の条件に基づいて発生します。例えば、event.getName().equals("start")match は有効なパターン移行を使った全ての複雑なパターングラフのパターンを訪ねる入力イベントのシーケンスです。

注意 各パターンはユニークな名前を持たなければなりません。合致したイベントを識別するためにあとでそれを使います。

注意 パターンの名前は文字":"を含むことができません

この章の残りでまず個々のパターンをどう定義するかを説明し、それから個々のパターンをどうやって複雑なパターンに組み合わせることができるかを説明します。

個々のパターン

パターンシングルトン あるいは ループ パターンのどちらかです。シングルトン パターンは 1つのイベントを受け付けますが、ループ パターンは1つ以上を受け付けることができます。パターン マッチングのシンボル内で、パターン "a b+ c?d" (あるいは "a"に続く1つ以上の "b"、任意で "c"が続き、"d"が続きます)、a, c? および d はシングルトンパターンですが、 b+ はループです。デフォルトでは、パターンはシングルトンパターンで、Quantifiersを使うことでそれをループに変換することができます。各パターンはイベントを受け付ける1つ以上の条件 を持つことができます。

数量詞

FlinkCEPの中で、これらのメソッドを使ってループパターンを指定することができます: pattern.oneOrMore()、指定されたイベントの1つ以上の出現を期待するパターン(例えば、以前述べられた b+); そして pattern.times(#ofTimes)、指定された意弁の型のある数の出現を期待するパターン、例えば、4 a’s; そして pattern.times(#fromTimes, #toTimes)、特定の最小数の出現と指定されたイベントの型の出現の最大数を期待するパターン、例えば 2-4 as。

pattern.greedy()メソッドを使って貪欲にループパターンを作成することができますが、まだグループパタンを貪欲に作ることはできません。ループの有る無しの全てのパターンを任意のpattern.optional() メソッドを使って作ることができます。

startという名前のパターンについては、以下は有効な数量詞です:

// expecting 4 occurrences
 start.times(4);

 // expecting 0 or 4 occurrences
 start.times(4).optional();

 // expecting 2, 3 or 4 occurrences
 start.times(2, 4);

 // expecting 2, 3 or 4 occurrences and repeating as many as possible
 start.times(2, 4).greedy();

 // expecting 0, 2, 3 or 4 occurrences
 start.times(2, 4).optional();

 // expecting 0, 2, 3 or 4 occurrences and repeating as many as possible
 start.times(2, 4).optional().greedy();

 // expecting 1 or more occurrences
 start.oneOrMore();

 // expecting 1 or more occurrences and repeating as many as possible
 start.oneOrMore().greedy();

 // expecting 0 or more occurrences
 start.oneOrMore().optional();

 // expecting 0 or more occurrences and repeating as many as possible
 start.oneOrMore().optional().greedy();

 // expecting 2 or more occurrences
 start.timesOrMore(2);

 // expecting 2 or more occurrences and repeating as many as possible
 start.timesOrMore(2).greedy();

 // expecting 0, 2 or more occurrences and repeating as many as possible
 start.timesOrMore(2).optional().greedy();
// expecting 4 occurrences
 start.times(4)

 // expecting 0 or 4 occurrences
 start.times(4).optional()

 // expecting 2, 3 or 4 occurrences
 start.times(2, 4);

 // expecting 2, 3 or 4 occurrences and repeating as many as possible
 start.times(2, 4).greedy();

 // expecting 0, 2, 3 or 4 occurrences
 start.times(2, 4).optional();

 // expecting 0, 2, 3 or 4 occurrences and repeating as many as possible
 start.times(2, 4).optional().greedy();

 // expecting 1 or more occurrences
 start.oneOrMore()

 // expecting 1 or more occurrences and repeating as many as possible
 start.oneOrMore().greedy();

 // expecting 0 or more occurrences
 start.oneOrMore().optional()

 // expecting 0 or more occurrences and repeating as many as possible
 start.oneOrMore().optional().greedy();

 // expecting 2 or more occurrences
 start.timesOrMore(2);

 // expecting 2 or more occurrences and repeating as many as possible
 start.timesOrMore(2).greedy();

 // expecting 0, 2 or more occurrences
 start.timesOrMore(2).optional();

 // expecting 0, 2 or more occurrences and repeating as many as possible
 start.timesOrMore(2).optional().greedy();

条件

各パターンにおいて、1つのパターンから次のパターンに変わるために、追加の 条件を設定することができます。これらの条件を以下に関係することができます:

  1. やってくるイベントのプロパティ、例えば、値が5より大きくなければならない、あるいは以前受け付けたイベントの平均値よりも大きくなければならない。

  2. 合致イベントの隣接、例えば、どの一致の間でも合致しないパターンa,b,c を検出する。

後者は“ループ” パターンを参照します、つまり 1つより多くのイベントを受け付けることができるパターン、例えばa b+ cの中のb+、これは1つ以上のbを検索します。

プロパティ上の条件

pattern.where(), pattern.or() あるいは pattern.until() メソッドを使ってイベントプロパティ上の条件を指定することができます。それらは IterativeCondition あるいは SimpleConditionのどちらかでしょう。

繰り返し条件: これは最も一般的な条件の型です。これは以前受け付けられたイベント、あるいはそれらの部分集合上の統計量のプロパティに基づく連続するイベントを受け付ける条件を指定する方法です。

以下は、もしパターンについて以前に受け付けられたイベントの価格の合計に加えて現在のイベントの価格が5.0の値を超えない場合に、名前が“foo”で始まる場合“middle”という名前のパターンのための次のイベントを受け付ける反復条件のためのコードです。反復条件は、特にループ パターンとの組み合わせの時に強力になります。例えば oneOrMore()

middle.oneOrMore().where(new IterativeCondition<SubEvent>() {
    @Override
    public boolean filter(SubEvent value, Context<SubEvent> ctx) throws Exception {
        if (!value.getName().startsWith("foo")) {
            return false;
        }

        double sum = value.getPrice();
        for (Event event : ctx.getEventsForPattern("middle")) {
            sum += event.getPrice();
        }
        return Double.compare(sum, 5.0) < 0;
    }
});
middle.oneOrMore().where(
    (value, ctx) => {
        lazy val sum = ctx.getEventsForPattern("middle").asScala.map(_.getPrice).sum
        value.getName.startsWith("foo") && sum + value.getPrice < 5.0
    }
)

注意 context.getEventsForPattern(...) の呼び出しは指定された可能な合致について以前に受け付けられたイベント全てを見つけます。この操作のコストは変化するかもしれないため、条件を実装する時はそれを最小限にするようにしてください。

単純な条件: この種類の条件は前述されたIterativeCondition クラスを拡張し、イベント自身のプロパティのみに基づいて、イベントを受け付けるかどうかを決定します。

start.where(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) {
        return value.getName().startsWith("foo");
    }
});
start.where(event => event.getName.startsWith("foo"))

最後に、受け付けられるイベントの型をpattern.subtype(subClass)メソッドを使って初期イベントタイプ(ここではEvent) の部分型に制限することもできます

start.subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
    @Override
    public boolean filter(SubEvent value) {
        return ... // some condition
    }
});
start.subtype(classOf[SubEvent]).where(subEvent => ... /* some condition */)

組み合わせ条件: 上で示したように、subtype 条件を追加の条件と組み合わせることができます。これは各条件について維持されます。連続するwhere()呼び出しによって任意の条件を組み合わせることができます。最終的な結果は個々の条件の論理学的なAND になるでしょう。ORを使って条件を組み合わせるために、以下で示すようにor() メソッドを使うことができます。

pattern.where(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) {
        return ... // some condition
    }
}).or(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) {
        return ... // or condition
    }
});
pattern.where(event => ... /* some condition */).or(event => ... /* or condition */)

停止条件: ループパターン (oneOrMore() および oneOrMore().optional()) の場合、停止条件を指定することもできます。例えば、値の合憲が50より小さい間 5より大きい値を持つイベントを受け付ける。

より理解するには、以下の例を見てください。仮定

  • "(a+ until b)" ("b"まで1つ以上の "a")のようなパターン

  • やってくるイベントのシーケンス "a1" "c" "a2" "b" "a3"

  • ライブラリは以下の結果を出力するでしょう: {a1 a2} {a1} {a2} {a3}

見てわかるように、{a1 a2 a3} あるいは {a2 a3} は停止条件のために返されません。

隣接条件

FlinkCEP はイベント間の以下の隣接形式をサポートします:

  1. 厳密な隣接: 全ての合致するイベントが合致しないイベント無しに厳密に1つずつ現れることを期待する。

  2. 緩めの隣接: 合致するイベント間に現れる合致しないイベントを無視する。

  3. 非決定論的な緩めの隣接: いくつかの合致イベントを無視する追加の合致が可能な更に緩い隣接。

例を使って上を説明するために、入力"a1", "c", "a2", "b" を持つパターンのシーケンス "a+ b" (1つ以上の"a""b"が続く)は以下の結果を持つでしょう:

  1. 厳密な隣接: {a2 b}"a1"の後の "c""a1" を捨てます。

  2. 緩めの隣接: {a1 b}{a1 a2 b}c は無視されます。

  3. 非決定論的な緩めの隣接: {a1 b}, {a2 b} および {a1 a2 b}

ループ パターン (例えばoneOrMore()times()) については、デフォルトは 緩い隣接です。もし厳密な隣接を必要とする場合は、consecutive() 呼び出しを使って明示的にそれを指定する必要があります。もし非決定論的な緩い隣接 を必要とする場合はallowCombinations() 呼び出しを使うことができます。

注意 この章では、1つのループパターンの隣接について話しています。consecutive() および allowCombinations() の呼び出しはその文脈で理解される必要があります。Later when looking at Combining Patterns we’ll discuss other calls, such as next() and followedBy(), that are used to specify contiguity conditions between patterns.

パターンのオペレーション 解説
where(condition)

現在のパターンの条件を定義します。パターンを合致させるために、イベントは条件を満たす必要があります。複数の連続する where() 句はANDされる条件になります:

pattern.where(new IterativeCondition<Event>() {
    @Override
    public boolean filter(Event value, Context ctx) throws Exception {
        return ... // some condition
    }
});
or(condition)

既存の条件とORされる新しい条件を追加します。条件のうち少なくとも1つを通過する場合のみ、イベントはパターンと合致することができます:

pattern.where(new IterativeCondition<Event>() {
    @Override
    public boolean filter(Event value, Context ctx) throws Exception {
        return ... // some condition
    }
}).or(new IterativeCondition<Event>() {
    @Override
    public boolean filter(Event value, Context ctx) throws Exception {
        return ... // alternative condition
    }
});
until(condition)

ループパターンのための停止条件を指定します。指定された条件と一致するイベントが起きた時に、それ以上のイベントがパターンに受け付けられないことを意味します。

oneOrMore()と隣接する場合のみ適用可能です

注意: イベントベースの条件に対応するパターンのための状態の掃除を考慮します。

pattern.oneOrMore().until(new IterativeCondition<Event>() {
    @Override
    public boolean filter(Event value, Context ctx) throws Exception {
        return ... // alternative condition
    }
});
subtype(subClass)

現在のパターンのためのサブタイプの条件を定義します。このサブタイプのパターンの場合のみイベントが合致します:

pattern.subtype(SubEvent.class);
oneOrMore()

このパターンは少なくとも1つの合致するイベントの出現を期待することを指定します。

デフォルトでは緩い内部的な隣接(連続するイベント間)で使われます。内部的な隣接についての詳細な情報は一貫性を見てください。

注意: 状態の掃除を有効にするには until()within() のどちらかを使うことをお勧めします。

pattern.oneOrMore();
timesOrMore(#times)

このパターンは少なくとも#times回の合致するイベントの出現を期待することを指定します。

デフォルトでは緩い内部的な隣接(連続するイベント間)で使われます。内部的な隣接についての詳細な情報は一貫性を見てください。

pattern.timesOrMore(2);
times(#ofTimes)

このパターンは合致するイベントの出現が確実な数であることを期待します。

デフォルトでは緩い内部的な隣接(連続するイベント間)で使われます。内部的な隣接についての詳細な情報は一貫性を見てください。

pattern.times(2);
times(#fromTimes, #toTimes)

このパターンは 合致するイベントの#fromTimes#toTimesの間の出現を期待することを指定します。

デフォルトでは緩い内部的な隣接(連続するイベント間)で使われます。内部的な隣接についての詳細な情報は一貫性を見てください。

pattern.times(2, 4);
optional()

このパターンが任意であることを指定します。つまり、全く起こらないかもしれません。これは全ての前述した数量詞に適用可能です。

pattern.oneOrMore().optional();
greedy()

このパターンが貪欲であることを指定します。つまり、できる限り多く繰り返されるでしょう。これは数量詞にのみ敵よ可能で、今のところグループパターンをサポートしません。

pattern.oneOrMore().greedy();
consecutive()

oneOrMore()times() と協力して動作し、合致したイベント間の厳密な隣接を押し付けます。つまり、(next()のように)どのような合致しない要素でも一致を破壊します。

適用されない場合は(followedBy()のように)緩い隣接が使われます。

例. 以下のようなパターン:

Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
  @Override
  public boolean filter(Event value) throws Exception {
    return value.getName().equals("c");
  }
})
.followedBy("middle").where(new SimpleCondition<Event>() {
  @Override
  public boolean filter(Event value) throws Exception {
    return value.getName().equals("a");
  }
}).oneOrMore().consecutive()
.followedBy("end1").where(new SimpleCondition<Event>() {
  @Override
  public boolean filter(Event value) throws Exception {
    return value.getName().equals("b");
  }
});

入力シーケンス C D A1 A2 A3 D A4 B について、以下のような合致を生成するでしょう:

consecutive が適用される場合: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}

consecutive が適用されない場合: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B}

allowCombinations()

oneOrMore()times() と協力して動作し、合致したイベント間の(followedByAny()のような)非決定論的な緩い隣接を押し付けます。

適用されない場合は(followedBy()のように)緩い隣接が使われます。

例. 以下のようなパターン:

Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
  @Override
  public boolean filter(Event value) throws Exception {
    return value.getName().equals("c");
  }
})
.followedBy("middle").where(new SimpleCondition<Event>() {
  @Override
  public boolean filter(Event value) throws Exception {
    return value.getName().equals("a");
  }
}).oneOrMore().allowCombinations()
.followedBy("end1").where(new SimpleCondition<Event>() {
  @Override
  public boolean filter(Event value) throws Exception {
    return value.getName().equals("b");
  }
});

入力シーケンス C D A1 A2 A3 D A4 B について、以下のような合致を生成するでしょう:

combinations が有効な場合: {C A1 B}, {C A1 A2 B}, {C A1 A3 B}, {C A1 A4 B}, {C A1 A2 A3 B}, {C A1 A2 A4 B}, {C A1 A3 A4 B}, {C A1 A2 A3 A4 B}

combinations が無効な場合: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B}

パターンのオペレーション 解説
where(condition)

現在のパターンの条件を定義します。パターンを合致させるために、イベントは条件を満たす必要があります。複数の連続する where() 句はANDされる条件になります:

pattern.where(event => ... /* some condition */)
or(condition)

既存の条件とORされる新しい条件を追加します。条件のうち少なくとも1つを通過する場合のみ、イベントはパターンと合致することができます:

pattern.where(event => ... /* some condition */)
    .or(event => ... /* alternative condition */)
until(condition)

ループパターンのための停止条件を指定します。指定された条件と一致するイベントが起きた時に、それ以上のイベントがパターンに受け付けられないことを意味します。

oneOrMore()と隣接する場合のみ適用可能です

注意: イベントベースの条件に対応するパターンのための状態の掃除を考慮します。

pattern.oneOrMore().until(event => ... /* some condition */)
subtype(subClass)

現在のパターンのためのサブタイプの条件を定義します。このサブタイプのパターンの場合のみイベントが合致します:

pattern.subtype(classOf[SubEvent])
oneOrMore()

このパターンは少なくとも1つの合致するイベントの出現を期待することを指定します。

デフォルトでは緩い内部的な隣接(連続するイベント間)で使われます。内部的な隣接についての詳細な情報は一貫性を見てください。

注意: 状態の掃除を有効にするには until()within() のどちらかを使うことをお勧めします。

pattern.oneOrMore()
timesOrMore(#times)

このパターンは少なくとも#times回の合致するイベントの出現を期待することを指定します。

デフォルトでは緩い内部的な隣接(連続するイベント間)で使われます。内部的な隣接についての詳細な情報は一貫性を見てください。

pattern.timesOrMore(2)
times(#ofTimes)

このパターンは合致するイベントの出現が確実な数であることを期待します。

デフォルトでは緩い内部的な隣接(連続するイベント間)で使われます。内部的な隣接についての詳細な情報は一貫性を見てください。

pattern.times(2)
times(#fromTimes, #toTimes)

このパターンは 合致するイベントの#fromTimes#toTimesの間の出現を期待することを指定します。

デフォルトでは緩い内部的な隣接(連続するイベント間)で使われます。内部的な隣接についての詳細な情報は一貫性を見てください。

pattern.times(2, 4);
optional()

このパターンが任意であることを指定します。つまり、全く起こらないかもしれません。これは全ての前述した数量詞に適用可能です。

pattern.oneOrMore().optional()
greedy()

このパターンが貪欲であることを指定します。つまり、できる限り多く繰り返されるでしょう。これは数量詞にのみ敵よ可能で、今のところグループパターンをサポートしません。

pattern.oneOrMore().greedy()
consecutive()

oneOrMore()times() と協力して動作し、合致したイベント間の厳密な隣接を押し付けます。つまり、(next()のように)どのような合致しない要素でも一致を破壊します。

適用されない場合は(followedBy()のように)緩い隣接が使われます。

例. 以下のようなパターン:

Pattern.begin("start").where(_.getName().equals("c"))
  .followedBy("middle").where(_.getName().equals("a"))
                       .oneOrMore().consecutive()
  .followedBy("end1").where(_.getName().equals("b"));

入力シーケンス C D A1 A2 A3 D A4 B について、以下のような合致を生成するでしょう:

consecutive が適用される場合: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}

consecutive が適用されない場合: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B}

allowCombinations()

oneOrMore()times() と協力して動作し、合致したイベント間の(followedByAny()のような)非決定論的な緩い隣接を押し付けます。

適用されない場合は(followedBy()のように)緩い隣接が使われます。

例. 以下のようなパターン:

Pattern.begin("start").where(_.getName().equals("c"))
  .followedBy("middle").where(_.getName().equals("a"))
                       .oneOrMore().allowCombinations()
  .followedBy("end1").where(_.getName().equals("b"));

入力シーケンス C D A1 A2 A3 D A4 B について、以下のような合致を生成するでしょう:

combinations が有効な場合: {C A1 B}, {C A1 A2 B}, {C A1 A3 B}, {C A1 A4 B}, {C A1 A2 A3 B}, {C A1 A2 A4 B}, {C A1 A3 A4 B}, {C A1 A2 A3 A4 B}

combinations が無効な場合: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B}

パターンの組み合わせ

個々のパターンがどのように見えるかを見てきたので、それらを完全なパターンのシーケンスとして組み合わせる方法を見る時です。

パターンのシーケンスは以下で見るように初期パターンから始まる必要があります:

Pattern<Event, ?> start = Pattern.<Event>begin("start");
val start : Pattern[Event, _] = Pattern.begin("start")

次に、それらの間で望ましい隣接条件を指定することでパターンシーケンスにより多くのパターンを追加することができます。前の章で、Flinkでサポートされる異なる隣接モードを説明しました。すなわち strict, relaxed およびnon-deterministic relaxed、そしてループパターンの中でそれらを適用する方法を説明しました。連続するパターンの間でそれらを適用するには、以下を使うことができます:

  1. 厳密な隣接についてはnext()
  2. 緩い隣接についてはfollowedBy()
  3. 非決定論的な緩い隣接についてはfollowedByAny()

あるいは

  1. 他に直接続くイベント型を必要としない場合notNext()
  2. 他の2つのイベント型の間にあるイベント型を必要としない場合notFollowedBy()

注意 パターンのシーケンスはnotFollowedBy()で終わることはできません。

注意 NOT パターンは任意のパターンに先導されることはできません。

// strict contiguity
Pattern<Event, ?> strict = start.next("middle").where(...);

// relaxed contiguity
Pattern<Event, ?> relaxed = start.followedBy("middle").where(...);

// non-deterministic relaxed contiguity
Pattern<Event, ?> nonDetermin = start.followedByAny("middle").where(...);

// NOT pattern with strict contiguity
Pattern<Event, ?> strictNot = start.notNext("not").where(...);

// NOT pattern with relaxed contiguity
Pattern<Event, ?> relaxedNot = start.notFollowedBy("not").where(...);
// strict contiguity
val strict: Pattern[Event, _] = start.next("middle").where(...)

// relaxed contiguity
val relaxed: Pattern[Event, _] = start.followedBy("middle").where(...)

// non-deterministic relaxed contiguity
val nonDetermin: Pattern[Event, _] = start.followedByAny("middle").where(...)

// NOT pattern with strict contiguity
val strictNot: Pattern[Event, _] = start.notNext("not").where(...)

// NOT pattern with relaxed contiguity
val relaxedNot: Pattern[Event, _] = start.notFollowedBy("not").where(...)

緩い隣接は最初の連続する合致イベントだけが一致するだろうことを意味しますが、同じ開始について非決定論的な緩い隣接を使って複数の合致が発行されるだろうことを意味します。例として、イベントシーケンス"a", "c", "b1", "b2"を仮定した時、パターン a bは以下の結果を与えるでしょう:

  1. ab の厳密な隣接: {} (一致しない)、"a"の後の"c""a"を捨てさせます。

  2. abの緩い隣接: {a b1}、緩い隣接は “次の合致するものまで、一致しないイベントをスキップする”として見なされます。

  3. abの非決定論的な緩い隣接: {a b1}, {a b2}、これは最も一般的な形式です。

パターンが有効であるための一時的な制約を定義することも可能です。例えば、pattern.within() メソッドを使って10秒内に発生すべきパターンを定義することができます。一時的なパターンは処理とイベント時間の両方でサポートされます。

注意 パターンのシーケンスは1つの一時的な制約のみ持つことができます。複数のそのような制約が異なる個々のパターンで定義された場合、一番小さいものが適用されます。

next.within(Time.seconds(10));
next.within(Time.seconds(10))

begin, followedBy, followedByAny および nextのための条件としてパターンのシーケンスを定義することもできます。パターンのシーケンスは論理的な一致条件と見なされ、GroupPattern が返され、GroupPatternに対してoneOrMore(), times(#ofTimes), times(#fromTimes, #toTimes), optional(), consecutive(), allowCombinations()を適用できるでしょう。

Pattern<Event, ?> start = Pattern.begin(
    Pattern.<Event>begin("start").where(...).followedBy("start_middle").where(...)
);

// strict contiguity
Pattern<Event, ?> strict = start.next(
    Pattern.<Event>begin("next_start").where(...).followedBy("next_middle").where(...)
).times(3);

// relaxed contiguity
Pattern<Event, ?> relaxed = start.followedBy(
    Pattern.<Event>begin("followedby_start").where(...).followedBy("followedby_middle").where(...)
).oneOrMore();

// non-deterministic relaxed contiguity
Pattern<Event, ?> nonDetermin = start.followedByAny(
    Pattern.<Event>begin("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...)
).optional();
val start: Pattern[Event, _] = Pattern.begin(
    Pattern.begin[Event, _]("start").where(...).followedBy("start_middle").where(...)
)

// strict contiguity
val strict: Pattern[Event, _] = start.next(
    Pattern.begin[Event, _]("next_start").where(...).followedBy("next_middle").where(...)
).times(3)

// relaxed contiguity
val relaxed: Pattern[Event, _] = start.followedBy(
    Pattern.begin[Event, _]("followedby_start").where(...).followedBy("followedby_middle").where(...)
).oneOrMore()

// non-deterministic relaxed contiguity
val nonDetermin: Pattern[Event, _] = start.followedByAny(
    Pattern.begin[Event, _]("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...)
).optional()


パターンのオペレーション 解説
begin(#name)

開始パターンを定義:

Pattern<Event, ?> start = Pattern.<Event>begin("start");
begin(#pattern_sequence)

開始パターンを定義:

Pattern<Event, ?> start = Pattern.<Event>begin(
    Pattern.<Event>begin("start").where(...).followedBy("middle").where(...)
);
next(#name)

新しいパターンを追加します。合致イベントは以前の合致イベント(厳密な隣接)を直接継承する必要があります:

Pattern<Event, ?> next = start.next("middle");
next(#pattern_sequence)

新しいパターンを追加します。合致イベントのシーケンスは以前の合致イベント(厳密な隣接)を直接継承する必要があります:

Pattern<Event, ?> next = start.next(
    Pattern.<Event>begin("start").where(...).followedBy("middle").where(...)
);
followedBy(#name)

新しいパターンを追加します。他のイベントが合致イベントと以前の合致イベント間で起こるかもしれません (緩い隣接):

Pattern<Event, ?> followedBy = start.followedBy("middle");
followedBy(#pattern_sequence)

新しいパターンを追加します。他のイベントが合致イベントのシーケンスと以前の合致イベント間で起こるかもしれません (緩い隣接):

Pattern<Event, ?> followedBy = start.followedBy(
    Pattern.<Event>begin("start").where(...).followedBy("middle").where(...)
);
followedByAny(#name)

新しいパターンを追加します。合致イベントと以前の合致イベント間で他のイベントが起きるかもしれず、そして代わりの合致が各代わりの合致イベントについて存在するでしょう (非決定論的な緩い隣接):

Pattern<Event, ?> followedByAny = start.followedByAny("middle");
followedByAny(#pattern_sequence)

新しいパターンを追加します。合致イベントのシーケンスと以前の合致イベント間で他のイベントが起きるかもしれず、そして代わりの合致が各代わりの合致イベントのシーケンスについて存在するでしょう (非決定論的な緩い隣接):

Pattern<Event, ?> followedByAny = start.followedByAny(
    Pattern.<Event>begin("start").where(...).followedBy("middle").where(...)
);
notNext()

新しい否定のパターンを追加します。破棄される部分一致について、合致 (否定)イベントは以前の合致イベント(厳密な隣接)を直接継承する必要があります:

Pattern<Event, ?> notNext = start.notNext("not");
notFollowedBy()

新しい否定のパターンを追加します。他のイベントが合致(否定)イベントと以前の合致イベント(緩い隣接)間で発生する場合でも、部分一致するイベントのシーケンスが削除されるでしょう:

Pattern<Event, ?> notFollowedBy = start.notFllowedBy("not");
within(time)

パターンに合致するイベントシーケンスの間隔の最大時間を定義します。完了していないイベントシーケンスがこの時間を継承する場合、それは削除されます:

pattern.within(Time.seconds(10));
パターンのオペレーション 解説
begin()

開始パターンを定義:

val start = Pattern.begin[Event]("start")
next(#name)

新しいパターンを追加します。合致イベントは以前の合致イベント(厳密な隣接)を直接継承する必要があります:

val next = start.next("middle")
next(#pattern_sequence)

新しいパターンを追加します。合致イベントのシーケンスは以前の合致イベント(厳密な隣接)を直接継承する必要があります:

val next = start.next(
    Pattern.begin[Event]("start").where(...).followedBy("middle").where(...)
)
followedBy(#name)

新しいパターンを追加します。他のイベントが合致イベントと以前の合致イベント間で起こるかもしれません (緩い隣接):

val followedBy = start.followedBy("middle")
followedBy(#pattern_sequence)

新しいパターンを追加します。他のイベントが合致イベントのシーケンスと以前の合致イベント間で起こるかもしれません (緩い隣接):

val followedBy = start.followedBy(
    Pattern.begin[Event]("start").where(...).followedBy("middle").where(...)
)
followedByAny(#name)

新しいパターンを追加します。合致イベントと以前の合致イベント間で他のイベントが起きるかもしれず、そして代わりの合致が各代わりの合致イベントについて存在するでしょう (非決定論的な緩い隣接):

val followedByAny = start.followedByAny("middle")
followedByAny(#pattern_sequence)

新しいパターンを追加します。合致イベントのシーケンスと以前の合致イベント間で他のイベントが起きるかもしれず、そして代わりの合致が各代わりの合致イベントのシーケンスについて存在するでしょう (非決定論的な緩い隣接):

val followedByAny = start.followedByAny(
    Pattern.begin[Event]("start").where(...).followedBy("middle").where(...)
)
notNext()

新しい否定のパターンを追加します。破棄される部分一致について、合致 (否定)イベントは以前の合致イベント(厳密な隣接)を直接継承する必要があります:

val notNext = start.notNext("not")
notFollowedBy()

新しい否定のパターンを追加します。他のイベントが合致(否定)イベントと以前の合致イベント(緩い隣接)間で発生する場合でも、部分一致するイベントのシーケンスが削除されるでしょう:

val notFollowedBy = start.notFllowedBy("not")
within(time)

パターンに合致するイベントシーケンスの間隔の最大時間を定義します。完了していないイベントシーケンスがこの時間を継承する場合、それは削除されます:

pattern.within(Time.seconds(10))

合致後のスキップ戦略

指定されたパターンについて、同じイベントが複数の成功一致に割り当てられるかもしれません。To control to how many matches an event will be assigned, you need to specify the skip strategy called AfterMatchSkipStrategy. 以下でリスト化されるように、4つの種類のスキップ戦略があります:

  • NO_SKIP: 全ての可能な一致が発行されるでしょう。
  • SKIP_PAST_LAST_EVENT: 合致のイベントを含む全ての部分一致を破棄します。
  • SKIP_TO_FIRST: 最初のPatternNameを継承する合致イベントを含む全ての部分一致を破棄します。
  • SKIP_TO_LAST: 最後のPatternNameを継承する合致イベントを含む全ての部分一致を破棄します。

SKIP_TO_FIRSTSKIP_TO_LAST スキップ戦略を使う場合、有効な PatternName も指定されなければならないことに注意してください。

例えば、指定されたパターンa b{2} とデータストリーム ab1, ab2, ab3, ab4, ab5, ab6について、これら4つのスキップ戦略間の違いは、以下の通りです:

Skip Strategy 結果 解説
NO_SKIP ab1 ab2 ab3
ab2 ab3 ab4
ab3 ab4 ab5
ab4 ab5 ab6
合致 ab1 ab2 ab3を見つけた後で、合致プロセスはどの結果も破棄しないでしょう。
SKIP_PAST_LAST_EVENT ab1 ab2 ab3
ab4 ab5 ab6
合致 ab1 ab2 ab3を見つけた後で、合致プロセスは全ての開始された部分合致を破棄するでしょう。
SKIP_TO_FIRST[b] ab1 ab2 ab3
ab2 ab3 ab4
ab3 ab4 ab5
ab4 ab5 ab6
合致 ab1 ab2 ab3 を見つけた後で、合致プロセスはab1を含む全ての部分一致を破棄するでしょう。これは最初の bの前にくる唯一のイベントです。
SKIP_TO_LAST[b] ab1 ab2 ab3
ab3 ab4 ab5
合致 ab1 ab2 ab3を見つけた後で、合致プロセスは ab1ab2 を含む全ての部分合致を破棄するでしょう。これらは最後の bの前に来るイベントです。

どのスキップ戦略を使用するかを指定するには、単に以下のものを呼び出すことでAfterMatchSkipStrategy を作成します:

Function 解説
AfterMatchSkipStrategy.noSkip() NO_SKIP スキップ戦略を生成します
AfterMatchSkipStrategy.skipPastLastEvent() SKIP_PAST_LAST_EVENT スキップ戦略を生成します
AfterMatchSkipStrategy.skipToFirst(patternName) 参照されるパターン名 patternName を持つSKIP_TO_FIRST スキップ戦略を生成します
AfterMatchSkipStrategy.skipToLast(patternName) 参照されるパターン名patternNameを持つ SKIP_TO_LAST スキップ戦略を生成します

そして、以下を呼び出すことでスキップ戦略をパターンに適用します:

AfterMatchSkipStrategy skipStrategy = ...
Pattern.begin("patternName", skipStrategy);
val skipStrategy = ...
Pattern.begin("patternName", skipStrategy)

デタッチング パターン

探しているパターンのシーケンスを指定した後は、潜在的な合致を検知するためにそれを入力ストリームに適用する番です。パターンシーケンスに対してイベントのストリームを実行するには、PatternStreamを作成する必要があります。Given an input stream input, a pattern pattern and an optional comparator comparator used to sort events with the same timestamp in case of EventTime or that arrived at the same moment, you create the PatternStream by calling:

DataStream<Event> input = ...
Pattern<Event, ?> pattern = ...
EventComparator<Event> comparator = ... // optional

PatternStream<Event> patternStream = CEP.pattern(input, pattern, comparator);
val input : DataStream[Event] = ...
val pattern : Pattern[Event, _] = ...
var comparator : EventComparator[Event] = ... // optional

val patternStream: PatternStream[Event] = CEP.pattern(input, pattern, comparator)

入力ストリームはユースケースに応じてkeyed あるいは non-keyed されているかもしれません。

注意 パターンを non-keyed ストリームに適用することで並行度が1のジョブという結果になるでしょう。

パターンからの選択

一旦PatternStream を取得すると、select あるいは flatSelect メソッドを使って検知したイベントシーケンスから選択することができます。

select() メソッドは PatternSelectFunction 実装を必要とします。PatternSelectFunction は各合致イベントシーケンスごとに呼ばれる select メソッドを持ちます。それはMap<String, List<IN>>の形で合致を受け取ります。キーはパターンシーケンス内の各パターンの名前で、値はそのパターンについて受け付けられた全てのイベントのリストです (IN は入力エレメントの型です)。指定されたパターンについてのイベントはタイムスタンプで並べられます。各パターンについて受け付けられたイベントのリストを返す理由は、ループパターン(例えば oneToMany()times()) を使う時に1つ以上のイベントが指定されたパターンについて受け付けられるかもしれないからです。選択関数は確実に1つの結果を返します。

class MyPatternSelectFunction<IN, OUT> implements PatternSelectFunction<IN, OUT> {
    @Override
    public OUT select(Map<String, List<IN>> pattern) {
        IN startEvent = pattern.get("start").get(0);
        IN endEvent = pattern.get("end").get(0);
        return new OUT(startEvent, endEvent);
    }
}

PatternFlatSelectFunctionPatternSelectFunctionに似ています。唯一の違いはそれが任意の数の結果を返すことができることです。これを行うために、select メソッドは出力要素をダウンストリームに転送するために使われる追加の Collector パラメータを持ちます。

class MyPatternFlatSelectFunction<IN, OUT> implements PatternFlatSelectFunction<IN, OUT> {
    @Override
    public void flatSelect(Map<String, List<IN>> pattern, Collector<OUT> collector) {
        IN startEvent = pattern.get("start").get(0);
        IN endEvent = pattern.get("end").get(0);

        for (int i = 0; i < startEvent.getValue(); i++ ) {
            collector.collect(new OUT(startEvent, endEvent));
        }
    }
}

select() メソッドは引数として選択関数を取ります。これは各合致イベントシーケンスごとに呼ばれます。それMap[String, Iterable[IN]]の形で合致を受け取ります。キーはパターンシーケンス内の各パターンの名前で、値はそのパターンについて受け付けられた全てのイベントのIterableです (IN は入力エレメントの型です)。

指定されたパターンについてのイベントはタイムスタンプで並べられます。各パターンについて受け付けられたイベントのiterableを返す理由は、ループパターン(例えば oneToMany()</c0 と times()) を使う時に1つ以上のイベントが指定されたパターンについて受け付けられるかもしれないからです。選択関数は呼び出しごとに確実に1つの結果を返します。

def selectFn(pattern : Map[String, Iterable[IN]]): OUT = {
    val startEvent = pattern.get("start").get.next
    val endEvent = pattern.get("end").get.next
    OUT(startEvent, endEvent)
}

flatSelect メソッドは select メソッドに似ています。それらの唯一の違いは、flatSelect メソッドに渡される関数は呼び出しごとに任意の数の結果を返すことができるということです。これを行うために、flatSelectのための関数は出力要素をダウンストリームに転送するために使われる追加のCollector パラメータを持ちます。

def flatSelectFn(pattern : Map[String, Iterable[IN]], collector : Collector[OUT]) = {
    val startEvent = pattern.get("start").get.next
    val endEvent = pattern.get("end").get.next
    for (i <- 0 to startEvent.getValue) {
        collector.collect(OUT(startEvent, endEvent))
    }
}

部分的なパターンのタイムアウトの処理

Whenever a pattern has a window length attached via the within keyword, it is possible that partial event sequences are discarded because they exceed the window length. これらのタイムアウトした部分合致に対応するために、selectflatSelect API 呼び出しによってタイムアウトハンドラを指定することができます。このタイムアウト ハンドラは各タイムアウト部分イベントシーケンスを必要とします。タイムアウトハンドラはパターンによって今まで合致した全てのイベントと、タイムアウトが検知された時のタイムスタンプを受け取ります。

部分パターンに対応するために、selectflatSelect API 呼び出しはパラメータとして取る上書きされたバージョンを提供します。

  • PatternTimeoutFunction/PatternFlatTimeoutFunction
  • OutputTag for the side output in which the timeouted matches will be returned
  • and the known PatternSelectFunction/PatternFlatSelectFunction.
PatternStream<Event> patternStream = CEP.pattern(input, pattern);

OutputTag<String> outputTag = new OutputTag<String>("side-output"){};

SingleOutputStreamOperator<ComplexEvent> result = patternStream.select(
    new PatternTimeoutFunction<Event, TimeoutEvent>() {...},
    outputTag,
    new PatternSelectFunction<Event, ComplexEvent>() {...}
);

DataStream<TimeoutEvent> timeoutResult = result.getSideOutput(outputTag);

SingleOutputStreamOperator<ComplexEvent> flatResult = patternStream.flatSelect(
    new PatternFlatTimeoutFunction<Event, TimeoutEvent>() {...},
    outputTag,
    new PatternFlatSelectFunction<Event, ComplexEvent>() {...}
);

DataStream<TimeoutEvent> timeoutFlatResult = flatResult.getSideOutput(outputTag);
val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)

val outputTag = OutputTag[String]("side-output")

val result: SingleOutputStreamOperator[ComplexEvent] = patternStream.select(outputTag){
    (pattern: Map[String, Iterable[Event]], timestamp: Long) => TimeoutEvent()
} {
    pattern: Map[String, Iterable[Event]] => ComplexEvent()
}

val timeoutResult: DataStream<TimeoutEvent> = result.getSideOutput(outputTag);

flatSelect API 呼び出しは、1つの目のパラメータとしてタイムアウト関数を、2つ目のパラメータとして選択関数を取る、同じ上書きされたバージョンを呼び出します。select 関数と対照的に、flatSelect 関数はCollectorを使って呼ばれます。任意の数のイベントを発行するためにcollectorを使うことができます。

val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)

val outputTag = OutputTag[String]("side-output")

val result: SingleOutputStreamOperator[ComplexEvent] = patternStream.flatSelect(outputTag){
    (pattern: Map[String, Iterable[Event]], timestamp: Long, out: Collector[TimeoutEvent]) =>
        out.collect(TimeoutEvent())
} {
    (pattern: mutable.Map[String, Iterable[Event]], out: Collector[ComplexEvent]) =>
        out.collect(ComplexEvent())
}

val timeoutResult: DataStream<TimeoutEvent> = result.getSideOutput(outputTag);

イベント時間の遅延の処理

CEP では、要素が処理される順番は問題ではありません。To guarantee that elements are processed in the correct order when working in event time, an incoming element is initially put in a buffer where elements are sorted in ascending order based on their timestamp, and when a watermark arrives, all the elements in this buffer with timestamps smaller than that of the watermark are processed. これはウォーターマーク間の要素はイベント時間の順番で処理されることを意味します。

注意 イベント時間で動作する時は、ライブラリはウォーターマークの正確さを仮定します。

To guarantee that elements across watermarks are processed in event-time order, Flink’s CEP library assumes correctness of the watermark, and considers as late elements whose timestamp is smaller than that of the last seen watermark. 遅れてきた要素はもう処理されません。

以下の例はEventsのkeyedデータストリーム上でパターン start, middle(name = "error") -> end(name = "critical")を検知します。イベントはそれらのidによってキー付けされ、有効なパターンは10秒以内に起きる必要があります。処理全体はイベント時間を使って行われます。

StreamExecutionEnvironment env = ...
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<Event> input = ...

DataStream<Event> partitionedInput = input.keyBy(new KeySelector<Event, Integer>() {
	@Override
	public Integer getKey(Event value) throws Exception {
		return value.getId();
	}
});

Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
	.next("middle").where(new SimpleCondition<Event>() {
		@Override
		public boolean filter(Event value) throws Exception {
			return value.getName().equals("error");
		}
	}).followedBy("end").where(new SimpleCondition<Event>() {
		@Override
		public boolean filter(Event value) throws Exception {
			return value.getName().equals("critical");
		}
	}).within(Time.seconds(10));

PatternStream<Event> patternStream = CEP.pattern(partitionedInput, pattern);

DataStream<Alert> alerts = patternStream.select(new PatternSelectFunction<Event, Alert>() {
	@Override
	public Alert select(Map<String, List<Event>> pattern) throws Exception {
		return createAlert(pattern);
	}
});
val env : StreamExecutionEnvironment = ...
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val input : DataStream[Event] = ...

val partitionedInput = input.keyBy(event => event.getId)

val pattern = Pattern.begin("start")
  .next("middle").where(_.getName == "error")
  .followedBy("end").where(_.getName == "critical")
  .within(Time.seconds(10))

val patternStream = CEP.pattern(partitionedInput, pattern)

val alerts = patternStream.select(createAlert(_)))

Flink-1.3 のCEPライブラリはAPIのいくつかの変更に繋がる多くの新しい機能と一緒に出荷されます。ここでは、Flink-1.3を使って古いCEPジョブを実行できるように、それらに行わなければならない変更を説明します。これらの変更を行い、ジョブを再コンパイルした後で、ジョブの古いバージョンを使って取ったセーブポイントから実行を再開することができるでしょう。つまり 過去のデータを再処理する必要がありません。

必要な変更は以下の通りです:

  1. FilterFunctionインタフェースの実装の代わりにSimpleConditionクラスを継承するために、条件(where(...) 句内のもの)を変更します。

  2. Change your functions provided as arguments to the select(...) and flatSelect(...) methods to expect a list of events associated with each pattern (List in Java, Iterable in Scala). This is because with the addition of the looping patterns, multiple input events can match a single (looping) pattern.

  3. Flink 1.1 および 1.2 でのfollowedBy()非決定論的な緩い隣接 (ここを見てください)を意味します。Flink 1.3ではこれは変更され、followedBy()緩い隣接を意味し、一方で非決定論的な緩い隣接が必要な場合はfollowedByAny() が使われる必要があります。

上に戻る

TOP
inserted by FC2 system