RabbitMQ系列教程(六)RabbitMQ Exchange类型之headers Exchange

  • Post author:
  • Post category:其他




RabbitMQ 四种Exchange类型



1. headers Exchange介绍

headers Exchange与

Direct、topic、fanout

不同,它时通过匹配AMQP消息的header而非路由键。


headers Exchange



Direct Exchange

类似,性能方面比

Direct Exchange

差很多,所以在实际项目中用的很少,通过一张图来直观感受一下其工作流程

在这里插入图片描述



2. Java代码实现

  • 生产者
public static void main(String[] args) throws Exception{
        //1 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //设置虚拟主机
        connectionFactory.setVirtualHost("/");
        //创建Connection
        Connection connection = connectionFactory.newConnection();
        //创建Channel
        Channel channel = connection.createChannel();
        //交换机名称
        String exchangeName = "test_headers_exchange";

        //消息头
        Map<String,Object> headers=new HashMap<>();
        headers.put("format","JSON");
        headers.put("type","pdf");
        //添加到headers
        AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder()
                .headers(headers)
                .build();

        // routingKey
        //发送5条消息
        for (int i=1;i<=5;i++) {
            String msg = "RabbitMQ headers Exchange  TEST"+i;
            channel.basicPublish(exchangeName, "", basicProperties , msg.getBytes());
        }
        channel.close();
        connection.close();
    }
  • 消费者

在声明消息队列时,需要指定header参数,其中

x-match

为特殊的

headers

,为

all

时则表示要匹配所有的header,如果为

any

则表示只要匹配其中的一个header即可

 Map<String,Object> arguments=new HashMap<>();
        arguments.put("x-match","all");
        //arguments.put("x-match","any");
        //声明一个队列
        channel.queueDeclare(queueName, false, false, false, arguments);
public static void main(String[] args) throws Exception{

        ConnectionFactory connectionFactory = new ConnectionFactory() ;

        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();
        //交换机名称
        String exchangeName = "test_headers_exchange";
        //交换机类型
        String exchangeType = "headers";
        //消息队列名称
        String queueName = "test_fanout_queue";
        //fanout模式不以routingKey为匹配规则,可以为空
        String routingKey = " ";

        //声明交换机
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);

        //队列的参数
        //x-match为特殊的headers,为all时则表示要匹配所有的header,如果为any则表示只要匹配其中的一个header即可
        Map<String,Object> arguments=new HashMap<>();
        arguments.put("x-match","all");
        //arguments.put("x-match","any");
        //声明一个队列
        channel.queueDeclare(queueName, false, false, false, arguments);
        //建立Exchange 和Queue绑定关系
        channel.queueBind(queueName, exchangeName, routingKey);


        //参数说明:队列名称、是否自动ACK、DefaultConsumer(消费者)
            channel.basicConsume(queueName, true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {

                    System.out.println("收到消息:"+new String(body));
                }
            });
    }

依次启动

消费者



生产者

,在

消费者

控制台可以看到接收到消息的输出



RabbitMQ系列文章目录


1、RabbitMQ Windows/CentOS7平台安装手册



2、RabbitMQ中一些重要概念



3、RabbitMQ Exchange类型之Direct Exchange



4、RabbitMQ Exchange类型之Topic Exchange



5、RabbitMQ Exchange类型之fanout Exchange



6、RabbitMQ Exchange类型之headers Exchang



7、Confirm消息确认机制



8、RabbitMQ中ReturnListener的使用



9、RabbitMQ消费端限流



10、ACK确认机制与消息补偿



11、RabbitMQ队列/消息的生存时间(Time-To-Live)



12、RabbitMQ死信队列(Dead Letter Exchanges)



13、Spring AMQP API详解



14、Spring Boot整合RabbitMQ