Flink实践小结

  • Post author:
  • Post category:其他


这篇文章总结实时推荐系统中使用Flink流式计算的一些实践经验。

数据源选择Kafka数据流,经过Flink SQL和自定义函数处理得到各种时间口径下的数据,作为特征传到模型,得到预测结果后向用户实时推送。

其中涉及到的一些比较关键的点:

a. Flink时间戳概念很重要,了解event time,processing time,watermark,窗口处理的相关知识对Flink如何处理数据会有更清晰的认识,可以参考一下极客时间《Flink核心技术与实践》相关章节。

b. Flink目前支持利用Java进行UDF,UDTF等操作,一般自定函数后打包成jar包即可以在Flink中进行引用,下面给出一些简单的示例。

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import util.DateUtil;
import util.StringUtil;

import java.util.Arrays;
import java.util.List;

/**
 * UDTF Demo
 **/
public class ExtractFeatureList extends TableFunction<Row> {
    
    // 在线特征 
    private final static List<String> featureNames = Arrays.asList(new String[]{'f1','f2'
    });

    public void eval(String s) {
        // 特征值初始化
        Row row = new Row(featureNames.size());
        for (int i = 0; i < featureNames.size(); ++i) {
            row.setField(i, 0.0);
        }

        // 解析json格式的特征集合
        if (StringUtil.isJSONObject(s)) {
            JSONObject obj = JSONObject.parseObject(s);

            // 将每个有效特征值取出
            for (int i = 0; i < featureNames.size(); ++i) {
                String name = featureNames.get(i);
                double value = getValidFeatureValue(obj, name);
                row.setField(i, value);
            }
        }
        collect(row);
    }

    @Override
    public TypeInformation<Row> getResultType() {
        return Types.ROW(
        Types.DOUBLE, Types.DOUBLE
        );
    }

    /**
     * 获得符合有效期限制的特征值
     */
    private double getValidFeatureValue(JSONObject obj, String featureName) {
        
    }

}




/**
 * UDF Demo
 * SQL用法: select user_id,random_pick(item_list) as rec_list from tbname
 **/
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.table.functions.ScalarFunction;

import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RandomPick extends ScalarFunction {

    private static final String PROBABILITY_EXTRACT = "\\{.*?\\}";
    private static final Pattern pattern = Pattern.compile(PROBABILITY_EXTRACT);


    public String eval(String s) {

        Random random = new Random();
		
		String result = "xxxxx";

        return result;
    }
}

踩坑复盘:

1. 滑动(固定)窗口算子的运行受水印时间和作业并行度影响较大,当日志计数不正确时通常可以从这两方面去排查原因。比如在对数据流较少的日志计数时可以通过减小作业并行度和union其他水印时间刷新较频繁的日志来使得能及时触发计数。

2. 采用固定窗口算子时QA在测试时某些行为的计数一直累计不正确,后来排查到是因为QA所在服务器时间戳远大于当前时间,所以导致了flink对于当前时间的日志判定为过期日志而无法触发累计。



版权声明:本文为qq632683582原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。