本文的基础环境可以参考《flink 1.10.1 java版本wordcount演示 (nc + socket)》,在此基础上增加 CEP,实现复杂模式匹配的测试。
   
    1. 添加依赖
   
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep_2.11</artifactId>
    <version>1.10.1</version>
</dependency>
这里 flink-cep 的版本应与所使用的 Flink 版本保持一致(artifactId 中的 Scala 后缀 _2.11 也应与其他 Flink 依赖保持一致),否则可能会导致各种错误。
    2. 程序代码
   
package com.demo.cep;
import com.demo.entity.DataModel;
import com.demo.entity.ResultModel;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.List;
import java.util.Map;
public class FlinkCEPDemo {
    public static void main(String[] args) throws Exception {
        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //从文件中读取数据
        String inputPath = "data/testdata.txt";
        DataStream<String> inputDataSet = env.readTextFile(inputPath);
        //对数据流进行处理转换
        DataStream<DataModel> dataStream = inputDataSet.map(new MapFunction<String, DataModel>() {
            @Override
            public DataModel map(String s) throws Exception {
                String splits[] = s.split(",");
                return new DataModel(splits[0], new Float(splits[1]),
                        new Float(splits[2]), new Float(splits[3]),
                        new Float(splits[4]));
            }
        });
        Pattern<DataModel, DataModel> pattern = Pattern.<DataModel>begin("start").where(
                new SimpleCondition<DataModel>() {
                    @Override
                    public boolean filter(DataModel dataModel) throws Exception {
                        return dataModel.getPrice() > 4;
                    }
                })
                .times(1)
                .followedBy("middle").where(
                        new SimpleCondition<DataModel>() {
                            @Override
                            public boolean filter(DataModel dataModel) throws Exception {
                                return dataModel.getPrice() <= 3;
                            }
                        }).times(2, 4).greedy()
                .followedBy("end").where(
                        new IterativeCondition<DataModel>() {
                            @Override
                            public boolean filter(DataModel dataModel, Context ctx) throws Exception {
                                // 取得当前事件对象
                                double sum = dataModel.getPrice();
                                // 获取满足middle条件的事件对象
                                Iterable<DataModel> middle = ctx.getEventsForPattern("middle");
                                for (DataModel model : middle) {
                                    sum += model.getPrice();
                                }
                                return dataModel.getPrice() > 4;
                            }
                        });
        PatternStream<DataModel> patternStream = CEP.pattern(dataStream.keyBy(DataModel::getCode), pattern);
        SingleOutputStreamOperator<ResultModel> select = patternStream.select(new MyPatternSelectFunction());
        select.print();
        env.execute();
    }
    public static class MyPatternSelectFunction implements PatternSelectFunction<com.demo.entity.DataModel, com.demo.entity.ResultModel> {
        @Override
        public com.demo.entity.ResultModel select(Map<String, List<com.demo.entity.DataModel>> pattern) {
            com.demo.entity.DataModel startEvent = pattern.get("start").get(0);
            com.demo.entity.DataModel endEvent = pattern.get("end").get(0);
            return new ResultModel(startEvent.getCode(), startEvent.getPrice(), endEvent.getPrice());
        }
    }
}
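程序的匹配逻辑如下:先从一条价格大于4的数据开始;随后找到2到4条价格小于等于3的数据,使用 greedy 尽可能多地匹配;最后再找到一条价格大于4的数据,即表示模式匹配完成。注意:followedBy 与 times 默认采用宽松连续性(relaxed contiguity),被匹配的数据之间允许夹杂其他不满足条件的数据,并不要求严格连续。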
    3. 辅助代码
   
package com.demo.entity;
/**
 * One quote record parsed from a line of the input file. It holds a stock code plus price,
 * high, low and open values.
 *
 * <p>Kept as a mutable bean (public no-arg constructor, getter/setter per field). Presumably
 * this is so Flink can analyse it as a POJO; confirm before changing the shape.
 */
public class DataModel {
    private String code;
    private float price;
    private float high;
    private float low;
    private float open;

    /** Creates an empty record: code {@code null}, all numeric fields {@code 0}. */
    public DataModel() {
        this(null, 0f, 0f, 0f, 0f);
    }

    /**
     * Creates a fully populated record.
     *
     * @param code  stock code
     * @param price price value
     * @param high  high value
     * @param low   low value
     * @param open  open value
     */
    public DataModel(String code, float price, float high, float low, float open) {
        this.code = code;
        this.price = price;
        this.high = high;
        this.low = low;
        this.open = open;
    }

    // ---- getters ----

    public String getCode() {
        return this.code;
    }

    public float getPrice() {
        return this.price;
    }

    public float getHigh() {
        return this.high;
    }

    public float getLow() {
        return this.low;
    }

    public float getOpen() {
        return this.open;
    }

    // ---- setters ----

    public void setCode(String newCode) {
        this.code = newCode;
    }

    public void setPrice(float newPrice) {
        this.price = newPrice;
    }

    public void setHigh(float newHigh) {
        this.high = newHigh;
    }

    public void setLow(float newLow) {
        this.low = newLow;
    }

    public void setOpen(float newOpen) {
        this.open = newOpen;
    }

    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("DataModel{");
        text.append("code='").append(code).append('\'');
        text.append(", price=").append(price);
        text.append(", high=").append(high);
        text.append(", low=").append(low);
        text.append(", open=").append(open);
        return text.append('}').toString();
    }
}
    4. 结果类定义
   
package com.demo.entity;
/**
 * Output of one CEP match: a stock code together with a start price and an end price.
 *
 * <p>Mutable bean with a public no-arg constructor and a getter/setter per field.
 */
public class ResultModel {
    private String code;
    private float startPrice;
    private float endPrice;

    /** Creates an empty result: code {@code null}, both prices {@code 0}. */
    public ResultModel() {
        this(null, 0f, 0f);
    }

    /**
     * Creates a fully populated result.
     *
     * @param code       stock code
     * @param startPrice price of the first matched event
     * @param endPrice   price of the last matched event
     */
    public ResultModel(String code, float startPrice, float endPrice) {
        this.code = code;
        this.startPrice = startPrice;
        this.endPrice = endPrice;
    }

    // ---- getters ----

    public String getCode() {
        return this.code;
    }

    public float getStartPrice() {
        return this.startPrice;
    }

    public float getEndPrice() {
        return this.endPrice;
    }

    // ---- setters ----

    public void setCode(String newCode) {
        this.code = newCode;
    }

    public void setStartPrice(float newStartPrice) {
        this.startPrice = newStartPrice;
    }

    public void setEndPrice(float newEndPrice) {
        this.endPrice = newEndPrice;
    }

    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("ResultModel{");
        text.append("code='").append(code).append('\'');
        text.append(", startPrice=").append(startPrice);
        text.append(", endPrice=").append(endPrice);
        return text.append('}').toString();
    }
}
    5. 测试数据
   
data/testdata.txt
0000001,1,1,1,1
0000001,1,1,1,1
0000001,1,1,1,1
0000001,5,1,1,1
0000001,2,1,1,1
0000001,1,1,1,1
0000001,2,1,1,1
0000001,3,1,1,1
0000001,6,1,1,1
    6. 启动程序,执行测试
   
可以看到程序找到了匹配的数据。该结果由模式匹配中的第一条数据(start)和最后一条数据(end)组合而成。
ResultModel{code='0000001', startPrice=5.0, endPrice=6.0} 
版权声明:本文为liaomingwu原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
