flink入门之使用JSONKeyValueDeserializationSchema反序列kafka消息

  • Post author:
  • Post category:其他




背景需求



在日常生产中,我们大多数时候都会用到flink的kafka connector,在使用过程中,大多数的程序员都会使用new SimpleStringSchema()来反序列化Kafka中的数据,然后使用alibaba提供的fastJson来解析数据

例如:

  val jsonObject = JSON.parseObject(jsonStr)
  val eventId = JSON.parseObject(jsonObject.getString("eventDetail")).getString("eventId")
  val uuid_geek = JSON.parseObject(jsonObject.getString("eventDetail")).getString("uuid")
  val poiIdArray = JSON.parseObject(jsonObject.getString("eventDetail")).getJSONArray("poiId")

虽然这样可以实现业务的需求,但一方面比较麻烦,另一方面是对于多字段的json数据,会把许多不必要的字段一起带出来,造成的效率减低。



解决方法



flink自带了一种反序列化的机制——JSONKeyValueDeserializationSchema。

这种方式不但可以解析json结构,避免出现空指针异常,而且可以把消费者信息带上来,非常方便。



代码

import java.util.Properties
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema

object Test{

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
	//kafka参数
    val properties: Properties = new Properties()
    properties.setProperty("bootstrap.servers", "flink:9092")
    properties.setProperty("group.id", "flink_test")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")
    //如果想读多个主题,必须放在java的list中
    import scala.collection.JavaConverters._
    val topics = List[String]("flinkTest").asJava
    //这里JSONKeyValueDeserializationSchema参数true,代表返回消费者信息
    val jsonDstream = env.addSource(new FlinkKafkaConsumer011(topics,new JSONKeyValueDeserializationSchema(true),properties))
    val result = jsonDstream.map(obj => {
      val name = obj.get("value").get("friend").get("name")
      val age = obj.get("value").get("age")
      val offset = obj.get("metadata").get("offset")
      val topic = obj.get("metadata").get("topic")
      val partition = obj.get("metadata").get("partition")
      (name,age,s"消费的主题是:$topic,分区是:$partition,当前偏移量是:$offset")
    })
    result.print()
    env.execute()

  }

我们输入一条json:

{“name”:“jack”,“age”:“18”,“city”:“北京市”,“friend”:{“name”:“xiaoming”,“age”:“15”}}

看到输出结果为:

(“xiaoming”,“18”,消费的主题是:“flinkTest”,分区是:0,当前偏移量是:51)

我们再输入一条不完整的数据:

{“name”:“jack”,“city”:“北京市”,“friend”:{“name”:“xiaoming”,“age”:“15”}}

看到输出结果为:

(“xiaoming”,null,消费的主题是:“flinkTest”,分区是:0,当前偏移量是:52)



总结

我们可以看到,用了JSONKeyValueDeserializationSchema反序列方法,我们就不用手动去解析json结构了,对于没有的字段,就是直接返回null,非常方便。

这里返回的值为JsonNode类型,我们可以手动转换成各种想要的类型,但是要注意,如果你取的字段有空值,会造成空指针异常,要放到try/catch中。



版权声明:本文为bradyM原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。