FlinkCEP はFlink上に実装された Complex Event Processing (CEP) ライブラリです。データ内で何が重要なのかを理解する機会を与えながら、イベントの終わりの無いストリーム内でのイベントのパターンを検出することができます。
このページはFlink CEP内で利用可能なAPIの呼び出しを説明します。イベントのシーケンスを合致する時に検知および振舞うを紹介する前に、パターン APIを皮切りに紹介します。これによりストリーム内で検知したいパターンを指定することができます。それから、CEPライブラリがイベント時間で遅延を扱う時の仮定と、古いFlinkバージョンからFlink-1.3へどうやってジョブを移設するかを紹介します。
すぐに飛び込みたい場合は、Flink プログラムをセットアップし、FlinkCEP の依存をプロジェクトのpom.xml
に追加します。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_2.11</artifactId>
<version>1.5-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep-scala_2.11</artifactId>
<version>1.5-SNAPSHOT</version>
</dependency>
情報 FlinkCEP はバイナリの配布物の一部ではありません。クラスタ実行のためにそれとどうやってリンクするかをここで見ます。
これで、パターンAPIを使って初めてのCEPプログラムを書き始めることができます。
注意 FlinkCEPはイベントの比較とマッチングに使うため、パターンマッチングを適用したいDataStream
内のイベントは適切なequals()
と hashCode()
メソッドを実装する必要があります。
DataStream<Event> input = ...
Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(
new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
return event.getId() == 42;
}
}
).next("middle").subtype(SubEvent.class).where(
new SimpleCondition<Event>() {
@Override
public boolean filter(SubEvent subEvent) {
return subEvent.getVolume() >= 10.0;
}
}
).followedBy("end").where(
new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
return event.getName().equals("end");
}
}
);
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
DataStream<Alert> result = patternStream.select(
new PatternSelectFunction<Event, Alert> {
@Override
public Alert select(Map<String, List<Event>> pattern) throws Exception {
return createAlertFrom(pattern);
}
}
});
val input: DataStream[Event] = ...
val pattern = Pattern.begin("start").where(_.getId == 42)
.next("middle").subtype(classOf[SubEvent]).where(_.getVolume >= 10.0)
.followedBy("end").where(_.getName == "end")
val patternStream = CEP.pattern(input, pattern)
val result: DataStream[Alert] = patternStream.select(createAlert(_))
パターンAPIにより入力ストリームから抽出したい複雑なパターンのシーケンスを定義することができます。
それぞれの複雑なパターンのシーケンスは複数の簡単なパターンから成ります。つまり、同じプロパティを持つ個々のイベントの探すパターン。これから、これらの簡単なパターンを patternsと呼び、私たちがストリーム内で探している最終の複雑なパターンのシーケンスをパターン シーケンスと呼びます。パターンシーケンスはそのようなパターンのグラフとして見えます。1つのパターンから次への移行はユーザ固有の条件に基づいて発生します。例えば、event.getName().equals("start")
。match は有効なパターン移行を使った全ての複雑なパターングラフのパターンを訪ねる入力イベントのシーケンスです。
注意 各パターンはユニークな名前を持たなければなりません。合致したイベントを識別するためにあとでそれを使います。
注意 パターンの名前は文字":"
を含むことができません。
この章の残りでまず個々のパターンをどう定義するかを説明し、それから個々のパターンをどうやって複雑なパターンに組み合わせることができるかを説明します。
パターン は シングルトン あるいは ループ パターンのどちらかです。シングルトン パターンは 1つのイベントを受け付けますが、ループ パターンは1つ以上を受け付けることができます。パターン マッチングのシンボル内で、パターン "a b+ c?d"
(あるいは "a"
に続く1つ以上の "b"
、任意で "c"
が続き、"d"
が続きます)、a
, c?
および d
はシングルトンパターンですが、 b+
はループです。デフォルトでは、パターンはシングルトンパターンで、Quantifiersを使うことでそれをループに変換することができます。各パターンはイベントを受け付ける1つ以上の条件 を持つことができます。
FlinkCEPの中で、これらのメソッドを使ってループパターンを指定することができます: pattern.oneOrMore()
、指定されたイベントの1つ以上の出現を期待するパターン(例えば、以前述べられた b+
); そして pattern.times(#ofTimes)
、指定された意弁の型のある数の出現を期待するパターン、例えば、4 a
’s; そして pattern.times(#fromTimes, #toTimes)
、特定の最小数の出現と指定されたイベントの型の出現の最大数を期待するパターン、例えば 2-4 a
s。
pattern.greedy()
メソッドを使って貪欲にループパターンを作成することができますが、まだグループパタンを貪欲に作ることはできません。ループの有る無しの全てのパターンを任意のpattern.optional()
メソッドを使って作ることができます。
start
という名前のパターンについては、以下は有効な数量詞です:
// expecting 4 occurrences
start.times(4);
// expecting 0 or 4 occurrences
start.times(4).optional();
// expecting 2, 3 or 4 occurrences
start.times(2, 4);
// expecting 2, 3 or 4 occurrences and repeating as many as possible
start.times(2, 4).greedy();
// expecting 0, 2, 3 or 4 occurrences
start.times(2, 4).optional();
// expecting 0, 2, 3 or 4 occurrences and repeating as many as possible
start.times(2, 4).optional().greedy();
// expecting 1 or more occurrences
start.oneOrMore();
// expecting 1 or more occurrences and repeating as many as possible
start.oneOrMore().greedy();
// expecting 0 or more occurrences
start.oneOrMore().optional();
// expecting 0 or more occurrences and repeating as many as possible
start.oneOrMore().optional().greedy();
// expecting 2 or more occurrences
start.timesOrMore(2);
// expecting 2 or more occurrences and repeating as many as possible
start.timesOrMore(2).greedy();
// expecting 0, 2 or more occurrences and repeating as many as possible
start.timesOrMore(2).optional().greedy();
// expecting 4 occurrences
start.times(4)
// expecting 0 or 4 occurrences
start.times(4).optional()
// expecting 2, 3 or 4 occurrences
start.times(2, 4);
// expecting 2, 3 or 4 occurrences and repeating as many as possible
start.times(2, 4).greedy();
// expecting 0, 2, 3 or 4 occurrences
start.times(2, 4).optional();
// expecting 0, 2, 3 or 4 occurrences and repeating as many as possible
start.times(2, 4).optional().greedy();
// expecting 1 or more occurrences
start.oneOrMore()
// expecting 1 or more occurrences and repeating as many as possible
start.oneOrMore().greedy();
// expecting 0 or more occurrences
start.oneOrMore().optional()
// expecting 0 or more occurrences and repeating as many as possible
start.oneOrMore().optional().greedy();
// expecting 2 or more occurrences
start.timesOrMore(2);
// expecting 2 or more occurrences and repeating as many as possible
start.timesOrMore(2).greedy();
// expecting 0, 2 or more occurrences
start.timesOrMore(2).optional();
// expecting 0, 2 or more occurrences and repeating as many as possible
start.timesOrMore(2).optional().greedy();
各パターンにおいて、1つのパターンから次のパターンに変わるために、追加の 条件を設定することができます。これらの条件を以下に関係することができます:
やってくるイベントのプロパティ、例えば、値が5より大きくなければならない、あるいは以前受け付けたイベントの平均値よりも大きくなければならない。
合致イベントの隣接、例えば、どの一致の間でも合致しないパターンa,b,c
を検出する。
後者は“ループ” パターンを参照します、つまり 1つより多くのイベントを受け付けることができるパターン、例えばa b+ c
の中のb+
、これは1つ以上のb
を検索します。
pattern.where()
, pattern.or()
あるいは pattern.until()
メソッドを使ってイベントプロパティ上の条件を指定することができます。それらは IterativeCondition
あるいは SimpleCondition
のどちらかでしょう。
繰り返し条件: これは最も一般的な条件の型です。これは以前受け付けられたイベント、あるいはそれらの部分集合上の統計量のプロパティに基づく連続するイベントを受け付ける条件を指定する方法です。
以下は、もしパターンについて以前に受け付けられたイベントの価格の合計に加えて現在のイベントの価格が5.0の値を超えない場合に、名前が“foo”で始まる場合“middle”という名前のパターンのための次のイベントを受け付ける反復条件のためのコードです。反復条件は、特にループ パターンとの組み合わせの時に強力になります。例えば oneOrMore()
。
middle.oneOrMore().where(new IterativeCondition<SubEvent>() {
@Override
public boolean filter(SubEvent value, Context<SubEvent> ctx) throws Exception {
if (!value.getName().startsWith("foo")) {
return false;
}
double sum = value.getPrice();
for (Event event : ctx.getEventsForPattern("middle")) {
sum += event.getPrice();
}
return Double.compare(sum, 5.0) < 0;
}
});
middle.oneOrMore().where(
(value, ctx) => {
lazy val sum = ctx.getEventsForPattern("middle").asScala.map(_.getPrice).sum
value.getName.startsWith("foo") && sum + value.getPrice < 5.0
}
)
注意 context.getEventsForPattern(...)
の呼び出しは指定された可能な合致について以前に受け付けられたイベント全てを見つけます。この操作のコストは変化するかもしれないため、条件を実装する時はそれを最小限にするようにしてください。
単純な条件: この種類の条件は前述されたIterativeCondition
クラスを拡張し、イベント自身のプロパティのみに基づいて、イベントを受け付けるかどうかを決定します。
start.where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) {
return value.getName().startsWith("foo");
}
});
start.where(event => event.getName.startsWith("foo"))
最後に、受け付けられるイベントの型をpattern.subtype(subClass)
メソッドを使って初期イベントタイプ(ここではEvent
) の部分型に制限することもできます
start.subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
@Override
public boolean filter(SubEvent value) {
return ... // some condition
}
});
start.subtype(classOf[SubEvent]).where(subEvent => ... /* some condition */)
組み合わせ条件: 上で示したように、subtype
条件を追加の条件と組み合わせることができます。これは各条件について維持されます。連続するwhere()
呼び出しによって任意の条件を組み合わせることができます。最終的な結果は個々の条件の論理学的なAND になるでしょう。ORを使って条件を組み合わせるために、以下で示すようにor()
メソッドを使うことができます。
pattern.where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) {
return ... // some condition
}
}).or(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) {
return ... // or condition
}
});
pattern.where(event => ... /* some condition */).or(event => ... /* or condition */)
停止条件: ループパターン (oneOrMore()
および oneOrMore().optional()
) の場合、停止条件を指定することもできます。例えば、値の合憲が50より小さい間 5より大きい値を持つイベントを受け付ける。
より理解するには、以下の例を見てください。仮定
"(a+ until b)"
("b"
まで1つ以上の "a"
)のようなパターン
やってくるイベントのシーケンス "a1" "c" "a2" "b" "a3"
ライブラリは以下の結果を出力するでしょう: {a1 a2} {a1} {a2} {a3}
。
見てわかるように、{a1 a2 a3}
あるいは {a2 a3}
は停止条件のために返されません。
FlinkCEP はイベント間の以下の隣接形式をサポートします:
厳密な隣接: 全ての合致するイベントが合致しないイベント無しに厳密に1つずつ現れることを期待する。
緩めの隣接: 合致するイベント間に現れる合致しないイベントを無視する。
非決定論的な緩めの隣接: いくつかの合致イベントを無視する追加の合致が可能な更に緩い隣接。
例を使って上を説明するために、入力"a1", "c", "a2", "b"
を持つパターンのシーケンス "a+ b"
(1つ以上の"a"
に "b"
が続く)は以下の結果を持つでしょう:
厳密な隣接: {a2 b}
– "a1"
の後の "c"
は "a1"
を捨てます。
緩めの隣接: {a1 b}
と {a1 a2 b}
– c
は無視されます。
非決定論的な緩めの隣接: {a1 b}
, {a2 b}
および {a1 a2 b}
。
ループ パターン (例えばoneOrMore()
と times()
) については、デフォルトは 緩い隣接です。もし厳密な隣接を必要とする場合は、consecutive()
呼び出しを使って明示的にそれを指定する必要があります。もし非決定論的な緩い隣接 を必要とする場合はallowCombinations()
呼び出しを使うことができます。
注意 この章では、1つのループパターン内の隣接について話しています。consecutive()
および allowCombinations()
の呼び出しはその文脈で理解される必要があります。Later when looking at Combining Patterns we’ll discuss other calls, such as next()
and followedBy()
, that are used to specify contiguity conditions between patterns.
パターンのオペレーション | 解説 |
---|---|
where(condition) |
現在のパターンの条件を定義します。パターンを合致させるために、イベントは条件を満たす必要があります。複数の連続する where() 句はANDされる条件になります:
|
or(condition) |
既存の条件とORされる新しい条件を追加します。条件のうち少なくとも1つを通過する場合のみ、イベントはパターンと合致することができます:
|
until(condition) |
ループパターンのための停止条件を指定します。指定された条件と一致するイベントが起きた時に、それ以上のイベントがパターンに受け付けられないことを意味します。
注意: イベントベースの条件に対応するパターンのための状態の掃除を考慮します。
|
subtype(subClass) |
現在のパターンのためのサブタイプの条件を定義します。このサブタイプのパターンの場合のみイベントが合致します:
|
oneOrMore() |
このパターンは少なくとも1つの合致するイベントの出現を期待することを指定します。 デフォルトでは緩い内部的な隣接(連続するイベント間)で使われます。内部的な隣接についての詳細な情報は一貫性を見てください。 注意: 状態の掃除を有効にするには
|
timesOrMore(#times) |
このパターンは少なくとも#times回の合致するイベントの出現を期待することを指定します。 デフォルトでは緩い内部的な隣接(連続するイベント間)で使われます。内部的な隣接についての詳細な情報は一貫性を見てください。
|
times(#ofTimes) |
このパターンは合致するイベントの出現が確実な数であることを期待します。 デフォルトでは緩い内部的な隣接(連続するイベント間)で使われます。内部的な隣接についての詳細な情報は一貫性を見てください。
|
times(#fromTimes, #toTimes) |
このパターンは 合致するイベントの#fromTimes と#toTimesの間の出現を期待することを指定します。 デフォルトでは緩い内部的な隣接(連続するイベント間)で使われます。内部的な隣接についての詳細な情報は一貫性を見てください。
|
optional() |
このパターンが任意であることを指定します。つまり、全く起こらないかもしれません。これは全ての前述した数量詞に適用可能です。
|
greedy() |
このパターンが貪欲であることを指定します。つまり、できる限り多く繰り返されるでしょう。これは数量詞にのみ敵よ可能で、今のところグループパターンをサポートしません。
|
consecutive() |
適用されない場合は( 例. 以下のようなパターン:
入力シーケンス C D A1 A2 A3 D A4 B について、以下のような合致を生成するでしょう: consecutive が適用される場合: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B} consecutive が適用されない場合: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B} |
allowCombinations() |
適用されない場合は( 例. 以下のようなパターン:
入力シーケンス C D A1 A2 A3 D A4 B について、以下のような合致を生成するでしょう: combinations が有効な場合: {C A1 B}, {C A1 A2 B}, {C A1 A3 B}, {C A1 A4 B}, {C A1 A2 A3 B}, {C A1 A2 A4 B}, {C A1 A3 A4 B}, {C A1 A2 A3 A4 B} combinations が無効な場合: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B} |
パターンのオペレーション | 解説 |
---|---|
where(condition) |
現在のパターンの条件を定義します。パターンを合致させるために、イベントは条件を満たす必要があります。複数の連続する where() 句はANDされる条件になります:
|
or(condition) |
既存の条件とORされる新しい条件を追加します。条件のうち少なくとも1つを通過する場合のみ、イベントはパターンと合致することができます:
|
until(condition) |
ループパターンのための停止条件を指定します。指定された条件と一致するイベントが起きた時に、それ以上のイベントがパターンに受け付けられないことを意味します。
注意: イベントベースの条件に対応するパターンのための状態の掃除を考慮します。
|
subtype(subClass) |
現在のパターンのためのサブタイプの条件を定義します。このサブタイプのパターンの場合のみイベントが合致します:
|
oneOrMore() |
このパターンは少なくとも1つの合致するイベントの出現を期待することを指定します。 デフォルトでは緩い内部的な隣接(連続するイベント間)で使われます。内部的な隣接についての詳細な情報は一貫性を見てください。 注意: 状態の掃除を有効にするには
|
timesOrMore(#times) |
このパターンは少なくとも#times回の合致するイベントの出現を期待することを指定します。 デフォルトでは緩い内部的な隣接(連続するイベント間)で使われます。内部的な隣接についての詳細な情報は一貫性を見てください。
|
times(#ofTimes) |
このパターンは合致するイベントの出現が確実な数であることを期待します。 デフォルトでは緩い内部的な隣接(連続するイベント間)で使われます。内部的な隣接についての詳細な情報は一貫性を見てください。
|
times(#fromTimes, #toTimes) |
このパターンは 合致するイベントの#fromTimes と#toTimesの間の出現を期待することを指定します。 デフォルトでは緩い内部的な隣接(連続するイベント間)で使われます。内部的な隣接についての詳細な情報は一貫性を見てください。
|
optional() |
このパターンが任意であることを指定します。つまり、全く起こらないかもしれません。これは全ての前述した数量詞に適用可能です。
|
greedy() |
このパターンが貪欲であることを指定します。つまり、できる限り多く繰り返されるでしょう。これは数量詞にのみ敵よ可能で、今のところグループパターンをサポートしません。
|
consecutive() |
適用されない場合は( 例. 以下のようなパターン:
入力シーケンス C D A1 A2 A3 D A4 B について、以下のような合致を生成するでしょう: consecutive が適用される場合: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B} consecutive が適用されない場合: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B} |
allowCombinations() |
適用されない場合は( 例. 以下のようなパターン:
入力シーケンス C D A1 A2 A3 D A4 B について、以下のような合致を生成するでしょう: combinations が有効な場合: {C A1 B}, {C A1 A2 B}, {C A1 A3 B}, {C A1 A4 B}, {C A1 A2 A3 B}, {C A1 A2 A4 B}, {C A1 A3 A4 B}, {C A1 A2 A3 A4 B} combinations が無効な場合: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B} |
個々のパターンがどのように見えるかを見てきたので、それらを完全なパターンのシーケンスとして組み合わせる方法を見る時です。
パターンのシーケンスは以下で見るように初期パターンから始まる必要があります:
Pattern<Event, ?> start = Pattern.<Event>begin("start");
val start : Pattern[Event, _] = Pattern.begin("start")
次に、それらの間で望ましい隣接条件を指定することでパターンシーケンスにより多くのパターンを追加することができます。前の章で、Flinkでサポートされる異なる隣接モードを説明しました。すなわち strict, relaxed およびnon-deterministic relaxed、そしてループパターンの中でそれらを適用する方法を説明しました。連続するパターンの間でそれらを適用するには、以下を使うことができます:
next()
、followedBy()
、followedByAny()
。あるいは
notNext()
、notFollowedBy()
注意 パターンのシーケンスはnotFollowedBy()
で終わることはできません。
注意 NOT
パターンは任意のパターンに先導されることはできません。
// strict contiguity
Pattern<Event, ?> strict = start.next("middle").where(...);
// relaxed contiguity
Pattern<Event, ?> relaxed = start.followedBy("middle").where(...);
// non-deterministic relaxed contiguity
Pattern<Event, ?> nonDetermin = start.followedByAny("middle").where(...);
// NOT pattern with strict contiguity
Pattern<Event, ?> strictNot = start.notNext("not").where(...);
// NOT pattern with relaxed contiguity
Pattern<Event, ?> relaxedNot = start.notFollowedBy("not").where(...);
// strict contiguity
val strict: Pattern[Event, _] = start.next("middle").where(...)
// relaxed contiguity
val relaxed: Pattern[Event, _] = start.followedBy("middle").where(...)
// non-deterministic relaxed contiguity
val nonDetermin: Pattern[Event, _] = start.followedByAny("middle").where(...)
// NOT pattern with strict contiguity
val strictNot: Pattern[Event, _] = start.notNext("not").where(...)
// NOT pattern with relaxed contiguity
val relaxedNot: Pattern[Event, _] = start.notFollowedBy("not").where(...)
緩い隣接は最初の連続する合致イベントだけが一致するだろうことを意味しますが、同じ開始について非決定論的な緩い隣接を使って複数の合致が発行されるだろうことを意味します。例として、イベントシーケンス"a", "c", "b1", "b2"
を仮定した時、パターン a b
は以下の結果を与えるでしょう:
a
と b
の厳密な隣接: {}
(一致しない)、"a"
の後の"c"
は"a"
を捨てさせます。
a
と b
の緩い隣接: {a b1}
、緩い隣接は “次の合致するものまで、一致しないイベントをスキップする”として見なされます。
a
と b
の非決定論的な緩い隣接: {a b1}
, {a b2}
、これは最も一般的な形式です。
パターンが有効であるための一時的な制約を定義することも可能です。例えば、pattern.within()
メソッドを使って10秒内に発生すべきパターンを定義することができます。一時的なパターンは処理とイベント時間の両方でサポートされます。
注意 パターンのシーケンスは1つの一時的な制約のみ持つことができます。複数のそのような制約が異なる個々のパターンで定義された場合、一番小さいものが適用されます。
next.within(Time.seconds(10));
next.within(Time.seconds(10))
begin
, followedBy
, followedByAny
および next
のための条件としてパターンのシーケンスを定義することもできます。パターンのシーケンスは論理的な一致条件と見なされ、GroupPattern
が返され、GroupPattern
に対してoneOrMore()
, times(#ofTimes)
, times(#fromTimes, #toTimes)
, optional()
, consecutive()
, allowCombinations()
を適用できるでしょう。
Pattern<Event, ?> start = Pattern.begin(
Pattern.<Event>begin("start").where(...).followedBy("start_middle").where(...)
);
// strict contiguity
Pattern<Event, ?> strict = start.next(
Pattern.<Event>begin("next_start").where(...).followedBy("next_middle").where(...)
).times(3);
// relaxed contiguity
Pattern<Event, ?> relaxed = start.followedBy(
Pattern.<Event>begin("followedby_start").where(...).followedBy("followedby_middle").where(...)
).oneOrMore();
// non-deterministic relaxed contiguity
Pattern<Event, ?> nonDetermin = start.followedByAny(
Pattern.<Event>begin("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...)
).optional();
val start: Pattern[Event, _] = Pattern.begin(
Pattern.begin[Event, _]("start").where(...).followedBy("start_middle").where(...)
)
// strict contiguity
val strict: Pattern[Event, _] = start.next(
Pattern.begin[Event, _]("next_start").where(...).followedBy("next_middle").where(...)
).times(3)
// relaxed contiguity
val relaxed: Pattern[Event, _] = start.followedBy(
Pattern.begin[Event, _]("followedby_start").where(...).followedBy("followedby_middle").where(...)
).oneOrMore()
// non-deterministic relaxed contiguity
val nonDetermin: Pattern[Event, _] = start.followedByAny(
Pattern.begin[Event, _]("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...)
).optional()
パターンのオペレーション | 解説 |
---|---|
begin(#name) |
開始パターンを定義:
|
begin(#pattern_sequence) |
開始パターンを定義:
|
next(#name) |
新しいパターンを追加します。合致イベントは以前の合致イベント(厳密な隣接)を直接継承する必要があります:
|
next(#pattern_sequence) |
新しいパターンを追加します。合致イベントのシーケンスは以前の合致イベント(厳密な隣接)を直接継承する必要があります:
|
followedBy(#name) |
新しいパターンを追加します。他のイベントが合致イベントと以前の合致イベント間で起こるかもしれません (緩い隣接):
|
followedBy(#pattern_sequence) |
新しいパターンを追加します。他のイベントが合致イベントのシーケンスと以前の合致イベント間で起こるかもしれません (緩い隣接):
|
followedByAny(#name) |
新しいパターンを追加します。合致イベントと以前の合致イベント間で他のイベントが起きるかもしれず、そして代わりの合致が各代わりの合致イベントについて存在するでしょう (非決定論的な緩い隣接):
|
followedByAny(#pattern_sequence) |
新しいパターンを追加します。合致イベントのシーケンスと以前の合致イベント間で他のイベントが起きるかもしれず、そして代わりの合致が各代わりの合致イベントのシーケンスについて存在するでしょう (非決定論的な緩い隣接):
|
notNext() |
新しい否定のパターンを追加します。破棄される部分一致について、合致 (否定)イベントは以前の合致イベント(厳密な隣接)を直接継承する必要があります:
|
notFollowedBy() |
新しい否定のパターンを追加します。他のイベントが合致(否定)イベントと以前の合致イベント(緩い隣接)間で発生する場合でも、部分一致するイベントのシーケンスが削除されるでしょう:
|
within(time) |
パターンに合致するイベントシーケンスの間隔の最大時間を定義します。完了していないイベントシーケンスがこの時間を継承する場合、それは削除されます:
|
パターンのオペレーション | 解説 |
---|---|
begin() |
開始パターンを定義:
|
next(#name) |
新しいパターンを追加します。合致イベントは以前の合致イベント(厳密な隣接)を直接継承する必要があります:
|
next(#pattern_sequence) |
新しいパターンを追加します。合致イベントのシーケンスは以前の合致イベント(厳密な隣接)を直接継承する必要があります:
|
followedBy(#name) |
新しいパターンを追加します。他のイベントが合致イベントと以前の合致イベント間で起こるかもしれません (緩い隣接):
|
followedBy(#pattern_sequence) |
新しいパターンを追加します。他のイベントが合致イベントのシーケンスと以前の合致イベント間で起こるかもしれません (緩い隣接):
|
followedByAny(#name) |
新しいパターンを追加します。合致イベントと以前の合致イベント間で他のイベントが起きるかもしれず、そして代わりの合致が各代わりの合致イベントについて存在するでしょう (非決定論的な緩い隣接):
|
followedByAny(#pattern_sequence) |
新しいパターンを追加します。合致イベントのシーケンスと以前の合致イベント間で他のイベントが起きるかもしれず、そして代わりの合致が各代わりの合致イベントのシーケンスについて存在するでしょう (非決定論的な緩い隣接):
|
notNext() |
新しい否定のパターンを追加します。破棄される部分一致について、合致 (否定)イベントは以前の合致イベント(厳密な隣接)を直接継承する必要があります:
|
notFollowedBy() |
新しい否定のパターンを追加します。他のイベントが合致(否定)イベントと以前の合致イベント(緩い隣接)間で発生する場合でも、部分一致するイベントのシーケンスが削除されるでしょう:
|
within(time) |
パターンに合致するイベントシーケンスの間隔の最大時間を定義します。完了していないイベントシーケンスがこの時間を継承する場合、それは削除されます:
|
指定されたパターンについて、同じイベントが複数の成功一致に割り当てられるかもしれません。To control to how many matches an event will be assigned, you need to specify the skip strategy called AfterMatchSkipStrategy
. 以下でリスト化されるように、4つの種類のスキップ戦略があります:
SKIP_TO_FIRST と SKIP_TO_LAST スキップ戦略を使う場合、有効な PatternName も指定されなければならないことに注意してください。
例えば、指定されたパターンa b{2}
とデータストリーム ab1, ab2, ab3, ab4, ab5, ab6
について、これら4つのスキップ戦略間の違いは、以下の通りです:
Skip Strategy | 結果 | 解説 |
---|---|---|
NO_SKIP |
ab1 ab2 ab3 ab2 ab3 ab4 ab3 ab4 ab5 ab4 ab5 ab6 |
合致 ab1 ab2 ab3 を見つけた後で、合致プロセスはどの結果も破棄しないでしょう。 |
SKIP_PAST_LAST_EVENT |
ab1 ab2 ab3 ab4 ab5 ab6 |
合致 ab1 ab2 ab3 を見つけた後で、合致プロセスは全ての開始された部分合致を破棄するでしょう。 |
SKIP_TO_FIRST[b ] |
ab1 ab2 ab3 ab2 ab3 ab4 ab3 ab4 ab5 ab4 ab5 ab6 |
合致 ab1 ab2 ab3 を見つけた後で、合致プロセスはab1 を含む全ての部分一致を破棄するでしょう。これは最初の b の前にくる唯一のイベントです。 |
SKIP_TO_LAST[b ] |
ab1 ab2 ab3 ab3 ab4 ab5 |
合致 ab1 ab2 ab3 を見つけた後で、合致プロセスは ab1 と ab2 を含む全ての部分合致を破棄するでしょう。これらは最後の b の前に来るイベントです。 |
どのスキップ戦略を使用するかを指定するには、単に以下のものを呼び出すことでAfterMatchSkipStrategy
を作成します:
Function | 解説 |
---|---|
AfterMatchSkipStrategy.noSkip() |
NO_SKIP スキップ戦略を生成します |
AfterMatchSkipStrategy.skipPastLastEvent() |
SKIP_PAST_LAST_EVENT スキップ戦略を生成します |
AfterMatchSkipStrategy.skipToFirst(patternName) |
参照されるパターン名 patternName を持つSKIP_TO_FIRST スキップ戦略を生成します |
AfterMatchSkipStrategy.skipToLast(patternName) |
参照されるパターン名patternNameを持つ SKIP_TO_LAST スキップ戦略を生成します |
そして、以下を呼び出すことでスキップ戦略をパターンに適用します:
AfterMatchSkipStrategy skipStrategy = ...
Pattern.begin("patternName", skipStrategy);
val skipStrategy = ...
Pattern.begin("patternName", skipStrategy)
探しているパターンのシーケンスを指定した後は、潜在的な合致を検知するためにそれを入力ストリームに適用する番です。パターンシーケンスに対してイベントのストリームを実行するには、PatternStream
を作成する必要があります。Given an input stream input
, a pattern pattern
and an optional comparator comparator
used to sort events with the same timestamp in case of EventTime or that arrived at the same moment, you create the PatternStream
by calling:
DataStream<Event> input = ...
Pattern<Event, ?> pattern = ...
EventComparator<Event> comparator = ... // optional
PatternStream<Event> patternStream = CEP.pattern(input, pattern, comparator);
val input : DataStream[Event] = ...
val pattern : Pattern[Event, _] = ...
var comparator : EventComparator[Event] = ... // optional
val patternStream: PatternStream[Event] = CEP.pattern(input, pattern, comparator)
入力ストリームはユースケースに応じてkeyed あるいは non-keyed されているかもしれません。
注意 パターンを non-keyed ストリームに適用することで並行度が1のジョブという結果になるでしょう。
一旦PatternStream
を取得すると、select
あるいは flatSelect
メソッドを使って検知したイベントシーケンスから選択することができます。
select()
メソッドは PatternSelectFunction
実装を必要とします。PatternSelectFunction
は各合致イベントシーケンスごとに呼ばれる select
メソッドを持ちます。それはMap<String, List<IN>>
の形で合致を受け取ります。キーはパターンシーケンス内の各パターンの名前で、値はそのパターンについて受け付けられた全てのイベントのリストです (IN
は入力エレメントの型です)。指定されたパターンについてのイベントはタイムスタンプで並べられます。各パターンについて受け付けられたイベントのリストを返す理由は、ループパターン(例えば oneToMany()
と times()
) を使う時に1つ以上のイベントが指定されたパターンについて受け付けられるかもしれないからです。選択関数は確実に1つの結果を返します。
class MyPatternSelectFunction<IN, OUT> implements PatternSelectFunction<IN, OUT> {
@Override
public OUT select(Map<String, List<IN>> pattern) {
IN startEvent = pattern.get("start").get(0);
IN endEvent = pattern.get("end").get(0);
return new OUT(startEvent, endEvent);
}
}
PatternFlatSelectFunction
は PatternSelectFunction
に似ています。唯一の違いはそれが任意の数の結果を返すことができることです。これを行うために、select
メソッドは出力要素をダウンストリームに転送するために使われる追加の Collector
パラメータを持ちます。
class MyPatternFlatSelectFunction<IN, OUT> implements PatternFlatSelectFunction<IN, OUT> {
@Override
public void flatSelect(Map<String, List<IN>> pattern, Collector<OUT> collector) {
IN startEvent = pattern.get("start").get(0);
IN endEvent = pattern.get("end").get(0);
for (int i = 0; i < startEvent.getValue(); i++ ) {
collector.collect(new OUT(startEvent, endEvent));
}
}
}
select()
メソッドは引数として選択関数を取ります。これは各合致イベントシーケンスごとに呼ばれます。それMap[String, Iterable[IN]]
の形で合致を受け取ります。キーはパターンシーケンス内の各パターンの名前で、値はそのパターンについて受け付けられた全てのイベントのIterableです (IN
は入力エレメントの型です)。
指定されたパターンについてのイベントはタイムスタンプで並べられます。各パターンについて受け付けられたイベントのiterableを返す理由は、ループパターン(例えば oneToMany()</c0 と
times()
) を使う時に1つ以上のイベントが指定されたパターンについて受け付けられるかもしれないからです。選択関数は呼び出しごとに確実に1つの結果を返します。
def selectFn(pattern : Map[String, Iterable[IN]]): OUT = {
val startEvent = pattern.get("start").get.next
val endEvent = pattern.get("end").get.next
OUT(startEvent, endEvent)
}
flatSelect
メソッドは select
メソッドに似ています。それらの唯一の違いは、flatSelect
メソッドに渡される関数は呼び出しごとに任意の数の結果を返すことができるということです。これを行うために、flatSelect
のための関数は出力要素をダウンストリームに転送するために使われる追加のCollector
パラメータを持ちます。
def flatSelectFn(pattern : Map[String, Iterable[IN]], collector : Collector[OUT]) = {
val startEvent = pattern.get("start").get.next
val endEvent = pattern.get("end").get.next
for (i <- 0 to startEvent.getValue) {
collector.collect(OUT(startEvent, endEvent))
}
}
Whenever a pattern has a window length attached via the within
keyword, it is possible that partial event sequences are discarded because they exceed the window length. これらのタイムアウトした部分合致に対応するために、select
とflatSelect
API 呼び出しによってタイムアウトハンドラを指定することができます。このタイムアウト ハンドラは各タイムアウト部分イベントシーケンスを必要とします。タイムアウトハンドラはパターンによって今まで合致した全てのイベントと、タイムアウトが検知された時のタイムスタンプを受け取ります。
部分パターンに対応するために、select
と flatSelect
API 呼び出しはパラメータとして取る上書きされたバージョンを提供します。
PatternTimeoutFunction
/PatternFlatTimeoutFunction
PatternSelectFunction
/PatternFlatSelectFunction
.PatternStream<Event> patternStream = CEP.pattern(input, pattern);
OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
SingleOutputStreamOperator<ComplexEvent> result = patternStream.select(
new PatternTimeoutFunction<Event, TimeoutEvent>() {...},
outputTag,
new PatternSelectFunction<Event, ComplexEvent>() {...}
);
DataStream<TimeoutEvent> timeoutResult = result.getSideOutput(outputTag);
SingleOutputStreamOperator<ComplexEvent> flatResult = patternStream.flatSelect(
new PatternFlatTimeoutFunction<Event, TimeoutEvent>() {...},
outputTag,
new PatternFlatSelectFunction<Event, ComplexEvent>() {...}
);
DataStream<TimeoutEvent> timeoutFlatResult = flatResult.getSideOutput(outputTag);
val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
val outputTag = OutputTag[String]("side-output")
val result: SingleOutputStreamOperator[ComplexEvent] = patternStream.select(outputTag){
(pattern: Map[String, Iterable[Event]], timestamp: Long) => TimeoutEvent()
} {
pattern: Map[String, Iterable[Event]] => ComplexEvent()
}
val timeoutResult: DataStream<TimeoutEvent> = result.getSideOutput(outputTag);
flatSelect
API 呼び出しは、1つの目のパラメータとしてタイムアウト関数を、2つ目のパラメータとして選択関数を取る、同じ上書きされたバージョンを呼び出します。select
関数と対照的に、flatSelect
関数はCollector
を使って呼ばれます。任意の数のイベントを発行するためにcollectorを使うことができます。
val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
val outputTag = OutputTag[String]("side-output")
val result: SingleOutputStreamOperator[ComplexEvent] = patternStream.flatSelect(outputTag){
(pattern: Map[String, Iterable[Event]], timestamp: Long, out: Collector[TimeoutEvent]) =>
out.collect(TimeoutEvent())
} {
(pattern: mutable.Map[String, Iterable[Event]], out: Collector[ComplexEvent]) =>
out.collect(ComplexEvent())
}
val timeoutResult: DataStream<TimeoutEvent> = result.getSideOutput(outputTag);
CEP
では、要素が処理される順番は問題ではありません。To guarantee that elements are processed in the correct order when working in event time, an incoming element is initially put in a buffer where elements are sorted in ascending order based on their timestamp, and when a watermark arrives, all the elements in this buffer with timestamps smaller than that of the watermark are processed. これはウォーターマーク間の要素はイベント時間の順番で処理されることを意味します。
注意 イベント時間で動作する時は、ライブラリはウォーターマークの正確さを仮定します。
To guarantee that elements across watermarks are processed in event-time order, Flink’s CEP library assumes correctness of the watermark, and considers as late elements whose timestamp is smaller than that of the last seen watermark. 遅れてきた要素はもう処理されません。
以下の例はEvents
のkeyedデータストリーム上でパターン start, middle(name = "error") -> end(name = "critical")
を検知します。イベントはそれらのid
によってキー付けされ、有効なパターンは10秒以内に起きる必要があります。処理全体はイベント時間を使って行われます。
StreamExecutionEnvironment env = ...
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<Event> input = ...
DataStream<Event> partitionedInput = input.keyBy(new KeySelector<Event, Integer>() {
@Override
public Integer getKey(Event value) throws Exception {
return value.getId();
}
});
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
.next("middle").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("error");
}
}).followedBy("end").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("critical");
}
}).within(Time.seconds(10));
PatternStream<Event> patternStream = CEP.pattern(partitionedInput, pattern);
DataStream<Alert> alerts = patternStream.select(new PatternSelectFunction<Event, Alert>() {
@Override
public Alert select(Map<String, List<Event>> pattern) throws Exception {
return createAlert(pattern);
}
});
val env : StreamExecutionEnvironment = ...
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val input : DataStream[Event] = ...
val partitionedInput = input.keyBy(event => event.getId)
val pattern = Pattern.begin("start")
.next("middle").where(_.getName == "error")
.followedBy("end").where(_.getName == "critical")
.within(Time.seconds(10))
val patternStream = CEP.pattern(partitionedInput, pattern)
val alerts = patternStream.select(createAlert(_)))
Flink-1.3 のCEPライブラリはAPIのいくつかの変更に繋がる多くの新しい機能と一緒に出荷されます。ここでは、Flink-1.3を使って古いCEPジョブを実行できるように、それらに行わなければならない変更を説明します。これらの変更を行い、ジョブを再コンパイルした後で、ジョブの古いバージョンを使って取ったセーブポイントから実行を再開することができるでしょう。つまり 過去のデータを再処理する必要がありません。
必要な変更は以下の通りです:
FilterFunction
インタフェースの実装の代わりにSimpleCondition
クラスを継承するために、条件(where(...)
句内のもの)を変更します。
Change your functions provided as arguments to the select(...)
and flatSelect(...)
methods to expect a list of
events associated with each pattern (List
in Java
, Iterable
in Scala
). This is because with the addition of
the looping patterns, multiple input events can match a single (looping) pattern.
Flink 1.1 および 1.2 でのfollowedBy()
は 非決定論的な緩い隣接
(ここを見てください)を意味します。Flink 1.3ではこれは変更され、followedBy()
は緩い隣接
を意味し、一方で非決定論的な緩い隣接
が必要な場合はfollowedByAny()
が使われる必要があります。