Sending Data to Kafka



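import java.io.ByteArrayOutputStream

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.EncoderFactory
import org.apache.spark.sql.avro.SchemaConverters

// Derive the Avro schema from the DataFrame schema and ship it to the executors
// as JSON, because Schema is not serializable in older Avro versions.
val avroSchemaJson: String = SchemaConverters.toAvroType(df.schema).toString

// Result: RDD[Array[Byte]], one Avro binary payload per row.
val serializedData = df.rdd.mapPartitions { rows =>
  val avroSchema: Schema = new Schema.Parser().parse(avroSchemaJson)
  val writer = new GenericDatumWriter[GenericRecord](avroSchema)
  rows.map { row =>
    val genericRecord = new GenericData.Record(avroSchema)
    // Copy fields by position. This only works for columns whose values Avro can
    // write directly (strings, numbers, booleans); other types need conversion first.
    for (i <- 0 until row.length) {
      genericRecord.put(i, row(i))
    }
    // AvroSerializer is internal to Spark and has no static serialize method,
    // so encode the record with Avro's own binary encoder instead.
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    writer.write(genericRecord, encoder)
    encoder.flush()
    out.toByteArray
  }
}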
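The snippet above stops once the rows are serialized. The sending step is not shown in the original, so the following is only a minimal sketch of how the resulting bytes might be written to Kafka, assuming `serializedData` is an `RDD[Array[Byte]]` and the `kafka-clients` library is on the classpath. The helper `sendToKafka` and its `brokers` and `topic` parameters are hypothetical names introduced for illustration.

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.rdd.RDD

// Hypothetical helper, not part of the original post.
// `brokers` is a bootstrap server list such as "host:9092"; `topic` is the target topic.
def sendToKafka(data: RDD[Array[Byte]], brokers: String, topic: String): Unit = {
  data.foreachPartition { records =>
    // Create one producer per partition: KafkaProducer is not serializable,
    // so it has to be constructed on the executor rather than on the driver.
    val props = new Properties()
    props.put("bootstrap.servers", brokers)
    props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
    val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)
    try {
      records.foreach { bytes =>
        // Send without a key; Kafka then chooses the partition itself.
        producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, bytes))
      }
      // Block until all buffered records have been sent.
      producer.flush()
    } finally {
      producer.close()
    }
  }
}
```

Under these assumptions it could be called as `sendToKafka(serializedData, "localhost:9092", "my-topic")`, where both arguments are placeholders. Consumers would need the same Avro schema to decode the messages, since the raw binary encoding does not embed it.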



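Copyright notice: This is an original article by li93675, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.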