问题描述
在生产服务器搭了一套rocketmq环境,在生产服务器上能测试发送和接受消息都正常,并且观察namesrv和broker的日志,都没有发现异常日志。但就是在本地无法消费,网络是通的(ping和telnet都成功)
跟进问题总结
-
在跟代码发现订阅主题下的消费组的客户端数为空(为什么呢?)
1.1 观察消费端是否有下面的日志if (null == cidAll) { log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic); }
(1)客户端日志打印位置留意下方代码(日志文件为rocketmq_client.log)
public class ClientLogger { private static synchronized Appender createClientAppender() { // 通过CLIENT_LOG_ROOT = "rocketmq.client.logRoot"可以指定路径 String clientLogRoot = System.getProperty(CLIENT_LOG_ROOT, System.getProperty("user.home") + "/logs/rocketmqlogs"); String clientLogFileName = System.getProperty(CLIENT_LOG_FILENAME, "rocketmq_client.log"); } }
1.2 继续看rocketmq_client.log日志,会发现是因为访问不到broker,所以无法获取对应消费组信息
2021-11-17 16:52:03,364 WARN RocketmqClient - getConsumerIdListByGroup exception, xxx,xxx,xxx,xxx:10911 express org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to xxx,xxx,xxx,xxx:10911 failed at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:394) at org.apache.rocketmq.client.impl.MQClientAPIImpl.getConsumerIdListByGroup(MQClientAPIImpl.java:888) at org.apache.rocketmq.client.impl.factory.MQClientInstance.findConsumerIdList(MQClientInstance.java:1067) at org.apache.rocketmq.client.impl.consumer.RebalanceImpl.rebalanceByTopic(RebalanceImpl.java:260) at org.apache.rocketmq.client.impl.consumer.RebalanceImpl.doRebalance(RebalanceImpl.java:223) at org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.doRebalance(DefaultMQPushConsumerImpl.java:1012) at org.apache.rocketmq.client.impl.factory.MQClientInstance.doRebalance(MQClientInstance.java:958) at org.apache.rocketmq.client.impl.consumer.RebalanceService.run(RebalanceService.java:41) at java.lang.Thread.run(Thread.java:748)
- 把消费端部署在内网地址能通的环境就可以了
RocketMQ是如何消费的?
-
通过
主题下的消息队列集合
以及订阅该
主题的消费组内的客户端数
进行分配,获取该客户端分配的消息队列集合 -
通过指定
消息队列
的偏移量定义消息拉取请求
pullRequest
,拉取消息服务
PullMessageService
进行循环拉取任务
PullMessageService拉取消息
@Override
public void run() {
while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
}
-
pullRequestQueue是个阻塞队列,当pullRequestQueue队列里没有元素时,会一直阻塞
1.1 那pullRequestQueue中什么时候放入元素(关注下面RebalanceService的介绍)
RebalanceService放入拉取请求pullRequest
-
默认每2秒钟重新负载一次 MQClientInstance#doRebalance
public void doRebalance() { // 以消费组的维度进行遍历负载 for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) { MQConsumerInner impl = entry.getValue(); if (impl != null) { try { impl.doRebalance(); } catch (Throwable e) { log.error("doRebalance exception", e); } } } }
-
RebalanceImpl#doRebalance
public void doRebalance(final boolean isOrder) { // 订阅信息(topic -> SubscriptionData) Map<String, SubscriptionData> subTable = this.getSubscriptionInner(); if (subTable != null) { for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) { final String topic = entry.getKey(); try { // 以主题维护进行负载 this.rebalanceByTopic(topic, isOrder); } catch (Throwable e) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("rebalanceByTopic Exception", e); } } } } this.truncateMessageQueueNotMyTopic(); }
-
集群消息负载算法介绍RebalanceImpl#rebalanceByTopic
private void rebalanceByTopic(final String topic, final boolean isOrder) { switch (messageModel) { case CLUSTERING: { // 获取主题下队列列表 Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); // 订阅该主题消费组内的客户端数(topic -> brokerAddr && brokerAddr + group -> cidAll) List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); // 分配算法(默认平均分配:AllocateMessageQueueAveragely)为该客户端分配对应消息队列 allocateResult = strategy.allocate( this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll); // 见下方3.1的介绍 boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder); } } }
3.1 RebalanceImpl#updateProcessQueueTableInRebalance
(1)按照消息队列MessageQueue的维度定义拉取请求PullRequest
(2)对拉取请求集合添加到拉取服务PullMessageService的pullRequestQueue队列中(见上方PullMessageService介绍)private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) { List<PullRequest> pullRequestList = new ArrayList<PullRequest>(); for (MessageQueue mq : mqSet) { if (!this.processQueueTable.containsKey(mq)) { ProcessQueue pq = new ProcessQueue(); // 消息队列的偏移量 long nextOffset = this.computePullFromWhereWithException(mq); if (nextOffset >= 0) { // pq在上方会new初始化 ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq); if(pre == null){ // 新建拉取请求 PullRequest pullRequest = new PullRequest(); pullRequest.setConsumerGroup(consumerGroup); pullRequest.setNextOffset(nextOffset); pullRequest.setMessageQueue(mq); pullRequest.setProcessQueue(pq); pullRequestList.add(pullRequest); changed = true; } } } } // 分发拉取请求 this.dispatchPullRequest(pullRequestList); }
获取订阅主题消费组内消费者
- 通过注册中心namesrv获取主题的路由信息(包含主题对应broker集合),随机返回其中一个broker
- 向broker请求消费组的消费者信息
// MQClientInstance
public List<String> findConsumerIdList(final String topic, final String group) {
// 获取主题相关的broker(问题点:返回的是内网地址,在外面访问不了)
String brokerAddr = this.findBrokerAddrByTopic(topic);
if (null != brokerAddr) {
// 向broker发送信息,获取消费组的消费者信息
return this.mQClientAPIImpl.getConsumerIdListByGroup(brokerAddr, group, 3000);
}
}
-
获取topic的broker信息
public String findBrokerAddrByTopic(final String topic) { // 获取topic的路由信息(topicRouteTable在哪里初始化? -->关注下方主题路由信息的介绍) TopicRouteData topicRouteData = this.topicRouteTable.get(topic); if (topicRouteData != null) { // 获取主题的broker列表 List<BrokerData> brokers = topicRouteData.getBrokerDatas(); if (!brokers.isEmpty()) { // 随机返回一个broker int index = random.nextInt(brokers.size()); BrokerData bd = brokers.get(index % brokers.size()); return bd.selectBrokerAddr(); } } }
主题路由信息
-
在客户端实例MQClientInstance中有个定时任务,默认每30秒更新路由信息(通过pollNameServerInterval可以配置)
// MQClientInstance public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) { if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { // 从注册中心namesrv获取主题的路由信息 TopicRouteData topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3); if (topicRouteData != null) { // 将topic的路由信息缓存到本地 this.topicRouteTable.put(topic, cloneTopicRouteData); } } }
版权声明:本文为weixin_40803011原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。