Spring + RabbitMq + 延迟队列 实现订单取消和完成功能

  • Post author:
  • Post category:其他


1.项目是 Spring的 版本比较老是3.1.4的,所以使用的Rabbit的jar 也要老一点我用的是1.3.5的

先导入jar包

<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.3.5.RELEASE</version>
</dependency>

2. 安装Rabbit windos版本 便于测试 rabbit有两个端口 一个是15672的这个端口是web管理端口,一个是5672的工厂链接端口

3.创建rabbitMq.properties配置文件

rmq.ip=locahost
rmq.port=5672
rmq.producer.num=20
rmq.manager.user=用户名
rmq.manager.password=密码
rmq.host=通道名称

4.web打开 locahost:15672管理页面  默认账号密码都是guest

5.配置一个自己的账户  进入admin选项卡 选中Add a user 输入 Username和Password 然后Adduser记得配置权限

6.创建两个交换机 一个是正常交换,一个是死信交换,死信交换是用来做延时队列的,原理就是生产者生产一条定时消息1分钟,1分钟到了以后该信息将被交换至死信息中,消费者端监听死信交换机,死信中有了消息以后会响应消费者端

进入Exchanges选项卡,选择Add new exchange  只需要输入name 即可  选择 Add exchange保存

交换机名称就叫exchange_order ,然后同样的在创建一个die_exchange_order 交换机,创建完了以后,再去创建两个队列创建一个 queue_order队列 这个队列是正常队列,我们需要先输入Name 然后设置规则Arguments

1.x-dead-letter-exchange:die_exchange_order //这个是死信交换机名称

2.x-dead-letter-routing-key:交换机路由 //这个是死信交换机路由

3.x-message-ttl:60000 //这个是过期时间毫秒

然后保存,保存完了以后选择下面Dindings  绑定一个交换机 From exchange 交换机名称 这里需要绑定正常的交换机exchange_order 然后是Routing key 正常的交换机路由Routing key下面一点再说

创建完queue_order队列后,再创建一个die_queue_order死信队列,这个就不需要设置规则了,直接去绑定一个交换机 From exchange 这里就要填写为die_exchange_order了

创建完两个队列以后,在切换到Exchanges选项卡,选择第一个exchange_order然后选择Add binding from this exchange 绑定队列Toqueue,exchange_order需要绑定queue_order队列然后设置Routing key 的值,记得Exchanges说的是设置,在Queue中我说的是绑定,同样的操作一次die_exchange_order 绑定队列 die_queue_order,都操作完了以后 我们开始项目配置了

7.spring 配置 需要理解流程 : 链接Rabbit 需要一个工厂,生产者链接工厂打开通道,通过交换机的名称和路由key获取往队列中存储数据。那么我们就需要先配置工厂

	<bean id="connectionFactory"  class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
		<constructor-arg value="localhost" />
		<property name="username" value="${rmq.manager.user}" />
		<property name="password" value="${rmq.manager.password}" />
		<property name="host" value="${rmq.ip}" />
		<property name="port" value="${rmq.port}" />
        <property name="virtualHost" value="${rmq.host}" />
	</bean>

工厂配置完成以后,我们声明队列 这里配置的是生产者和消费者在同一个服务端所以配两个

<rabbit:queue name="queue_order" durable="true" auto-delete="false" exclusive="false" />
	<rabbit:queue name="die_queue_order" durable="true" auto-delete="false" exclusive="false" />

队列声明完成以后,需要声明一个工厂消息序列化

	<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
	<rabbit:template  id="rabbitTemplate" connection-factory="connectionFactory"  message-converter="jsonMessageConverter" />

然后我们需要声明一个  生产者服务,一个消费者服务

<bean id="生产者类名首字母小写(服务)" class="类路径.生产者类名手首字母大写"></bean>
<bean id="消费者类名首字母小写(服务)" class="类路径.消费者类名手首字母大写"></bean>

然后生产者功能已经完成了  可以正常发送消息,但是消费者还没有完,消费者还需要监听,这里我们需要监听的是死信队列

	<bean id="QueueListenerAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter" >
		<constructor-arg ref="消费者类名(服务)(指向刚刚配置的消费者Id)" />
		<property name="defaultListenerMethod" value="consumerOrder"></property>
		<property name="messageConverter" ref="jsonMessageConverter"></property>
	</bean>
	<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" concurrency="20">
		<rabbit:listener queues="死信队列的名称die_queue_order" ref="QueueListenerAdapter" />
	</rabbit:listener-container>

配置已经完成了

8.编写生产者类,和消费者类

/**
 * 生产者
 */
public class RabbitProduce {
private static org.slf4j.Logger log = LoggerFactory.getLogger(RabbitProduce.class);
    
    @Resource
    private RabbitTemplate rabbitTemplate;

    private final String Order_Exchanges_Name="正常交换机的名称 queue_order";
    private final String Order_Exchanges_Key="正常交换机的路由 ";

    /**
    * CommonMessage  就是一个普通的实体类,可以自己写一些属性
    */
    public void sendMessage(CommonMessage msg){
        try {
            log.error("发送信息开始");
            System.out.println(rabbitTemplate.getConnectionFactory().getHost());
            //发送信息  queueName交换机,就是上面的routingKey msg.getSource() 为 test_key
            rabbitTemplate.convertAndSend(Order_Exchanges_Name,Order_Exchanges_Key, msg);
            //如果是普通字符串消息需要先序列化,再发送消息
            //rabbitTemplate.convertAndSend(queueName,msg.getSource(), SerializationUtils.serialize(msg));
            log.error("发送信息结束");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


}
/**
 * 消费者
 */
public class RabbitConsumer  {

    @Resource
    private RabbitTemplate rabbitTemplate;


    /**
     * 消费
     * */
    public void consumerOrder(CommonMessage message) throws IOException {
        //message 的信息就是 死信过来的信息 后面执行自己的逻辑
    }


}

9.其中大部分人都会碰到的问题,就是注入失败,所以消费者类和生产者类 一定要放在spring容器能扫描得到的包,不然@Autowired 无法注入




版权声明:本文为qq_33563609原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。