rabbitMq存在消息丢失的风险,包括生产者消息丢失,服务器消息丢失,消费者消息丢失这3种情况。rabbitMq发布确认是为了解决生产者消息丢失的问题,通过发布确认机制确保消息已经发送到Broker(已经保存到磁盘上)。服务器消息丢失主要是通过设置队列的持久化,交换机的持久化,消息的持久化进行解决。消费者消息丢失主要是通过消费者消费完消息后手动确认的方式进行解决。
本文对rabbitMq发布确认进行介绍。
消息发布确认主要包括单条消息发布确认,批量消息发布确认,消息异步确认这三种方案,在工作中选择使用消息异步确认的方案是最多最常用的。
1.单条消息的发布确认。
单条消息发布的方式效率是最低的,但是编程方式来说是最简单的。
channel.confirmSelect();开启消息发布确认,channel.waitForConfirms();确认消息是否发布到了Broker。
/**
* 单条发布确认
* @throws IOException
* @throws TimeoutException
* @throws InterruptedException
*/
public static void publicMessageConfirmOne() throws IOException, TimeoutException, InterruptedException {
Channel channel = RabbitUtils.getChannel();
// 开启消息的发布确认,消息需要被发送到队列才认为消息是发送成功的
channel.confirmSelect();
String queueName = "oneConfirmQueue";
// 队列声明
channel.queueDeclare(queueName, true, false, false, null);
long startTime = System.currentTimeMillis();
channel.basicPublish("", queueName, null, "message消息".getBytes());
// 消息发布后进行确认,确认消息是否发送成功
boolean flag = channel.waitForConfirms();
long endTime = System.currentTimeMillis();
System.out.println("发布消息" + flag + ", 消息发布确认耗时:" + (endTime - startTime));
}
2.批量消息的发布确认。
批量消息的发布确认其实和单条消息的确认方案非常像,只是在消息确认的时候是以一个批次多条消息的方式进行确认。但这种方式有一个缺点一旦消息发布确认结果是失败的,就只能知道是这个批次中有某些消息是未发送到Broker,但不知道具体是哪条消息。这种方式一般用得非常少。
/**
* 批量发布确认(实际上就是发送了多条消息之后再确认一下,本批次的多条消息是否发送成。但一旦发布失败,只能知道这批次中有发送失败的消息不知道具体是哪条失败了)
* @throws IOException
* @throws TimeoutException
* @throws InterruptedException
*/
public static void publicMessageConfirmMany() throws IOException, TimeoutException, InterruptedException {
Channel channel = RabbitUtils.getChannel();
// 开启消息的发布确认,消息需要被发送到队列才认为消息是发送成功的
channel.confirmSelect();
String queueName = "manyConfirmQueue";
int messsageCount = 100;
// 队列声明
channel.queueDeclare(queueName, true, false, false, null);
long startTime = System.currentTimeMillis();
for (int i = 0; i < messsageCount; i++) {
channel.basicPublish("", queueName, null, ("message消息" + i).getBytes());
// 每10条作为一个批次进行确认
if (i%10 == 0) {
// 消息发布后进行确认,确认消息是否发送成功
boolean flag = channel.waitForConfirms();
System.out.println("发布消息" + flag);
}
}
long endTime = System.currentTimeMillis();
System.out.println(" 消息发布确认耗时:" + (endTime - startTime));
}
3.消息异步确认。
消息异步确认主要是通过设置监听器,提供确认成功和确认失败的回调方法给Broker通知生产者消息是否发送成功。
channel.addConfirmListener(ackCallback, nackCallback);第一个参数是确认成功的回调接口,第二个参数是确认失败的回调接口。
/**
* 消息发布异步确认
* @throws IOException
* @throws TimeoutException
* @throws InterruptedException
*/
public static void publicMessageConfirmAsync() throws IOException, TimeoutException, InterruptedException {
Channel channel = RabbitUtils.getChannel();
// 开启消息的发布确认,消息需要被发送到队列才认为消息是发送成功的
channel.confirmSelect();
// 消息确认成功回调函数
ConfirmCallback ackCallback = new ConfirmCallback() {
@Override
public void handle(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息发送成功" + deliveryTag);
}
};
// 新消息确认失败回调函数
ConfirmCallback nackCallback = new ConfirmCallback() {
@Override
public void handle(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息发送失败" + deliveryTag);
}
};
// 消息发送确认监听器,服务器端回回调这个接口,告诉这个接口消息发送究竟是成功还是失败
channel.addConfirmListener(ackCallback, nackCallback);
String queueName = "asyncConfirmQueue";
int messsageCount = 100;
// 队列声明
channel.queueDeclare(queueName, true, false, false, null);
long startTime = System.currentTimeMillis();
for (int i = 0; i < messsageCount; i++) {
channel.basicPublish("", queueName, null, ("message消息" + i).getBytes());
}
long endTime = System.currentTimeMillis();
System.out.println(" 消息发布确认耗时:" + (endTime - startTime));
}