1.创建连接工具类
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 连接工具类
*/
public class RabbitMQUtils {
private static ConnectionFactory factory;
static {
//类加载时只执行一次
factory = new ConnectionFactory();
factory.setHost("mq的主机IP地址");
factory.setPort(5672);
factory.setVirtualHost("/zz");
factory.setUsername("root");
factory.setPassword("root");
}
/**
* 定义提供连接对象的方法
* @return
*/
public static Connection getConnection(){
try {
return factory.newConnection();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* 关闭通道和关闭连接工具的方法
* @param channel
* @param connection
*/
public static void closeConnectionAndChanel(Channel channel,Connection connection){
try {
if(channel != null){
channel.close();
}
if(connection != null){
connection.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
2.修改生产者代码
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
import utils.RabbitMQUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 生产者
*/
public class Provider {
//生产消息
@Test
public void testSendMessage() throws IOException, TimeoutException {
//获取连接对象
Connection connection = RabbitMQUtils.getConnection();
//通过链接获取通道对象
Channel channel = connection.createChannel();
//通道绑定对应的消息队列
//参数1:队列名称,不存在会自动创建
//参数2:定义队列是否要持久化 true 持久化队列 false 不持久化
//参数3:是否独占队列(当前队列只允许当前连接使用)
//参数4:是否在消费完成后自动删除队列
//参数5:额外附加参数
channel.queueDeclare("hello",false,false,false,null);
//发布消息
//参数1:交换机名称
// 参数2:队列名称
// 参数3:传递消息的额外设置 设置MessageProperties.PERSISTENT_TEXT_PLAIN属性 持久化消息
// 参数4:消息的具体内容(需要一个字节数组)
channel.basicPublish("","hello",null,"hello rabbitmq".getBytes());
//关闭
RabbitMQUtils.closeConnectionAndChanel(channel,connection);
}
}
3.修改消费者代码
import com.rabbitmq.client.*;
import utils.RabbitMQUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者
*/
public class Customer {
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接对象
Connection connection = RabbitMQUtils.getConnection();
//通过链接获取通道对象
Channel channel = connection.createChannel();
//通道绑定对应的消息队列(参数尽量和生产者中的参数保持一致)
//参数1:队列名称,不存在会自动创建
//参数2:定义队列是否要持久化 true 持久化队列 false 不持久化
//参数3:是否独占队列(当前队列只允许当前连接使用)
//参数4:是否在消费完成后自动删除队列
//参数5:额外附加参数
channel.queueDeclare("hello",false,false,false,null);
//消费消息
//参数1:消费哪个消息 队列名称
//参数2:是否开启消息的自动确认机制
//参数3:消费时的回调接口
channel.basicConsume("hello",true,new DefaultConsumer(channel){
//最后一个参数:消息队列中取出消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("打印消息:"+new String(body));
}
});
}
}
更多:
RabbitMQ学习笔记一:了解及在Linux下安装RabbitMQ
版权声明:本文为weixin_45020617原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。