这篇文章总结实时推荐系统中使用Flink流式计算的一些实践经验。
数据源选择Kafka数据流,经过Flink SQL和自定义函数处理得到各种时间口径下的数据,作为特征传到模型,得到预测结果后向用户实时推送。
其中涉及到的一些比较关键的点:
a. Flink时间戳概念很重要,了解event time,processing time,watermark,窗口处理的相关知识对Flink如何处理数据会有更清晰的认识,可以参考一下极客时间《Flink核心技术与实践》相关章节。
b. Flink目前支持利用Java进行UDF,UDTF等操作,一般自定函数后打包成jar包即可以在Flink中进行引用,下面给出一些简单的示例。
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import util.DateUtil;
import util.StringUtil;
import java.util.Arrays;
import java.util.List;
/**
* UDTF Demo
**/
public class ExtractFeatureList extends TableFunction<Row> {
// 在线特征
private final static List<String> featureNames = Arrays.asList(new String[]{'f1','f2'
});
public void eval(String s) {
// 特征值初始化
Row row = new Row(featureNames.size());
for (int i = 0; i < featureNames.size(); ++i) {
row.setField(i, 0.0);
}
// 解析json格式的特征集合
if (StringUtil.isJSONObject(s)) {
JSONObject obj = JSONObject.parseObject(s);
// 将每个有效特征值取出
for (int i = 0; i < featureNames.size(); ++i) {
String name = featureNames.get(i);
double value = getValidFeatureValue(obj, name);
row.setField(i, value);
}
}
collect(row);
}
@Override
public TypeInformation<Row> getResultType() {
return Types.ROW(
Types.DOUBLE, Types.DOUBLE
);
}
/**
* 获得符合有效期限制的特征值
*/
private double getValidFeatureValue(JSONObject obj, String featureName) {
}
}
/**
* UDF Demo
* SQL用法: select user_id,random_pick(item_list) as rec_list from tbname
**/
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.table.functions.ScalarFunction;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class RandomPick extends ScalarFunction {
private static final String PROBABILITY_EXTRACT = "\\{.*?\\}";
private static final Pattern pattern = Pattern.compile(PROBABILITY_EXTRACT);
public String eval(String s) {
Random random = new Random();
String result = "xxxxx";
return result;
}
}
踩坑复盘:
1. 滑动(固定)窗口算子的运行受水印时间和作业并行度影响较大,当日志计数不正确时通常可以从这两方面去排查原因。比如在对数据流较少的日志计数时可以通过减小作业并行度和union其他水印时间刷新较频繁的日志来使得能及时触发计数。
2. 采用固定窗口算子时QA在测试时某些行为的计数一直累计不正确,后来排查到是因为QA所在服务器时间戳远大于当前时间,所以导致了flink对于当前时间的日志判定为过期日志而无法触发累计。
版权声明:本文为qq632683582原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。