rocketmq consumer的封装和监控

  • Post author:
  • Post category:其他


1.rmq-driverTrace.xml

<?xml version="1.0" encoding="utf-8"?>

<config> 
  <value>{"namesrvAddr":"prod-rq05-a.mq.01zhuanche.com:9876;prod-rq05-b.mq.01zhuanche.com:9876","consumerGroup":"driver_queue_driver_trace", "instanceName":"driverQueueDriverTrace", "topic":"driver_routes","consumeFromWhere":0,"messageModel":"CLUSTERING", "consumeThreadMin":12, "consumeThreadMax":20, "consumeMessageBatchMaxSize":50,"subExpression":"t_n"}</value> 
</config>

2.DriverQueueDriverTraceConsumer

@Component
@ConfigDefinition(configKey = "rmq-driverTrace")
public class DriverQueueDriverTraceConsumer extends BaseRocketMqConsumer {


    @Autowired
    private IAreaApiService areaApiService;

    @Autowired
    private IDriverService driverService;

    @Override
    public boolean handleMessage(String message, String msgId) throws Exception {
        if (!EnvironmentUtil.isProd()) {
            logger.info("司机排队轨迹上报:[{}]", message);
        }

        //1.参数校验
        DriverTraceNewBO driverTraceBO = JSON.parseObject(message, DriverTraceNewBO.class);
        if (driverTraceBO == null) {
            logger.warn("司机运力数据为空");
            return true;
        }

        Integer cityId = driverTraceBO.getCityId();
        if (null == cityId) {
            logger.warn("cityId为空msgId=[{}]", msgId);
            return true;
        }
        String cityIds = Dicts.getString(DictKeyConstant.DRIVER_QUEUE_CITY);
        if (StringUtils.isEmpty(cityIds)) {
            logger.warn("城市配置为空,msgId=[{}]", msgId);
            return true;
        }
        List<String> cityIdList = Splitter.on(",").trimResults().omitEmptyStrings().splitToList(cityIds);
        if (!CommonUtils.contain(cityIdList, String.valueOf(cityId), CommonConstant.ALL_CITY_FLAG)) {
            if (!EnvironmentUtil.isProd()) {
                logger.warn("城市不支持司机排队");
            }
            return true;
        }
        if (!DriverMqStatusEnum.SHANGBAN_DAIFUWU.getCode().equals(driverTraceBO.getType())) {
            if (!EnvironmentUtil.isProd()) {
                logger.info("只处理在线上班待服务的司机。");
            }
            return true;
        }
        //2.围栏检验
        String businessId = Dicts.getString("driver_queue_business_id");
        RequestAreaDTO areaDto = new RequestAreaDTO();
        areaDto.setCityId(cityId);
        areaDto.setX(driverTraceBO.getLongitude());
        areaDto.setY(driverTraceBO.getLatitude());
        List<RequestAreaDTO> areaDTOList = new ArrayList<RequestAreaDTO>();
        areaDTOList.add(areaDto);
        List<AreaBO> areas = areaApiService.getArea(businessId, areaDTOList);
        if (areas != null && areas.size() > 1) {
            logger.warn("同一轨迹点存在多个匹配围栏:[{}]", areas);
        }

        //3.轨迹处理
        if (areas != null && areas.size() > 0) {
            AreaBO area = areas.get(0);
            String areaDriverListKey = BrainCacheKey.AREA_DRIVER_QUEUE_LIST.toKeys(area.getAreaId());
            List<Integer> driverIdList = CacheManager.lrange(areaDriverListKey, 0, -1, Integer.class);
            DriverAreaInfo info = new DriverAreaInfo();
            info.setAreaId(area.getAreaId().toString());
            info.setAreaName(area.getAreaName());
            info.setDriverId(driverTraceBO.getDriverId());
            SimpleDateFormat format = new SimpleDateFormat("yyyyMMddHHmmss");
            String inQueueDate = format.format(new Date());
            info.setEnQueueDate(inQueueDate);
            if (driverIdList != null && driverIdList.contains(driverTraceBO.getDriverId())) {
                resetDriverAreaInfo(info);
            } else {
                driverEnQueue(info);
            }
        } else {
            removeDriverFromQueue(driverTraceBO);
        }
        return true;
    }

3.BaseRocketMqConsumer

public abstract class BaseRocketMqConsumer implements MessageListenerConcurrently {

    protected Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt msgExt : msgs) {
            String msgId = msgExt.getMsgId();
            MDC.put(TraceConstant.TRACE_KEY, UUID.randomUUID().toString().replace("-", ""));
            String msgBody = new String(msgExt.getBody());
            logger.debug("msgs-size={}, msgId={}, msgBody={}", msgs.size(), msgId, msgBody);
            try {
                handleMessage(msgBody, msgId);
            } catch (Exception e) {
                logger.error("body:{}", msgBody, e);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            } finally {
                MDC.remove(TraceConstant.TRACE_KEY);
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    /**
     * 处理类
     *
     * @param message
     * @param msgId
     * @return
     * @throws Exception
     */
    public abstract boolean handleMessage(String message, String msgId) throws Exception;
}

4. run.sh

java -javaagent:/u01/pinpoint-agent/pinpoint-bootstrap-1.8.4.jar -Dpinpoint.agentId=${pinpoint_agentId} -Dpinpoint.applicationName=${pinpoint_applicationName} -Drocketmq.client.logRoot=/u01/queue-consumer/log/  -Drocketmq.client.logLevel=ERROR ${JAVA_OPTS} -XX:+PrintCommandLineFlags -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationConcurrentTime -XX:+PrintGCApplicationStoppedTime -XX:+PrintHeapAtGC -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=500m -Xloggc:/u01/queue-consumer/log/gc.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/u01/queue-consumer/log/queue-consumer.dump -jar queue-consumer-0.0.1-SNAPSHOT.jar arrangeOrderBindConsumer,arrangeOrderEndConsumer,driverTraceConsumer,flightArrivalConsumer,initOrderCancelConsumer,orderCancelConsumer,orderConsumer,robNoticeConsumer,driverQueueDriverTraceConsumer,orderStartServiceConsumer,flightDelayConsumer,flightTakeOffConsumer,lockDriverConsumer,driverControlConsumer,interCityDriverTraceConsumer --server-lists ${zk_addr} --namespace consumer-${START_ENV} --base-sleep-time-milliseconds 1000 --max-sleep-time-milliseconds 3000 --max-retries 3 --spring.profiles.active=${START_ENV}

5.DispatcherDriverTraceApplication

@SpringBootApplication(exclude = {MongoAutoConfiguration.class, MongoDataAutoConfiguration.class})
public class DispatcherDriverTraceApplication {

	public static void main(String[] args) {
		ApplicationContext context = SpringApplication.run(DispatcherDriverTraceApplication.class, args);
		RocketmqConsumerStartup.boot(args,context::getBean,false);
	}
}

6.RocketmqConsumerStartup

public class RocketmqConsumerStartup {

    private static final Logger logger = LoggerFactory.getLogger(RocketmqConsumerStartup.class);

    private static final String ROOT = "/sq_consumer";

    private static final String HOST_FMT = ROOT + "/%s";

    private static final String MSG_FMT = "娑堣垂鑰呭畷鏈猴紝濡傛灉鏈嶅姟鍦ㄩ噸鍚垯蹇界暐锛宨p:{%s},杩涚▼鍐呰繍琛岀殑鏈嶅姟:{%s}";

    public static void main(String[] args) {
        boot(args);
    }

    public static void boot(String[] args) {
        boot(args, BeanFactory::getBean, true);
    }

    public static void boot(String[] args, BiFunction<String, Class<MessageListener>, MessageListener> beanFactory, boolean initConfig) {
        if (args == null || args.length == 0) {
            throw new RuntimeException("please input consumerName");
        }
        if (beanFactory == null) {
            throw new RuntimeException("bean factory cannot be null");
        }
        //鍒濆鍖栭厤缃鐞嗙郴缁�
        try {
            if (initConfig) {
                ConfigManagerInit.initManagerFromClasspath();
            }
        } catch (IOException e) {
            logger.error("init config manager fail", e);
            System.exit(-1);
        }

        String input = args[0];
        String[] beanNames = input.split(",");
        for (String beanName : beanNames) {
            MessageListener listener = beanFactory.apply(beanName, MessageListener.class);
            if (listener == null) {
                logger.info("can not get consumer:{} from spring", beanName);
                continue;
            }
            ConfigDefinition configDefinition = listener.getClass().getAnnotation(ConfigDefinition.class);
            if (configDefinition == null) {
                logger.info("consumer:{} not annotated ConfigDefinition", beanName);
                continue;
            }
            RocketMqConsumerWrap consumer = new RocketMqConsumerWrap(configDefinition.configKey(), listener);
            boolean ret = consumer.start();
            logger.info("consumer:{} start {}", beanName, ret);
        }

        JvmManager.start();

        if (!Boolean.valueOf(System.getProperty("disable.probe.alive", "false"))) {
            try {
                CoordinatorRegistryCenter registryCenter = configZookeeper(args);
                //娣诲姞瀹曟満鐩戞帶
                addHostDownMonitor(registryCenter, beanNames, HOST_FMT, ROOT, MSG_FMT);
            } catch (ParseException e) {
                logger.error("RocketmqConsumerStartup", e);
            }
        }
    }
}

7.RocketMqConsumerWrap

public class RocketMqConsumerWrap {
    private Logger logger = LoggerFactory.getLogger(RocketMqConsumerWrap.class);

    private static final AtomicInteger CONSUME_SEQ = new AtomicInteger();

    private final String configKey;

    private final BaseNodeResource<DefaultMQPushConsumer> consumerNode;

    private final MessageListener messageListener;

    private AtomicBoolean init = new AtomicBoolean(false);

    private static volatile ScheduledExecutorService scheduledExecutorService;

    public RocketMqConsumerWrap(String configKey, MessageListener messageListener) {
        Preconditions.checkNotNull(configKey);
        Preconditions.checkNotNull(messageListener);
        this.configKey = configKey;
        this.messageListener = messageListener;
        this.consumerNode = BaseNodeResource.<DefaultMQPushConsumer>newBuilder()
                .withKey(configKey)
                .withFactory(this::initConsumer)
                .withCleanupConsumer(this::cleanup)
                .withWaitStopPeriod(5000) //鍏抽棴鑰佺殑consumer鍓嶏紝鍏堢瓑5000ms锛岀瓑寰呮棫鏁版嵁娑堣垂瀹屻��
                .addFactoryFailedListener(this::factoryFailListenner)
                .build();
    }

    private DefaultMQPushConsumer initConsumer(String content) {
        try {
            RocketMqConsumerConfig config = RocketMqConsumerConfig.from(content);
            TwoTuple<ConsumeFromWhere, MessageModel> param = checkParam(config);
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(config.getConsumerGroup());
            consumer.setNamesrvAddr(config.getNamesrvAddr());
            int seq = CONSUME_SEQ.incrementAndGet() & 0x7fffffff;
            consumer.setInstanceName(config.getInstanceName() + seq);
            consumer.setConsumeFromWhere(param.first);
            if (param.first == ConsumeFromWhere.CONSUME_FROM_TIMESTAMP
                    && !Strings.isNullOrEmpty(config.getConsumeTimestamp())) {
                consumer.setConsumeTimestamp(config.getConsumeTimestamp());
            }
            consumer.setMessageModel(param.second);
            consumer.subscribe(config.getTopic(), config.getSubExpression());
            if (config.getConsumeMessageBatchMaxSize() > 0) {
                consumer.setConsumeMessageBatchMaxSize(config.getConsumeMessageBatchMaxSize());
            }
            if (config.getConsumeThreadMax() > 0) {
                consumer.setConsumeThreadMax(config.getConsumeThreadMax());
            }

            if (config.getConsumeThreadMin() > 0) {
                consumer.setConsumeThreadMin(config.getConsumeThreadMin());
            }
            if (config.getConsumeTimeout() > 0) {
                consumer.setConsumeTimeout(config.getConsumeTimeout());
            }
            if (messageListener instanceof MessageListenerConcurrently) {
                consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
                    ConsumeConcurrentlyStatus status = null;
                    boolean succ = true;
                    String perfStatus = "success";
                    long startNanoTime = nanoTime();
                    try {
                        status = ((MessageListenerConcurrently) (RocketMqConsumerWrap.this.messageListener)).consumeMessage(msgs, context);
                        perfStatus = status.name().toLowerCase();
                    } catch (Throwable e) {
                        succ = false;
                        perfStatus = e.getClass().getSimpleName();
                        logger.error("consume message fail", e);
                        throw new RuntimeException(e);
                    } finally {
                        if (!succ) {
                            long micros = NANOSECONDS.toMicros(nanoTime() - startNanoTime);
                            perf("rocketmq.consume", config.getTopic(), config.getConsumerGroup(), perfStatus)
                                    .micros(micros)
                                    .logstashOnly();
                        }
                    }
                    return status;
                });
            } else if (messageListener instanceof MessageListenerOrderly) {
                consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
                    ConsumeOrderlyStatus status = null;
                    boolean succ = true;
                    String perfStatus = "success";
                    long startNanoTime = nanoTime();
                    try {
                        status = ((MessageListenerOrderly) (RocketMqConsumerWrap.this.messageListener)).consumeMessage(msgs, context);
                        perfStatus = status.name().toLowerCase();
                    } catch (Throwable e) {
                        succ = false;
                        perfStatus = e.getClass().getSimpleName();
                        logger.error("consume message fail", e);
                        throw new RuntimeException(e);
                    } finally {
                        if (!succ) {
                            long micros = NANOSECONDS.toMicros(nanoTime() - startNanoTime);
                            perf("rocketmq.consume", config.getTopic(), config.getConsumerGroup(), perfStatus)
                                    .micros(micros)
                                    .logstashOnly();
                        }
                    }
                    return status;
                });
            }


            consumer.start();
            return consumer;
        } catch (MQClientException e) {
            throw new RuntimeException(e);
        }
    }

    public boolean start() {
        DefaultMQPushConsumer consumer = this.consumerNode.get();
        boolean success = consumer != null;
        if (success) {
            if (scheduledExecutorService == null) {
                synchronized (ConsumeOffsetDelayReporter.class) {
                    if (scheduledExecutorService == null) {
                        scheduledExecutorService = makeScheduleExecutor();
                    }
                }
            }
            if (init.compareAndSet(false, true)) {
                scheduledExecutorService.scheduleAtFixedRate(new ConsumeOffsetDelayReporter(this), 15, 15, TimeUnit.SECONDS);
            }
        }
        return success;
    }

    private void factoryFailListenner(String config, Throwable t) {
        ConfigUtils.perfConfigFactoryFail(configKey, config, t);
    }

    private TwoTuple<ConsumeFromWhere, MessageModel> checkParam(RocketMqConsumerConfig config) {
        if (Strings.isNullOrEmpty(config.getConsumerGroup())) {
            throw new RocketMqConfigException("consumer group is empty");
        }

        if (Strings.isNullOrEmpty(config.getNamesrvAddr())) {
            throw new RocketMqConfigException("namesrvAddr is empty");
        }

        if (Strings.isNullOrEmpty(config.getInstanceName())) {
            throw new RocketMqConfigException("instanceName is empty");
        }

        if (Strings.isNullOrEmpty(config.getTopic())) {
            throw new RocketMqConfigException("topic is empty");
        }

        if (Strings.isNullOrEmpty(config.getMessageModel())) {
            throw new RocketMqConfigException("messageModel is empty");
        }

        ConsumeFromWhere consumeFromWhere;
        int consumeWhere = config.getConsumeFromWhere();
        if (consumeWhere == ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET.ordinal()) {
            consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET;
        } else if (consumeWhere == ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET.ordinal()) {
            consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
        } else if (consumeWhere == ConsumeFromWhere.CONSUME_FROM_TIMESTAMP.ordinal()) {
            consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_TIMESTAMP;
        } else {
            throw new RocketMqConfigException("consumeFromWhere only can set 0, 4, 5");
        }

        MessageModel messageModel;
        String messageModelStr = config.getMessageModel();
        if (MessageModel.BROADCASTING.getModeCN().equals(messageModelStr)) {
            messageModel = MessageModel.BROADCASTING;
        } else if (MessageModel.CLUSTERING.getModeCN().equals(messageModelStr)) {
            messageModel = MessageModel.CLUSTERING;
        } else {
            throw new RocketMqConfigException("messageModel only can set BROADCASTING, CLUSTERING");
        }

        if (config.getConsumeThreadMax() < config.getConsumeThreadMin()) {
            throw new RocketMqConfigException("consumeThreadMax can not samller than consumeThreadMin");
        }

        if (Strings.isNullOrEmpty(config.getSubExpression())) {
            config.setSubExpression("*");
        }

        return new TwoTuple<>(consumeFromWhere, messageModel);
    }

    private void cleanup(DefaultMQPushConsumer old) {
        logger.info("Cleanup old RocketMqConsumer {}", old.toString());
        old.shutdown();
    }

    public BaseNodeResource<DefaultMQPushConsumer> getConsumerNode() {
        return consumerNode;
    }

    private static ScheduledExecutorService makeScheduleExecutor() {
        return newSingleThreadScheduledExecutor(
                new ThreadFactoryBuilder().setNameFormat("sq-rocket-mq-delay-reporter-%d")
                        .setDaemon(true)
                        .build());
    }
}

8.ConsumeOffsetDelayReporter

RocketMqConsumerWrap的start方法另起一线程,用来监控consumer队列的延迟情况

@Slf4j
@RequiredArgsConstructor
public class ConsumeOffsetDelayReporter implements Runnable {

    private final RocketMqConsumerWrap wrap;

    private volatile String lastNamesrvAddr;

    private volatile DefaultMQAdminExt mqAdminExt;

    @Override
    public void run() {
        DefaultMQPushConsumer consumer = wrap.getConsumerNode().get();
        if (consumer == null) {
            return;
        }
        String consumeGroup = consumer.getConsumerGroup();
        String namesrvAddr = consumer.getNamesrvAddr();
        Set<String> topics = consumer.getDefaultMQPushConsumerImpl().getSubscriptionInner().keySet();
        if (Strings.isNullOrEmpty(consumeGroup)
                || Strings.isNullOrEmpty(namesrvAddr)
                || topics.isEmpty()) {
            log.info("娑堣垂缁勩�佸悕瀛楁湇鍔″湴鍧�鎴栬�呰闃呯殑topic涓虹┖");
            return;
        }
        String topic = topics.stream().findFirst().get();
        if (lastNamesrvAddr == null || !lastNamesrvAddr.equals(namesrvAddr)) {
            if (this.mqAdminExt != null) {
                try {
                    this.mqAdminExt.shutdown();
                } catch (Throwable e) {
                    log.warn("shutdown mq admin fail", e);
                }
            }
            try {
                this.lastNamesrvAddr = namesrvAddr;
                DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt();
                defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
                defaultMQAdminExt.setNamesrvAddr(namesrvAddr);
                defaultMQAdminExt.start();
                this.mqAdminExt = defaultMQAdminExt;
            } catch (Throwable e) {
                log.warn("start mq admin fail", e);
                return;
            }
        }
        long delay = 0L;
        try {
            //鍙幏鍙栨湰瀹炰緥鐨刴essageQueues
            Set<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues(topic);
            if (messageQueues == null || messageQueues.isEmpty()) {
                log.info("consumer group:{}, topic:{} 杩樻病鏈夊垎閰峬essageQueue", consumeGroup, topic);
                return;
            }
            //褰撴秷璐圭鏈秷璐规椂锛屾鏂规硶浼氭姤閿�
            ConsumeStats consumeStats = mqAdminExt.examineConsumeStats(consumeGroup, topic);
            //閬嶅巻鎵�鏈夌殑闃熷垪锛岃绠楀爢绉噺
            for (MessageQueue mq : messageQueues) {
                //鍙绠梘roup涓嬫鐢熶骇绔彂閫佸搴旂殑Topic
                if (topic.equals(mq.getTopic())) {
                    OffsetWrapper offsetWrapper = consumeStats.getOffsetTable().get(mq);
                    long diff = offsetWrapper.getBrokerOffset() - offsetWrapper.getConsumerOffset();
                    delay += diff;
                }
            }
        } catch (Throwable e) {
            log.warn("鑾峰彇娑堣垂寤舵椂澶辫触 topic", e);
            return;
        }
        PerfUtils.perf("rocketmq.delay", topic, consumeGroup, this.lastNamesrvAddr)
                .micros(delay)
                .unsafeLogstashNow();
    }


}

9.BaseNodeResource等基础类可以参考《redis封装和监控》,这里就不再copy啦。



版权声明:本文为qian_348840260原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。