RabbitMQ – RabbitMQ的确认机制 及 Java实现

  • Post author:
  • Post category:java


什么是RabbitMQ确认机制 (即Publisher Acknowledgements)

使用标准的AMQP 0-9-1,保证消息不丢失的唯一方式即为使用事务 – 将channel设置为事务的,发布消息,然后提交。这种情况下,重量级的事务会将吞吐量降低到原来的1/250。使用确认机制即可拯救这种情形。

为开启确认,客户端需要发送confirm.select方法。取决于no-wait是否被设置,Broker将返回confirm.select-ok。

根据AMQP规范,如果设置了no-wait,服务器将不会响应方法,客户端也不应等待回复。如果服务器无法完成方法,将抛出channel或连接异常

如果channel上调用了confirm.select方法,则这个channel处于confirm模式下。开启事务的channel不可以处于确认模式,反之亦然。

一旦channel处于confirm模式,broker和客户端都会对消息计数(第一个confirm.select后从1开始)。broker在处理消息后,通过在同一个channel中发送basic.ack来确认消息。basic.ack的delivery-tag域包含了此次确认的消息的序列号。broker也可以设置basic.ack中的multiple域,表明到这个序列号为止的消息(包含此序列号)都已经被处理。

Java中发送confirm.select的方法是channel.confirmSelect(),等待ack返回的方法是:channel.waitForConfirmsOrDie();

Negative Acknowledgment (nack)

异常情况下,如果broker无法成功处理消息,则不会返回basic.ack,而将返回basic.nack。这时,basic.nack里面各个域的含义和对应的basic.ack是一样的,除了requeue域需要被忽略。通过nack一条或多条消息,broker表明它无法处理消息,并且拒绝承担相应的处理责任;此时,客户端可以选择重发这些消息。

在channel被设置成confirm模式之后,接下来所有被发布的消息都会被确认或者nack一次。但是一条消息多久之后被确认是没有保证的。不会有消息被同时确认和nack。

只有在负责queue的Erlang进程发生内部错误发生时,basic.nack才会被分发出来。

注:根据AMQP规范

ack的域:ack(

delivery-tag


delivery-tag

,

bit


multiple

),

nack的域:nack(

delivery-tag


delivery-tag

,

bit


multiple

,

bit


requeue

)。

消息何时被确认?

对于无法路由的消息,在exchange确认消息无法被路由到任何queue时(即返回空的queue列表时), broker会发送一条confirm。如果发送消息时指定了mandatory,在basic.ack之前,basic.return会先被发送到客户端。basic.nack(即negative acknowledgements)也是一样的。

对于可被路由的消息,在消息被所有适合的queue接收后,basic.ack会被返回。对于路由到持久化队列(durable queues)中的持久化消息,接收意味着持久化到硬盘;对于镜像队列(mirrored queues),接收意味着所有镜像都已经接收了消息。

持久化消息的Ack 时延

持久化消息被路由到持久化队列时,只有在消息被持久化到硬盘之后,basic.ack才会被返回。为了降低fsync(2)的调用次数,在queue空闲时,或者每隔一段特定时间(数百毫秒),RabbitMQ才会将消息批量存储到硬盘。这意味着,在恒定负载下,basic.ack的延时会达到数百毫秒。为了提高吞吐量,强烈建议各应用使用异步处理ack(当做stream);或者批量发布消息,然后等待响应。各个客户端库里面,实现这种方法的具体API各有不同。

确认与消息的保证送达

在消息被写入disk前,如果broker挂了,则持久化消息会被丢失。在特定情况下,这会导致broker的异常表现。

例如,考虑如下情景(不开启confirm mode时):

  1. 某客户端发布一条持久化消息到持久化queue中
  2. 某另一个客户端消费了queue中的这条消息(注意,消息和queue都是持久化的),但是还没有发送ack。
  3. broker挂掉然后重启了
  4. 消费客户端重新连接,并开始消费消息。

此时,消费客户端认为这条消息会被(broker)再次分发。但事实并非如此:(在批量将消息写入disk前)重启导致broker丢失了消息。为了保证持久化,客户端应该使用确认机制。如果发送端的channel设置为确认模式,那么发送端就不会收到被丢失消息对应的ack(因为消息还没有被写入disk,ack不会被发送)。

Delivery Tag的最大值

confirm ack的delivery tag是个64位的long值,因此最大值为9223372036854775807。

使用Java客户端实现Confirm Mode

/** 
* @Title: ConfirmDontLoseMessages.java 
* @Package com.example.rabbitmq.confirmmode 
* @Description: java实现Confirm Mode
* @author LIUYUEFENG559 
* @date 2016年10月20日 上午10:41:53 
* @version V1.0
*/
package com.example.rabbitmq.confirmmode;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmq.client.QueueingConsumer;

public class ConfirmDontLoseMessages {
    private final static int msgCount = 10000; // 默认发送10000条消息
    private final static String QUEUE_NAME = "confirm-test2"; // 队列名称
    private static ConnectionFactory connectionFactory;

    public static void main(String[] args)
            throws IOException, InterruptedException {
        connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("10.59.79.37");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setPort(5672);

        Thread consumerThread = new Thread(new Consumer());
        Thread publisherThread = new Thread(new Publisher());
        // 开启消费者线程

        consumerThread.start();
        // 开启生产者线程
        publisherThread.start();

    }

    // 消息发布者
    @SuppressWarnings("ThrowablePrintedToSystemOut")
    static class Publisher implements Runnable {
        public void run() {
            try {
                long startTime = System.currentTimeMillis();
                // 连接并非线程安全的,所以要每线程一个连接
                Connection conn = connectionFactory.newConnection();
                Channel ch = conn.createChannel();
                // 创建一个持久化的,非独享,不自动删除的队列
                ch.queueDeclare(QUEUE_NAME, true, false, false, null);
                // 开启通道上的 publisher acknowledgements
                ch.confirmSelect();

                // 发送持久化消息,消息内容为helloWorld
                for (long i = 0; i < msgCount; ++i) {
                    ch.basicPublish("", QUEUE_NAME,
                            MessageProperties.PERSISTENT_BASIC,
                            "helloWorld".getBytes());
                }

                // 等待所有消息都被ack或者nack,如果某个消息被nack,则抛出IOException
                ch.waitForConfirmsOrDie();
                long endTime = System.currentTimeMillis();
                System.out.printf("Test took %.3fs\n",
                        (float) (endTime - startTime) / 1000);
                // 删除队列,不论是否在使用中
                ch.queueDelete(QUEUE_NAME);
                ch.close();
                conn.close();
            } catch (Throwable e) {
                System.out.println("damn fuck! error detected :(");
                System.out.print(e);
            }
        }
    }

    // 消息消费者
    static class Consumer implements Runnable {

        public void run() {
            try {
                // 每线程一个连接
                Connection conn = connectionFactory.newConnection();
                Channel ch = conn.createChannel();
                ch.queueDeclare(QUEUE_NAME, true, false, false, null);

                // 创建消息消费者
                QueueingConsumer qc = new QueueingConsumer(ch);
                ch.basicConsume(QUEUE_NAME, true, qc);
                for (int i = 0; i < msgCount; ++i) {
                    qc.nextDelivery();
                }
                // 关闭通道和连接
                System.out.println("consumer done");
                ch.close();
                conn.close();
            } catch (Throwable e) {
                System.out.println("damn fuck! some error happened!");
                System.out.print(e);
            }
        }
    }
}

参考:


https://www.rabbitmq.com/amqp-0-9-1-reference.html



https://www.rabbitmq.com/confirms.html



http://hg.rabbitmq.com/rabbitmq-java-client/file/default/test/src/com/rabbitmq/examples/ConfirmDontLoseMessages.java



版权声明:本文为qiutongyeluo原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。