1.rmq-driverTrace.xml
<?xml version="1.0" encoding="utf-8"?>
<config>
<!--
  RocketMQ consumer settings stored under the config key "rmq-driverTrace", the key that
  DriverQueueDriverTraceConsumer names in its @ConfigDefinition annotation.
  NOTE(review): the JSON below is presumably the content handed to RocketMqConsumerConfig.from(...)
  in RocketMqConsumerWrap.initConsumer; confirm against the config loader.
  Constraints enforced by RocketMqConsumerWrap.checkParam:
    * consumerGroup, namesrvAddr, instanceName, topic and messageModel must be non-empty;
    * consumeFromWhere must equal the ordinal of CONSUME_FROM_FIRST_OFFSET, CONSUME_FROM_LAST_OFFSET
      or CONSUME_FROM_TIMESTAMP (its error message lists the accepted values as 0, 4, 5);
    * messageModel must match the getModeCN() value of BROADCASTING or CLUSTERING;
    * consumeThreadMax must not be smaller than consumeThreadMin;
    * an empty subExpression is replaced by "*".
-->
<value>{"namesrvAddr":"prod-rq05-a.mq.01zhuanche.com:9876;prod-rq05-b.mq.01zhuanche.com:9876","consumerGroup":"driver_queue_driver_trace", "instanceName":"driverQueueDriverTrace", "topic":"driver_routes","consumeFromWhere":0,"messageModel":"CLUSTERING", "consumeThreadMin":12, "consumeThreadMax":20, "consumeMessageBatchMaxSize":50,"subExpression":"t_n"}</value>
</config>
2.DriverQueueDriverTraceConsumer
@Component
@ConfigDefinition(configKey = "rmq-driverTrace")
public class DriverQueueDriverTraceConsumer extends BaseRocketMqConsumer {
@Autowired
private IAreaApiService areaApiService;
@Autowired
private IDriverService driverService;
/**
 * Handles one driver-trace message for the driver queue.
 *
 * <p>Flow: (1) validate the payload, the city and the driver status; (2) look up the fence(s)
 * containing the reported point; (3) if a fence matched, refresh the driver's queue entry when the
 * driver is already in that fence's queue list, otherwise enqueue the driver; if no fence matched,
 * remove the driver from the queue.
 *
 * @param message JSON body, parsed into a {@code DriverTraceNewBO}
 * @param msgId   RocketMQ message id, used only in log output
 * @return always {@code true}; messages that fail validation are dropped, not retried
 * @throws Exception whatever the downstream services or cache calls throw
 */
@Override
public boolean handleMessage(String message, String msgId) throws Exception {
    if (!EnvironmentUtil.isProd()) {
        logger.info("司机排队轨迹上报:[{}]", message);
    }

    // 1. Parameter validation
    final DriverTraceNewBO trace = JSON.parseObject(message, DriverTraceNewBO.class);
    if (trace == null) {
        logger.warn("司机运力数据为空");
        return true;
    }

    final Integer cityId = trace.getCityId();
    if (cityId == null) {
        logger.warn("cityId为空msgId=[{}]", msgId);
        return true;
    }

    final String enabledCities = Dicts.getString(DictKeyConstant.DRIVER_QUEUE_CITY);
    if (StringUtils.isEmpty(enabledCities)) {
        logger.warn("城市配置为空,msgId=[{}]", msgId);
        return true;
    }

    // The dictionary value is a comma-separated list of city ids.
    final List<String> enabledCityIds =
            Splitter.on(",").trimResults().omitEmptyStrings().splitToList(enabledCities);
    final boolean citySupported =
            CommonUtils.contain(enabledCityIds, String.valueOf(cityId), CommonConstant.ALL_CITY_FLAG);
    if (!citySupported) {
        if (!EnvironmentUtil.isProd()) {
            logger.warn("城市不支持司机排队");
        }
        return true;
    }

    // Only traces whose type equals the SHANGBAN_DAIFUWU status code are processed.
    final boolean waitingForService =
            DriverMqStatusEnum.SHANGBAN_DAIFUWU.getCode().equals(trace.getType());
    if (!waitingForService) {
        if (!EnvironmentUtil.isProd()) {
            logger.info("只处理在线上班待服务的司机。");
        }
        return true;
    }

    // 2. Fence lookup for the reported coordinate
    final String businessId = Dicts.getString("driver_queue_business_id");
    final RequestAreaDTO point = new RequestAreaDTO();
    point.setCityId(cityId);
    point.setX(trace.getLongitude());
    point.setY(trace.getLatitude());
    final List<RequestAreaDTO> points = new ArrayList<RequestAreaDTO>();
    points.add(point);
    final List<AreaBO> matchedAreas = areaApiService.getArea(businessId, points);

    // No fence matched: the driver leaves the queue.
    if (matchedAreas == null || matchedAreas.isEmpty()) {
        removeDriverFromQueue(trace);
        return true;
    }
    if (matchedAreas.size() > 1) {
        logger.warn("同一轨迹点存在多个匹配围栏:[{}]", matchedAreas);
    }

    // 3. Trace handling: only the first matched fence is used.
    final AreaBO fence = matchedAreas.get(0);
    final String queueKey = BrainCacheKey.AREA_DRIVER_QUEUE_LIST.toKeys(fence.getAreaId());
    final List<Integer> queuedDriverIds = CacheManager.lrange(queueKey, 0, -1, Integer.class);

    final DriverAreaInfo info = new DriverAreaInfo();
    info.setAreaId(fence.getAreaId().toString());
    info.setAreaName(fence.getAreaName());
    info.setDriverId(trace.getDriverId());
    info.setEnQueueDate(new SimpleDateFormat("yyyyMMddHHmmss").format(new Date()));

    final boolean alreadyQueued =
            queuedDriverIds != null && queuedDriverIds.contains(trace.getDriverId());
    if (alreadyQueued) {
        resetDriverAreaInfo(info);
    } else {
        driverEnQueue(info);
    }
    return true;
}
3.BaseRocketMqConsumer
/**
 * Base class for concurrent RocketMQ listeners: decodes each message body to a string, installs a
 * per-message trace id in the MDC and delegates to {@link #handleMessage(String, String)}.
 *
 * <p>If {@code handleMessage} throws for any message, the method returns
 * {@code RECONSUME_LATER} immediately, so the remaining messages of the batch are not handled in
 * this invocation.
 */
public abstract class BaseRocketMqConsumer implements MessageListenerConcurrently {
    protected Logger logger = LoggerFactory.getLogger(getClass());

    /**
     * Handles every message of the batch in order.
     *
     * @param msgs    messages delivered by the RocketMQ client
     * @param context consume context (not used here)
     * @return {@code CONSUME_SUCCESS} when no handler threw, otherwise {@code RECONSUME_LATER}
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt msgExt : msgs) {
            String msgId = msgExt.getMsgId();
            String msgBody = null;
            MDC.put(TraceConstant.TRACE_KEY, UUID.randomUUID().toString().replace("-", ""));
            try {
                // Decode with an explicit charset: new String(byte[]) would use the platform default,
                // which varies between hosts. Assumes producers publish UTF-8 text - TODO confirm.
                // Decoding happens inside the try so the MDC key is removed even if it fails.
                msgBody = new String(msgExt.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                logger.debug("msgs-size={}, msgId={}, msgBody={}", msgs.size(), msgId, msgBody);
                // The boolean result is intentionally not inspected here, matching the existing
                // contract: only an exception triggers a retry.
                handleMessage(msgBody, msgId);
            } catch (Exception e) {
                logger.error("body:{}", msgBody, e);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            } finally {
                MDC.remove(TraceConstant.TRACE_KEY);
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    /**
     * Business handler for a single message.
     *
     * @param message decoded message body
     * @param msgId   RocketMQ message id
     * @return handler-defined flag; {@link #consumeMessage} currently ignores it
     * @throws Exception to signal failure; the batch is then reported as {@code RECONSUME_LATER}
     */
    public abstract boolean handleMessage(String message, String msgId) throws Exception;
}
4. run.sh
# Launches the queue-consumer jar with the Pinpoint agent, RocketMQ client logging options,
# GC logging / heap-dump options and any extra ${JAVA_OPTS}.
# The first program argument is the comma-separated list of consumer bean names that
# RocketmqConsumerStartup.boot splits and starts; the remaining "--" options are passed through
# to the application unchanged.
java \
  -javaagent:/u01/pinpoint-agent/pinpoint-bootstrap-1.8.4.jar \
  -Dpinpoint.agentId=${pinpoint_agentId} \
  -Dpinpoint.applicationName=${pinpoint_applicationName} \
  -Drocketmq.client.logRoot=/u01/queue-consumer/log/ \
  -Drocketmq.client.logLevel=ERROR \
  ${JAVA_OPTS} \
  -XX:+PrintCommandLineFlags \
  -XX:+PrintGCDetails \
  -XX:+PrintGCDateStamps \
  -XX:+PrintGCApplicationConcurrentTime \
  -XX:+PrintGCApplicationStoppedTime \
  -XX:+PrintHeapAtGC \
  -XX:+UseGCLogFileRotation \
  -XX:NumberOfGCLogFiles=10 \
  -XX:GCLogFileSize=500m \
  -Xloggc:/u01/queue-consumer/log/gc.log \
  -XX:+HeapDumpOnOutOfMemoryError \
  -XX:HeapDumpPath=/u01/queue-consumer/log/queue-consumer.dump \
  -jar queue-consumer-0.0.1-SNAPSHOT.jar \
  arrangeOrderBindConsumer,arrangeOrderEndConsumer,driverTraceConsumer,flightArrivalConsumer,initOrderCancelConsumer,orderCancelConsumer,orderConsumer,robNoticeConsumer,driverQueueDriverTraceConsumer,orderStartServiceConsumer,flightDelayConsumer,flightTakeOffConsumer,lockDriverConsumer,driverControlConsumer,interCityDriverTraceConsumer \
  --server-lists ${zk_addr} \
  --namespace consumer-${START_ENV} \
  --base-sleep-time-milliseconds 1000 \
  --max-sleep-time-milliseconds 3000 \
  --max-retries 3 \
  --spring.profiles.active=${START_ENV}
5.DispatcherDriverTraceApplication
/**
 * Spring Boot entry point: starts the application context (with the Mongo auto-configurations
 * excluded) and then boots the RocketMQ consumers named on the command line, resolving listener
 * beans from that context.
 */
@SpringBootApplication(exclude = {MongoAutoConfiguration.class, MongoDataAutoConfiguration.class})
public class DispatcherDriverTraceApplication {

    /**
     * @param args program arguments, forwarded both to Spring and to
     *             {@code RocketmqConsumerStartup.boot}
     */
    public static void main(String[] args) {
        final ApplicationContext springContext =
                SpringApplication.run(DispatcherDriverTraceApplication.class, args);
        // Beans come from Spring; "false" tells boot() not to initialise the config manager itself.
        RocketmqConsumerStartup.boot(
                args,
                (beanName, beanType) -> springContext.getBean(beanName, beanType),
                false);
    }
}
6.RocketmqConsumerStartup
/**
 * Boots the RocketMQ consumers whose bean names are given as the first program argument.
 *
 * <p>For every name the listener bean is looked up, its {@code ConfigDefinition} annotation is read
 * to obtain the config key, and a {@code RocketMqConsumerWrap} is started. Afterwards the JVM
 * manager is started and, unless the system property {@code disable.probe.alive} is {@code true},
 * a host-down monitor is registered.
 *
 * <p>{@code configZookeeper} and {@code addHostDownMonitor} are referenced here but are not part of
 * this excerpt.
 */
public class RocketmqConsumerStartup {
    private static final Logger logger = LoggerFactory.getLogger(RocketmqConsumerStartup.class);
    private static final String ROOT = "/sq_consumer";
    private static final String HOST_FMT = ROOT + "/%s";
    // Alert text for the host-down monitor. The literal was mis-encoded (UTF-8 bytes decoded as GBK)
    // and has been restored to readable Chinese; the two {%s} placeholders are unchanged.
    private static final String MSG_FMT = "消费者宕机，如果服务在重启则忽略，ip:{%s},进程内运行的服务:{%s}";

    public static void main(String[] args) {
        boot(args);
    }

    /**
     * Boots using {@code BeanFactory::getBean} and initialises the config manager from the classpath.
     *
     * @param args program arguments; {@code args[0]} is the comma-separated consumer bean names
     */
    public static void boot(String[] args) {
        boot(args, BeanFactory::getBean, true);
    }

    /**
     * Boots the named consumers.
     *
     * @param args        program arguments; {@code args[0]} is the comma-separated consumer bean names
     * @param beanFactory resolves a listener bean by name and type
     * @param initConfig  whether to initialise the config manager from the classpath first
     * @throws RuntimeException if {@code args} is null/empty or {@code beanFactory} is null
     */
    public static void boot(String[] args, BiFunction<String, Class<MessageListener>, MessageListener> beanFactory, boolean initConfig) {
        if (args == null || args.length == 0) {
            throw new RuntimeException("please input consumerName");
        }
        if (beanFactory == null) {
            throw new RuntimeException("bean factory cannot be null");
        }
        // Initialise the configuration management system.
        try {
            if (initConfig) {
                ConfigManagerInit.initManagerFromClasspath();
            }
        } catch (IOException e) {
            logger.error("init config manager fail", e);
            System.exit(-1);
        }
        String input = args[0];
        String[] beanNames = input.split(",");
        for (String beanName : beanNames) {
            MessageListener listener = beanFactory.apply(beanName, MessageListener.class);
            if (listener == null) {
                logger.info("can not get consumer:{} from spring", beanName);
                continue;
            }
            ConfigDefinition configDefinition = listener.getClass().getAnnotation(ConfigDefinition.class);
            if (configDefinition == null) {
                logger.info("consumer:{} not annotated ConfigDefinition", beanName);
                continue;
            }
            RocketMqConsumerWrap consumer = new RocketMqConsumerWrap(configDefinition.configKey(), listener);
            boolean ret = consumer.start();
            logger.info("consumer:{} start {}", beanName, ret);
        }
        JvmManager.start();
        // parseBoolean avoids the needless boxing of Boolean.valueOf; the result is identical.
        if (!Boolean.parseBoolean(System.getProperty("disable.probe.alive", "false"))) {
            try {
                CoordinatorRegistryCenter registryCenter = configZookeeper(args);
                // Register the host-down monitor.
                addHostDownMonitor(registryCenter, beanNames, HOST_FMT, ROOT, MSG_FMT);
            } catch (ParseException e) {
                logger.error("RocketmqConsumerStartup", e);
            }
        }
    }
}
7.RocketMqConsumerWrap
/**
 * Wraps a {@code DefaultMQPushConsumer} that is built from the configuration stored under a config
 * key and managed through a {@code BaseNodeResource} (factory, cleanup and factory-failure
 * callbacks are registered in the constructor).
 *
 * <p>The user-supplied listener is decorated so that failures are logged and reported through
 * {@code perf("rocketmq.consume", ...)}. {@link #start()} additionally schedules a
 * {@code ConsumeOffsetDelayReporter} every 15 seconds on a scheduler shared by all instances.
 */
public class RocketMqConsumerWrap {
    private static final Logger logger = LoggerFactory.getLogger(RocketMqConsumerWrap.class);
    private static final AtomicInteger CONSUME_SEQ = new AtomicInteger();
    // Shared by all wraps; created lazily in start().
    private static volatile ScheduledExecutorService scheduledExecutorService;

    private final String configKey;
    private final BaseNodeResource<DefaultMQPushConsumer> consumerNode;
    private final MessageListener messageListener;
    // Ensures the delay reporter is scheduled at most once per wrap.
    private final AtomicBoolean init = new AtomicBoolean(false);

    /**
     * @param configKey       key of the consumer configuration; must not be null
     * @param messageListener a {@code MessageListenerConcurrently} or {@code MessageListenerOrderly};
     *                        must not be null
     */
    public RocketMqConsumerWrap(String configKey, MessageListener messageListener) {
        Preconditions.checkNotNull(configKey);
        Preconditions.checkNotNull(messageListener);
        this.configKey = configKey;
        this.messageListener = messageListener;
        this.consumerNode = BaseNodeResource.<DefaultMQPushConsumer>newBuilder()
                .withKey(configKey)
                .withFactory(this::initConsumer)
                .withCleanupConsumer(this::cleanup)
                // Before closing the old consumer, wait 5000 ms so in-flight data can be consumed.
                .withWaitStopPeriod(5000)
                .addFactoryFailedListener(this::factoryFailListenner)
                .build();
    }

    /**
     * Builds, configures and starts a push consumer from the raw configuration content.
     *
     * @param content raw configuration, parsed by {@code RocketMqConsumerConfig.from}
     * @return the started consumer
     * @throws RuntimeException wrapping {@code MQClientException} if subscribing or starting fails
     */
    private DefaultMQPushConsumer initConsumer(String content) {
        try {
            RocketMqConsumerConfig config = RocketMqConsumerConfig.from(content);
            TwoTuple<ConsumeFromWhere, MessageModel> param = checkParam(config);
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(config.getConsumerGroup());
            consumer.setNamesrvAddr(config.getNamesrvAddr());
            // A process-wide sequence (masked to stay non-negative) is appended to the instance name.
            int seq = CONSUME_SEQ.incrementAndGet() & 0x7fffffff;
            consumer.setInstanceName(config.getInstanceName() + seq);
            consumer.setConsumeFromWhere(param.first);
            if (param.first == ConsumeFromWhere.CONSUME_FROM_TIMESTAMP
                    && !Strings.isNullOrEmpty(config.getConsumeTimestamp())) {
                consumer.setConsumeTimestamp(config.getConsumeTimestamp());
            }
            consumer.setMessageModel(param.second);
            consumer.subscribe(config.getTopic(), config.getSubExpression());
            // Non-positive values mean "not configured": the client defaults are kept.
            if (config.getConsumeMessageBatchMaxSize() > 0) {
                consumer.setConsumeMessageBatchMaxSize(config.getConsumeMessageBatchMaxSize());
            }
            if (config.getConsumeThreadMax() > 0) {
                consumer.setConsumeThreadMax(config.getConsumeThreadMax());
            }
            if (config.getConsumeThreadMin() > 0) {
                consumer.setConsumeThreadMin(config.getConsumeThreadMin());
            }
            if (config.getConsumeTimeout() > 0) {
                consumer.setConsumeTimeout(config.getConsumeTimeout());
            }
            if (messageListener instanceof MessageListenerConcurrently) {
                final MessageListenerConcurrently delegate = (MessageListenerConcurrently) messageListener;
                consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) ->
                        consumeWithPerf(config, () -> delegate.consumeMessage(msgs, context)));
            } else if (messageListener instanceof MessageListenerOrderly) {
                final MessageListenerOrderly delegate = (MessageListenerOrderly) messageListener;
                consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) ->
                        consumeWithPerf(config, () -> delegate.consumeMessage(msgs, context)));
            }
            consumer.start();
            return consumer;
        } catch (MQClientException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Runs the wrapped listener and, when it fails, logs the error and reports a
     * {@code rocketmq.consume} perf record before rethrowing. Shared by the concurrent and the
     * orderly listener so both behave identically.
     *
     * @param config   consumer configuration (supplies topic and consumer group for the record)
     * @param delegate invokes the user listener and returns its status
     * @param <S>      status enum type of the listener
     * @return the status returned by the listener
     * @throws RuntimeException wrapping any {@code Throwable} raised by the listener, including the
     *                          NullPointerException caused by a null status
     */
    private <S extends Enum<S>> S consumeWithPerf(RocketMqConsumerConfig config,
                                                  java.util.function.Supplier<S> delegate) {
        S status = null;
        boolean succ = true;
        String perfStatus = "success";
        long startNanoTime = nanoTime();
        try {
            status = delegate.get();
            perfStatus = status.name().toLowerCase();
        } catch (Throwable e) {
            succ = false;
            perfStatus = e.getClass().getSimpleName();
            logger.error("consume message fail", e);
            throw new RuntimeException(e);
        } finally {
            // NOTE(review): the record is emitted only on failure even though perfStatus is also
            // computed on success; kept as-is, confirm whether success records were intended.
            if (!succ) {
                long micros = NANOSECONDS.toMicros(nanoTime() - startNanoTime);
                perf("rocketmq.consume", config.getTopic(), config.getConsumerGroup(), perfStatus)
                        .micros(micros)
                        .logstashOnly();
            }
        }
        return status;
    }

    /**
     * Obtains the consumer from the node resource and, on success, schedules the offset-delay
     * reporter (once per wrap) at a fixed rate of 15 seconds.
     *
     * @return {@code true} if a consumer instance is available
     */
    public boolean start() {
        DefaultMQPushConsumer consumer = this.consumerNode.get();
        boolean success = consumer != null;
        if (success) {
            if (scheduledExecutorService == null) {
                // Lock on this class: the guarded static field belongs to it (the previous code
                // borrowed the monitor of ConsumeOffsetDelayReporter.class).
                synchronized (RocketMqConsumerWrap.class) {
                    if (scheduledExecutorService == null) {
                        scheduledExecutorService = makeScheduleExecutor();
                    }
                }
            }
            if (init.compareAndSet(false, true)) {
                scheduledExecutorService.scheduleAtFixedRate(new ConsumeOffsetDelayReporter(this), 15, 15, TimeUnit.SECONDS);
            }
        }
        return success;
    }

    /** Reports a failed consumer construction for this config key. */
    private void factoryFailListenner(String config, Throwable t) {
        ConfigUtils.perfConfigFactoryFail(configKey, config, t);
    }

    /**
     * Validates the configuration and resolves the enum-typed settings.
     *
     * @param config parsed configuration; an empty sub-expression is replaced by {@code "*"}
     * @return the resolved (consumeFromWhere, messageModel) pair
     * @throws RocketMqConfigException if a mandatory field is empty, a value is not one of the
     *                                 accepted ones, or consumeThreadMax is below consumeThreadMin
     */
    private TwoTuple<ConsumeFromWhere, MessageModel> checkParam(RocketMqConsumerConfig config) {
        if (Strings.isNullOrEmpty(config.getConsumerGroup())) {
            throw new RocketMqConfigException("consumer group is empty");
        }
        if (Strings.isNullOrEmpty(config.getNamesrvAddr())) {
            throw new RocketMqConfigException("namesrvAddr is empty");
        }
        if (Strings.isNullOrEmpty(config.getInstanceName())) {
            throw new RocketMqConfigException("instanceName is empty");
        }
        if (Strings.isNullOrEmpty(config.getTopic())) {
            throw new RocketMqConfigException("topic is empty");
        }
        if (Strings.isNullOrEmpty(config.getMessageModel())) {
            throw new RocketMqConfigException("messageModel is empty");
        }
        // The configured integer is matched against the enum ordinals of the three supported modes.
        ConsumeFromWhere consumeFromWhere;
        int consumeWhere = config.getConsumeFromWhere();
        if (consumeWhere == ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET.ordinal()) {
            consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET;
        } else if (consumeWhere == ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET.ordinal()) {
            consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
        } else if (consumeWhere == ConsumeFromWhere.CONSUME_FROM_TIMESTAMP.ordinal()) {
            consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_TIMESTAMP;
        } else {
            throw new RocketMqConfigException("consumeFromWhere only can set 0, 4, 5");
        }
        MessageModel messageModel;
        String messageModelStr = config.getMessageModel();
        if (MessageModel.BROADCASTING.getModeCN().equals(messageModelStr)) {
            messageModel = MessageModel.BROADCASTING;
        } else if (MessageModel.CLUSTERING.getModeCN().equals(messageModelStr)) {
            messageModel = MessageModel.CLUSTERING;
        } else {
            throw new RocketMqConfigException("messageModel only can set BROADCASTING, CLUSTERING");
        }
        if (config.getConsumeThreadMax() < config.getConsumeThreadMin()) {
            throw new RocketMqConfigException("consumeThreadMax can not smaller than consumeThreadMin");
        }
        if (Strings.isNullOrEmpty(config.getSubExpression())) {
            config.setSubExpression("*");
        }
        return new TwoTuple<>(consumeFromWhere, messageModel);
    }

    /** Shuts down a consumer that has been replaced. */
    private void cleanup(DefaultMQPushConsumer old) {
        logger.info("Cleanup old RocketMqConsumer {}", old.toString());
        old.shutdown();
    }

    /** @return the node resource that holds the current consumer */
    public BaseNodeResource<DefaultMQPushConsumer> getConsumerNode() {
        return consumerNode;
    }

    /** Creates the single-threaded daemon scheduler used for the delay reporters. */
    private static ScheduledExecutorService makeScheduleExecutor() {
        return newSingleThreadScheduledExecutor(
                new ThreadFactoryBuilder().setNameFormat("sq-rocket-mq-delay-reporter-%d")
                        .setDaemon(true)
                        .build());
    }
}
8.ConsumeOffsetDelayReporter
RocketMqConsumerWrap的start方法会在一个所有实例共享的单线程定时线程池(守护线程)中,每15秒调度一次ConsumeOffsetDelayReporter,用来监控consumer队列的消息堆积(延迟)情况。
/**
 * Periodic task that sums, over the message queues of one subscribed topic, the difference between
 * broker offset and consumer offset, and reports the total through
 * {@code PerfUtils.perf("rocketmq.delay", ...)}.
 *
 * <p>An admin client is created lazily and re-created whenever the consumer's name-server address
 * changes or a previous start attempt failed.
 */
@Slf4j
@RequiredArgsConstructor
public class ConsumeOffsetDelayReporter implements Runnable {
    private final RocketMqConsumerWrap wrap;
    // Name-server address the current admin client was successfully started with.
    private volatile String lastNamesrvAddr;
    private volatile DefaultMQAdminExt mqAdminExt;

    /**
     * Runs one report cycle. Nothing is allowed to escape: a task scheduled with
     * {@code scheduleAtFixedRate} is never run again once an execution throws.
     */
    @Override
    public void run() {
        try {
            report();
        } catch (Throwable e) {
            log.warn("report consume delay fail", e);
        }
    }

    /** Computes and reports the backlog for the first subscribed topic of the current consumer. */
    private void report() {
        DefaultMQPushConsumer consumer = wrap.getConsumerNode().get();
        if (consumer == null) {
            return;
        }
        String consumeGroup = consumer.getConsumerGroup();
        String namesrvAddr = consumer.getNamesrvAddr();
        Set<String> topics = consumer.getDefaultMQPushConsumerImpl().getSubscriptionInner().keySet();
        if (Strings.isNullOrEmpty(consumeGroup)
                || Strings.isNullOrEmpty(namesrvAddr)
                || topics.isEmpty()) {
            log.info("消费组、名字服务地址或者订阅的topic为空");
            return;
        }
        // NOTE(review): takes whichever topic the key set yields first; if the subscription table
        // also holds other topics (e.g. a retry topic), the choice is arbitrary - confirm.
        String topic = topics.iterator().next();
        if (!ensureAdmin(namesrvAddr)) {
            return;
        }
        long delay = 0L;
        try {
            Set<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues(topic);
            if (messageQueues == null || messageQueues.isEmpty()) {
                log.info("consumer group:{}, topic:{} 还没有分配messageQueue", consumeGroup, topic);
                return;
            }
            // Original author's note: this call fails while the consumer side has not consumed yet.
            ConsumeStats consumeStats = mqAdminExt.examineConsumeStats(consumeGroup, topic);
            // Walk all queues and accumulate the backlog.
            for (MessageQueue mq : messageQueues) {
                // Only count queues that belong to the reported topic.
                if (!topic.equals(mq.getTopic())) {
                    continue;
                }
                OffsetWrapper offsetWrapper = consumeStats.getOffsetTable().get(mq);
                if (offsetWrapper == null) {
                    // No offset entry for this queue in the stats: skip it instead of failing the
                    // whole report with a NullPointerException.
                    continue;
                }
                delay += offsetWrapper.getBrokerOffset() - offsetWrapper.getConsumerOffset();
            }
        } catch (Throwable e) {
            log.warn("获取消费延时失败 topic", e);
            return;
        }
        PerfUtils.perf("rocketmq.delay", topic, consumeGroup, this.lastNamesrvAddr)
                .micros(delay)
                .unsafeLogstashNow();
    }

    /**
     * Makes sure a started admin client exists for the given name-server address.
     *
     * <p>The address is remembered only after a successful start, so a failed start is retried on
     * the next run instead of leaving a null or shut-down client in place.
     *
     * @param namesrvAddr name-server address of the current consumer
     * @return {@code true} if a usable admin client is available
     */
    private boolean ensureAdmin(String namesrvAddr) {
        if (this.mqAdminExt != null && namesrvAddr.equals(this.lastNamesrvAddr)) {
            return true;
        }
        if (this.mqAdminExt != null) {
            try {
                this.mqAdminExt.shutdown();
            } catch (Throwable e) {
                log.warn("shutdown mq admin fail", e);
            }
            this.mqAdminExt = null;
            this.lastNamesrvAddr = null;
        }
        try {
            DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt();
            defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
            defaultMQAdminExt.setNamesrvAddr(namesrvAddr);
            defaultMQAdminExt.start();
            this.mqAdminExt = defaultMQAdminExt;
            this.lastNamesrvAddr = namesrvAddr;
            return true;
        } catch (Throwable e) {
            log.warn("start mq admin fail", e);
            return false;
        }
    }
}
9.BaseNodeResource等基础类可以参考《redis封装和监控》,这里就不再copy啦。
版权声明:本文为qian_348840260原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。