rocketMq之消息发送和消费(六)

  • Post author:
  • Post category:其他


引进maven依赖

<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.7.1</version>
</dependency>

流程

  • 发送者步骤分析
  1. 创建Producer,并指定生产者组名
  2. 指定NameServer的地址
  3. 启动Producer
  4. 创建消息对象,指定Topic、Tag及消息体
  5. 发送消息
  6. 关闭Producer
  • 消费者步骤分析
  1. 创建Consumer,并指定消费者组名
  2. 指定NameServer的地址
  3. 订阅Topic、Tag
  4. 设置回调函数、处理消息
  5. 启动Consumer

消息发送

  • 同步消息

客户端线程阻塞消息,直到得到结果,如消息通知、短信

    public static void main(String[] argv) throws Exception {
        //创建Producer,并指定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //指定NameServer的地址
        producer.setNamesrvAddr("106.55.146.154:9876;39.106.142.155:9876");
        //启动Producer
        producer.start();
        //创建消息对象,指定Topic、Tag及消息体
        for (int i=0; i<10; i++) {
            Message msg = new Message("base", "tag1", ("hello world" + i).getBytes());
            //发送消息
            SendResult result = producer.send(msg);
            System.out.println(result.getSendStatus());
            System.out.println(result.getMsgId());
            System.out.println(result.getMessageQueue());
            TimeUnit.SECONDS.sleep(1);
        }
        //关闭Producer
        producer.shutdown();
    }
  • 异步消息

对时间比较敏感的业务场景

    public static void main(String[] argv) throws Exception {
        //创建Producer,并指定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //指定NameServer的地址
        producer.setNamesrvAddr("106.55.146.154:9876;39.106.142.155:9876");
        //启动Producer
        producer.start();
        //创建消息对象,指定Topic、Tag及消息体
        for (int i=0; i<10; i++) {
            Message msg = new Message("base", "tag1", ("hello world" + i).getBytes());
            //发送消息
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println(sendResult.getSendStatus());
                    System.out.println(sendResult.getMsgId());
                    System.out.println(sendResult.getMessageQueue());
                }

                @Override
                public void onException(Throwable throwable) {
                    System.out.println(throwable.getMessage());
                }
            });
            TimeUnit.SECONDS.sleep(1);
        }
        //关闭Producer
        producer.shutdown();
    }
  • 单向消息

不关注发送成功或失败的场景,如日志

    public static void main(String[] argv) throws Exception {
        //创建Producer,并指定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //指定NameServer的地址
        producer.setNamesrvAddr("106.55.146.154:9876;39.106.142.155:9876");
        //启动Producer
        producer.start();
        //创建消息对象,指定Topic、Tag及消息体
        for (int i=0; i<10; i++) {
            Message msg = new Message("base", "tag1", ("hello world" + i).getBytes());
            //发送消息
            producer.sendOneway(msg);
            TimeUnit.SECONDS.sleep(1);
        }
        //关闭Producer
        producer.shutdown();
    }

消息消费

  • 基本消费
    public static void main(String[] argv) throws Exception{
        //创建Consumer,并指定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //指定NameServer的地址
        consumer.setNamesrvAddr("106.55.146.154:9876;39.106.142.155:9876");
        //订阅Topic、Tag
        consumer.subscribe("base", "tag1");
        //设置回调函数、处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                list.forEach(v -> {
                    System.out.println(v);
                });

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //
        consumer.start();
    }

消费模式

  • 负载均衡模式(默认模式)

消费者共同分担消费任务,例如,生产者来10个消息,总共有两个消费者,一个消费者消费2个,一个消费者消费8个

consumer.setMessageModel(MessageModel.CLUSTERING);
  • 广播模式

所有消费者都要消费生产者发送的全部消息,例如,生产者来10个消息,总共两个消费者,一个消费者消费10,另一个消费者也消费10个

consumer.setMessageModel(MessageModel.BROADCASTING);



版权声明:本文为qq_19831379原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。