背景需求
在日常生产中,我们大多数时候都会用到flink的kafka connector,在使用过程中,大多数的程序员都会使用new SimpleStringSchema()来反序列化Kafka中的数据,然后使用alibaba提供的fastJson来解析数据
例如:
val jsonObject = JSON.parseObject(jsonStr)
val eventId = JSON.parseObject(jsonObject.getString("eventDetail")).getString("eventId")
val uuid_geek = JSON.parseObject(jsonObject.getString("eventDetail")).getString("uuid")
val poiIdArray = JSON.parseObject(jsonObject.getString("eventDetail")).getJSONArray("poiId")
虽然这样可以实现业务的需求,但一方面比较麻烦,另一方面是对于多字段的json数据,会把许多不必要的字段一起带出来,造成的效率减低。
解决方法
flink自带了一种反序列化的机制——JSONKeyValueDeserializationSchema。
这种方式不但可以解析json结构,避免出现空指针异常,而且可以把消费者信息带上来,非常方便。
代码
import java.util.Properties
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema
object Test{
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//kafka参数
val properties: Properties = new Properties()
properties.setProperty("bootstrap.servers", "flink:9092")
properties.setProperty("group.id", "flink_test")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")
//如果想读多个主题,必须放在java的list中
import scala.collection.JavaConverters._
val topics = List[String]("flinkTest").asJava
//这里JSONKeyValueDeserializationSchema参数true,代表返回消费者信息
val jsonDstream = env.addSource(new FlinkKafkaConsumer011(topics,new JSONKeyValueDeserializationSchema(true),properties))
val result = jsonDstream.map(obj => {
val name = obj.get("value").get("friend").get("name")
val age = obj.get("value").get("age")
val offset = obj.get("metadata").get("offset")
val topic = obj.get("metadata").get("topic")
val partition = obj.get("metadata").get("partition")
(name,age,s"消费的主题是:$topic,分区是:$partition,当前偏移量是:$offset")
})
result.print()
env.execute()
}
我们输入一条json:
{“name”:“jack”,“age”:“18”,“city”:“北京市”,“friend”:{“name”:“xiaoming”,“age”:“15”}}
看到输出结果为:
(“xiaoming”,“18”,消费的主题是:“flinkTest”,分区是:0,当前偏移量是:51)
我们再输入一条不完整的数据:
{“name”:“jack”,“city”:“北京市”,“friend”:{“name”:“xiaoming”,“age”:“15”}}
看到输出结果为:
(“xiaoming”,null,消费的主题是:“flinkTest”,分区是:0,当前偏移量是:52)
总结
我们可以看到,用了JSONKeyValueDeserializationSchema反序列方法,我们就不用手动去解析json结构了,对于没有的字段,就是直接返回null,非常方便。
这里返回的值为JsonNode类型,我们可以手动转换成各种想要的类型,但是要注意,如果你取的字段有空值,会造成空指针异常,要放到try/catch中。