由于最近公司做的项目,需要发短信/邮件/第三方接口异步回调信息等的及时处理,自己就简单的研究了以下RabbitMQ在NetCore中的实现。
RabbitMQ是什么具体就不再这里详细介绍了,自己去百度。本文主要把自己封装的RabbitMQ的使用介绍给大家,并提供相应的Nuget包供大家下载使用,谢谢各位大佬,本人能力有限。
MQ连接配置
在appsettings中加入如下内容:
//消息队列
"RabbitMQ": {
"HostName": "127.0.0.1", // 主机名称
"Port": 5672, // 主机端口
"UserName": "guest", // 连接账号
"Password": "guest" // 连接密码
}
自动读入到RabbitMQOption中
MQ交换机类型
这里通过枚举类ExchangeType我定义了四种交换机类型
生产者
生产者对应的IRabbitMQProducer接口以及其实现RabbitMQProducer已经自动通过单例模式注入,使用的时候你只需要注入即可,如果你想自己实现,可以覆盖我的实现类
生产者异常处理机制
生产者投递消息失败可能有以下几种情况:
1.MQ宕机
2.MQ发送数据到交换机失败
3.MQ发送到交换机成功路由到队列失败
对于第1点:
你可以实现IMQCrashHandle写宕机等问题造成的连接失败的公共处理方式
这里考虑的是采取短信或邮件或其他方式通知人员处理MQ.
对于第2,3点:
第1点的问题就会引发2,3点问题
这里采用了MQ消息投递的Confirm异步处理确认模式
这里你可以实现IMQProducerHandle接口对消息投递失败或成功进行相应的处理。
这里你可以在Success方法中对成功的消息做记录等操作,在Failure函数中可以对失败的消息进行自己存入db中等进行处理
消费者
消费者统一一接口IRabbitMQConsumer。
常规的消费者
用于普通的及时发送消息及时处理消息
主要是RabbitMQConsumer类,如果需要使用只需要继承RabbitMQConsumer抽象类即可
消峰消费者
用于对某一有限资源的访问控制等,目前未实现
利用死信队列实现延时处理的消费者
用于对消息进行延迟处理,分为整个管道所有消息同一延时,或每个消息延迟时间不同
延时处理队列可以通过实现RabbitMQDelayDeadLetterConsumer抽象类即可
如果延时消息消费失败,不会有重试机制,需要自己实现IMQConsumerHandle来处理
优先级消费者
目前暂未实现
消费者处理消息结果
在MQAction枚举中定义了消费者处理的枚举。主要有以下四种。
这里注意:对于使用JoinRetryQueue重试策略,次数达到之后消息的处理状态会由RETRY转为FAILURE,如果这时候需要外部接口处理需要实现IMQConsumerHandle中的Failure函数来自己处理消息
根据处理结果可以通过外部接口对消息进行处理
在IMQConsumerHandle接口分别定义了Success Retry Failure Reject分别对应上述的几种消息处理结果
可以针对这几种结果,自己实现该接口对其进行处理,注意一定要处理,因为在消费者内部已经是把该消息消费了
消费者异常处理机制
消费者出现异常,主要是消费失败,消息不能及时从管道中处理。
这里对于失败的消息,主要有以下几种策略进行处理。
在ConsumerRetryPolicy枚举类定义了重试策略
使用例子
初始化MQ
在Net5.0中在Startup.cs ConfigureSevice中加入以下代码
service.InitRabbitMQ(configuration);
在Net6.0直接在Program.cs中加入以下代码
//初始化RabbitMQ
builder.Services.InitRabbitMQ(builder.Configuration);
定义一个消费者
注入消费者发送消息到对应交换机和路由
这里我采用匿名Action来处理失败或成功的消息。
测试结果
RabbitMQ搭建好之后,启动项目即可在MQ界面中看到相应的交换机和队列已经建立成功
消息发送测试
注意消息发送之后都是序列化为字符串了的,需要消费消息需要反序列化才行
发送消息的时候把MQ停止了
可以从下面的截图中看到公共的MQ宕机连接不上的处理接口生效了,以及MQ消息投递失败的Action处理也成功了
正常投递消息并消费成功
正常投递消息模拟消费失败重试
这里我把消息内容从success改为retry表示需要重试
从运行结果可以看到重试了三次之后,状态转为处理失败,这个时候就需要你自己在外部处理消息了,如果外部不处理消息就会丢失
正常投递消息模拟消费直接转失败状态
正常投递消息模拟消费直接转拒绝状态
如果你需要自己实现相应的处理,你可以自己实现以上相应的接口,并配合我之前基于Autofac自动注入类通过Target来动态替换,具体地址为
https://blog.csdn.net/weixin_43872830/article/details/121157725?spm=1001.2014.3001.5502
为消息设置不同的重试时间间隔
这里主要采用延时队列加上死信队列,利用XMessageTTL实现不同时间间隔的重试,过期时间作用于消息上而不是队列上整体设置过期时间
消息的重试次数将由RetryIntervals的集合的长度决定,如果同时设置了RetryCount以及RetryInterval,那么起决定作用的也是RetryIntervals.
运行结果
从结果中我们可以看到10s和20s后分别重试了一次
延迟消息处理例子
消息过期时间相同的实现类
如果需要设置消息不同过期时间,这里的XMessageTTL不需要设置,然后在消息发送的时候设置消息的过期时间即可
启动项目后可以看到相应的交换机和队列已经创建
运行结果:
从结果可以看到10秒之前发送的消息被消费了
从结果可以看到消息进入队列后延时时间不一样,并且不会因为前面消息的过期时间比后面的过期时间长,而阻塞后面消息过期时间之后不能及时出队列
需要注意某个交换机或者队列一旦初始化完成之后,想要在改变交换机或者队列的一些属性就比较困难,比如修改某些属性重新启动程序,会提示mq的错误等,这些需要您自行了解,有一个叫做policy的东西。所以原则上交换机或队列一旦想好怎样设置之后,就不再改动它,特别是已经运行于生产环境中
如果需要使用我封装的RabbitMQ,可以在NuGet上搜索OpenDeepSpace.RabbitMQ进行下载使用与了解