RabbitMQ死信机制实现延迟队列

  • Post author:
  • Post category:其他




延迟队列

延迟队列存储的对象肯定是对应的延时消息,所谓”延时消息”是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费

RabbitMQ本身没有直接支持延迟队列功能,但是可以通过以下特性模拟出延迟队列的功能。 但是我们可以通过RabbitMQ的两个特性来曲线实现延迟队列:Time To Live(TTL) 和 Dead Letter Exchanges(DLX)

Time To Live(TTL)

RabbitMQ可以针对Queue和Message设置 x-message-tt,来控制消息的生存时间,如果超时,则消息变为dead letter(死信)



死信队列

DLX(Dead Letter Exchange),死信交换器。当队列中的消息被拒绝、或者过期会变成死信,死信可以被重新发布到另一个交换器,这个交换器就是DLX,与DLX绑定的队列称为死信队列。

造成死信的原因:

信息被拒绝

信息超时

超过了队列的最大长度

可以通过设置x-dead-letter-exchange参数指定DLX,设置x-dead-letter-routing-key指定DLX使用的路由键。

在这里插入图片描述

  • 生产者
package com.ghgcn.mq.test02;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class ProducerDLX {

	private static String userName = "root";
	private static String password = "root123";
	private static String host = "localhost";
	private static int port = 5672;

	public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

		ConnectionFactory factory = new ConnectionFactory();

		factory.setUsername(userName);
		factory.setPassword(password);
		factory.setHost(host);
		factory.setPort(port);

		// 1.建立连接
		Connection conn = factory.newConnection();
		// 2.建立信道
		Channel channel = conn.createChannel();
		// 3.声明交换器
		channel.exchangeDeclare("exchange.dlx", "direct", true);
		channel.exchangeDeclare("exchange.normal", "fanout", true);
		
		//4. 声明队列
		Map<String, Object> arg = new HashMap<String, Object>();
		// 设置DLX
		arg.put("x-dead-letter-exchange", "exchange.dlx");
		arg.put("x-dead-letter-routing-key", "routingkey.dlx");
		// 设置消息过期时间,消息过期后,会重新发布到DLX
		arg.put("x-message-ttl", 10000);
		
		channel.queueDeclare("queue.normal", true, false, false, arg);
		//	死信队列
		channel.queueDeclare("queue.dlx", true, false, false, null);
	
		//4. 绑定队列
		channel.queueBind("queue.normal", "exchange.normal", "");
		//这里的routingkey与上面设置的一样
		channel.queueBind("queue.dlx", "exchange.dlx", "routingkey.dlx");
		
		
		byte[] body = "死信测试  ".getBytes();;
		
		channel.basicPublish("exchange.normal", "", MessageProperties.PERSISTENT_TEXT_PLAIN, body );
		
	
		TimeUnit.SECONDS.sleep(10);
		channel.close();
		conn.close();
	}

}

  • 消费者
package com.ghgcn.mq.test02;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class DLXConsumer {


	private static String userName = "root";
	private static String password = "root123";
	private static String host = "localhost";
	private static int port = 5672;

	public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

		ConnectionFactory factory = new ConnectionFactory();

		factory.setUsername(userName);
		factory.setPassword(password);
		factory.setHost(host);
		factory.setPort(port);

		// 1.建立连接
		Connection conn = factory.newConnection();
		// 2.建立信道
		Channel channel = conn.createChannel();
		channel.exchangeDeclare("exchange.dlx", "direct", true);
		channel.queueDeclare("queue.dlx", true, false, false, null);
		
		channel.queueBind("queue.dlx", "exchange.dlx", "routingkey.dlx");
		
		channel.basicConsume("queue.dlx", new DefaultConsumer(channel) {

			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				
				System.err.println(" 死信 "+consumerTag);
				System.err.println(" 死信  body"+new String(body));
				
				channel.basicAck(envelope.getDeliveryTag(), false);
				
			}
			
		});
		
		
		
		TimeUnit.SECONDS.sleep(100);
		channel.close();
		conn.close();
	}

}



延迟队列

延迟队列存储的是延迟消息,延迟消息指的是,当消息被发发布出去之后,并不立即投递给消费者,而是在指定时间之后投递。如:在订单系统中,订单有30秒的付款时间,在订单超时之后在投递给消费者处理超时订单。

rabbitMq没有直接支持延迟队列,可以通过死信队列实现。在死信队列中,可以为普通交换器绑定多个消息队列,假设绑定过期时间为5分钟,10分钟和30分钟,3个消息队列,然后为每个消息队列设置DLX,为每个DLX关联一个死信队列。当消息过期之后,被转存到对应的死信队列中,然后投递给指定的消费者消费。

在这里插入图片描述

在这里插入图片描述



生产者

package com.ghgcn.mq.test02;

import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class DelayProducer {

	private static String userName = "root";
	private static String password = "root123";
	private static String host = "localhost";
	private static int port = 5672;

	public static void main(String[] args) throws IOException, TimeoutException {

		ConnectionFactory factory = new ConnectionFactory();

		factory.setUsername(userName);
		factory.setPassword(password);
		factory.setHost(host);
		factory.setPort(port);

		// 1.建立连接
		Connection conn = factory.newConnection();
		// 2.建立信道
		Channel channel = conn.createChannel();
		
		channel.exchangeDeclare("exchange.delay", "direct", true, false,
				false, null);
		// 创建dlx,用于将过期的message路由到不同的队列
		channel.exchangeDeclare("exchange.dlx-5s", "fanout", true, false,
				false, null);
		// 创建死信队列,接收过期的message
		channel.queueDeclare("queue-delay-5s", true, false, false, null);

		// 创建两个消息过期队列,并设置dlx
		Map<String, Object> arg = new HashMap<String, Object>();
		arg.put("x-dead-letter-exchange", "exchange.dlx-5s");
		arg.put("x-message-ttl", 5000);
		channel.queueDeclare("queue-5s", true, false, false, arg);

		// 队列与交换器绑定
		channel.queueBind("queue-5s", "exchange.delay", "routingkey-5s");
		channel.queueBind("queue-delay-5s", "exchange.dlx-5s", "");

		// 发布消息
		channel.basicPublish("exchange.delay", "routingkey-5s",
				MessageProperties.PERSISTENT_TEXT_PLAIN,
				("Message-5s"+new Date()).getBytes());
		//释放资源
		
		channel.close();
		
		conn.close();
		
	}

}



消费者

package com.ghgcn.mq.test02;

import java.io.IOException;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Queue5s {
	private static String userName = "root";
	private static String password = "root123";
	private static String host = "localhost";
	private static int port = 5672;

	public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

		ConnectionFactory factory = new ConnectionFactory();

		factory.setUsername(userName);
		factory.setPassword(password);
		factory.setHost(host);
		factory.setPort(port);

		// 1.建立连接
		Connection conn = factory.newConnection();
		// 2.建立信道
		Channel channel = conn.createChannel();

		// 创建dlx,用于将过期的message路由到不同的队列
		channel.exchangeDeclare("exchange.dlx-5s", "fanout", true, false, false, null);
		// 创建死信队列,接收过期的message
		channel.queueDeclare("queue-delay-5s", true, false, false, null);

		channel.queueBind("queue-delay-5s", "exchange.dlx-5s", "");
		
		
		
		channel.basicConsume("queue-delay-5s", new DefaultConsumer(channel) {

			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				
				System.err.println("queue-delay-5s  "+new String(body)+"  接收时间  "+ new Date());
				channel.basicAck(envelope.getDeliveryTag(), false);
				System.out.println("============确认完成 =============");
			}
			
		});

		TimeUnit.SECONDS.sleep(100);
		channel.close();
		conn.close();
	}

}

在这里插入图片描述

  • x-dead-letter-exchange”, “exchange.dlx-5s”); 声明死信交换器
  • “x-message-ttl”, 5000); 过期时间
  • 建立将死信交换器设置为fanout
  • routingKey会默认与原来的一样
queue-delay-5s  Message-5sThu May 30 17:56:00 CST 2019  接收时间  Thu May 30 17:56:05 CST 2019============确认完成 =============

envelope Envelope(deliveryTag=1, redeliver=false, exchange=exchange.dlx-5s, routingKey=routingkey-5s)
properties #contentHeader<basic>(content-type=text/plain, content-encoding=null, headers={x-first-death-exchange=exchange.delay, x-death=[{reason=expired, count=1, exchange=exchange.delay, time=Thu May 30 17:56:05 CST 2019, routing-keys=[routingkey-5s], queue=queue-5s}], x-first-death-reason=expired, x-first-death-queue=queue-5s}, delivery-mode=2, priority=0, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)



版权声明:本文为ko0491原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。