RabbitMQ 发送和接收消息(自动创建队列、交换机和绑定)

  • Post author:
  • Post category:其他




自动创建队列、交换机和绑定



一、pom.xml

<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>2.0.5.RELEASE</version>
</dependency>



二、发送者(producer)

// Injected RabbitAdmin; ext() below uses it to declare the exchange, the queue and the binding.
@Resource
private RabbitAdmin rabbitAdmin;
/**
 * Declares the messaging topology through {@code rabbitAdmin}: a direct exchange,
 * a queue, and the binding that routes "routingKey" from the exchange to the queue.
 */
@Override
protected void ext() {
    final String exchangeName = "exchangeName";
    final String queueName = "queueName";

    // Exchange: constructed with durable = true, autoDelete = false.
    DirectExchange exchange = new DirectExchange(exchangeName, true, false);
    rabbitAdmin.declareExchange(exchange);

    // Queue: constructed with durable = true.
    Queue queue = new Queue(queueName, true);
    rabbitAdmin.declareQueue(queue);

    // Binding: queue -> exchange under the routing key, with an empty argument map.
    Binding binding = new Binding(
            queueName,
            Binding.DestinationType.QUEUE,
            exchangeName,
            "routingKey",
            new HashMap<>());
    rabbitAdmin.declareBinding(binding);
}



发送消息封装

/**
 * Producer-side abstraction for publishing a message to a message queue.
 *
 * @param <T> concrete message type accepted by this producer; bounded by the
 *            wildcard-parameterized {@code Message<?>} rather than the raw type
 */
public interface IMessageQueueProducer<T extends Message<?>> {
    /**
     * Publishes the given message.
     *
     * @param message the message to send
     */
    void push(T message);
}

/**
 * 发送消息实现
 * message必须设置Exchange和Routing,否则会使用默认的交换机和路由key
 */
@Component
public class RabbitMessageQueueProducer<T extends Message<?>> implements IMessageQueueProducer<T> {
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public RabbitMessageQueueProducer() {
    }

    public void push(T message) {
        String exchange = "exchangeName";
        String routing = "routingKey";
        if (StrUtil.isNotBlank(message.getExchange())) {
            exchange = message.getExchange();
        }

        if (StrUtil.isNotBlank(message.getRouting())) {
            routing = message.getRouting();
        }

        this.logger.debug("send to exchange {}, routing {} of message {}.", new Object[]{exchange, routing, message});
        this.rabbitTemplate.convertAndSend(exchange, routing, message);
    }
}


/**
 * Contract of the message entity handed to the message-queue producer.
 *
 * <p>The {@code @JsonDeserialize} annotation names {@link DefaultMessage} as the
 * concrete type to use when deserializing this interface.
 *
 * @param <D> type of the payload carried by the message; must be serializable
 */
@JsonDeserialize(as = DefaultMessage.class)
public interface Message<D extends Serializable> extends Serializable {

    /** @return the tenant id of this message */
    String getTenantId();

    /** @param communicationType the communication type to set */
    void setCommunicationType(String communicationType);

    /** @param messageType the message type to set */
    void setMessageType(String messageType);

    /** @param commandType the command type to set */
    void setCommandType(String commandType);

    /** @return the exchange name this message asks to be published to */
    String getExchange();

    /** @param exchange the exchange name to publish to */
    void setExchange(String exchange);

    /** @return the routing key this message asks to be published with */
    String getRouting();

    /** @param routing the routing key to publish with */
    void setRouting(String routing);

    /** @return the queue name associated with this message */
    String getQueue();

    /** @param queue the queue name to associate with this message */
    void setQueue(String queue);

    /** @return the sender id */
    String getSenderId();

    /** @return the sender name */
    String getSenderName();

    /** @return the list of receivers */
    List<String> getReceivers();

    /** @return the list of receiver names */
    List<String> getReceiverNames();

    /** @return the message template attached to this message */
    MessageTemplate getTemplate();

    /** @return the payload */
    D getData();

    /** @param data the payload to carry */
    void setData(D data);

    /** @return the variables map attached to this message */
    Map<String, Object> getVariables();
}

/**
 * Default implementation of the {@link Message} entity.
 *
 * <p>No accessor is written out in this class; the getters and setters required by
 * {@link Message} are presumably generated by the {@code @Data} annotation (Lombok) --
 * NOTE(review): confirm against the project's imports.
 *
 * @param <D> type of the payload carried in {@code data}
 */
@Data
public class DefaultMessage<D extends Serializable> implements Message<D> {
    private static final long serialVersionUID = 1L;
    // Tenant id.
    private String tenantId;
    // Exchange name; read by the producer through getExchange() to choose the target exchange.
    private String exchange;
    // Routing key; read by the producer through getRouting().
    private String routing;
    // Queue name.
    private String queue;
    // Classification fields; the Message interface declares only setters for these three.
    private String communicationType;
    private String commandType;
    private String messageType;
    // Not part of the Message interface.
    private String messageTypeName;
    // Sender id and name.
    private String senderId;
    private String senderName;
    // Receivers and their names.
    private List<String> receivers;
    private List<String> receiverNames;
    // Message template.
    private MessageTemplate template;
    // Payload.
    private D data;
    // Variables attached to the message.
    private Map<String, Object> variables;
 }



发送消息

// Injected producer used to publish messages; its message type parameter is DefaultMessage<?>.
@Autowired
private IMessageQueueProducer<DefaultMessage<?>> messageQueueProducer;
......
// The variable is declared as DefaultMessage<String> because messageQueueProducer is typed
// IMessageQueueProducer<DefaultMessage<?>>: push() accepts only a DefaultMessage, so a
// variable of the interface type Message<String> would be rejected by the compiler.
DefaultMessage<String> message = new DefaultMessage<>();
// Target exchange and routing key for this message.
message.setExchange("exchangeName");
message.setRouting("routingKey");
// Payload carried in the message's data field.
message.setData("test helllo");
messageQueueProducer.push(message);



三、消费者(consumer)

方式一:

/**
 * Consumer variant 1: the listener annotation itself describes the binding of
 * queue "queueName" to exchange "exchangeName" under routing key "routingKey".
 *
 * @param message the received message
 */
@RabbitListener(bindings = @QueueBinding(
        value = @Queue("queueName"),
        exchange = @Exchange("exchangeName"),
        key = "routingKey"))
public void consume(Message message) {
    // handle the received message here
}

方式二:

/**
 * Consumer variant 2: listens on queue "queueName" by name only; no exchange or
 * binding is described on the listener.
 *
 * @param message the received message
 */
@RabbitListener(queues = {"queueName"})
public void consume(Message message) {
    // handle the received message here
}

推荐第一种方式:消费者会根据注解自行声明队列、交换机和绑定,即使发送方服务的自动创建尚未完成,也不会因为监听的队列不存在而出错



建议:

1、这里的 Message 都是上文自定义的类(不是 Spring AMQP 自带的 Message),调用 convertAndSend 发送消息时会自动转换成 Spring 的 Message

2、建议交换机、队列、路由Key使用常量(不会经常变),减少配置文件的配置项


觉得本文有帮助的小伙伴记得点个赞哟~



版权声明:本文为a251628111原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。