自定义Flink消费和生产Kafka消息(消费时Schema、生产时Key&Value&分区)

  • Post author:
  • Post category:其他

当我们在消费Kafka数据时,比较常用的是SimpleStringSchema,我个人比较喜欢用的是JSONKeyValueDeserializationSchema。上述两个API可以完成绝大多数的开发场景,但是但遇到个别的特殊场景时,我们需要自定义数据格式,此时我们就需要自定义xxSchema了,这里我们需要实现KafkaDeserializationSchema,从而来完成重定义消费时Kafka的数据格式。

以下是基本的模板,需要什么样的数据格式,只需要做简单的修改即可。

package com.demo.flink.sink;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
 * @author jiajingsi
 * Date 2019-04-09 14:24
 * Description
 * Version 1.0
 */
public class CustomKafkaSchema implements KafkaDeserializationSchema<ConsumerRecord<String, String>> {

    private final static String ENCODING = "UTF8";

    /**
     * Method to decide whether the element signals the end of the stream. If
     * true is returned the element won't be emitted.
     *
     * @param nextElement The element to test for the end-of-stream signal.
     * @return True, if the element signals end of stream, false otherwise.
     */
    @Override
    public boolean isEndOfStream(ConsumerRecord<String, String> nextElement) {
        return false;
    }

    /**
     * Deserializes the Kafka record.
     *
     * @param record Kafka record to be deserialized.
     * @return The deserialized message as an object (null if the message cannot be deserialized).
     */
    @Override
    public ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        return ConsumerRecord(record.topic(),
                record.partition(),
                record.offset(),
                record.timestamp(),
                record.timestampType(),
                record.checksum(),
                record.serializedKeySize(),
                record.serializedValueSize(),
                new String(record.key(), ENCODING),
                new String(record.value(), ENCODING));
    }

    @Override
    public TypeInformation<ConsumerRecord<String, String>> getProducedType() {
        return null;
    }
}

当我们需要将处理完的数据写入Kafka时,因为FlinkKafkaProducer011需要传入的必要参数有两个:String topic,xxSerializationSchema serializationSchema。第二个参数就是写入消息的格式。可以通过实现两个方法来达到我们的目的:SerializationSchema、KeyedSerializationSchema,后者在1.10版本时已经过时,所以我们可以通过前者来实现。两者本质上的区别可能就是后者是key/value格式的数据,同时后者也允许重写目标topic这样可以满足我们的数据动态写入不同topic的需求。因为FlinkKafkaProducer011都继承了TwoPhaseCommitSinkFunction,从而保证数据不丢失。KeyedSerializationSchema也可以通过Semantic来实现EXACTLY_ONCE仅一次语义。具体可以自己查看FlinkKafkaProducer011。

package com.demo.flink.sink;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

/**
 * @author jiajingsi
 * Date 2019-04-09 14:24
 * Description
 * Version 1.0
 */
public class CustomKeyedSerializationSchema implements KeyedSerializationSchema<ObjectNode> {

    private final static String ENCODING = "UTF8";


    /**
     * Serializes the key of the incoming element to a byte array
     * This method might return null if no key is available.
     *
     * @param record The incoming element to be serialized
     * @return the key of the element as a byte array
     */
    @Override
    public byte[] serializeKey(ObjectNode record) {
        return record.get("after").get("id").toString().getBytes();
    }

    /**
     * Serializes the value of the incoming element to a byte array.
     *
     * @param record The incoming element to be serialized
     * @return the value of the element as a byte array
     */
    @Override
    public byte[] serializeValue(ObjectNode record) {
        return record.toString().getBytes();
    }

    /**
     * Optional method to determine the target topic for the element.
     *
     * @param record Incoming element to determine the target topic from
     * @return null or the target topic
     */
    @Override
    public String getTargetTopic(ObjectNode record) {
        return "mysql-31.ods.demo";
    }
}

分区KafkaProducer 默认使用的是FlinkFixedPartitioner。

在我使用的Flink版本1.10,KeyedSerializationSchema已经过时了,我们可以通过使用KafkaSerializationSchema来代替,使用方式基本差不多,可自行参考源码。


版权声明:本文为weixin_40163498原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。