flink1.10 异常 this type (GenericType<pojo) cannot be used as key.
实现功能: 统计单词个数
代码如下:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
String path = "D:\\cjj\\test.txt";
DataStream<String> ds = env.readTextFile(path);
SingleOutputStreamOperator<WordCountModel> maps = ds.flatMap(new FlatMapFunction<String, WordCountModel>() {
@Override
public void flatMap(String s, Collector<WordCountModel> out) throws Exception {
String[] words = s.split("\\s");
for (String word : words) {
out.collect(WordCountModel.of(word.trim(), 1l));
}
}
});
maps.keyBy("word").sum("num").print();
env.execute("提交任务");
}
实体类:
public class WordCountModel {
private String word;
private Long num;
private Integer age;
public static WordCountModel of(String word, Long num) {
WordCountModel wordCount = new WordCountModel();
wordCount.word = word;
wordCount.num = num;
wordCount.age = 12;
return wordCount;
}
@Override
public String toString() {
return word + " " + num;
}
}
当我们用java pojo类进行分组聚合计算时汇报如下错误信息:
org.apache.flink.api.common.InvalidProgramException:
This type (GenericType) cannot be used as key.
出现这种情况解决办法:
1、实体类所有变量都是public
2、keyby用到的变量不能是布尔类型的
3、添加无参构造函数(flink1.10不需要)
修改后的实体类
public class WordCountModel {
public String word;
public Long num;
public Integer age;
public static WordCountModel of(String word, Long num) {
WordCountModel wordCount = new WordCountModel();
wordCount.word = word;
wordCount.num = num;
wordCount.age = 12;
return wordCount;
}
@Override
public String toString() {
return word + " " + num;
}
}
运行结果:
注意:实体类没有参与运算的属性也必须是public
版权声明:本文为cjj386906718原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。