RabiitMQ

  • Post author:
  • Post category:其他




今日总结



首先,上午先讲了RabiitMQ交换机剩余的部分。

1.主题交换机(TopicExchange),主要由交换机名和路由KEY(模糊匹配)构成,需要注意的是模糊匹配中的*可以代替一个单词,而#可以代替零个或多个单词。

代码示例:

    @Bean
    public Queue queueTwo() {
        return new Queue("queueTwo");
    }

    //主题交换机
    @Bean
    public TopicExchange topicExchangeTwo() {
        return new TopicExchange("topicExchangeTwo");
    }

    //交换机与队列进行绑定
    @Bean
    public Binding queueTwoToTopicExchangeTwo(Queue queueTwo, TopicExchange topicExchangeTwo) {
        return BindingBuilder.bind(queueTwo).to(topicExchangeTwo).with("order.#");
    }

2.扇形交换机( FanoutExchange),又可以称作广播模式(订阅发布模式),不需要key,即可将消息投递到绑定的队列中。

代码示例:

  @Bean
    public Queue queueThree() {
        return new Queue("queueThree");
    }

    @Bean
    public Queue queueFour() {
        return new Queue("queueFour");
    }

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    @Bean
    public Binding queueThreeToFanoutExchange(Queue queueThree, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queueThree).to(fanoutExchange);
    }

    @Bean
    public Binding queueFourToFanoutExchange(Queue queueFour, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queueFour).to(fanoutExchange);
    }

3.头交换机,不过头交换机的使用比较麻烦,使用的地方较少。需要注意的点是它需要把规则放到发送内容中。

最后需要注意的一点是:队列尽量不要绑定到不同的交换机,一个交换机是允许绑定多个队列!!!



然后就是下午讲的知识点,高阶的MQ知识点。

这里是以在**rabbitmq中如何保证数据不丢失?**进行讲解的。



首先我们要认识到一点的是:


RabbitMQ消息投递路径是怎么进行的?



·生产者到交换机

·交换机到队列

·通过confirmCallback

·通过returnCallback

·生产者–>交换机->队列->消费者



然后我们先理解生产者到交换机的流程:


·通过confirmCallback

·生产者投递消息后,如果Broker收到消息后,会给生产者一个ACK。生产者通过ACK,可以确认这条消息是否正常发送到Broker,这种方式是消息可靠性投递的核心



交换机到队列流程:


·交换机到队列不成功,则丢弃消息(默认)

·交换机到队列不成功,返回给消息生产者,触发returnCallback

·通过returnCallback

·消息从交换器发送到对应队列失败时触发



其次是开启confirmCallback流程:


//开启手动确认

rabbitTemplate.setMandatory(true);//开启消息确认

//绑定回调函数

rabbitTemplate.setConfirmCallback(callback);

//创建关联数据对象

CorrelationData correlationData=new CorrelationData(orderNum);



再然后就是returnCallback。



然后从消费者我们只需要实现一个方法:

@RabbitListener(queues = "orderQueue")
public void getOrderMQ(Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag,OrderDto orderDto) throws InterruptedException, IOException {
}



最后就是死信:


什么是死信?


一般来说,producer将消息投递到queue中,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,

这样的消息如果没有后续的处理,就变成了死信(Dead Letter),所有的死信都会放到死信队列中。


死信队列的来源:



·消息被拒绝(basic.reject或basic.nack)并且requeue=false.

·消息TTL过期

·队列达到最大长度(队列满了,无法再添加数据到mq中)



版权声明:本文为weixin_44002920原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。