版本:4.2.0
消息通过producer发送到broker,broker缓存到本地磁盘,consumer怎么得到消息?
两种方法:
1.broker将消息发送到consumer
2.consumer请求broker得到消息
这两种方式就是MQ的推模型和拉模型,RocketMQ选择的是拉模型,拉取消息的好处在于consumer清楚自己的消费能力,每次拉取的消息不会超出自己的消费能力,消费完再拉取下一批。
Consumer启动之后就会自动的消费消息,作为coder,我们并没有写多余的代码去请求broker拉取消息,RocketMQ时如何做到实时拉取消息的呢?
一般情况下,消费者这么定义:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("test", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for(MessageExt message : list){
System.out.println("msg : " + new String(message.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
如此简单的代码就能做到实时消费?貌似有点不可思议
所有答案都在start方法里面:
public void start() throws MQClientException {
this.defaultMQPushConsumerImpl.start();
}
可以看到,这里其实调用了defaultMQPushConsumerImpl的start方法,那就进去看看
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
this.serviceState = ServiceState.START_FAILED;
// 检查配置
this.checkConfig();
// 缓存topic
this.copySubscription();
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
// 初始化mQClientFactory
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
// 广播消费模式初始化本地offsetStore
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
// 集群消费模式初始化远程offsetStore
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
this.offsetStore.load();
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
// 顺序消费
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
// 并发消费
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
// 并发消费:定期清理过期的消息,然后将消息发送回broker,如果发送失败会发送到重试队列
// 顺序消费:锁住processQueue,pullMessageService会获取queue的最大offset,但是不能保证顺序消费
this.consumeMessageService.start();
// 缓存到本地
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown();
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
// 更新topic路由信息
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();
// 向broker master发送心跳包
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
this.mQClientFactory.rebalanceImmediately();
}
mQClientFactory是consumer的核心成员之一,它的初始化都干了什么?
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
String clientId = clientConfig.buildMQClientId();
MQClientInstance instance = this.factoryTable.get(clientId);
if (null == instance) {
instance =
new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}
return instance;
}
mQClientFactory为什么放到缓存呢?
庞大的工程一般情况会存在多个消费者,这些消费者又有可能不是同一个consumeGroup,所以利用consumeGroup来区分不同mQClientFactory。
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
this.clientConfig = clientConfig;
this.instanceIndex = instanceIndex;
this.nettyClientConfig = new NettyClientConfig();
this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
this.clientRemotingProcessor = new ClientRemotingProcessor(this);
this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
if (this.clientConfig.getNamesrvAddr() != null) {
this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
}
this.clientId = clientId;
this.mQAdminImpl = new MQAdminImpl(this);
this.pullMessageService = new PullMessageService(this);
this.rebalanceService = new RebalanceService(this);
this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
this.defaultMQProducer.resetClientConfig(clientConfig);
this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
log.info("created a new client Instance, FactoryIndex: {} ClinetID: {} {} {}, serializeType={}",
this.instanceIndex,
this.clientId,
this.clientConfig,
MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
}
看上面初始化过程,mQClientFactory是当之无愧的核心成员。
主要关注这三个成员:
mQClientAPIImpl:内部持有一个NettyRemotingClient,主要与nettyServer建立连接
pullMessageService:拉取消息的实现类
rebalanceService:主要负责MessageQueue和客户端的均衡
mQClientFactory初始化完成之后看看它启动干了哪些事
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQCl