go连接kafka

  • Post author:
  • Post category:其他


Part1前言

本文主要介绍如何通过go语言连接kafka。这里采用的是sarama库。

https://github.com/Shopify/sarama

028f4b4a51b9ba91d30b9d78c85486ec.png

Part2库的安装

go get -u github.com/Shopify/sarama

go get相关定义

参数介绍:
-d 只下载不安装
-f 只有在你包含了 -u 参数的时候才有效,不让 -u 去验证 import 中的每一个都已经获取了,这对于本地 fork 的包特别有用
-fix 在获取源码之后先运行 fix,然后再去做其他的事情
-t 同时也下载需要为运行测试所需要的包
-u 强制使用网络去更新包和它的依赖包
-v 显示执行的命令

Part3生产消息

参考代码

https://github.com/Shopify/sarama/tree/main/examples/txn_producer


这里为了提升性能不考虑事务的概念。

1创建消费者

version, err := sarama.ParseKafkaVersion("2.8.1")
 if err != nil {
  Error(err.Error())
  return
 }
 config := sarama.NewConfig()
 config.Version = version
 config.Producer.Idempotent = true
 config.Producer.Return.Successes = true
 config.Producer.Return.Errors = true
 config.Producer.RequiredAcks = sarama.WaitForAll
 config.Producer.Partitioner = sarama.NewManualPartitioner
 config.Net.MaxOpenRequests = 1

 brokers := G_Configjson.KafkaBroker
 producer, err := sarama.NewSyncProducer(strings.Split(brokers, ","), config)
 if err != nil {
  Error(err.Error())
  return
 }

代码解读:

ParseKafkaVersion :填写当前kafka的版本

Idempotent:kafka 0.11.0.0版本引入了idempotent producer机制,在这个机制中,同一消息可能被producer发送多次,但是在broker端只会写入一次,他为每一条消息编号去重,而且对kafka开销影响不大。需要设置producer端的新参数 enable.idempotent 为true。

WaitForAll:等待所有副本完成提交

MaxOpenRequests:允许有多少未完成请求的发送,这个值越大,性能越好,但是如果Idempotent为false,不能确保消息顺序。

NewSyncProducer:创建生产者,可以有多个brokers。

2发送消息

msg := &sarama.ProducerMessage{Topic: kafkaProduce.sipTopic, Key: nil, Partition: kafkaProduce.sipPartition,
  Value: sarama.ByteEncoder(packet.Data()),
 }
 partition, offset, err := kafkaProduce.sipProduce.SendMessage(msg)
 if nil != err {
  logger.Error(err.Error())
  logger.Info(fmt.Sprintf("Kafka SendMessage partition=%d offset=%d", partition, offset))
 }

代码解读:

发送消息需要指定发送的Topic,Partition,以及数据等内容。

Part4消费消息

代码参考

https://github.com/Shopify/sarama/blob/main/examples/consumergroup/main.go

3创建消费者

version, err := sarama.ParseKafkaVersion("2.8.1")
 if err != nil {
  log.Panicf("Error parsing Kafka version: %v", err)
 }
  config := sarama.NewConfig()
 config.Version = version
 //设置初始offset
 config.Consumer.Offsets.Initial = sarama.OffsetOldest
  client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config)

代码解读:初始化信息和之前的创建生产者类似,不过通过Offsets.Initial来设置是消费最新的消息还是之前未消费完的消息

NewConsumerGroup:创建消费者

4消费数据

这里需要创建一个struct让其来消费,struct定义如下

type Consumer struct {
}

func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
 return nil
}

func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
 return nil
}

func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
 for {
  select {
  case message := <-claim.Messages():
   log.Printf("Message claimed: timestamp = %v, topic = %s offset=%d", message.Timestamp, message.Topic, message.Offset)
   session.MarkMessage(message, "")
  case <-session.Context().Done():
   return nil
  }
 }
}

重点函数是ConsumeClaim来消费数据。

之后将struct对象设置给之前创建的消费者,使用示例如下

if err := client.Consume(ctx, strings.Split(topics, ","), &consumer); err != nil {
  log.Panicf("Error from consumer: %v", err)
 }

Part5总结

本文主要解决go语言通过sarama库来链接kafka进行消费者和生产者的创建。



版权声明:本文为g0415shenw原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。