RabbitMQ 四种Exchange类型
-
RabbitMQ Exchange类型之Direct Exchange
-
RabbitMQ Exchange类型之Topic Exchange
-
RabbitMQ Exchange类型之fanout Exchange
-
RabbitMQ Exchange类型之headers Exchange
1. headers Exchange介绍
headers Exchange与
Direct、topic、fanout
不同,它时通过匹配AMQP消息的header而非路由键。
headers Exchange
与
Direct Exchange
类似,性能方面比
Direct Exchange
差很多,所以在实际项目中用的很少,通过一张图来直观感受一下其工作流程
2. Java代码实现
- 生产者
public static void main(String[] args) throws Exception{
//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//设置虚拟主机
connectionFactory.setVirtualHost("/");
//创建Connection
Connection connection = connectionFactory.newConnection();
//创建Channel
Channel channel = connection.createChannel();
//交换机名称
String exchangeName = "test_headers_exchange";
//消息头
Map<String,Object> headers=new HashMap<>();
headers.put("format","JSON");
headers.put("type","pdf");
//添加到headers
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder()
.headers(headers)
.build();
// routingKey
//发送5条消息
for (int i=1;i<=5;i++) {
String msg = "RabbitMQ headers Exchange TEST"+i;
channel.basicPublish(exchangeName, "", basicProperties , msg.getBytes());
}
channel.close();
connection.close();
}
- 消费者
在声明消息队列时,需要指定header参数,其中
x-match
为特殊的
headers
,为
all
时则表示要匹配所有的header,如果为
any
则表示只要匹配其中的一个header即可
Map<String,Object> arguments=new HashMap<>();
arguments.put("x-match","all");
//arguments.put("x-match","any");
//声明一个队列
channel.queueDeclare(queueName, false, false, false, arguments);
public static void main(String[] args) throws Exception{
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//交换机名称
String exchangeName = "test_headers_exchange";
//交换机类型
String exchangeType = "headers";
//消息队列名称
String queueName = "test_fanout_queue";
//fanout模式不以routingKey为匹配规则,可以为空
String routingKey = " ";
//声明交换机
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
//队列的参数
//x-match为特殊的headers,为all时则表示要匹配所有的header,如果为any则表示只要匹配其中的一个header即可
Map<String,Object> arguments=new HashMap<>();
arguments.put("x-match","all");
//arguments.put("x-match","any");
//声明一个队列
channel.queueDeclare(queueName, false, false, false, arguments);
//建立Exchange 和Queue绑定关系
channel.queueBind(queueName, exchangeName, routingKey);
//参数说明:队列名称、是否自动ACK、DefaultConsumer(消费者)
channel.basicConsume(queueName, true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println("收到消息:"+new String(body));
}
});
}
依次启动
消费者
和
生产者
,在
消费者
控制台可以看到接收到消息的输出
RabbitMQ系列文章目录
1、RabbitMQ Windows/CentOS7平台安装手册
2、RabbitMQ中一些重要概念
3、RabbitMQ Exchange类型之Direct Exchange
4、RabbitMQ Exchange类型之Topic Exchange
5、RabbitMQ Exchange类型之fanout Exchange
6、RabbitMQ Exchange类型之headers Exchang
7、Confirm消息确认机制
8、RabbitMQ中ReturnListener的使用
9、RabbitMQ消费端限流
10、ACK确认机制与消息补偿
11、RabbitMQ队列/消息的生存时间(Time-To-Live)
12、RabbitMQ死信队列(Dead Letter Exchanges)
13、Spring AMQP API详解
14、Spring Boot整合RabbitMQ