基于NetCore的RabbitMQ使用

  • Post author:
  • Post category:其他


由于最近公司做的项目,需要发短信/邮件/第三方接口异步回调信息等的及时处理,自己就简单的研究了以下RabbitMQ在NetCore中的实现。

RabbitMQ是什么具体就不再这里详细介绍了,自己去百度。本文主要把自己封装的RabbitMQ的使用介绍给大家,并提供相应的Nuget包供大家下载使用,谢谢各位大佬,本人能力有限。



MQ连接配置

在appsettings中加入如下内容:

//消息队列
  "RabbitMQ": {

    "HostName": "127.0.0.1", // 主机名称
    "Port": 5672, // 主机端口
    "UserName": "guest", // 连接账号
    "Password": "guest" // 连接密码

  }

自动读入到RabbitMQOption中



MQ交换机类型

这里通过枚举类ExchangeType我定义了四种交换机类型

在这里插入图片描述



生产者

生产者对应的IRabbitMQProducer接口以及其实现RabbitMQProducer已经自动通过单例模式注入,使用的时候你只需要注入即可,如果你想自己实现,可以覆盖我的实现类



生产者异常处理机制

生产者投递消息失败可能有以下几种情况:

1.MQ宕机

2.MQ发送数据到交换机失败

3.MQ发送到交换机成功路由到队列失败

对于第1点:

你可以实现IMQCrashHandle写宕机等问题造成的连接失败的公共处理方式

这里考虑的是采取短信或邮件或其他方式通知人员处理MQ.

在这里插入图片描述

对于第2,3点:

第1点的问题就会引发2,3点问题

这里采用了MQ消息投递的Confirm异步处理确认模式

这里你可以实现IMQProducerHandle接口对消息投递失败或成功进行相应的处理。

在这里插入图片描述

这里你可以在Success方法中对成功的消息做记录等操作,在Failure函数中可以对失败的消息进行自己存入db中等进行处理



消费者

消费者统一一接口IRabbitMQConsumer。



常规的消费者

用于普通的及时发送消息及时处理消息

主要是RabbitMQConsumer类,如果需要使用只需要继承RabbitMQConsumer抽象类即可



消峰消费者

用于对某一有限资源的访问控制等,目前未实现



利用死信队列实现延时处理的消费者

用于对消息进行延迟处理,分为整个管道所有消息同一延时,或每个消息延迟时间不同

延时处理队列可以通过实现RabbitMQDelayDeadLetterConsumer抽象类即可

如果延时消息消费失败,不会有重试机制,需要自己实现IMQConsumerHandle来处理



优先级消费者

目前暂未实现



消费者处理消息结果

在MQAction枚举中定义了消费者处理的枚举。主要有以下四种。

在这里插入图片描述

这里注意:对于使用JoinRetryQueue重试策略,次数达到之后消息的处理状态会由RETRY转为FAILURE,如果这时候需要外部接口处理需要实现IMQConsumerHandle中的Failure函数来自己处理消息



根据处理结果可以通过外部接口对消息进行处理

在IMQConsumerHandle接口分别定义了Success Retry Failure Reject分别对应上述的几种消息处理结果

可以针对这几种结果,自己实现该接口对其进行处理,注意一定要处理,因为在消费者内部已经是把该消息消费了

在这里插入图片描述



消费者异常处理机制

消费者出现异常,主要是消费失败,消息不能及时从管道中处理。

这里对于失败的消息,主要有以下几种策略进行处理。

在ConsumerRetryPolicy枚举类定义了重试策略

在这里插入图片描述



使用例子



初始化MQ

在Net5.0中在Startup.cs ConfigureSevice中加入以下代码

service.InitRabbitMQ(configuration);

在Net6.0直接在Program.cs中加入以下代码

//初始化RabbitMQ
builder.Services.InitRabbitMQ(builder.Configuration);



定义一个消费者

在这里插入图片描述



注入消费者发送消息到对应交换机和路由

在这里插入图片描述

这里我采用匿名Action来处理失败或成功的消息。



测试结果

RabbitMQ搭建好之后,启动项目即可在MQ界面中看到相应的交换机和队列已经建立成功

在这里插入图片描述

在这里插入图片描述



消息发送测试

注意消息发送之后都是序列化为字符串了的,需要消费消息需要反序列化才行



发送消息的时候把MQ停止了

可以从下面的截图中看到公共的MQ宕机连接不上的处理接口生效了,以及MQ消息投递失败的Action处理也成功了

在这里插入图片描述



正常投递消息并消费成功

在这里插入图片描述



正常投递消息模拟消费失败重试

这里我把消息内容从success改为retry表示需要重试

在这里插入图片描述

从运行结果可以看到重试了三次之后,状态转为处理失败,这个时候就需要你自己在外部处理消息了,如果外部不处理消息就会丢失



正常投递消息模拟消费直接转失败状态

在这里插入图片描述



正常投递消息模拟消费直接转拒绝状态

在这里插入图片描述

如果你需要自己实现相应的处理,你可以自己实现以上相应的接口,并配合我之前基于Autofac自动注入类通过Target来动态替换,具体地址为

https://blog.csdn.net/weixin_43872830/article/details/121157725?spm=1001.2014.3001.5502



为消息设置不同的重试时间间隔

这里主要采用延时队列加上死信队列,利用XMessageTTL实现不同时间间隔的重试,过期时间作用于消息上而不是队列上整体设置过期时间

在这里插入图片描述

消息的重试次数将由RetryIntervals的集合的长度决定,如果同时设置了RetryCount以及RetryInterval,那么起决定作用的也是RetryIntervals.

运行结果

在这里插入图片描述

从结果中我们可以看到10s和20s后分别重试了一次



延迟消息处理例子

消息过期时间相同的实现类

在这里插入图片描述

如果需要设置消息不同过期时间,这里的XMessageTTL不需要设置,然后在消息发送的时候设置消息的过期时间即可

在这里插入图片描述

启动项目后可以看到相应的交换机和队列已经创建

在这里插入图片描述

在这里插入图片描述



运行结果:

从结果可以看到10秒之前发送的消息被消费了

在这里插入图片描述

从结果可以看到消息进入队列后延时时间不一样,并且不会因为前面消息的过期时间比后面的过期时间长,而阻塞后面消息过期时间之后不能及时出队列

在这里插入图片描述


需要注意某个交换机或者队列一旦初始化完成之后,想要在改变交换机或者队列的一些属性就比较困难,比如修改某些属性重新启动程序,会提示mq的错误等,这些需要您自行了解,有一个叫做policy的东西。所以原则上交换机或队列一旦想好怎样设置之后,就不再改动它,特别是已经运行于生产环境中

如果需要使用我封装的RabbitMQ,可以在NuGet上搜索OpenDeepSpace.RabbitMQ进行下载使用与了解



版权声明:本文为weixin_43872830原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。