当消息处理消费失败后,SpringCloud Stream 会自动默认重试3次,重试三次失败后,RepublishMessageRecoverer类recover方法会将改变routingkey为队列名称发送至死信队列。目前产生死信队列有两种方式:
- 默认自动为每个消息队列产生一个死信队列,消费失败时会路由至该队列的死信队列
- 直接指定每个消息队列绑定的死信队列,多个消息队列可绑定同一个死信队列
本案例采用第2种方式,便于所有消费失败信息处理。
配置注意事项
注意若系统中已经存在的消息队列,一定要在rabbitmq删除该队列,否则无法创建死信队列。 只需要配置spring.cloud.rabbit.bindings部分其中有两个注意点:
1、spring.cloud.bindings.input.group和spring.cloud.bindings.input.destination必须要配置,并且值不能一致,否则因为springcloud Dalston.SR1版本问题,死信消息队列的routingKey会取数错误,详细可参见RabbitMessageChannelBinder类createConsumerEndpoint方法,Spring Cloud 2.0发现该代码已经大部分改动了,没有BUG。
2、maxAttempts 可根据实际情况判断是否需要重试
导包
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
<version>3.0.13.RELEASE</version>
</dependency>
yml配置
- 配置死信队列
rabbit:
bindings:
saveOrderInput:
consumer:
#ttl: 20000 # 默认不做限制,即无限。消息在队列中最大的存活时间。当消息滞留超过ttl时,会被当成消费失败消息,即会被转发到死信队列或丢弃.即消息在队列中存活的最大时间为 20s
# DLQ相关
autoBindDlq: true # 是否自动声明死信队列(DLQ)并将其绑定到死信交换机(DLX)。默认是false。
republishToDlq: true
deadLetterExchange: exchange-order-dlq #绑定exchange
deadLetterQueueName: exchange-order-dlq.saveOrderInput #死信队列名字:exchanName.queueName
完整配置如下(对队列saveOrderInput配置了死信队列):
server:
port: 8004 #服务端口号
spring:
application:
name: blog-service #项目名称
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
# url: jdbc:mysql://mysql:3306/llc_user?serverTimezone=UTC?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=GMT%2B8
url: jdbc:mysql://mysql:3306/llc?serverTimezone=UTC?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=GMT%2B8
username: root
password: ****
cloud:
nacos:
discovery: #nacos的服务发现
server-addr: nacos:8848 #nacos的地址
namespace: d19a4977-eb66-4f0d-a22a-3fefa43d6352 #nacos对应的命名空间名称
config: #nacos的服务配置
server-addr: nacos:8848 #nacos的地址
file-extension: yml #nacos的服务配置类型
group: DEFAULT_GROUP #nacos的默认分组
namespace: d19a4977-eb66-4f0d-a22a-3fefa43d6352 #nacos对应的命名空间名称
ext-config:
- data-id: currency.yml #项目名称 id
refresh: true #服务配置刷新
sentinel:
transport:
dashboard: localhost:8080 #配置Sentinel dashboard地址
port: 8719
# mq的相关配置
stream:
binders: #需要绑定的rabbitmq的服务信息
defaultRabbit: #定义的名称,用于bidding整合
type: rabbit #消息组件类型
environment: #配置rabbimq连接环境
spring:
rabbitmq:
host: 127.0.0.1 #rabbitmq 服务器的地址
port: 5672 #rabbitmq 服务器端口
username: guest #rabbitmq 用户名
password: guest #rabbitmq 密码
virtual-host: / #虚拟路径
bindings: #服务的整合处理
# 生产者
saveOrderOutput: #这个是消息通道的名称 --->保存订单输出通道
destination: exchange-saveOrder #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。
content-type: application/json #设置消息的类型,本次为json
default-binder: defaultRabbit
group: saveOrderGroup #分组
saveOrderInput: #这个是消息通道的名称 ---> 保存订单输入通道
destination: exchange-saveOrder #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。
content-type: application/json #设置消息的类型,本次为json
default-binder: defaultRabbit
group: saveOrderGroup #分组
orderInput: #这个是消息通道的名称 ---> 订单输入通道
destination: exchange-order #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。
content-type: application/json #设置消息的类型,本次为json
default-binder: defaultRabbit
group: orderGroup #分组
orderOutput: #这个是消息通道的名称 ---> 订单输出通道
destination: exchange-order #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。
content-type: application/json #设置消息的类型,本次为json
default-binder: defaultRabbit
group: orderGroup #分组
messageInput:
destination: exchange-message
content-typ: application/json
default-binder: defaultRabbit
group: messageGroup
messageOutput:
destination: exchange-message
content-typ: application/json
default-binder: defaultRabbit
group: messageGroup
rabbit:
bindings:
saveOrderInput:
consumer:
#ttl: 20000 # 默认不做限制,即无限。消息在队列中最大的存活时间。当消息滞留超过ttl时,会被当成消费失败消息,即会被转发到死信队列或丢弃.即消息在队列中存活的最大时间为 20s
# DLQ相关
autoBindDlq: true # 是否自动声明死信队列(DLQ)并将其绑定到死信交换机(DLX)。默认是false。
republishToDlq: true
#deadLetterExchange: exchange-order-dlq #绑定exchange
#deadLetterQueueName: exchange-order-dlq.saveOrderInput #死信队列名字:exchanName.queueName
redis:
port: 6379 #redis的端口号默认端口是6379
password: 123456 #密码
host: redis
timeout: 5000
database: 3
# rabbitmq:
# host: 127.0.0.1
# port: 5672
# username: guest
# password: guest
stone:
redis:
# 开启动态切换redis db【可选,默认不开启】
dynamic-database: true
#security:
# oauth2:
# resource:
# user-info-uri: http://zuul:8001/oauth-service/user-me # http://localhost:8998/user-me
# prefer-token-info: false
# hystrix
feign:
hystrix:
enabled: false
- 定义绑定接口(生产者)
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Component;
@Component
public interface OrderOutputChannelProcessor {
/**
* 订单输出通道(需要与配置文件中的保持一致)
*/
String ORDER_OUTPUT = "orderOutput";
/**
* 订单输出
*
* @return
*/
@Output(ORDER_OUTPUT)
MessageChannel orderOutput();
/**
* 保存订单输出通道(需要与配置文件中的保持一致)
*/
String SAVE_ORDER_OUTPUT = "saveOrderOutput";
/**
* 保存订单输出
*
* @return
*/
@Output(SAVE_ORDER_OUTPUT)
MessageChannel saveOrderOutput();
/**
* 保存订单输出通道(需要与配置文件中的保持一致)
*/
String MESSAGE_OUTPUT = "messageOutput";
/**
* 保存订单输出
*
* @return
*/
@Output(MESSAGE_OUTPUT)
MessageChannel messageOutput();
}
- 定义绑定接口(消费者)
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;
/**
* @Description 订单消息输入通道处理器
* @time: 2020/11/29 15:37
*/
@Component
public interface OrderInputChannelProcessor {
/**
* 保存订单输入通道(需要与配置文件中的保持一致)
*/
String SAVE_ORDER_INPUT = "saveOrderInput";
/**
* 订单输入通道(需要与配置文件中的保持一致)
*/
String ORDER_INPUT = "orderInput";
//String ORDER_DLX_INPUT = "orderDlxInput";
/**
* 订单输入通道(需要与配置文件中的保持一致)
*/
String MESSAGE_INPUT = "messageInput";
/**
* 保存订单输入方法
*
* @return
*/
@Input(SAVE_ORDER_INPUT)
SubscribableChannel saveOrderInput();
/**
* 订单输入方法
*
* @return
*/
@Input(ORDER_INPUT)
SubscribableChannel orderInput();
/**
* 订单输入方法
*
* @return
*/
@Input(MESSAGE_INPUT)
SubscribableChannel messageInput();
}
- 定义监听器
import com.itllc.config.mqConfig.channel.OrderInputChannelProcessor;
import com.itllc.config.mqConfig.channel.OrderOutputChannelProcessor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.SendTo;
@Slf4j
@EnableBinding({OrderInputChannelProcessor.class, OrderOutputChannelProcessor.class})
public class OrderMessageListener {
/**
* 保存订单逻辑
* 通过OrderChannelProcessor.SAVE_ORDER_INPUT 接收消息
* 然后通过@SendTo 将处理后的消息发送到 OrderChannelProcessor.ORDER_OUTPUT
*
* @param message
* @return
*/
@StreamListener(OrderInputChannelProcessor.SAVE_ORDER_INPUT)
@SendTo(OrderOutputChannelProcessor.ORDER_OUTPUT)
public String saveOrderMessage(Message<String> message) {
log.info("保存订单的消息:" + message);
int i = 1 / 0;
//处理之后的订单消息
return "【" + message.getPayload() + "】";
}
/**
* 监听OrderChannelProcessor.ORDER_INPUT 通道,进行发送短信业务操作
*
* @param message
*/
@StreamListener(OrderInputChannelProcessor.ORDER_INPUT)
public void orderToSMSMessage(String message) {
log.info("进行发送短信业务操作----------:{}", message);
}
/**
* 监听OrderChannelProcessor.ORDER_INPUT 通道,进行存储到es业务操作
*
* @param message
*/
@StreamListener(OrderInputChannelProcessor.ORDER_INPUT)
public void orderToEsMessage(String message) {
log.info("进行存储到es业务操作------------:{}", message);
}
/**
* 监听OrderChannelProcessor.ORDER_INPUT 通道,进行存储到es业务操作
*
* @param message
*/
@StreamListener(OrderInputChannelProcessor.MESSAGE_INPUT)
// @SendTo(OrderOutputChannelProcessor.ORDER_OUTPUT)
public void testMessage(String message) {
log.info("自定义输入通道------------:{}", message);
}
}
测试流程
定义个controller
/**
* 发送消息自定义
*
* @param message
*/
@GetMapping(value = "/sendOwnMessage")
@ApiOperation("发送消息自定义")
public void sendOwnMessage(@NotBlank @ApiParam(value = "发送的消息",required = true) @RequestParam("message") String message) {
//发送消息
llcMqService.sendOwnMessage(message);
log.info("发送保存订单消息成功");
}
定义个service
import com.itllc.config.mqConfig.channel.OrderOutputChannelProcessor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import javax.annotation.Resource;
/**
* @author lilinchun
* @date 2022/5/13 0013 20:50
*/
@Slf4j
@EnableBinding(value = {OrderOutputChannelProcessor.class})
public class LlcMqService {
@Autowired
@Output(OrderOutputChannelProcessor.SAVE_ORDER_OUTPUT)
private MessageChannel channel;
@Resource
private OrderOutputChannelProcessor orderOutputChannelProcessor;
/**
* 发送消息
*
* @param msg
*/
public void sendMsg(String msg) {
System.out.println("----------------"+msg);
channel.send(MessageBuilder.withPayload(msg).build());
log.info("消息发送成功:" + msg);
}
/**
* 自定义发送消息
* @param msg
*/
public void sendOwnMessage(String msg) {
System.out.println("----------------"+msg);
orderOutputChannelProcessor.messageOutput().send(MessageBuilder.withPayload(msg).build());
log.info("消息发送成功:" + msg);
}
}
测试结果
现在测试当消息转发出现问题的时候怎么解决
再次运行后,可以看到消息被重复执行了三次,因为springCloud stream消息默认情况下是消费消息时若是出错了会重试三次,超过三次之后就会进入死信队列,若没有配置死信队列则该消息会丢失。
启动消费者和生产者,可以在RabbitMQ web界面中可以看到已经创建exchange-saveOrder.saveOrderGroup队列的死信队列 exchange-saveOrder.saveOrderGroup.dlq。
运行1次(任何次数都行)生产者方法发送消息给消费者,在RabbitMQ web页面可以明显的看出出错了的消息已经全部进入的死信队列当中。
查看dlq队列
点击页面队列名exchange-saveOrder.saveOrderGroup.dlq进入:
查看错误日志信息(点击get Messages)
可以看到消息原封不动保存,并且有具体的报错信息。方便与我们来进行分析代码。
- 移除消息
按提示需要执行下面命令开启插件,不需要重启rabbitmq
$ rabbitmq-plugins enable rabbitmq_shovel rabbitmq_shovel_management
清空消息时,可以再输入框中填入目标队列名称(可以从Get messages 部分的RoutingKey进行复制),点击按钮,清空死信队列消息。这时死信队列中的消息会回到原来的队列当中去。
参考文章:
https://blog.csdn.net/a767815662/article/details/110848635