Flink教程(14) Keyed State状态管理之MapState使用案例

  • Post author:
  • Post category:其他




系列文章


Flink教程(13) Keyed State状态管理之ValueState的使用 温差报警



Flink教程(14) Keyed State状态管理之MapState使用案例



Flink教程(15) Keyed State状态管理之ListState使用 ValueState实现



Flink教程(16) Keyed State状态管理之ReducingState使用案例 求最大值



Flink教程(17) Keyed State状态管理之AggregatingState使用案例 求平均值



一、MapState的方法

MapState的方法和Java的Map的方法极为相似,所以上手相对容易。

常用的有如下:

  • get()方法获取值
  • put(),putAll()方法更新值
  • remove()删除某个key
  • contains()判断是否存在某个key
  • isEmpty() 判断是否为空

    在这里插入图片描述



二、定义MapStateDescriptor和获取MapState

MapStateDescriptor<String, Integer> diffCountDescriptor = new MapStateDescriptor<String, Integer>(
        "diff-count-map",//state的id
        String.class,//key的类型
        Integer.class);//value的类型

diffCountMap = getRuntimeContext().getMapState(diffCountDescriptor);



三、统计温度升高/降低超过阈值的次数

程序主题和上一篇博客

Keyed State状态之ValueState

一样

public static class MyKeyedProcessFunction extends KeyedProcessFunction<String, SensorRecord, Tuple3<String, Integer, Integer>> {

    private int tempDiff;

    public MyKeyedProcessFunction(int tempDiff) {
        this.tempDiff = tempDiff;
    }

    //上次温度
    private transient ValueState<Double> lastTemp;

    //温度升高/降低超过预警值的次数
    private transient MapState<String, Integer> diffCountMap;

    private String upKey = "upKey";

    private String downKey = "downKey";

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        ValueStateDescriptor<Double> lastTempDescriptor = new ValueStateDescriptor<Double>(
                "last-temp",
                Double.class);

        lastTemp = getRuntimeContext().getState(lastTempDescriptor);

        MapStateDescriptor<String, Integer> diffCountDescriptor = new MapStateDescriptor<String, Integer>(
                "diff-count-map",
                String.class,
                Integer.class);

        diffCountMap = getRuntimeContext().getMapState(diffCountDescriptor);
    }

    @Override
    public void processElement(SensorRecord value, Context ctx, Collector<Tuple3<String, Integer, Integer>> out) throws Exception {

        //第一条数据,需要处理
        if (lastTemp.value() == null) {
            lastTemp.update(Double.MIN_VALUE);

            if (!diffCountMap.contains(upKey)){
                diffCountMap.put(upKey, 0);
            }

            if (!diffCountMap.contains(downKey)){
                diffCountMap.put(downKey, 0);
            }
        }
        else {
            boolean needOut = false;

            //温度升高超过阈值
            if (value.getRecord() - lastTemp.value() > tempDiff){
                diffCountMap.put(upKey, diffCountMap.get(upKey) + 1);
                needOut = true;
            }
            //温度降低超过阈值
            else if (lastTemp.value() - value.getRecord() > tempDiff) {
                diffCountMap.put(downKey, diffCountMap.get(downKey) + 1);
                needOut = true;
            }

            if (needOut && !diffCountMap.isEmpty()){
                out.collect(Tuple3.of(value.getId(), diffCountMap.get(upKey), diffCountMap.get(downKey)));
            }
        }

        if (value.getRecord() != null) {
            lastTemp.update(value.getRecord());
        }
    }
}



版权声明:本文为winterking3原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。