SpringCloud stream 集成RabbitMQ

  • Post author:
  • Post category:其他


当消息处理消费失败后,SpringCloud Stream 会自动默认重试3次,重试三次失败后,RepublishMessageRecoverer类recover方法会将改变routingkey为队列名称发送至死信队列。目前产生死信队列有两种方式:

  • 默认自动为每个消息队列产生一个死信队列,消费失败时会路由至该队列的死信队列
  • 直接指定每个消息队列绑定的死信队列,多个消息队列可绑定同一个死信队列

本案例采用第2种方式,便于所有消费失败信息处理。



配置注意事项

注意若系统中已经存在的消息队列,一定要在rabbitmq删除该队列,否则无法创建死信队列。 只需要配置spring.cloud.rabbit.bindings部分其中有两个注意点:

1、spring.cloud.bindings.input.group和spring.cloud.bindings.input.destination必须要配置,并且值不能一致,否则因为springcloud Dalston.SR1版本问题,死信消息队列的routingKey会取数错误,详细可参见RabbitMessageChannelBinder类createConsumerEndpoint方法,Spring Cloud 2.0发现该代码已经大部分改动了,没有BUG。

2、maxAttempts 可根据实际情况判断是否需要重试



导包

		<!--rabbitmq-->
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
			<version>3.0.13.RELEASE</version>
		</dependency>



yml配置

  • 配置死信队列
      rabbit:
        bindings:
          saveOrderInput:
            consumer:
              #ttl: 20000 # 默认不做限制,即无限。消息在队列中最大的存活时间。当消息滞留超过ttl时,会被当成消费失败消息,即会被转发到死信队列或丢弃.即消息在队列中存活的最大时间为 20s
              # DLQ相关
              autoBindDlq: true # 是否自动声明死信队列(DLQ)并将其绑定到死信交换机(DLX)。默认是false。
              republishToDlq: true
              deadLetterExchange: exchange-order-dlq  #绑定exchange
              deadLetterQueueName: exchange-order-dlq.saveOrderInput  #死信队列名字:exchanName.queueName

完整配置如下(对队列saveOrderInput配置了死信队列):

server:
  port: 8004  #服务端口号
spring:
  application:
    name: blog-service #项目名称
  datasource:
    driver-class-name:  com.mysql.cj.jdbc.Driver
#    url: jdbc:mysql://mysql:3306/llc_user?serverTimezone=UTC?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=GMT%2B8
    url: jdbc:mysql://mysql:3306/llc?serverTimezone=UTC?useUnicode=true&amp;characterEncoding=UTF-8&amp;useSSL=false&amp;serverTimezone=GMT%2B8
    username: root
    password: ****
  cloud:
    nacos:
      discovery:        #nacos的服务发现
        server-addr: nacos:8848  #nacos的地址
        namespace: d19a4977-eb66-4f0d-a22a-3fefa43d6352   #nacos对应的命名空间名称
      config:          #nacos的服务配置
        server-addr: nacos:8848  #nacos的地址
        file-extension: yml   #nacos的服务配置类型
        group: DEFAULT_GROUP  #nacos的默认分组
        namespace: d19a4977-eb66-4f0d-a22a-3fefa43d6352  #nacos对应的命名空间名称
        ext-config:
          - data-id: currency.yml  #项目名称 id
            refresh: true   #服务配置刷新
    sentinel:
      transport:
        dashboard: localhost:8080 #配置Sentinel dashboard地址
        port: 8719

      # mq的相关配置
    stream:
      binders: #需要绑定的rabbitmq的服务信息
        defaultRabbit:  #定义的名称,用于bidding整合
          type: rabbit  #消息组件类型
          environment:  #配置rabbimq连接环境
            spring:
              rabbitmq:
                host: 127.0.0.1   #rabbitmq 服务器的地址
                port: 5672           #rabbitmq 服务器端口
                username: guest       #rabbitmq 用户名
                password: guest       #rabbitmq 密码
                virtual-host: /       #虚拟路径
      bindings:        #服务的整合处理
        # 生产者
        saveOrderOutput:    #这个是消息通道的名称 --->保存订单输出通道
          destination: exchange-saveOrder     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。
          content-type: application/json      #设置消息的类型,本次为json
          default-binder: defaultRabbit
          group: saveOrderGroup               #分组

        saveOrderInput:    #这个是消息通道的名称 ---> 保存订单输入通道
          destination: exchange-saveOrder     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。
          content-type: application/json      #设置消息的类型,本次为json
          default-binder: defaultRabbit
          group: saveOrderGroup               #分组
        orderInput:         #这个是消息通道的名称 ---> 订单输入通道
          destination: exchange-order     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。
          content-type: application/json      #设置消息的类型,本次为json
          default-binder: defaultRabbit
          group: orderGroup               #分组
        orderOutput:        #这个是消息通道的名称 ---> 订单输出通道
          destination: exchange-order     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。
          content-type: application/json      #设置消息的类型,本次为json
          default-binder: defaultRabbit
          group: orderGroup               #分组
        messageInput:
          destination: exchange-message
          content-typ: application/json
          default-binder: defaultRabbit
          group: messageGroup
        messageOutput:
          destination: exchange-message
          content-typ: application/json
          default-binder: defaultRabbit
          group: messageGroup

      rabbit:
        bindings:
          saveOrderInput:
            consumer:
              #ttl: 20000 # 默认不做限制,即无限。消息在队列中最大的存活时间。当消息滞留超过ttl时,会被当成消费失败消息,即会被转发到死信队列或丢弃.即消息在队列中存活的最大时间为 20s
              # DLQ相关
              autoBindDlq: true # 是否自动声明死信队列(DLQ)并将其绑定到死信交换机(DLX)。默认是false。
              republishToDlq: true
              #deadLetterExchange: exchange-order-dlq  #绑定exchange
              #deadLetterQueueName: exchange-order-dlq.saveOrderInput  #死信队列名字:exchanName.queueName

  redis:
    port: 6379            #redis的端口号默认端口是6379
    password: 123456        #密码
    host: redis
    timeout: 5000
    database: 3
#  rabbitmq:
#    host: 127.0.0.1
#    port: 5672
#    username: guest
#    password: guest
stone:
  redis:
    # 开启动态切换redis db【可选,默认不开启】
    dynamic-database: true

#security:
#  oauth2:
#    resource:
#      user-info-uri: http://zuul:8001/oauth-service/user-me    #  http://localhost:8998/user-me
#      prefer-token-info: false

# hystrix
feign:
  hystrix:
    enabled: false
  • 定义绑定接口(生产者)


import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Component;

@Component
public interface OrderOutputChannelProcessor {
    /**
     * 订单输出通道(需要与配置文件中的保持一致)
     */
    String ORDER_OUTPUT = "orderOutput";

    /**
     * 订单输出
     *
     * @return
     */
    @Output(ORDER_OUTPUT)
    MessageChannel orderOutput();


    /**
     * 保存订单输出通道(需要与配置文件中的保持一致)
     */
    String SAVE_ORDER_OUTPUT = "saveOrderOutput";
    /**
     * 保存订单输出
     *
     * @return
     */
    @Output(SAVE_ORDER_OUTPUT)
    MessageChannel saveOrderOutput();

    /**
     * 保存订单输出通道(需要与配置文件中的保持一致)
     */
    String MESSAGE_OUTPUT = "messageOutput";


    /**
     * 保存订单输出
     *
     * @return
     */
    @Output(MESSAGE_OUTPUT)
    MessageChannel messageOutput();
}

  • 定义绑定接口(消费者)

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;

/**
 * @Description 订单消息输入通道处理器
 * @time: 2020/11/29 15:37
 */
@Component
public interface OrderInputChannelProcessor {
    /**
     * 保存订单输入通道(需要与配置文件中的保持一致)
     */
    String SAVE_ORDER_INPUT = "saveOrderInput";
    /**
     * 订单输入通道(需要与配置文件中的保持一致)
     */
    String ORDER_INPUT = "orderInput";
    //String ORDER_DLX_INPUT = "orderDlxInput";


    /**
     * 订单输入通道(需要与配置文件中的保持一致)
     */
    String MESSAGE_INPUT = "messageInput";


    /**
     * 保存订单输入方法
     *
     * @return
     */
    @Input(SAVE_ORDER_INPUT)
    SubscribableChannel saveOrderInput();

    /**
     * 订单输入方法
     *
     * @return
     */
    @Input(ORDER_INPUT)
    SubscribableChannel orderInput();


    /**
     * 订单输入方法
     *
     * @return
     */
    @Input(MESSAGE_INPUT)
    SubscribableChannel messageInput();


}

  • 定义监听器

import com.itllc.config.mqConfig.channel.OrderInputChannelProcessor;
import com.itllc.config.mqConfig.channel.OrderOutputChannelProcessor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.SendTo;

@Slf4j
@EnableBinding({OrderInputChannelProcessor.class, OrderOutputChannelProcessor.class})
public class OrderMessageListener {

    /**
     * 保存订单逻辑
     * 通过OrderChannelProcessor.SAVE_ORDER_INPUT 接收消息
     * 然后通过@SendTo 将处理后的消息发送到 OrderChannelProcessor.ORDER_OUTPUT
     *
     * @param message
     * @return
     */
    @StreamListener(OrderInputChannelProcessor.SAVE_ORDER_INPUT)
    @SendTo(OrderOutputChannelProcessor.ORDER_OUTPUT)
    public String saveOrderMessage(Message<String> message) {
        log.info("保存订单的消息:" + message);
        int i = 1 / 0;
        //处理之后的订单消息
        return "【" + message.getPayload() + "】";
    }

    /**
     * 监听OrderChannelProcessor.ORDER_INPUT 通道,进行发送短信业务操作
     *
     * @param message
     */
    @StreamListener(OrderInputChannelProcessor.ORDER_INPUT)
    public void orderToSMSMessage(String message) {
        log.info("进行发送短信业务操作----------:{}", message);
    }

    /**
     * 监听OrderChannelProcessor.ORDER_INPUT 通道,进行存储到es业务操作
     *
     * @param message
     */
    @StreamListener(OrderInputChannelProcessor.ORDER_INPUT)
    public void orderToEsMessage(String message) {
        log.info("进行存储到es业务操作------------:{}", message);
    }

    /**
     * 监听OrderChannelProcessor.ORDER_INPUT 通道,进行存储到es业务操作
     *
     * @param message
     */
    @StreamListener(OrderInputChannelProcessor.MESSAGE_INPUT)
//    @SendTo(OrderOutputChannelProcessor.ORDER_OUTPUT)
    public void testMessage(String message) {
        log.info("自定义输入通道------------:{}", message);
    }
}



测试流程

定义个controller



    /**
     * 发送消息自定义
     *
     * @param message
     */
    @GetMapping(value = "/sendOwnMessage")
    @ApiOperation("发送消息自定义")
    public void sendOwnMessage(@NotBlank  @ApiParam(value = "发送的消息",required = true) @RequestParam("message") String message) {
        //发送消息
        llcMqService.sendOwnMessage(message);
        log.info("发送保存订单消息成功");
    }

定义个service


import com.itllc.config.mqConfig.channel.OrderOutputChannelProcessor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

import javax.annotation.Resource;

/**
 * @author lilinchun
 * @date 2022/5/13 0013 20:50
 */
@Slf4j
@EnableBinding(value = {OrderOutputChannelProcessor.class})
public class LlcMqService {


    @Autowired
    @Output(OrderOutputChannelProcessor.SAVE_ORDER_OUTPUT)
    private MessageChannel channel;



    @Resource
    private  OrderOutputChannelProcessor orderOutputChannelProcessor;



    /**
     * 发送消息
     *
     * @param msg
     */
    public void sendMsg(String msg) {
        System.out.println("----------------"+msg);
        channel.send(MessageBuilder.withPayload(msg).build());
        log.info("消息发送成功:" + msg);
    }


    /**
     * 自定义发送消息
     * @param msg
     */
    public void sendOwnMessage(String msg) {
        System.out.println("----------------"+msg);
        orderOutputChannelProcessor.messageOutput().send(MessageBuilder.withPayload(msg).build());
        log.info("消息发送成功:" + msg);

    }
}

测试结果

在这里插入图片描述

现在测试当消息转发出现问题的时候怎么解决

在这里插入图片描述

再次运行后,可以看到消息被重复执行了三次,因为springCloud stream消息默认情况下是消费消息时若是出错了会重试三次,超过三次之后就会进入死信队列,若没有配置死信队列则该消息会丢失。
在这里插入图片描述

启动消费者和生产者,可以在RabbitMQ web界面中可以看到已经创建exchange-saveOrder.saveOrderGroup队列的死信队列 exchange-saveOrder.saveOrderGroup.dlq。

在这里插入图片描述

运行1次(任何次数都行)生产者方法发送消息给消费者,在RabbitMQ web页面可以明显的看出出错了的消息已经全部进入的死信队列当中。

查看dlq队列

点击页面队列名exchange-saveOrder.saveOrderGroup.dlq进入:

在这里插入图片描述

查看错误日志信息(点击get Messages)

在这里插入图片描述

可以看到消息原封不动保存,并且有具体的报错信息。方便与我们来进行分析代码。

  • 移除消息

在这里插入图片描述

按提示需要执行下面命令开启插件,不需要重启rabbitmq

$ rabbitmq-plugins enable rabbitmq_shovel rabbitmq_shovel_management

清空消息时,可以再输入框中填入目标队列名称(可以从Get messages 部分的RoutingKey进行复制),点击按钮,清空死信队列消息。这时死信队列中的消息会回到原来的队列当中去。

参考文章:

https://blog.csdn.net/a767815662/article/details/110848635