Troubleshooting a RocketMQ Consumer That Cannot Consume Messages (Caused by Failing to Fetch Consumer Group Information)





Problem description

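I set up a RocketMQ environment on a production server. On that server, test messages could be sent and received normally, and neither the namesrv nor the broker logs showed any errors. However, a consumer running on my local machine could not consume anything, even though the network was reachable (both ping and telnet succeeded).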



Investigation summary

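  1. Stepping through the client code showed that the list of client IDs for the consumer group subscribed to the topic came back empty. Why?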

    1.1 Check whether the consumer side logs the following warning:

    if (null == cidAll) {
       log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
    }
    

    (1) Where the client log is written is determined by the code below (the log file is rocketmq_client.log):

    public class ClientLogger {
        private static synchronized Appender createClientAppender() {
            // The directory can be set with the system property CLIENT_LOG_ROOT = "rocketmq.client.logRoot"
            String clientLogRoot = System.getProperty(CLIENT_LOG_ROOT, System.getProperty("user.home") + "/logs/rocketmqlogs");
            String clientLogFileName = System.getProperty(CLIENT_LOG_FILENAME, "rocketmq_client.log");
            // ... (appender construction omitted)
        }
    }
    

    1.2 Reading further in rocketmq_client.log reveals the cause: the client cannot reach the broker, so it cannot fetch the consumer group's information.

    2021-11-17 16:52:03,364 WARN RocketmqClient - getConsumerIdListByGroup exception, xxx,xxx,xxx,xxx:10911 express
    org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to  xxx,xxx,xxx,xxx:10911 failed
    	at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:394)
    	at org.apache.rocketmq.client.impl.MQClientAPIImpl.getConsumerIdListByGroup(MQClientAPIImpl.java:888)
    	at org.apache.rocketmq.client.impl.factory.MQClientInstance.findConsumerIdList(MQClientInstance.java:1067)
    	at org.apache.rocketmq.client.impl.consumer.RebalanceImpl.rebalanceByTopic(RebalanceImpl.java:260)
    	at org.apache.rocketmq.client.impl.consumer.RebalanceImpl.doRebalance(RebalanceImpl.java:223)
    	at org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.doRebalance(DefaultMQPushConsumerImpl.java:1012)
    	at org.apache.rocketmq.client.impl.factory.MQClientInstance.doRebalance(MQClientInstance.java:958)
    	at org.apache.rocketmq.client.impl.consumer.RebalanceService.run(RebalanceService.java:41)
    	at java.lang.Thread.run(Thread.java:748)
    
  2. The fix was to deploy the consumer in an environment that can reach the broker's internal (private network) address.
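To repeat the checks above from the consumer's machine, a small stand-alone helper can print where rocketmq_client.log is expected (using the default location from the ClientLogger excerpt) and test whether a TCP connection to a host and port can be opened, which is what telnet does. The key is to test the broker address that appears in the log (port 10911 here), not only the name server. This is a hypothetical helper: the class name `ConsumerDiagnostics` and its methods are made up for illustration and are not part of the RocketMQ client.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical troubleshooting helper; not part of the RocketMQ client.
class ConsumerDiagnostics {

    // Same default as the ClientLogger excerpt: the "rocketmq.client.logRoot"
    // system property, falling back to ~/logs/rocketmqlogs.
    static String clientLogFile() {
        String root = System.getProperty("rocketmq.client.logRoot",
                System.getProperty("user.home") + "/logs/rocketmqlogs");
        return root + "/rocketmq_client.log";
    }

    // Equivalent of "telnet host port": true if a TCP connection can be
    // opened within timeoutMillis, false on any I/O failure.
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("client log: " + clientLogFile());
        // args: host and port, e.g. taken from the "connect to ... failed" log line
        if (args.length == 2) {
            boolean ok = canConnect(args[0], Integer.parseInt(args[1]), 3000);
            System.out.println(args[0] + ":" + args[1] + (ok ? " reachable" : " NOT reachable"));
        }
    }
}
```

If the name server port answers but the broker address from the log does not, the consumer is in exactly the situation described here. Besides moving the consumer, another option, when the broker is meant to be reachable from outside, is to have the broker register a reachable address through the `brokerIP1` setting in its broker configuration file.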



How does RocketMQ consume messages?

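  1. The client takes the set of message queues under the topic and the list of clients in the consumer group subscribed to that topic, and runs an allocation over them to obtain the message queues assigned to this client.
  2. For each assigned message queue, it builds a pull request (pullRequest) starting at that queue's offset, and the pull service PullMessageService processes these requests in a loop.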
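The loop in step 2 works because a pull request is reusable: in the push consumer, after a successful pull the request's next offset is advanced to the next begin offset and the same request is put back on the queue. The following is a simplified, hypothetical model of that cycle (strings stand in for messages and the message queue; `PullCycleSketch` and `pullOnce` are made-up names), not the client's actual code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model: the broker-side queue is a list of messages and a pull
// request remembers the next offset to read from it.
class PullCycleSketch {

    static final class PullRequest {
        final String messageQueue;
        long nextOffset;

        PullRequest(String messageQueue, long nextOffset) {
            this.messageQueue = messageQueue;
            this.nextOffset = nextOffset;
        }
    }

    // One pull: return up to batchSize messages starting at nextOffset and
    // advance nextOffset past them, so the same request can be reused.
    static List<String> pullOnce(List<String> brokerQueue, PullRequest request, int batchSize) {
        List<String> batch = new ArrayList<>();
        long offset = request.nextOffset;
        while (offset < brokerQueue.size() && batch.size() < batchSize) {
            batch.add(brokerQueue.get((int) offset));
            offset++;
        }
        request.nextOffset = offset;
        return batch;
    }

    public static void main(String[] args) {
        List<String> brokerQueue = Arrays.asList("m0", "m1", "m2", "m3", "m4");
        PullRequest request = new PullRequest("TopicTest-queue-0", 0);
        // Each iteration stands for one round of the pull loop reusing the same request
        for (int round = 0; round < 3; round++) {
            List<String> batch = pullOnce(brokerQueue, request, 2);
            System.out.println("round " + round + ": " + batch + ", nextOffset=" + request.nextOffset);
        }
    }
}
```

With five messages and a batch size of 2, the three rounds read [m0, m1], [m2, m3] and [m4], leaving nextOffset at 5.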



PullMessageService: pulling messages

@Override
public void run() {
    while (!this.isStopped()) {
        try {
            // take() blocks while pullRequestQueue is empty
            PullRequest pullRequest = this.pullRequestQueue.take();
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }
}
  1. pullRequestQueue is a blocking queue: while it has no elements, take() blocks indefinitely.

    1.1 So when are elements put into pullRequestQueue? (See RebalanceService below.)
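This blocking is what makes the failure silent: if rebalancing never enqueues a pull request, PullMessageService just waits in take() and reports nothing. The sketch below shows a worker that stays idle until something is put on the queue. It is hypothetical: `PullServiceSketch` is a made-up name, pull requests are plain strings, and the worker stops on a poison-pill value, whereas the real service checks isStopped().

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of the PullMessageService loop.
class PullServiceSketch {
    private static final String STOP = "__stop__"; // poison pill ending the loop (sketch only)
    private final BlockingQueue<String> pullRequestQueue = new LinkedBlockingQueue<>();
    final List<String> handled = new CopyOnWriteArrayList<>();

    Thread start() {
        Thread worker = new Thread(() -> {
            while (true) {
                try {
                    String request = pullRequestQueue.take(); // blocks while the queue is empty
                    if (STOP.equals(request)) {
                        return;
                    }
                    handled.add(request); // stands in for pullMessage(request)
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        worker.start();
        return worker;
    }

    // In the real client, rebalancing is what puts requests on the queue
    void put(String request) throws InterruptedException {
        pullRequestQueue.put(request);
    }

    void stop() throws InterruptedException {
        pullRequestQueue.put(STOP);
    }

    public static void main(String[] args) throws InterruptedException {
        PullServiceSketch service = new PullServiceSketch();
        Thread worker = service.start();
        Thread.sleep(200); // the worker is blocked: nothing has been enqueued yet
        System.out.println("before rebalance: " + service.handled);
        service.put("pull TopicTest queue 0");
        service.put("pull TopicTest queue 1");
        service.stop();
        worker.join();
        System.out.println("after rebalance: " + service.handled);
    }
}
```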



RebalanceService: enqueuing pull requests (pullRequest)

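  • RebalanceService triggers a rebalance periodically, by default every 20 seconds (system property rocketmq.client.rebalance.waitInterval), and can also be woken up immediately; each run calls MQClientInstance#doRebalance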

    public void doRebalance() {
        // Rebalance consumer group by consumer group
        for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
            MQConsumerInner impl = entry.getValue();
            if (impl != null) {
                try {
                    impl.doRebalance();
                } catch (Throwable e) {
                    log.error("doRebalance exception", e);
                }
            }
        }
    }
    
  • RebalanceImpl#doRebalance

    public void doRebalance(final boolean isOrder) {
        // Subscriptions of this consumer (topic -> SubscriptionData)
        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
        if (subTable != null) {
            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                final String topic = entry.getKey();
                try {
                    // Rebalance topic by topic
                    this.rebalanceByTopic(topic, isOrder);
                } catch (Throwable e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("rebalanceByTopic Exception", e);
                    }
                }
            }
        }

        this.truncateMessageQueueNotMyTopic();
    }
    
  • Queue allocation in clustering mode: RebalanceImpl#rebalanceByTopic

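    private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            case CLUSTERING: {
                // Message queues of the topic
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                // Client IDs in the consumer group subscribed to the topic
                // (topic -> brokerAddr, then brokerAddr + group -> cidAll)
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                if (null == cidAll) {
                    log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
                }
                // Without both lists nothing is allocated, so no pull request is ever created
                if (mqSet != null && cidAll != null) {
                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>(mqSet);
                    // Sort both lists so every client in the group computes the same allocation
                    Collections.sort(mqAll);
                    Collections.sort(cidAll);
                    // Allocation strategy (default: average allocation, AllocateMessageQueueAveragely)
                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
                    List<MessageQueue> allocateResult = strategy.allocate(
                        this.consumerGroup,
                        this.mQClientFactory.getClientId(),
                        mqAll,
                        cidAll);
                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                    if (allocateResult != null) {
                        allocateResultSet.addAll(allocateResult);
                    }
                    // See 3.1 below
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                }
                break;
            }
        }
    }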
    

    3.1 RebalanceImpl#updateProcessQueueTableInRebalance

    (1) Create one pull request (PullRequest) for each MessageQueue newly assigned to this client

    (2) Add these pull requests to the pullRequestQueue of the pull service PullMessageService (see PullMessageService above)

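    private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
        final boolean isOrder) {
        boolean changed = false;
        // (Removal of queues no longer assigned to this client is omitted here)
        List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
        for (MessageQueue mq : mqSet) {
            // Only queues newly assigned to this client get a new pull request
            if (!this.processQueueTable.containsKey(mq)) {
                ProcessQueue pq = new ProcessQueue();
                long nextOffset = -1L;
                try {
                    // Offset from which to start pulling this queue
                    nextOffset = this.computePullFromWhereWithException(mq);
                } catch (Exception e) {
                    continue; // (logging omitted)
                }
                if (nextOffset >= 0) {
                    // Register the pq created above; pre != null means the queue is already being processed
                    ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                    if (pre == null) {
                        // Build the pull request for this queue
                        PullRequest pullRequest = new PullRequest();
                        pullRequest.setConsumerGroup(consumerGroup);
                        pullRequest.setNextOffset(nextOffset);
                        pullRequest.setMessageQueue(mq);
                        pullRequest.setProcessQueue(pq);
                        pullRequestList.add(pullRequest);
                        changed = true;
                    }
                }
            }
        }
        // Dispatch the pull requests (they end up in PullMessageService's pullRequestQueue)
        this.dispatchPullRequest(pullRequestList);
        return changed;
    }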
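The default strategy, AllocateMessageQueueAveragely, gives each client a contiguous block of the sorted queue list, with the first `mqAll.size() % cidAll.size()` clients getting one extra queue. Below is a stand-alone re-implementation of that arithmetic, intended to mirror the RocketMQ 4.x source but using strings for queues and client IDs, plus a simplified rebalance step that, like updateProcessQueueTableInRebalance, only yields pull requests for newly assigned queues. Class and method names are hypothetical. It also makes the failure mode concrete: with `cidAll == null` the result is empty, so nothing ever reaches pullRequestQueue.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: queues and client IDs are plain strings.
class AllocateAveragelySketch {

    // Average allocation over already-sorted mqAll and cidAll.
    static List<String> allocate(String currentCid, List<String> mqAll, List<String> cidAll) {
        List<String> result = new ArrayList<>();
        // cidAll == null is the "get consumer id list failed" case: nothing is allocated
        if (cidAll == null || mqAll.isEmpty() || !cidAll.contains(currentCid)) {
            return result;
        }
        int index = cidAll.indexOf(currentCid);
        int mod = mqAll.size() % cidAll.size();
        int averageSize = mqAll.size() <= cidAll.size() ? 1
                : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }

    // Simplified rebalance: sort, allocate, and return the queues that need a
    // new pull request (those not yet present in processQueueTable).
    static List<String> rebalance(String currentCid, Collection<String> mqSet, Collection<String> cidAll,
                                  Set<String> processQueueTable) {
        List<String> pullRequests = new ArrayList<>();
        if (mqSet == null || cidAll == null) {
            return pullRequests; // nothing to allocate, so nothing to pull
        }
        List<String> mqAll = new ArrayList<>(mqSet);
        List<String> cids = new ArrayList<>(cidAll);
        // Every client sorts the same inputs, so they all compute the same split
        Collections.sort(mqAll);
        Collections.sort(cids);
        for (String mq : allocate(currentCid, mqAll, cids)) {
            if (processQueueTable.add(mq)) { // true only for newly assigned queues
                pullRequests.add(mq);
            }
        }
        return pullRequests;
    }
}
```

With 8 queues (q0 to q7) and 3 clients (c0, c1, c2), c0 gets q0 to q2, c1 gets q3 to q5 and c2 gets q6 and q7. A second rebalance with the same inputs yields no new pull requests for c1, because its queues are already in the table.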
    



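Fetching the consumers in the group subscribed to a topic

  • Use the topic's route information obtained from the name server (namesrv), which lists the brokers hosting the topic, and pick one of those brokers at random
  • Ask that broker for the consumer group's list of consumers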
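// MQClientInstance (simplified)
public List<String> findConsumerIdList(final String topic, final String group) {
    // Find a broker hosting the topic (the problem: this returns the broker's
    // internal address, which cannot be reached from outside)
    String brokerAddr = this.findBrokerAddrByTopic(topic);
    if (null != brokerAddr) {
        try {
            // Ask the broker for the client IDs of the consumer group
            return this.mQClientAPIImpl.getConsumerIdListByGroup(brokerAddr, group, 3000);
        } catch (Exception e) {
            // This is the "getConsumerIdListByGroup exception" warning seen in rocketmq_client.log
            log.warn("getConsumerIdListByGroup exception, " + brokerAddr + " " + group, e);
        }
    }
    // The caller treats null as "get consumer id list failed"
    return null;
}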
  1. Getting a broker address for the topic

    public String findBrokerAddrByTopic(final String topic) {
        // Route info of the topic (where is topicRouteTable populated? See "Topic route information" below)
        TopicRouteData topicRouteData = this.topicRouteTable.get(topic);
        if (topicRouteData != null) {
            // Brokers hosting the topic
            List<BrokerData> brokers = topicRouteData.getBrokerDatas();
            if (!brokers.isEmpty()) {
                // Pick one broker at random
                int index = random.nextInt(brokers.size());
                BrokerData bd = brokers.get(index % brokers.size());
                // Return an address of that broker, exactly as the broker registered it
                return bd.selectBrokerAddr();
            }
        }
        return null;
    }
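The address used by findConsumerIdList is decided purely from the cached route: one broker is picked at random, then BrokerData#selectBrokerAddr returns that broker's master address (broker ID 0) if present, otherwise a random one of its other addresses (this is my reading of the 4.x source). A simplified model follows; representing each broker as a map from broker ID to address is an assumption for illustration, and the class is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical model of findBrokerAddrByTopic + BrokerData#selectBrokerAddr.
// Each broker is a map of brokerId -> address; id 0 is the master.
class BrokerSelectSketch {
    static final long MASTER_ID = 0L;
    private static final Random RANDOM = new Random();

    // Prefer the master address; otherwise pick any registered address at random.
    static String selectBrokerAddr(Map<Long, String> brokerAddrs) {
        String addr = brokerAddrs.get(MASTER_ID);
        if (addr == null) {
            if (brokerAddrs.isEmpty()) {
                return null;
            }
            List<String> addrs = new ArrayList<>(brokerAddrs.values());
            return addrs.get(RANDOM.nextInt(addrs.size()));
        }
        return addr;
    }

    // Pick one broker of the topic at random. Nothing here checks whether the
    // address is reachable from this client.
    static String findBrokerAddrByTopic(List<Map<Long, String>> brokers) {
        if (brokers == null || brokers.isEmpty()) {
            return null;
        }
        return selectBrokerAddr(brokers.get(RANDOM.nextInt(brokers.size())));
    }
}
```

If the broker registered a private address such as 192.168.0.10:10911 (a made-up example), that is exactly the address the consumer will try to connect to, whether or not it is reachable from the consumer's network.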
    



Topic route information

  1. The client instance MQClientInstance runs a scheduled task that refreshes route information every 30 seconds by default (configurable via pollNameServerInterval)

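    // MQClientInstance (simplified: the isDefault/producer branch and route-change detection are omitted)
    public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
            DefaultMQProducer defaultMQProducer) {
        try {
            if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    // Fetch the topic's route info from the name server (namesrv)
                    TopicRouteData topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                    if (topicRouteData != null) {
                        // Cache a copy of the route locally; broker addresses are whatever the brokers registered
                        TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
                        this.topicRouteTable.put(topic, cloneTopicRouteData);
                        return true;
                    }
                } catch (Exception e) {
                    // (exception logging omitted)
                } finally {
                    this.lockNamesrv.unlock();
                }
            }
        } catch (InterruptedException e) {
        }
        return false;
    }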
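The refresh itself is a fixed-rate scheduled task. The sketch below is a hypothetical stand-in: `RouteRefreshSketch` is a made-up class, `nameServerLookup` replaces the real getTopicRouteInfoFromNameServer call, and routes are reduced to lists of broker addresses. In the real client the period comes from ClientConfig's pollNameServerInterval (30000 ms by default), which can be changed with `setPollNameServerInterval`.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical sketch of a periodic topic route refresh into a local cache.
class RouteRefreshSketch {
    final Map<String, List<String>> topicRouteTable = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    void start(List<String> topics, Function<String, List<String>> nameServerLookup, long intervalMillis) {
        scheduler.scheduleAtFixedRate(() -> {
            for (String topic : topics) {
                try {
                    List<String> route = nameServerLookup.apply(topic);
                    if (route != null) {
                        topicRouteTable.put(topic, route); // cache the route locally
                    }
                } catch (RuntimeException e) {
                    // keep the previous route; a failed refresh must not stop the task
                }
            }
        }, 10, intervalMillis, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

The try/catch inside the task matters with scheduleAtFixedRate, because a run that throws suppresses all later runs. Refreshing more often would not help in this article's case: the route is refreshed correctly, it simply contains a broker address the consumer cannot reach.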
    



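Copyright notice: this is an original article by weixin_40803011, published under the CC 4.0 BY-SA license. When reposting, please include a link to the original source and this notice.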