Flink connector: serializing and deserializing Java objects sent to Kafka


When the dwd layer finishes its computation, the results need to be written to Kafka, and the dws layer then reads them back out of Kafka for further aggregation. In a Flink job written in Java, these records are represented and manipulated as Java objects, so we have to implement the serialization and deserialization logic that lets Flink write the objects to Kafka and read them back. Implementing the org.apache.flink.api.common.serialization.SerializationSchema interface gives us a custom serializer, and implementing org.apache.flink.api.common.serialization.DeserializationSchema gives us the matching deserializer.

All that needs to change is the schema passed to addSource and addSink.
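
The examples below all revolve around a SecurityInfo domain object whose definition is not included in this post. The following is a minimal hypothetical sketch, with field names inferred from the SQL queries in the dws job (location, type, architecture, sampletime). Because SerializeUtil below uses plain Java serialization, the class must implement java.io.Serializable:

package com.ti.domain;

import java.io.Serializable;

// Hypothetical sketch: the real class is not shown in this post.
// Field names are inferred from the SQL queries in the dws job.
// Getters, setters and the no-arg constructor make this a Flink POJO,
// which fromDataStream needs in order to derive a table schema.
public class SecurityInfo implements Serializable {
    private static final long serialVersionUID = 1L;

    private String location;
    private String type;
    private String architecture;
    private String sampletime;

    public SecurityInfo() {}

    public String getLocation() { return location; }
    public void setLocation(String location) { this.location = location; }

    public String getType() { return type; }
    public void setType(String type) { this.type = type; }

    public String getArchitecture() { return architecture; }
    public void setArchitecture(String architecture) { this.architecture = architecture; }

    public String getSampletime() { return sampletime; }
    public void setSampletime(String sampletime) { this.sampletime = sampletime; }
}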

#dwd layer: the job that needs a serialization schema to write objects into Kafka

package com.ti.dwd;

import com.ti.domain.SecurityInfo;
import com.ti.functions.map.ParseJsonMapFunction;
import com.ti.utils.serializer.FlinkSecurityInfoSerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

public class ParseJson {
    public static void main(String[] args) throws Exception {
//        Create a stream from Kafka; each element is a JSON string.
        StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        Properties properties=new Properties();
        properties.setProperty("bootstrap.servers","localhost:9092");
        properties.setProperty("group.id","dwd_ParseJson");
        properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset","latest");

        DataStream<String> kafkaStream=env.addSource(new FlinkKafkaConsumer<String>("ods_json",new SimpleStringSchema(),properties));
        kafkaStream.print("2");

        DataStream<SecurityInfo> infoStream=kafkaStream.map(new ParseJsonMapFunction());

        Properties properties1=new Properties();
        properties1.setProperty("bootstrap.servers","localhost:9092");

        infoStream.addSink(new FlinkKafkaProducer<SecurityInfo>("dwd_ParseJson",new FlinkSecurityInfoSerializationSchema<SecurityInfo>(),properties1));
        env.execute("ParseJson");
    }
}
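
ParseJsonMapFunction is referenced above but its implementation is not included in this post. A minimal hypothetical sketch, assuming the Jackson library is on the classpath and that the JSON field names in the ods_json topic match the SecurityInfo property names:

package com.ti.functions.map;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.ti.domain.SecurityInfo;
import org.apache.flink.api.common.functions.MapFunction;

// Hypothetical sketch: parses each JSON string from the ods_json topic
// into a SecurityInfo object. The ObjectMapper is static so it is shared
// per JVM and never serialized along with the function instance.
public class ParseJsonMapFunction implements MapFunction<String, SecurityInfo> {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public SecurityInfo map(String json) throws Exception {
        return MAPPER.readValue(json, SecurityInfo.class);
    }
}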

#dws layer: the job that needs a deserialization schema to turn the Kafka bytes back into objects
package com.ti.dws;

import com.ti.domain.SecurityInfo;
import com.ti.utils.serializer.FlinkSecurityInfoDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.Properties;

public class LocationDayPeriod {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        Properties properties=new Properties();
        properties.setProperty("bootstrap.servers","localhost:9092");
        properties.setProperty("groupid","dws_LocationDayPeriod");
        properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset","latest");

        DataStream<SecurityInfo> infoStream=env.addSource(new FlinkKafkaConsumer<SecurityInfo>("dwd_ParseJson",new FlinkSecurityInfoDeserializationSchema<SecurityInfo>(SecurityInfo.class), properties));
        StreamTableEnvironment tableEnv=StreamTableEnvironment.create(env);
        Table infoTable=tableEnv.fromDataStream(infoStream);
        infoTable.printSchema();
        tableEnv.toChangelogStream(infoTable).print("infoTable:");
        Table countLocationTable=tableEnv.sqlQuery("select location,sampletime,count(location) from "+infoTable+" group by location,sampletime");
        Table countTypeTable=tableEnv.sqlQuery("select type,sampletime,count(location) from "+infoTable+" group by type,sampletime");
        Table countArchTable=tableEnv.sqlQuery("select architecture,sampletime,count(location) from "+infoTable+" group by architecture,sampletime");
        tableEnv.toChangelogStream(countLocationTable).print("countLocationTable: ");
        tableEnv.toChangelogStream(countTypeTable).print("countTypeTable:");
        tableEnv.toChangelogStream(countArchTable).print("countArchTable:");
//  Across all the data: how many records fall on each day.
        env.execute("locationDayPeriod");
    }
}
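
A note on the queries above: concatenating the Table object directly into the SQL string works because Table.toString() registers the table in the catalog under a generated name. Registering a view explicitly is equivalent and reads more clearly; a sketch (the view name security_info is arbitrary):

// Equivalent to the "from " + infoTable concatenation above:
// register the stream as a named view and query it by name.
tableEnv.createTemporaryView("security_info", infoStream);
Table countLocationTable = tableEnv.sqlQuery(
        "SELECT location, sampletime, COUNT(location) " +
        "FROM security_info GROUP BY location, sampletime");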

#Serialization: the Flink SerializationSchema
package com.ti.utils.serializer;

import org.apache.flink.api.common.serialization.SerializationSchema;

// Delegates to SerializeUtil, i.e. plain Java serialization of the element.
public class FlinkSecurityInfoSerializationSchema<T> implements SerializationSchema<T> {

    @Override
    public byte[] serialize(T element) {
        return SerializeUtil.serialize(element);
    }
}

#Deserialization: the Flink DeserializationSchema
package com.ti.utils.serializer;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;

import java.io.IOException;

// Turns the byte arrays read from Kafka back into objects of the given class.
public class FlinkSecurityInfoDeserializationSchema<T> implements DeserializationSchema<T> {
    private final Class<T> clazz;
    public FlinkSecurityInfoDeserializationSchema(Class<T> clazz){
        this.clazz=clazz;
    }

    @Override
    public T deserialize(byte[] message) throws IOException {
        return SerializeUtil.deserialize(message, clazz);
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeExtractor.getForClass(clazz);
    }
}

#The utility class that implements the actual (de)serialization

package com.ti.utils.serializer;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class SerializeUtil {

    public static byte[] serialize(Object object) {
        // Plain Java serialization: the object must implement java.io.Serializable.
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(object);
            oos.flush();
            return baos.toByteArray();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    @SuppressWarnings("unchecked")
    public static <T> T deserialize(byte[] bytes, Class<T> clazz) {
        // Deserialize the byte array back into an instance of clazz.
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (T) ois.readObject();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}
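
A quick round-trip sanity check (hypothetical snippet, assuming the SecurityInfo sketch above):

// Hypothetical usage: serialize an object, then read it back.
SecurityInfo info = new SecurityInfo();
info.setLocation("cn-north");
info.setSampletime("2021-06-01");

byte[] bytes = SerializeUtil.serialize(info);
SecurityInfo copy = SerializeUtil.deserialize(bytes, SecurityInfo.class);
System.out.println(copy.getLocation()); // prints "cn-north"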


