1、maven依赖
<!-- rabbitMQ依赖 -->
<!-- spring相关的其他依赖请参考源码,此处不做过多描述 -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.4.6.RELEASE</version>
</dependency>
2
、
spring-rabbit
配置
2.1、配置rabbitMQ连接
(1)采用<rabbit/>方式配置连接
<rabbit:connection-factory id="connectionFactory"
host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host=""
publisher-confirms="true"
publisher-returns="true"
channel-cache-size="5"
/>
(2)采用普通方式配置连接
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="${rabbitmq.host}" />
<property name="username" value="${rabbitmq.username}" />
<property name="password" value="${rabbitmq.password}" />
<property name="port" value="${rabbitmq.port}" />
<!-- 缓存中要维护的通道数 -->
<property name="channelCacheSize" value="5" />
<!-- 开启发送确认机制 -->
<property name="publisherConfirms" value="true"/>
<!-- 开启结果返回机制 -->
<property name="publisherReturns" value="true"/>
</bean>
2.2、配置RabbitAdmin
<rabbit:admin connection-factory="connectionFactory" />
2.3、定义Queue
<rabbit:queue name="queueTest" durable="true" auto-delete="false"
exclusive="false" />
常用参数解释:
durable:是否持久化
auto-delete:是否当没有连接时自动删除
exclusive:是否只能由创建者使用
2.4、定义Exchange,并绑定Queue
(1)定义direct类型
<rabbit:direct-exchange name="exchangeTest"
durable="true" auto-delete="false">
<rabbit:bindings>
<!-- 此处没有指定RoutingKey -->
<rabbit:binding queue="queueTest" key=""></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
(2)定义topic类型
<rabbit:topic-exchange name="topicexchangetest" durable="true" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="" key=""></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
(3)定义fanout类型
<rabbit:fanout-exchange name=""></rabbit:topic-exchange>
(4)定义headers类型
<rabbit:headers-exchange name=""></rabbit:topic-exchange>
常用参数解释:
durable:是否持久化
auto-delete:是否当没有连接时自动删除
注意:在此例中,如果不把Exchange和queue进行绑定,发送消息的时候ConfirmCallback依然正常执行,因为消息到达了Exchange。但是ReturnCallback就会执行回调方法,传回错误信息:NO_ROUTE。Exchange没有找到指定的Queue,丢弃这条消息并把消息返回给生产者。回调顺序是ReturnCallback在前,ConfirmCallback在后。
2.5、定义rabbit template
(1)普通方式配置rabbitTemplate
<bean class="org.springframework.amqp.rabbit.core.RabbitTemplate" id="rabbitTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<property name="exchange" value="exchangeTest" />
<!-- 启动AMQP协议层面事务机制来解决发送确认机制,但是采用事务机制实现会降低RabbitMQ的消息吞吐量。
RabbitMQ团队为我们拿出了更好的方案,即采用发送方确认模式,事务机制和confirmCallback只能二选一 -->
<!-- <property name="channelTransacted" value="true" /> -->
<!-- mandatory 监听是否有符合的队列 -->
<property name="mandatory" value="true"/>
<!-- 设置发送确认回执监听方法 -->
<property name="confirmCallback" ref="confirmcallback" />
<!-- 设置结果返回监听方法 -->
<property name="returnCallback" ref="MyReturnCallback"/>
<!-- 设置消息转换 -->
<property name="messageConverter" ref="JsonMessageConverter" />
</bean>
(2)<rabbit/>方式配置rabbitTemplate和相关监听器
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"
exchange="exchangeTest"
mandatory="true"
confirm-callback="confirmcallback"
return-callback="MyReturnCallback"
encoding="UTF-8"
message-converter="JsonMessageConverter"
>
</rabbit:template>
常用参数解释:
mandatory:监听是否有符合的队列
confirm-callback:消息确认回调类
return-callback:mandatory监听结果回调类
encoding:编码
message-converter:消息转化类
(1)其他相关配置
<bean id="JsonMessageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter">
<!-- if necessary, override the DefaultClassMapper -->
<property name="classMapper" ref="customClassmapper" />
</bean>
<bean id="customClassmapper" class="pruducer.com.taikang.tkh.message.common.CustomClassMapper"/>
<bean id="confirmcallback" class="pruducer.com.taikang.tkh.message.common.ConfirmReturnBack"/>
<bean id="MyReturnCallback" class="pruducer.com.taikang.tkh.message.common.MyReturnCallback"/>
2.6配置Consumer
<rabbit:listener-container
prefetch="10"
connection-factory="connectionFactory"
acknowledge="manual">
<rabbit:listener queues="queueTest" ref="messageReceiver" />
</rabbit:listener-container>
<!-- 自定义消息接收者 -->
<bean id="messageReceiver" class="consumer.com.taikang.tkh.message.listener.MessageConsumer"></bean>
常用参数解释:
prefetch:消息预取数目为每次接收10条。
acknowledge:acknowledge=”manual”:意为表示该消费者的ack方式为手动 ;acknowledge=”auto”表示自动。
3、spring-rabbit常用方法介绍
3.1、消息发送
/**
* 发送消息到默认的交换机和队列(不带有自定义参数)
* @param messageObject 消息对象
* @return boolean 发送标记
*/
RabbitTemplate.convertAndSend(messageObject);
/**
* 发送消息到默认的交换机和队列
* @param messageObject 消息对象
* @param messageObject 自定义参数,在监听器ConfirmCallback中可以取到。
* @return boolean 发送标记
*/
RabbitTemplate.correlationConvertAndSend(messageObject,correlationdata);
/**
* 发送消息到指定的队列
* @param queue 队列名称
* @param messageObject 消息对象
* @param messageObject 自定义参数,在监听器ConfirmCallback中可以取到。
* @return boolean 发送标记
*/
RabbitTemplate.convertAndSend(queue, messageObject,correlationdata);
/**
* 发送消息到指定的交换机和队列
* @param exchange 交换机名称
* @param queue 队列名称
* @param messageObject 自定义参数,在监听器ConfirmCallback中可以取到。
* @return boolean 发送标记
*/
RabbitTemplate.convertAndSend(exchange,queue,messageObject,correlationdata);
/**
* 发送消息到默认的交换机和队列(不带有自定义参数)
Send方法还有很多,此处只列举一种
* @param Message AMQP封装的消息对象
* @return void
*/
RabbitTemplate.send(Message message);
注意
:
凡是带有
c
onvert
的系统都会自动把消息转换为
AMQP的Message对象;
没有convert的需要自己将发送的对象转换为Message对象。
3.2、Confirm监听
public class ConfirmReturnBack implements ConfirmCallback{
/**
* Confirmation callback.
* @param correlationData 回调的相关数据.
* @param ack true for ack, false for nack
* @param cause 专门给NACK准备的一个可选的原因,其他情况为null。
*/
public void confirm(CorrelationData correlationdata, boolean ack, String cause) {
System.out.println("Exchange接收是否成功(ack): " + ack + "。 返回的用户参数(correlationData): " + correlationdata + "。NACK原因(cause) : " + cause);
}
}
注意:CorrelationData 是在发送消息时传入回调方法的参数,可以用于区分消息对象。 CorrelationData对象中只有一个属性 String id。通过这个参数,我们可以区分当前是发送哪一条消息时的回调,并通过ack参数来进行失败重发功能。
3.3、Return监听
/**
* 实现此方法在basicpublish失败时回调
* 相当于 ReturnListener的功能。
* 在发布消息时设置mandatory等于true,
* 监听消息是否有相匹配的队列,没有时ReturnCallback将执行returnedMessage方法,消息将返给发送者
*/
public class MyReturnCallback implements ReturnCallback {
public void returnedMessage(Message message, int replyCode, String replyText,
String exchange, String routingKey) {
try {
System.out.println("消息发送进指定队列失败:失败原因(+replyText):"+replyText
+";错误代码(replyCode):"+replyCode
+";消息对象:"+new String(message.getBody(),"UTF-8")
+"exchange:"+exchange
+"routingKey:"+routingKey);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
3.4、消息处理
/**
* 消息到达消费者监听类
*/
public class MessageConsumer implements ChannelAwareMessageListener {
/**
* 处理收到的rabbit消息的回调方法。
* @param message AMQP封装消息对象
* @param channel 信道对象,可以进行确认回复
* @throws Exception Any.
*/
public void onMessage(Message message, Channel channel) throws Exception {
try {
String srt2=new String(message.getBody(),"UTF-8");
System.out.println("消费者收到消息:"+srt2);
//成功应答
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
e.printStackTrace();
//失败应答
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
}
注意:ChannelAwareMessageListener和MessageListener两个接口都实现对消息到达消费者时的监听,只不过MessageListener的onMessage方法没有Channel参数,不能实现消息手动应答功能。
4、Demo项目完整代码
上一篇文章:《RabbitMQ学习(二):Java使用RabbitMQ》
【四川乐山程序员联盟,欢迎大家加群相互交流学习5 7 1 8 1 4 7 4 3】