FlinkCEP - Complex event processing for Flink

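FlinkCEP is the complex event processing library for Flink. It lets you easily detect complex event patterns in an unbounded stream of data. Complex events can then be constructed from the matching sequences, which helps you quickly get hold of what is really important in your data.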

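Attention: The events in the DataStream to which you want to apply pattern matching have to implement proper equals() and hashCode() methods, because these are used for comparing and matching events.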
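As a minimal sketch of what such an event type might look like, the following class compares events by value. The fields id and name are assumptions chosen for illustration; your own event type will differ.

```java
import java.util.Objects;

// Hypothetical event type; the fields are assumptions made for illustration.
class Event {
    private final int id;
    private final String name;

    Event(int id, String name) {
        this.id = id;
        this.name = name;
    }

    int getId() { return id; }

    String getName() { return name; }

    // Two events are equal when all of their fields are equal.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Event)) return false;
        Event other = (Event) o;
        return id == other.id && Objects.equals(name, other.name);
    }

    // Must be consistent with equals(): equal events yield equal hash codes.
    @Override
    public int hashCode() {
        return Objects.hash(id, name);
    }
}
```

With this definition, two separately created events that carry the same id and name compare as equal and produce the same hash code.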

Getting Started

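If you want to jump right in, you have to set up a Flink program. Next, you have to add the FlinkCEP dependency to the pom.xml of your project. The first dependency below is for the Java API, the second for the Scala API.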

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-cep_2.10</artifactId>
  <version>1.3-SNAPSHOT</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-cep-scala_2.10</artifactId>
  <version>1.3-SNAPSHOT</version>
</dependency>

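Note that FlinkCEP is currently not part of the binary distribution. To link it for cluster execution, see the Flink documentation on linking with libraries that are not part of the binary distribution.

Now you can start writing your first CEP program using the pattern API.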

// Java
DataStream<Event> input = ...

// The explicit <Event> type argument lets the lambdas below see Event's methods.
Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(evt -> evt.getId() == 42)
    .next("middle").subtype(SubEvent.class).where(subEvt -> subEvt.getVolume() >= 10.0)
    .followedBy("end").where(evt -> evt.getName().equals("end"));

PatternStream<Event> patternStream = CEP.pattern(input, pattern);

// The lambda parameter must not reuse the name of the local variable "pattern".
DataStream<Alert> result = patternStream.select(match -> createAlertFrom(match));

// Scala
val input: DataStream[Event] = ...

// The explicit [Event] type argument lets the lambdas below see Event's methods.
val pattern = Pattern.begin[Event]("start").where(_.getId == 42)
  .next("middle").subtype(classOf[SubEvent]).where(_.getVolume >= 10.0)
  .followedBy("end").where(_.getName == "end")

val patternStream = CEP.pattern(input, pattern)

val result: DataStream[Alert] = patternStream.select(createAlert(_))

Note that we use Java 8 lambdas in our Java code examples to make them more succinct.

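The Pattern API

The pattern API allows you to quickly define complex event patterns.

Each pattern consists of multiple stages, which we call states. To go from one state to the next, the user can specify conditions. These conditions can be the contiguity of events or a filter condition on an event.

Each pattern has to start with an initial state: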

// Java
Pattern<Event, ?> start = Pattern.<Event>begin("start");

// Scala
val start: Pattern[Event, _] = Pattern.begin[Event]("start")

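Each state must have a unique name so that the matched events can be identified later on. Additionally, the where method lets you specify a filter condition that an event must satisfy to be accepted as the start event.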

// Java
start.where(new FilterFunction<Event>() {
    @Override
    public boolean filter(Event value) {
        return ... // some condition
    }
});

// Scala
start.where(event => ... /* some condition */)

We can also restrict the type of the accepted event to some subtype of the initial event type (here Event) via the subtype method.

// Java
start.subtype(SubEvent.class).where(new FilterFunction<SubEvent>() {
    @Override
    public boolean filter(SubEvent value) {
        return ... // some condition
    }
});

// Scala
start.subtype(classOf[SubEvent]).where(subEvent => ... /* some condition */)

As can be seen here, the subtype condition can also be combined with an additional filter condition on the subtype. In fact, you can always provide multiple conditions by calling where and subtype multiple times. These conditions are then combined using the logical AND operator.

In order to construct or conditions, one has to call the or method with a respective filter function. Any existing filter function is then ORed with the given one.

// Java
pattern.where(new FilterFunction<Event>() {
    @Override
    public boolean filter(Event value) {
        return ... // some condition
    }
}).or(new FilterFunction<Event>() {
    @Override
    public boolean filter(Event value) {
        return ... // or condition
    }
});

// Scala
pattern.where(event => ... /* some condition */).or(event => ... /* or condition */)
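The way these conditions combine can be modelled with plain java.util.function.Predicate values. The sketch below is only an analogy for the AND and OR semantics described above, not FlinkCEP code; the class name and the three conditions are made up for illustration.

```java
import java.util.function.Predicate;

class ConditionCombinationSketch {
    // Hypothetical conditions on an integer standing in for an event.
    static final Predicate<Integer> IS_POSITIVE = v -> v > 0;
    static final Predicate<Integer> IS_EVEN = v -> v % 2 == 0;
    static final Predicate<Integer> IS_MINUS_ONE = v -> v == -1;

    // Models where(IS_POSITIVE).where(IS_EVEN): both conditions must hold.
    static final Predicate<Integer> BOTH = IS_POSITIVE.and(IS_EVEN);

    // Models where(IS_POSITIVE).where(IS_EVEN).or(IS_MINUS_ONE):
    // the existing (ANDed) condition is ORed with the new one.
    static final Predicate<Integer> BOTH_OR_MINUS_ONE = BOTH.or(IS_MINUS_ONE);
}
```

In this model, 4 passes both combined conditions, 3 passes neither, and -1 passes only the variant that includes the or condition.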

Next, we can append further states to detect complex patterns. We can control the contiguity of two succeeding events to be accepted by the pattern.

Strict contiguity means that two matching events have to directly succeed one another, with no other events in between. A strict contiguity pattern state can be created via the next method.

// Java
Pattern<Event, ?> strictNext = start.next("middle");

// Scala
val strictNext: Pattern[Event, _] = start.next("middle")

Non-strict contiguity means that other events are allowed to occur in-between two matching events. A non-strict contiguity pattern state can be created via the followedBy method.

// Java
Pattern<Event, ?> nonStrictNext = start.followedBy("middle");

// Scala
val nonStrictNext: Pattern[Event, _] = start.followedBy("middle")
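The difference between the two kinds of contiguity can be pictured with a small plain-Java model. This is a simplified sketch for a two-state pattern over event names, not how FlinkCEP matches events internally; the class and method names are assumptions.

```java
import java.util.List;

class ContiguitySketch {
    // Strict contiguity (next): "second" must directly follow "first".
    static boolean matchesNext(List<String> events, String first, String second) {
        for (int i = 0; i + 1 < events.size(); i++) {
            if (events.get(i).equals(first) && events.get(i + 1).equals(second)) {
                return true;
            }
        }
        return false;
    }

    // Non-strict contiguity (followedBy): other events may lie in between.
    static boolean matchesFollowedBy(List<String> events, String first, String second) {
        boolean seenFirst = false;
        for (String event : events) {
            if (seenFirst && event.equals(second)) {
                return true;
            }
            if (event.equals(first)) {
                seenFirst = true;
            }
        }
        return false;
    }
}
```

For the event sequence a, c, b the strict variant finds no match for "a then b" because c sits in between, while the non-strict variant does.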

It is also possible to define a temporal constraint for the pattern to be valid. For example, one can define that a pattern should occur within 10 seconds via the within method.

// Java
next.within(Time.seconds(10));

// Scala
next.within(Time.seconds(10))
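The effect of within can be pictured as a simple time check on an event sequence. The following is a plain-Java model and not FlinkCEP's implementation; the method name fitsWindow and the choice to treat the window bound as exclusive are assumptions of this sketch.

```java
class WithinSketch {
    // Returns true if a sequence that started at startMillis and would be
    // completed by an event at endMillis still fits into the window.
    // Treating the window bound as exclusive is an assumption of this sketch.
    static boolean fitsWindow(long startMillis, long endMillis, long windowMillis) {
        return endMillis - startMillis < windowMillis;
    }
}
```

With a 10 second window, a sequence completed 4 seconds after its first event fits, whereas one completed after 15 seconds would be discarded.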


Pattern operations and their descriptions (Java)

Begin

Defines a starting pattern state:

Pattern<Event, ?> start = Pattern.<Event>begin("start");

Next

Appends a new pattern state. A matching event has to directly succeed the previous matching event:

Pattern<Event, ?> next = start.next("next");

FollowedBy

Appends a new pattern state. Other events can occur between a matching event and the previous matching event:

Pattern<Event, ?> followedBy = start.followedBy("next");

Where

Defines a filter condition for the current pattern state. Only if an event passes the filter, it can match the state:

patternState.where(new FilterFunction<Event>() {
    @Override
    public boolean filter(Event value) throws Exception {
        return ... // some condition
    }
});

Or

Adds a new filter condition which is ORed with an existing filter condition. Only if an event passes the filter condition, it can match the state:

patternState.where(new FilterFunction<Event>() {
    @Override
    public boolean filter(Event value) throws Exception {
        return ... // some condition
    }
}).or(new FilterFunction<Event>() {
    @Override
    public boolean filter(Event value) throws Exception {
        return ... // alternative condition
    }
});

Subtype

Defines a subtype condition for the current pattern state. Only if an event is of this subtype, it can match the state:

patternState.subtype(SubEvent.class);

Within

Defines the maximum time interval for an event sequence to match the pattern. If a non-completed event sequence exceeds this time, it is discarded:

patternState.within(Time.seconds(10));

Pattern operations and their descriptions (Scala)

Begin

Defines a starting pattern state:

val start = Pattern.begin[Event]("start")

Next

Appends a new pattern state. A matching event has to directly succeed the previous matching event:

val next = start.next("middle")

FollowedBy

Appends a new pattern state. Other events can occur between a matching event and the previous matching event:

val followedBy = start.followedBy("middle")

Where

Defines a filter condition for the current pattern state. Only if an event passes the filter, it can match the state:

patternState.where(event => ... /* some condition */)

Or

Adds a new filter condition which is ORed with an existing filter condition. Only if an event passes the filter condition, it can match the state:

patternState.where(event => ... /* some condition */)
    .or(event => ... /* alternative condition */)

Subtype

Defines a subtype condition for the current pattern state. Only if an event is of this subtype, it can match the state:

patternState.subtype(classOf[SubEvent])

Within

Defines the maximum time interval for an event sequence to match the pattern. If a non-completed event sequence exceeds this time, it is discarded:

patternState.within(Time.seconds(10))

Detecting Patterns

In order to run a stream of events against your pattern, you have to create a PatternStream. Given an input stream input and a pattern pattern, you create the PatternStream by calling CEP.pattern:

// Java
DataStream<Event> input = ...
Pattern<Event, ?> pattern = ...

PatternStream<Event> patternStream = CEP.pattern(input, pattern);

// Scala
val input: DataStream[Event] = ...
val pattern: Pattern[Event, _] = ...

val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)

Selecting from Patterns

Once you have obtained a PatternStream you can select from detected event sequences via the select or flatSelect methods.

In the Java API, the select method requires a PatternSelectFunction implementation. A PatternSelectFunction has a select method which is called for each matching event sequence. It receives a map of string/event pairs of the matched events. The string is defined by the name of the state to which the event has been matched. The select method returns exactly one result.

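// Java
// Event and Alert stand in for your own input and output types.
class MyPatternSelectFunction implements PatternSelectFunction<Event, Alert> {
    @Override
    public Alert select(Map<String, Event> pattern) {
        // Look up the matched events by the names of their pattern states.
        Event startEvent = pattern.get("start");
        Event endEvent = pattern.get("end");
        // Assumes Alert has a constructor that takes the two events.
        return new Alert(startEvent, endEvent);
    }
}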

A PatternFlatSelectFunction is similar to the PatternSelectFunction, with the only distinction that it can return an arbitrary number of results. To do this, its flatSelect method has an additional Collector parameter which is used to emit the output elements.

// Java
// Event and Alert stand in for your own input and output types.
class MyPatternFlatSelectFunction implements PatternFlatSelectFunction<Event, Alert> {
    @Override
    public void flatSelect(Map<String, Event> pattern, Collector<Alert> collector) {
        Event startEvent = pattern.get("start");
        Event endEvent = pattern.get("end");

        // Emit one Alert per unit of the start event's value.
        // getValue() and the Alert constructor are assumed for illustration.
        for (int i = 0; i < startEvent.getValue(); i++) {
            collector.collect(new Alert(startEvent, endEvent));
        }
    }
}

In the Scala API, the select method takes a selection function as argument, which is called for each matching event sequence. It receives a map of string/event pairs of the matched events. The string is defined by the name of the state to which the event has been matched. The selection function returns exactly one result per call.

// Scala
// Event and Alert stand in for your own input and output types.
def selectFn(pattern: mutable.Map[String, Event]): Alert = {
    // Look up the matched events by the names of their pattern states.
    val startEvent = pattern("start")
    val endEvent = pattern("end")
    // Assumes Alert is a case class that takes the two events.
    Alert(startEvent, endEvent)
}

The flatSelect method is similar to the select method. Their only difference is that the function passed to the flatSelect method can return an arbitrary number of results per call. In order to do this, the function for flatSelect has an additional Collector parameter which is used for the element output.

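// Scala
// Event and Alert stand in for your own input and output types.
def flatSelectFn(pattern: mutable.Map[String, Event], collector: Collector[Alert]): Unit = {
    val startEvent = pattern("start")
    val endEvent = pattern("end")
    // "until" excludes the upper bound, so getValue results are emitted.
    for (i <- 0 until startEvent.getValue) {
        collector.collect(Alert(startEvent, endEvent))
    }
}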
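The contrast between the two styles of selection can be modelled without any Flink classes. In the sketch below, a Map of state names to event names stands in for a matched sequence and a java.util.function.Consumer stands in for the Collector; all names, including the repetitions parameter, are hypothetical.

```java
import java.util.Map;
import java.util.function.Consumer;

class SelectSketch {
    // Models a select function: exactly one result per matched sequence.
    static String select(Map<String, String> match) {
        return match.get("start") + "->" + match.get("end");
    }

    // Models a flat select function: emits zero or more results through a
    // collector, here one result per requested repetition.
    static void flatSelect(Map<String, String> match, int repetitions, Consumer<String> collector) {
        for (int i = 0; i < repetitions; i++) {
            collector.accept(select(match));
        }
    }
}
```

Passing a list's add method as the collector gathers the emitted results, and a repetition count of zero emits nothing at all, which a plain select function cannot express.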

Handling Timed Out Partial Patterns

Whenever a pattern has a window length associated via the within keyword, it is possible that partial event patterns will be discarded because they exceed the window length. In order to react to these timeout events the select and flatSelect API calls allow a timeout handler to be specified. This timeout handler is called for each partial event pattern which has timed out. The timeout handler receives all the events that have been matched so far by the pattern, and the timestamp when the timeout was detected.

In the Java API, the select and flatSelect API calls offer an overloaded version for handling partial patterns. It takes a PatternTimeoutFunction/PatternFlatTimeoutFunction as the first parameter and the already known PatternSelectFunction/PatternFlatSelectFunction as the second parameter. The return type of the timeout function can be different from that of the select function. The timeout event and the select event are wrapped in Either.Left and Either.Right respectively, so that the resulting data stream is of type org.apache.flink.types.Either.

PatternStream<Event> patternStream = CEP.pattern(input, pattern);

DataStream<Either<TimeoutEvent, ComplexEvent>> result = patternStream.select(
    new PatternTimeoutFunction<Event, TimeoutEvent>() {...},
    new PatternSelectFunction<Event, ComplexEvent>() {...}
);

DataStream<Either<TimeoutEvent, ComplexEvent>> flatResult = patternStream.flatSelect(
    new PatternFlatTimeoutFunction<Event, TimeoutEvent>() {...},
    new PatternFlatSelectFunction<Event, ComplexEvent>() {...}
);

In the Scala API, the select API call offers an overloaded version for handling partial patterns. It takes a timeout function as the first parameter and a selection function as the second parameter. The timeout function is called with a map of string/event pairs of the partial match which has timed out and a long indicating when the timeout occurred. The string is defined by the name of the state to which the event has been matched. The timeout function returns exactly one result per call. The return type of the timeout function can be different from that of the select function. The timeout event and the select event are wrapped in Left and Right respectively, so that the resulting data stream is of type Either.

val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)

val result: DataStream[Either[TimeoutEvent, ComplexEvent]] = patternStream.select {
    (pattern: mutable.Map[String, Event], timestamp: Long) => TimeoutEvent()
} {
    pattern: mutable.Map[String, Event] => ComplexEvent()
}

The flatSelect API call offers the same overloaded version, which takes a timeout function as the first parameter and a selection function as the second parameter. In contrast to the select functions, the flatSelect functions are called with a Collector. The collector can be used to emit an arbitrary number of events.

val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)

val result: DataStream[Either[TimeoutEvent, ComplexEvent]] = patternStream.flatSelect {
    (pattern: mutable.Map[String, Event], timestamp: Long, out: Collector[TimeoutEvent]) =>
        out.collect(TimeoutEvent())
} {
    (pattern: mutable.Map[String, Event], out: Collector[ComplexEvent]) =>
        out.collect(ComplexEvent())
}
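Downstream code typically has to tell timed-out partial matches apart from complete ones. The following plain-Java sketch models that step with a minimal stand-in Either type; it is not org.apache.flink.types.Either, and the class and method names are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class TimeoutSketch {
    // Minimal stand-in for an Either type: exactly one side is non-null.
    static final class Either<L, R> {
        final L left;
        final R right;

        private Either(L left, R right) {
            this.left = left;
            this.right = right;
        }

        static <L, R> Either<L, R> left(L value) {
            return new Either<>(value, null);
        }

        static <L, R> Either<L, R> right(R value) {
            return new Either<>(null, value);
        }

        boolean isLeft() {
            return left != null;
        }
    }

    // Collects the left side (timeouts) of a mixed list of results.
    static List<String> timeouts(List<Either<String, String>> results) {
        List<String> out = new ArrayList<>();
        for (Either<String, String> result : results) {
            if (result.isLeft()) {
                out.add(result.left);
            }
        }
        return out;
    }
}
```

Keeping both outcomes in one stream lets a single operator produce them, while consumers decide per element whether they are looking at a timeout or a complete match.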

The following example detects the pattern start, middle(name = "error") -> end(name = "critical") on a keyed data stream of Events. The events are keyed by their ids and a valid pattern has to occur within 10 seconds. The whole processing is done with event time.

// Java
StreamExecutionEnvironment env = ...
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<Event> input = ...

// Key the stream by event id so that patterns are matched per key.
DataStream<Event> partitionedInput = input.keyBy(new KeySelector<Event, Integer>() {
    @Override
    public Integer getKey(Event value) throws Exception {
        return value.getId();
    }
});

Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
    .next("middle").where(new FilterFunction<Event>() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("error");
        }
    }).followedBy("end").where(new FilterFunction<Event>() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("critical");
        }
    }).within(Time.seconds(10));

PatternStream<Event> patternStream = CEP.pattern(partitionedInput, pattern);

DataStream<Alert> alerts = patternStream.select(new PatternSelectFunction<Event, Alert>() {
    @Override
    public Alert select(Map<String, Event> pattern) throws Exception {
        return createAlert(pattern);
    }
});

// Scala
val env: StreamExecutionEnvironment = ...
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val input: DataStream[Event] = ...

// Key the stream by event id so that patterns are matched per key.
val partitionedInput = input.keyBy(event => event.getId)

val pattern = Pattern.begin[Event]("start")
  .next("middle").where(_.getName == "error")
  .followedBy("end").where(_.getName == "critical")
  .within(Time.seconds(10))

val patternStream = CEP.pattern(partitionedInput, pattern)

val alerts = patternStream.select(createAlert(_))