版本:4.2.0
消息通过producer发送到broker,broker缓存到本地磁盘,consumer怎么得到消息?
两种方法:
1.broker将消息发送到consumer
2.consumer请求broker得到消息
这两种方式就是MQ的推模型和拉模型,RocketMQ选择的是拉模型,拉取消息的好处在于consumer清楚自己的消费能力,每次拉取的消息不会超出自己的消费能力,消费完再拉取下一批。
Consumer启动之后就会自动地消费消息,作为coder,我们并没有写多余的代码去请求broker拉取消息,RocketMQ是如何做到实时拉取消息的呢?
一般情况下,消费者这么定义:
// Minimal push-consumer setup: construct the consumer with the name "consumer",
// point it at a name server, subscribe, register a listener, and start.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer");
consumer.setNamesrvAddr("localhost:9876");
// Subscribe with "test" and the expression "*"
// (presumably topic name and match-all tag filter; see the RocketMQ API docs).
consumer.subscribe("test", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    /**
     * Prints the body of every message in the delivered batch and reports success.
     *
     * @param list the batch of messages handed to this listener
     * @param consumeConcurrentlyContext context object supplied by the client (unused here)
     * @return always {@code CONSUME_SUCCESS}
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
                                                    ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        for (MessageExt message : list) {
            // Decode with an explicit charset instead of the platform default, so the
            // output does not depend on the JVM's file.encoding.
            // NOTE(review): assumes the producer encoded the body as UTF-8 - confirm.
            System.out.println("msg : "
                + new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();
如此简单的代码就能做到实时消费?貌似有点不可思议
所有答案都在start方法里面:
/**
 * Starts this consumer by delegating to the wrapped {@code defaultMQPushConsumerImpl}.
 *
 * @throws MQClientException propagated from the delegate's {@code start()}
 */
public void start() throws MQClientException {
    this.defaultMQPushConsumerImpl.start();
}
可以看到,这里其实调用了defaultMQPushConsumerImpl的start方法,那就进去看看
/**
 * Starts the push consumer implementation.
 *
 * <p>Initialization only runs when the current state is {@code CREATE_JUST}. The state is
 * switched to {@code START_FAILED} before any initialization step executes, so if a step
 * throws, the state stays {@code START_FAILED} and a later call to this method is rejected.
 * If registering with the client factory fails, the state is reset to {@code CREATE_JUST}
 * and the consume service is shut down before the exception is thrown.
 *
 * @throws MQClientException if the state is {@code RUNNING}, {@code START_FAILED} or
 *         {@code SHUTDOWN_ALREADY}, or if {@code registerConsumer} returns false for the
 *         consumer group
 */
public synchronized void start() throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}",
                this.defaultMQPushConsumer.getConsumerGroup(),
                this.defaultMQPushConsumer.getMessageModel(),
                this.defaultMQPushConsumer.isUnitMode());
            // Pessimistically mark as failed; only flipped to RUNNING at the end of this case.
            this.serviceState = ServiceState.START_FAILED;
            // Validate the configuration.
            this.checkConfig();
            // Cache the subscribed topics.
            this.copySubscription();
            if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                this.defaultMQPushConsumer.changeInstanceNameToPID();
            }
            // Initialize mQClientFactory (obtained from the MQClientManager singleton).
            this.mQClientFactory = MQClientManager.getInstance()
                .getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
            // Copy group, message model, allocation strategy and factory into rebalanceImpl.
            this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
            this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
            this.rebalanceImpl.setAllocateMessageQueueStrategy(
                this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
            this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
            this.pullAPIWrapper = new PullAPIWrapper(
                mQClientFactory,
                this.defaultMQPushConsumer.getConsumerGroup(),
                isUnitMode());
            this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
            // A user-supplied offset store wins; otherwise pick one by message model.
            if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
            } else {
                switch (this.defaultMQPushConsumer.getMessageModel()) {
                    case BROADCASTING:
                        // Broadcasting mode: initialize a local offset store.
                        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory,
                            this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    case CLUSTERING:
                        // Clustering mode: initialize a remote (broker-side) offset store.
                        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory,
                            this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    default:
                        break;
                }
                this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
            }
            this.offsetStore.load();
            // Choose the consume service from the type of the registered listener.
            if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                this.consumeOrderly = true;
                // Orderly consumption.
                this.consumeMessageService = new ConsumeMessageOrderlyService(this,
                    (MessageListenerOrderly) this.getMessageListenerInner());
            } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                this.consumeOrderly = false;
                // Concurrent consumption.
                this.consumeMessageService = new ConsumeMessageConcurrentlyService(this,
                    (MessageListenerConcurrently) this.getMessageListenerInner());
            }
            // Article author's notes on what start() does (not verifiable from this snippet):
            // - concurrent: periodically cleans up expired messages and sends them back to the
            //   broker; if that send fails they go to the retry queue.
            // - orderly: locks the processQueue; pullMessageService fetches the queue's max
            //   offset, but ordered consumption is not guaranteed.
            this.consumeMessageService.start();
            // Register (cache) this consumer locally in the client factory under its group name.
            boolean registerOK = mQClientFactory.registerConsumer(
                this.defaultMQPushConsumer.getConsumerGroup(), this);
            if (!registerOK) {
                // Roll back: restore the initial state and stop the consume service just started.
                this.serviceState = ServiceState.CREATE_JUST;
                this.consumeMessageService.shutdown();
                throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }
            mQClientFactory.start();
            log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }
    // Refresh topic route information for the current subscriptions.
    this.updateTopicSubscribeInfoWhenSubscriptionChanged();
    this.mQClientFactory.checkClientInBroker();
    // Send a heartbeat to the brokers (the original note says: to the broker masters).
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    this.mQClientFactory.rebalanceImmediately();
}
mQClientFactory是consumer的核心成员之一,它的初始化都干了什么?
/**
 * Returns the {@code MQClientInstance} cached under the client id built from
 * {@code clientConfig}, creating and caching a new one when none exists yet.
 *
 * <p>If another caller inserts an instance for the same client id between the lookup and
 * the {@code putIfAbsent}, the instance that is already in the table is returned and the
 * freshly created one is discarded.
 *
 * @param clientConfig configuration used to build the client id and, cloned, the new instance
 * @param rpcHook hook passed through to a newly created instance
 * @return the instance held in {@code factoryTable} for this client id
 */
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
    final String clientId = clientConfig.buildMQClientId();

    // Fast path: an instance is already cached for this client id.
    final MQClientInstance cached = this.factoryTable.get(clientId);
    if (cached != null) {
        return cached;
    }

    final MQClientInstance created = new MQClientInstance(
        clientConfig.cloneClientConfig(),
        this.factoryIndexGenerator.getAndIncrement(),
        clientId,
        rpcHook);

    // putIfAbsent returns the existing mapping when another caller inserted first.
    final MQClientInstance winner = this.factoryTable.putIfAbsent(clientId, created);
    if (winner == null) {
        log.info("Created new MQClientInstance for clientId:[{}]", clientId);
        return created;
    }

    log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
    return winner;
}
mQClientFactory为什么放到缓存呢?
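庞大的工程一般情况会存在多个消费者,这些消费者又有可能不是同一个consumeGroup。不过从上面的代码可以看到,factoryTable是以clientId(由clientConfig.buildMQClientId()生成)为key来缓存和区分不同mQClientFactory的,而不是consumeGroup;clientId相同的消费者会复用同一个mQClientFactory实例。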
/**
 * Builds a client instance and wires up its collaborators: the netty client config, the
 * remoting processor, the client API implementation, the admin implementation, the pull
 * and rebalance services, an internal producer, and the consumer stats manager.
 *
 * @param clientConfig client configuration; stored as-is and also passed to the API
 *        implementation and the internal producer
 * @param instanceIndex index of this instance (logged as "FactoryIndex")
 * @param clientId identifier of this client instance
 * @param rpcHook hook handed to the {@code MQClientAPIImpl} constructor
 */
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
    this.clientConfig = clientConfig;
    this.instanceIndex = instanceIndex;
    // Netty client settings are derived from the client config.
    this.nettyClientConfig = new NettyClientConfig();
    this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
    this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
    // Note: "this" is handed to collaborators before the constructor has finished.
    this.clientRemotingProcessor = new ClientRemotingProcessor(this);
    this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
    // Only push a name server address list when the user configured one explicitly.
    if (this.clientConfig.getNamesrvAddr() != null) {
        this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
        log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
    }
    this.clientId = clientId;
    this.mQAdminImpl = new MQAdminImpl(this);
    this.pullMessageService = new PullMessageService(this);
    this.rebalanceService = new RebalanceService(this);
    // Internal producer created under the built-in inner producer group, then given
    // this client's configuration.
    this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
    this.defaultMQProducer.resetClientConfig(clientConfig);
    // NOTE(review): scheduledExecutorService is not assigned in this constructor;
    // presumably initialized at its field declaration - confirm.
    this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
    log.info("created a new client Instance, FactoryIndex: {} ClinetID: {} {} {}, serializeType={}",
        this.instanceIndex,
        this.clientId,
        this.clientConfig,
        MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION),
        RemotingCommand.getSerializeTypeConfigInThisServer());
}
看上面初始化过程,mQClientFactory是当之无愧的核心成员。
主要关注这三个成员:
mQClientAPIImpl:内部持有一个NettyRemotingClient,主要与nettyServer建立连接
pullMessageService:拉取消息的实现类
rebalanceService:主要负责MessageQueue和客户端的均衡
mQClientFactory初始化完成之后看看它启动干了哪些事
public void start() throws MQClientException { synchronized (this) { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; // If not specified,looking address from name server if (null == this.clientConfig.getNamesrvAddr()) { this.mQCl