4- java client api

  • Post author:
  • Post category:java


java client api 手册

预览

RabbitMQ java 客户端使用 com.rabbitmq.client 作为最上层的包名. 关键的类和接口是:

Channel
Connection
ConnectionFactory
Consumer

Channel 接口包含发送消息,消费消息等协议相关的操作. Connection 被用来创建 channels, 注册连接生命周期相关的handlers, 并在不用的时候 close connections. Connections 通过ConnectionFactory实例化.

Connections and Channels(连接和渠道)

api的核心类是 Connection and Channel, 代表了 AMQP 0-9-1 模型的 connection and an channel, 通过下面方式导入Connection和Channel:

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

Connecting to a broker(连接到broker)

下面的代码是连接到一个 AMQP broker的例子, 需要给定相关参数:

ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);
Connection conn = factory.newConnection();

作为另外一种选择,可以使用 URIs 的方式:

ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost");
Connection conn = factory.newConnection();

Connection 接口用来创建一个Channel

Channel channel = conn.createChannel();

断开连接,需要关闭Channel和Connection:

channel.close();
conn.close();

使用 Exchanges and Queues

客户的应用会和 exchanges 和 queues一起工作. 在使用exchanges和queues之前我们必须先 “declared”. Declaring 是为了确保相应命名的exchange和queue存在,必要的时候可以创建一个exchange或者queue.

继续上面的例子,下面的代码声明一个exchange和queue,并将它们”bind”一起.

channel.exchangeDeclare(exchangeName, "direct", true);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, routingKey);

上面的代码声明了一个

  • 持久的, 非自动删除的, direct类型的exchange(a durable, non-autodelete exchange of “direct” type);

  • 一个非持久的, 专用的, 自动删除的服务端命名的queue(a non-durable, exclusive, autodelete queue with a generated name).

  • 并通过queueBind方法把他们通过一个routing key关联起来.

注意: 上面是一种典型的声明queue的方式, 这种场景下, 只有一个客户端可以使用queue, queue不需要有一个显著的名字, 其他的客户端不能使用(exclusive), 而且会自动删除. 如果多个客户端想共享一个queue,需要给queue一个大众的名字, 如下:

channel.exchangeDeclare(exchangeName, "direct", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

这声明了一个:

– a durable, non-autodelete exchange of “direct” type

– a durable, non-exclusive, non-autodelete queue with a well-known name

注意: 所有Channel的API都是重载的. 便利的短形式的 exchangeDeclare, queueDeclare 和 queueBind 会使用一些参数的默认值. 也有一些长形式的api,可以让你控制参数的值.

这些 “short form, long form” 的模式贯穿客户端api的使用.

Publishing messages

发布一个消息, 使用 Channel.basicPublish,代码示下:

byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

为了更好的控制, 可以重载一些变量声明一下必要的flag, 或预设置一些消息的属性.

channel.basicPublish(exchangeName, routingKey, mandatory,
                     MessageProperties.PERSISTENT_TEXT_PLAIN,
                     messageBodyBytes);

这发布了一个 delivery mode 2 (persistent), priority 1 and content-type “text/plain”的消息. 你也可以使用 a Builder class 去设置一些自定义的属性. 例如:

channel.basicPublish(exchangeName, routingKey,
             new AMQP.BasicProperties.Builder()
               .contentType("text/plain")
               .deliveryMode(2)
               .priority(1)
               .userId("bob")
               .build()),
               messageBodyBytes);

下面的例子的消息带了一些自定义的header:

Map<String, Object> headers = new HashMap<String, Object>();
headers.put("latitude",  51.5252949);
headers.put("longitude", -0.0905493);

channel.basicPublish(exchangeName, routingKey,
             new AMQP.BasicProperties.Builder()
               .headers(headers)
               .build()),
               messageBodyBytes);

下面例子的消息带了过期时间:

channel.basicPublish(exchangeName, routingKey,
             new AMQP.BasicProperties.Builder()
               .expiration("60000")
               .build()),
               messageBodyBytes);

我们就不在这里展示所有的例子了.

备注: BasicProperties是 class AMQP的一个内部类.

Invocations of Channel#basicPublish will eventually block if a resource-driven alarm is in effect.

渠道和并发的考虑: Channels and Concurrency Considerations (Thread Safety)

Channel 实例不能在线程直接共享. 在多线程中最好一个线程对应一个Channel实例. 如果多个线程并发操作一个channel, 可能会产生发送的消息不正确,和消费确认不正确.

订阅接收消息

import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;

最有效的接收消息方式是通 Consumer 接口定义建立一个订阅关系. 之后消息就会被自动的发送到consumer.

当我们调用api和Consumers相关的方法时, 通常会使用一个consumer tag代表一个私人的订阅关系. ‘consumer tag”可以在客户端或者服务端产生. 不同的consumer在相同的channel上必须有不同的consumer tag.

最简单的实现一个 Consumer 是集成默认的实现类 DefaultConsumer.这个对象可以作为 basicConsume call 的一个参数建立订阅关系:

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "myConsumerTag",
     new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag,
                                    Envelope envelope,
                                    AMQP.BasicProperties properties,
                                    byte[] body)
             throws IOException
         {
             String routingKey = envelope.getRoutingKey();
             String contentType = properties.getContentType();
             long deliveryTag = envelope.getDeliveryTag();
             // (process the message components here ...)
             channel.basicAck(deliveryTag, false);
         }
     });

这里,因为我们设置了 autoAck = false, 所有Consumer必须确认发送给它的消息, 最方便的方式是在handleDelivery 方法中确认.

复杂一点的场景是 Consumers 需要重载更多的方法。 特别的, 当 channels 和 connections close 的时候调用 handleShutdownSignal 方法 , handleConsumeOk 当consumer 被注册成功是调用.

Consumers 也有可能实现 handleCancelOk 和 handleCancel方法处理显式或者(非显式,queue被删除等)的取消订阅关系.

可以显式的取消一个consumer: Channel.basicCancel:

channel.basicCancel(consumerTag);

传递一个consumerTag.

Consumers 的回调方法在一个单独的线程中, 和Connection管理的线程是分离的, 这意味着Consumer可以安全的调用一些Connection和Channel的阻塞方法: 例如 queueDeclare, txCommit, basicCancel 或者 basicPublish.

每一个channel都有自己的分发线程.最通用的使用方式是一个 Consumer 对应一个 Channel.如果你有多个consumer在一个Channel上, 注意 一个运行期很长的Consumer 将会耽搁相同Channel上的其他消费者.

收私人消息Retrieving individual messages

使用 Channel.basicGet 可以显式的收取消息. 返回的值是一个GetResponse实例,我们可以extract出消息头和消息体信息.

boolean autoAck = false;
GetResponse response = channel.basicGet(queueName, autoAck);
if (response == null) {
    // No message retrieved.
} else {
    AMQP.BasicProperties props = response.getProps();
    byte[] body = response.getBody();
    long deliveryTag = response.getEnvelope().getDeliveryTag();
    ...

因为上面” autoAck = false “,必须调用 Channel.basicAck 确认成功收到消息:

    ...
    channel.basicAck(method.deliveryTag, false); // acknowledge receipt of the message
}

处理不能路由的消息Handling unroutable messages

如果一个消息在发送的时候 “mandatory” flags 被设置了, 但是又不能被路由, broker将把这个消息返回给发送的客户端. (通过一个 AMQP.Basic.Return 命令).

对于这种返回,为了能够得到通知,客户端可以实现一个 ReturnListener 接口并调用 Channel.setReturnListener 设置接口. 如果客户端没有设置监听接口, 相关的被返回的消息会被默默的丢弃.

channel.setReturnListener(new ReturnListener() {
    public void handleBasicReturn(int replyCode,
                                  String replyText,
                                  String exchange,
                                  String routingKey,
                                  AMQP.BasicProperties properties,
                                  byte[] body)
    throws IOException {
        ...
    }
 });

一个返回的listener将会被调用,例如,如果客户端发送了一个”mandatory”的消息到一个”direct” exchange,如果exchange 没有绑定queue.

关闭协议 Shutdown Protocol

AMQP客户端关闭概论 Overview of the AMQP client shutdown

AMQP 0-9-1 connection 和 channel 共享一般的途径管理网络的失败,内部的失败和显式的关闭.

AMQP 0-9-1 connection 和 channel 有下面的生命周期状态:

  • open: 对象已经做好被使用的准备
  • closing: 对象已经被明确的通知被关闭,已经发送了一个关闭的请求到低层的对象, 正在等待低层的对象完成关闭.
  • closed: 对象已经收到从任何低层来的已经完成关闭过程的通知,因而对象关闭自己.

这些对象在关闭状态已经结束了,不管是什么原因引起的, 例如 客户端的请求, 内部的错误, 远程的网络请求或者网络失败。

AMQP connection 和 channel处理下面和关闭相关的方法:

  • addShutdownListener(ShutdownListener listener) and removeShutdownListener(ShutdownListener listener), 管理listener, listener将在对象的状态转变为”closed”的时候被触发. 备注: 增加一个 ShutdownListener 到一个已经被关闭的对象上, 将立刻触发listener.

  • getCloseReason(), 允许获得关闭的原因.

  • isOpen(), 用来测试是否对象处于打开状态.
  • close(int closeCode, String closeMessage),显式通知对象关闭.

listeners的简单的用途如下:

import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.ShutdownListener;

connection.addShutdownListener(new ShutdownListener() {
    public void shutdownCompleted(ShutdownSignalException cause)
    {
        ...
    }
});
关于关闭的环境信息

Information about the circumstances of a shutdown

客户端可能会收到 ShutdownSignalException, 异常包含了所有的可用的关于关闭的原因.可以调用 getCloseReason()方法或者在服务中使用原因参数(ShutdownSignalException cause) .

ShutdownSignalException 提供了分析关闭原因的方法. 通过调用 isHardError() 我们可以知道关闭是因为Connection错误还是Channel错误, getReason()返回相关的原因, 以 AMQP 方法返回, 返回方法如 AMQP.Channel.Close 或 AMQP.Connection.Close (or null如果是因为其他库异常,例如网络通信失败抛出异常,这种情况, 异常可以通过getCause()获取).

public void shutdownCompleted(ShutdownSignalException cause)
{
  if (cause.isHardError())
  {
    Connection conn = (Connection)cause.getReference();
    if (!cause.isInitiatedByApplication())
    {
      Method reason = cause.getReason();
      ...
    }
    ...
  } else {
    Channel ch = (Channel)cause.getReference();
    ...
  }
}
自动使用isOper方法

Atomicity and use of the isOpen() method

使用Connection和Channel的对生产上的代码是不推荐使用的, 因为isOpen的返回值依赖于是否存在关闭的原因。下面的代码展示了竞争状态下的一种可能性: The following code illustrates the possibility of race conditions:

public void brokenMethod(Channel channel)
{
    if (channel.isOpen())
    {
        // The following code depends on the channel being in open state.
        // However there is a possibility of the change in the channel state
        // between isOpen() and basicQos(1) call
        ...
        channel.basicQos(1);
    }
}

实际中,我们经常忽略这种检查, 简单的尝试我们希望的动作. 如果在运行的过程中,connection的channel被关闭了, 将会抛出一个 ShutdownSignalException 异常暗示对象处于一个错误的状态.当broker出乎意料的关闭了连接, 我们也可以捕获一个 IOException – 因为SocketException而抛出, 或者当Broker开始干净的关闭连接时,捕获 ShutdownSignalException.

public void validMethod(Channel channel)
{
    try {
        ...
        channel.basicQos(1);
    } catch (ShutdownSignalException sse) {
        // possibly check if channel was closed
        // by the time we started action and reasons for
        // closing it
        ...
    } catch (IOException ioe) {
        // check why connection was closed
        ...
    }
}

连接的高级选项

Advanced Connection options

消费线程池

Consumer thread pool

消费线程可以自动的被一个线程池分配, 默认的在一个ExecutorService的线程池中.如果需要更高级的控制,可以传递一个自定义的 ExecutorService的实例在 newConnection() 方法中. 下面提供了一个更大的线程池和默认的相比:

ExecutorService es = Executors.newFixedThreadPool(20);
Connection conn = factory.newConnection(es);

Executors 和 ExecutorService在 java.util.concurrent 包中.

当Connection被关闭的时候, 默认的ExecutorService也会被调用shutdown(). 但是如果用户自己提供了一个 ExecutorService (例如上面) 将不会被shutdown().客户端需要保证自己提供的ExecutorService最终被执行shutdown 方法.

相同的 executor可以在多个 connections直接共享, 或者在 re-used on re-connection 中使用,但是不能在被调用shutdown()之后使用.

只有在一个有明显的证据证明消费回调出现瓶颈的时候才使用这种特性.如果没有consumer或者很少的consumer callback方法被调用,默认的更有效.

使用主机列表

Using Lists of Hosts

存在一种情况,传递一个 Address 列表到 newConnection(). 一个 Address 是一个简单的方便的类在 com.rabbitmq.client 包中,构造函数包含host和port. 例如:

Address[] addrArr = new Address[]{ new Address(hostname1, portnumber1)
                                 , new Address(hostname2, portnumber2)};
Connection conn = factory.newConnection(addrArr);

将尝试连接 hostname1:portnumber1,如果连接失败,尝试连接 hostname2:portnumber2. 返回第一个成功的连接(不会抛IOException异常). 这相当于在factory中重复设置host, port,并调用factory.newConnection(),直到其中一次成功.

如果再提供一个ExecutorService可以使用 factory.newConnection(es, addrArr)) 方法. 线程池和第一个成功的连接相关.

心跳

Heartbeat Timeout

可以通过下面的api设置:

ConnectionFactory cf = new ConnectionFactory();

// set the heartbeat timeout to 60 seconds
cf.setRequestedHeartbeat(60);
自定义线程工厂

Custom Thread Factories

有些环境下例如 Google App Engine (GAE)会严格限制线程的初始化,在这种环境下使用RabbitMQ的客户端, 有必要配置一个自定义的线程工厂,使用一个适当的方法实例化线程, e.g. GAE’s ThreadManager. 下面是一个 Google App Engine的例子.

import com.google.appengine.api.ThreadManager;

ConnectionFactory cf = new ConnectionFactory();
cf.setThreadFactory(ThreadManager.backgroundThreadFactory());

网络失败中的自动恢复

Automatic Recovery From Network Failures

连接的恢复

Connection Recovery

网络连接在客户端和RabbitMQ节点之间可能失败. RabbitMQ Java 客户端提供了连接和拓扑的自动恢复. 对许多应用而已, 自动恢复过程遵循下面的步骤: supports automatic recovery of connections and topology (queues, exchanges, bindings, and consumers). The automatic recovery process for many applications follows the following steps:

  • 重新 Connection
  • 恢复连接 listeners
  • 重新打开 channels
  • 恢复 channel listeners
  • 恢复 channel basic.qos 设置, 发布者 confirms 和 transaction 的设置

拓扑的恢复包括下面的动作,在每一个channel上执行

  • 重新声明 exchanges (except for predefined ones)
  • 重新声明 queues
  • 恢复所有 bindings
  • 恢复所有 consumers

为了能够使连接自动恢复, 使用 factory.setAutomaticRecoveryEnabled(true):

ConnectionFactory factory = new ConnectionFactory();

factory.setUsername(userName);

factory.setPassword(password);

factory.setVirtualHost(virtualHost);

factory.setHost(hostName);

factory.setPort(portNumber);

factory.setAutomaticRecoveryEnabled(true);

// connection that will recover automatically

Connection conn = factory.newConnection();

如果因为某种异常,自动恢复失败了(例如. RabbitMQ 节点依然不能够达), 它将在一个固定的时间进行重试 (默认是5s).时间间隔是可以配置的:

ConnectionFactory factory = new ConnectionFactory();
// attempt recovery every 10 seconds
factory.setNetworkRecoveryInterval(10000);

如果提供了一个地址列表,地址将会被轮流使用:

ConnectionFactory factory = new ConnectionFactory();

Address[] addresses = {new Address("192.168.1.4"), new Address("192.168.1.5")};
factory.newConnection(addresses);
恢复监听器

Recovery Listeners

可以注册一个或者多个恢复监听器在可自动恢复的连接和渠道上,只有设置了自动恢复才可以注册.

addRecoveryListener

removeRecoveryListener

备注: 为了使用这些方法, 需要将连接或渠道 转换为 Recoverable 对象.

发布的影响 Effects on Publishing

使用 Channel.basicPublish 发布的消息当连接死了的时候就会丢失时. 客户端不能保证连接恢复的时候消息会正常重新发送. 为了确保消息到大RabbitMQ, 程序应该考虑到这种情况,使用发布确认机制.

拓扑的恢复

Topology Recovery

拓扑恢复包括 exchanges, queues, bindings 和 consumers的恢复, 默认是enabled, 但是可以禁止它.

ConnectionFactory factory = new ConnectionFactory();

Connection conn = factory.newConnection();
factory.setAutomaticRecoveryEnabled(true);
factory.setTopologyRecoveryEnabled(false);

M####手工确认和自动恢复####

当使用手工确认的时候, 网络可能在消息发送和确认之间失败. 在工厂恢复后, RabbitMQ将重设发送标记. 这意味着 basic.ack, basic.nack, and basic.reject 会产生一个channel 异常. 为了避免这个, RabbitMQ java客户端需要跟踪和持续更新delivery tag,使他们平滑增长.

Channel.basicAck, Channel.basicNack, 和 Channel.basicReject 之后 转换调整的delivery tags 到使用 RabbitMQ的程序. 使用手工确认和自动恢复的必须能够处理重复的消息.

未处理的异常

Unhandled Exceptions

和 connection, channel, recovery, and consumer 生命循环相关的异常的处理被代理给一个异常Handler. Exception handler 是一个实现了 ExceptionHandler 接口的对象.默认使用一个 DefaultExceptionHandler的实例. 它把异常信息打印到标准输出.

It is possible to override the handler using ConnectionFactory#setExceptionHandler. It will be used for all connections created by the factory:

ConnectionFactory factory = new ConnectionFactory();
cf.setExceptionHandler(customHandler);

异常处理被用来打印异常.

警告和限制

Caveats and Limitations

为了拓扑恢复的可能性, RabbitMQ Java客户端必须维护一个声明的 queues, exchanges, and bindings的缓存. The cache is per-connection. 某些RabbitMQ的特性使客户端没有办法观察到一些拓宽的变化,例如一个queue被删除了,因为TTL. RabbitMQ Java 客户端可以通过下面几种最普通的方式确认被缓存的对象无效:

  • 当一个queue被删除了.
  • 当一个exchange被删除了.
  • 当一个binding被删除了.
  • 当 consumer 被自动删除的queue取消了.
  • 当queue或者exchange的绑定被解除了,因为一个自动删除的exchange.

然而, 客户端不能在一个连接之外追踪这些拓扑的变化,依靠自动删除queue或exchange的应用,在使用自动连接恢复的时候, 应该显式的删除那些不使用的或者自动删除的实体,并更新客户端的缓存.这种功能可以通过 Channel#queueDelete, Channel#exchangeDelete, Channel#queueUnbind, and Channel#exchangeUnbind 被删除在 RabbitMQ 3.3.x 版本(删除不存在的不会产生异常).

Java doc


http://www.rabbitmq.com/releases/rabbitmq-java-client/v3.6.1/rabbitmq-java-client-javadoc-3.6.1/


http://rabbitmq-into-chinese.readthedocs.org/