RabbitMQ学习(三):Spring整合RabbitMQ

  • Post author:
  • Post category:其他

1、maven依赖

<!-- rabbitMQ依赖 -->
<!-- spring相关的其他依赖请参考源码,此处不做过多描述 -->
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.4.6.RELEASE</version>
</dependency>

2

spring-rabbit
配置

2.1、配置rabbitMQ连接

(1)采用<rabbit/>方式配置连接

<rabbit:connection-factory id="connectionFactory" 
host="${rabbitmq.host}" 
port="${rabbitmq.port}" 
        username="${rabbitmq.username}" 
        password="${rabbitmq.password}" 
        virtual-host=""
        publisher-confirms="true"
        publisher-returns="true"
        channel-cache-size="5"
        />

(2)采用普通方式配置连接

<bean id="connectionFactory"
	class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
		<constructor-arg value="${rabbitmq.host}" />
		<property name="username" value="${rabbitmq.username}" />
		<property name="password" value="${rabbitmq.password}" />
		<property name="port" value="${rabbitmq.port}" />
		<!-- 缓存中要维护的通道数 -->
		<property name="channelCacheSize" value="5" />
		<!-- 开启发送确认机制 -->
		<property name="publisherConfirms" value="true"/>
		<!-- 开启结果返回机制 -->
		<property name="publisherReturns" value="true"/>
	</bean>

2.2、配置RabbitAdmin

<rabbit:admin connection-factory="connectionFactory" />

2.3、定义Queue

<rabbit:queue name="queueTest" durable="true" auto-delete="false"
exclusive="false" />

常用参数解释:

durable:是否持久化

auto-delete:是否当没有连接时自动删除

exclusive:是否只能由创建者使用


2.4、定义Exchange,并绑定Queue

(1)定义direct类型

<rabbit:direct-exchange name="exchangeTest"
		durable="true" auto-delete="false">
	<rabbit:bindings>
<!-- 此处没有指定RoutingKey -->
			<rabbit:binding queue="queueTest" key=""></rabbit:binding>
		</rabbit:bindings>
</rabbit:direct-exchange>

(2)定义topic类型

<rabbit:topic-exchange name="topicexchangetest" durable="true" auto-delete="false">
		<rabbit:bindings>
			<rabbit:binding queue="" key=""></rabbit:binding>
		</rabbit:bindings>
</rabbit:topic-exchange>

(3)定义fanout类型

<rabbit:fanout-exchange name=""></rabbit:topic-exchange>

(4)定义headers类型

<rabbit:headers-exchange name=""></rabbit:topic-exchange>

常用参数解释:

durable:是否持久化

auto-delete:是否当没有连接时自动删除

注意:在此例中,如果不把Exchange和queue进行绑定,发送消息的时候ConfirmCallback依然正常执行,因为消息到达了Exchange。但是ReturnCallback就会执行回调方法,传回错误信息:NO_ROUTEExchange没有找到指定的Queue,丢弃这条消息并把消息返回给生产者。回调顺序是ReturnCallback在前,ConfirmCallback在后。


2.5、定义rabbit template

(1)普通方式配置rabbitTemplate

<bean class="org.springframework.amqp.rabbit.core.RabbitTemplate" id="rabbitTemplate">
		<property name="connectionFactory" ref="connectionFactory" />
		<property name="exchange" value="exchangeTest" />
		
		<!-- 启动AMQP协议层面事务机制来解决发送确认机制,但是采用事务机制实现会降低RabbitMQ的消息吞吐量。
		RabbitMQ团队为我们拿出了更好的方案,即采用发送方确认模式,事务机制和confirmCallback只能二选一 -->
		<!-- <property name="channelTransacted" value="true" /> -->

		<!-- mandatory 监听是否有符合的队列 -->
		<property name="mandatory" value="true"/>
		
		<!-- 设置发送确认回执监听方法 -->
		<property name="confirmCallback" ref="confirmcallback" /> 
		<!-- 设置结果返回监听方法 -->
		<property name="returnCallback" ref="MyReturnCallback"/>
		<!-- 设置消息转换 -->
		<property name="messageConverter" ref="JsonMessageConverter" />
	</bean>

(2)<rabbit/>方式配置rabbitTemplate和相关监听器

<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" 
		exchange="exchangeTest" 
		mandatory="true" 
		confirm-callback="confirmcallback"
		return-callback="MyReturnCallback" 
		encoding="UTF-8"
		message-converter="JsonMessageConverter"
		>
	</rabbit:template>

 

常用参数解释:

mandatory:监听是否有符合的队列

confirm-callback:消息确认回调类

return-callback:mandatory监听结果回调类

encoding:编码

message-converter:消息转化类

 

(1)其他相关配置

<bean id="JsonMessageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter">
    <!-- if necessary, override the DefaultClassMapper -->
    <property name="classMapper" ref="customClassmapper" />
</bean>


<bean id="customClassmapper" class="pruducer.com.taikang.tkh.message.common.CustomClassMapper"/>

 

<bean id="confirmcallback" class="pruducer.com.taikang.tkh.message.common.ConfirmReturnBack"/>

 

<bean id="MyReturnCallback" class="pruducer.com.taikang.tkh.message.common.MyReturnCallback"/>

2.6配置Consumer

<rabbit:listener-container  
	prefetch="10" 
	connection-factory="connectionFactory"  
	acknowledge="manual">
	<rabbit:listener  queues="queueTest" ref="messageReceiver" />
</rabbit:listener-container>

<!-- 自定义消息接收者 -->
<bean id="messageReceiver" class="consumer.com.taikang.tkh.message.listener.MessageConsumer"></bean>

 

常用参数解释:

prefetch:消息预取数目为每次接收10条。

acknowledge:acknowledge=”manual”:意为表示该消费者的ack方式为手动 ;acknowledge=”auto”表示自动。


3、spring-rabbit常用方法介绍

3.1、消息发送

/**
* 发送消息到默认的交换机和队列(不带有自定义参数)
* @param messageObject 消息对象
* @return boolean 发送标记 
*/
RabbitTemplate.convertAndSend(messageObject);
 
/**
* 发送消息到默认的交换机和队列
* @param messageObject 消息对象
* @param messageObject 自定义参数,在监听器ConfirmCallback中可以取到。
* @return boolean 发送标记 
*/
RabbitTemplate.correlationConvertAndSend(messageObject,correlationdata);
 
/**
* 发送消息到指定的队列
* @param queue           队列名称
* @param messageObject   消息对象
* @param messageObject 自定义参数,在监听器ConfirmCallback中可以取到。
* @return boolean 发送标记 
*/
RabbitTemplate.convertAndSend(queue, messageObject,correlationdata);
 
/**
* 发送消息到指定的交换机和队列
* @param exchange       交换机名称
* @param queue          队列名称
* @param messageObject 自定义参数,在监听器ConfirmCallback中可以取到。
* @return boolean 发送标记 
*/
RabbitTemplate.convertAndSend(exchange,queue,messageObject,correlationdata);
 
/**
* 发送消息到默认的交换机和队列(不带有自定义参数)
Send方法还有很多,此处只列举一种
* @param Message AMQP封装的消息对象
* @return void 
*/
RabbitTemplate.send(Message message);

注意

凡是带有
c
onvert
的系统都会自动把消息转换为
AMQP的Message对象;

没有convert的需要自己将发送的对象转换为Message对象。


3.2、Confirm监听

public class ConfirmReturnBack implements ConfirmCallback{
	/**
	 * Confirmation callback.
	 * @param correlationData 回调的相关数据.
	 * @param ack true for ack, false for nack
	 * @param cause 专门给NACK准备的一个可选的原因,其他情况为null。
	 */
	public void confirm(CorrelationData correlationdata, boolean ack, String cause) {
		System.out.println("Exchange接收是否成功(ack): " + ack + "。 返回的用户参数(correlationData): " + correlationdata + "。NACK原因(cause) : " + cause);
	}
}

注意CorrelationData 是在发送消息时传入回调方法的参数,可以用于区分消息对象。 CorrelationData对象中只有一个属性 String id。通过这个参数,我们可以区分当前是发送哪一条消息时的回调,并通过ack参数来进行失败重发功能。

3.3、Return监听

/**
 * 实现此方法在basicpublish失败时回调
 * 相当于 ReturnListener的功能。
 * 在发布消息时设置mandatory等于true,
 * 监听消息是否有相匹配的队列,没有时ReturnCallback将执行returnedMessage方法,消息将返给发送者 
 */
public class MyReturnCallback implements ReturnCallback {

	public void returnedMessage(Message message, int replyCode, String replyText, 
			String exchange, String routingKey) {
		try {
			System.out.println("消息发送进指定队列失败:失败原因(+replyText):"+replyText
					+";错误代码(replyCode):"+replyCode
					+";消息对象:"+new String(message.getBody(),"UTF-8")
					+"exchange:"+exchange
					+"routingKey:"+routingKey);
		} catch (UnsupportedEncodingException e) {
			e.printStackTrace();
		}
	}
}

3.4、消息处理

/**
 * 消息到达消费者监听类
 */
public class MessageConsumer implements ChannelAwareMessageListener {
	/**
	 * 处理收到的rabbit消息的回调方法。
	 * @param message AMQP封装消息对象
	 * @param channel 信道对象,可以进行确认回复
	 * @throws Exception Any.
	 */
	public void onMessage(Message message, Channel channel) throws Exception {
		try {
			String srt2=new String(message.getBody(),"UTF-8");
			System.out.println("消费者收到消息:"+srt2);
			//成功应答
			channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
		} catch (Exception e) {
			e.printStackTrace();
			//失败应答
			channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
		}
	}
}

注意ChannelAwareMessageListenerMessageListener两个接口都实现对消息到达消费者时的监听,只不过MessageListeneronMessage方法没有Channel参数,不能实现消息手动应答功能。

 

4、Demo项目完整代码

 Spring整合RabbitMQ完整项目源码.rar


上一篇文章:《RabbitMQ学习(二):Java使用RabbitMQ



【四川乐山程序员联盟,欢迎大家加群相互交流学习5 7 1 8 1 4 7 4 3】




版权声明:本文为LeiXiaoTao_Java原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。