Source code analysis of the Skywalking alarm module: exploring its past and present
This article is based on Skywalking 8.4.0, and comments have been added to all of the quoted code.
For an introduction to the alarm module and how to use it, see
【弄nèng - Skywalking】Getting Started (3): Using the Skywalking Alarm Module
Official documentation:
http://skywalking.apache.org/docs/main/v8.4.0/en/setup/backend/backend-alarm/
1. Overview of the alarm processing classes
The classes involved in the alarm module's processing logic, and their responsibilities:
AlarmModule
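org.apache.skywalking.oap.server.core.alarm.AlarmModule, extends the abstract class ModuleDefine.
The alarm module defines the main entry point for alarm implementations; SkyWalking supports pluggable alarm implementations.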
- services(): declares MetricsNotify as the service this module provides; its implementation is NotifyHandler
AlarmModuleProvider
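org.apache.skywalking.oap.server.core.alarm.provider.AlarmModuleProvider, extends the abstract class ModuleProvider and is the provider of the Alarm module. It is invoked from ModuleDefine during the prepare phase, which runs prepare() to do the preparation work.
- prepare(): loads alarm-settings.yml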
@Override
public void prepare() throws ServiceNotProvidedException, ModuleStartException {
Reader applicationReader;
try {
// 1. Load alarm-settings.yml
applicationReader = ResourceUtils.read("alarm-settings.yml");
} catch (FileNotFoundException e) {
throw new ModuleStartException("can't load alarm-settings.yml", e);
}
RulesReader reader = new RulesReader(applicationReader);
// Parse alarm-settings.yml into Rules, which contain List<AlarmRule> rules and List<String> webhooks
Rules rules = reader.readRules();
// Wrap the rules in an AlarmRulesWatcher, which also builds the RunningRule collections
alarmRulesWatcher = new AlarmRulesWatcher(rules, this);
notifyHandler = new NotifyHandler(alarmRulesWatcher);
// Inject the persistence callback
notifyHandler.init(new AlarmStandardPersistence());
// Register notifyHandler as this provider's implementation of the MetricsNotify service
this.registerServiceImplementation(MetricsNotify.class, notifyHandler);
}
- createConfigBeanIfAbsent(): returns AlarmSettings
- name(): the provider name, "default"
AlarmRulesWatcher
Converts Rules into RunningRules and builds two maps: Map<String, List<RunningRule>> runningContext, which maps each metricsName to its RunningRule list,
and Map<AlarmRule, RunningRule> alarmRuleRunningRuleMap, which maps each rule to its RunningRule.
- notify(Rules newRules)
void notify(Rules newRules) {
Map<AlarmRule, RunningRule> newAlarmRuleRunningRuleMap = new HashMap<>();
Map<String, List<RunningRule>> newRunningContext = new HashMap<>();
newRules.getRules().forEach(rule -> {
/*
* If there is already an alarm rule that is the same as the new one, we'll reuse its
* corresponding runningRule, to keep its history metrics
*/
RunningRule runningRule = alarmRuleRunningRuleMap.getOrDefault(rule, new RunningRule(rule));
newAlarmRuleRunningRuleMap.put(rule, runningRule);
String metricsName = rule.getMetricsName();
List<RunningRule> runningRules = newRunningContext.computeIfAbsent(metricsName, key -> new ArrayList<>());
runningRules.add(runningRule);
});
this.rules = newRules;
this.runningContext = newRunningContext;
this.alarmRuleRunningRuleMap = newAlarmRuleRunningRuleMap;
log.info("Update alarm rules to {}", rules);
}
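The reuse logic in notify() is what keeps a rule's history alive across a hot reload. Below is a minimal, self-contained sketch of that idea under stated assumptions: SimpleRule, SimpleRunningRule and SimpleRulesWatcher are hypothetical stand-ins, not SkyWalking classes, and the only property of SimpleRule that matters is that equal rules compare equal through equals()/hashCode().

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for AlarmRule: value equality is what makes reuse possible.
final class SimpleRule {
    final String ruleName;
    final String metricsName;
    final int threshold;

    SimpleRule(String ruleName, String metricsName, int threshold) {
        this.ruleName = ruleName;
        this.metricsName = metricsName;
        this.threshold = threshold;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SimpleRule)) {
            return false;
        }
        SimpleRule r = (SimpleRule) o;
        return threshold == r.threshold
                && ruleName.equals(r.ruleName)
                && metricsName.equals(r.metricsName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(ruleName, metricsName, threshold);
    }
}

// Hypothetical stand-in for RunningRule; in SkyWalking it would own the windows (history).
final class SimpleRunningRule {
    final SimpleRule rule;

    SimpleRunningRule(SimpleRule rule) {
        this.rule = rule;
    }
}

final class SimpleRulesWatcher {
    Map<SimpleRule, SimpleRunningRule> ruleToRunning = new HashMap<>();
    Map<String, List<SimpleRunningRule>> runningContext = new HashMap<>();

    void notify(List<SimpleRule> newRules) {
        Map<SimpleRule, SimpleRunningRule> newRuleToRunning = new HashMap<>();
        Map<String, List<SimpleRunningRule>> newContext = new HashMap<>();
        for (SimpleRule rule : newRules) {
            // Reuse the RunningRule of an equal old rule so its history survives the reload.
            SimpleRunningRule running = ruleToRunning.get(rule);
            if (running == null) {
                running = new SimpleRunningRule(rule);
            }
            newRuleToRunning.put(rule, running);
            // Group RunningRules by metrics name, like runningContext.
            newContext.computeIfAbsent(rule.metricsName, key -> new ArrayList<>()).add(running);
        }
        // Replace both maps at once; rules missing from newRules are simply dropped.
        this.ruleToRunning = newRuleToRunning;
        this.runningContext = newContext;
    }
}
```

Unlike the quoted code, the sketch looks the existing RunningRule up with get() instead of getOrDefault(rule, new RunningRule(rule)). getOrDefault evaluates its default argument eagerly, so the original constructs a throwaway RunningRule even for rules that are reused; the resulting maps are the same either way.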
NotifyHandler
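The implementation of the alarm service; it handles the alarm logic.
Fields:
- private final AlarmCore core; the alarm core processor
- private final AlarmRulesWatcher alarmRulesWatcher;
Methods:
- notify(Metrics metrics): receives Metrics, wraps them into MetaInAlarm according to the scope, fetches all RunningRules registered for the MetricsName, and calls in() on each of them to add the metrics to a Window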
@Override
public void notify(Metrics metrics) {
WithMetadata withMetadata = (WithMetadata) metrics;
MetricsMetaInfo meta = withMetadata.getMeta();
int scope = meta.getScope();
if (!DefaultScopeDefine.inServiceCatalog(scope) && !DefaultScopeDefine.inServiceInstanceCatalog(scope)
&& !DefaultScopeDefine.inEndpointCatalog(scope) && !DefaultScopeDefine.inServiceRelationCatalog(scope)
&& !DefaultScopeDefine.inServiceInstanceRelationCatalog(scope) && !DefaultScopeDefine.inEndpointRelationCatalog(scope)) {
return;
}
MetaInAlarm metaInAlarm;
if (DefaultScopeDefine.inServiceCatalog(scope)) {
final String serviceId = meta.getId();
final IDManager.ServiceID.ServiceIDDefinition serviceIDDefinition = IDManager.ServiceID.analysisId(
serviceId);
ServiceMetaInAlarm serviceMetaInAlarm = new ServiceMetaInAlarm();
serviceMetaInAlarm.setMetricsName(meta.getMetricsName());
serviceMetaInAlarm.setId(serviceId);
serviceMetaInAlarm.setName(serviceIDDefinition.getName());
metaInAlarm = serviceMetaInAlarm;
} else if (DefaultScopeDefine.inServiceInstanceCatalog(scope)) {
final String instanceId = meta.getId();
final IDManager.ServiceInstanceID.InstanceIDDefinition instanceIDDefinition = IDManager.ServiceInstanceID.analysisId(
instanceId);
final IDManager.ServiceID.ServiceIDDefinition serviceIDDefinition = IDManager.ServiceID.analysisId(
instanceIDDefinition.getServiceId());
ServiceInstanceMetaInAlarm instanceMetaInAlarm = new ServiceInstanceMetaInAlarm();
instanceMetaInAlarm.setMetricsName(meta.getMetricsName());
instanceMetaInAlarm.setId(instanceId);
instanceMetaInAlarm.setName(instanceIDDefinition.getName() + " of " + serviceIDDefinition.getName());
metaInAlarm = instanceMetaInAlarm;
} else if (DefaultScopeDefine.inEndpointCatalog(scope)) {
final String endpointId = meta.getId();
final IDManager.EndpointID.EndpointIDDefinition endpointIDDefinition = IDManager.EndpointID.analysisId(
endpointId);
final IDManager.ServiceID.ServiceIDDefinition serviceIDDefinition = IDManager.ServiceID.analysisId(
endpointIDDefinition.getServiceId());
EndpointMetaInAlarm endpointMetaInAlarm = new EndpointMetaInAlarm();
endpointMetaInAlarm.setMetricsName(meta.getMetricsName());
endpointMetaInAlarm.setId(meta.getId());
endpointMetaInAlarm.setName(
endpointIDDefinition.getEndpointName() + " in " + serviceIDDefinition.getName());
metaInAlarm = endpointMetaInAlarm;
} else if (DefaultScopeDefine.inServiceRelationCatalog(scope)) {
final String serviceRelationId = meta.getId();
final IDManager.ServiceID.ServiceRelationDefine serviceRelationDefine = IDManager.ServiceID.analysisRelationId(
serviceRelationId);
final IDManager.ServiceID.ServiceIDDefinition sourceIdDefinition = IDManager.ServiceID.analysisId(
serviceRelationDefine.getSourceId());
final IDManager.ServiceID.ServiceIDDefinition destIdDefinition = IDManager.ServiceID.analysisId(
serviceRelationDefine.getDestId());
ServiceRelationMetaInAlarm serviceRelationMetaInAlarm = new ServiceRelationMetaInAlarm();
serviceRelationMetaInAlarm.setMetricsName(meta.getMetricsName());
serviceRelationMetaInAlarm.setId(serviceRelationId);
serviceRelationMetaInAlarm.setName(sourceIdDefinition.getName() + " to " + destIdDefinition.getName());
metaInAlarm = serviceRelationMetaInAlarm;
} else if (DefaultScopeDefine.inServiceInstanceRelationCatalog(scope)) {
final String instanceRelationId = meta.getId();
final IDManager.ServiceInstanceID.ServiceInstanceRelationDefine serviceRelationDefine = IDManager.ServiceInstanceID.analysisRelationId(
instanceRelationId);
final IDManager.ServiceInstanceID.InstanceIDDefinition sourceIdDefinition = IDManager.ServiceInstanceID.analysisId(
serviceRelationDefine.getSourceId());
final IDManager.ServiceID.ServiceIDDefinition sourceServiceId = IDManager.ServiceID.analysisId(
sourceIdDefinition.getServiceId());
final IDManager.ServiceInstanceID.InstanceIDDefinition destIdDefinition = IDManager.ServiceInstanceID.analysisId(
serviceRelationDefine.getDestId());
final IDManager.ServiceID.ServiceIDDefinition destServiceId = IDManager.ServiceID.analysisId(
destIdDefinition.getServiceId());
ServiceInstanceRelationMetaInAlarm instanceRelationMetaInAlarm = new ServiceInstanceRelationMetaInAlarm();
instanceRelationMetaInAlarm.setMetricsName(meta.getMetricsName());
instanceRelationMetaInAlarm.setId(instanceRelationId);
instanceRelationMetaInAlarm.setName(sourceIdDefinition.getName() + " of " + sourceServiceId.getName()
+ " to " + destIdDefinition.getName() + " of " + destServiceId.getName());
metaInAlarm = instanceRelationMetaInAlarm;
} else if (DefaultScopeDefine.inEndpointRelationCatalog(scope)) {
final String endpointRelationId = meta.getId();
final IDManager.EndpointID.EndpointRelationDefine endpointRelationDefine = IDManager.EndpointID.analysisRelationId(
endpointRelationId);
final IDManager.ServiceID.ServiceIDDefinition sourceService = IDManager.ServiceID.analysisId(
endpointRelationDefine.getSourceServiceId());
final IDManager.ServiceID.ServiceIDDefinition destService = IDManager.ServiceID.analysisId(
endpointRelationDefine.getDestServiceId());
EndpointRelationMetaInAlarm endpointRelationMetaInAlarm = new EndpointRelationMetaInAlarm();
endpointRelationMetaInAlarm.setMetricsName(meta.getMetricsName());
endpointRelationMetaInAlarm.setId(endpointRelationId);
endpointRelationMetaInAlarm.setName(endpointRelationDefine.getSource() + " in " + sourceService.getName()
+ " to " + endpointRelationDefine.getDest() + " in " + destService.getName());
metaInAlarm = endpointRelationMetaInAlarm;
} else {
return;
}
// The RunningRules registered for this MetricsName
List<RunningRule> runningRules = core.findRunningRule(meta.getMetricsName());
if (runningRules == null) {
return;
}
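// Call in() on each rule: if the metrics' entity (service, endpoint, etc.) is in the rule's include list, the metrics are added to that RunningRule's values according to their timeBucket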
runningRules.forEach(rule -> rule.in(metaInAlarm, metrics));
}
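Each branch of notify() differs only in how it decodes the entity id and how it composes the display name. The sketch below isolates the naming conventions visible in the code above. The enum AlarmScopeKind and the class AlarmNameFormatter are hypothetical, and the names are passed in already decoded rather than resolved through IDManager.

```java
import java.util.Optional;

// Hypothetical scope kinds mirroring the six catalogs that NotifyHandler accepts.
enum AlarmScopeKind { SERVICE, INSTANCE, ENDPOINT, SERVICE_RELATION, INSTANCE_RELATION, ENDPOINT_RELATION, OTHER }

final class AlarmNameFormatter {
    // parts holds already-decoded names in the order noted on each case.
    static Optional<String> displayName(AlarmScopeKind kind, String... parts) {
        switch (kind) {
            case SERVICE:           // [service]
                return Optional.of(parts[0]);
            case INSTANCE:          // [instance, service]
                return Optional.of(parts[0] + " of " + parts[1]);
            case ENDPOINT:          // [endpoint, service]
                return Optional.of(parts[0] + " in " + parts[1]);
            case SERVICE_RELATION:  // [sourceService, destService]
                return Optional.of(parts[0] + " to " + parts[1]);
            case INSTANCE_RELATION: // [srcInstance, srcService, destInstance, destService]
                return Optional.of(parts[0] + " of " + parts[1] + " to " + parts[2] + " of " + parts[3]);
            case ENDPOINT_RELATION: // [srcEndpoint, srcService, destEndpoint, destService]
                return Optional.of(parts[0] + " in " + parts[1] + " to " + parts[2] + " in " + parts[3]);
            default:                // any other scope is ignored, like the early return in notify()
                return Optional.empty();
        }
    }
}
```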
RunningRule
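Executes an alarm rule and evaluates the metric values.
Methods:
- in(MetaInAlarm meta, Metrics metrics): adds the metrics to a window
- moveTo(LocalDateTime targetTime): slides the windows
- check(): checks the conditions and decides whether to trigger an alarm. The actual threshold judgment lives in the per-entity Window's checkAlarm(), which check() invokes: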
public Optional<AlarmMessage> checkAlarm() {
if (isMatch()) {
/*
* When
* 1. Metrics value threshold triggers alarm by rule
* 2. Counter reaches the count threshold;
* 3. Isn't in silence stage, judged by SilenceCountdown(!=0).
*/
counter++;
if (counter >= countThreshold && silenceCountdown < 1) {
silenceCountdown = silencePeriod;
return Optional.of(new AlarmMessage());
} else {
silenceCountdown--;
}
} else {
silenceCountdown--;
if (counter > 0) {
counter--;
}
}
return Optional.empty();
}
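To see how counter, countThreshold and silencePeriod interact, here is a standalone replica of the branch logic in checkAlarm(). It is a sketch under assumptions: isMatch() is replaced by a boolean argument, and the class AlarmStateMachine and its helper firingTicks() are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical replica of the counter/silence branches of checkAlarm().
final class AlarmStateMachine {
    private final int countThreshold;
    private final int silencePeriod;
    private int counter;
    // Starts outside the silence stage (any value < 1 means "not silenced").
    private int silenceCountdown;

    AlarmStateMachine(int countThreshold, int silencePeriod) {
        this.countThreshold = countThreshold;
        this.silencePeriod = silencePeriod;
    }

    /** Returns true when this check would emit an AlarmMessage; matched replaces isMatch(). */
    boolean check(boolean matched) {
        if (matched) {
            counter++;
            if (counter >= countThreshold && silenceCountdown < 1) {
                silenceCountdown = silencePeriod; // enter the silence stage after firing
                return true;
            }
            silenceCountdown--;
        } else {
            silenceCountdown--;
            if (counter > 0) {
                counter--; // a non-matching check only drains the counter by one
            }
        }
        return false;
    }

    /** Feeds a sequence of match results and returns the indexes of the checks that fired. */
    static List<Integer> firingTicks(AlarmStateMachine machine, boolean[] matches) {
        List<Integer> fired = new ArrayList<>();
        for (int i = 0; i < matches.length; i++) {
            if (machine.check(matches[i])) {
                fired.add(i);
            }
        }
        return fired;
    }
}
```

With countThreshold = 3 and silencePeriod = 2, eight consecutive matching checks fire at indexes 2 and 5: after each alarm, the next two checks are silenced. Because a non-matching check decrements the counter by one instead of resetting it, intermittent breaches can still accumulate towards the threshold.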
Window
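A metrics window based on the alarm rule's period. The window slides over time and keeps only the most recent N (period) buckets.
Methods:
- add(Metrics metrics): puts the metrics into its bucket
- moveTo(LocalDateTime current): slides the window, removing expired buckets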
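The bucket bookkeeping can be illustrated with a simplified window. In this sketch, minutes are plain long values and buckets hold Long values instead of Metrics objects; MinuteWindow is a hypothetical name, and the reset-on-large-jump and drop-too-old behavior are assumptions modeled on the description above rather than a copy of SkyWalking's Window.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Hypothetical, simplified window: minutes are plain longs, buckets hold Long values
// (null = no data) instead of Metrics objects.
final class MinuteWindow {
    private final int period;
    private final LinkedList<Long> values = new LinkedList<>(); // oldest first, newest last
    private Long endMinute; // the minute covered by the last bucket; null until first use

    MinuteWindow(int period) {
        this.period = period;
        reset();
    }

    private void reset() {
        values.clear();
        for (int i = 0; i < period; i++) {
            values.add(null);
        }
    }

    /** Slides the window so that its last bucket covers currentMinute. */
    void moveTo(long currentMinute) {
        if (endMinute != null) {
            long shift = currentMinute - endMinute;
            if (shift <= 0) {
                return; // never slide backwards
            }
            if (shift >= period) {
                reset(); // every existing bucket has expired
            } else {
                for (long i = 0; i < shift; i++) {
                    values.removeFirst(); // drop the oldest bucket
                    values.addLast(null); // open an empty bucket for the new minute
                }
            }
        }
        endMinute = currentMinute;
    }

    /** Stores a value in the bucket of its minute. */
    void add(long minute, long value) {
        if (endMinute == null) {
            endMinute = minute;
        }
        long age = endMinute - minute;
        if (age < 0) {
            moveTo(minute); // newer than the window: slide forward first
            age = 0;
        }
        if (age >= period) {
            return; // older than the window: dropped
        }
        values.set(period - (int) age - 1, value);
    }

    List<Long> snapshot() {
        return new ArrayList<>(values);
    }
}
```

For a period of 3, data for minutes 100 and 99 lands in the last two buckets, data for minute 97 is dropped as too old, and moveTo(101) shifts everything one bucket to the left and opens an empty bucket for minute 101.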
AlarmEntrance
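The data entry point of the alarm service: it checks whether the alarm module has been initialized, finds the NotifyHandler, and passes the data to it.
- forward(Metrics metrics): forwards the data to NotifyHandler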
AlarmNotifyWorker
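The alarm notification worker. It receives Metrics and does a simple routing to the alarm core. It performs the alarm work after MetricsAggregateWorker has done the L1 aggregation.
- in(Metrics metrics): forwards the data to NotifyHandler, which wraps it according to its scope -> iterates over all rules registered for the metrics (the rule list comes from parsing alarm-settings.yml) -> rule.in() checks whether the metrics name matches and adds the data to the window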
AlarmCallback
Callbacks, including webhooks, persistence, and so on. They are registered when NotifyHandler is initialized:
allCallbacks.add(new WebhookCallback(alarmRulesWatcher));
allCallbacks.add(new GRPCCallback(alarmRulesWatcher));
allCallbacks.add(new SlackhookCallback(alarmRulesWatcher));
allCallbacks.add(new WechatHookCallback(alarmRulesWatcher));
allCallbacks.add(new DingtalkHookCallback(alarmRulesWatcher));
allCallbacks.add(new FeishuHookCallback(alarmRulesWatcher));
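Every callback receives the same batch of AlarmMessages. The sketch below shows that fan-out together with the onlyAsCondition filter that AlarmCore.start() applies before calling doAlarm(). SimpleAlarmMessage, SimpleAlarmCallback, RecordingCallback and CallbackFanOut are hypothetical stand-ins; a real callback would post to a webhook, Slack, DingTalk and so on instead of recording rule names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical, simplified message: only the fields this sketch needs.
final class SimpleAlarmMessage {
    final String ruleName;
    final boolean onlyAsCondition; // true for rules used only inside composite rules

    SimpleAlarmMessage(String ruleName, boolean onlyAsCondition) {
        this.ruleName = ruleName;
        this.onlyAsCondition = onlyAsCondition;
    }
}

// Hypothetical callback contract, shaped like AlarmCallback.doAlarm().
interface SimpleAlarmCallback {
    void doAlarm(List<SimpleAlarmMessage> alarmMessages);
}

// Stands in for a webhook-style callback; it only records what it receives.
final class RecordingCallback implements SimpleAlarmCallback {
    final List<String> received = new ArrayList<>();

    @Override
    public void doAlarm(List<SimpleAlarmMessage> alarmMessages) {
        alarmMessages.forEach(m -> received.add(m.ruleName));
    }
}

final class CallbackFanOut {
    static void dispatch(List<SimpleAlarmCallback> callbacks, List<SimpleAlarmMessage> messages) {
        // Drop condition-only messages, then hand the same batch to every callback.
        List<SimpleAlarmMessage> filtered = messages.stream()
                .filter(m -> !m.onlyAsCondition)
                .collect(Collectors.toList());
        callbacks.forEach(cb -> cb.doAlarm(filtered));
    }
}
```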
AlarmCore
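The alarm core. Based on the alarm settings, it holds the metric values of certain time windows, and it uses its internal timer
trigger together with the alarm rules to decide whether to send alarms to the database and webhook(s).
- start(List<AlarmCallback> allCallbacks): called when NotifyHandler is initialized; starts the scheduled alarm task, which runs every 10 s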
public void start(List<AlarmCallback> allCallbacks) {
LocalDateTime now = LocalDateTime.now();
lastExecuteTime = now;
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
try {
final List<AlarmMessage> alarmMessageList = new ArrayList<>(30);
LocalDateTime checkTime = LocalDateTime.now();
int minutes = Minutes.minutesBetween(lastExecuteTime, checkTime).getMinutes();
boolean[] hasExecute = new boolean[]{false};
// Iterate over the data of all windows
alarmRulesWatcher.getRunningContext().values().forEach(ruleList -> ruleList.forEach(runningRule -> {
// If at least one full minute has passed since the last execution, process this runningRule
if (minutes > 0) {
// Slide the window
runningRule.moveTo(checkTime);
/*
* Don't run in the first quarter per min, avoid to trigger false alarm.
*/
// Only when the second of the minute is greater than 15
if (checkTime.getSecondOfMinute() > 15) {
hasExecute[0] = true;
// runningRule.check() runs the alarm evaluation and produces alarm messages
alarmMessageList.addAll(runningRule.check());
}
}
}));
// Set the last execute time, and make sure the second is `00`, such as: 18:30:00
if (hasExecute[0]) {
lastExecuteTime = checkTime.minusSeconds(checkTime.getSecondOfMinute());
}
// If any alarms were produced, evaluate the composite rules
if (alarmMessageList.size() > 0) {
if (alarmRulesWatcher.getCompositeRules().size() > 0) {
// Composite alarm processing
List<AlarmMessage> messages = alarmRulesWatcher.getCompositeRuleEvaluator().evaluate(alarmRulesWatcher.getCompositeRules(), alarmMessageList);
alarmMessageList.addAll(messages);
}
// Hand the alarm messages to the callbacks
List<AlarmMessage> filteredMessages = alarmMessageList.stream().filter(msg -> !msg.isOnlyAsCondition()).collect(Collectors.toList());
allCallbacks.forEach(callback -> callback.doAlarm(filteredMessages));
}
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
}, 10, 10, TimeUnit.SECONDS);
}
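- runningRule.moveTo(checkTime): slides the window. Never set period to 1: right after the window moves, its last bucket is the minute that has just started, so it holds at most about 20 s of data, and a one-bucket window would judge the alarm on that partial minute alone.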
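The scheduling guard in start() can be isolated and tested on its own. This sketch rewrites it with java.time instead of the Joda-Time types used in the original (an assumption for illustration) and merges the two conditions, "a full minute has passed" and "the second is past 15", into one check. In the original, moveTo() already runs once the minute condition holds, and only check() additionally waits for second 15. AlarmTickGuard is a hypothetical name.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

// Hypothetical extraction of the timing guard in AlarmCore.start(), using java.time.
final class AlarmTickGuard {
    private LocalDateTime lastExecuteTime;

    AlarmTickGuard(LocalDateTime startTime) {
        this.lastExecuteTime = startTime;
    }

    /** Returns true when the rules should be checked at checkTime. */
    boolean tick(LocalDateTime checkTime) {
        long minutes = ChronoUnit.MINUTES.between(lastExecuteTime, checkTime);
        // Act only once a full minute has passed, and skip the first 15 s of the minute.
        boolean execute = minutes > 0 && checkTime.getSecond() > 15;
        if (execute) {
            // Align to the start of the minute, e.g. 18:30:00.
            lastExecuteTime = checkTime.truncatedTo(ChronoUnit.MINUTES);
        }
        return execute;
    }
}
```

Because the task fires every 10 s, one tick per minute lands between second 16 and second 25 (assuming the schedule does not drift), so rules are checked roughly once per minute, and at that moment the newest bucket has been filling for only 16 to 25 seconds. That is the reasoning behind the warning about period = 1.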
Alarm processing flow
The above are the classes involved in alarm processing. The overall alarm processing flow is as follows:
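2. Dynamic configuration of alarm rules
The classes involved in dynamically reading alarm rules, and their responsibilities:
During the prepare phase, the alarm module initializes the alarm rules: AlarmModuleProvider loads alarm-settings.yml.
When alarm-settings.yml is configured dynamically, the processing flow and the classes involved are as follows.
For more about SkyWalking dynamic configuration, see
the official documentation: Dynamic Configuration.
We use gRPC-based dynamic configuration as the example; the other sources work on the same principle and differ only in how they receive the data.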
ConfigWatcherRegister
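The default implementation of configuration watching; implements DynamicConfigurationService.
Methods:
- start(): starts the scheduled task that reads the configuration
- configSync(): synchronizes the configuration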
void configSync() {
// Actively pull the configuration; readConfig() has a different implementation for each data source
Optional<ConfigTable> configTable = readConfig(register.keys());
// Config table would be null if no change detected from the implementation.
configTable.ifPresent(config -> {
config.getItems().forEach(item -> {
// alarm.default.alarm-settings
String itemName = item.getName();
WatcherHolder holder = register.get(itemName);
if (holder != null) {
ConfigChangeWatcher watcher = holder.getWatcher();
String newItemValue = item.getValue();
if (newItemValue == null) {
if (watcher.value() != null) {
// Notify watcher, the new value is null with delete event type.
watcher.notify(
new ConfigChangeWatcher.ConfigChangeEvent(null, ConfigChangeWatcher.EventType.DELETE));
} else {
// Don't need to notify, stay in null.
}
} else {
if (!newItemValue.equals(watcher.value())) {
// The new value differs from the old one, so update the configuration, i.e. call AlarmRulesWatcher.notify() to rebuild the RunningRule collections
watcher.notify(new ConfigChangeWatcher.ConfigChangeEvent(
newItemValue,
ConfigChangeWatcher.EventType.MODIFY
));
} else {
// Don't need to notify, stay in the same config value.
}
}
} else {
LOGGER.warn("Config {} from configuration center, doesn't match any watcher, ignore.", itemName);
}
});
LOGGER.trace("Current configurations after the sync." + LINE_SEPARATOR + register.toString());
});
}
- readConfig(): reads the configuration, with a different implementation for each data source
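The per-item decision in configSync() boils down to a three-way comparison between the watcher's current value and the newly pulled one. The following sketch models it with hypothetical names (ConfigAction, ConfigSyncSketch) and uses a plain map from config key to last applied value in place of WatcherHolder and ConfigChangeWatcher.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical outcome of comparing a pulled value with the watcher's current value.
enum ConfigAction { DELETE, MODIFY, NONE }

final class ConfigSyncSketch {
    // key -> last value applied by its watcher (stands in for WatcherHolder/ConfigChangeWatcher)
    final Map<String, String> watched = new HashMap<>();

    static ConfigAction decide(String currentValue, String newValue) {
        if (newValue == null) {
            // Only a previously non-null value produces a DELETE event.
            return currentValue != null ? ConfigAction.DELETE : ConfigAction.NONE;
        }
        // Any change, including null -> value, produces a MODIFY event.
        return newValue.equals(currentValue) ? ConfigAction.NONE : ConfigAction.MODIFY;
    }

    /** Applies one pulled config table; returns how many watchers were notified. */
    int sync(Map<String, String> pulled) {
        int notified = 0;
        for (Map.Entry<String, String> item : pulled.entrySet()) {
            if (!watched.containsKey(item.getKey())) {
                continue; // the real code logs "doesn't match any watcher, ignore."
            }
            ConfigAction action = decide(watched.get(item.getKey()), item.getValue());
            if (action != ConfigAction.NONE) {
                watched.put(item.getKey(), item.getValue()); // a DELETE stores null
                notified++;
            }
        }
        return notified;
    }
}
```

Pulling the same table twice notifies nothing the second time, which is why an unchanged alarm-settings value does not rebuild the RunningRules.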
Dynamic alarm configuration flow
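3. Where the alarm data comes from
The previous parts covered alarm processing and dynamic configuration of alarm rules: NotifyHandler.notify(Metrics metrics) receives Metrics and performs the alarm processing. But who passes those Metrics in? Next, let's keep digging and trace them back to the root of it all.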
BootstrapFlow
The startup flow class.
- start(): loads the ModuleProviders to be started, injects their services, and calls provider.start(). AnalyzerModuleProvider, the analyzer provider, is one of them.
AnalyzerModuleProvider
The analyzer provider; it loads oal/core.oal through its OALDefine (CoreOALDefine) and has it parsed.
- start():
@Override
public void start() throws ModuleStartException {
// load official analysis
// Load the OAL scripts
getManager().find(CoreModule.NAME)
.provider()
.getService(OALEngineLoaderService.class)
.load(CoreOALDefine.INSTANCE);
DynamicConfigurationService dynamicConfigurationService = getManager().find(ConfigurationModule.NAME)
.provider()
.getService(
DynamicConfigurationService.class);
dynamicConfigurationService.registerConfigChangeWatcher(thresholds);
dynamicConfigurationService.registerConfigChangeWatcher(uninstrumentedGatewaysConfig);
dynamicConfigurationService.registerConfigChangeWatcher(traceSampleRateWatcher);
segmentParserService.setListenerManager(listenerManager());
processService.start(meterConfigs);
}
OALEngineLoaderService
The OAL loading service.
- load(): parses an OALDefine
public void load(OALDefine define) throws ModuleStartException {
if (oalDefineSet.contains(define)) {
// each oal define will only be activated once
return;
}
try {
OALEngine engine = loadOALEngine(define);
StreamAnnotationListener streamAnnotationListener = new StreamAnnotationListener(moduleManager);
// Set the @Stream annotation listener
engine.setStreamListener(streamAnnotationListener);
// Set the dispatcher listener
engine.setDispatcherListener(moduleManager.find(CoreModule.NAME)
.provider()
.getService(SourceReceiver.class)
.getDispatcherDetectorListener());
// Calls OALRuntime.start
engine.start(OALEngineLoaderService.class.getClassLoader());
// Calls OALRuntime.notifyAllListeners
engine.notifyAllListeners();
oalDefineSet.add(define);
} catch (ReflectiveOperationException | OALCompileException e) {
throw new ModuleStartException(e.getMessage(), e);
}
}
OALRuntime
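OALRuntime is the class-generation engine: it loads the classes generated from the OAL script definitions, and this runtime is loaded dynamically.
- start(): parses the OAL scripts and generates the runtime classes
- generateClassAtRuntime(): dynamically generates the metrics entity classes from the OAL scripts, producing metricsClasses and dispatcherClasses and injecting them into the class loader. Each metrics class is named metricsStmt.getMetricsName() + "Metrics" and is annotated with @Stream.
The generated code includes metrics classes built on the OAL function classes, plus dispatchers (one per source) containing the doMetricsName methods. The dispatchers call MetricsStreamProcessor.getInstance().in(metrics).
- notifyAllListeners(): iterates over the generated metricsClasses and notifies the listener, which handles their @Stream annotations (index name, scope, processor), then adds the generated dispatcherClasses to the global dispatcher map (key: scope, value: List of dispatchers)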
notifyAllListeners()
@Override
public void notifyAllListeners() throws ModuleStartException {
for (Class metricsClass : metricsClasses) {
try {
streamAnnotationListener.notify(metricsClass);
} catch (StorageException e) {
throw new ModuleStartException(e.getMessage(), e);
}
}
for (Class dispatcherClass : dispatcherClasses) {
try {
dispatcherDetectorListener.addIfAsSourceDispatcher(dispatcherClass);
} catch (Exception e) {
throw new ModuleStartException(e.getMessage(), e);
}
}
}
StreamAnnotationListener
The @Stream annotation handler; it processes each class according to its processor type. The metricsClasses generated above carry @Stream with processor = MetricsStreamProcessor.
notify()
@Override
public void notify(Class aClass) throws StorageException {
if (aClass.isAnnotationPresent(Stream.class)) {
Stream stream = (Stream) aClass.getAnnotation(Stream.class);
if (stream.processor().equals(RecordStreamProcessor.class)) {
RecordStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
} else if (stream.processor().equals(MetricsStreamProcessor.class)) {
MetricsStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
} else if (stream.processor().equals(TopNStreamProcessor.class)) {
TopNStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
} else if (stream.processor().equals(NoneStreamProcessor.class)) {
NoneStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
} else if (stream.processor().equals(ManagementStreamProcessor.class)) {
ManagementStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
} else {
throw new UnexpectedException("Unknown stream processor.");
}
} else {
throw new UnexpectedException(
"Stream annotation listener could only parse the class present stream annotation.");
}
}
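StreamAnnotationListener relies on ordinary runtime reflection: the annotation must be retained at runtime, and the listener compares the processor class it names. Here is a small sketch of that dispatch. DemoStream, DemoMetricsProcessor, DemoRecordProcessor, ServiceRespTimeDemoMetrics and DemoStreamListener are hypothetical and only mimic the name and processor attributes of @Stream and the shape of a generated metrics class.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical annotation shaped like @Stream; RUNTIME retention makes it visible to reflection.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface DemoStream {
    String name();

    Class<?> processor();
}

final class DemoMetricsProcessor { }

final class DemoRecordProcessor { }

// Plays the role of a class generated by OALRuntime.
@DemoStream(name = "service_resp_time", processor = DemoMetricsProcessor.class)
final class ServiceRespTimeDemoMetrics { }

final class DemoStreamListener {
    /** Returns which processor would receive the class, in the form "kind:name". */
    static String route(Class<?> aClass) {
        if (!aClass.isAnnotationPresent(DemoStream.class)) {
            throw new IllegalArgumentException("Only classes annotated with @DemoStream can be parsed.");
        }
        DemoStream stream = aClass.getAnnotation(DemoStream.class);
        if (stream.processor().equals(DemoMetricsProcessor.class)) {
            return "metrics:" + stream.name();
        } else if (stream.processor().equals(DemoRecordProcessor.class)) {
            return "record:" + stream.name();
        }
        throw new IllegalStateException("Unknown stream processor.");
    }
}
```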
MetricsStreamProcessor
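Stream processing: the aggregation processor for metrics streams.
Fields:
- entryWorkers: the Metrics entry workers, i.e. a collection of MetricsAggregateWorkers (one built for each metrics class from the processor in its @Stream annotation and the metrics entity)
Methods:
- create(): creates the workers and the worker pipeline for each metrics class (the generated entities above), including AlarmNotifyWorker and MetricsAggregateWorker
- in(Metrics metrics): receives data from the dispatchers, calls MetricsAggregateWorker.in() to perform L1 aggregation, and then passes the result to the next worker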
MetricsAggregateWorker
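Provides in-memory metrics merging. This aggregation is called L1 aggregation, and it merges the data right after the receiver has analyzed it. For metrics that belong to the same entity, metrics type and time bucket, L1 aggregation merges them into one metrics object to reduce unnecessary memory and network usage.
- onWork(): processes the data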
private void onWork(List<Metrics> metricsList) {
metricsList.forEach(metrics -> {
aggregationCounter.inc();
// Calls combine() on the Metrics entity to perform the function calculation
mergeDataCache.accept(metrics);
});
// Pass the merged data to the next worker
mergeDataCache.read().forEach(
data -> {
if (log.isDebugEnabled()) {
log.debug(data.toString());
}
nextWorker.in(data);
}
);
}
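L1 aggregation is essentially a merge keyed by entity and time bucket, followed by a single hand-off per merged object. The sketch below shows that idea with hypothetical DemoMetrics and DemoAggregateWorker classes. Its combine() keeps a sum and a count (an average), whereas real Metrics classes combine according to their OAL function, and its cache is rebuilt on every call rather than being a persistent buffer like mergeDataCache.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical, simplified metrics: one entity, one time bucket, a running sum and count.
final class DemoMetrics {
    final String entityId;
    final long timeBucket;
    long sum;
    long count;

    DemoMetrics(String entityId, long timeBucket, long value) {
        this.entityId = entityId;
        this.timeBucket = timeBucket;
        this.sum = value;
        this.count = 1;
    }

    // Merge key: same entity and same time bucket.
    String id() {
        return entityId + "_" + timeBucket;
    }

    // Plays the role of Metrics.combine(): fold another partial result into this one.
    void combine(DemoMetrics other) {
        sum += other.sum;
        count += other.count;
    }

    long average() {
        return sum / count;
    }
}

final class DemoAggregateWorker {
    private final Consumer<DemoMetrics> nextWorker; // the next worker in the chain, e.g. MetricsRemoteWorker

    DemoAggregateWorker(Consumer<DemoMetrics> nextWorker) {
        this.nextWorker = nextWorker;
    }

    void onWork(List<DemoMetrics> metricsList) {
        // L1 aggregation: merge metrics that share entity and time bucket.
        Map<String, DemoMetrics> cache = new LinkedHashMap<>();
        for (DemoMetrics m : metricsList) {
            DemoMetrics existing = cache.get(m.id());
            if (existing == null) {
                cache.put(m.id(), m);
            } else {
                existing.combine(m);
            }
        }
        // Forward each merged object once.
        cache.values().forEach(nextWorker);
    }
}
```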
DispatcherManager
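- forward(Source source): receives data and forwards it to every dispatcher registered for its scope. For example, a source's dispatcher hands the data to MetricsStreamProcessor, which in turn passes it to the concrete MetricsAggregateWorker and then to workers such as AlarmNotifyWorker and MetricsRemoteWorker
- MetricsRemoteWorker: sends the L1-aggregated data via gRPC to the remote OAP node in the aggregator role
- AlarmNotifyWorker: runs alarm processing on the aggregated data; this connects back to AlarmNotifyWorker in Part 1 (alarm processing classes)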
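The routing in DispatcherManager is a lookup from scope to the list of dispatchers registered for it (key: scope, value: list of dispatchers). A minimal sketch, with a hypothetical DemoDispatcherManager class, integer scope ids, and Consumer<String> standing in for SourceDispatcher and Source:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical routing table: scope id -> dispatchers registered for that scope.
final class DemoDispatcherManager {
    private final Map<Integer, List<Consumer<String>>> dispatchersByScope = new HashMap<>();

    void register(int scope, Consumer<String> dispatcher) {
        dispatchersByScope.computeIfAbsent(scope, k -> new ArrayList<>()).add(dispatcher);
    }

    /** Sends the source to every dispatcher of its scope; returns how many received it. */
    int forward(int scope, String source) {
        List<Consumer<String>> dispatchers = dispatchersByScope.get(scope);
        if (dispatchers == null) {
            return 0; // nothing is registered for this scope
        }
        dispatchers.forEach(d -> d.accept(source));
        return dispatchers.size();
    }
}
```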
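At this point, the whole alarm flow is connected end to end!
Alarm data source flowchart
Picking up at the alarm source entry of Figure 1, the whole alarm flow is connected end to end!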
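Recommended projects
IT-CLOUD: an IT service management platform that integrates basic services, middleware services, monitoring and alerting services, and more.
IT-CLOUD-ACTIVITI6: source code for the Activiti tutorials. The articles are in my CSDN Activiti series.
IT-CLOUD-ELASTICSEARCH: source code for the Elasticsearch tutorials. The articles are in my CSDN Elasticsearch series.
IT-CLOUD-KAFKA: source code for the Spring + Kafka integration tutorials. The articles are in my CSDN Kafka series.
IT-CLOUD-KAFKA-CLIENT: source code for the Kafka client tutorials. The articles are in my CSDN Kafka series.
These are open-source projects under continuous development; if you like them, please give them a Star~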