1.项目是 Spring的 版本比较老是3.1.4的,所以使用的Rabbit的jar 也要老一点我用的是1.3.5的
先导入jar包
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.3.5.RELEASE</version> </dependency>
2. 安装Rabbit windos版本 便于测试 rabbit有两个端口 一个是15672的这个端口是web管理端口,一个是5672的工厂链接端口
3.创建rabbitMq.properties配置文件
rmq.ip=locahost rmq.port=5672 rmq.producer.num=20 rmq.manager.user=用户名 rmq.manager.password=密码 rmq.host=通道名称
4.web打开 locahost:15672管理页面 默认账号密码都是guest
5.配置一个自己的账户 进入admin选项卡 选中Add a user 输入 Username和Password 然后Adduser记得配置权限
6.创建两个交换机 一个是正常交换,一个是死信交换,死信交换是用来做延时队列的,原理就是生产者生产一条定时消息1分钟,1分钟到了以后该信息将被交换至死信息中,消费者端监听死信交换机,死信中有了消息以后会响应消费者端
进入Exchanges选项卡,选择Add new exchange 只需要输入name 即可 选择 Add exchange保存
交换机名称就叫exchange_order ,然后同样的在创建一个die_exchange_order 交换机,创建完了以后,再去创建两个队列创建一个 queue_order队列 这个队列是正常队列,我们需要先输入Name 然后设置规则Arguments
1.x-dead-letter-exchange:die_exchange_order //这个是死信交换机名称
2.x-dead-letter-routing-key:交换机路由 //这个是死信交换机路由
3.x-message-ttl:60000 //这个是过期时间毫秒
然后保存,保存完了以后选择下面Dindings 绑定一个交换机 From exchange 交换机名称 这里需要绑定正常的交换机exchange_order 然后是Routing key 正常的交换机路由Routing key下面一点再说
创建完queue_order队列后,再创建一个die_queue_order死信队列,这个就不需要设置规则了,直接去绑定一个交换机 From exchange 这里就要填写为die_exchange_order了
创建完两个队列以后,在切换到Exchanges选项卡,选择第一个exchange_order然后选择Add binding from this exchange 绑定队列Toqueue,exchange_order需要绑定queue_order队列然后设置Routing key 的值,记得Exchanges说的是设置,在Queue中我说的是绑定,同样的操作一次die_exchange_order 绑定队列 die_queue_order,都操作完了以后 我们开始项目配置了
7.spring 配置 需要理解流程 : 链接Rabbit 需要一个工厂,生产者链接工厂打开通道,通过交换机的名称和路由key获取往队列中存储数据。那么我们就需要先配置工厂
<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="localhost" />
<property name="username" value="${rmq.manager.user}" />
<property name="password" value="${rmq.manager.password}" />
<property name="host" value="${rmq.ip}" />
<property name="port" value="${rmq.port}" />
<property name="virtualHost" value="${rmq.host}" />
</bean>
工厂配置完成以后,我们声明队列 这里配置的是生产者和消费者在同一个服务端所以配两个
<rabbit:queue name="queue_order" durable="true" auto-delete="false" exclusive="false" />
<rabbit:queue name="die_queue_order" durable="true" auto-delete="false" exclusive="false" />
队列声明完成以后,需要声明一个工厂消息序列化
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter" />
然后我们需要声明一个 生产者服务,一个消费者服务
<bean id="生产者类名首字母小写(服务)" class="类路径.生产者类名手首字母大写"></bean>
<bean id="消费者类名首字母小写(服务)" class="类路径.消费者类名手首字母大写"></bean>
然后生产者功能已经完成了 可以正常发送消息,但是消费者还没有完,消费者还需要监听,这里我们需要监听的是死信队列
<bean id="QueueListenerAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter" >
<constructor-arg ref="消费者类名(服务)(指向刚刚配置的消费者Id)" />
<property name="defaultListenerMethod" value="consumerOrder"></property>
<property name="messageConverter" ref="jsonMessageConverter"></property>
</bean>
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" concurrency="20">
<rabbit:listener queues="死信队列的名称die_queue_order" ref="QueueListenerAdapter" />
</rabbit:listener-container>
配置已经完成了
8.编写生产者类,和消费者类
/**
* 生产者
*/
public class RabbitProduce {
private static org.slf4j.Logger log = LoggerFactory.getLogger(RabbitProduce.class);
@Resource
private RabbitTemplate rabbitTemplate;
private final String Order_Exchanges_Name="正常交换机的名称 queue_order";
private final String Order_Exchanges_Key="正常交换机的路由 ";
/**
* CommonMessage 就是一个普通的实体类,可以自己写一些属性
*/
public void sendMessage(CommonMessage msg){
try {
log.error("发送信息开始");
System.out.println(rabbitTemplate.getConnectionFactory().getHost());
//发送信息 queueName交换机,就是上面的routingKey msg.getSource() 为 test_key
rabbitTemplate.convertAndSend(Order_Exchanges_Name,Order_Exchanges_Key, msg);
//如果是普通字符串消息需要先序列化,再发送消息
//rabbitTemplate.convertAndSend(queueName,msg.getSource(), SerializationUtils.serialize(msg));
log.error("发送信息结束");
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 消费者
*/
public class RabbitConsumer {
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 消费
* */
public void consumerOrder(CommonMessage message) throws IOException {
//message 的信息就是 死信过来的信息 后面执行自己的逻辑
}
}
9.其中大部分人都会碰到的问题,就是注入失败,所以消费者类和生产者类 一定要放在spring容器能扫描得到的包,不然@Autowired 无法注入