RocketMQ Consumer 消费流程

  • Post author:
  • Post category:其他


版本:4.2.0

消息通过producer发送到broker,broker缓存到本地磁盘,consumer怎么得到消息?

两种方法:

1.broker将消息发送到consumer

2.consumer请求broker得到消息

这两种方式就是MQ的推模型和拉模型,RocketMQ选择的是拉模型,拉取消息的好处在于consumer清楚自己的消费能力,每次拉取的消息不会超出自己的消费能力,消费完再拉取下一批。

Consumer启动之后就会自动的消费消息,作为coder,我们并没有写多余的代码去请求broker拉取消息,RocketMQ时如何做到实时拉取消息的呢?

一般情况下,消费者这么定义:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("test", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        for(MessageExt message : list){
            System.out.println("msg : " + new String(message.getBody()));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

如此简单的代码就能做到实时消费?貌似有点不可思议

所有答案都在start方法里面:

public void start() throws MQClientException {
    this.defaultMQPushConsumerImpl.start();
}

可以看到,这里其实调用了defaultMQPushConsumerImpl的start方法,那就进去看看

public synchronized void start() throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                     this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
            this.serviceState = ServiceState.START_FAILED;
            // 检查配置
            this.checkConfig();
            // 缓存topic
            this.copySubscription();
​
            if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                this.defaultMQPushConsumer.changeInstanceNameToPID();
            }
            // 初始化mQClientFactory
            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
​
            this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
            this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
            this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
            this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
​
            this.pullAPIWrapper = new PullAPIWrapper(
                mQClientFactory,
                this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
            this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
​
            if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
            } else {
                switch (this.defaultMQPushConsumer.getMessageModel()) {
                    case BROADCASTING:
                        // 广播消费模式初始化本地offsetStore
                        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    case CLUSTERING:
                        // 集群消费模式初始化远程offsetStore
                        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    default:
                        break;
                }
                this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
            }
            this.offsetStore.load();
            
            if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                this.consumeOrderly = true;
                // 顺序消费
                this.consumeMessageService =
                    new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
            } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                this.consumeOrderly = false;
                // 并发消费
                this.consumeMessageService =
                    new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
            }
            // 并发消费:定期清理过期的消息,然后将消息发送回broker,如果发送失败会发送到重试队列
            // 顺序消费:锁住processQueue,pullMessageService会获取queue的最大offset,但是不能保证顺序消费
            this.consumeMessageService.start();
            // 缓存到本地
            boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                this.consumeMessageService.shutdown();
                throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                                            + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                                            null);
            }
            
            mQClientFactory.start();
            log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                                        + this.serviceState
                                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                                        null);
        default:
            break;
    }
    // 更新topic路由信息
    this.updateTopicSubscribeInfoWhenSubscriptionChanged();
    this.mQClientFactory.checkClientInBroker();
    // 向broker master发送心跳包
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    this.mQClientFactory.rebalanceImmediately();
}

mQClientFactory是consumer的核心成员之一,它的初始化都干了什么?

public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
    String clientId = clientConfig.buildMQClientId();
    MQClientInstance instance = this.factoryTable.get(clientId);
    if (null == instance) {
        instance =
            new MQClientInstance(clientConfig.cloneClientConfig(),
                                 this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
        MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
        if (prev != null) {
            instance = prev;
            log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
        } else {
            log.info("Created new MQClientInstance for clientId:[{}]", clientId);
        }
    }
​
    return instance;
}

mQClientFactory为什么放到缓存呢?

庞大的工程一般情况会存在多个消费者,这些消费者又有可能不是同一个consumeGroup,所以利用consumeGroup来区分不同mQClientFactory。

public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
    this.clientConfig = clientConfig;
    this.instanceIndex = instanceIndex;
    this.nettyClientConfig = new NettyClientConfig();
    this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
    this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
    this.clientRemotingProcessor = new ClientRemotingProcessor(this);
    this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
​
    if (this.clientConfig.getNamesrvAddr() != null) {
        this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
        log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
    }
​
    this.clientId = clientId;
​
    this.mQAdminImpl = new MQAdminImpl(this);
​
    this.pullMessageService = new PullMessageService(this);
​
    this.rebalanceService = new RebalanceService(this);
​
    this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
    this.defaultMQProducer.resetClientConfig(clientConfig);
​
    this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
​
    log.info("created a new client Instance, FactoryIndex: {} ClinetID: {} {} {}, serializeType={}",
             this.instanceIndex,
             this.clientId,
             this.clientConfig,
             MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
}

看上面初始化过程,mQClientFactory是当之无愧的核心成员。

主要关注这三个成员:

mQClientAPIImpl:内部持有一个NettyRemotingClient,主要与nettyServer建立连接

pullMessageService:拉取消息的实现类

rebalanceService:主要负责MessageQueue和客户端的均衡

mQClientFactory初始化完成之后看看它启动干了哪些事

public void start() throws MQClientException {
​
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQCl



版权声明:本文为small_bao原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。