flink 1.10.1 cep java版本实现复杂事件模式匹配

  • Post author:
  • Post category:java


本文的基础环境可以参考《flink 1.10.1 java版本wordcount演示 (nc + socket)》,在此基础上增加 CEP 实现复杂事件模式匹配测试。

1. 添加依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep_2.11</artifactId>
    <version>1.10.1</version>
</dependency>

这里 flink-cep 的版本应与所使用的 Flink 版本保持一致(artifactId 中的 Scala 后缀 _2.11 也应与其他 Flink 依赖保持一致),如果版本不一致,可能会导致各种错误。

2. 程序代码

package com.demo.cep;

import com.demo.entity.DataModel;
import com.demo.entity.ResultModel;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.List;
import java.util.Map;

public class FlinkCEPDemo {

    public static void main(String[] args) throws Exception {

        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        //从文件中读取数据
        String inputPath = "data/testdata.txt";
        DataStream<String> inputDataSet = env.readTextFile(inputPath);
        //对数据流进行处理转换
        DataStream<DataModel> dataStream = inputDataSet.map(new MapFunction<String, DataModel>() {
            @Override
            public DataModel map(String s) throws Exception {
                String splits[] = s.split(",");

                return new DataModel(splits[0], new Float(splits[1]),
                        new Float(splits[2]), new Float(splits[3]),
                        new Float(splits[4]));
            }
        });


        Pattern<DataModel, DataModel> pattern = Pattern.<DataModel>begin("start").where(
                new SimpleCondition<DataModel>() {
                    @Override
                    public boolean filter(DataModel dataModel) throws Exception {
                        return dataModel.getPrice() > 4;
                    }
                })
                .times(1)
                .followedBy("middle").where(
                        new SimpleCondition<DataModel>() {
                            @Override
                            public boolean filter(DataModel dataModel) throws Exception {
                                return dataModel.getPrice() <= 3;
                            }
                        }).times(2, 4).greedy()
                .followedBy("end").where(
                        new IterativeCondition<DataModel>() {
                            @Override
                            public boolean filter(DataModel dataModel, Context ctx) throws Exception {

                                // 取得当前事件对象
                                double sum = dataModel.getPrice();

                                // 获取满足middle条件的事件对象
                                Iterable<DataModel> middle = ctx.getEventsForPattern("middle");

                                for (DataModel model : middle) {
                                    sum += model.getPrice();
                                }

                                return dataModel.getPrice() > 4;
                            }
                        });

        PatternStream<DataModel> patternStream = CEP.pattern(dataStream.keyBy(DataModel::getCode), pattern);

        SingleOutputStreamOperator<ResultModel> select = patternStream.select(new MyPatternSelectFunction());

        select.print();

        env.execute();

    }

    public static class MyPatternSelectFunction implements PatternSelectFunction<com.demo.entity.DataModel, com.demo.entity.ResultModel> {
        @Override
        public com.demo.entity.ResultModel select(Map<String, List<com.demo.entity.DataModel>> pattern) {
            com.demo.entity.DataModel startEvent = pattern.get("start").get(0);
            com.demo.entity.DataModel endEvent = pattern.get("end").get(0);

            return new ResultModel(startEvent.getCode(), startEvent.getPrice(), endEvent.getPrice());
        }
    }

}

程序先按 code 对数据分组(keyBy),然后从一条价格大于4的数据开始,接着匹配2到4条价格小于等于3的数据(greedy 表示尽可能多地匹配;followedBy 为宽松近邻,中间可以夹杂不满足条件的数据),最后再找到一条价格大于4的数据,即表示模式匹配完成。

3. 辅助代码

package com.demo.entity;

/**
 * One input record: an identifying {@code code} plus four float values named price, high, low
 * and open.
 *
 * <p>Exposes a public no-argument constructor and a getter/setter pair for every field
 * (presumably so Flink can handle it as a POJO type; confirm against the Flink type rules before
 * changing that shape).
 */
public class DataModel {

    private String code;
    private float price;
    private float high;
    private float low;
    private float open;

    /** Creates a record whose fields keep their defaults ({@code null} / {@code 0.0f}) until set. */
    public DataModel() {
        // intentionally empty
    }

    /**
     * Creates a fully populated record.
     *
     * @param code  identifying code of the record
     * @param price value stored as {@code price}
     * @param high  value stored as {@code high}
     * @param low   value stored as {@code low}
     * @param open  value stored as {@code open}
     */
    public DataModel(String code, float price, float high, float low, float open) {
        this.open = open;
        this.low = low;
        this.high = high;
        this.price = price;
        this.code = code;
    }

    // ---------------------------------------------------------------- getters

    public String getCode() {
        return this.code;
    }

    public float getPrice() {
        return this.price;
    }

    public float getHigh() {
        return this.high;
    }

    public float getLow() {
        return this.low;
    }

    public float getOpen() {
        return this.open;
    }

    // ---------------------------------------------------------------- setters

    public void setCode(String code) {
        this.code = code;
    }

    public void setPrice(float price) {
        this.price = price;
    }

    public void setHigh(float high) {
        this.high = high;
    }

    public void setLow(float low) {
        this.low = low;
    }

    public void setOpen(float open) {
        this.open = open;
    }

    /** Renders all fields as {@code DataModel{code='…', price=…, high=…, low=…, open=…}}. */
    @Override
    public String toString() {
        return new StringBuilder("DataModel{")
                .append("code='").append(code).append('\'')
                .append(", price=").append(price)
                .append(", high=").append(high)
                .append(", low=").append(low)
                .append(", open=").append(open)
                .append('}')
                .toString();
    }
}

4. 结果类定义

package com.demo.entity;

/**
 * Result record holding a {@code code} together with a start price and an end price.
 *
 * <p>In the CEP demo it is filled from the first ("start") and last ("end") events of a match.
 */
public class ResultModel {

    private String code;
    private float startPrice;
    private float endPrice;

    /** Creates a result whose fields keep their defaults ({@code null} / {@code 0.0f}) until set. */
    public ResultModel() {
        // intentionally empty
    }

    /**
     * Creates a fully populated result.
     *
     * @param code       identifying code of the matched records
     * @param startPrice price of the start event
     * @param endPrice   price of the end event
     */
    public ResultModel(String code, float startPrice, float endPrice) {
        this.endPrice = endPrice;
        this.startPrice = startPrice;
        this.code = code;
    }

    // ---------------------------------------------------------------- getters

    public String getCode() {
        return this.code;
    }

    public float getStartPrice() {
        return this.startPrice;
    }

    public float getEndPrice() {
        return this.endPrice;
    }

    // ---------------------------------------------------------------- setters

    public void setCode(String code) {
        this.code = code;
    }

    public void setStartPrice(float startPrice) {
        this.startPrice = startPrice;
    }

    public void setEndPrice(float endPrice) {
        this.endPrice = endPrice;
    }

    /** Renders the result as {@code ResultModel{code='…', startPrice=…, endPrice=…}}. */
    @Override
    public String toString() {
        return String.format("ResultModel{code='%s', startPrice=%s, endPrice=%s}",
                code, startPrice, endPrice);
    }
}

5. 测试数据

data/testdata.txt

0000001,1,1,1,1
0000001,1,1,1,1
0000001,1,1,1,1
0000001,5,1,1,1
0000001,2,1,1,1
0000001,1,1,1,1
0000001,2,1,1,1
0000001,3,1,1,1
0000001,6,1,1,1

6. 启动程序,执行测试

可以看到程序找到了匹配的数据,输出结果由模式匹配中的第一条数据(start)和最后一条数据(end)组合而成。

ResultModel{code='0000001', startPrice=5.0, endPrice=6.0}



版权声明:本文为liaomingwu原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。