flink消费kafka消息,自定义keyselector

  • Post author:
  • Post category:其他


为了能达到在不同的keyby/window中,能按照自定义的业务要求,把相同的消息分到同一个消费窗口,需要自定义keyselector,如本例中,我想要按照消息体中的url字段,把相同url字段的消息,用同一个线程来消费:写法为:

//通过自定义keyby selector,能保证相同url的新媒体消息,出现在同一个keyby/window窗口
        env.fromSource(newSource, WatermarkStrategy.noWatermarks(), "newPageSource")
                .keyBy((KeySelector<ConsumerRecord, Object>) consumerRecord -> {
                    String jsonStr = new String((byte[]) consumerRecord.value(),
                            StandardCharsets.UTF_8);
                    if (StrUtil.isNotBlank(jsonStr) && !"null".equals(jsonStr)) {
                        JSONObject jsonObject = JSONUtil.parseObj(jsonStr);
                        String url = jsonObject.getStr("url");
                        return url;
                    }

                    return "";
                }).addSink(newPageSink).name("newPageSink");



版权声明:本文为fangletian1981原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。