Nacos–源码剖析 Sentinel Dashboard–数据持久化

  • Post author:
  • Post category:其他



目录

​​​​​​​


1.0.Nacos源码剖析


1.1.Nacos配置存储mysql数据库


1.2.客户端工作流程


1.2.1服务创建


1.2.2.服务注册


1.2.3.服务发现


1.2.4.服务下线


1.2.4.服务订阅


1.3.服务端工作流程


1.3.1.注册处理


1.3.2.一致性算法Distro协议介绍


1.3.3.Distro服务启动-寻址模式


1.3.3.1.单机寻址模式


1.3.3.2.文件寻址模式


​编辑


1.3.3.3.服务器寻址模式


1.3.5.集群数据同步


1.3.5.1.全量同步


◆ 任务启动


◆ 数据执行加载


◆ 数据同步


1.3.5.2.增量同步


◆ 增量数据入口


◆ 增量同步操作


◆ 详细增量数据同步(单机)


2 .0.Sentinel Dashboard数据持久化


2.1.动态配置原理


2.2.Sentinel+Nacos数据持久化


2.2.1.Dashboard改造分析


2.2.2.页面改造


2.2.3.Nacos配置


2.3.4.Dashboard持久化改造


2.3.5.Nacos配置创建


2.3.6.改造源码


2.3.6.1.改造NacosConfifig


2.3.6.2.动态获取流控规则


2.3.6.3.数据持久化测试


2.3.系统规则定义



1.0.


Nacos源码剖析


Nacos


源码有很多值得我们学习的地方,为了深入理解


Nacos


,我们剖析源码,分析如下2个知识点:


1


:


Nacos对注册中心的访问原理


2


:


Nacos注册服务处理流程


我们接下来对


Nacos


源码做一个深度剖析,首先搭建


Nacos


源码环境,源码环境搭建起来比较轻松,几乎不会报什么错误,我们这里就不去演示源码环境搭建了。


客户端与注册中心服务端的交互,主要集中在




服务注册









服务下线









服务发现









订阅服务




,其实使用最多的就是




服务注册









服务发现




,下面我会从源码的角度分析一下这四个功能。





Nacos


源码中


nacos





example


中com.alibaba.nacos.example.


NamingExample


类分别演示了这


4


个功能的操作,我们可以把它当做入口,代码如下:






public class






NamingExample




{








public static void






main(String[] args)






throws






NacosException {






Properties properties =






new






Properties();





properties.setProperty(






“serverAddr”






, System.






getProperty






(






“serverAddr”






));





properties.setProperty(






“namespace”






, System.






getProperty






(






“namespace”






));







//采用NamingService实现服务注册









NamingService naming = NamingFactory.createNamingService(properties);







//服务注册









naming.registerInstance(






“nacos.test.3”






,






“11.11.11.11”






,




8888




,






“TEST1”






);





naming.registerInstance(






“nacos.test.3”






,






“2.2.2.2”






,




9999




,






“DEFAULT”






);







//服务发现









System.








out








.println(naming.getAllInstances(






“nacos.test.3”






));







//服务下线









naming.deregisterInstance(






“nacos.test.3”






,






“2.2.2.2”






,




9999




,






“DEFAULT”






);





System.








out








.println(naming.getAllInstances(






“nacos.test.3”






));







//服务订阅









Executor executor =






new






ThreadPoolExecutor(




1




,




1




,




0L




,





TimeUnit.MILLISECONDS,






new






LinkedBlockingQueue<Runnable>(),







new






ThreadFactory() {






@Override







public






Thread newThread(Runnable r) {






Thread thread =






new






Thread(r);





thread.setName(






“test-thread”






);







return






thread;





}





}





);





naming.subscribe(






“nacos.test.3”






,






new






AbstractEventListener() {








//EventListener onEvent is sync to handle, If process too low in onEvent, maybe block other onEvent callback.









//So you can override getExecutor() to async handle event.









@Override







public






Executor getExecutor() {








return






executor




;





}



@Override







public void






onEvent(Event event) {






System.








out








.println(((NamingEvent) event).getServiceName());





System.








out








.println(((NamingEvent) event).getInstances());





}





});





}





}





1.1.Nacos配置存储mysql数据库




配置文件存储


mysql


,打开


nacos





console


工程下


application


.properties文件,添加


mysql


连接信息:





# 当前配置存储 采用mysql存储






spring.datasource.platform




=




mysql




### Count of DB:






db.num




=




1




### Connect URL of DB:






db.url.0




=




jdbc:mysql://127.0.0.1:3306/nacos?characterEncoding=utf8&connectTimeout=1000





&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC





db.user.0




=




nacos





db.password.0




=




nacos



存储配置的表模型在


nacos





console


工程下


resources


/


META





INF





schema


.sql,在


mysql


库中执行脚本。


配置好上面的信息,


nacos


将会把配置文件信息保存到数据库中。


启动


nacos


,在


nacos





console


工程下,启动


Nacos


启动类。




@SpringBootApplication




(scanBasePackages =




“com.alibaba.nacos”




)





@ServletComponentScan






// 定时器






@EnableScheduling





public class




Nacos {




public static void




main(String[] args) {






SpringApplication.





run





(Nacos.




class




, args);





}





}





1.




2.




客户端工作流程




我们先来看一下客户端是如何实现




服务注册









服务发现









服务下线




操作







服务订阅




操作的。


1.2.1服务创建

一旦创建

NamingService

就会支持服务订阅。


NamingService

对象创建功能:





/**







* 创建NamingService 对象







*/






public static




NamingService createNamingService(Properties properties)




throws




NacosException {






try




{






Class<?> driverImplClass = Class.





forName





(




“com.alibaba.nacos.client.naming.NacosNamingService”




);





Constructor constructor = driverImplClass.getConstructor(Properties.




class




);






// NamingService








NamingService vendorImpl = (NamingService) constructor.newInstance(properties);





return




vendorImpl;





}




catch




(Throwable e) {






throw new




NacosException(NacosException.





CLIENT_INVALID_PARAM





, e);





}





}


接下来跟踪看看创建的

NamingService

功能,创建对象时会初始化远程代理对象、心跳检测和订阅定时任务。





/**







* 构造函数 创建NacosNamingService对象







*/







public





NacosNamingService(String serverList)





throws





NacosException {






Properties properties =





new





Properties();





properties.setProperty(PropertyKeyConst.






SERVER_ADDR






, serverList);






// 初始化 配置








init(properties);





}




public





NacosNamingService(Properties properties)





throws





NacosException {






init(properties);





}




/**







* 初始胡过程







*/







private void





init(Properties properties)





throws





NacosException {






ValidatorUtils.





checkInitParam





(properties);






this





.





namespace





= InitUtils.





initNamespaceForNaming





(properties);





InitUtils.





initSerialization





();





initServerAddr(properties);





InitUtils.





initWebRootContext





(properties);





initCacheDir();





initLogName(properties);






// 创建代理对象 执行远程操作






http协议









this





.





serverProxy





=





new





NamingProxy(





this





.





namespace





,





this





.





endpoint





,





this





.





serverList





, properties);






// 定时任务心跳检测









this





.





beatReactor





=





new





BeatReactor(





this





.





serverProxy





, initClientBeatThreadCount(properties));






// 订阅 定时任务 定时从服务拉取服务列表









this





.





hostReactor





=





new





HostReactor(





this





.





serverProxy





,





beatReactor





,





this





.





cacheDir





, isLoadCacheAtStart(properties),





isPushEmptyProtect(properties), initPollingThreadCount(properties));





}


继续跟踪


HostReactor


订阅





public





HostReactor(NamingProxy serverProxy, BeatReactor beatReactor, String cacheDir,





boolean





loadCacheAtStart,






boolean





pushEmptyProtection,





int





pollingThreadCount) {







// init executorService









this





.





executor





=





new





ScheduledThreadPoolExecutor(pollingThreadCount,





new





ThreadFactory() {






@Override






public





Thread newThread(Runnable r) {






Thread thread =





new





Thread(r);





thread.setDaemon(





true





);





thread.setName(





“com.alibaba.nacos.client.naming.updater”





);






return





thread;





}





});




this





.





beatReactor





= beatReactor;






this





.





serverProxy





= serverProxy;






this





.





cacheDir





= cacheDir;






if





(loadCacheAtStart) {







this





.





serviceInfoMap





=





new





ConcurrentHashMap<String, ServiceInfo>(DiskCache.





read





(





this





.





cacheDir





));





}





else





{







this





.





serviceInfoMap





=





new





ConcurrentHashMap<String, ServiceInfo>(




16




);





}






this





.





pushEmptyProtection





= pushEmptyProtection;






this





.





updatingMap





=





new





ConcurrentHashMap<String, Object>();






this





.





failoverReactor





=





new





FailoverReactor(





this





, cacheDir);






// go on 创建订阅服务列表对象









this





.





pushReceiver





=





new





PushReceiver(





this





);






this





.





notifier





=





new





InstancesChangeNotifier();



NotifyCenter.





registerToPublisher





(InstancesChangeEvent.





class





,




16384




);





NotifyCenter.





registerSubscriber





(





notifier





);





}


跟踪订阅服务列表对象


PushReceiver





public class





PushReceiver





implements





Runnable, Closeable {





private static final





Charset






UTF_8






= Charset.





forName





(





“UTF-8”





);




private static final int







UDP_MSS






=




64




*




1024




;






// 定时任务线程池









private





ScheduledExecutorService





executorService





;




private





DatagramSocket





udpSocket





;




private





HostReactor





hostReactor





;




private volatile boolean






closed





=





false





;




public





PushReceiver(HostReactor hostReactor) {







try





{







this





.





hostReactor





= hostReactor;






// 通讯对象(NIO)->远程调用









this





.





udpSocket





=





new





DatagramSocket();






// 创建定时任务线程池->定时任务









this





.





executorService





=





new





ScheduledThreadPoolExecutor(




1




,





new





ThreadFactory() {






@Override






public





Thread newThread(Runnable r) {






Thread thread =





new





Thread(r);





thread.setDaemon(





true





);





thread.setName(





“com.alibaba.nacos.naming.push.receiver”





);






return





thread;





}





});






// 循环执行定时任务 每过一段时间获取一次数据









this





.





executorService





.execute(





this





);





}





catch





(Exception e) {








NAMING_LOGGER






.error(





“[NA] init udp socket failed”





, e);





}





}



@Override






public void





run() {







while





(!





closed





) {







try





{





// byte[] is initialized with 0 full filled by default









byte





[] buffer =





new byte





[






UDP_MSS






];





DatagramPacket packet =





new





DatagramPacket(buffer, buffer.





length





);






// 接收数据 获取远程数据包 NIO









udpSocket





.receive(packet);



String json =





new





String(IoUtils.





tryDecompress





(packet.getData()),






UTF_8






).trim();







NAMING_LOGGER






.info(





“received push data: ”





+ json +





” from ”





+ packet.getAddress().toString());






// 反序列化








PushPacket pushPacket = JacksonUtils.





toObj





(json, PushPacket.





class





);





String ack;






if





(





“dom”





.equals(pushPacket.





type





) ||





“service”





.equals(pushPacket.





type





)) {







// 处理反序列化数据包









hostReactor





.processServiceJson(pushPacket.





data





);






// send ack to server







// ack确认机制 告诉服务器 已经获取到信息了








ack =





“{







\”






type






\”






:






\”






push-ack






\”












+





“,






\”






lastRefTime






\”






:






\”












+ pushPacket.





lastRefTime





+












\”






,






\”






data






\”






:”








+












\”\”






}”





;





}





else if





(





“dump”





.equals(pushPacket.





type





)) {







// dump data to server








ack =





“{







\”






type






\”






:






\”






dump-ack






\”












+





“,






\”






lastRefTime






\”






:






\”












+ pushPacket.





lastRefTime





+












\”






,






\”






data






\”






:”








+












\”












+ StringUtils.





escapeJavaScript





(JacksonUtils.





toJson





(





hostReactor





.getServiceInfoMap()))





+












\”






}”





;





}





else





{







// do nothing send ack only








ack =





“{







\”






type






\”






:






\”






unknown-ack






\”












+





“,






\”






lastRefTime






\”






:






\”












+ pushPacket.





lastRefTime








+












\”






,






\”






data






\”






:”





+












\”\”






}”





;





}




udpSocket





.send(





new





DatagramPacket(ack.getBytes(






UTF_8






), ack.getBytes(






UTF_8






).





length





,





packet.getSocketAddress()));





}





catch





(Exception e) {







if





(





closed





) {







return





;





}







NAMING_LOGGER






.error(





“[NA] error while receiving push data”





, e);





}





}





}





… …


继续处理远程拉取的服务信息,反序列化数据包,将反序列化数据包存储到

serviceInfoMap

中。





public





ServiceInfo processServiceJson(String json) {







// 解析数据包








ServiceInfo serviceInfo = JacksonUtils.





toObj





(json, ServiceInfo.





class





);





ServiceInfo oldService =





serviceInfoMap





.get(serviceInfo.getKey());




if





(





pushEmptyProtection





&& !serviceInfo.validate()) {







//empty or error push, just ignore









return





oldService;





}




boolean





changed =





false





;




if





(oldService !=





null





) {





if





(oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {








NAMING_LOGGER






.warn(





“out of date data received, old-t: ”





+ oldService.getLastRefTime() +





“, new-t: ”








+ serviceInfo.getLastRefTime());





}






// ===获取服务数据存储本地serviceInfoMap中===









serviceInfoMap





.put(serviceInfo.getKey(), serviceInfo);



Map<String, Instance> oldHostMap =





new





HashMap<String, Instance>(oldService.getHosts().size());






for





(Instance host : oldService.getHosts()) {






oldHostMap.put(host.toInetAddr(), host);





}





… …


总结,创建

NamigService

对象时做了几件事:

1. 创建代理对象,执行远程操作。

2. 创建定时任务执行心跳检测,定时向服务发送心跳检测。

3. 创建订阅定时任务,定时从服务拉取服务列表信息。

A. 创建通讯对象(

NIO

协议)->执行远程调用。

B. 创建定时任务线程池->执行


定时任务


,每过一段时间从服务拉取一次服务信息。将拉取的服务信息反序列化后存入本地的缓存(Map<String, ServiceInfo> serviceInfoMap)。

C. 服务信息变更时,会拉取服务信息存储到

serviceInfoMap

中。


1.


2


.


2.


服务注册


我们沿着案例中的服务注册方法调用找到


nacos





api


中的NamingService.


registerInstance


() 并找到它的实现类和方法 com.alibaba.nacos.client.naming.


NacosNamingService


,代码如下:






/***









* 服务注册









*










@param












serviceName










服务名字









*










@param












ip










服务IP









*










@param












port










服务端口









*










@param












clusterName










集群名字









*










@throws










NacosException









*/







@Override







public void






registerInstance(String serviceName, String ip,






int






port, String clusterName)






throws






NacosException {






registerInstance(serviceName, Constants.DEFAULT_GROUP, ip, port, clusterName);





}



@Override







public void






registerInstance(String serviceName, String groupName, String ip,






int






port, String clusterName)






throws






NacosException {








//设置实例IP:Port,默认权重为1.0









Instance instance =






new






Instance();





instance.setIp(ip);





instance.setPort(port);





instance.setWeight(




1.0




);





instance.setClusterName(clusterName);







//注册实例









registerInstance(serviceName, groupName, instance);





}



@Override







public void






registerInstance(String serviceName, Instance instance)






throws






NacosException {






registerInstance(serviceName, Constants.DEFAULT_GROUP, instance);





}





/***









* 实例注册









*










@param












serviceName










name of service









*










@param












groupName










group of service









*










@param












instance










instance to register









*










@throws










NacosException









*/







@Override







public void






registerInstance(String serviceName, String groupName, Instance instance)






throws






NacosException {






NamingUtils.checkInstanceIsLegal(instance);





String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);







// 该字段表示注册的实例是否是临时实例还是持久化实例。









// 如果是临时实例,则不会在 Nacos 服务端持久化存储,需要通过上报心跳的方式进行包活,









// 如果一段时间内没有上报心跳,则会被 Nacos 服务端摘除。











if






(instance.isEphemeral()) {








// 为注册服务设置一个定时任务获取心跳信息,默认为5s汇报一次









// 心跳检测任务   服务名+@+组名 作为key执行心跳检测









BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);





beatReactor.addBeatInfo(groupedServiceName, beatInfo);





}







// 注册到服务端 serverProxy在NamingServer创建的时候创建 代理对象 执行远程调用









serverProxy.registerService(groupedServiceName, groupName, instance);





}



注册主要做了两件事


:


第一件事:为注册的服务设置一个




定时任务




,定时




拉取






服务




信息。


第二件事:将服务




注册




到服务端。


1


:


启动一个定时任务,定时拉取服务信息,时间间隔为5s,如果拉下来服务正常,不做处理,如果不正常,重新注册


2


:


发送


http


请求给注册中心服务端,调用服务注册接口,注册服务


上面代码我们可以看到定时任务添加,但并未完全看到远程请求,serverProxy.


registerService


()方法如下,会先封装请求参数,接下来调用


reqApi


() 而


reqApi


()最后会调用


callServer


() ,代码如下:






public void






registerService(String serviceName, String groupName, Instance instance)






throws






NacosException {






NAMING_LOGGER.info(






“[REGISTER-SERVICE] {} registering service {} with instance: {}”






, namespaceId, serviceName, instance);







//封装Http请求参数











final






Map<String, String> params =






new






HashMap<String, String>(




16




);





params.put(CommonParams.NAMESPACE_ID, namespaceId);





params.put(CommonParams.SERVICE_NAME, serviceName);





params.put(CommonParams.GROUP_NAME, groupName);





params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());





params.put(






“ip”






, instance.getIp());





params.put(






“port”






, String.valueOf(instance.getPort()));





params.put(






“weight”






, String.valueOf(instance.getWeight()));





params.put(






“enable”






, String.valueOf(instance.isEnabled()));





params.put(






“healthy”






, String.valueOf(instance.isHealthy()));





params.put(






“ephemeral”






, String.valueOf(instance.isEphemeral()));





params.put(






“metadata”






, JacksonUtils.toJson(instance.getMetadata()));







//执行Http请求









reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);





}






/**







* Request api.







* 请求API







*







*







@param








api







api







*







@param








params







parameters







*







@param








body







body







*







@param








servers







servers







*







@param








method







http method







*







@return







result







*







@throws







NacosException nacos exception







*/







public





String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers,





String method)





throws





NacosException {




params.put(CommonParams.






NAMESPACE_ID






, getNamespaceId());




if





(CollectionUtils.





isEmpty





(servers) && StringUtils.





isBlank





(





nacosDomain





)) {







throw new





NacosException(NacosException.






INVALID_PARAM






,





“no server available”





);





}



NacosException exception =





new





NacosException();




if





(StringUtils.





isNotBlank





(





nacosDomain





)) {







for





(





int





i =




0




; i <





maxRetry





; i++) {







try





{







// 远程调用 入口









return





callServer




(api, params, body,





nacosDomain





, method);





}





catch





(NacosException e) {






exception = e;






if





(






NAMING_LOGGER






.isDebugEnabled()) {








NAMING_LOGGER






.debug(





“request {} failed.”





,





nacosDomain





, e);





}





}





}





}





else





{






Random random =





new





Random(System.





currentTimeMillis





());






int





index = random.nextInt(servers.size());




for





(





int





i =




0




; i < servers.size(); i++) {






String server = servers.get(index);






try





{







return





callServer(api, params, body, server, method);





}





catch





(NacosException e) {






exception = e;






if





(






NAMING_LOGGER






.isDebugEnabled()) {








NAMING_LOGGER






.debug(





“request {} failed.”





, server, e);





}





}





index = (index +




1




) % servers.size();





}





}





NAMING_LOGGER






.error(





“request: {} failed, servers: {}, code: {}, msg: {}”





, api, servers, exception.getErrCode(),





exception.getErrMsg());




throw new





NacosException(exception.getErrCode(),






“failed to req API:”





+ api +





” after all servers(”





+ servers +





“) tried: ”





+ exception.getMessage());



}







/***









*执行远程调用









**/









public






String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer,





String method)






throws






NacosException {








long






start = System.






currentTimeMillis






();







long






end =




0




;





injectSecurityInfo(params);







//封装请求头部









Header header = builderHeader();







//请求是Http还是Https协议









String url;







if






(curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {






url = curServer + api;





}






else






{








if






(!IPUtil.containsPort(curServer)) {






curServer = curServer + IPUtil.IP_PORT_SPLITER + serverPort;





}





url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api;





}







try






{








//执行远程请求,并获取结果集









HttpRestResult<String> restResult = nacosRestTemplate.exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.






class






);





end = System.






currentTimeMillis






();





MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode())).observe(end – start);







//结果集解析











if






(restResult.ok()) {








return






restResult.getData();





}







if






(HttpStatus.SC_NOT_MODIFIED == restResult.getCode()) {








return






StringUtils.EMPTY;





}







throw new






NacosException(restResult.getCode(), restResult.getMessage());





}






catch






(Exception e) {






NAMING_LOGGER.error(






“[NA] failed to request”






, e);







throw new






NacosException(NacosException.SERVER_ERROR, e);





}





}



执行远程


Http


请求的对象是


NacosRestTemplate


,该对象就是封装了普通的


Http


请求。





/**







* Execute the HTTP method to the given URI template, writing the given request entity to the request, and returns







* the response as {








@link







HttpRestResult}.







*







*







@param








url







url







*







@param








header







http header param







*







@param








query







http query param 查询条件封装







*







@param








bodyValues







http body param







*







@param








httpMethod







http method







*







@param








responseType







return type







*







@return







{








@link







HttpRestResult}







*







@throws







Exception ex







*/







public





<




T




> HttpRestResult<




T




> exchangeForm(String url, Header header, Query query, Map<String, String> bodyValues,





String httpMethod, Type responseType)





throws





Exception {






RequestHttpEntity requestHttpEntity =





new





RequestHttpEntity(





header.setContentType(MediaType.






APPLICATION_FORM_URLENCODED






), query, bodyValues);






return





execute




(url, httpMethod, requestHttpEntity, responseType);





}



执行远程调用请求 使用


http


协议





/**







* 执行远程请求







*







*







@param








url












*







@param








httpMethod












*







@param








requestEntity












*







@param








responseType












*







@param








<T>












*







@return












*







@throws







Exception







*/






@SuppressWarnings




(





“unchecked”





)






private





<




T




> HttpRestResult<




T




> execute(String url, String httpMethod, RequestHttpEntity requestEntity,





Type responseType)





throws





Exception {







// url:http://127.0.0.1:8848/nacos/v1/ns/instance








URI uri = HttpUtils.





buildUri





(url, requestEntity.getQuery());






if





(





logger





.isDebugEnabled()) {







logger





.debug(





“HTTP method: {}, url: {}, body: {}”





, httpMethod, uri, requestEntity.getBody());





}



ResponseHandler<




T




> responseHandler =





super





.selectResponseHandler(responseType);





HttpClientResponse response =





null





;






try





{







// HttpClientRequest执行远程调用








response =





this





.




requestClient




().




execute




(uri, httpMethod, requestEntity);






return





responseHandler.handle(response);





}





finally





{







if





(response !=





null





) {






response.close();





}





}





}




/**







* 获取HttpClientRequest







*/







private





HttpClientRequest




requestClient




() {







if





(CollectionUtils.





isNotEmpty





(





interceptors





)) {







if





(





logger





.isDebugEnabled()) {







logger





.debug(





“Execute via interceptors :{}”





,





interceptors





);





}






return new





InterceptingHttpClientRequest(





requestClient





,





interceptors





.iterator());





}






return






requestClient





;





}


这是远程调用使用的是

JdkHttpClientRequest

发起的远程调用




@Override






public





HttpClientResponse execute(URI uri, String httpMethod, RequestHttpEntity requestHttpEntity)






throws





Exception {







final





Object body = requestHttpEntity.getBody();






final





Header headers = requestHttpEntity.getHeaders();





replaceDefaultConfig(requestHttpEntity.getHttpClientConfig());



HttpURLConnection conn = (HttpURLConnection) uri.toURL().openConnection();





Map<String, String> headerMap = headers.getHeader();






if





(headerMap !=





null





&& headerMap.size() >




0




) {







for





(Map.Entry<String, String> entry : headerMap.entrySet()) {






conn.setRequestProperty(entry.getKey(), entry.getValue());





}





}



conn.setConnectTimeout(





this





.





httpClientConfig





.getConTimeOutMillis());





conn.setReadTimeout(





this





.





httpClientConfig





.getReadTimeOutMillis());





conn.setRequestMethod(httpMethod);






if





(body !=





null





&& !





“”





.equals(body)) {






String contentType = headers.getValue(HttpHeaderConsts.






CONTENT_TYPE






);





String bodyStr = JacksonUtils.





toJson





(body);






if





(MediaType.






APPLICATION_FORM_URLENCODED






.equals(contentType)) {






Map<String, String> map = JacksonUtils.





toObj





(bodyStr, HashMap.





class





);





bodyStr = HttpUtils.





encodingParams





(map, headers.getCharset());





}






if





(bodyStr !=





null





) {






conn.setDoOutput(





true





);






byte





[] b = bodyStr.getBytes();





conn.setRequestProperty(





“Content-Length”





, String.





valueOf





(b.





length





));






// 获取远程网络输入流 执行远程请求








OutputStream outputStream = conn.getOutputStream();





outputStream.write(b,




0




, b.





length





);





outputStream.flush();





IoUtils.





closeQuietly





(outputStream);





}





}





conn.connect();






return new





JdkHttpClientResponse(conn);





}



远程调用,调用的是


nacos


的服务信息。远程掉用




url:http://127.0.0.1:8848/nacos/v1/ns/instance




执行

Http

请求/

nacos

为服务根地址,在nacos-console配置文件中配置。





#*************** Spring Boot Related Configurations ***************#







### Default web context path:






默认web服务根地址







server.servlet.contextPath





=





/nacos







### Default web server port:







server.port





=





8848




远程调用


post


请求/v1/ns/instance为


nacos


服务地址,在nacos-naming包中。


HealthController

执行心跳检测服务





/**







* Health status related operation controller.







* 心跳相关控制器







*







@author







nkorange







*







@author







nanamikon







*







@since







0.8.0







*/






@RestController




(





“namingHealthController”





)





@RequestMapping




(UtilsAndCommons.






NACOS_NAMING_CONTEXT






+





“/health”





)






public class





HealthController {






… …





}



InstanceController

执行服务实例注册服务





/**







* Instance operation controller.







* 服务实例控制







*







@author







nkorange







*/






@RestController






// /vi/ns/instance






@RequestMapping




(UtilsAndCommons.






NACOS_NAMING_CONTEXT






+





“/instance”





)






public class





InstanceController {







/**







* Register new instance.







*







*







@param








request







http request







*







@return







‘ok’ if success







*







@throws







Exception any error during register







*/






@CanDistro





//Distro协议(数据临时一致性协议)






@PostMapping





@Secured




(parser = NamingResourceParser.





class





, action = ActionTypes.






WRITE






)






public





String register(HttpServletRequest request)





throws





Exception {





final





String namespaceId = WebUtils





.





optional





(request, CommonParams.






NAMESPACE_ID






, Constants.






DEFAULT_NAMESPACE_ID






);






final





String serviceName = WebUtils.





required





(request, CommonParams.






SERVICE_NAME






);





NamingUtils.





checkServiceNameFormat





(serviceName);




final





Instance instance = parseInstance(request);






// 注册服务实例









serviceManager





.registerInstance(namespaceId, serviceName, instance);






return






“ok”





;





}




}


1.


2


.


3.


服务发现


我们沿着案例中的服务发现方法调用找到


nacos





api


中的NamingService.


getAllInstances


() 并找到它的实现类和方法com.alibaba.nacos.client.naming.NacosNamingService.


getAllInstances


() ,代码如下:




@Override






public





List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,






boolean





subscribe)





throws





NacosException {




ServiceInfo serviceInfo;






if





(subscribe) {







// 开启服务订阅则从本地获取服务列表 本地服务列表存储在serviceInfoMap中







// 服务订阅会将服务列表定时更新存储到serviceInfoMao中








serviceInfo =





hostReactor





.




getServiceInfo




(NamingUtils.





getGroupedName





(serviceName, groupName),





StringUtils.





join





(clusters,





“,”





));





}





else





{







// 没有开启服务订阅从远端获取 请求nacos服务发起远程调用获取服务信息








serviceInfo =





hostReactor








.getServiceInfoDirectlyFromServer(NamingUtils.





getGroupedName





(serviceName, groupName),





StringUtils.





join





(clusters,





“,”





));





}





List<Instance> list;






if





(serviceInfo ==





null





|| CollectionUtils.





isEmpty





(list = serviceInfo.getHosts())) {







return new





ArrayList<Instance>();





}






return





list;





}



上面的代码调用了hostReactor.


getServiceInfo


() 方法








public






ServiceInfo getServiceInfo(






final






String serviceName,






final






String clusters) {






NAMING_LOGGER.debug(






“failover-mode: ”






+ failoverReactor.isFailoverSwitch());





String key = ServiceInfo.getKey(serviceName, clusters);







if






(failoverReactor.isFailoverSwitch()) {








return






failoverReactor.getService(key);





}







/*1。先从本地缓存中获取服务对象,因为启动是第一次进来,所以缓存站不存在*/









ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);







if






(






null






== serviceObj) {








/*构建服务实例*/









serviceObj =






new






ServiceInfo(serviceName, clusters);







/*将服务实例存放到缓存中*/









serviceInfoMap.put(serviceObj.getKey(), serviceObj);







/*更新nacos-上的服务*/









updatingMap.put(serviceName,






new






Object());







/*主动获取,并且更新到缓存本地,以及已过期的服务更新等*/









updateServiceNow(serviceName, clusters);





updatingMap.remove(serviceName);





}






else if






(updatingMap.containsKey(serviceName)) {








if






(UPDATE_HOLD_INTERVAL >




0




) {






updateServiceNow(serviceName, clusters);







// hold a moment waiting for update finish











synchronized






(serviceObj) {








try






{






serviceObj.wait(UPDATE_HOLD_INTERVAL);





}






catch






(InterruptedException e) {






NAMING_LOGGER





.error(






“[getServiceInfo] serviceName:”






+ serviceName +






“, clusters:”






+ clusters, e);





}





}





}





}







/*2.开启定时任务*/









scheduleUpdateIfAbsent(serviceName, clusters);







return






serviceInfoMap.get(serviceObj.getKey());





}



该方法会先调用


getServiceInfo0


() 方法从本地缓存获取数据







private





ServiceInfo getServiceInfo0(String serviceName, String clusters) {







// 获取对应的key值 Group@服务名 格式








String key = ServiceInfo.





getKey





(serviceName, clusters);






// 获取key对应的缓存 这里的值是NamingService执行定时任务获取的









return






serviceInfoMap





.get(key);





}



缓存没有数据,就构建实例更新到


Nacos


,并从


Nacos


中获取最新数据





updateServiceNow

(serviceName, clusters);







从远程服务器获取更新数据







private void





updateServiceNow(String serviceName, String clusters) {







try





{






updateService(serviceName, clusters);





}





catch





(NacosException e) {








NAMING_LOGGER






.error(





“[NA] failed to update serviceName: ”





+ serviceName, e);





}





}



最终会调用


updateService


()方法,在该方法中完成远程请求和数据处理,源码如下:





/**







* Update service now.







*







*







@param








serviceName







service name







*







@param








clusters







clusters







*/







public void





updateService(String serviceName, String clusters)





throws





NacosException {







// 从本地缓存列表获取服务








ServiceInfo oldService =




getServiceInfo0




(serviceName, clusters);






try





{







// 代理发起http请求远程调用 获取服务以及提供者端口信息,端口等








String result =





serverProxy





.queryList(serviceName, clusters,





pushReceiver





.getUdpPort(),





false





);




if





(StringUtils.





isNotEmpty





(result)) {







// 反序列化服务信息 并存储到serviceInfoMap中








processServiceJson




(result);





}





}





finally





{







if





(oldService !=





null





) {







synchronized





(oldService) {






oldService.notifyAll();





}





}





}





}




getServiceInfo0




(




)


方法在前面已经介绍过了。


processServiceJson




(




)


方法在服务创建介绍过。

回到开头,没有开启服务订阅会从远端获取,请求

nacos

服务发起远程调用获取服务信息。调用getServiceInfoDirectlyFromServer()方法。





/**







* 直接从nacos服务获取服务信息







*/







public





ServiceInfo getServiceInfoDirectlyFromServer(





final





String serviceName,





final





String clusters)






throws





NacosException {






String result =





serverProxy





.




queryList




(serviceName, clusters,




0




,





false





);






if





(StringUtils.





isNotEmpty





(result)) {







return





JacksonUtils.





toObj





(result, ServiceInfo.





class





);





}






return null





;





}


这里仍然是

NamingService

对象创建的代理对象发起远程调用获取服务信息


queryList




(




)






/**







* Query instance list.







*/







public





String queryList(String serviceName, String clusters,





int





udpPort,





boolean





healthyOnly)






throws





NacosException {





final





Map<String, String> params =





new





HashMap<String, String>(




8




);





params.put(CommonParams.






NAMESPACE_ID






,





namespaceId





);





params.put(CommonParams.






SERVICE_NAME






, serviceName);





params.put(





“clusters”





, clusters);





params.put(





“udpPort”





, String.





valueOf





(udpPort));





params.put(





“clientIP”





, NetUtils.





localIP





());





params.put(





“healthyOnly”





, String.





valueOf





(healthyOnly));




return





reqApi




(UtilAndComs.





nacosUrlBase





+





“/instance/list”





, params, HttpMethod.






GET






);





}




reqApi




(




)


在服务注册环节已经介绍过了。


1.


2


.


4.


服务下线


我们沿着案例中的服务下线方法调用找到


nacos





api


中的NamingService.


deregisterInstance


()并找到它的实现类和方法 NacosNamingService.deregisterInstance(),代码如下:




@Override







public void






deregisterInstance(String serviceName, String groupName, String ip,






int






port, String clusterName)






throws






NacosException {








//构建实例信息









Instance instance =






new






Instance();





instance.setIp(ip);





instance.setPort(port);





instance.setClusterName(clusterName);







//服务下线操作









deregisterInstance(serviceName, groupName, instance);





}



@Override







public void






deregisterInstance(String serviceName, String groupName, Instance instance)






throws






NacosException {








if






(instance.isEphemeral()) {








//移除心跳信息监测的定时任务









beatReactor.removeBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), instance.getIp(),





instance.getPort());





}







//发送远程请求执行服务下线销毁操作









serverProxy.deregisterService(NamingUtils.getGroupedName(serviceName, groupName), instance);





}



发起远程


delete


请求。





/**







* deregister instance from a service.







*







*







@param








serviceName







name of service







*







@param








instance







instance







*







@throws







NacosException nacos exception







*/







public void





deregisterService(String serviceName, Instance instance)





throws





NacosException {






NAMING_LOGGER











.info(





“[DEREGISTER-SERVICE] {} deregistering service {} with instance: {}”





,





namespaceId





, serviceName,





instance);




final





Map<String, String> params =





new





HashMap<String, String>(




8




);





params.put(CommonParams.






NAMESPACE_ID






,





namespaceId





);





params.put(CommonParams.






SERVICE_NAME






, serviceName);





params.put(CommonParams.






CLUSTER_NAME






, instance.getClusterName());





params.put(





“ip”





, instance.getIp());





params.put(





“port”





, String.





valueOf





(instance.getPort()));





params.put(





“ephemeral”





, String.





valueOf





(instance.isEphemeral()));



reqApi(UtilAndComs.





nacosUrlInstance





, params, HttpMethod.






DELETE






);





}


这里会远程调用

nacos



naming

服务

InstanceController

接口中的

delete

方法销毁服务。





/**







* Deregister instances.







*







*







@param








request







http request







*







@return







‘ok’ if success







*







@throws







Exception any error during deregister







*/






@CanDistro





@DeleteMapping





@Secured




(parser = NamingResourceParser.





class





, action = ActionTypes.






WRITE






)






public





String deregister(HttpServletRequest request)





throws





Exception {






Instance instance = getIpAddress(request);





String namespaceId = WebUtils.





optional





(request, CommonParams.






NAMESPACE_ID






, Constants.






DEFAULT_NAMESPACE_ID






);





String serviceName = WebUtils.





required





(request, CommonParams.






SERVICE_NAME






);





NamingUtils.





checkServiceNameFormat





(serviceName);



Service service =





serviceManager





.getService(namespaceId, serviceName);






if





(service ==





null





) {






Loggers.






SRV_LOG






.warn(





“remove instance from non-exist service: {}”





, serviceName);






return






“ok”





;





}






// 移除服务数据









serviceManager





.removeInstance(namespaceId, serviceName, instance.isEphemeral(), instance);






return






“ok”





;





}



服务下线方法比较简单,和服务注册做的事情正好相反,也做了两件事,第一件事:不在进行心跳检测。 第二件事:请求服务端服务下线接口。


1.


2


.


4.


服务订阅


我们可以查看订阅服务的案例,会先创建一个线程池,接下来会把线程池封装到监听器中,而监听器中可以监听指定实例信息,代码如下:






//服务订阅







Executor executor =






new






ThreadPoolExecutor(




1




,




1




,




0L




, TimeUnit.MILLISECONDS,






new









LinkedBlockingQueue<Runnable>(),






new






ThreadFactory() {






@Override







public






Thread newThread(Runnable r) {






Thread thread =






new






Thread(r);





thread.setName(






“test-thread”






);







return






thread;





}





});





naming.subscribe(






“nacos.test.3”






,






new






AbstractEventListener() {








//EventListener onEvent is sync to handle, If process too low in onEvent,maybe block other onEvent callback.









//So you can override getExecutor() to async handle event.









@Override







public






Executor getExecutor() {








return






executor




;





}





//读取监听到的服务实例









@Override







public void






onEvent(Event event) {






System.








out








.println(((NamingEvent) event).getServiceName());





System.








out








.println(((NamingEvent) event).getInstances());





}





});



我们沿着案例中的服务订阅方法调用找到


nacos





api


中的NamingService.


subscribe


()并找到它的实现类和方法NacosNamingService.


deregisterInstance


(),代码如下:






public void






subscribe(String serviceName, String clusters, EventListener eventListener) {








//注册监听









notifier.registerListener(serviceName, clusters, eventListener);







//获取并更新服务实例









getServiceInfo(serviceName, clusters);





}



此时会注册监听,注册监听就是将当前的监听对象信息注入到


listenerMap


集合中,在监听对象的指定方法


onEvent


中可以读取实例信息,代码如下:






public void






registerListener(String serviceName, String clusters, EventListener listener) {






String key = ServiceInfo.getKey(serviceName, clusters);





ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);







if






(eventListeners ==






null






) {








synchronized






(lock) {






eventListeners = listenerMap.get(key);







if






(eventListeners ==






null






) {






eventListeners =






new






ConcurrentHashSet<EventListener>();





listenerMap.put(key, eventListeners);





}





}





}







//将当前监听对象放入到集合中,在监听对象的onEvent中可以读出对应的实例对象









eventListeners.add(listener);





}



get


ServiceInfo


(


serviceName


,


clusters


)


获取服务实例,先从本地缓存获取,本地获取不到就从服务器获取,前面服务发现已经介绍过了。




1.




3.




服务端工作流程




注册中心服务端的主要功能包括,接收客户端的




服务注册









服务发现









服务下线




的功能,但是除了这些和客户端的交互之外,服务端还要做一些更重要的事情,就是我们常常会在分布式系统中听到的


AP





CP


,作为一个集群,


nacos


即实现了


AP


也实现了


CP


,其中


AP


使用的自己实现的


Distro




协议




,而


CP


是采用


raft




协议




实现的,这个过程中牵涉到




心跳









选主




等操作。


我们来学习一下注册中心服务端接收客户端服务注册的功能。


1.


3


.1


.


注册处理


我们先来学习一下


Nacos


的工具类


WebUtils


,该工具类在


nacos





core


工程下,该工具类是用于




处理请求参数转化




的,里面提供了


2


个常被用到的方法


required


()和


optional


():


required


方法通过参数名key,解析HttpServletRequest请求中的参数,并转码为UTF-8编码。


optional


方法在required方法的基础上增加了默认值,如果获取不到,则返回默认值。


代码如下:






/**









* required方法通过参数名key,解析HttpServletRequest请求中的参数,并转码为UTF-8编码。









*/









public static






String required(






final






HttpServletRequest req,






final






String key) {






String value = req.getParameter(key);







if






(StringUtils.isEmpty(value)) {








throw new






IllegalArgumentException(






“Param ‘”






+ key +






“‘ is required.”






);





}





String encoding = req.getParameter(






“encoding”






);







return






resolveValue(value, encoding);





}





/**









* optional方法在required方法的基础上增加了默认值,如果获取不到,则返回默认值。









*/









public static






String optional(






final






HttpServletRequest req,






final






String key,






final






String defaultValue) {








if






(!req.getParameterMap().containsKey(key) ||





req.getParameterMap().get(key)[




0




] ==






null






) {








return






defaultValue;





}





String value = req.getParameter(key);







if






(StringUtils.isBlank(value)) {








return






defaultValue;





}





String encoding = req.getParameter(






“encoding”






);







return






resolveValue(value, encoding);





}



nacos server





client


使用了


http


协议来交互,那么在


server


端必定提供了


http


接口的入口,并且在


core


模块看到其依赖了


spring boot starter


,所以它的


http


接口由集成了


Spring





web


服务器支持,简单地说就是像我们平时写的业务服务一样,有


controller


层和


service


层。





OpenAPI


作为入口来学习,我们找到 /nacos/v1/ns/instance服务注册接口,在


nacos





naming


工程中我们可以看到


InstanceController


正是我们要找的对象,如下图:



处理服务注册,我们直接找对应的


POST


方法即可,代码如下:






/**









* Register new instance.









* 接收客户端注册信息









*









*










@param












request










http request









*










@return










‘ok’ if success









*










@throws










Exception any error during register









*/







@CanDistro





@PostMapping





@Secured(parser = NamingResourceParser.






class






, action = ActionTypes.WRITE)







public






String register(HttpServletRequest request)






throws






Exception {








//获取namespaceid,该参数是可选参数











final






String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);







//获取服务名字











final






String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);







//校验服务的名字,服务的名字格式为groupName@@serviceName









NamingUtils.checkServiceNameFormat(serviceName);







//创建实例











final






Instance instance = parseInstance(request);







//注册服务









serviceManager.registerInstance(namespaceId, serviceName, instance);







return








“ok”






;





}



如上图,该方法主要用于接收客户端




注册信息




,并且会校验参数是否存在问题,如果不存在问题就创建服务的实例,服务实例创建后将服务实例注册到


Nacos


中,注册的方法如下:






public void






registerInstance(String namespaceId, String serviceName, Instance instance)






throws






NacosException {








//判断本地缓存中是否存在该命名空间,如果不存在就创建,之后判断该命名空间下是否









//存在该服务,如果不存在就创建空的服务









//如果实例为空,则创建实例,并且会将创建的实例存入到serviceMap集合中









createEmptyService(namespaceId, serviceName, instance.isEphemeral());







//从serviceMap集合中获取创建的实例









Service service = getService(namespaceId, serviceName);







if






(service ==






null






) {








throw new






NacosException(NacosException.INVALID_PARAM,






“service not found, namespace: ”






+ namespaceId +






“, service: ”






+ serviceName);





}







//服务注册,这一步才会把服务的实例信息和服务绑定起来









addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);





}



注册的方法中会先创建该实例对象,创建前先检查本地缓存是否存在该实例对象,如果不存在就创建,最后注册该服务,并且该服务会和实例信息捆绑到一起,并将信息同步到磁盘,数据同步到磁盘就涉及到数据一致性了,我们接下来讲解


Nacos


的数据一致性。


1.


3


.2


.


一致性算法Distro协议介绍


Distro


是阿里巴巴的




私有协议




,目前流行的


Nacos


服务管理框架就采用了


Distro


协议。


Distro


协议被定位为临时数据的一致性协议:该类型协议,不需要把数据存储到磁盘或者数据库,因为临时数据通常和服务器保持一个


session


会话,该会话只要存在,数据就不会丢失 。


Distro


协议保证




写必须永远是成功




的,即使可能会发生网络分区。当网络恢复时,把各数据分片的数据进行合并。


Distro


协议具有以下特点:


1


:


专门为了注册中心而创造出的协议;


2


:


客户端与服务端有两个重要的交互,服务注册与心跳发送;


3


:


客户端以服务为维度向服务端注册,注册后每隔一段时间向服务端发送一次心跳,心跳包需要带上注册服


务的全部信息,在客户端看来,服务端节点对等,所以请求的节点是随机的;


4


:


客户端请求失败则换一个节点重新发送请求;


5


:


服务端节点都存储所有数据,但每个节点只负责其中一部分服务,在接收到客户端的“写”


(


注册、心


跳、下线等


)


请求后,服务端节点判断请求的服务是否为自己负责,如果是,则处理,否则交由负责的节点


处理;


6


:


每个服务端节点主动发送健康检查到其他节点,响应的节点被该节点视为健康节点;


7


:


服务端在接收到客户端的服务心跳后,如果该服务不存在,则将该心跳请求当做注册请求来处理;


8


:


服务端如果长时间未收到客户端心跳,则下线该服务;


9


:


负责的节点在接收到服务注册、服务心跳等写请求后将数据写入后即返回,后台异步地将数据同步给其他


节点;


10


:


节点在收到读请求后直接从本机获取后返回,无论数据是否为最新。


1.


3


.3


.


Distro


服务启动-


寻址模式


Distro


协议服务端节点发现使用




寻址机制




来实现服务端节点的管理。在


Nacos


中,寻址模式有三种:


单机模式


(


StandaloneMemberLookup


)


文件模式


(


FileConfigMemberLookup


)


服务器模式


(


AddressServerMemberLookup


)


三种寻址模式如下图:


在com.alibaba.nacos.core.cluster.lookup.LookupFactory中有创建寻址方式,可以创建集群启动方式、单机启动方式,不同启动方式就决定了不同寻址模式。






/**









* Create the target addressing pattern.









* 创建寻址模式









*









*










@param












memberManager










{











@link










ServerMemberManager}









*










@return










{











@link










MemberLookup}









*










@throws










NacosException NacosException









*/









public static






MemberLookup createLookUp(ServerMemberManager memberManager)






throws






NacosException {








//NacosServer 集群方式启动











if






(!EnvUtil.getStandaloneMode()) {






String lookupType = EnvUtil.getProperty(LOOKUP_MODE_TYPE);







//由参数中传入的寻址方式得到LookupType对象









LookupType type = chooseLookup(lookupType);







//选择寻址方式









LOOK_UP =






find






(type);







//设置当前寻址方式









currentLookupType = type;





}






else






{








//NacosServer单机启动









LOOK_UP =






new






StandaloneMemberLookup();





}





LOOK_UP.injectMemberManager(memberManager);





Loggers.CLUSTER.info(






“Current addressing mode selection : {}”






,





LOOK_UP.getClass().getSimpleName());







return






LOOK_UP;





}





/***









* 选择寻址方式









*










@param












type















*










@return















*/









private static






MemberLookup find(LookupType type) {








//文件寻址模式,也就是配置cluster.conf配置文件将多个节点串联起来,









// 通过配置文件寻找其他节点,以达到和其他节点通信的目的











if






(LookupType.FILE_CONFIG.equals(type)) {






LOOK_UP =






new






FileConfigMemberLookup();







return






LOOK_UP;





}







//服务器模式











if






(LookupType.ADDRESS_SERVER.equals(type)) {






LOOK_UP =






new






AddressServerMemberLookup();







return






LOOK_UP;





}







// unpossible to run here











throw new






IllegalArgumentException();





}





单节点寻址模式




会直接创建StandaloneMemberLookup对象,而




文件寻址模式




会创建FileConfigMemberLookup对象,




服务器寻址模式




会创建AddressServerMemberLookup;


1.


3


.3.1


.


单机


寻址


模式

单机模式直接寻找自己的IP:PORT地址。





public class





StandaloneMemberLookup





extends





AbstractMemberLookup {




@Override






public void





start() {







if





(





start





.compareAndSet(





false





,





true





)) {







// 获取自己的IP:port








String url = InetUtils.





getSelfIP





() +





“:”





+ EnvUtil.





getPort





();





afterLookup(MemberUtil.





readServerConf





(Collections.





singletonList





(url)));





}





}





}



1.


3


.3.2.文件寻址模式



文件寻址模式主要在创建集群的时候,通过


cluster


.conf 来配置集群,程序可以通过监听


cluster


.conf 文件变化实现动态管理节点,FileConfigMemberLookup源码如下:






public class






FileConfigMemberLookup






extends






AbstractMemberLookup {








//创建文件监听器











private






FileWatcher






watcher






=






new






FileWatcher() {








//文件发生变更事件









@Override







public void






onChange(FileChangeEvent event) {






readClusterConfFromDisk();





}





//检查context是否包含cluster.conf









@Override







public boolean






interest(String context) {








return






StringUtils.contains(context,






“cluster.conf”






);





}





};



@Override







public void






start()






throws






NacosException {








if






(start.compareAndSet(






false






,






true






)) {






readClusterConfFromDisk();







// 使用inotify机制来监视文件更改,并自动触发对cluster.conf的读取











try






{






WatchFileCenter.registerWatcher(EnvUtil.getConfPath(),






watcher






);





}






catch






(Throwable e) {






Loggers.CLUSTER.error(






“An exception occurred in the launch file monitor : {}”






, e.getMessage());





}





}





}



@Override







public void






destroy()






throws






NacosException {






WatchFileCenter.deregisterWatcher(EnvUtil.getConfPath(),






watcher






);





}





private void






readClusterConfFromDisk() {






Collection<Member> tmpMembers =






new






ArrayList<>();







try






{






List<String> tmp = EnvUtil.readClusterConf();





tmpMembers = MemberUtil.readServerConf(tmp);





}






catch






(Throwable e) {






Loggers.CLUSTER.error(






“nacos-XXXX [serverlist] failed to get serverlist from disk!, error : {}”






, e.getMessage());





}





afterLookup(tmpMembers);





}





}



1.


3


.3.3


.


服务器寻址模式


使用地址服务器存储节点信息,会创建AddressServerMemberLookup,服务端定时拉取信息进行管理;






public class






AddressServerMemberLookup






extends






AbstractMemberLookup {








private final






GenericType<RestResult<String>>






genericType






=







new






GenericType<RestResult<String>>() {






};







public






String






domainName






;







public






String






addressPort






;







public






String






addressUrl






;







public






String






envIdUrl






;







public






String






addressServerUrl






;







private volatile boolean








isAddressServerHealth






=






true






;







private int








addressServerFailCount






=




0




;







private int








maxFailCount






=




12




;







private final






NacosRestTemplate






restTemplate






=





HttpClientBeanHolder.getNacosRestTemplate(Loggers.CORE);







private volatile boolean








shutdown






=






false






;



@Override







public void






start()






throws






NacosException {








if






(start.compareAndSet(






false






,






true






)) {








this






.






maxFailCount






= Integer.






parseInt






(EnvUtil.getProperty(






“maxHealthCheckFailCount”






,






“12”






));





initAddressSys();





run();





}





}





/***









* 获取服务器地址









*/











private void






initAddressSys() {






String envDomainName = System.






getenv






(






“address_server_domain”






);







if






(StringUtils.isBlank(envDomainName)) {








domainName






= EnvUtil.getProperty(






“address.server.domain”






,






“jmenv.tbsite.net”






);





}






else






{








domainName






= envDomainName;





}





String envAddressPort = System.






getenv






(






“address_server_port”






);







if






(StringUtils.isBlank(envAddressPort)) {








addressPort






= EnvUtil.getProperty(






“address.server.port”






,






“8080”






);





}






else






{








addressPort






= envAddressPort;





}





String envAddressUrl = System.






getenv






(






“address_server_url”






);







if






(StringUtils.isBlank(envAddressUrl)) {








addressUrl






= EnvUtil.getProperty(






“address.server.url”






, EnvUtil.getContextPath() +






“/”






+






“serverlist”






);





}






else






{








addressUrl






= envAddressUrl;





}







addressServerUrl






=






“http://”






+






domainName






+






“:”






+






addressPort






+






addressUrl






;







envIdUrl






=






“http://”






+






domainName






+






“:”






+






addressPort






+






“/env”






;





Loggers.CORE.info(






“ServerListService address-server port:”






+






addressPort






);





Loggers.CORE.info(






“ADDRESS_SERVER_URL:”






+






addressServerUrl






);





}



@SuppressWarnings




(






“PMD.UndefineMagicConstantRule”






)







private void






run()






throws






NacosException {








// With the address server, you need to perform a synchronous member node pull at startup









// Repeat three times, successfully jump out











boolean






success =






false






;





Throwable ex =






null






;







int






maxRetry = EnvUtil.getProperty(






“nacos.core.address-server.retry”






, Integer.






class






,




5




);







for






(






int






i =




0




; i < maxRetry; i++) {








try






{








//拉取集群节点信息









syncFromAddressUrl();





success =






true






;







break






;





}






catch






(Throwable e) {






ex = e;





Loggers.CLUSTER.error(






“[serverlist] exception, error : {}”






, ExceptionUtil.getAllExceptionMsg(ex));





}





}







if






(!success) {








throw new






NacosException(NacosException.SERVER_ERROR, ex);





}







//创建定时任务









GlobalExecutor.scheduleByCommon(






new






AddressServerSyncTask(),




5_000L




);





}



@Override







public void






destroy()






throws






NacosException {








shutdown






=






true






;





}



@Override







public






Map<String, Object> info() {






Map<String, Object> info =






new






HashMap<>(




4




);





info.put(






“addressServerHealth”






,






isAddressServerHealth






);





info.put(






“addressServerUrl”






,






addressServerUrl






);





info.put(






“envIdUrl”






,






envIdUrl






);





info.put(






“addressServerFailCount”






,






addressServerFailCount






);







return






info;





}





private void






syncFromAddressUrl()






throws






Exception {






RestResult<String> result =






restTemplate






.get(






addressServerUrl






, Header.EMPTY, Query.EMPTY,






genericType






.getType());







if






(result.ok()) {








isAddressServerHealth






=






true






;





Reader reader =






new






StringReader(result.getData());







try






{






afterLookup(MemberUtil.readServerConf(EnvUtil.analyzeClusterConf(reader)));





}






catch






(Throwable e) {






Loggers.CLUSTER.error(






“[serverlist] exception for analyzeClusterConf, error : {}”






, ExceptionUtil.getAllExceptionMsg(e));





}







addressServerFailCount






=




0




;





}






else






{








addressServerFailCount






++;







if






(






addressServerFailCount






>=






maxFailCount






) {








isAddressServerHealth






=






false






;





}





Loggers.CLUSTER.error(






“[serverlist] failed to get serverlist, error code {}”






, result.getCode());





}





}





// 定时任务











class






AddressServerSyncTask






implements






Runnable {






@Override







public void






run() {








if






(






shutdown






) {








return






;





}







try






{








//拉取服务列表









syncFromAddressUrl();





}






catch






(Throwable ex) {








addressServerFailCount






++;







if






(






addressServerFailCount






>=






maxFailCount






) {








isAddressServerHealth






=






false






;





}





Loggers.CLUSTER.error(






“[serverlist] exception, error : {}”






, ExceptionUtil.getAllExceptionMsg(ex));





}






finally






{






GlobalExecutor.scheduleByCommon(






this






,




5_000L




);





}





}





}





}



1.


3


.


5.集群


数据同步


Nacos


数据同步分为




全量同步









增量同步




,所谓全量同步就是初始化数据一次性同步,而




增量同步




是指有数据增加的时候,只同步增加的数据。


1.


3


.


5


.1


.


全量同步


全量同步流程比较复杂,流程如上图:


1


:


启动一个定时任务线程DistroLoadDataTask加载数据,调用load()方法加载数据


2


:


调用loadAllDataSnapshotFromRemote()方法从远程机器同步所有的数据


3


:


从namingProxy代理获取所有的数据data


4


:


构造http请求,调用httpGet方法从指定的server获取数据


5


:


从获取的结果result中获取数据bytes


6


:


处理数据processData


7


:


从data反序列化出datumMap


8


:


把数据存储到dataStore,也就是本地缓存dataMap


9


:


监听器不包括key,就创建一个空的service,并且绑定监听器


10


:


监听器listener执行成功后,就更新data store







任务启动


在com.alibaba.nacos.core.distributed.distro.


DistroProtocol


的构造函数中调用


startDistroTask


()方法,该方法会执行


startVerifyTask


()和


startLoadTask


() ,我们重点关注


startLoadTask


() ,该方法代码如下:






/***









* 启动DistroTask









*/









private void






startDistroTask() {








if






(EnvUtil.getStandaloneMode()) {






isInitialized =






true






;







return






;





}







//启动startVerifyTask,做数据同步校验









startVerifyTask();







//启动DistroLoadDataTask,批量加载数据









startLoadTask();





}





//启动DistroLoadDataTask









private void






startLoadTask() {








//处理状态回调对象









DistroCallback loadCallback =






new






DistroCallback() {








//处理成功









@Override







public void






onSuccess() {






isInitialized =






true






;





}





//处理失败









@Override







public void






onFailed(Throwable throwable) {






isInitialized =






false






;





}





};







//执行DistroLoadDataTask,是一个多线程









GlobalExecutor.submitLoadDataTask(






new






DistroLoadDataTask(memberManager, distroComponentHolder, distroConfig, loadCallback));





}





/***









* 启动startVerifyTask









* 数据校验









*/









private void






startVerifyTask() {






GlobalExecutor.schedulePartitionDataTimedSync(







new






DistroVerifyTask(memberManager, distroComponentHolder), distroConfig.getVerifyIntervalMillis());





}


数据校验





public class





DistroVerifyTask





implements





Runnable {





private final





ServerMemberManager





serverMemberManager





;




private final





DistroComponentHolder





distroComponentHolder





;




public





DistroVerifyTask(ServerMemberManager serverMemberManager, DistroComponentHolder distroComponentHolder) {







this





.





serverMemberManager





= serverMemberManager;






this





.





distroComponentHolder





= distroComponentHolder;





}



@Override






public void





run() {







try





{







// 获取集群中所有节点








List<Member> targetServer =





serverMemberManager





.allMembersWithoutSelf();






if





(Loggers.






DISTRO






.isDebugEnabled()) {






Loggers.






DISTRO






.debug(





“server list is: {}”





, targetServer);





}






for





(String each :





distroComponentHolder





.getDataStorageTypes()) {







// 同步数据






校验








verifyForDataStorage(each, targetServer);





}





}





catch





(Exception e) {






Loggers.






DISTRO






.error(





“[DISTRO-FAILED] verify task failed.”





, e);





}





}




private void





verifyForDataStorage(String type, List<Member> targetServer) {






DistroData distroData =





distroComponentHolder





.findDataStorage(type).getVerifyData();






if





(





null





== distroData) {







return





;





}





distroData.setType(DataOperation.






VERIFY






);






for





(Member member : targetServer) {







try





{







// 同步数据






校验









distroComponentHolder





.findTransportAgent(type).




syncVerifyData




(distroData, member.getAddress());





}





catch





(Exception e) {






Loggers.






DISTRO






.error(String





.





format





(





“[DISTRO-FAILED] verify data for type %s to %s failed.”





, type, member.getAddress()), e);





}





}





}





}


执行校验




@Override






public boolean





syncVerifyData(DistroData verifyData, String targetServer) {







if





(!





memberManager





.hasMember(targetServer)) {







return true





;





}





NamingProxy.





syncCheckSums





(verifyData.getContent(), targetServer);






return true





;





}






/**







* 同步检查总结







*/







public static void





syncCheckSums(





byte





[] checksums, String server) {







try





{






Map<String, String> headers =





new





HashMap<>(




128




);



headers.put(HttpHeaderConsts.






CLIENT_VERSION_HEADER






, VersionUtils.





version





);





headers.put(HttpHeaderConsts.






USER_AGENT_HEADER






, UtilsAndCommons.






SERVER_VERSION






);





headers.put(HttpHeaderConsts.






CONNECTION






,





“Keep-Alive”





);



HttpClient.





asyncHttpPutLarge





(






“http://”





+ server + EnvUtil.





getContextPath





() + UtilsAndCommons.






NACOS_NAMING_CONTEXT











+






TIMESTAMP_SYNC_URL






+





“?source=”





+ NetUtils.





localServer





(), headers, checksums,






new





Callback<String>() {






@Override






public void





onReceive(RestResult<String> result) {







if





(!result.ok()) {






Loggers.






DISTRO






.error(





“failed to req API: {}, code: {}, msg: {}”





,






“http://”





+




server




+ EnvUtil.





getContextPath





()





+ UtilsAndCommons.






NACOS_NAMING_CONTEXT






+






TIMESTAMP_SYNC_URL






,





result.getCode(), result.getMessage());





}





}



@Override






public void





onError(Throwable throwable) {






Loggers.






DISTRO






.error(





“failed to req API:”





+





“http://”





+




server




+ EnvUtil.





getContextPath





()





+ UtilsAndCommons.






NACOS_NAMING_CONTEXT






+






TIMESTAMP_SYNC_URL






, throwable);





}



@Override






public void





onCancel() {




}





});





}





catch





(Exception e) {






Loggers.






DISTRO






.warn(





“NamingProxy”





, e);





}





}








数据执行加载


上面方法会调用


DistroLoadDataTask


对象,而该对象其实是个线程,因此会执行它的


run


方法,


run


方法会调用


load


()方法实现数据全量加载,代码如下:






/***









* 数据加载过程









*/







@Override







public void






run() {








try






{








//加载数据









load();







if






(!checkCompleted()) {






GlobalExecutor.submitLoadDataTask(






this






, distroConfig.getLoadDataRetryDelayMillis());





}






else






{






loadCallback.onSuccess();





Loggers.DISTRO.info(






“[DISTRO-INIT] load snapshot data success”






);





}





}






catch






(Exception e) {






loadCallback.onFailed(e);





Loggers.DISTRO.error(






“[DISTRO-INIT] load snapshot data failed. ”






, e);





}





}





/***









* 加载数据,并同步









*










@throws










Exception









*/









private void






load()






throws






Exception {








while






(memberManager.allMembersWithoutSelf().isEmpty()) {






Loggers.DISTRO.info(






“[DISTRO-INIT] waiting server list init…”






);





TimeUnit.SECONDS.sleep(




1




);





}







while






(distroComponentHolder.getDataStorageTypes().isEmpty()) {






Loggers.DISTRO.info(






“[DISTRO-INIT] waiting distro data storage register…”






);





TimeUnit.SECONDS.sleep(




1




);





}







//同步数据











for






(String each : distroComponentHolder.getDataStorageTypes()) {








if






(!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {








//从远程机器上同步所有数据









loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));





}





}





}








数据同步


数据同步会通过


Http


请求从远程服务器获取数据,并同步到当前服务的缓存中,执行流程如下:


1


:


loadAllDataSnapshotFromRemote()从远程加载所有数据,并处理同步到本机


2


:


transportAgent.getDatumSnapshot()远程加载数据,通过Http请求执行远程加载


3


:


dataProcessor.processSnapshot()处理数据同步到本地


数据处理完整逻辑代码如下:loadAllDataSnapshotFromRemote()方法


:






/***









* 从远程机器上同步所有数据









*/









private boolean






loadAllDataSnapshotFromRemote(String resourceType) {






DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);





DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);







if






(






null






== transportAgent ||






null






== dataProcessor) {






Loggers.DISTRO.warn(






“[DISTRO-INIT] Can’t find component for type {}, transportAgent: {}, dataProcessor: {}”






,





resourceType, transportAgent, dataProcessor);







return false






;





}







//遍历集群成员节点,不包括自己











for






(Member each : memberManager.allMembersWithoutSelf()) {








try






{






Loggers.DISTRO.info(






“[DISTRO-INIT] load snapshot {} from {}”






, resourceType, each.getAddress());







//从远程节点加载数据,调用http请求接口: distro/datums;









DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());







//处理数据











boolean






result = dataProcessor.processSnapshot(distroData);





Loggers.DISTRO.info(






“[DISTRO-INIT] load snapshot {} from {} result: {}”






, resourceType, each.getAddress(), result);







if






(result) {








return true






;





}





}






catch






(Exception e) {






Loggers.DISTRO.error(






“[DISTRO-INIT] load snapshot {} from {} failed.”






, resourceType, each.getAddress(), e);





}





}







return false






;





}



远程加载数据代码如下:transportAgent.


getDatumSnapshot


() 方法






/***









* 从namingProxy代理获取所有的数据data,从获取的结果result中获取数据bytes;









*










@param












targetServer










target server.









*










@return















*/







@Override







public






DistroData getDatumSnapshot(String targetServer) {








try






{








//从namingProxy代理获取所有的数据data,从获取的结果result中获取数据bytes;











byte






[] allDatum = NamingProxy.getAllData(targetServer);







//将数据封装成DistroData











return new






DistroData(






new






DistroKey(






“snapshot”






, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), allDatum);





}






catch






(Exception e) {








throw new






DistroException(String.






format






(






“Get snapshot from %s failed.”






, targetServer), e);





}





}





/**









* Get all datum from target server.









* NamingProxy.getAllData









* 执行HttpGet请求,并获取返回数据









*









*










@param












server










target server address









*










@return










all datum byte array









*










@throws










Exception exception









*/









public static byte






[] getAllData(String server)






throws






Exception {








//参数封装









Map<String, String> params =






new






HashMap<>(




8




);







//组装URL,并执行HttpGet请求,获取结果集









RestResult<String> result = HttpClient.httpGet(






“http://”






+ server + EnvUtil.getContextPath() +





UtilsAndCommons.NACOS_NAMING_CONTEXT + ALL_DATA_GET_URL,






new






ArrayList<>(), params);







//返回数据











if






(result.ok()) {








return






result.getData().getBytes();





}







throw new






IOException(






“failed to req API: ”






+






“http://”









+ server + EnvUtil.getContextPath()





+ UtilsAndCommons.NACOS_NAMING_CONTEXT





+ ALL_DATA_GET_URL +






“. code:”









+ result.getCode() +






” msg: ”









+ result.getMessage());





}



处理数据同步到本地




@Override






public boolean





processSnapshot(DistroData distroData) {







try





{







return





processData




(distroData.getContent());





}





catch





(Exception e) {







return false





;





}





}



dataProcessor.


processSnapshot


()






/**









* 数据处理并更新本地缓存









*









*










@param












data















*










@return















*










@throws










Exception









*/









private boolean






processData(






byte






[] data)






throws






Exception {








if






(data.






length






>




0




) {








//从data反序列化出datumMap









Map<String, Datum<Instances>> datumMap = serializer.deserializeMap(data, Instances.






class






);







// 把数据存储到dataStore,也就是本地缓存dataMap











for






(Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {






dataStore.put(entry.getKey(), entry.getValue());







//监听器不包括key,就创建一个空的service,并且绑定监听器











if






(!listeners.containsKey(entry.getKey())) {








// pretty sure the service not exist:











if






(switchDomain.isDefaultInstanceEphemeral()) {








// create empty service









//创建一个空的service









Loggers.DISTRO.info(






“creating service {}”






, entry.getKey());





Service service =






new






Service();





String serviceName = KeyBuilder.getServiceName(entry.getKey());





String namespaceId = KeyBuilder.getNamespace(entry.getKey());





service.setName(serviceName);





service.setNamespaceId(namespaceId);





service.setGroupName(Constants.DEFAULT_GROUP);







// now validate the service. if failed, exception will be thrown









service.setLastModifiedMillis(System.






currentTimeMillis






());





service.recalculateChecksum();







// The Listener corresponding to the key value must not be empty









// 与键值对应的监听器不能为空,这里的监听器类型是 ServiceManager









RecordListener listener = listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).peek();







if






(Objects.isNull(listener)) {








return false






;





}







//为空的绑定监听器









listener.onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service);





}





}





}







//循环所有datumMap











for






(Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {








if






(!listeners.containsKey(entry.getKey())) {








// Should not happen:









Loggers.DISTRO.warn(






“listener of {} not found.”






, entry.getKey());







continue






;





}







try






{








//执行监听器的onChange监听方法











for






(RecordListener listener : listeners.get(entry.getKey())) {






listener.onChange(entry.getKey(), entry.getValue().value);





}





}






catch






(Exception e) {






Loggers.DISTRO.error(






“[NACOS-DISTRO] error while execute listener of key: {}”






, entry.getKey(), e);







continue






;





}







// Update data store if listener executed successfully:









// 监听器listener执行成功后,就更新dataStore









dataStore.put(entry.getKey(), entry.getValue());





}





}







return true






;





}



到此实现数据全量同步,其实全量同步最终封装的协议还是


Http




1.


3


.


5


.2


.


增量同步


新增数据使用异步广播同步:


1


:


DistroProtocol 使用 sync() 方法接收增量数据


2


:


向其他节点发布广播任务


调用 distroTaskEngineHolder 发布延迟任务


3


:


调用 DistroDelayTaskProcessor.process() 方法进行任务投递:将延迟任务转换为异步变更任务


4


:


执行变更任务 DistroSyncChangeTask.run() 方法:向指定节点发送消息


调用 DistroHttpAgent.syncData() 方法发送数据


调用 NamingProxy.syncData() 方法发送数据


5


:


异常任务调用 handleFailedTask() 方法进行处理


调用 DistroFailedTaskHandler 处理失败任务


调用 DistroHttpCombinedKeyTaskFailedHandler 将失败任务重新投递成延迟任务。







增量数据入口


我们回到服务注册,服务注册的InstanceController.


register


() 就是数据入口,它会调用ServiceManager.


registerInstance


(),执行数据同步的时候,调用


addInstance


() ,在该方法中会执行 DistroConsistencyServiceImpl.


put


(),该方法是增量同步的入口,会调用distroProtocol.


sync


()方法,代码如下:






/***









* 数据保存









*










@param












key










key of data, this key should be globally unique









*










@param












value










value of data









*










@throws










NacosException









*/







@Overridesync()







public void






put(String key, Record value)






throws






NacosException {








//将数据存入到dataStore中









onPut(key, value);







//使用distroProtocol同步数据









distroProtocol.sync(






new






DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX),





DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() /




2




);





}



sync


()


方法会执行任务发布,代码如下:






public void






sync(DistroKey distroKey, DataOperation action,






long






delay) {








//向除了自己外的所有节点广播











for






(Member each : memberManager.allMembersWithoutSelf()) {






DistroKey distroKeyWithTarget =






new






DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(), each.getAddress());





DistroDelayTask distroDelayTask =






new






DistroDelayTask(distroKeyWithTarget, action, delay);







//从distroTaskEngineHolder获取延时执行引擎,并将distroDelayTask任务添加进来









//执行延时任务发布









distroTaskEngineHolder.getDelayTaskExecuteEngine().





addTask(distroKeyWithTarget, distroDelayTask);







if






(Loggers.DISTRO.isDebugEnabled()) {






Loggers.DISTRO.debug(






“[DISTRO-SCHEDULE] {} to {}”






, distroKey, each.getAddress());





}





}





}








增量同步操作




延迟任务




对象我们可以从


DistroTaskEngineHolder


构造函数中得知是


DistroDelayTaskProcessor


,代码如下:






/***









* 构造函数指定任务处理器









*










@param












distroComponentHolder















*/









public






DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) {






DistroDelayTaskProcessor defaultDelayTaskProcessor =






new






DistroDelayTaskProcessor(






this






, distroComponentHolder);







//指定任务处理器defaultDelayTaskProcessor









delayTaskExecuteEngine.setDefaultTaskProcessor(defaultDelayTaskProcessor);





}



它延迟执行的时候会执行


process


方法,该方法正是执行数据同步的地方,它会执行DistroSyncChangeTask任务,代码如下:






/***









* 任务处理过程









*










@param












task










task.









*










@return















*/







@Override







public boolean






process(NacosTask task) {








if






(!(task






instanceof






DistroDelayTask)) {








return true






;





}





DistroDelayTask distroDelayTask = (DistroDelayTask) task;





DistroKey distroKey = distroDelayTask.getDistroKey();







if






(DataOperation.CHANGE.equals(distroDelayTask.getAction())) {








//将延迟任务变更成异步任务,异步任务对象是一个线程









DistroSyncChangeTask syncChangeTask =






new






DistroSyncChangeTask(distroKey, distroComponentHolder);







//将任务添加到NacosExecuteTaskExecuteEngine中,并执行









distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);







return true






;





}







return false






;





}



DistroSyncChangeTask


实质上是任务的开始,它自身是一个线程,所以会执行它的


run


方法,而


run


方法这是数据同步操作,代码如下:






/***









* 执行数据同步









*/







@Override







public void






run() {






Loggers.DISTRO.info(






“[DISTRO-START] {}”






, toString());







try






{








//获取本地缓存数据









String type = getDistroKey().getResourceType();





DistroData distroData = distroComponentHolder.findDataStorage(type).getDistroData(getDistroKey());





distroData.setType(DataOperation.CHANGE);







//向其他节点同步数据











boolean






result = distroComponentHolder.findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());







if






(!result) {






handleFailedTask();





}





Loggers.DISTRO.info(






“[DISTRO-END] {} result: {}”






, toString(), result);





}






catch






(Exception e) {






Loggers.DISTRO.warn(






“[DISTRO] Sync data change failed.”






, e);





handleFailedTask();





}





}



数据同步会执行调用


syncData


,该方法其实就是通过


Http


协议将数据发送到其他节点实现数据同步,代码如下:






/***









* 向其他节点同步数据









*










@param












data










data









*










@param












targetServer










target server









*










@return















*/







@Override







public boolean






syncData(DistroData data, String targetServer) {








if






(!memberManager.hasMember(targetServer)) {








return true






;





}







//获取数据字节数组











byte






[] dataContent = data.getContent();







//通过Http协议同步数据











return






NamingProxy.syncData(dataContent, data.getDistroKey().getTargetServer());





}








详细增量数据同步(单机)


Distro

一致性算法,临时数据一致性数据结构

DataStore

,临时数据会被存储到这个数据结构中。





/**







* Store of data.







* Distro 临时数据一致性 数据存储结构







*







@author







nkorange







*







@since







1.0.0







*/






@Component






public class





DataStore {





private





Map<String, Datum>





dataMap





=





new





ConcurrentHashMap<>(




1024




);




public void





put(String key, Datum value) {







dataMap





.put(key, value);





}




public





Datum remove(String key) {







return






dataMap





.remove(key);





}




public





Set<String> keys() {







return






dataMap





.keySet();





}




public





Datum get(String key) {







return






dataMap





.get(key);





}




public boolean





contains(String key) {







return






dataMap





.containsKey(key);





}




/**







* Batch get datum for a list of keys.







*







*







@param








keys







of datum







*







@return







list of datum







*/









public





Map<String, Datum> batchGet(List<String> keys) {






Map<String, Datum> map =





new





HashMap<>(




128




);






for





(String key : keys) {






Datum datum =





dataMap





.get(key);






if





(datum ==





null





) {







continue





;





}





map.put(key, datum);





}






return





map;





}




public int





getInstanceCount() {







int





count =




0




;






for





(Map.Entry<String, Datum> entry :





dataMap





.entrySet()) {







try





{






Datum instancesDatum = entry.getValue();






if





(instancesDatum.





value






instanceof





Instances) {






count += ((Instances) instancesDatum.





value





).getInstanceList().size();





}





}





catch





(Exception ignore) {






}





}






return





count;





}




public





Map<String, Datum> getDataMap() {







return






dataMap





;





}





}


1)添加任务

单机数据同步,首先要添加任务。服务注册会请求InstanceController

post

方法注册服务数据。





/**







* Register new instance.







* 服务数据注册







*







@param








request







http request







*







@return







‘ok’ if success







*







@throws







Exception any error during register







*/






@CanDistro





//Distro协议(数据临时一致性协议)






@PostMapping





@Secured




(parser = NamingResourceParser.





class





, action = ActionTypes.






WRITE






)






public





String




register




(HttpServletRequest request)





throws





Exception {





final





String namespaceId = WebUtils





.





optional





(request, CommonParams.






NAMESPACE_ID






, Constants.






DEFAULT_NAMESPACE_ID






);






final





String serviceName = WebUtils.





required





(request, CommonParams.






SERVICE_NAME






);





NamingUtils.





checkServiceNameFormat





(serviceName);




final





Instance instance = parseInstance(request);






// 注册服务实例









serviceManager





.registerInstance(namespaceId, serviceName, instance);






return






“ok”





;





}


注册服务数据





public void





registerInstance(String namespaceId, String serviceName, Instance instance)





throws





NacosException {




createEmptyService(namespaceId, serviceName, instance.isEphemeral());



Service service = getService(namespaceId, serviceName);




if





(service ==





null





) {







throw new





NacosException(NacosException.






INVALID_PARAM






,






“service not found, namespace: ”





+ namespaceId +





“, service: ”





+ serviceName);





}






// 添加服务实例








addInstance




(namespaceId, serviceName, instance.isEphemeral(), instance);





}



添加服务实例





/**







* Add instance to service.







* 将实例添加到服务







*







@param








namespaceId







namespace







*







@param








serviceName







service name







*







@param








ephemeral







whether instance is ephemeral







*







@param








ips







instances







*







@throws







NacosException nacos exception







*/







public void





addInstance(String namespaceId, String serviceName,





boolean





ephemeral, Instance… ips)






throws





NacosException {




String key = KeyBuilder.





buildInstanceListKey





(namespaceId, serviceName, ephemeral);



Service service = getService(namespaceId, serviceName);






// 同步 防止高并发 这种同步锁效率低









synchronized





(service) {






List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);



Instances instances =





new





Instances();





instances.setInstanceList(instanceList);




consistencyService





.




put




(key, instances);





}





}



Put

到中DistroConsistencyServiceImpl

执行数据同步。




@Override






public void





put(String key, Record value)





throws





NacosException {






onPut(key, value);






// 数据同步









distroProtocol





.




sync




(





new





DistroKey(key, KeyBuilder.






INSTANCE_LIST_KEY_PREFIX






), DataOperation.






CHANGE






,






globalConfig





.getTaskDispatchPeriod() /




2




);





}



同步数据到远端的服务





/**







* Start to sync data to all remote server.







* 同步远端服务数据







*







@param








distroKey







distro key of sync data







*







@param








action







the action of data operation







*/







public void





sync(DistroKey distroKey, DataOperation action,





long





delay) {







// 向除了自己以






外的所有






节点广播







for





(Member each :





memberManager





.allMembersWithoutSelf()) {






DistroKey distroKeyWithTarget =





new





DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),





each.getAddress());






// 创建 Distro定时任务








DistroDelayTask distroDelayTask =





new





DistroDelayTask(distroKeyWithTarget, action, delay);






// 添加定时任务









distroTaskEngineHolder









// 获取延迟执行引擎








.getDelayTaskExecuteEngine()






// 添加延迟任务








.




addTask




(distroKeyWithTarget, distroDelayTask);






if





(Loggers.






DISTRO






.isDebugEnabled()) {






Loggers.






DISTRO






.debug(





“[DISTRO-SCHEDULE] {} to {}”





, distroKey, each.getAddress());





}





}





}


创建

Distro

定时任务,可以看到里面有一些定时任务信息。





public class





DistroDelayTask





extends





AbstractDelayTask {





private final





DistroKey





distroKey





;




private





DataOperation





action





;




private long






createTime





;




public





DistroDelayTask(DistroKey distroKey,





long





delayTime) {







this





(distroKey, DataOperation.






CHANGE






, delayTime);





}




public





DistroDelayTask(DistroKey distroKey, DataOperation action,





long





delayTime) {







this





.





distroKey





= distroKey;






this





.





action





= action;






this





.





createTime





= System.





currentTimeMillis





();





setLastProcessTime(





createTime





);





setTaskInterval(delayTime);





}




public





DistroKey getDistroKey() {







return






distroKey





;





}




public





DataOperation getAction() {







return






action





;





}




public long





getCreateTime() {







return






createTime





;





}



@Override






public void





merge(AbstractDelayTask task) {







if





(!(task





instanceof





DistroDelayTask)) {







return





;





}





DistroDelayTask newTask = (DistroDelayTask) task;






if





(!





action





.equals(newTask.getAction()) &&





createTime





< newTask.getCreateTime()) {







action





= newTask.getAction();






createTime





= newTask.getCreateTime();





}





setLastProcessTime(newTask.getLastProcessTime());





}





}



添加延迟任务,将任务存储到


task

(ConcurrentHashMap<Object, AbstractDelayTask>)

中,后面执行任务的时候会从这个


task


中获取任务。




@Override






public void





addTask(Object key, AbstractDelayTask newTask) {







// 同步锁 防止重复添加









lock





.lock();






try





{






AbstractDelayTask existTask =





tasks





.get(key);






if





(





null





!= existTask) {






newTask.merge(existTask);





}






// 任务存储 任务添加结束









tasks





.put(key, newTask);





}





finally





{







lock





.unlock();





}





}



2)任务执行

添加任务前DistroTaskEngineHolder会添加定时任务获取延迟执行引擎。





/**







* Start to sync data to all remote server.







* 同步远端服务数据







*







@param








distroKey







distro key of sync data







*







@param








action







the action of data operation







*/







public void





sync(DistroKey distroKey, DataOperation action,





long





delay) {







// 向除了自己以外的所有节点广播







for





(Member each :





memberManager





.allMembersWithoutSelf()) {






DistroKey distroKeyWithTarget =





new





DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),





each.getAddress());






// 创建 Distro定时任务








DistroDelayTask distroDelayTask =





new





DistroDelayTask(distroKeyWithTarget, action, delay);






// 添加定时任务









distroTaskEngineHolder









// 获取延迟执行引擎








.get




DelayTask




ExecuteEngine()






// 添加延迟任务








.addTask(distroKeyWithTarget, distroDelayTask);






if





(Loggers.






DISTRO






.isDebugEnabled()) {






Loggers.






DISTRO






.debug(





“[DISTRO-SCHEDULE] {} to {}”





, distroKey, each.getAddress());





}





}





}



临时数据同步引擎,延迟执行任务。创建任务处理器并指定其处理任务。





/**







* Distro task engine holder.







* Doistro任务执行者







* 临时数据同步 引擎 延迟执行







*







@author







xiweng.yy







*/






@Component






public class





DistroTaskEngineHolder {







// 延迟执行 引擎









private final





DistroDelayTaskExecuteEngine





delayTaskExecuteEngine





=





new





DistroDelayTaskExecuteEngine();




private final





DistroExecuteTaskExecuteEngine





executeWorkersManager





=





new





DistroExecuteTaskExecuteEngine();




public





DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) {







// 任务处理器








DistroDelayTaskProcessor defaultDelayTaskProcessor =





new





DistroDelayTaskProcessor(





this





, distroComponentHolder);






// 指定任务处理器









delayTaskExecuteEngine





.setDefaultTaskProcessor(defaultDelayTaskProcessor);





}




public





DistroDelayTaskExecuteEngine getDelayTaskExecuteEngine() {







return






delayTaskExecuteEngine





;





}




public





DistroExecuteTaskExecuteEngine getExecuteWorkersManager() {







return






executeWorkersManager





;





}




public void





registerNacosTaskProcessor(Object key, NacosTaskProcessor nacosTaskProcessor) {







this





.





delayTaskExecuteEngine





.addProcessor(key, nacosTaskProcessor);





}





}


跟踪延迟执行引擎创建,可以看到其父类

NacosDelayTaskExecuteEngine


创建了延迟执行引擎。





/**







* 创建任务执行器







*/







public





NacosDelayTaskExecuteEngine(String name,





int





initCapacity, Logger logger,





long





processInterval) {







super





(logger);






tasks





=





new





ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity);






processingExecutor





= ExecutorFactory.





newSingleScheduledExecutorService





(





new





NameThreadFactory(name));






processingExecutor









// 定时执行ProcessRunnable任务








.scheduleWithFixedDelay(





new





ProcessRunnable




(), processInterval, processInterval, TimeUnit.






MILLISECONDS






);





}



定时执行器中创建了一个执行线程,会定时去执行这个线程



ProcessRunnable





/**







* 定时任务执行的线程







*/







private class





ProcessRunnable





implements





Runnable {




@Override






public void





run() {







try





{






processTasks




();





}





catch





(Throwable e) {






getEngineLog().error(e.toString(), e);





}





}





}


线程回调NacosDelayTaskExecuteEngine中的

processTasks

方法





/**







* process tasks in execute engine.







*/







protected void





processTasks() {






Collection<Object> keys = getAllTaskKeys();






for





(Object taskKey : keys) {







// 从task中获取任务 这个任务






在任务添加时






已经添加到了task中








AbstractDelayTask task =




removeTask




(taskKey);






if





(





null





== task) {







continue





;





}






// 获取任务执行对象  DistroDelayTaskProcessor







// 先从缓存中获取 缓存中没有就获取前面创建的对象 这个对象在DistroTaskEngineHolder创建的时候已经被创建了








NacosTaskProcessor processor =




getProcessor




(taskKey);






if





(





null





== processor) {






getEngineLog().error(





“processor not found for task, so discarded. ”





+ task);






continue





;





}






try





{







// ReAdd task if process failed









if





(!processor.




process




(task)) {







// 存储任务








retryFailedTask




(taskKey, task);





}





}





catch





(Throwable e) {






getEngineLog().error(





“Nacos task execute error : ”





+ e.toString(), e);





retryFailedTask(taskKey, task);





}





}





}



任务执行成功后会将任务存储到服务器缓存中。





private void





retryFailedTask(Object key, AbstractDelayTask task) {







// 设置任务进程时间








task.setLastProcessTime(System.





currentTimeMillis





());






// 存储任务








addTask(key, task);





}





@Override






public void





addTask(Object key, AbstractDelayTask newTask) {







// 同步锁 防止重复添加









lock





.lock();






try





{







// 先获取任务








AbstractDelayTask existTask =





tasks





.get(key);






// 如果任务不为空 合并更新任务









if





(





null





!= existTask) {






newTask.merge(existTask);





}






// 任务存储 任务添加结束









tasks





.put(key, newTask);





}





finally





{







lock





.unlock();





}





}


前面做了

3

件事,获取任务、任务执行对象、执行任务。

获取任务





/**







* 获取一个任务 并将任务从tasks中移除







*/






@Override






public





AbstractDelayTask removeTask(Object key) {







lock





.lock();






try





{






AbstractDelayTask task =





tasks





.get(key);






if





(





null





!= task && task.shouldProcess()) {







return






tasks





.remove(key);





}





else





{







return null





;





}





}





finally





{







lock





.unlock();





}





}


获取任务执行对象





private final





ConcurrentHashMap<Object, NacosTaskProcessor>





taskProcessors





=





new





ConcurrentHashMap<Object, NacosTaskProcessor>();



@Override






public





NacosTaskProcessor getProcessor(Object key) {







return






taskProcessors





.containsKey(key) ?





taskProcessors





.get(key) :





defaultTaskProcessor





;





}


DistroDelayTaskProcessor执行任务





/**







* 任务执行方法







*/






@Override






public boolean





process(NacosTask task) {







if





(!(task





instanceof





DistroDelayTask)) {







return true





;





}





DistroDelayTask distroDelayTask = (DistroDelayTask) task;





DistroKey distroKey = distroDelayTask.getDistroKey();






if





(DataOperation.






CHANGE






.equals(distroDelayTask.getAction())) {







// 创建数据同步快照






后面回调用它的run方法








DistroSyncChangeTask syncChangeTask =





new





DistroSyncChangeTask(distroKey,





distroComponentHolder





);






// 同步快照任务









distroTaskEngineHolder





.getExecuteWorkersManager().




addTask




(distroKey, syncChangeTask);






return true





;





}






return false





;





}



同步任务继续执行同步





/**







* 执行同步任务







*/






@Override






public void





addTask(Object tag, AbstractExecuteTask task) {







// 获取处理对象








NacosTaskProcessor processor = getProcessor(tag);






if





(





null





!= processor) {







// 处理任务








processor.process(task);






return





;





}






// 获取任务








TaskExecuteWorker worker = getWorker(tag);






// 添加任务








worker.process(task);





}





@Override






public boolean





process(NacosTask task) {







if





(task





instanceof





AbstractExecuteTask) {







// 添加任务








putTask((Runnable) task);





}






return true





;





}






/**







* 获取任务执行器







*/







private





TaskExecuteWorker getWorker(Object tag) {







int





idx = (tag.hashCode() & Integer.






MAX_VALUE






) % workersCount();






// 任务执行器在 创建DistroTaskEngineHolder对象时创建DistroExecuteTaskExecuteEngine时被创建









return






executeWorkers





[idx];





}



将任务添加到队列中





/**







* 将同步任务对象添加到队列(BlockingQueue)中







*/







private void





putTask(Runnable task) {







try





{







// 添加DistroSyncChangeTask(前面创建的)到队列中









queue





.put(task);





}





catch





(InterruptedException ire) {







log





.error(ire.toString(), ire);





}





}


在前面创建DistroTaskEngineHolder对象时,会初始化DistroExecuteTaskExecuteEngine,初始化时会创建其父类

TaskExecuteWorker





public





TaskExecuteWorker(





final





String name,





final int





mod,





final int





total,





final





Logger logger) {







this





.





name





= name +





“_”





+ mod +





“%”





+ total;






this





.





queue





=





new





ArrayBlockingQueue<Runnable>(






QUEUE_CAPACITY






);






this





.





closed





=





new





AtomicBoolean(





false





);






this





.





log





=





null





== logger ? LoggerFactory.





getLogger





(TaskExecuteWorker.





class





) : logger;






new





InnerWorker




(name).start();





}


这里会创建一个线程,此时从队列中取出前面放进队列中的同步任务对象, 去执行任务对象

DistroSyncChangeTask

数据同步对象的

run

方法。





/**







* Inner execute worker.







*/







private class





InnerWorker





extends





Thread {




InnerWorker(String name) {






setDaemon(





false





);





setName(name);





}



@Override






public void





run() {







while





(!





closed





.get()) {







try





{







// 从队列取出任务








Runnable task =





queue





.take();






long





begin = System.





currentTimeMillis





();






// 执行任务对象 DistroSyncChangeTask 数据同步对象








task.run();






long





duration = System.





currentTimeMillis





() – begin;






if





(duration >




1000L




) {







log





.warn(





“distro task {} takes {}ms”





, task, duration);





}





}





catch





(Throwable e) {







log





.error(





“[DISTRO-FAILED] ”





+ e.toString(), e);





}





}





}





}



DistroSyncChangeTask

是一个线程,由

InnerWorker

调用它的

run

方法,执行远程调用同步数据。




@Override






public void





run() {






Loggers.






DISTRO






.info(





“[DISTRO-START] {}”





, toString());






try





{






String type = getDistroKey().getResourceType();






// 查找数据








DistroData distroData =





distroComponentHolder





.findDataStorage(type).getDistroData(getDistroKey());





distroData.setType(DataOperation.






CHANGE






);






// 向其他节点同步数据









boolean





result =





distroComponentHolder





.findTransportAgent(type).




syncData




(distroData, getDistroKey().getTargetServer());






if





(!result) {






handleFailedTask();





}





Loggers.






DISTRO






.info(





“[DISTRO-END] {} result: {}”





, toString(), result);





}





catch





(Exception e) {






Loggers.






DISTRO






.warn(





“[DISTRO] Sync data change failed.”





, e);





handleFailedTask();





}





}


同步数据





/**







* 数据同步







*/






@Override






public boolean





syncData(DistroData data, String targetServer) {







if





(!





memberManager





.hasMember(targetServer)) {







return true





;





}






byte





[] dataContent = data.getContent();






// 远程同步请求









return





NamingProxy.





syncData





(dataContent, data.getDistroKey().getTargetServer());





}


代理对象远程同步数据





/**







* Synchronize datum to target server.







* 同步数据到目标服务器







*







*







@param








data







datum







*







@param








curServer







target server address







*







@return







true if sync successfully, otherwise false







*/







public static boolean





syncData(





byte





[] data, String curServer) {






Map<String, String> headers =





new





HashMap<>(




128




);



headers.put(HttpHeaderConsts.






CLIENT_VERSION_HEADER






, VersionUtils.





version





);





headers.put(HttpHeaderConsts.






USER_AGENT_HEADER






, UtilsAndCommons.






SERVER_VERSION






);





headers.put(HttpHeaderConsts.






ACCEPT_ENCODING






,





“gzip,deflate,sdch”





);





headers.put(HttpHeaderConsts.






CONNECTION






,





“Keep-Alive”





);





headers.put(HttpHeaderConsts.






CONTENT_ENCODING






,





“gzip”





);




try





{







// http发送同步数据请求 /v1/ns/distro/datum








RestResult<String> result = HttpClient.





httpPutLarge





(






“http://”





+ curServer + EnvUtil.





getContextPath





() + UtilsAndCommons.






NACOS_NAMING_CONTEXT











+






DATA_ON_SYNC_URL






, headers, data);






if





(result.ok()) {







return true





;





}






if





(HttpURLConnection.






HTTP_NOT_MODIFIED






== result.getCode()) {







return true





;





}






throw new





IOException(





“failed to req API:”





+





“http://”





+ curServer + EnvUtil.





getContextPath





()





+ UtilsAndCommons.






NACOS_NAMING_CONTEXT






+






DATA_ON_SYNC_URL






+





“. code:”





+ result.getCode() +





” msg: ”








+ result.getData());





}





catch





(Exception e) {






Loggers.






SRV_LOG






.warn(





“NamingProxy”





, e);





}






return false





;





}



H


ttp

发送同步数据请求 /v1/ns/distro/datum调用DistroController开始同步数据。





/**







* Synchronize datum.







* 数据同步(临时)







*







@param








dataMap







data map







*







@return







‘ok’ if success







*







@throws







Exception if failed







*/






@PutMapping




(





“/datum”





)






public





ResponseEntity onSyncDatum(




@RequestBody




Map<String, Datum<Instances>> dataMap)





throws





Exception {





if





(dataMap.isEmpty()) {






Loggers.






DISTRO






.error(





“[onSync] receive empty entity!”





);






throw new





NacosException(NacosException.






INVALID_PARAM






,





“receive empty entity!”





);





}




for





(Map.Entry<String, Datum<Instances>> entry : dataMap.entrySet()) {







if





(KeyBuilder.





matchEphemeralInstanceListKey





(entry.getKey())) {






String namespaceId = KeyBuilder.





getNamespace





(entry.getKey());





String serviceName = KeyBuilder.





getServiceName





(entry.getKey());






if





(!





serviceManager





.containService(namespaceId, serviceName) &&





switchDomain








.isDefaultInstanceEphemeral()) {







serviceManager





.createEmptyService(namespaceId, serviceName,





true





);





}





DistroHttpData distroHttpData =





new





DistroHttpData(createDistroKey(entry.getKey()), entry.getValue());






distroProtocol





.




onReceive




(distroHttpData);





}





}






return





ResponseEntity.





ok





(





“ok”





);





}


接收同步的发布数据,找到处理器处理。





/**







* Receive synced distro data, find processor to process.







* 接收同步的发布数据,找到处理器处理







*







@param








distroData







Received data







*







@return







true if handle receive data successfully, otherwise false







*/







public boolean





onReceive(DistroData distroData) {






String resourceType = distroData.getDistroKey().getResourceType();





DistroDataProcessor dataProcessor =





distroComponentHolder





.findDataProcessor(resourceType);






if





(





null





== dataProcessor) {






Loggers.






DISTRO






.warn(





“[DISTRO] Can’t find data process for received data {}”





, resourceType);






return false





;





}






return





dataProcessor.




processData




(distroData);





}


数据处理





/**







* 数据处理







*/






@Override






public boolean





processData(DistroData distroData) {






DistroHttpData distroHttpData = (DistroHttpData) distroData;





Datum<Instances> datum = (Datum<Instances>) distroHttpData.getDeserializedContent();






// 数据同步 将数据添加到dataStore中








onPut(datum.





key





, datum.





value





);






return true





;





}


数据同步 将数据添加到

dataStore





/**







* Put a new record.







*







*







@param








key







key of record







*







@param








value







record







*/







public void





onPut(String key, Record value) {





if





(KeyBuilder.





matchEphemeralInstanceListKey





(key)) {






Datum<Instances> datum =





new





Datum<>();





datum.





value





= (Instances) value;





datum.





key





= key;





datum.





timestamp





.incrementAndGet();






dataStore





.put(key, datum);





}




if





(!





listeners





.containsKey(key)) {







return





;





}




notifier





.addTask(key, DataOperation.






CHANGE






);





}



Distro

临时数据一致性 数据存储结构,添加进这个数据模型中。





public void





put(String key, Datum value) {







// key:Group@服务名称









dataMap





.put(key, value);





}



2


.0.


Sentinel Dashboard数据持久化


Sentinel


的理念是开发者只需要关注资源的定义,当资源定义成功后可以动态增加各种流控降级规则。


Sentinel


提供两种方式修改规则:



通过


API


直接修改( loadRules )



通过


DataSource


适配不同数据源修改


手动通过


API


修改比较直观,可以通过以下几个


API


修改不同的规则:


FlowRuleManager.


loadRules


(List<FlowRule> rules); // 修改流控规则


DegradeRuleManager.


loadRules


(List<DegradeRule> rules); // 修改降级规则


手动修改规则


(


硬编码方式


)


一般仅用于




测试和演示




,生产上一般通过动态规则源的方式来动态管理规则。




2.1




.




动态配置原理




loadRules


()方法只接受内存态的规则对象,但更多时候规则存储在文件、数据库或者配置中心当中。


DataSource


接口给我们提供了对接任意配置源的能力。相比直接通过


API


修改规则,实现


DataSource


接口是更加可靠的做法。


我们推荐通过控制台设置规则后将规则推送到统一的规则中心,客户端实现


ReadableDataSource


接口端监听规则中心实时获取变更,流程如下:


DataSource


扩展常见的实现方式有:




拉模式



:客户端主动向某个规则管理中心定期




轮询拉取




规则,这个规则中心可以是


RDBMS


、文件,甚至是


VCS


等。这样做的方式是




简单




,缺点是




无法及时获取变更








推模式



:规则中心统一推送,客户端通过注册监听器的方式时刻监听变化,比如使用


Nacos





Zookeeper


等配置中心。这种方式有更好的实时性和一致性保证。


Sentinel


目前支持以下数据源扩展:



Pull-based: 动态文件数据源、


Consul


,


Eureka



Push-based:


ZooKeeper


,


Redis


,


Nacos


,


Apollo


,


etcd




2.2




.




Sentinel+Nacos数据持久化




我们要想实现


Sentinel


+


Nacos


数据持久化,需要下载


Sentinel


控制台源码,关于源码下载我们这里就不再重复了。





Sentinel Dashboard


中配置规则之后重启应用就会丢失,所以实际生产环境中需要配置规则的持久化实现,


Sentinel


提供多种不同的数据源来持久化规则配置,包括


file





redis





nacos





zk




这就需要涉及到


Sentinel Dashboard


的规则管理及推送功能:集中管理和推送规则。


sentinel





core


提供


API


和扩展接口来接收信息。开发者需要根据自己的环境,选取一个可靠的推送规则方式;同时,规则最好在控制台中集中管理。


我们采用


Push


模式,即


Sentinel





Dashboard


统一管理配置,然后将规则统一推送到


Nacos


并持久化


(


生成配置文件


)


,最后客户端监听


Nacos


(


这一部了解使用过


Nacos


的话应该很熟,采用ConfifigService.


getConfg


()方法获取配置文件


)


,下发配置生成


Rule




这张图的意思我们解释说明一下:


1


:


Sentinel Dashboard界面配置流控规则—发布/推送—>Nacos生成配置文件并持久化;


2


:


通过Nacos配置文件修改流控规则—拉取—>Sentinel Dashboard界面显示最新的流控规则。





Nacos


控制台上修改流控制,虽然可以同步到


Sentinel Dashboard


,但是


Nacos


此时应该作为一个流控规则的持久化平台,所以正常操作过程应该是开发者在


Sentinel Dashboard


上修改流控规则后同步到


Nacos


,遗憾的是目前


Sentinel Dashboard


不支持该功能。


如果公司没有统一在


Sentinel Dashboard





Nacos


中二选一进行配置,而是一会在


Sentinel Dashboard


配置,一会在


Nacos


配置。那么就会出现很严重的问题


(


流控规则达不到预期,配置数据不一致


)


,所以推荐使用


Sentinel Dashboard


统一界面进行配置管理流控规则。


我们接下来基于


Sentinel


1.8.1开始改造


Sentinel Dashboard


,使他能结合


Nacos


实现数据持久化。


2.2.1


.


Dashboard改造分析


Sentinel Dashboard


的流控规则下的所有操作,都会调用


Sentinel





Dashboard


源码中的


FlowController


V1类,这个类中包含流控规则本地化的


CRUD


操作;


在com.alibaba.csp.sentinel.dashboard.controller.v2包下存在一个


FlowController


V2;类,这个类同样提供流控规则的


CURD


,与V1不同的是,它可以实现指定数据源的规则拉取和发布。


上面代码就是


FlowController


V2 部分代码,分别实现了拉取规则和推送规则:


1


:


DynamicRuleProvider:动态规则的拉取,从指定数据源中获取控制后在Sentinel Dashboard中展示。


2


:


DynamicRulePublisher:动态规则发布,将在Sentinel Dashboard中修改的规则同步到指定数据源中。


我们只需要扩展这两个类,然后集成


Nacos


来实现


Sentinel Dashboard


规则同步。


2.2.2


.


页面改造


在目录resources/app/scripts/directives/sidebar找到


sidebar


.html,里面有关于V1版本的请求入口:


<li


ui-sref-active


=


“active”


ng-if


=


“!entry.isGateway”


>


<a


ui-sref


=


“dashboard.flowV1({app: entry.app})”


>


<i


class


=


“glyphicon glyphicon-filter”


></i>


流控规则


</a>


</li>


对应的


JS


(


app.js


)


请求如下,可以看到请求就是


V


1版本的


Controller


,那么之后的改造需要重新对应


V


2版本的


Controller




.


state


(


‘dashboard.flowV1’


, {


templateUrl


:


‘app/views/flow_v1.html’


,


url


:


‘/flow/:app’


,


controller


:


‘FlowControllerV


1





,


resolve


: {


loadMyFiles


: [


‘$ocLazyLoad’


,


function


(


$ocLazyLoad


) {


return


$ocLazyLoad


.


load


({


name


:


‘sentinelDashboardApp’


,


files


: [


‘app/scripts/controllers/flow_v1.js’


,


]


});


}]


}


})


2.2.3


.


Nacos配置


在源码中虽然官方提供了


test


示例


(





test


目录


)


下关于


Nacos


等持久化示例,但是具体的实现还需要一些细节,比如在


Sentinel Dashboard


配置


Nacos





serverAddr





namespace





groupId


,并且通过


Nacos


获取配置文件获取服务列表等。


我们可以打开


NacosConfig


源码,


NacosConfig


中ConfigFactory.


createConfigService


(“localhost”) 并没有实现创建具体的


nacos config


service


,而是默认 localhost ,application.properties文件中也没有


Nacos


的相关配置,这些都需要我们额外配置,


NacosConfig


代码如下:




@Configuration







public class






NacosConfig {






@Bean







public






Converter<List<FlowRuleEntity>, String> flowRuleEntityEncoder() {








return






JSON::toJSONString;





}





@Bean







public






Converter<String, List<FlowRuleEntity>> flowRuleEntityDecoder() {








return






s -> JSON.parseArray(s, FlowRuleEntity.






class






);





}





@Bean







public






ConfigService nacosConfigService()






throws






Exception {








return






ConfigFactory.createConfigService(






“localhost”






);





}





}



如果我们需要把数据存储到


Nacos


,在


NacosConfig


U


tils


已经指定了默认的流控规则配置文件的


groupId


等,如果需要指定的话这里也需要修改.






public final class






NacosConfigUtil {








//Nacos中对应的GroupID











public static final






String








GROUP_ID








=






“SENTINEL_GROUP”






;







//文件后半部分











public static final






String








FLOW_DATA_ID_POSTFIX








=






“-flow-rules”






;







public static final






String








PARAM_FLOW_DATA_ID_POSTFIX








=






“-param-rules”






;







public static final






String








CLUSTER_MAP_DATA_ID_POSTFIX








=






“-cluster-map”






;







//略…







}



我们在


application


.properties中配置


Nacos




spring.cloud.sentinel.datasource.flow.nacos.server-addr


=


nacos


:


8848


spring.cloud.sentinel.datasource.flow.nacos.data-id


=


${spring.application.name}-flow-rules


spring.cloud.sentinel.datasource.flow.nacos.group-id


=


SENTINEL_GROUP


spring.cloud.sentinel.datasource.flow.nacos.data-type


=


json


spring.cloud.sentinel.datasource.flow.nacos.rule-type


=


flow


2.3.4


.


Dashboard持久化改造


我们接下来开始改造


Dashboard


源码,官方提供的


Nacos


持久化用例都是在


test


目录下,所以


scope


需要去除


test


,需要sentinel-datasource-nacos包的支持。之后将修改好的源码放在源码主目录com.alibaba.csp.sentinel.dashboard下,而不是继续在


test


目录下。


<!– for Nacos rule publisher sample –>


<dependency>


<groupId>


com.alibaba.csp


</groupId>


<artifactId>


sentinel-datasource-nacos


</artifactId>


<!–<scope>test</scope>–>


</dependency>


找到resources/app/scripts/directives/sidebar/


sidebar


.html文件修改,修改flflowV1为flflow,去掉V1,这样的话会调用


FlowController


V2接口


修改前:


<li


ui-sref-active


=


“active”


ng-if


=


“!entry.isGateway”


>


<a


ui-sref


=


“dashboard.flowV1({app: entry.app})”


>


<i


class


=


“glyphicon glyphicon-filter”


>


</i>


流控规则


</a>


</li>


修改后:


<li


ui-sref-active


=


“active”


ng-if


=


“!entry.isGateway”


>


<a


ui-sref


=


“dashboard.flow({app: entry.app})”


>


<i


class


=


“glyphicon glyphicon-filter”


>


</i>


流控规则


</a>


</li>


这样就可以通过


js


跳转至


FlowControllerV2


了,


app


.js代码如下:


.


state


(


‘dashboard.flow’


, {


templateUrl


:


‘app/views/flow_v2.html’


,


url


:


‘/v2/flow/:app’


,


controller


:


‘FlowControllerV2’


,


resolve


: {


loadMyFiles


: [


‘$ocLazyLoad’


,


function


(


$ocLazyLoad


) {


return


$ocLazyLoad


.


load


({


name


:


‘sentinelDashboardApp’


,


files


: [


‘app/scripts/controllers/flow_v2.js’


,


]


});


}]


}


})


2.3.5


.


Nacos配置创建


我们采用官方的约束,即默认


Nacos


适配的


dataId





groupId


约定,所以不需要修改


NacosConfigUtil


.java了,配置如下:


groupId


:


SENTINEL_GROUP


流控规则 dataId


:


{appName}-flow-rules,比如应用名为 appA,则 dataId 为 ap


pA-flow- rules


我们在


application


.properties 中配置


Nacos


服务信息:


# nacos config server


sentinel.nacos.serverAddr


=


nacos


:


8848


sentinel.nacos.namespace


=


sentinel.nacos.group-id


=


SENTINEL_GROUP


sentinel.nacos.


password=


nacos


sentinel.nacos.


username=


nacos


接下来创建读取


nacos


配置的NacosPropertiesConfiguration文件并且


application


.properties指定配置




@ConfigurationProperties


(


prefix


=


“sentinel.nacos”


)


public class


NacosPropertiesConfiguration


{


private


String


serverAddr


;


private


String


dataId


;


private


String


groupId


=


“SENTINEL_GROUP”


;


// 默认分组


private


String


namespace


;


private


String


user


name


;


private


String


password


;


//get set


}


2.3.6


.


改造源码


2.3.6.1


.


改造NacosConfifig


我们最后改造


NacosConfig


,让


NacosConfig


做两件事:


1


)


注入


Convert


转换器,将


FlowRuleEntity


转化成


FlowRule


,以及反向转化


2


)


注入


Nacos


配置服务


ConfifigService




@EnableConfigurationProperties(NacosPropertiesConfiguration.






class






)





@Configuration







public class






NacosConfig {






@Bean







public






Converter<List<FlowRuleEntity>, String> flowRuleEntityEncoder() {








return






JSON::toJSONString;





}



@Bean







public






Converter<String, List<FlowRuleEntity>> flowRuleEntityDecoder() {








return






s -> JSON.parseArray(s, FlowRuleEntity.






class






);





}



@Bean







public






ConfigService nacosConfigService(NacosPropertiesConfiguration nacosPropertiesConfiguration)






throws






Exception {






Properties properties =






new






Properties();





properties.put(PropertyKeyConst.SERVER_ADDR,





nacosPropertiesConfiguration.getServerAddr());





properties.put(PropertyKeyConst.NAMESPACE,





nacosPropertiesConfiguration.getNamespace());





properties.put(PropertyKeyConst.USERNAME,





nacosPropertiesConfiguration.getUwername());





properties.put(PropertyKeyConst.PASSWORD,





nacosPropertiesConfiguration.getPassword());







return






ConfigFactory.createConfigService(properties);







// return ConfigFactory.createConfigService(“localhost”);









}





}



2.3.6.2


.


动态获取流控规则


动态实现从


Nacos


配置中心获取流控规则需要重写


FlowRuleNacosProvider





FlowRuleNacosPublisher


类。


1)重写


FlowRuleNacosProvider






@Service(






“flowRuleNacosProvider”






)







public class






FlowRuleNacosProvider






implements






DynamicRuleProvider<List<FlowRuleEntity>> {








public static final






Logger








log








= LoggerFactory.getLogger(FlowRuleNacosProvider.






class






);





@Autowired







private






ConfigService






configService






;





@Autowired







private






Converter<String, List<FlowRuleEntity>>






converter






;







/**









* 1)通过ConfigService的getConfig()方法从Nacos Config Server读取指定配置信息









* 2)通过转为converter转化为FlowRule规则









*










@param












appName















*










@return















*










@throws










Exception









*/









@Override







public






List<FlowRuleEntity> getRules(String appName)






throws






Exception {






String rules =






configService






.getConfig(appName + NacosConfigUtil.FLOW_DATA_ID_POSTFIX, NacosConfigUtil.GROUP_ID,




3000




);









log








.info(






“obtain flow rules from nacos config:{}”






, rules);







if






(StringUtil.isEmpty(rules)) {








return new






ArrayList<>();





}







return








converter






.convert(rules);





}





}



2)重写


FlowRuleNacosPublisher


类:




@Service(






“flowRuleNacosPublisher”






)







public class






FlowRuleNacosPublisher






implements






DynamicRulePublisher<List<FlowRuleEntity>> {








public static final






Logger








log








= LoggerFactory.getLogger(FlowRuleNacosPublisher.






class






);





@Autowired







private






ConfigService






configService






;





@Autowired







private






Converter<List<FlowRuleEntity>, String>






converter






;







/**









* 通过configService的publishConfig()方法将rules发布到nacos









*










@param












app










app name









*










@param












rules










list of rules to push









*










@throws










Exception









*/









@Override







public void






publish(String app, List<FlowRuleEntity> rules)






throws






Exception {






AssertUtil.notEmpty(app,






“app name cannot be empty”






);







if






(rules ==






null






) {








return






;





}









log








.info(






“sentinel dashboard push rules: {}”






, rules);







configService






.publishConfig(app + NacosConfigUtil.FLOW_DATA_ID_POSTFIX, NacosConfigUtil.GROUP_ID,






converter






.convert(rules));





}





}



3)替换默认对象


修改


FlowController


V2类,使用@


Qulififier


将上面配置的两个类注入进来


2.3.6.3.


数据持久化测试


我们接下来将程序打包,如下图:


打包好程序后,再将程序运行起来:


java -Dserver.port=8858 -Dcsp.sentinel.dashboard.server=localhost:8858 -Dproject.name=sentinel-dashboard -jar sentinel-dashboard.jar


运行起来后,我们可以发现在


Nacos


中多了一个服务,如下图:


我们随意增加一个流控规则,如下图:


点击新增之后,我们可以发现


SENTILE


_


GROUP


组下多了一个文件,


sentinel





flow





rule


文件,效果如下:


2.3.系统规则定义

上面改造样子使用的限流规则,还有其他一些规则也可以去配置。

我们来配置一个系统服务规则:



NacosConfigUtil

中新增配置定义为系统规则配置”-system-rules”





public final class





NacosConfigUtil {





public static final





String






GROUP_ID






=





“SENTINEL_GROUP”





;




public static final





String






FLOW_DATA_ID_POSTFIX






=





“-flow-rules”





;






public static final





String






PARAM_FLOW_DATA_ID_POSTFIX






=





“-param-rules”





;






public static final





String






CLUSTER_MAP_DATA_ID_POSTFIX






=





“-cluster-map”





;




// 系统规则









public static final





String






SYSTEM_DATA_ID_POSTFIX






=





“-system-rules”





;






/**







* cc for `cluster-client`







*/









public static final





String






CLIENT_CONFIG_DATA_ID_POSTFIX






=





“-cc-config”





;






/**







* cs for `cluster-server`







*/









public static final





String






SERVER_TRANSPORT_CONFIG_DATA_ID_POSTFIX






=





“-cs-transport-config”





;






public static final





String






SERVER_FLOW_CONFIG_DATA_ID_POSTFIX






=





“-cs-flow-config”





;






public static final





String






SERVER_NAMESPACE_SET_DATA_ID_POSTFIX






=





“-cs-namespace-set”





;




private





NacosConfigUtil() {}





}


复制

FlowRuleNacosProvider





FlowRuleNacosPublisher




规则推送,并修改为系统规则。

SystemRuleNacosProvider:




@Component




(





“systemRuleNacosProvider”





)






public class





SystemRuleNacosProvider





implements





DynamicRuleProvider<List<SystemRuleEntity>> {




@Autowired






private





ConfigService





configService





;





@Autowired






private





Converter<String, List<SystemRuleEntity>>





converter





;



@Override






public





List<SystemRuleEntity> getRules(String appName)





throws





Exception {






String rules =





configService





.getConfig(appName + NacosConfigUtil.






SYSTEM_DATA_ID_POSTFIX






,





NacosConfigUtil.






GROUP_ID






,




3000




);






if





(StringUtil.





isEmpty





(rules)) {







return new





ArrayList<>();





}






return






converter





.convert(rules);





}





}


SystemRuleNacosPublisher:




@Component




(





“systemRuleNacosPublisher”





)






public class





SystemRuleNacosPublisher





implements





DynamicRulePublisher<List<SystemRuleEntity>> {




@Autowired






private





ConfigService





configService





;





@Autowired






private





Converter<List<SystemRuleEntity>, String>





converter





;



@Override






public void





publish(String app, List<SystemRuleEntity> rules)





throws





Exception {






AssertUtil.





notEmpty





(app,





“app name cannot be empty”





);






if





(rules ==





null





) {







return





;





}






configService





.publishConfig(app + NacosConfigUtil.






SYSTEM_DATA_ID_POSTFIX






,





NacosConfigUtil.






GROUP_ID






,





converter





.convert(rules));





}





}


规则转换器:在

NacosConfig

中添加规则转换器,即数据发送给nacos和接收nacos信息需要处理一下。




@Configuration






public class





NacosConfig {






@Value




(





“${nacos.address}”





)






private





String





address





;





@Value




(





“${nacos.namespace}”





)






private





String





namespace





;





@Value




(





“${nacos.username}”





)






private





String





username





;





@Value




(





“${nacos.password}”





)






private





String





password





;



@Bean






public





ConfigService nacosConfigService()





throws





Exception {







// nacos配置








Properties properties =





new





Properties();





properties.put(PropertyKeyConst.






SERVER_ADDR






,





address





);





properties.put(PropertyKeyConst.






NAMESPACE






,





namespace





);





properties.put(PropertyKeyConst.






USERNAME






,





username





);





properties.put(PropertyKeyConst.






PASSWORD






,





password





);






//return ConfigFactory.createConfigService(“localhost”);









return





ConfigFactory.





createConfigService





(properties);





}




/**







* 编码 sentinel将数据发送给nacos要编码







*/








@Bean






public





Converter<List<FlowRuleEntity>, String> flowRuleEntityEncoder() {







return





JSON::





toJSONString





;





}




/**







* 解码 sentinel从nacos获取数据要解码







*/








@Bean






public





Converter<String, List<FlowRuleEntity>> RuleEntityDecoder() {







return





s -> JSON.





parseArray





(s, FlowRuleEntity.





class





);





}




/**







* 规则转换器 推送 将队则转成JSON字符串







*/








@Bean






public





Converter<List<SystemRuleEntity>, String> systemRuleEntityEncoder() {







return





JSON::





toJSONString





;





}




/**







* 规则转换器 读取 将nacos中的JSON规则转换成SystemRuleEntity







*/








@Bean






public





Converter<String, List<SystemRuleEntity>> systemRuleEntityDecoder() {







return





s -> JSON.





parseArray





(s, SystemRuleEntity.





class





);





}





}


修改

SystemController




@RestController





@RequestMapping




(





“/system”





)






public class





SystemController {





private final





Logger





logger





= LoggerFactory.





getLogger





(SystemController.





class





);



@Autowired






private





RuleRepository<SystemRuleEntity, Long>





repository





;





@Autowired





@Qualifier




(





“systemRuleNacosProvider”





)






private





DynamicRuleProvider<List<SystemRuleEntity>>





ruleProvider





;





@Autowired





@Qualifier




(





“systemRuleNacosPublisher”





)






private





DynamicRulePublisher<List<SystemRuleEntity>>





rulePublisher





;




private





<




R




> Result<




R




> checkBasicParams(String app, String ip, Integer port) {







if





(StringUtil.





isEmpty





(app)) {







return





Result.





ofFail





(-




1




,





“app can’t be null or empty”





);





}






if





(port <=




0




|| port >




65535




) {







return





Result.





ofFail





(-




1




,





“port should be in (0, 65535)”





);





}






return null





;





}



@GetMapping




(





“/rules.json”





)





@AuthAction




(PrivilegeType.






READ_RULE






)






public





Result<List<SystemRuleEntity>> apiQueryMachineRules(String app, String ip,





Integer port) {






Result<List<SystemRuleEntity>> checkResult = checkBasicParams(app, ip, port);






if





(checkResult !=





null





) {







return





checkResult;





}






try





{






List<SystemRuleEntity> rules =





ruleProvider





.getRules(app);





rules =





repository





.saveAll(rules);






return





Result.





ofSuccess





(rules);





}





catch





(Throwable throwable) {







logger





.error(





“Query machine system rules error”





, throwable);






return





Result.





ofThrowable





(-




1




, throwable);





}





}




private int





countNotNullAndNotNegative(Number… values) {







int





notNullCount =




0




;






for





(





int





i =




0




; i < values.





length





; i++) {







if





(values[i] !=





null





&& values[i].doubleValue() >=




0




) {






notNullCount++;





}





}






return





notNullCount;





}



@RequestMapping




(





“/new.json”





)





@AuthAction




(PrivilegeType.






WRITE_RULE






)






public





Result<SystemRuleEntity> apiAdd(String app, String ip, Integer port,





Double highestSystemLoad, Double highestCpuUsage, Long avgRt,





Long maxThread, Double qps) {




Result<SystemRuleEntity> checkResult = checkBasicParams(app, ip, port);






if





(checkResult !=





null





) {







return





checkResult;





}




int





notNullCount = countNotNullAndNotNegative(highestSystemLoad, avgRt, maxThread, qps, highestCpuUsage);






if





(notNullCount !=




1




) {







return





Result.





ofFail





(-




1




,





“only one of [highestSystemLoad, avgRt, maxThread, qps,highestCpuUsage] ”








+





“value must be set > 0, but ”





+ notNullCount +





” values get”





);





}






if





(





null





!= highestCpuUsage && highestCpuUsage >




1




) {







return





Result.





ofFail





(-




1




,





“highestCpuUsage must between [0.0, 1.0]”





);





}





SystemRuleEntity entity =





new





SystemRuleEntity();





entity.setApp(app.trim());





entity.setIp(ip.trim());





entity.setPort(port);






// -1 is a fake value









if





(





null





!= highestSystemLoad) {






entity.setHighestSystemLoad(highestSystemLoad);





}





else





{






entity.setHighestSystemLoad(-




1D




);





}




if





(





null





!= highestCpuUsage) {






entity.setHighestCpuUsage(highestCpuUsage);





}





else





{






entity.setHighestCpuUsage(-




1D




);





}




if





(avgRt !=





null





) {






entity.setAvgRt(avgRt);





}





else





{






entity.setAvgRt(-




1L




);





}






if





(maxThread !=





null





) {






entity.setMaxThread(maxThread);





}





else





{






entity.setMaxThread(-




1L




);





}






if





(qps !=





null





) {






entity.setQps(qps);





}





else





{






entity.setQps(-




1D




);





}





Date date =





new





Date();





entity.setGmtCreate(date);





entity.setGmtModified(date);






try





{






entity =





repository





.save(entity);





}





catch





(Throwable throwable) {







logger





.error(





“Add SystemRule error”





, throwable);






return





Result.





ofThrowable





(-




1




, throwable);





}





publishRules(app);






return





Result.





ofSuccess





(entity);





}



@GetMapping




(





“/save.json”





)





@AuthAction




(PrivilegeType.






WRITE_RULE






)






public





Result<SystemRuleEntity> apiUpdateIfNotNull(Long id, String app, Double highestSystemLoad,





Double highestCpuUsage, Long avgRt, Long maxThread, Double qps) {







if





(id ==





null





) {







return





Result.





ofFail





(-




1




,





“id can’t be null”





);





}





SystemRuleEntity entity =





repository





.findById(id);






if





(entity ==





null





) {







return





Result.





ofFail





(-




1




,





“id ”





+ id +





” dose not exist”





);





}




if





(StringUtil.





isNotBlank





(app)) {






entity.setApp(app.trim());





}






if





(highestSystemLoad !=





null





) {







if





(highestSystemLoad <




0




) {







return





Result.





ofFail





(-




1




,





“highestSystemLoad must >= 0”





);





}





entity.setHighestSystemLoad(highestSystemLoad);





}






if





(highestCpuUsage !=





null





) {







if





(highestCpuUsage <




0




) {







return





Result.





ofFail





(-




1




,





“highestCpuUsage must >= 0”





);





}






if





(highestCpuUsage >




1




) {







return





Result.





ofFail





(-




1




,





“highestCpuUsage must <= 1”





);





}





entity.setHighestCpuUsage(highestCpuUsage);





}






if





(avgRt !=





null





) {







if





(avgRt <




0




) {







return





Result.





ofFail





(-




1




,





“avgRt must >= 0”





);





}





entity.setAvgRt(avgRt);





}






if





(maxThread !=





null





) {







if





(maxThread <




0




) {







return





Result.





ofFail





(-




1




,





“maxThread must >= 0”





);





}





entity.setMaxThread(maxThread);





}






if





(qps !=





null





) {







if





(qps <




0




) {







return





Result.





ofFail





(-




1




,





“qps must >= 0”





);





}





entity.setQps(qps);





}





Date date =





new





Date();





entity.setGmtModified(date);






try





{






entity =





repository





.save(entity);





}





catch





(Throwable throwable) {







logger





.error(





“save error:”





, throwable);






return





Result.





ofThrowable





(-




1




, throwable);





}





publishRules(app);






return





Result.





ofSuccess





(entity);





}



@RequestMapping




(





“/delete.json”





)





@AuthAction




(PrivilegeType.






DELETE_RULE






)






public





Result<?> delete(Long id) {







if





(id ==





null





) {







return





Result.





ofFail





(-




1




,





“id can’t be null”





);





}





SystemRuleEntity oldEntity =





repository





.findById(id);






if





(oldEntity ==





null





) {







return





Result.





ofSuccess





(





null





);





}






try





{







repository





.delete(id);





}





catch





(Throwable throwable) {







logger





.error(





“delete error:”





, throwable);






return





Result.





ofThrowable





(-




1




, throwable);





}





publishRules(oldEntity.getApp());






return





Result.





ofSuccess





(id);





}




private void





publishRules(String app) {






List<SystemRuleEntity> rules =





repository





.findAllByApp(app);






try





{







rulePublisher





.publish(app, rules);





}





catch





(Exception e) {






e.printStackTrace();





}





}





}


重启服务,系统规则就配置好了。

服务端拉取规则配置需要引入相关依赖包,配置相关

sentinel

配置。

在服务端引入相关依赖包。





<!– sentinel nacos数据持久化–>






<





dependency





>





<





groupId





>




com.alibaba.csp




</





groupId





>





<





artifactId





>




sentinel-datasource-nacos




</





artifactId





>





<





version





>




1.8.0




</





version





>





</





dependency





>


配置

sentinel

支持

nacos

配置,从

nacos

中加载对应的配置规则。





spring





:






application





:






name





: hailtaxi-driver






cloud





:






sentinel





:






transport





:






port





: 8719






dashboard





: 127.0.0.1:8080






datasource





:






nacos





:






server-addr





: 127.0.0.1:8848






username





: nacos






password





: nacos






namespace





: 1cc805fd-009f-4ca9-8ceb-f424c9b8babd






groupId





: DEFAULT_GROUP






dataId





: ${






spring.application.name





}-flow-rules






rule-type





: flow






system





:






nacos





:






server-addr





: 127.0.0.1:8848






username





: nacos






password





: nacos






namespace





: 1cc805fd-009f-4ca9-8ceb-f424c9b8babd






groupId





: DEFAULT_GROUP






dataId





: ${






spring.application.name





}-system-rules






rule-type





: system


启动服务,新增系统规则,配置

CPU

使用率。

访问测试接口:http://127.0.0.1:18888/driver/info/1



CUP

使用率低于10%,接口才能被调用。



CPU

使用率高于10%,系统将会被熔断,拒绝被访问。