自动创建队列、交换机和绑定
一、pom.xml
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.0.5.RELEASE</version>
</dependency>
二、发送者(producer)
/** Admin handle used below to declare the exchange, the queue and their binding. */
@Resource
private RabbitAdmin rabbitAdmin;

/**
 * Declares, in this order, a direct exchange, a queue, and the binding that routes
 * messages from the exchange to the queue.
 */
@Override
protected void ext() {
    // Exchange first: direct type, durable = true, autoDelete = false.
    final DirectExchange exchange = new DirectExchange("exchangeName", true, false);
    rabbitAdmin.declareExchange(exchange);

    // Then the queue; the second constructor argument marks it durable.
    final Queue queue = new Queue("queueName", true);
    rabbitAdmin.declareQueue(queue);

    // Finally the binding: destination queue, source exchange, routing key, no extra arguments.
    final Binding binding = new Binding(
            "queueName",
            Binding.DestinationType.QUEUE,
            "exchangeName",
            "routingKey",
            new HashMap<>());
    rabbitAdmin.declareBinding(binding);
}
发送消息封装
/**
 * Contract for components that publish messages to a message queue.
 *
 * @param <T> concrete message type; bounded by {@code Message<?>} instead of the raw
 *            {@code Message} type so that implementations and callers stay free of
 *            raw-type warnings
 */
public interface IMessageQueueProducer<T extends Message<?>> {
    /**
     * Publishes the given message.
     *
     * @param var1 the message to publish
     */
    void push(T var1);
}
/**
* 发送消息实现
* message必须设置Exchange和Routing,否则会使用默认的交换机和路由key
*/
@Component
public class RabbitMessageQueueProducer<T extends Message<?>> implements IMessageQueueProducer<T> {
private Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private RabbitTemplate rabbitTemplate;
public RabbitMessageQueueProducer() {
}
public void push(T message) {
String exchange = "exchangeName";
String routing = "routingKey";
if (StrUtil.isNotBlank(message.getExchange())) {
exchange = message.getExchange();
}
if (StrUtil.isNotBlank(message.getRouting())) {
routing = message.getRouting();
}
this.logger.debug("send to exchange {}, routing {} of message {}.", new Object[]{exchange, routing, message});
this.rabbitTemplate.convertAndSend(exchange, routing, message);
}
}
/**
 * Contract for the message entities handed to the queue producer.
 *
 * <p>The {@code @JsonDeserialize} annotation names {@link DefaultMessage} as the concrete
 * type to instantiate when this interface is the deserialization target.
 *
 * @param <D> type of the data carried by the message
 */
@JsonDeserialize(as = DefaultMessage.class)
public interface Message<D extends Serializable> extends Serializable {

    /** @return the tenant id */
    String getTenantId();

    /** @param var1 the communication type to set */
    void setCommunicationType(String var1);

    /** @param var1 the message type to set */
    void setMessageType(String var1);

    /** @param var1 the command type to set */
    void setCommandType(String var1);

    /** @return the exchange name carried by this message */
    String getExchange();

    /** @param var1 the exchange name to set */
    void setExchange(String var1);

    /** @return the routing key carried by this message */
    String getRouting();

    /** @param var1 the routing key to set */
    void setRouting(String var1);

    /** @return the queue name carried by this message */
    String getQueue();

    /** @param var1 the queue name to set */
    void setQueue(String var1);

    /** @return the sender id */
    String getSenderId();

    /** @return the sender name */
    String getSenderName();

    /** @return the receivers */
    List<String> getReceivers();

    /** @return the receiver names */
    List<String> getReceiverNames();

    /** @return the message template */
    MessageTemplate getTemplate();

    /** @return the data carried by this message */
    D getData();

    /** @param var1 the data to carry */
    void setData(D var1);

    /** @return the variables map */
    Map<String, Object> getVariables();
}
/**
 * Default implementation of the {@link Message} entity: a plain holder of the fields below.
 * No accessors are written out in this class; it relies on the {@code @Data} annotation
 * (presumably Lombok's, which would generate the getters and setters that {@link Message}
 * requires -- confirm against the file's imports).
 *
 * @param <D> type of the data carried by the message
 */
@Data
public class DefaultMessage<D extends Serializable> implements Message<D> {
private static final long serialVersionUID = 1L;
private String tenantId;
// Routing information: exchange name, routing key and queue name.
private String exchange;
private String routing;
private String queue;
// Classification of the message.
private String communicationType;
private String commandType;
private String messageType;
// NOTE(review): messageTypeName has no counterpart in the Message interface.
private String messageTypeName;
// Sender and receivers.
private String senderId;
private String senderName;
private List<String> receivers;
private List<String> receiverNames;
// Template, data and variables.
private MessageTemplate template;
private D data;
private Map<String, Object> variables;
}
发送消息
// Injected producer; its type argument means push(...) accepts DefaultMessage instances.
@Autowired
private IMessageQueueProducer<DefaultMessage<?>> messageQueueProducer;
......
// Declared as DefaultMessage rather than the Message interface: the injected producer is an
// IMessageQueueProducer<DefaultMessage<?>>, so push(...) only accepts DefaultMessage values
// and a variable typed Message<String> would not compile here.
DefaultMessage<String> message = new DefaultMessage<>();
// Explicit exchange and routing key; the producer falls back to its defaults when blank.
message.setExchange("exchangeName");
message.setRouting("routingKey");
message.setData("test helllo");
messageQueueProducer.push(message);
三、消费者(consumer)
方式一:
/**
 * Option one: the listener annotation spells out the queue, the exchange and the routing
 * key that binds them, instead of referring to a queue by name only.
 *
 * @param message the received message (the custom Message type, used raw here)
 */
@RabbitListener(bindings = @QueueBinding(
        value = @Queue("queueName"),
        exchange = @Exchange("exchangeName"),
        key = "routingKey"))
public void consume(Message message) {
    // do something
}
方式二:
/**
 * Option two: the listener refers to the queue "queueName" by name only; no exchange or
 * binding is described in the annotation.
 *
 * @param message the received message (the custom Message type, used raw here)
 */
@RabbitListener(queues = {"queueName"})
public void consume(Message message) {
    // do something
}
推荐第一种方式:消费者启动时会自行声明队列、交换机和绑定,避免因发送方服务的自动创建尚未完成、队列还不存在而导致监听失败
建议:
1、这里的Message都是自定义的类(不是Spring AMQP自带的Message),发送消息时RabbitTemplate会通过消息转换器把它自动转换成Spring AMQP的Message
2、建议交换机、队列、路由Key使用常量(不会经常变),减少配置文件的配置项
觉得有帮助的小伙伴记得点个赞哟~
版权声明:本文为a251628111原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。