本文的基础环境可以参考《flink 1.10.1 java版本wordcount演示 (nc + socket)》,在此基础上增加CEP,实现复杂模式匹配测试。
1. 添加依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_2.11</artifactId>
<version>1.10.1</version>
</dependency>
这里的版本应保持与flink版本一致,如果版本不一致,可能会导致各种错误。
2. 程序代码
package com.demo.cep;
import com.demo.entity.DataModel;
import com.demo.entity.ResultModel;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.List;
import java.util.Map;
/**
 * Flink CEP demo job.
 *
 * <p>Reads comma-separated lines from {@code data/testdata.txt}, converts each line into a
 * {@link DataModel}, keys the stream by {@link DataModel#getCode()} and applies a pattern made of
 * three stages: {@code "start"} (price {@code > 4}), {@code "middle"} (2 to 4 events with price
 * {@code <= 3}, greedy) and {@code "end"} (price {@code > 4}). Every match is turned into a
 * {@link ResultModel} and printed.
 */
public class FlinkCEPDemo {

    /**
     * Builds and runs the job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        // Create the execution environment and run everything with a parallelism of 1.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read the raw lines from the input file.
        String inputPath = "data/testdata.txt";
        DataStream<String> inputDataSet = env.readTextFile(inputPath);

        // Convert every line "code,price,high,low,open" into a DataModel.
        DataStream<DataModel> dataStream = inputDataSet.map(new MapFunction<String, DataModel>() {
            @Override
            public DataModel map(String s) throws Exception {
                String[] splits = s.split(",");
                // Float.parseFloat replaces the deprecated new Float(String) constructor and
                // avoids allocating a boxed Float that is immediately unboxed.
                return new DataModel(splits[0],
                        Float.parseFloat(splits[1]),
                        Float.parseFloat(splits[2]),
                        Float.parseFloat(splits[3]),
                        Float.parseFloat(splits[4]));
            }
        });

        Pattern<DataModel, DataModel> pattern = Pattern.<DataModel>begin("start").where(
                new SimpleCondition<DataModel>() {
                    @Override
                    public boolean filter(DataModel dataModel) throws Exception {
                        return dataModel.getPrice() > 4;
                    }
                })
                .times(1)
                .followedBy("middle").where(
                        new SimpleCondition<DataModel>() {
                            @Override
                            public boolean filter(DataModel dataModel) throws Exception {
                                return dataModel.getPrice() <= 3;
                            }
                        }).times(2, 4).greedy()
                .followedBy("end").where(
                        new IterativeCondition<DataModel>() {
                            @Override
                            public boolean filter(DataModel dataModel, Context<DataModel> ctx) throws Exception {
                                // The decision depends only on the current event. The events already
                                // matched by an earlier stage can be read through
                                // ctx.getEventsForPattern("middle") if a condition ever needs them;
                                // the previous version summed their prices but never used the sum.
                                return dataModel.getPrice() > 4;
                            }
                        });

        PatternStream<DataModel> patternStream = CEP.pattern(dataStream.keyBy(DataModel::getCode), pattern);
        SingleOutputStreamOperator<ResultModel> select = patternStream.select(new MyPatternSelectFunction());
        select.print();
        env.execute();
    }

    /**
     * Turns one complete match into a {@link ResultModel} built from the first {@code "start"}
     * event and the first {@code "end"} event.
     */
    public static class MyPatternSelectFunction implements PatternSelectFunction<DataModel, ResultModel> {

        /**
         * @param pattern matched events, keyed by the stage name used in the pattern definition
         * @return the code and price of the start event together with the price of the end event
         */
        @Override
        public ResultModel select(Map<String, List<DataModel>> pattern) {
            DataModel startEvent = pattern.get("start").get(0);
            DataModel endEvent = pattern.get("end").get(0);
            return new ResultModel(startEvent.getCode(), startEvent.getPrice(), endEvent.getPrice());
        }
    }
}
程序代码从大于4的数据开始,随后匹配2到4个小于等于3的数据(贪婪模式,尽可能多地匹配;followedBy为宽松近邻,并不要求事件严格连续),然后再找到一个大于4的数据,则表示模式匹配完成。
3. 辅助代码
package com.demo.entity;
/**
 * Mutable bean holding a code and four float values (price, high, low, open).
 *
 * <p>It exposes a public no-argument constructor plus a getter and a setter for every field.
 */
public class DataModel {

    private String code;
    private float price;
    private float high;
    private float low;
    private float open;

    /** Creates an instance with a {@code null} code and all numeric fields set to {@code 0}. */
    public DataModel() {
    }

    /**
     * Creates a fully populated instance.
     *
     * @param code  value of the code field
     * @param price value of the price field
     * @param high  value of the high field
     * @param low   value of the low field
     * @param open  value of the open field
     */
    public DataModel(String code, float price, float high, float low, float open) {
        this.code = code;
        this.price = price;
        this.high = high;
        this.low = low;
        this.open = open;
    }

    /** @return the code */
    public String getCode() {
        return this.code;
    }

    /** @param code the new code */
    public void setCode(String code) {
        this.code = code;
    }

    /** @return the price */
    public float getPrice() {
        return this.price;
    }

    /** @param price the new price */
    public void setPrice(float price) {
        this.price = price;
    }

    /** @return the high value */
    public float getHigh() {
        return this.high;
    }

    /** @param high the new high value */
    public void setHigh(float high) {
        this.high = high;
    }

    /** @return the low value */
    public float getLow() {
        return this.low;
    }

    /** @param low the new low value */
    public void setLow(float low) {
        this.low = low;
    }

    /** @return the open value */
    public float getOpen() {
        return this.open;
    }

    /** @param open the new open value */
    public void setOpen(float open) {
        this.open = open;
    }

    /** @return a text form listing every field, e.g. {@code DataModel{code='x', price=1.0, ...}} */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("DataModel{");
        text.append("code='").append(code).append('\'');
        text.append(", price=").append(price);
        text.append(", high=").append(high);
        text.append(", low=").append(low);
        text.append(", open=").append(open);
        text.append('}');
        return text.toString();
    }
}
4. 结果类定义
package com.demo.entity;
/**
 * Mutable bean holding a code together with a start price and an end price.
 *
 * <p>It exposes a public no-argument constructor plus a getter and a setter for every field.
 */
public class ResultModel {

    private String code;
    private float startPrice;
    private float endPrice;

    /** Creates an instance with a {@code null} code and both prices set to {@code 0}. */
    public ResultModel() {
    }

    /**
     * Creates a fully populated instance.
     *
     * @param code       value of the code field
     * @param startPrice value of the start price field
     * @param endPrice   value of the end price field
     */
    public ResultModel(String code, float startPrice, float endPrice) {
        this.code = code;
        this.startPrice = startPrice;
        this.endPrice = endPrice;
    }

    /** @return the code */
    public String getCode() {
        return this.code;
    }

    /** @param code the new code */
    public void setCode(String code) {
        this.code = code;
    }

    /** @return the start price */
    public float getStartPrice() {
        return this.startPrice;
    }

    /** @param startPrice the new start price */
    public void setStartPrice(float startPrice) {
        this.startPrice = startPrice;
    }

    /** @return the end price */
    public float getEndPrice() {
        return this.endPrice;
    }

    /** @param endPrice the new end price */
    public void setEndPrice(float endPrice) {
        this.endPrice = endPrice;
    }

    /** @return a text form listing every field, e.g. {@code ResultModel{code='x', startPrice=1.0, endPrice=2.0}} */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("ResultModel{");
        text.append("code='").append(code).append('\'');
        text.append(", startPrice=").append(startPrice);
        text.append(", endPrice=").append(endPrice);
        text.append('}');
        return text.toString();
    }
}
5. 测试数据
data/testdata.txt
0000001,1,1,1,1
0000001,1,1,1,1
0000001,1,1,1,1
0000001,5,1,1,1
0000001,2,1,1,1
0000001,1,1,1,1
0000001,2,1,1,1
0000001,3,1,1,1
0000001,6,1,1,1
6. 启动程序,执行测试
可以看到找到匹配的数据,该数据由模式匹配的第一条数据和最后一条数据组合而成。
ResultModel{code='0000001', startPrice=5.0, endPrice=6.0}
版权声明:本文为liaomingwu原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。