When consuming data from Kafka, the most commonly used schema is SimpleStringSchema; I personally prefer JSONKeyValueDeserializationSchema. These two cover the vast majority of development scenarios, but for the occasional special case we need a custom data format. That is when we write our own schema: by implementing KafkaDeserializationSchema we can redefine the format of the data we consume from Kafka.
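For reference, here is a minimal sketch of the usual setup with the built-in JSONKeyValueDeserializationSchema, assuming Flink 1.10 with the 0.11 Kafka connector; the topic, bootstrap servers and group id are placeholders:

import java.util.Properties;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;

public class JsonConsumerDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "demo-group");              // placeholder

        // true = also expose Kafka metadata (topic, partition, offset) on the ObjectNode
        FlinkKafkaConsumer011<ObjectNode> consumer = new FlinkKafkaConsumer011<>(
                "demo-topic", new JSONKeyValueDeserializationSchema(true), props);
        DataStream<ObjectNode> stream = env.addSource(consumer);

        stream.print();
        env.execute("json-consumer-demo");
    }
}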
Below is a basic template; whatever data format you need, only minor modifications are required.
package com.demo.flink.sink;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
/**
 * @author jiajingsi
 * Date 2019-04-09 14:24
 * Description
 * Version 1.0
 */
public class CustomKafkaSchema implements KafkaDeserializationSchema<ConsumerRecord<String, String>> {

    private final static String ENCODING = "UTF8";

    /**
     * Method to decide whether the element signals the end of the stream. If
     * true is returned the element won't be emitted.
     *
     * @param nextElement The element to test for the end-of-stream signal.
     * @return True, if the element signals end of stream, false otherwise.
     */
    @Override
    public boolean isEndOfStream(ConsumerRecord<String, String> nextElement) {
        return false;
    }
    /**
     * Deserializes the Kafka record.
     *
     * @param record Kafka record to be deserialized.
     * @return The deserialized message as an object (null if the message cannot be deserialized).
     */
    @Override
    public ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        // Key and value may be null, e.g. for records produced without a key or for tombstones.
        String key = record.key() == null ? null : new String(record.key(), ENCODING);
        String value = record.value() == null ? null : new String(record.value(), ENCODING);
        return new ConsumerRecord<>(record.topic(),
                record.partition(),
                record.offset(),
                record.timestamp(),
                record.timestampType(),
                record.checksum(),
                record.serializedKeySize(),
                record.serializedValueSize(),
                key,
                value);
    }
    @Override
    public TypeInformation<ConsumerRecord<String, String>> getProducedType() {
        // Returning null here fails at runtime; Flink needs the produced type to create a serializer.
        return TypeInformation.of(new TypeHint<ConsumerRecord<String, String>>() {});
    }
}
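A minimal sketch of how this schema could be wired into a job via FlinkKafkaConsumer011; the topic, bootstrap servers and group id are again placeholders:

package com.demo.flink.sink;

import java.util.Properties;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class CustomSchemaConsumerDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "demo-group");              // placeholder

        // The custom schema hands us the full ConsumerRecord, including topic/partition/offset.
        FlinkKafkaConsumer011<ConsumerRecord<String, String>> consumer =
                new FlinkKafkaConsumer011<>("demo-topic", new CustomKafkaSchema(), props);
        DataStream<ConsumerRecord<String, String>> stream = env.addSource(consumer);

        stream.map(r -> r.topic() + "@" + r.offset() + ": " + r.value()).print();
        env.execute("custom-schema-consumer-demo");
    }
}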
When we need to write the processed data back to Kafka, FlinkKafkaProducer011 requires, among its constructor arguments, a String topic and a serialization schema; the schema determines the format of the messages that get written. It can be provided by implementing one of two interfaces: SerializationSchema or KeyedSerializationSchema. The latter is deprecated as of Flink 1.10, so the former can be used instead. The essential difference between the two is that KeyedSerializationSchema produces key/value-style records, and it also allows overriding the target topic, which satisfies the need to write data dynamically to different topics. Because FlinkKafkaProducer011 extends TwoPhaseCommitSinkFunction, it can guarantee that no data is lost, and exactly-once semantics can be enabled through the Semantic setting (see the usage sketch after the class below). For details, have a look at FlinkKafkaProducer011 itself.
package com.demo.flink.sink;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
/**
 * @author jiajingsi
 * Date 2019-04-09 14:24
 * Description
 * Version 1.0
 */
public class CustomKeyedSerializationSchema implements KeyedSerializationSchema<ObjectNode> {

    private static final Charset ENCODING = StandardCharsets.UTF_8;

    /**
     * Serializes the key of the incoming element to a byte array.
     * This method might return null if no key is available.
     *
     * @param record The incoming element to be serialized
     * @return the key of the element as a byte array
     */
    @Override
    public byte[] serializeKey(ObjectNode record) {
        // The key is taken from the "after.id" field of the record.
        return record.get("after").get("id").toString().getBytes(ENCODING);
    }
    /**
     * Serializes the value of the incoming element to a byte array.
     *
     * @param record The incoming element to be serialized
     * @return the value of the element as a byte array
     */
    @Override
    public byte[] serializeValue(ObjectNode record) {
        return record.toString().getBytes(ENCODING);
    }
    /**
     * Optional method to determine the target topic for the element.
     *
     * @param record Incoming element to determine the target topic from
     * @return null or the target topic
     */
    @Override
    public String getTargetTopic(ObjectNode record) {
        return "mysql-31.ods.demo";
    }
}
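A minimal usage sketch, assuming the processed data is a DataStream of ObjectNode and placeholder broker names; the Semantic argument is what enables the exactly-once behaviour mentioned above:

package com.demo.flink.sink;

import java.util.Properties;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

public class ProducerWiringDemo {
    public static FlinkKafkaProducer011<ObjectNode> buildSink() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        // For EXACTLY_ONCE the transaction timeout must not exceed the broker's transaction.max.timeout.ms.
        props.setProperty("transaction.timeout.ms", "60000");

        // "default-topic" is only a fallback; getTargetTopic() above overrides it per record.
        return new FlinkKafkaProducer011<>(
                "default-topic",
                new CustomKeyedSerializationSchema(),
                props,
                FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
    }
}

The sink is then attached to the stream with processedStream.addSink(ProducerWiringDemo.buildSink()).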
For partitioning, FlinkKafkaProducer011 uses FlinkFixedPartitioner by default, which makes each parallel Flink subtask write to a single fixed Kafka partition.
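If a different distribution is needed, one of the FlinkKafkaProducer011 constructors accepts an Optional FlinkKafkaPartitioner. A rough sketch, where the round-robin logic is purely illustrative and not part of the original example:

package com.demo.flink.sink;

import java.util.Optional;
import java.util.Properties;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

public class RoundRobinPartitionerDemo {

    /** Illustrative partitioner: spreads records over all partitions of the target topic. */
    public static class RoundRobinPartitioner extends FlinkKafkaPartitioner<ObjectNode> {
        private long counter = 0;

        @Override
        public int partition(ObjectNode record, byte[] key, byte[] value,
                             String targetTopic, int[] partitions) {
            return partitions[(int) (counter++ % partitions.length)];
        }
    }

    public static FlinkKafkaProducer011<ObjectNode> buildSink(Properties props) {
        return new FlinkKafkaProducer011<>(
                "default-topic",                      // placeholder fallback topic
                new CustomKeyedSerializationSchema(),
                props,
                Optional.of(new RoundRobinPartitioner()));
    }
}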
In the Flink version I am using, 1.10, KeyedSerializationSchema is already deprecated; we can use KafkaSerializationSchema instead. The usage is largely the same, and the details can be found in the source code; a rough sketch follows below.
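Here is a rough sketch of the same logic written against KafkaSerializationSchema, which is used with the universal FlinkKafkaProducer connector; the class name and the "after.id" key field simply mirror the example above:

import java.nio.charset.StandardCharsets;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CustomKafkaSerializationSchema implements KafkaSerializationSchema<ObjectNode> {

    @Override
    public ProducerRecord<byte[], byte[]> serialize(ObjectNode element, Long timestamp) {
        byte[] key = element.get("after").get("id").toString().getBytes(StandardCharsets.UTF_8);
        byte[] value = element.toString().getBytes(StandardCharsets.UTF_8);
        // The topic is chosen per record, which replaces getTargetTopic() from KeyedSerializationSchema.
        return new ProducerRecord<>("mysql-31.ods.demo", key, value);
    }
}

It would then be handed to the universal FlinkKafkaProducer, e.g. new FlinkKafkaProducer<>("mysql-31.ods.demo", new CustomKafkaSerializationSchema(), props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE).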