FlinkCEP is the Complex Event Processing (CEP) library implemented on top of Flink. It allows you to detect event patterns in an endless stream of events, giving you the opportunity to get hold of what’s important in your data.
This page describes the API calls available in Flink CEP. We start by presenting the Pattern API, which allows you to specify the patterns that you want to detect in your stream, before presenting how you can detect and act upon matching event sequences. We then present the assumptions the CEP library makes when dealing with lateness in event time and how you can migrate your job from an older Flink version to Flink-1.13.
If you want to jump right in, set up a Flink program and add the FlinkCEP dependency to the pom.xml of your project.
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-cep</artifactId>
  <version>2.0-SNAPSHOT</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-cep-scala_2.12</artifactId>
  <version>2.0-SNAPSHOT</version>
</dependency>
FlinkCEP is not part of the binary distribution. See the Flink documentation on how to link with it for cluster execution.
Now you can start writing your first CEP program using the Pattern API.
The events in the DataStream to which you want to apply pattern matching must implement proper equals() and hashCode() methods because FlinkCEP uses them for comparing and matching events.
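The equals()/hashCode() requirement can be met with an ordinary value class. The sketch below is a hypothetical Event type: its fields (id, name, price) are assumptions chosen to match the getters used in the examples on this page, and it is not part of FlinkCEP. It compares classes exactly with getClass() so that a subtype such as SubEvent never compares equal to a plain Event.

```java
import java.util.Objects;

// Hypothetical event type; the fields are assumptions that mirror the getters
// (getId, getName, getPrice) used in the examples on this page.
class Event {
    private final int id;
    private final String name;
    private final double price;

    Event(int id, String name, double price) {
        this.id = id;
        this.name = name;
        this.price = price;
    }

    public int getId() { return id; }
    public String getName() { return name; }
    public double getPrice() { return price; }

    // Two events are equal when they have the same concrete class and all fields are equal.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Event other = (Event) o;
        return id == other.id
                && Double.compare(price, other.price) == 0
                && Objects.equals(name, other.name);
    }

    // Consistent with equals(): equal events always produce equal hash codes.
    @Override
    public int hashCode() {
        return Objects.hash(id, name, price);
    }

    @Override
    public String toString() {
        return "Event(" + id + ", " + name + ", " + price + ")";
    }
}
```

Deriving both methods from the same set of fields keeps them consistent, which is what hash-based comparison of events relies on.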
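import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;
import java.util.List;
import java.util.Map;
// Event, SubEvent and Alert are your own types; createAlertFrom is your own helper.
DataStream<Event> input = ...;
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
    .where(SimpleCondition.of(event -> event.getId() == 42))
    .next("middle")
    .subtype(SubEvent.class)
    .where(SimpleCondition.of(subEvent -> subEvent.getVolume() >= 10.0))
    .followedBy("end")
    .where(SimpleCondition.of(event -> event.getName().equals("end")));
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
DataStream<Alert> result = patternStream.process(
    new PatternProcessFunction<Event, Alert>() {
        @Override
        public void processMatch(
                Map<String, List<Event>> pattern,
                Context ctx,
                Collector<Alert> out) throws Exception {
            out.collect(createAlertFrom(pattern));
        }
    });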
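import java.util
import org.apache.flink.cep.functions.PatternProcessFunction
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
// Event, SubEvent and Alert are your own types; createAlertFrom is your own helper.
val input: DataStream[Event] = ...
val pattern = Pattern.begin[Event]("start").where(_.getId == 42)
  .next("middle").subtype(classOf[SubEvent]).where(_.getVolume >= 10.0)
  .followedBy("end").where(_.getName == "end")
val patternStream = CEP.pattern(input, pattern)
val result: DataStream[Alert] = patternStream.process(
  new PatternProcessFunction[Event, Alert]() {
    override def processMatch(
        `match`: util.Map[String, util.List[Event]],
        ctx: PatternProcessFunction.Context,
        out: Collector[Alert]): Unit = {
      // build the alert from the matched events, not from the Pattern definition
      out.collect(createAlertFrom(`match`))
    }
  })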
The pattern API allows you to define complex pattern sequences that you want to extract from your input stream.
Each complex pattern sequence consists of multiple simple patterns, i.e. patterns looking for individual events with the same properties. From now on, we will call these simple patterns "patterns", and the final complex pattern sequence we are searching for in the stream "the pattern sequence". You can see a pattern sequence as a graph of such patterns, where transitions from one pattern to the next occur based on user-specified conditions, e.g. event.getName().equals("end"). A match is a sequence of input events which visits all patterns of the complex pattern graph, through a sequence of valid pattern transitions.
Each pattern must have a unique name, which you use later to identify the matched events.
Pattern names CANNOT contain the character ":".
In the rest of this section we will first describe how to define Individual Patterns, and then how you can combine individual patterns into Complex Patterns.
A pattern can be either a singleton or a looping pattern. Singleton patterns accept a single event, while looping patterns can accept more than one. In pattern-matching notation, for the pattern "a b+ c? d" ("a", followed by one or more "b"s, optionally followed by a "c", followed by a "d"), a, c?, and d are singleton patterns, while b+ is a looping one. By default, a pattern is a singleton pattern, and you can transform it into a looping one by using Quantifiers. Each pattern can have one or more Conditions based on which it accepts events.
In FlinkCEP, you can specify looping patterns using these methods: pattern.oneOrMore(), for patterns that expect one or more occurrences of a given event (e.g. the b+ mentioned before); pattern.times(#ofTimes), for patterns that expect a specific number of occurrences of a given type of event, e.g. 4 a's; and pattern.times(#fromTimes, #toTimes), for patterns that expect a specific minimum and maximum number of occurrences of a given type of event, e.g. 2-4 a's.
You can make looping patterns greedy using the pattern.greedy() method, but you cannot yet make group patterns greedy. You can make all patterns, looping or not, optional using the pattern.optional() method.
For a pattern named start , the following are valid quantifiers:
// expecting 4 occurrences
start.times(4);
// expecting 0 or 4 occurrences
start.times(4).optional();
// expecting 2, 3 or 4 occurrences
start.times(2, 4);
// expecting 2, 3 or 4 occurrences and repeating as many as possible
start.times(2, 4).greedy();
// expecting 0, 2, 3 or 4 occurrences
start.times(2, 4).optional();
// expecting 0, 2, 3 or 4 occurrences and repeating as many as possible
start.times(2, 4).optional().greedy();
// expecting 1 or more occurrences
start.oneOrMore();
// expecting 1 or more occurrences and repeating as many as possible
start.oneOrMore().greedy();
// expecting 0 or more occurrences
start.oneOrMore().optional();
// expecting 0 or more occurrences and repeating as many as possible
start.oneOrMore().optional().greedy();
// expecting 2 or more occurrences
start.timesOrMore(2);
// expecting 2 or more occurrences and repeating as many as possible
start.timesOrMore(2).greedy();
// expecting 0, 2 or more occurrences
start.timesOrMore(2).optional();
// expecting 0, 2 or more occurrences and repeating as many as possible
start.timesOrMore(2).optional().greedy();
// expecting 4 occurrences
start.times(4)
// expecting 0 or 4 occurrences
start.times(4).optional()
// expecting 2, 3 or 4 occurrences
start.times(2, 4)
// expecting 2, 3 or 4 occurrences and repeating as many as possible
start.times(2, 4).greedy()
// expecting 0, 2, 3 or 4 occurrences
start.times(2, 4).optional()
// expecting 0, 2, 3 or 4 occurrences and repeating as many as possible
start.times(2, 4).optional().greedy()
// expecting 1 or more occurrences
start.oneOrMore()
// expecting 1 or more occurrences and repeating as many as possible
start.oneOrMore().greedy()
// expecting 0 or more occurrences
start.oneOrMore().optional()
// expecting 0 or more occurrences and repeating as many as possible
start.oneOrMore().optional().greedy()
// expecting 2 or more occurrences
start.timesOrMore(2)
// expecting 2 or more occurrences and repeating as many as possible
start.timesOrMore(2).greedy()
// expecting 0, 2 or more occurrences
start.timesOrMore(2).optional()
// expecting 0, 2 or more occurrences and repeating as many as possible
start.timesOrMore(2).optional().greedy()
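The comments above state which occurrence counts each quantifier accepts. As a hedged illustration of only that counting rule, the sketch below models it in plain Java. QuantifierModel is a hypothetical class, not part of FlinkCEP, and it deliberately ignores contiguity and greedy().

```java
// Hypothetical model of the occurrence counts a quantified pattern accepts.
// It ignores contiguity and greedy(); it only answers "are n occurrences OK?".
final class QuantifierModel {
    private final int min;          // smallest accepted non-zero count
    private final int max;          // largest accepted count, or Integer.MAX_VALUE
    private final boolean optional; // optional() additionally accepts 0 occurrences

    private QuantifierModel(int min, int max, boolean optional) {
        this.min = min;
        this.max = max;
        this.optional = optional;
    }

    static QuantifierModel times(int n) { return new QuantifierModel(n, n, false); }

    static QuantifierModel times(int from, int to) { return new QuantifierModel(from, to, false); }

    static QuantifierModel oneOrMore() { return new QuantifierModel(1, Integer.MAX_VALUE, false); }

    static QuantifierModel timesOrMore(int n) { return new QuantifierModel(n, Integer.MAX_VALUE, false); }

    // Same bounds, but zero occurrences are also accepted.
    QuantifierModel optional() { return new QuantifierModel(min, max, true); }

    boolean accepts(int count) {
        if (count == 0) {
            return optional;
        }
        return count >= min && count <= max;
    }
}
```

For example, times(2, 4).optional() accepts 0, 2, 3 and 4 but rejects 1, which is the "0, 2, 3 or 4 occurrences" case from the comments.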
For every pattern you can specify a condition that an incoming event has to meet in order to be “accepted” into the pattern, e.g. its value should be larger than 5, or larger than the average value of the previously accepted events. You can specify conditions on the event properties via the pattern.where(), pattern.or() or pattern.until() methods. These can be either IterativeCondition or SimpleCondition instances.
Iterative Conditions: This is the most general type of condition. This is how you can specify a condition that accepts subsequent events based on properties of the previously accepted events or a statistic over a subset of them.
Below is the code for an iterative condition that accepts the next event for a pattern named “middle” if its name starts with “foo”, and if the sum of the prices of the previously accepted events for that pattern plus the price of the current event stays below 5.0. Iterative conditions can be powerful, especially in combination with looping patterns, e.g. oneOrMore().
middle.oneOrMore()
    .subtype(SubEvent.class)
    .where(new IterativeCondition<SubEvent>() {
        @Override
        public boolean filter(SubEvent value, Context<SubEvent> ctx) throws Exception {
            if (!value.getName().startsWith("foo")) {
                return false;
            }
            double sum = value.getPrice();
            for (Event event : ctx.getEventsForPattern("middle")) {
                sum += event.getPrice();
            }
            return Double.compare(sum, 5.0) < 0;
        }
    });
middle.oneOrMore()
  .subtype(classOf[SubEvent])
  .where(
    (value, ctx) => {
      lazy val sum = ctx.getEventsForPattern("middle").map(_.getPrice).sum
      value.getName.startsWith("foo") && sum + value.getPrice < 5.0
    }
  )
The call to ctx.getEventsForPattern(...) finds all the previously accepted events for a given potential match. The cost of this operation can vary, so when implementing your condition, try to minimize its use.
The context also gives access to event-time characteristics. For more information, see Time context.
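To check the arithmetic of the “middle” condition in isolation, the sketch below restates it as a plain Java method. MiddleConditionModel and its parameters are hypothetical: the acceptedPrices list stands in for the prices of the events that ctx.getEventsForPattern("middle") would return, so this is not FlinkCEP code and does not extend IterativeCondition.

```java
import java.util.List;

// Hypothetical stand-in for the "middle" condition above, written without Flink.
// acceptedPrices plays the role of the prices of previously accepted events.
final class MiddleConditionModel {
    static boolean accept(String name, double price, List<Double> acceptedPrices) {
        if (!name.startsWith("foo")) {
            return false; // the name check alone rejects the event
        }
        double sum = price;
        for (double p : acceptedPrices) {
            sum += p;
        }
        // Same strict check as the example: the running total must stay below 5.0.
        return Double.compare(sum, 5.0) < 0;
    }
}
```

With previously accepted prices 1.0 and 2.0, a "foo" event priced 2.0 brings the total to exactly 5.0 and is rejected, because the comparison is strict.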
Simple Conditions: This type of condition extends the aforementioned IterativeCondition class and decides whether to accept an event or not, based only on properties of the event itself.
start.where(SimpleCondition.of(value -> value.getName().startsWith("foo")));
start.where(event => event.getName.startsWith("foo"))
Finally, you can also restrict the type of the accepted event to a subtype of the initial event type (here Event) via the pattern.subtype(subClass) method.
start.subtype(SubEvent.class)
    .where(SimpleCondition.of(value -> ... /* some condition */));
start.subtype(classOf[SubEvent]).where(subEvent => ... /* some condition */)
Combining Conditions: As shown above, you can combine the subtype condition with additional conditions. This holds for every condition. You can arbitrarily combine conditions by sequentially calling where(). The final result will be the logical AND of the results of the individual conditions. To combine conditions using OR, you can use the or() method, as shown below.
pattern.where(SimpleCondition.of(value -> ... /* some condition */))
    .or(SimpleCondition.of(value -> ... /* or condition */));
pattern.where(event => ... /* some condition */).or(event => ... /* or condition */)
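The AND/OR semantics of chained where() and or() calls resemble composing java.util.function.Predicate values. The sketch below uses Predicate purely as an analogy, assuming each call combines with the result of the calls before it; it is not FlinkCEP's condition API, and ConditionCombination and its condition names are hypothetical.

```java
import java.util.function.Predicate;

// Analogy only: plain Predicates standing in for pattern conditions on an event name.
final class ConditionCombination {
    static final Predicate<String> STARTS_WITH_FOO = s -> s.startsWith("foo");
    static final Predicate<String> STARTS_WITH_BAR = s -> s.startsWith("bar");
    static final Predicate<String> SHORT = s -> s.length() <= 5;

    // Like pattern.where(STARTS_WITH_FOO).where(SHORT): both conditions must hold.
    static final Predicate<String> AND_CHAIN = STARTS_WITH_FOO.and(SHORT);

    // Like pattern.where(STARTS_WITH_FOO).or(STARTS_WITH_BAR): either condition may hold.
    static final Predicate<String> OR_CHAIN = STARTS_WITH_FOO.or(STARTS_WITH_BAR);
}
```

Here "foobar" passes STARTS_WITH_FOO but fails the AND chain because it is longer than five characters, while "bar2" passes the OR chain through its second condition alone.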
Stop condition: In case of looping patterns (oneOrMore() and oneOrMore().optional()) you can also specify a stop condition, e.g. accept events with a value larger than 5 as long as the sum of values stays below 50.
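To better understand it, have a look at the following example. Given a pattern like "(a+ until b)" (one or more "a"s until "b") and the sequence of incoming events "a1" "c" "a2" "b" "a3", the library will output the results {a1 a2} {a1} {a2} {a3}.
As you can see, {a1 a2 a3} and {a2 a3} are not returned due to the stop condition.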
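The stop-condition behavior can be reproduced with a small simulation. This is a simplified sketch under stated assumptions: UntilModel is hypothetical, events are plain strings, the loop uses the default relaxed internal contiguity, and time, state and the rest of the pattern sequence are ignored. It enumerates the matches of a pattern like (a+ until b).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical, simplified model of a looping pattern with a stop condition,
// e.g. (a+ until b), using relaxed internal contiguity. Not FlinkCEP code.
final class UntilModel {
    static List<List<String>> matches(List<String> events,
                                      Predicate<String> accept,
                                      Predicate<String> stop) {
        List<List<String>> result = new ArrayList<>();
        for (int start = 0; start < events.size(); start++) {
            if (!accept.test(events.get(start))) {
                continue; // a match can only begin with an accepted event
            }
            List<String> run = new ArrayList<>();
            run.add(events.get(start));
            result.add(new ArrayList<>(run)); // one occurrence is already a match
            for (int i = start + 1; i < events.size(); i++) {
                String e = events.get(i);
                if (stop.test(e)) {
                    break; // stop condition: nothing more is accepted into this run
                }
                if (accept.test(e)) {
                    run.add(e);
                    result.add(new ArrayList<>(run)); // each longer run is a match too
                }
                // events matching neither condition are skipped (relaxed contiguity)
            }
        }
        return result;
    }
}
```

For the input "a1" "c" "a2" "b" "a3" with accept = starts with "a" and stop = starts with "b", the method returns [a1], [a1, a2], [a2] and [a3]; no run that started before "b" can reach a3.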
where(condition): Defines a condition for the current pattern. To match the pattern, an event must satisfy the condition. Multiple consecutive where() clauses lead to their conditions being ANDed.
pattern.where(new IterativeCondition<Event>() {
    @Override
    public boolean filter(Event value, Context<Event> ctx) throws Exception {
        return ...; // some condition
    }
});
pattern.where(event => ... /* some condition */)
or(condition): Adds a new condition which is ORed with the existing one. An event can match the pattern only if it passes at least one of the conditions.
pattern.where(new IterativeCondition<Event>() {
    @Override
    public boolean filter(Event value, Context<Event> ctx) throws Exception {
        return ...; // some condition
    }
}).or(new IterativeCondition<Event>() {
    @Override
    public boolean filter(Event value, Context<Event> ctx) throws Exception {
        return ...; // alternative condition
    }
});
pattern.where(event => ... /* some condition */)
  .or(event => ... /* alternative condition */)
until(condition): Specifies a stop condition for a looping pattern. This means that if an event matching the given condition occurs, no more events will be accepted into the pattern. Applicable only in conjunction with oneOrMore(). NOTE: It allows for cleaning state for the corresponding pattern on an event-based condition.
pattern.oneOrMore().until(new IterativeCondition<Event>() {
    @Override
    public boolean filter(Event value, Context<Event> ctx) throws Exception {
        return ...; // stop condition
    }
});
pattern.oneOrMore().until(event => ... /* some condition */)
subtype(subClass): Defines a subtype condition for the current pattern. An event can only match the pattern if it is of this subtype.
pattern.subtype(SubEvent.class);
pattern.subtype(classOf[SubEvent])
oneOrMore(): Specifies that this pattern expects at least one occurrence of a matching event. By default, relaxed internal contiguity (between subsequent events) is used. For more info on internal contiguity, see consecutive. It is advised to use either until() or within() to enable state clearing.
pattern.oneOrMore();
pattern.oneOrMore()
timesOrMore(#times): Specifies that this pattern expects at least #times occurrences of a matching event. By default, relaxed internal contiguity (between subsequent events) is used. For more info on internal contiguity, see consecutive.
pattern.timesOrMore(2);
times(#ofTimes): Specifies that this pattern expects an exact number of occurrences of a matching event. By default, relaxed internal contiguity (between subsequent events) is used. For more info on internal contiguity, see consecutive.
pattern.times(2);
pattern.times(2)
times(#fromTimes, #toTimes): Specifies that this pattern expects between #fromTimes and #toTimes occurrences of a matching event. By default, relaxed internal contiguity (between subsequent events) is used. For more info on internal contiguity, see consecutive.
pattern.times(2, 4);
pattern.times(2, 4)
optional(): Specifies that this pattern is optional, i.e. it may not occur at all. This is applicable to all aforementioned quantifiers.
pattern.oneOrMore().optional();
pattern.oneOrMore().optional()
greedy(): Specifies that this pattern is greedy, i.e. it will repeat as many times as possible. This is only applicable to quantifiers, and group patterns are currently not supported.
pattern.oneOrMore().greedy();
pattern.oneOrMore().greedy()
Now that you’ve seen what an individual pattern can look like, it is time to see how to combine them into a full pattern sequence.
A pattern sequence has to start with an initial pattern, as shown below:
Pattern<Event, ?> start = Pattern.<Event>begin("start");
val start : Pattern[Event, _] = Pattern.begin("start")
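Next, you can append more patterns to your pattern sequence by specifying the desired contiguity conditions between them. FlinkCEP supports the following forms of contiguity between events: strict contiguity, which expects all matching events to appear strictly one after the other, without any non-matching events in between; relaxed contiguity, which ignores non-matching events appearing in between the matching ones; and non-deterministic relaxed contiguity, which further relaxes contiguity, allowing additional matches that ignore some matching events.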
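To apply them between consecutive patterns, you can use next() for strict, followedBy() for relaxed, and followedByAny() for non-deterministic relaxed contiguity; or notNext(), if you do not want an event type to directly follow another, and notFollowedBy(), if you do not want an event type to be anywhere between two other event types.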
A pattern sequence cannot end with notFollowedBy() if the time interval is not defined via within().
A NOT pattern cannot be preceded by an optional one.
// strict contiguity
Pattern<Event, ?> strict = start.next("middle").where(...);
// relaxed contiguity
Pattern<Event, ?> relaxed = start.followedBy("middle").where(...);
// non-deterministic relaxed contiguity
Pattern<Event, ?> nonDetermin = start.followedByAny("middle").where(...);
// NOT pattern with strict contiguity
Pattern<Event, ?> strictNot = start.notNext("not").where(...);
// NOT pattern with relaxed contiguity
Pattern<Event, ?> relaxedNot = start.notFollowedBy("not").where(...);
// strict contiguity
val strict: Pattern[Event, _] = start.next("middle").where(...)
// relaxed contiguity
val relaxed: Pattern[Event, _] = start.followedBy("middle").where(...)
// non-deterministic relaxed contiguity
val nonDetermin: Pattern[Event, _] = start.followedByAny("middle").where(...)
// NOT pattern with strict contiguity
val strictNot: Pattern[Event, _] = start.notNext("not").where(...)
// NOT pattern with relaxed contiguity
val relaxedNot: Pattern[Event, _] = start.notFollowedBy("not").where(...)
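Relaxed contiguity means that only the first succeeding matching event will be matched, while with non-deterministic relaxed contiguity, multiple matches will be emitted for the same beginning. As an example, a pattern "a b", given the event sequence "a", "c", "b1", "b2", will give the following results: with strict contiguity, {} (no match), because the "c" after "a" causes "a" to be discarded; with relaxed contiguity, {a b1}, as relaxed contiguity is viewed as "skip non-matching events till the next matching one"; with non-deterministic relaxed contiguity, {a b1}, {a b2}, as this is the most general form.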
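The three contiguity modes for a two-element pattern like "a b" can be modeled in a few lines. ContiguityModel is a hypothetical sketch, not FlinkCEP code: events are plain strings, the two conditions are passed as Predicates, and each match is returned as a two-element list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model of the pattern "first second" (e.g. "a b") under each contiguity mode.
final class ContiguityModel {
    enum Mode { STRICT, RELAXED, NON_DETERMINISTIC_RELAXED }

    static List<List<String>> matches(List<String> events,
                                      Predicate<String> first,
                                      Predicate<String> second,
                                      Mode mode) {
        List<List<String>> result = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!first.test(events.get(i))) {
                continue;
            }
            for (int j = i + 1; j < events.size(); j++) {
                String e = events.get(j);
                if (second.test(e)) {
                    result.add(Arrays.asList(events.get(i), e));
                    if (mode != Mode.NON_DETERMINISTIC_RELAXED) {
                        break; // strict and relaxed only take the first matching event
                    }
                } else if (mode == Mode.STRICT) {
                    break; // a non-matching event in between breaks strict contiguity
                }
                // relaxed modes simply skip non-matching events
            }
        }
        return result;
    }
}
```

Run on "a", "c", "b1", "b2" with first = equals "a" and second = starts with "b", STRICT yields no match, RELAXED yields [a, b1], and NON_DETERMINISTIC_RELAXED yields [a, b1] and [a, b2].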
It’s also possible to define a temporal constraint for the pattern to be valid. For example, you can define that a pattern should occur within 10 seconds via the pattern.within() method. Temporal patterns are supported for both processing and event time.
A pattern sequence can only have one temporal constraint. If multiple such constraints are defined on different individual patterns, then the smallest is applied.
next.within(Time.seconds(10));
next.within(Time.seconds(10))
Note that a pattern sequence can end with notFollowedBy() if it has a temporal constraint, e.g. a pattern like:
Pattern.<Event>begin("start")
    .next("middle")
    .where(SimpleCondition.of(value -> value.getName().equals("a")))
    .notFollowedBy("end")
    .where(SimpleCondition.of(value -> value.getName().equals("b")))
    .within(Time.seconds(10));
Pattern.begin[Event]("start").where(_.getName().equals("a"))
  .notFollowedBy("end").where(_.getName == "b")
  .within(Time.seconds(10))
You can apply the same contiguity condition as discussed in the previous section within a looping pattern. The contiguity will be applied between elements accepted into such a pattern. To illustrate the above with an example, a pattern sequence "a b+ c" ("a" followed by any (non-deterministic relaxed) sequence of one or more "b"s followed by a "c") with input "a", "b1", "d1", "b2", "d2", "b3", "c" will have the following results:
For looping patterns (e.g. oneOrMore() and times() ) the default is relaxed contiguity. If you want strict contiguity, you have to explicitly specify it by using the consecutive() call, and if you want non-deterministic relaxed contiguity you can use the allowCombinations() call.
consecutive(): Works in conjunction with oneOrMore() and times() and imposes strict contiguity between the matching events, i.e. any non-matching element breaks the match (as in next()). If not applied, relaxed contiguity (as in followedBy()) is used.
E.g. a pattern like:
Pattern.<Event>begin("start")
    .where(SimpleCondition.of(value -> value.getName().equals("c")))
    .followedBy("middle")
    .where(SimpleCondition.of(value -> value.getName().equals("a")))
    .oneOrMore()
    .consecutive()
    .followedBy("end1")
    .where(SimpleCondition.of(value -> value.getName().equals("b")));
Pattern.begin[Event]("start").where(_.getName().equals("c"))
  .followedBy("middle").where(_.getName().equals("a"))
  .oneOrMore().consecutive()
  .followedBy("end1").where(_.getName().equals("b"))
Will generate the following matches for the input sequence C D A1 A2 A3 D A4 B: with consecutive applied: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}; without consecutive applied: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B}.
allowCombinations(): Works in conjunction with oneOrMore() and times() and imposes non-deterministic relaxed contiguity between the matching events (as in followedByAny()). If not applied, relaxed contiguity (as in followedBy()) is used.
E.g. a pattern like:
Pattern.<Event>begin("start")
    .where(SimpleCondition.of(value -> value.getName().equals("c")))
    .followedBy("middle")
    .where(SimpleCondition.of(value -> value.getName().equals("a")))
    .oneOrMore()
    .allowCombinations()
    .followedBy("end1")
    .where(SimpleCondition.of(value -> value.getName().equals("b")));
Pattern.begin[Event]("start").where(_.getName().equals("c"))
  .followedBy("middle").where(_.getName().equals("a"))
  .oneOrMore().allowCombinations()
  .followedBy("end1").where(_.getName().equals("b"))
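Will generate the following matches for the input sequence C D A1 A2 A3 D A4 B: with combinations enabled: {C A1 B}, {C A1 A2 B}, {C A1 A3 B}, {C A1 A4 B}, {C A1 A2 A3 B}, {C A1 A2 A4 B}, {C A1 A3 A4 B}, {C A1 A2 A3 A4 B}; without combinations enabled: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B}.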
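The effect of consecutive() and allowCombinations() on the example input can be reproduced with a small enumeration. LoopingContiguityModel is a hypothetical sketch, not FlinkCEP code: it hard-codes the "C", then one or more "A", then "B" pattern from the examples, treats events as plain strings matched by their first letter, and assumes followedBy() (relaxed contiguity) between the three patterns.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of: "C" -> followedBy "A" oneOrMore() -> followedBy "B",
// for the three internal contiguity options of the looping "A" pattern.
final class LoopingContiguityModel {
    enum Mode { CONSECUTIVE, RELAXED, ALLOW_COMBINATIONS }

    static List<List<String>> matches(List<String> events, Mode mode) {
        List<List<String>> result = new ArrayList<>();
        for (int s = 0; s < events.size(); s++) {
            if (!events.get(s).startsWith("C")) continue;
            // followedBy: the first "A" after "C" starts the loop; other events are skipped
            int first = indexOfNext(events, s + 1, "A");
            if (first < 0) continue;
            for (List<Integer> run : runs(events, first, mode)) {
                // followedBy: the first "B" after the last accepted "A" closes the match
                int end = indexOfNext(events, run.get(run.size() - 1) + 1, "B");
                if (end < 0) continue;
                List<String> match = new ArrayList<>();
                match.add(events.get(s));
                for (int i : run) match.add(events.get(i));
                match.add(events.get(end));
                result.add(match);
            }
        }
        return result;
    }

    // All candidate sequences of "A" positions that begin at index first.
    private static List<List<Integer>> runs(List<String> events, int first, Mode mode) {
        List<Integer> later = new ArrayList<>();
        for (int i = first + 1; i < events.size(); i++) {
            if (events.get(i).startsWith("A")) later.add(i);
            else if (mode == Mode.CONSECUTIVE) break; // strict: a non-"A" ends the loop
        }
        List<List<Integer>> runs = new ArrayList<>();
        if (mode == Mode.ALLOW_COMBINATIONS) {
            // any ordered subset of the later "A"s may follow the first one
            for (int mask = 0; mask < (1 << later.size()); mask++) {
                List<Integer> run = new ArrayList<>();
                run.add(first);
                for (int k = 0; k < later.size(); k++) {
                    if ((mask & (1 << k)) != 0) run.add(later.get(k));
                }
                runs.add(run);
            }
        } else {
            // consecutive / relaxed: only growing prefixes, no "A" may be skipped
            List<Integer> run = new ArrayList<>();
            run.add(first);
            runs.add(new ArrayList<>(run));
            for (int i : later) {
                run.add(i);
                runs.add(new ArrayList<>(run));
            }
        }
        return runs;
    }

    private static int indexOfNext(List<String> events, int from, String prefix) {
        for (int i = from; i < events.size(); i++) {
            if (events.get(i).startsWith(prefix)) return i;
        }
        return -1;
    }
}
```

On C D A1 A2 A3 D A4 B this yields 3 matches for CONSECUTIVE, 4 for RELAXED and 8 for ALLOW_COMBINATIONS; only the last mode produces sequences such as {C A1 A3 B} that skip a matching "A".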
It’s also possible to define a pattern sequence as the condition for begin, followedBy, followedByAny and next. The pattern sequence will be considered as the matching condition logically, a GroupPattern will be returned, and it is possible to apply oneOrMore(), times(#ofTimes), times(#fromTimes, #toTimes), optional(), consecutive() and allowCombinations() to the GroupPattern.
Pattern<Event, ?> start = Pattern.begin(
    Pattern.<Event>begin("start").where(...).followedBy("start_middle").where(...)
);
// strict contiguity
Pattern<Event, ?> strict = start.next(
    Pattern.<Event>begin("next_start").where(...).followedBy("next_middle").where(...)
).times(3);
// relaxed contiguity
Pattern<Event, ?> relaxed = start.followedBy(
    Pattern.<Event>begin("followedby_start").where(...).followedBy("followedby_middle").where(...)
).oneOrMore();
// non-deterministic relaxed contiguity
Pattern<Event, ?> nonDetermin = start.followedByAny(
    Pattern.<Event>begin("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...)
).optional();
val start: Pattern[Event, _] = Pattern.begin(
  Pattern.begin[Event]("start").where(...).followedBy("start_middle").where(...)
)
// strict contiguity
val strict: Pattern[Event, _] = start.next(
  Pattern.begin[Event]("next_start").where(...).followedBy("next_middle").where(...)
).times(3)
// relaxed contiguity
val relaxed: Pattern[Event, _] = start.followedBy(
  Pattern.begin[Event]("followedby_start").where(...).followedBy("followedby_middle").where(...)
).oneOrMore()
// non-deterministic relaxed contiguity
val nonDetermin: Pattern[Event, _] = start.followedByAny(
  Pattern.begin[Event]("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...)
).optional()
begin(#name): Defines a starting pattern.
Pattern<Event, ?> start = Pattern.<Event>begin("start");