基于Flink及Redis布隆过滤器的大数据去重

  • Post author:
  • Post category:其他


Flink是由Apache软件基金会开发的开源流处理框架,社区活跃,并由阿里主导;

布隆过滤器是海量数据去重利器;

Redisson是Redis官方推荐的Java版的Redis客户端,它基于Redis做了更多功能封装,其中就包括布隆过滤器;

结合这三者可以快速地实现一个流式的数据去重功能。

package bill;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.io.InputStream;
import java.util.Properties;

/**
 * Flink streaming job entry point: reads raw CSV messages from a Kafka topic,
 * splits them into (timestamp, f4, f5, f6) tuples, de-duplicates them against a
 * Redis-backed bloom filter ({@link UniqAnalysis}), and writes each first-seen
 * record to an output Kafka topic as a CSV line.
 *
 * Args: inTopic outTopic redisUrl jobName kafkaPropsResource
 *
 * Created by Bill on 2019-5-9.
 */
public class KafkaMessageStreaming {
    public static void main(String[] args) throws Exception {
        // Fail fast with a usage hint instead of an opaque ArrayIndexOutOfBoundsException.
        if (args.length < 5) {
            throw new IllegalArgumentException(
                    "Usage: KafkaMessageStreaming <inTopic> <outTopic> <redisUrl> <jobName> <kafkaPropsResource>");
        }
        String inTopic = args[0];
        String outTopic = args[1];
        String redisUrl = args[2];
        String jobName = args[3];
        String kafkaProps = args[4];

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(300000); // checkpoint every 5 minutes
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Publish the Redis URL and job name as global job parameters so that
        // operators can read them in open() — see UniqAnalysis.
        Configuration conf = new Configuration();
        conf.setString("redis_url", redisUrl);
        conf.setString("job_name", jobName);
        env.getConfig().setGlobalJobParameters(conf);

        // Load Kafka client properties from the classpath.
        // try-with-resources closes the stream (it was leaked before), and the
        // null check turns a missing resource into a clear error instead of an NPE.
        Properties props = new Properties();
        try (InputStream in = KafkaMessageStreaming.class.getClassLoader().getResourceAsStream(kafkaProps)) {
            if (in == null) {
                throw new IllegalArgumentException(
                        "Kafka properties resource not found on classpath: " + kafkaProps);
            }
            props.load(in);
        }

        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>(inTopic, new SimpleStringSchema(), props);
        consumer.assignTimestampsAndWatermarks(new MessageWaterEmitter());

        // Serialize each (timestamp, payload) pair as one CSV line for the sink topic.
        FlinkKafkaProducer010<Tuple2<String, String>> producer =
                new FlinkKafkaProducer010<Tuple2<String, String>>(outTopic, new SerializationSchema<Tuple2<String, String>>() {
                    @Override
                    public byte[] serialize(Tuple2<String, String> element) {
                        return (element.f0 + "," + element.f1).getBytes();
                    }
                }, props);

        env.addSource(consumer)
                .flatMap(new MessageSplitter())
                .keyBy(1, 2, 3) // key by the three String fields of the Tuple4
                .flatMap(new UniqAnalysis())
                .addSink(producer);

        env.execute(jobName);
    }
}
package bill;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.redisson.Redisson;
import org.redisson.api.RBloomFilter;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;

/**
 * De-duplicating flatMap: forwards a record only the first time its
 * (f1, f2, f3) key combination is seen, tracked in a Redis-backed Redisson
 * bloom filter. Because the filter lives in Redis, the dedup state is shared
 * across parallel subtasks and survives job restarts.
 *
 * Input:  Tuple4(timestamp, f1, f2, f3)
 * Output: Tuple2(timestamp-as-string, "f1,f2,f3") — emitted once per key.
 *
 * NOTE(review): bloom-filter membership is probabilistic — ~3% of genuinely
 * new records may be dropped as false positives (see tryInit below).
 */
public class UniqAnalysis extends RichFlatMapFunction<Tuple4<Long, String, String, String>, Tuple2<String, String>> {
    private String redisUrl = null;
    private String jobName = null;
    // Redisson objects are not serializable; they are rebuilt per task in open().
    transient private RedissonClient redisson = null;
    transient private RBloomFilter<String> bloomFilter = null;

    @Override
    public void flatMap(Tuple4<Long, String, String, String> in, Collector<Tuple2<String, String>> out) {
        // Build the dedup key once (it was previously concatenated twice).
        String key = in.f1 + in.f2 + in.f3;
        if (!this.bloomFilter.contains(key)) {
            this.bloomFilter.add(key);
            // Emit a fresh tuple instead of mutating a shared field: downstream
            // operators may keep a reference to emitted objects.
            out.collect(new Tuple2<>(String.valueOf(in.f0), in.f1 + "," + in.f2 + "," + in.f3));
        }
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ExecutionConfig.GlobalJobParameters globalParams = getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        Configuration globConf = (Configuration) globalParams;
        this.redisUrl = globConf.getString("redis_url", null);
        this.jobName = globConf.getString("job_name", null);
        if (this.redisUrl == null) {
            // Fail fast: without a Redis URL the bloom filter stays null and every
            // flatMap() call would NPE (the old code silently skipped setup).
            throw new IllegalStateException("Global job parameter 'redis_url' is required");
        }
        Config redissonConfig = new Config();
        redissonConfig.useSingleServer().setAddress(this.redisUrl);
        redisson = Redisson.create(redissonConfig);
        this.bloomFilter = redisson.getBloomFilter("first");
        // Size for 100M expected insertions at a 3% false-positive rate;
        // tryInit is a no-op when the filter already exists in Redis.
        this.bloomFilter.tryInit(100_000_000, 0.03);
        System.out.println(System.currentTimeMillis()+" ["+this.jobName+"] RichFilterFunction open new MessageSplitter bloomFilter:" + bloomFilter);
    }

    @Override
    public void close() {
        System.out.println(System.currentTimeMillis()+" ["+this.jobName+"] redisson.shutdown()");
        // Guard against NPE when open() failed before the client was created.
        if (this.redisson != null) {
            this.redisson.shutdown();
        }
    }
}
package bill;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.util.Collector;

/**
 * Parses a raw CSV message into a Tuple4(timestamp, field4, field5, field6).
 * A record is emitted only when it has exactly 9 comma-separated fields,
 * field 6 is non-empty, and field 1 parses as a long timestamp; all other
 * records are silently dropped.
 *
 * Created by Bill on 2019-5-9.
 */
public class MessageSplitter implements FlatMapFunction<String, Tuple4<Long, String, String, String>> {
    @Override
    public void flatMap(String value, Collector<Tuple4<Long, String, String, String>> out) {
        if (value == null || !value.contains(",")) {
            return;
        }
        String[] parts = value.split(",");
        if (parts.length != 9 || parts[6].isEmpty()) {
            return;
        }
        long timestamp;
        try {
            timestamp = Long.parseLong(parts[1]);
        } catch (NumberFormatException ignored) {
            // A non-numeric timestamp previously threw out of flatMap and killed
            // the task; treat it as a malformed record and drop it instead.
            return;
        }
        // Emit a fresh tuple instead of mutating a shared field: downstream
        // operators may keep a reference to emitted objects.
        out.collect(new Tuple4<>(timestamp, parts[4], parts[5], parts[6]));
    }
}



版权声明:本文为Bill_Xiang原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。