Flink是由Apache软件基金会开发的开源流处理框架,社区活跃,并由阿里主导;
布隆过滤器是海量数据去重利器;
Redisson是Redis官方推荐的Java版的Redis客户端,它基于Redis做了更多功能封装,其中就包括布隆过滤器;
结合这三者可以快速地实现一个流式的数据去重功能。
package bill;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import java.io.InputStream;
import java.util.Properties;
/**
 * Created by Bill on 2019-5-9.
 *
 * <p>Entry point of the streaming job: reads strings from a Kafka topic, splits them with
 * {@code MessageSplitter}, keys the stream by tuple fields 1, 2 and 3, filters it through
 * {@code UniqAnalysis} and writes the surviving records to another Kafka topic.
 */
public class KafkaMessageStreaming {

    /**
     * Builds and runs the job.
     *
     * @param args {@code args[0]} input topic, {@code args[1]} output topic,
     *             {@code args[2]} Redis URL (published as global job parameter {@code redis_url}),
     *             {@code args[3]} job name (published as {@code job_name} and used as the
     *             execution name), {@code args[4]} classpath resource name of the Kafka
     *             properties file
     * @throws IllegalArgumentException if fewer than five arguments are given or the
     *                                  properties resource cannot be found on the classpath
     * @throws Exception                if loading the properties or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 5) {
            throw new IllegalArgumentException(
                    "Usage: KafkaMessageStreaming <inTopic> <outTopic> <redisUrl> <jobName> <kafkaPropsResource>");
        }
        String inTopic = args[0];
        String outTopic = args[1];
        String redisUrl = args[2];
        String jobName = args[3];
        String kafkaProps = args[4];

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(300000);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Global job parameters are how UniqAnalysis.open() learns the Redis URL and job name.
        Configuration conf = new Configuration();
        conf.setString("redis_url", redisUrl);
        conf.setString("job_name", jobName);
        env.getConfig().setGlobalJobParameters(conf);

        // Load the Kafka client settings; try-with-resources guarantees the stream is closed,
        // and a missing resource is reported by name instead of as a bare NullPointerException.
        Properties props = new Properties();
        try (InputStream in = KafkaMessageStreaming.class.getClassLoader().getResourceAsStream(kafkaProps)) {
            if (in == null) {
                throw new IllegalArgumentException(
                        "Kafka properties resource not found on classpath: " + kafkaProps);
            }
            props.load(in);
        }

        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>(inTopic, new SimpleStringSchema(), props);
        consumer.assignTimestampsAndWatermarks(new MessageWaterEmitter());

        FlinkKafkaProducer010<Tuple2<String, String>> producer =
                new FlinkKafkaProducer010<Tuple2<String, String>>(outTopic, new SerializationSchema<Tuple2<String, String>>() {
                    @Override
                    public byte[] serialize(Tuple2<String, String> element) {
                        // Explicit charset: the no-arg getBytes() depends on the JVM's platform default.
                        return (element.f0 + "," + element.f1).getBytes(java.nio.charset.StandardCharsets.UTF_8);
                    }
                }, props);

        env.addSource(consumer)
                .flatMap(new MessageSplitter())
                .keyBy(1, 2, 3)
                .flatMap(new UniqAnalysis())
                .addSink(producer);

        env.execute(jobName);
    }
}
package bill;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.redisson.Redisson;
import org.redisson.api.RBloomFilter;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
/**
 * Forwards a record only the first time its (f1, f2, f3) combination is seen, using a
 * Bloom filter obtained from Redisson under the name {@code "first"}.
 *
 * <p>Input is a {@code Tuple4<Long, String, String, String>}; output is a
 * {@code Tuple2<String, String>} whose {@code f0} is the string form of the input's
 * {@code f0} and whose {@code f1} is {@code f1,f2,f3} joined with commas.
 *
 * <p>The Redis URL and job name are read from the global job parameters
 * ({@code redis_url}, {@code job_name}) in {@link #open(Configuration)}.
 */
public class UniqAnalysis extends RichFlatMapFunction<Tuple4<Long, String, String, String>, Tuple2<String, String>> {
    private String redisUrl = null;
    private String jobName = null;
    // Created in open(); transient so they are not part of the serialized function.
    transient private RedissonClient redisson = null;
    transient private RBloomFilter<String> bloomFilter = null;
    // Single output tuple instance that is refilled and re-emitted for every forwarded record.
    private Tuple2<String, String> reusedOutput = new Tuple2<>("", "");

    /**
     * Emits {@code in} (reformatted) if its key was not already present in the Bloom filter.
     *
     * @param in  record whose f1, f2 and f3 form the de-duplication key
     * @param out collector receiving first-seen records
     */
    @Override
    public void flatMap(Tuple4<Long, String, String, String> in, Collector<Tuple2<String, String>> out) {
        // NOTE(review): the key is a plain concatenation without a delimiter, so different
        // field splits can map to the same key ("ab"+"c" vs "a"+"bc"). Kept as-is because
        // changing the format would not match keys already stored in an existing filter.
        String key = in.f1 + in.f2 + in.f3;
        // add() reports whether the element was newly added, so a separate contains() call
        // (an extra round trip and a check-then-act gap) is not needed.
        if (this.bloomFilter.add(key)) {
            reusedOutput.f0 = String.valueOf(in.f0);
            reusedOutput.f1 = in.f1 + "," + in.f2 + "," + in.f3;
            out.collect(reusedOutput);
        }
    }

    /**
     * Reads {@code redis_url} and {@code job_name} from the global job parameters, connects
     * to Redis and initializes the Bloom filter (100,000,000 expected insertions, 0.03 false
     * positive probability).
     *
     * @param parameters passed through to the superclass
     * @throws IllegalStateException if the global job parameters are not a
     *                               {@link Configuration} or contain no {@code redis_url}
     * @throws Exception             if the superclass or the Redis client fails to start
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ExecutionConfig.GlobalJobParameters globalParams = getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        if (!(globalParams instanceof Configuration)) {
            throw new IllegalStateException("Global job parameters must be a Configuration carrying redis_url/job_name");
        }
        Configuration globConf = (Configuration) globalParams;
        this.redisUrl = globConf.getString("redis_url", null);
        this.jobName = globConf.getString("job_name", null);
        // Fail fast: without a URL there is no filter, and every flatMap() call would
        // otherwise die with a NullPointerException.
        if (this.redisUrl == null) {
            throw new IllegalStateException("Global job parameter 'redis_url' is not set");
        }
        Config redissonConfig = new Config();
        redissonConfig.useSingleServer().setAddress(this.redisUrl);
        redisson = Redisson.create(redissonConfig);
        this.bloomFilter = redisson.getBloomFilter("first");
        // The boolean result is deliberately not checked here.
        this.bloomFilter.tryInit(100_000_000, 0.03);
        System.out.println(System.currentTimeMillis()+" ["+this.jobName+"] RichFilterFunction open new MessageSplitter bloomFilter:" + bloomFilter);
    }

    /**
     * Shuts down the Redisson client if one was created; safe to call when open() failed
     * before the client existed.
     */
    @Override
    public void close() {
        System.out.println(System.currentTimeMillis()+" ["+this.jobName+"] redisson.shutdown()");
        if (this.redisson != null) {
            this.redisson.shutdown();
            this.redisson = null;
            this.bloomFilter = null;
        }
    }
}
package bill;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.util.Collector;
/**
 * Created by Bill on 2019-5-9.
 *
 * <p>Parses a comma-separated line into a {@code Tuple4}: field 1 parsed as a {@code long}
 * goes to {@code f0}, and fields 4, 5 and 6 go to {@code f1}, {@code f2} and {@code f3}
 * (zero-based field indexes). Lines that are null, contain no comma, do not split into
 * exactly nine fields, have an empty field 6, or have a non-numeric field 1 are dropped.
 */
public class MessageSplitter implements FlatMapFunction<String, Tuple4<Long, String, String, String>> {
    // Single tuple instance that is refilled and re-emitted for every accepted line.
    private Tuple4<Long, String, String, String> reusedOutput = new Tuple4<>(0L, "", "", "");

    /**
     * Emits at most one tuple for {@code value}.
     *
     * @param value raw comma-separated input line; may be null
     * @param out   collector receiving the parsed tuple
     */
    @Override
    public void flatMap(String value, Collector<Tuple4<Long, String, String, String>> out) {
        if (value == null || !value.contains(",")) {
            return;
        }
        // split(",") discards trailing empty fields, so a line ending in a comma yields
        // fewer than nine parts and is rejected by the length check below.
        String[] parts = value.split(",");
        if (parts.length != 9 || parts[6].isEmpty()) {
            return;
        }
        final long first;
        try {
            first = Long.parseLong(parts[1]);
        } catch (NumberFormatException ignored) {
            // A non-numeric field 1 is just another malformed line: drop it like the other
            // rejects instead of letting one bad record fail the whole task.
            return;
        }
        reusedOutput.f0 = first;
        reusedOutput.f1 = parts[4];
        reusedOutput.f2 = parts[5];
        reusedOutput.f3 = parts[6];
        out.collect(reusedOutput);
    }
}
版权声明:本文为Bill_Xiang原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。