Flink Series 02: Learning FlinkCEP from the Source Code (Individual Patterns and Pattern Groups)

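First, we can set the simplest kind of condition, such as event.price = 100, which is a single condition. In practice several conditions are likely to appear together, for example price = 100 and count = 10 or price = 0.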



start.where(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) {
        return value.getName().startsWith("foo");
    }
});



private class MyCondition extends IterativeCondition<Event> {
    @Override
    public boolean filter(Event value, Context<Event> ctx) throws Exception {
        if (!value.getName().equals("middle")) {
            return false;
        }
        double sum = 0.0;
        for (Event e : ctx.getEventsForPattern("middle")) {
            sum += e.getPrice();
        }
        sum += value.getPrice();
        return Double.compare(sum, 5.0) <= 0;
    }
}
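To see what the context buys us, here is a minimal plain-Java sketch of the same idea with no Flink dependency. `RunningSumCondition`, `PriceEvent`, and the `accepted` list are hypothetical names for illustration: the list stands in for `ctx.getEventsForPattern("middle")`. In Flink the NFA keeps the accepted events and the condition only reads them; the sketch merges both roles into one class for brevity.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of an iterative condition; not Flink code. */
final class RunningSumCondition {
    /** Hypothetical stand-in for the Event class of the excerpt. */
    static final class PriceEvent {
        final String name;
        final double price;

        PriceEvent(String name, double price) {
            this.name = name;
            this.price = price;
        }
    }

    // Events already accepted by this pattern; plays the role of
    // ctx.getEventsForPattern("middle") in the excerpt.
    private final List<PriceEvent> accepted = new ArrayList<>();
    private final double limit;

    RunningSumCondition(double limit) {
        this.limit = limit;
    }

    /** Accepts the event only if the running price sum stays within the limit. */
    boolean offer(PriceEvent value) {
        if (!value.name.equals("middle")) {
            return false;
        }
        double sum = value.price;
        for (PriceEvent e : accepted) {
            sum += e.price;
        }
        if (Double.compare(sum, limit) <= 0) {
            accepted.add(value);
            return true;
        }
        return false;
    }

    int acceptedCount() {
        return accepted.size();
    }
}
```

With a limit of 5.0, events priced 2.0 and 3.0 are both accepted, while a third event priced 0.5 is rejected because the running sum would exceed the limit. A SimpleCondition could not express this, since it only ever sees the current event.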


pattern.where(new IterativeCondition<Event>() {
    @Override
    public boolean filter(Event value, Context<Event> ctx) throws Exception {
        return ... // some condition
    }
});


    // SubtypeCondition: accepts the event when its class is the subtype or a subclass of it
    public boolean filter(T value) throws Exception {
        return subtype.isAssignableFrom(value.getClass());
    }




The Pattern methods that set conditions are: where, or, until, and subtype.

    /**
     * Adds a condition that has to be satisfied by an event in order to be considered a match. If
     * another condition has already been set, the new one is going to be combined with the previous
     * with a logical {@code AND}. In other case, this is going to be the only condition.
     *
     * @param condition The condition as an {@link IterativeCondition}.
     * @return The pattern with the new condition is set.
     */
    public Pattern<T, F> where(IterativeCondition<F> condition) {
        Preconditions.checkNotNull(condition, "The condition cannot be null.");

        ClosureCleaner.clean(condition, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
        if (this.condition == null) {
            this.condition = condition;
        } else {
            this.condition = new RichAndCondition<>(this.condition, condition);
        }
        return this;
    }

    /**
     * Adds a condition that has to be satisfied by an event in order to be considered a match. If
     * another condition has already been set, the new one is going to be combined with the previous
     * with a logical {@code OR}. In other case, this is going to be the only condition.
     *
     * @param condition The condition as an {@link IterativeCondition}.
     * @return The pattern with the new condition is set.
     */
    public Pattern<T, F> or(IterativeCondition<F> condition) {
        Preconditions.checkNotNull(condition, "The condition cannot be null.");

        ClosureCleaner.clean(condition, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);

        if (this.condition == null) {
            this.condition = condition;
        } else {
            this.condition = new RichOrCondition<>(this.condition, condition);
        }
        return this;
    }

    /**
     * Applies a subtype constraint on the current pattern. This means that an event has to be of
     * the given subtype in order to be matched.
     *
     * @param subtypeClass Class of the subtype
     * @param <S> Type of the subtype
     * @return The same pattern with the new subtype constraint
     */
    public <S extends F> Pattern<T, S> subtype(final Class<S> subtypeClass) {
        Preconditions.checkNotNull(subtypeClass, "The class cannot be null.");

        if (condition == null) {
            this.condition = new SubtypeCondition<F>(subtypeClass);
        } else {
            this.condition =
                    new RichAndCondition<>(condition, new SubtypeCondition<F>(subtypeClass));
        }

        Pattern<T, S> result = (Pattern<T, S>) this;

        return result;
    }

    /**
     * Applies a stop condition for a looping state. It allows cleaning the underlying state.
     *
     * @param untilCondition a condition an event has to satisfy to stop collecting events into
     *     looping state
     * @return The same pattern with applied untilCondition
     */
    public Pattern<T, F> until(IterativeCondition<F> untilCondition) {
        Preconditions.checkNotNull(untilCondition, "The condition cannot be null");

        if (this.untilCondition != null) {
            throw new MalformedPatternException("Only one until condition can be applied.");
        }

        if (!quantifier.hasProperty(Quantifier.QuantifierProperty.LOOPING)) {
            throw new MalformedPatternException(
                    "The until condition is only applicable to looping states.");
        }

        ClosureCleaner.clean(untilCondition, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
        this.untilCondition = untilCondition;

        return this;
    }


A quantifier describing the Pattern. There are three main groups of {@link Quantifier}





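RichAndCondition, RichOrCondition, and RichNotCondition differ only in how they implement the filter method inherited from IterativeCondition; they represent the basic logical relations AND, OR, and NOT. None of the Pattern methods above uses NOT, which at first looks as if the author simply got carried away. It is not dead code, though: NFACompiler wraps a pattern's condition in RichNotCondition to build the ignore condition for SKIP_TILL_NEXT (see getIgnoreCondition further down).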

    // RichAndCondition: both nested conditions must hold
    public boolean filter(T value, Context<T> ctx) throws Exception {
        return getLeft().filter(value, ctx) && getRight().filter(value, ctx);
    }
    // RichOrCondition: at least one nested condition must hold
    public boolean filter(T value, Context<T> ctx) throws Exception {
        return getLeft().filter(value, ctx) || getRight().filter(value, ctx);
    }
    // RichNotCondition: negates its single nested condition
    public boolean filter(T value, Context<T> ctx) throws Exception {
        return !getNestedConditions()[0].filter(value, ctx);
    }


    public RichCompositeIterativeCondition(final IterativeCondition<T>... nestedConditions) {
        for (IterativeCondition<T> condition : nestedConditions) {
            Preconditions.checkNotNull(condition, "The condition cannot be null.");
        }
        this.nestedConditions = nestedConditions;
    }


    public Pattern<T, F> oneOrMore(@Nullable Time windowTime) {
        this.quantifier = Quantifier.looping(quantifier.getConsumingStrategy());
        this.times = Times.of(1, windowTime);
        return this;
    }

    public Pattern<T, F> timesOrMore(int times, @Nullable Time windowTime) {
        this.quantifier = Quantifier.looping(quantifier.getConsumingStrategy());
        this.times = Times.of(times, windowTime);
        return this;
    }




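  • Single: exactly once; nothing that follows needs to be considered
  • Looping: sets the looping strategy. "Looping until" does not necessarily mean "until a certain event arrives"; it can also mean "looping within XXX seconds"… this is covered later as well
  • Times: counts a specific number of occurrences; there is no OrMore here, so it must be an exact count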
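The three groups can be pictured as bounds on how many events one individual pattern may accept. The following is only a sketch under that reading: `QuantifierModel` and its `from`/`to` fields are hypothetical and are not the real `Quantifier` class. It mirrors the excerpt above in one respect, namely that oneOrMore and timesOrMore both produce a looping quantifier and differ only in the minimum count.

```java
/** Plain-Java model of the three quantifier groups; not the Flink Quantifier class. */
final class QuantifierModel {
    enum Kind { SINGLE, LOOPING, TIMES }

    final Kind kind;
    final int from; // minimum number of events the pattern must accept
    final int to;   // maximum number of events, Integer.MAX_VALUE when unbounded

    private QuantifierModel(Kind kind, int from, int to) {
        this.kind = kind;
        this.from = from;
        this.to = to;
    }

    static QuantifierModel single() {
        return new QuantifierModel(Kind.SINGLE, 1, 1);
    }

    // oneOrMore is timesOrMore with a minimum of one, as in the excerpt.
    static QuantifierModel oneOrMore() {
        return timesOrMore(1);
    }

    static QuantifierModel timesOrMore(int n) {
        return new QuantifierModel(Kind.LOOPING, n, Integer.MAX_VALUE);
    }

    static QuantifierModel times(int n) {
        return new QuantifierModel(Kind.TIMES, n, n);
    }

    /** True when a run of {@code count} accepted events satisfies the quantifier. */
    boolean accepts(int count) {
        return count >= from && count <= to;
    }
}
```

For instance, `QuantifierModel.timesOrMore(3)` rejects a run of two events but accepts any run of three or more, while `QuantifierModel.times(2)` accepts exactly two.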


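  1. SimpleCondition: needs no context (the context being the other elements this individual pattern has already matched)
  2. IterativeCondition: for more complex criteria that need the context
  3. Subtype: a special condition that checks the class relationship of the event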


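  1. No combination: either where or or will do. With a single condition, the call amounts to assigning condition directly.
  2. Combining conditions: conditions are combined through where (which is really AND) and or, or through until (together with a loop). Unfortunately these methods must be called on a Pattern object. The condition objects themselves have no logical combinators, so every basic condition has to be combined one at a time through the individual pattern object.
  3. Stop condition: the until condition that goes with oneOrMore or timesOrMore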


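  • where: can be used on its own. a.where(b) behaves as a and b
  • or: can also be used on its own. a.or(b) behaves as a or b
  • until: must be applied to a pattern that has the looping property, such as oneOrMore and timesOrMore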
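The folding behaviour of where, or, and until can be sketched without Flink. `MiniPattern` below is a hypothetical model built on `java.util.function.Predicate`, not the real `Pattern` class: it reproduces the "first call assigns, later calls wrap" logic of the excerpts above and the two checks that until performs, using `IllegalStateException` in place of Flink's `MalformedPatternException`.

```java
import java.util.function.Predicate;

/** Plain-Java model of how Pattern folds conditions; not the Flink Pattern class. */
final class MiniPattern<E> {
    private Predicate<E> condition;      // null until the first where/or call
    private Predicate<E> untilCondition; // at most one stop condition
    private boolean looping;             // set by oneOrMore()

    MiniPattern<E> where(Predicate<E> next) {
        condition = (condition == null) ? next : condition.and(next);
        return this;
    }

    MiniPattern<E> or(Predicate<E> next) {
        condition = (condition == null) ? next : condition.or(next);
        return this;
    }

    MiniPattern<E> oneOrMore() {
        looping = true;
        return this;
    }

    MiniPattern<E> until(Predicate<E> stop) {
        if (untilCondition != null) {
            throw new IllegalStateException("Only one until condition can be applied.");
        }
        if (!looping) {
            throw new IllegalStateException(
                    "The until condition is only applicable to looping states.");
        }
        untilCondition = stop;
        return this;
    }

    /** A pattern without any condition accepts every event. */
    boolean matches(E event) {
        return condition == null || condition.test(event);
    }
}
```

Because each call wraps whatever has been accumulated so far, `where(A).where(B).or(C)` folds to `(A and B) or C`. With events modelled as `int[] {price, count}`, the chain `where(price == 100).where(count == 10).or(price == 0)` accepts `{100, 10}` and `{0, 3}` but rejects `{100, 3}`, so the order of the calls matters.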


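  • Define the basic pattern variable => start from begin(String)
  • How to make a pattern match an event => define a basic matching condition (Simple, Iterative, Subtype)
  • How to match an event in a more complex way => use a compound condition, i.e. combine several basic conditions logically (where, or, until)

At this point we can finally implement a simple matching pattern like "a" in Flink!

So how do we match several events in one go, as in a+, a?, or a until …?

  • How to make the pattern match several events at once => use a quantifier to bound the number of matches.

Next we will learn how to combine individual patterns into a "pattern group" (I could not find a better name for it). Only once we can define a complete regular expression such as a+ b1 b2 c can we talk about what complete and partial matches are, and about how to set the pruning strategy (AfterMatchSkipStrategy) when a complete match and several partial matches coexist.

Group of Pattern (pattern group)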


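  1. begin: already covered; it defines the initial event
  2. Strictest level: next, which maps to ConsumingStrategy.STRICT. No other event may sit between two matched events (no gaps allowed)
  3. Relaxed level: followedBy, which maps to ConsumingStrategy.SKIP_TILL_NEXT. Other events may sit between two matched events
  4. Most relaxed level: followedByAny, which maps to ConsumingStrategy.SKIP_TILL_ANY. According to the documentation, even events that do match may be skipped over

That is, suppose the pattern group is defined as a b and the data stream is a c b1 b2

(b1 and b2 are two different events of type B)

  1. next => no match, because c sits in between
  2. followedBy => a b1. The match is already complete at b1, so a b2 is never produced
  3. followedByAny => a b1 and a b2 (the match does not have to use the first b)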
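The three outcomes above can be reproduced with a few lines of plain Java. This is a deliberately tiny sketch, not Flink's NFA: `ContiguityDemo` is a hypothetical helper that handles only the two-step pattern "a b", treats events as strings, and takes the first "a" in the stream as the start.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of the pattern "a b" under the three contiguity levels; not Flink code. */
final class ContiguityDemo {
    enum Strategy { STRICT, SKIP_TILL_NEXT, SKIP_TILL_ANY }

    /** Returns every "a b" match found after the first event named "a". */
    static List<String> match(List<String> stream, Strategy strategy) {
        List<String> matches = new ArrayList<>();
        int start = stream.indexOf("a");
        if (start < 0) {
            return matches;
        }
        for (int i = start + 1; i < stream.size(); i++) {
            String event = stream.get(i);
            if (event.startsWith("b")) {
                matches.add("a " + event);
                if (strategy != Strategy.SKIP_TILL_ANY) {
                    break; // next and followedBy stop at the first b
                }
            } else if (strategy == Strategy.STRICT) {
                break; // next: any event in between discards the partial match
            }
        }
        return matches;
    }
}
```

On the stream a, c, b1, b2 the sketch yields no match for STRICT, the single match "a b1" for SKIP_TILL_NEXT, and both "a b1" and "a b2" for SKIP_TILL_ANY, in line with the list above.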


Pattern group: a b+ c

Data stream: "a", "b1", "d1", "b2", "d2", "b3", "c"

  1. Calling consecutive() after the loop pattern (i.e. next) => {a b3 c}
  2. Loop pattern default (i.e. followedBy) => {a b1 c}, {a b1 b2 c}, {a b1 b2 b3 c}, {a b2 c}, {a b2 b3 c}, {a b3 c}
  3. Calling allowCombinations() after the loop pattern (in effect followedByAny) => {a b1 c}, {a b1 b2 c}, {a b1 b3 c}, {a b1 b2 b3 c}, {a b2 c}, {a b2 b3 c}, {a b3 c}


  1. next => {a b3 c}
  2. followedBy => {a b1 c}, {a b1 b2 c}, {a b1 b2 b3 c}, {a b2 c}, {a b2 b3 c}, {a b3 c}
  3. followedByAny => {a b1 c}, {a b2 c}, {a b3 c}, {a b1 b2 c}, {a b1 b3 c}, {a b2 b3 c}, {a b1 b2 b3 c}


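Why is a b3 c allowed even though the strategy is STRICT?

And why does followedBy allow a b2 c and a b3 c? Why does the match not have to start from b1?

That is, although the loop pattern b+ is itself consecutive, the relation between a and b+ is followedByAny.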



        private IterativeCondition<T> getInnerIgnoreCondition(Pattern<T, ?> pattern) {
            Quantifier.ConsumingStrategy consumingStrategy =
                    pattern.getQuantifier().getInnerConsumingStrategy();
            if (headOfGroup(pattern)) {
                // for the head pattern of a group pattern, we should consider the
                // inner consume strategy of the group pattern
                consumingStrategy = currentGroupPattern.getQuantifier().getInnerConsumingStrategy();
            }

            IterativeCondition<T> innerIgnoreCondition = null;
            switch (consumingStrategy) {
                case STRICT:
                    innerIgnoreCondition = null;
                    break;
                case SKIP_TILL_NEXT:
                    innerIgnoreCondition =
                            new RichNotCondition<>((IterativeCondition<T>) pattern.getCondition());
                    break;
                case SKIP_TILL_ANY:
                    innerIgnoreCondition = BooleanConditions.trueFunction();
                    break;
            }

            if (currentGroupPattern != null && currentGroupPattern.getUntilCondition() != null) {
                innerIgnoreCondition =
                        extendWithUntilCondition(
                                innerIgnoreCondition,
                                (IterativeCondition<T>) currentGroupPattern.getUntilCondition(),
                                false);
            }
            return innerIgnoreCondition;
        }

        /**
         * @return The {@link IterativeCondition condition} for the {@code IGNORE} edge that
         *     corresponds to the specified {@link Pattern} and extended with stop(until) condition
         *     if necessary. For more on strategy see {@link Quantifier}
         */
        private IterativeCondition<T> getIgnoreCondition(Pattern<T, ?> pattern) {
            Quantifier.ConsumingStrategy consumingStrategy =
                    pattern.getQuantifier().getConsumingStrategy();
            if (headOfGroup(pattern)) {
                // for the head pattern of a group pattern, we should consider the inner consume
                // strategy
                // of the group pattern if the group pattern is not the head of the TIMES/LOOPING
                // quantifier;
                // otherwise, we should consider the consume strategy of the group pattern
                if (isCurrentGroupPatternFirstOfLoop()) {
                    consumingStrategy = currentGroupPattern.getQuantifier().getConsumingStrategy();
                } else {
                    consumingStrategy =
                            currentGroupPattern.getQuantifier().getInnerConsumingStrategy();
                }
            }

            IterativeCondition<T> ignoreCondition = null;
            switch (consumingStrategy) {
                case STRICT:
                    ignoreCondition = null;
                    break;
                case SKIP_TILL_NEXT:
                    ignoreCondition =
                            new RichNotCondition<>((IterativeCondition<T>) pattern.getCondition());
                    break;
                case SKIP_TILL_ANY:
                    ignoreCondition = BooleanConditions.trueFunction();
                    break;
            }

            if (currentGroupPattern != null && currentGroupPattern.getUntilCondition() != null) {
                ignoreCondition =
                        extendWithUntilCondition(
                                ignoreCondition,
                                (IterativeCondition<T>) currentGroupPattern.getUntilCondition(),
                                false);
            }
            return ignoreCondition;
        }


    private ConsumingStrategy innerConsumingStrategy = ConsumingStrategy.SKIP_TILL_NEXT;

    public void combinations() {
        checkPattern(
                !hasProperty(QuantifierProperty.SINGLE),
                "Combinations not applicable to " + this + "!");
        checkPattern(
                innerConsumingStrategy != ConsumingStrategy.STRICT,
                "You can apply either combinations or consecutive, not both!");
        checkPattern(
                innerConsumingStrategy != ConsumingStrategy.SKIP_TILL_ANY,
                "Combinations already applied!");

        innerConsumingStrategy = ConsumingStrategy.SKIP_TILL_ANY;
    }

    public void consecutive() {
        checkPattern(
                hasProperty(QuantifierProperty.LOOPING) || hasProperty(QuantifierProperty.TIMES),
                "Consecutive not applicable to " + this + "!");
        checkPattern(
                innerConsumingStrategy != ConsumingStrategy.SKIP_TILL_ANY,
                "You can apply either combinations or consecutive, not both!");
        checkPattern(
                innerConsumingStrategy != ConsumingStrategy.STRICT, "Consecutive already applied!");

        innerConsumingStrategy = ConsumingStrategy.STRICT;
    }


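  1. default => SKIP_TILL_NEXT
  2. consecutive => STRICT
  3. allowCombinations => SKIP_TILL_ANY


  1. STRICT => the ignore condition is null, i.e. no event may be ignored
  2. SKIP_TILL_NEXT => any event that does not satisfy the current condition is ignored and skipped
  3. SKIP_TILL_ANY (allowCombinations) => any event can always be skipped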
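The mapping from strategy to ignore condition is small enough to model directly. The sketch below is an assumption-laden stand-in for the switch in getIgnoreCondition: `IgnoreConditions` is a hypothetical class, `Predicate` replaces `IterativeCondition`, `negate()` plays the role of `RichNotCondition`, and the until extension is left out entirely.

```java
import java.util.function.Predicate;

/** Plain-Java model of the switch in getIgnoreCondition; not the NFACompiler itself. */
final class IgnoreConditions {
    enum Strategy { STRICT, SKIP_TILL_NEXT, SKIP_TILL_ANY }

    /** Returns the predicate guarding the IGNORE edge, or null when nothing may be ignored. */
    static <E> Predicate<E> ignoreCondition(Strategy strategy, Predicate<E> takeCondition) {
        switch (strategy) {
            case STRICT:
                return null; // no IGNORE edge at all
            case SKIP_TILL_NEXT:
                return takeCondition.negate(); // ignore only what cannot be taken
            case SKIP_TILL_ANY:
                return e -> true; // ignoring is always possible
            default:
                throw new IllegalArgumentException("Unknown strategy: " + strategy);
        }
    }

    /** True when the event may be skipped without discarding the partial match. */
    static <E> boolean canIgnore(Strategy strategy, Predicate<E> takeCondition, E event) {
        Predicate<E> ignore = ignoreCondition(strategy, takeCondition);
        return ignore != null && ignore.test(event);
    }
}
```

With a take condition that accepts names starting with "b", the event "d1" can be ignored under SKIP_TILL_NEXT but "b1" cannot, whereas SKIP_TILL_ANY lets "b1" be ignored as well. That last case is what lets a partial match skip over a matching event and produce extra combinations such as {a b1 b3 c}.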


    public Pattern<T, F> within(Time windowTime) {
        return within(windowTime, WithinType.FIRST_AND_LAST);
    }

    /**
     * Defines the maximum time interval in which a matching pattern has to be completed in order to
     * be considered valid. This interval corresponds to the maximum time gap between events.
     *
     * @param withinType Type of the within interval between events
     * @param windowTime Time of the matching window
     * @return The same pattern operator with the new window length
     */
    public Pattern<T, F> within(Time windowTime, WithinType withinType) {
        if (windowTime != null) {
            windowTimes.put(withinType, windowTime);
        }

        return this;
    }

The within(Time.seconds(10)) call shown in the official documentation therefore corresponds to WithinType.FIRST_AND_LAST.

package org.apache.flink.cep.pattern;

/** Type enum of time interval corresponds to the maximum time gap between events. */
public enum WithinType {
    // Interval corresponds to the maximum time gap between the previous and current event.
    PREVIOUS_AND_CURRENT,
    // Interval corresponds to the maximum time gap between the first and last event.
    FIRST_AND_LAST;
}


  1. PREVIOUS_AND_CURRENT: the interval between two successive events
  2. FIRST_AND_LAST: the interval from the first event to the last one, i.e. the whole pattern must be matched within the given time.
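The difference between the two interval types is easiest to see on concrete timestamps. The following is a minimal sketch, not Flink's timeout handling: `WithinCheck` is a hypothetical helper, timestamps are plain milliseconds in ascending order, and treating a gap equal to the window as still valid is an assumption of this sketch rather than a statement about Flink's boundary behaviour.

```java
import java.util.List;

/** Plain-Java model of the two WithinType intervals; not Flink code. */
final class WithinCheck {
    enum Type { PREVIOUS_AND_CURRENT, FIRST_AND_LAST }

    /** Checks ascending event timestamps (milliseconds) against a window of the given type. */
    static boolean withinWindow(List<Long> timestamps, long windowMillis, Type type) {
        if (timestamps.size() < 2) {
            return true; // a single event has no gap to measure
        }
        if (type == Type.FIRST_AND_LAST) {
            long first = timestamps.get(0);
            long last = timestamps.get(timestamps.size() - 1);
            return last - first <= windowMillis;
        }
        // PREVIOUS_AND_CURRENT: every gap between neighbours must fit in the window
        for (int i = 1; i < timestamps.size(); i++) {
            if (timestamps.get(i) - timestamps.get(i - 1) > windowMillis) {
                return false;
            }
        }
        return true;
    }
}
```

Take four events at 0 s, 4 s, 8 s, and 12 s with a 10 s window. Every neighbouring gap is 4 s, so PREVIOUS_AND_CURRENT accepts the sequence, while FIRST_AND_LAST rejects it because the whole match spans 12 s.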



public class GroupPattern<T, F extends T> extends Pattern<T, F> {

    /** Group pattern representing the pattern definition of this group. */
    private final Pattern<T, ? extends T> groupPattern;

    GroupPattern(
            final Pattern<T, ? extends T> previous,
            final Pattern<T, ? extends T> groupPattern,
            final Quantifier.ConsumingStrategy consumingStrategy,
            final AfterMatchSkipStrategy afterMatchSkipStrategy) {
        super("GroupPattern", previous, consumingStrategy, afterMatchSkipStrategy);
        this.groupPattern = groupPattern;
    }

    @Override
    public Pattern<T, F> where(IterativeCondition<F> condition) {
        throw new UnsupportedOperationException("GroupPattern does not support where clause.");
    }

    @Override
    public Pattern<T, F> or(IterativeCondition<F> condition) {
        throw new UnsupportedOperationException("GroupPattern does not support or clause.");
    }

    @Override
    public <S extends F> Pattern<T, S> subtype(final Class<S> subtypeClass) {
        throw new UnsupportedOperationException("GroupPattern does not support subtype clause.");
    }

    public Pattern<T, ? extends T> getRawPattern() {
        return groupPattern;
    }
}




    private final AfterMatchSkipStrategy afterMatchSkipStrategy;

    protected Pattern(
            final String name,
            final Pattern<T, ? extends T> previous,
            final ConsumingStrategy consumingStrategy,
            final AfterMatchSkipStrategy afterMatchSkipStrategy) {
        this.name = name;
        this.previous = previous;
        this.quantifier = Quantifier.one(consumingStrategy);
        this.afterMatchSkipStrategy = afterMatchSkipStrategy;
    }


    public static <X> Pattern<X, X> begin(
            final String name, final AfterMatchSkipStrategy afterMatchSkipStrategy) {
        return new Pattern<X, X>(name, null, ConsumingStrategy.STRICT, afterMatchSkipStrategy);
    }

    public Pattern<T, T> next(final String name) {
        return new Pattern<>(name, this, ConsumingStrategy.STRICT, afterMatchSkipStrategy);
    }

    /**
     * Appends a new pattern to the existing one. The new pattern enforces that there is no event
     * matching this pattern right after the preceding matched event.
     *
     * @param name Name of the new pattern
     * @return A new pattern which is appended to this one
     */
    public Pattern<T, T> notNext(final String name) {
        if (quantifier.hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) {
            throw new UnsupportedOperationException(
                    "Specifying a pattern with an optional path to NOT condition is not supported yet. "
                            + "You can simulate such pattern with two independent patterns, one with and the other without "
                            + "the optional part.");
        }
        return new Pattern<>(name, this, ConsumingStrategy.NOT_NEXT, afterMatchSkipStrategy);
    }

    public Pattern<T, T> followedBy(final String name) {
        return new Pattern<>(name, this, ConsumingStrategy.SKIP_TILL_NEXT, afterMatchSkipStrategy);
    }

    /**
     * Appends a new pattern to the existing one. The new pattern enforces that there is no event
     * matching this pattern between the preceding pattern and succeeding this one.
     *
     * <p><b>NOTE:</b> There has to be other pattern after this one.
     *
     * @param name Name of the new pattern
     * @return A new pattern which is appended to this one
     */
    public Pattern<T, T> notFollowedBy(final String name) {
        if (quantifier.hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) {
            throw new UnsupportedOperationException(
                    "Specifying a pattern with an optional path to NOT condition is not supported yet. "
                            + "You can simulate such pattern with two independent patterns, one with and the other without "
                            + "the optional part.");
        }
        return new Pattern<>(name, this, ConsumingStrategy.NOT_FOLLOW, afterMatchSkipStrategy);
    }

    /**
     * Appends a new pattern to the existing one. The new pattern enforces non-strict temporal
     * contiguity. This means that a matching event of this pattern and the preceding matching event
     * might be interleaved with other events which are ignored.
     *
     * @param name Name of the new pattern
     * @return A new pattern which is appended to this one
     */
    public Pattern<T, T> followedByAny(final String name) {
        return new Pattern<>(name, this, ConsumingStrategy.SKIP_TILL_ANY, afterMatchSkipStrategy);
    }


    public Pattern<T, T> notNext(final String name) {
        if (quantifier.hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) {
            throw new UnsupportedOperationException(
                    "Specifying a pattern with an optional path to NOT condition is not supported yet. "
                            + "You can simulate such pattern with two independent patterns, one with and the other without "
                            + "the optional part.");
        }
        return new Pattern<>(name, this, ConsumingStrategy.NOT_NEXT, afterMatchSkipStrategy);
    }

    public Pattern<T, T> notFollowedBy(final String name) {
        if (quantifier.hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) {
            throw new UnsupportedOperationException(
                    "Specifying a pattern with an optional path to NOT condition is not supported yet. "
                            + "You can simulate such pattern with two independent patterns, one with and the other without "
                            + "the optional part.");
        }
        return new Pattern<>(name, this, ConsumingStrategy.NOT_FOLLOW, afterMatchSkipStrategy);
    }

As expected, there is no notFollowedByAny and no notUntil, and both methods state explicitly that a NOT pattern cannot be attached after an optional pattern.



