java操作RabbitMQ

  • Post author:
  • Post category:java

1、创建虚拟主机、交换机、队列

RabbitMQ提供了自己的管理界面,可以通过管理界面来完成VirtualHost、Exchange、queue的创建。

1.1创建VirtualHost

在这里插入图片描述

1.2创建交换机

创建交换机的时候需要指定虚拟主机以及交换机的类型(direct(路由模式)、fanout(广播)、headers、topic)
direct:Exchange通过消息携带的路由键来将消息分发到对应的队列中
fanout:Exchange将消息分发到所有绑定到交换机的队列中
headers:Exchange通过判断消息头的值是否与绑定的值相匹配来分发消息
x-match为any时,消息头的任意一个值匹配就可以满足条件
x-match为all时,消息头的所有值匹配才能满足条件
topic:Exchange将满足路由规则的消息分发到对应的队列
在这里插入图片描述

1.3创建队列

创建队列时往往需要绑定到交换机上。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
这是我们的虚拟主机以及交换机、队列就创建好了。我们可以使用java来操作。

2、java操作RabbitMQ

2.1导入MQ的依赖
	<dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.6.0</version>
    </dependency>
2.2创建与MQ的连接
public static Connection getConnection() throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();//MQ采用工厂模式来完成连接的创建
        //2.在工厂对象中设置连接信息(ip,port,virtualhost,username,password)
        factory.setHost("xxxxxx");//设置MQ安装的服务器ip地址
        factory.setPort(5672);//设置端口号
        factory.setVirtualHost("host1");//设置虚拟主机名称
        //MQ通过用户来管理
        factory.setUsername("xxxxxx");//设置用户名称
        factory.setPassword("123456");//设置用户密码
        //3.通过工厂对象获取连接
        Connection connection = factory.newConnection();
        return connection;
    }
2.3消息生产者发送消息到消息队列
//获取连接
		Connection connection = MQUtil.getConnection();
		//mq提供Channel来将处理消息
		//创建Channel
        Channel channel = connection.createChannel();
        String msg = "hello world"//basicPublish将消息发送到指定的交换机
        channel.basicPublish("ex3", "a", null, msg.getBytes());
     	//关闭连接
        channel.close();
        connection.close();
2.4消息消费者从消息队列中获取消息
//获取与MQ的连接
		Connection connection = MQUtil.getConnection();
		//创建Channel
        Channel channel = connection.createChannel();
        //通过basicConsumer方法从指定队列中获取消息,消息生产者会通过ex2交换机中的key值将消息发送到queue6中,因为在创建queue6时绑定到交换机ex3中,指定的路由键为a。
        //consumer参数是消息接收之后的处理方法
        channel.basicConsume("queue6",true,consumer);
        //获取Consumer
        Consumer consumer = new DefaultConsumer(channel){
        //重写handleDelivery方法(这个方法是消息的处理过程)
        //body参数就是接受到的消息
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            //将消息转换成String类型然后打印
                String msg = new String(body);
                System.out.println(msg);
            }
        };

在这里插入图片描述

3、SpringBoot使用RabbitMQ

3.1导入依赖
		<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
3.2生产者发送消息

编写配置文件

server:
  port: 8001
spring:
  application:
    name: producer

  rabbitmq:
    host: xxxxxx #mq服务器的ip地址
    port: 5672
    virtual-host: host3
    username: xxxxxx
    password: xxxxxx

SpringBoot提供了AmqpTemplate来操作RabbitMQ

//注入AmqpTemplate
private AmqpTemplate amqpTemplate;
//通过converAndSend方法发送消息
String msg = "hello world";
amqpTemplate.converAndSend("ex3","b",msg);
3.3消费者接受消息

编写配置文件

server:
  port: 8002
spring:
  application:
    name: consumer

  rabbitmq:
    host: xxxxxx #mq服务器的ip地址
    port: 5672
    virtual-host: host3
    username: xxxxxx
    password: xxxxxx

SpringBoot中提供监听器来监听队列
@RabbitListener注解监听队列
@RabbitHandle注解作用于方法,用来处理消息

@RabbitListener(queues = "queue7")
public class MsgService {
    @RabbitHandler
    public void getMsg(String msg){
        System.out.println(msg);
    }
}

在这里插入图片描述
以上就是java操作RabbitMQ的基本内容。


版权声明:本文为qq_39411354原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。