rabbitmq 持续获取_RabbitMQ系列笔记封装篇

  • Post author:
  • Post category:其他


ae93596d9406b22c2e285f3edbaebb57.png


导语

在阅读本篇笔记时,如果你还不熟悉RabbitMQ,请查看公众号中关于RabbitMQ系列笔记相关文章,如果你已经熟悉了,还请在本篇文章多多指教。本文使用

go mod

进行获取相关包,使用Go1.12.6版本进行编写,编译器工具使用Vscode。


封装思路

首先为我们的RabbitMQ的工作模式用变量进行区别,分别代表工作模式、广播模式、路由模式、主题模式。

1const (
2    SimpleQueueType = "SimpleQueue"
3    BroadQueueType  = "BroadQueue"
4    DirectQueueType = "DirectQueue"
5    TopicQueueType  = "TopicQueue"
6)

定义一个客户端,该客户端包含连接,工作模式的类型,和所需队列的属性,我们使用这个结构体实现相关方法

1type MsgClient struct {
2    Conn *amqp.Connection
3    Type string `json:"type"` //消息类型
4    Data string `json:"data"` //队列数据
5}

定义一个接口,这里需要注意的是消费者里是一个函数,函数中的参数是将来我们获取的消息。

 1// 定义rabbitMQ的接口方法
 2type IMMessageClient interface {
 3    // 连接RabbitMQ,并获取连接
 4    ConnectToRabbitmq(Connection string)
 5    // 发送消息
 6    PublishToQueue(msg []byte) error
 7    // 消费消息
 8    ConsumeFromQueue(handlerfunc func(d amqp.Delivery)) error
 9}
10

定义我们的四种工作模式的结构体,这里需要注意的是,因为对于广播模式和路由模式属性基本相同,用了同一个结构体,Topic主题模式由于路由和绑定的路由可能不同,故单独分离了出来。

 1type SimpleQueue struct {
 2    Rout_key      string `json:"rout_key"`      //路由
 3    Queue         string `json:"queue"`         //队列的名字
 4    Is_persistent bool   `json:"is_persistent"` //队列是否持久化
 5}
 6type ComplexQueue struct {
 7    ExchangeName  string `json:"exchangeName"`
 8    Rout_key      string `json:"rout_key"`      //路由
 9    Queue         string `json:"queue"`         //队列的名字
10    Is_persistent bool   `json:"is_persistent"` //队列是否持久化
11}
12
13type TopicQueue struct {
14    ExchangeName  string `json:"exchangeName"`
15    Rout_key      string `json:"rout_key"`      //路由
16    Queue         string `json:"queue"`         //队列的名字
17    Is_persistent bool   `json:"is_persistent"` //队列是否持久化
18    Bind_key      string `json:"bind_key"`      //绑定的路由
19}

实现接口的相关方法,这里只是部分思路代码,如果想看封装源码,我已经上传到github上,有需要的可以直接拉取下来,当然也可以提交更好的代码到分支上。

 1//获取连接
 2func (m *MsgClient) ConnectToRabbitmq(Connection string) {
 3
 4    var err error
 5    m.Conn, err = amqp.Dial(fmt.Sprintf("%s/", Connection))
 6    if err != nil {
 7        log.Fatal(err)
 8    }
 9}
10// 发消息时判断其类型,注意使用json进行反序列化
11if m.Type == SimpleQueueType {
12            var s SimpleQueue
13            json.Unmarshal([]byte(m.Data), &s)
14            q, err := ch.QueueDeclare(
15                s.Queue,         //name队列的名称
16                s.Is_persistent, //durble是否持久化
17                false,           //delete when unused是否自动删除
18                false,           //exclusive是否设置排他,如果设置为true,则队列仅对首次声明他的连接可见,并在连接断开的时候自动删除
19                false,           //no-wait是否阻塞
20                nil,             //arguments
21            )
22            FailOnError(err, "队列申请失败")
23            err = ch.Publish(
24                "",
25                q.Name, // 路由,即队列的名字
26                false,  //mandatory
27                false,  //immediate
28                amqp.Publishing{
29                    DeliveryMode: amqp.Persistent, //消息的持久化
30                    ContentType:  "text/plain",
31                    Body:         msg,
32                },
33            )
34            FailOnError(err, "发送消息失败")
35}
36//接收消息时需要绑定路由
37// 队列绑定
38err = ch.QueueBind(
39    q.Name,         //队列的名字
40    s.Rout_key,     //routing key
41    s.ExchangeName, //所绑定的交换器
42    false,
43    nil,
44)

读取我们的消息

1func consumeLoop(deliveries <-chan amqp.Delivery, handlerfunc func(d amqp.Delivery)) {
2
3    for d := range deliveries {
4        fmt.Println("有数据:", string(d.Body))
5        handlerfunc(d)
6    }
7}


现在我们来测试一下吧

测试需要编写一个消费者收到消息后处理消息

1func recive(d amqp.Delivery) {
2    fmt.Println(string(d.Body))
3    d.Acknowledger.Ack(d.DeliveryTag, true)
4}

测试我们的work模式,这里为了持续测试,我们使用一个协程,并用http监听防止我们的程序退出。

 1simplequeue := client.NewSimpleQueue("user", "Login", true)
 2body, _ := json.Marshal(simplequeue)
 3fmt.Println(string(body))
 4Simple := &client.MsgClient{
 5    Type: client.SimpleQueueType,
 6    Data: string(body),
 7}
 8body, _ = json.Marshal(Simple)
 9fmt.Println(string(body))
10Simple.ConnectToRabbitmq("amqp://admin:admin@192.168.10.252:5672"
11go Simple.ConsumeFromQueue(recive)
12http.ListenAndServe("0.0.0.0:8200", nil)

输出进行了公平调度

205ec9655ee40faea18c77b9ba2a63d9.png

c544753bb852b76e70c770f5e8e15cc1.png

测试我们的广播模式

1broadqueue := client.NewComplexQueue("broadqueue_exchange", "broadqueue_route", "", true)
2body, _ := json.Marshal(broadqueue)
3Simple := &client.MsgClient{
4    Type: client.BroadQueueType,
5    Data: string(body),
6}
7Simple.ConnectToRabbitmq("amqp://admin:admin@192.168.10.252:5672")
8go Simple.ConsumeFromQueue(recive)

两个消费者同时收到了消息进行打印

a94bc088fc8dc5b3d7d128178bbc1c86.png


推荐阅读

  • 开发环境搭建(持续更新中)
  • RabbitMQ系列笔记介绍篇
  • Golang中Modle包的使用
  • goriila context深入学习笔记
  • Go Context深入学习笔记
  • 基于Nginx和Consul构建高可用及自动发现的Docker服务架构
  • 关于log日志的深入学习笔记

本文欢迎转载,转载请联系作者,谢谢!

  • 公众号【常更新】:无崖子天下无敌
  • GitHub:https://github.com/yuwe1
  • CSDN【看心情更新】: https://blog.csdn.net/weixin_40051278
  • 博客地址【定期更新】:https://mowuya.cn/

753b82dbe1c7b09900562dc0b97deb4a.png



版权声明:本文为weixin_31703753原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。