go如何操作kafka-直接上代码

  • Post author:
  • Post category:其他




1、kafka的配置项

// KafkaConfig kafka配置
type KafkaConfig struct {
	DisableConsumer bool   `mapstructure:"disableConsumer"`
	Debug           bool   `mapstructure:"debug"`
	Address         string `mapstructure:"address"`
	ReadTimeout     int64  `mapstructure:"read-timeout"`
	WriteTimeout    int64  `mapstructure:"write-timeout"`
	RequiredAck     int    `mapstructure:"required-ack"`
	MaxOpenRequests int    `mapstructure:"max-open-requests"`
	Partition       int    `mapstructure:"partition"`
}
// Conf 上层使用
type Conf struct {
  KafKa            map[string]*KafkaConfig `mapstructure:"kafka"`
}
// ymal
kafka:
  disableConsumer:
    debug: true
    address: 127.0.0.1:9092
    required-ack: -1 # 发送完数据后是否需要拿多少个副本确认 -1 需要全部
    read-timeout: 30 # 默认30s
    write-timeout: 30 # 默认30s
    max-open-requests: 5  # 在发送阻塞之前,允许有多少个未完成的请求,默认为5
    partition: 2 # 分区生成方案 0根据topic进行hash、1随机、2轮询



2、kafka初始化配置

使用的是”github.com/IBM/sarama”包

type Kafka struct {
	Key             string
	DisableConsumer bool
	Debug           bool
	Producer        sarama.SyncProducer
	Consumer        sarama.Consumer
	Client          sarama.Client
}

// kafkaClient 做kafka的k-v,因为kafka集群搭建,这用的是单体,通过map获取不同的broker
var kafkaClient = new(sync.Map)

func InitKafka() {
	for k, v := range setting.Conf.KafKa {
		key := k
		val := v
		scfg := buildConfig(val)
		kafka, err := newKafkaClient(key, val, scfg)
		if err != nil {
			zap.L().Error("newKafkaClient(key, val, scfg) failed:", zap.Error(err))
			return
		}
		kafkaClient.Store(key, kafka)
	}
}

// GetClient 根据map的key获取kafka实例
func GetClient(key string) (*Kafka, error) {
	val, ok := kafkaClient.Load(key)
	if !ok {
		return nil, fmt.Errorf("获取kafka client失败,key:%s", key)
	}

	return val.(*Kafka), nil
}

// buildConfig kafka配置
func buildConfig(v *setting.KafkaConfig) *sarama.Config {
	cfg := sarama.NewConfig()
	cfg.Producer.RequiredAcks = sarama.RequiredAcks(v.RequiredAck)
	cfg.Producer.Return.Successes = true

	if v.Partition == 1 {
		cfg.Producer.Partitioner = sarama.NewRandomPartitioner
	}

	if v.Partition == 2 {
		cfg.Producer.Partitioner = sarama.NewRoundRobinPartitioner
	}

	if v.ReadTimeout != 0 {
		cfg.Net.ReadTimeout = time.Duration(v.ReadTimeout) * time.Second
	}

	if v.WriteTimeout != 0 {
		cfg.Net.WriteTimeout = time.Duration(v.WriteTimeout) * time.Second
	}

	if v.MaxOpenRequests != 0 {
		cfg.Net.MaxOpenRequests = v.MaxOpenRequests
	}

	return cfg
}

func newKafkaClient(key string, cfg *setting.KafkaConfig, scfg *sarama.Config) (*Kafka, error) {
	client, err := sarama.NewClient(strings.Split(cfg.Address, ","), scfg)
	if err != nil {
		return nil, err
	}

	syncProducer, err := sarama.NewSyncProducer(strings.Split(cfg.Address, ","), scfg)
	if err != nil {
		return nil, err
	}

	consumer, err := sarama.NewConsumer(strings.Split(cfg.Address, ","), scfg)
	if err != nil {
		return nil, err
	}

	return &Kafka{
		Key:             key,
		DisableConsumer: cfg.DisableConsumer,
		Debug:           cfg.Debug,
		Producer:        syncProducer,
		Consumer:        consumer,
		Client:          client,
	}, nil
}



3、生产者模型

// SendMessage 发送消息,默认分区
func SendMessage(key, topic, value string) error {
	return SendMessagePartitionPar(key, topic, value, "")
}

// SendMessagePartitionPar 发送消息,指定分区
func SendMessagePartitionPar(key, topic, value, partitionKey string) error {
	kafka, err := GetClient(key)
	if err != nil {
		return err
	}

	msg := &sarama.ProducerMessage{
		Topic:     topic,
		Value:     sarama.StringEncoder(value),
		Timestamp: time.Now(),
	}

	if partitionKey != "" {
		msg.Key = sarama.StringEncoder(partitionKey)
	}
	partition, offset, err := kafka.Producer.SendMessage(msg)
	if err != nil {
		return err
	}
	if kafka.Debug {
		zap.L().Info("发送kafka消息成功",
			zap.Int32("partition", partition),
			zap.Int64("offset", offset))
	}

	return err
}



4、消费者模型

单个消费者消费topic

// Consumer 消费者函数
func Consumer(ctx context.Context, key, topic string, fn func(message *sarama.ConsumerMessage) error) (err error) {
	kafka, err := GetClient(key)
	if err != nil {
		return
	}
	partitions, err := kafka.Consumer.Partitions(topic)
	if err != nil {
		return
	}
	for _, partition := range partitions {
		// 针对每个分区创建一个对应的分区消费者
		offset, errx := kafka.Client.GetOffset(topic, partition, sarama.OffsetNewest)
		if errx != nil {
			zap.L().Info("获取Offset失败:", zap.Error(errx))
			continue
		}
		pc, errx := kafka.Consumer.ConsumePartition(topic, partition, offset-1)
		if errx != nil {
			zap.L().Info("获取Offset失败:", zap.Error(errx))
			return nil
		}
		// 从每个分区都消费消息
		go func(consume sarama.PartitionConsumer) {
			defer func() {
				if err := recover(); err != nil {
					zap.L().Error("消费kafka信息发生panic,err:%s", zap.Any("err:", err))
				}
			}()

			defer func() {
				err := pc.Close()
				if err != nil {
					zap.L().Error("消费kafka信息发生panic,err:%s", zap.Any("err:", err))
				}
			}()

			for {
				select {
				case msg := <-pc.Messages():
					err := MiddlewareConsumerHandler(fn)(msg)
					if err != nil {
						return
					}
				case <-ctx.Done():
					return
				}
			}

		}(pc)
	}
	return nil
}


func MiddlewareConsumerHandler(fn func(message *sarama.ConsumerMessage) error) func(message *sarama.ConsumerMessage) error {
	return func(msg *sarama.ConsumerMessage) error {
		return fn(msg)
	}
}

type ConsumerGroupHandler func(message *sarama.ConsumerMessage) error

func (ConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error {
	return nil
}

func (ConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
	return nil
}

func (h ConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		if err := h(msg); err != nil {
			zap.L().Info("消息处理失败",
				zap.String("topic", msg.Topic),
				zap.String("value", string(msg.Value)))
			continue
		}
		sess.MarkMessage(msg, "")
	}

	return nil
}

消费者组消费topic

// ConsumerGroup 消费者组消费消息
func ConsumerGroup(ctx context.Context, key, groupId, topics string, msgHandler ConsumerGroupHandler) (err error) {
	kafka, err := GetClient(key)
	if err != nil {
		return
	}

	if isConsumerDisabled(kafka) {
		return
	}

	consumerGroup, err := sarama.NewConsumerGroupFromClient(groupId, kafka.Client)
	if err != nil {
		return
	}

	go func() {
		defer func() {
			if err := recover(); err != nil {
				zap.L().Error("消费kafka发生panic", zap.Any("panic", err))
			}
		}()

		defer func() {
			err := consumerGroup.Close()
			if err != nil {
				zap.L().Error("close err", zap.Any("panic", err))
			}
		}()

		for {
			select {
			case <-ctx.Done():
				return
			default:
				err := consumerGroup.Consume(ctx, strings.Split(topics, ","), ConsumerGroupHandler(func(msg *sarama.ConsumerMessage) error {
					return MiddlewareConsumerHandler(msgHandler)(msg)
				}))
				if err != nil {
					zap.L().Error("消费kafka失败 err", zap.Any("panic", err))

				}
			}
		}
	}()

	return
}

func isConsumerDisabled(in *Kafka) bool {
	if in.DisableConsumer {
		zap.L().Info("kafka consumer disabled,key:%s", zap.String("key", in.Key))
	}

	return in.DisableConsumer
}



版权声明:本文为weixin_51991615原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。