目录
1.0.Nacos源码剖析
1.1.Nacos配置存储mysql数据库
1.2.客户端工作流程
1.2.1服务创建
1.2.2.服务注册
1.2.3.服务发现
1.2.4.服务下线
1.2.4.服务订阅
1.3.服务端工作流程
1.3.1.注册处理
1.3.2.一致性算法Distro协议介绍
1.3.3.Distro服务启动-寻址模式
1.3.3.1.单机寻址模式
1.3.3.2.文件寻址模式
编辑
1.3.3.3.服务器寻址模式
1.3.5.集群数据同步
1.3.5.1.全量同步
◆ 任务启动
◆ 数据执行加载
◆ 数据同步
1.3.5.2.增量同步
◆ 增量数据入口
◆ 增量同步操作
◆ 详细增量数据同步(单机)
2 .0.Sentinel Dashboard数据持久化
2.1.动态配置原理
2.2.Sentinel+Nacos数据持久化
2.2.1.Dashboard改造分析
2.2.2.页面改造
2.2.3.Nacos配置
2.3.4.Dashboard持久化改造
2.3.5.Nacos配置创建
2.3.6.改造源码
2.3.6.1.改造NacosConfifig
2.3.6.2.动态获取流控规则
2.3.6.3.数据持久化测试
2.3.系统规则定义
1.0.
Nacos源码剖析
Nacos
源码有很多值得我们学习的地方,为了深入理解
Nacos
,我们剖析源码,分析如下2个知识点:
1
:
Nacos对注册中心的访问原理
2
:
Nacos注册服务处理流程
|
我们接下来对
Nacos
源码做一个深度剖析,首先搭建
Nacos
源码环境,源码环境搭建起来比较轻松,几乎不会报什么错误,我们这里就不去演示源码环境搭建了。
客户端与注册中心服务端的交互,主要集中在
服务注册
、
服务下线
、
服务发现
、
订阅服务
,其实使用最多的就是
服务注册
和
服务发现
,下面我会从源码的角度分析一下这四个功能。
在
Nacos
源码中
nacos
–
example
中com.alibaba.nacos.example.
NamingExample
类分别演示了这
4
个功能的操作,我们可以把它当做入口,代码如下:
public class
NamingExample
{
public static void
main(String[] args)
throws
NacosException {
Properties properties =
new
Properties();
properties.setProperty(
“serverAddr”
, System.
getProperty
(
“serverAddr”
));
properties.setProperty(
“namespace”
, System.
getProperty
(
“namespace”
));
//采用NamingService实现服务注册
NamingService naming = NamingFactory.createNamingService(properties);
//服务注册
naming.registerInstance(
“nacos.test.3”
,
“11.11.11.11”
,
8888
,
“TEST1”
);
naming.registerInstance(
“nacos.test.3”
,
“2.2.2.2”
,
9999
,
“DEFAULT”
);
//服务发现
System.
out
.println(naming.getAllInstances(
“nacos.test.3”
));
//服务下线
naming.deregisterInstance(
“nacos.test.3”
,
“2.2.2.2”
,
9999
,
“DEFAULT”
);
System.
out
.println(naming.getAllInstances(
“nacos.test.3”
));
//服务订阅
Executor executor =
new
ThreadPoolExecutor(
1
,
1
,
0L
,
TimeUnit.MILLISECONDS,
new
LinkedBlockingQueue<Runnable>(),
new
ThreadFactory() {
@Override
public
Thread newThread(Runnable r) {
Thread thread =
new
Thread(r);
thread.setName(
“test-thread”
);
return
thread;
}
}
);
naming.subscribe(
“nacos.test.3”
,
new
AbstractEventListener() {
//EventListener onEvent is sync to handle, If process too low in onEvent, maybe block other onEvent callback.
//So you can override getExecutor() to async handle event.
@Override
public
Executor getExecutor() {
return
executor
;
}
@Override
public void
onEvent(Event event) {
System.
out
.println(((NamingEvent) event).getServiceName());
System.
out
.println(((NamingEvent) event).getInstances());
}
});
}
}
|
1.1.Nacos配置存储mysql数据库
配置文件存储
mysql
,打开
nacos
–
console
工程下
application
.properties文件,添加
mysql
连接信息:
# 当前配置存储 采用mysql存储
spring.datasource.platform
=
mysql
### Count of DB:
db.num
=
1
### Connect URL of DB:
db.url.0
=
jdbc:mysql://127.0.0.1:3306/nacos?characterEncoding=utf8&connectTimeout=1000
&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC
db.user.0
=
nacos
db.password.0
=
nacos
|
存储配置的表模型在
nacos
–
console
工程下
resources
/
META
–
INF
下
schema
.sql,在
mysql
库中执行脚本。
配置好上面的信息,
nacos
将会把配置文件信息保存到数据库中。
启动
nacos
,在
nacos
–
console
工程下,启动
Nacos
启动类。
@SpringBootApplication
(scanBasePackages =
“com.alibaba.nacos”
)
@ServletComponentScan
// 定时器
@EnableScheduling
public class
Nacos {
public static void
main(String[] args) {
SpringApplication.
run
(Nacos.
class
, args);
}
}
|
1.
2.
客户端工作流程
我们先来看一下客户端是如何实现
服务注册
、
服务发现
、
服务下线
操作
、
服务订阅
操作的。
1.2.1服务创建
一旦创建
NamingService
就会支持服务订阅。
NamingService
对象创建功能:
/**
* 创建NamingService 对象
*/
public static
NamingService createNamingService(Properties properties)
throws
NacosException {
try
{
Class<?> driverImplClass = Class.
forName
(
“com.alibaba.nacos.client.naming.NacosNamingService”
);
Constructor constructor = driverImplClass.getConstructor(Properties.
class
);
// NamingService
NamingService vendorImpl = (NamingService) constructor.newInstance(properties);
return
vendorImpl;
}
catch
(Throwable e) {
throw new
NacosException(NacosException.
CLIENT_INVALID_PARAM
, e);
}
}
|
接下来跟踪看看创建的
NamingService
功能,创建对象时会初始化远程代理对象、心跳检测和订阅定时任务。
/**
* 构造函数 创建NacosNamingService对象
*/
public
NacosNamingService(String serverList)
throws
NacosException {
Properties properties =
new
Properties();
properties.setProperty(PropertyKeyConst.
SERVER_ADDR
, serverList);
// 初始化 配置
init(properties);
}
public
NacosNamingService(Properties properties)
throws
NacosException {
init(properties);
}
/**
* 初始胡过程
*/
private void
init(Properties properties)
throws
NacosException {
ValidatorUtils.
checkInitParam
(properties);
this
.
namespace
= InitUtils.
initNamespaceForNaming
(properties);
InitUtils.
initSerialization
();
initServerAddr(properties);
InitUtils.
initWebRootContext
(properties);
initCacheDir();
initLogName(properties);
// 创建代理对象 执行远程操作
http协议
this
.
serverProxy
=
new
NamingProxy(
this
.
namespace
,
this
.
endpoint
,
this
.
serverList
, properties);
// 定时任务心跳检测
this
.
beatReactor
=
new
BeatReactor(
this
.
serverProxy
, initClientBeatThreadCount(properties));
// 订阅 定时任务 定时从服务拉取服务列表
this
.
hostReactor
=
new
HostReactor(
this
.
serverProxy
,
beatReactor
,
this
.
cacheDir
, isLoadCacheAtStart(properties),
isPushEmptyProtect(properties), initPollingThreadCount(properties));
}
|
继续跟踪
HostReactor
订阅
public
HostReactor(NamingProxy serverProxy, BeatReactor beatReactor, String cacheDir,
boolean
loadCacheAtStart,
boolean
pushEmptyProtection,
int
pollingThreadCount) {
// init executorService
this
.
executor
=
new
ScheduledThreadPoolExecutor(pollingThreadCount,
new
ThreadFactory() {
@Override
public
Thread newThread(Runnable r) {
Thread thread =
new
Thread(r);
thread.setDaemon(
true
);
thread.setName(
“com.alibaba.nacos.client.naming.updater”
);
return
thread;
}
});
this
.
beatReactor
= beatReactor;
this
.
serverProxy
= serverProxy;
this
.
cacheDir
= cacheDir;
if
(loadCacheAtStart) {
this
.
serviceInfoMap
=
new
ConcurrentHashMap<String, ServiceInfo>(DiskCache.
read
(
this
.
cacheDir
));
}
else
{
this
.
serviceInfoMap
=
new
ConcurrentHashMap<String, ServiceInfo>(
16
);
}
this
.
pushEmptyProtection
= pushEmptyProtection;
this
.
updatingMap
=
new
ConcurrentHashMap<String, Object>();
this
.
failoverReactor
=
new
FailoverReactor(
this
, cacheDir);
// go on 创建订阅服务列表对象
this
.
pushReceiver
=
new
PushReceiver(
this
);
this
.
notifier
=
new
InstancesChangeNotifier();
NotifyCenter.
registerToPublisher
(InstancesChangeEvent.
class
,
16384
);
NotifyCenter.
registerSubscriber
(
notifier
);
}
|
跟踪订阅服务列表对象
PushReceiver
public class
PushReceiver
implements
Runnable, Closeable {
private static final
Charset
UTF_8
= Charset.
forName
(
“UTF-8”
);
private static final int
UDP_MSS
=
64
*
1024
;
// 定时任务线程池
private
ScheduledExecutorService
executorService
;
private
DatagramSocket
udpSocket
;
private
HostReactor
hostReactor
;
private volatile boolean
closed
=
false
;
public
PushReceiver(HostReactor hostReactor) {
try
{
this
.
hostReactor
= hostReactor;
// 通讯对象(NIO)->远程调用
this
.
udpSocket
=
new
DatagramSocket();
// 创建定时任务线程池->定时任务
this
.
executorService
=
new
ScheduledThreadPoolExecutor(
1
,
new
ThreadFactory() {
@Override
public
Thread newThread(Runnable r) {
Thread thread =
new
Thread(r);
thread.setDaemon(
true
);
thread.setName(
“com.alibaba.nacos.naming.push.receiver”
);
return
thread;
}
});
// 循环执行定时任务 每过一段时间获取一次数据
this
.
executorService
.execute(
this
);
}
catch
(Exception e) {
NAMING_LOGGER
.error(
“[NA] init udp socket failed”
, e);
}
}
@Override
public void
run() {
while
(!
closed
) {
try
{
// byte[] is initialized with 0 full filled by default
byte
[] buffer =
new byte
[
UDP_MSS
];
DatagramPacket packet =
new
DatagramPacket(buffer, buffer.
length
);
// 接收数据 获取远程数据包 NIO
udpSocket
.receive(packet);
String json =
new
String(IoUtils.
tryDecompress
(packet.getData()),
UTF_8
).trim();
NAMING_LOGGER
.info(
“received push data: ”
+ json +
” from ”
+ packet.getAddress().toString());
// 反序列化
PushPacket pushPacket = JacksonUtils.
toObj
(json, PushPacket.
class
);
String ack;
if
(
“dom”
.equals(pushPacket.
type
) ||
“service”
.equals(pushPacket.
type
)) {
// 处理反序列化数据包
hostReactor
.processServiceJson(pushPacket.
data
);
// send ack to server
// ack确认机制 告诉服务器 已经获取到信息了
ack =
“{
\”
type
\”
:
\”
push-ack
\”
”
+
“,
\”
lastRefTime
\”
:
\”
”
+ pushPacket.
lastRefTime
+
”
\”
,
\”
data
\”
:”
+
”
\”\”
}”
;
}
else if
(
“dump”
.equals(pushPacket.
type
)) {
// dump data to server
ack =
“{
\”
type
\”
:
\”
dump-ack
\”
”
+
“,
\”
lastRefTime
\”
:
\”
”
+ pushPacket.
lastRefTime
+
”
\”
,
\”
data
\”
:”
+
”
\”
”
+ StringUtils.
escapeJavaScript
(JacksonUtils.
toJson
(
hostReactor
.getServiceInfoMap()))
+
”
\”
}”
;
}
else
{
// do nothing send ack only
ack =
“{
\”
type
\”
:
\”
unknown-ack
\”
”
+
“,
\”
lastRefTime
\”
:
\”
”
+ pushPacket.
lastRefTime
+
”
\”
,
\”
data
\”
:”
+
”
\”\”
}”
;
}
udpSocket
.send(
new
DatagramPacket(ack.getBytes(
UTF_8
), ack.getBytes(
UTF_8
).
length
,
packet.getSocketAddress()));
}
catch
(Exception e) {
if
(
closed
) {
return
;
}
NAMING_LOGGER
.error(
“[NA] error while receiving push data”
, e);
}
}
}
… …
|
继续处理远程拉取的服务信息,反序列化数据包,将反序列化数据包存储到
serviceInfoMap
中。
public
ServiceInfo processServiceJson(String json) {
// 解析数据包
ServiceInfo serviceInfo = JacksonUtils.
toObj
(json, ServiceInfo.
class
);
ServiceInfo oldService =
serviceInfoMap
.get(serviceInfo.getKey());
if
(
pushEmptyProtection
&& !serviceInfo.validate()) {
//empty or error push, just ignore
return
oldService;
}
boolean
changed =
false
;
if
(oldService !=
null
) {
if
(oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {
NAMING_LOGGER
.warn(
“out of date data received, old-t: ”
+ oldService.getLastRefTime() +
“, new-t: ”
+ serviceInfo.getLastRefTime());
}
// ===获取服务数据存储本地serviceInfoMap中===
serviceInfoMap
.put(serviceInfo.getKey(), serviceInfo);
Map<String, Instance> oldHostMap =
new
HashMap<String, Instance>(oldService.getHosts().size());
for
(Instance host : oldService.getHosts()) {
oldHostMap.put(host.toInetAddr(), host);
}
… …
|
总结,创建
NamigService
对象时做了几件事:
1. 创建代理对象,执行远程操作。
2. 创建定时任务执行心跳检测,定时向服务发送心跳检测。
3. 创建订阅定时任务,定时从服务拉取服务列表信息。
A. 创建通讯对象(
NIO
协议)->执行远程调用。
B. 创建定时任务线程池->执行
定时任务
,每过一段时间从服务拉取一次服务信息。将拉取的服务信息反序列化后存入本地的缓存(Map<String, ServiceInfo> serviceInfoMap)。
C. 服务信息变更时,会拉取服务信息存储到
serviceInfoMap
中。
1.
2
.
2.
服务注册
我们沿着案例中的服务注册方法调用找到
nacos
–
api
中的NamingService.
registerInstance
() 并找到它的实现类和方法 com.alibaba.nacos.client.naming.
NacosNamingService
,代码如下:
/***
* 服务注册
*
@param
serviceName
服务名字
*
@param
ip
服务IP
*
@param
port
服务端口
*
@param
clusterName
集群名字
*
@throws
NacosException
*/
@Override
public void
registerInstance(String serviceName, String ip,
int
port, String clusterName)
throws
NacosException {
registerInstance(serviceName, Constants.DEFAULT_GROUP, ip, port, clusterName);
}
@Override
public void
registerInstance(String serviceName, String groupName, String ip,
int
port, String clusterName)
throws
NacosException {
//设置实例IP:Port,默认权重为1.0
Instance instance =
new
Instance();
instance.setIp(ip);
instance.setPort(port);
instance.setWeight(
1.0
);
instance.setClusterName(clusterName);
//注册实例
registerInstance(serviceName, groupName, instance);
}
@Override
public void
registerInstance(String serviceName, Instance instance)
throws
NacosException {
registerInstance(serviceName, Constants.DEFAULT_GROUP, instance);
}
/***
* 实例注册
*
@param
serviceName
name of service
*
@param
groupName
group of service
*
@param
instance
instance to register
*
@throws
NacosException
*/
@Override
public void
registerInstance(String serviceName, String groupName, Instance instance)
throws
NacosException {
NamingUtils.checkInstanceIsLegal(instance);
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
// 该字段表示注册的实例是否是临时实例还是持久化实例。
// 如果是临时实例,则不会在 Nacos 服务端持久化存储,需要通过上报心跳的方式进行包活,
// 如果一段时间内没有上报心跳,则会被 Nacos 服务端摘除。
if
(instance.isEphemeral()) {
// 为注册服务设置一个定时任务获取心跳信息,默认为5s汇报一次
// 心跳检测任务 服务名+@+组名 作为key执行心跳检测
BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}
// 注册到服务端 serverProxy在NamingServer创建的时候创建 代理对象 执行远程调用
serverProxy.registerService(groupedServiceName, groupName, instance);
}
|
注册主要做了两件事
:
第一件事:为注册的服务设置一个
定时任务
,定时
拉取
服务
信息。
第二件事:将服务
注册
到服务端。
1
:
启动一个定时任务,定时拉取服务信息,时间间隔为5s,如果拉下来服务正常,不做处理,如果不正常,重新注册
2
:
发送
http
请求给注册中心服务端,调用服务注册接口,注册服务
|
上面代码我们可以看到定时任务添加,但并未完全看到远程请求,serverProxy.
registerService
()方法如下,会先封装请求参数,接下来调用
reqApi
() 而
reqApi
()最后会调用
callServer
() ,代码如下:
public void
registerService(String serviceName, String groupName, Instance instance)
throws
NacosException {
NAMING_LOGGER.info(
“[REGISTER-SERVICE] {} registering service {} with instance: {}”
, namespaceId, serviceName, instance);
//封装Http请求参数
final
Map<String, String> params =
new
HashMap<String, String>(
16
);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put(CommonParams.GROUP_NAME, groupName);
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
params.put(
“ip”
, instance.getIp());
params.put(
“port”
, String.valueOf(instance.getPort()));
params.put(
“weight”
, String.valueOf(instance.getWeight()));
params.put(
“enable”
, String.valueOf(instance.isEnabled()));
params.put(
“healthy”
, String.valueOf(instance.isHealthy()));
params.put(
“ephemeral”
, String.valueOf(instance.isEphemeral()));
params.put(
“metadata”
, JacksonUtils.toJson(instance.getMetadata()));
//执行Http请求
reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
}
|
/**
* Request api.
* 请求API
*
*
@param
api
api
*
@param
params
parameters
*
@param
body
body
*
@param
servers
servers
*
@param
method
http method
*
@return
result
*
@throws
NacosException nacos exception
*/
public
String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers,
String method)
throws
NacosException {
params.put(CommonParams.
NAMESPACE_ID
, getNamespaceId());
if
(CollectionUtils.
isEmpty
(servers) && StringUtils.
isBlank
(
nacosDomain
)) {
throw new
NacosException(NacosException.
INVALID_PARAM
,
“no server available”
);
}
NacosException exception =
new
NacosException();
if
(StringUtils.
isNotBlank
(
nacosDomain
)) {
for
(
int
i =
0
; i <
maxRetry
; i++) {
try
{
// 远程调用 入口
return
callServer
(api, params, body,
nacosDomain
, method);
}
catch
(NacosException e) {
exception = e;
if
(
NAMING_LOGGER
.isDebugEnabled()) {
NAMING_LOGGER
.debug(
“request {} failed.”
,
nacosDomain
, e);
}
}
}
}
else
{
Random random =
new
Random(System.
currentTimeMillis
());
int
index = random.nextInt(servers.size());
for
(
int
i =
0
; i < servers.size(); i++) {
String server = servers.get(index);
try
{
return
callServer(api, params, body, server, method);
}
catch
(NacosException e) {
exception = e;
if
(
NAMING_LOGGER
.isDebugEnabled()) {
NAMING_LOGGER
.debug(
“request {} failed.”
, server, e);
}
}
index = (index +
1
) % servers.size();
}
}
NAMING_LOGGER
.error(
“request: {} failed, servers: {}, code: {}, msg: {}”
, api, servers, exception.getErrCode(),
exception.getErrMsg());
throw new
NacosException(exception.getErrCode(),
“failed to req API:”
+ api +
” after all servers(”
+ servers +
“) tried: ”
+ exception.getMessage());
}
|
/***
*执行远程调用
**/
public
String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer,
String method)
throws
NacosException {
long
start = System.
currentTimeMillis
();
long
end =
0
;
injectSecurityInfo(params);
//封装请求头部
Header header = builderHeader();
//请求是Http还是Https协议
String url;
if
(curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {
url = curServer + api;
}
else
{
if
(!IPUtil.containsPort(curServer)) {
curServer = curServer + IPUtil.IP_PORT_SPLITER + serverPort;
}
url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api;
}
try
{
//执行远程请求,并获取结果集
HttpRestResult<String> restResult = nacosRestTemplate.exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.
class
);
end = System.
currentTimeMillis
();
MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode())).observe(end – start);
//结果集解析
if
(restResult.ok()) {
return
restResult.getData();
}
if
(HttpStatus.SC_NOT_MODIFIED == restResult.getCode()) {
return
StringUtils.EMPTY;
}
throw new
NacosException(restResult.getCode(), restResult.getMessage());
}
catch
(Exception e) {
NAMING_LOGGER.error(
“[NA] failed to request”
, e);
throw new
NacosException(NacosException.SERVER_ERROR, e);
}
}
|
执行远程
Http
请求的对象是
NacosRestTemplate
,该对象就是封装了普通的
Http
请求。
/**
* Execute the HTTP method to the given URI template, writing the given request entity to the request, and returns
* the response as {
@link
HttpRestResult}.
*
*
@param
url
url
*
@param
header
http header param
*
@param
query
http query param 查询条件封装
*
@param
bodyValues
http body param
*
@param
httpMethod
http method
*
@param
responseType
return type
*
@return
{
@link
HttpRestResult}
*
@throws
Exception ex
*/
public
<
T
> HttpRestResult<
T
> exchangeForm(String url, Header header, Query query, Map<String, String> bodyValues,
String httpMethod, Type responseType)
throws
Exception {
RequestHttpEntity requestHttpEntity =
new
RequestHttpEntity(
header.setContentType(MediaType.
APPLICATION_FORM_URLENCODED
), query, bodyValues);
return
execute
(url, httpMethod, requestHttpEntity, responseType);
}
|
执行远程调用请求 使用
http
协议
/**
* 执行远程请求
*
*
@param
url
*
@param
httpMethod
*
@param
requestEntity
*
@param
responseType
*
@param
<T>
*
@return
*
@throws
Exception
*/
@SuppressWarnings
(
“unchecked”
)
private
<
T
> HttpRestResult<
T
> execute(String url, String httpMethod, RequestHttpEntity requestEntity,
Type responseType)
throws
Exception {
// url:http://127.0.0.1:8848/nacos/v1/ns/instance
URI uri = HttpUtils.
buildUri
(url, requestEntity.getQuery());
if
(
logger
.isDebugEnabled()) {
logger
.debug(
“HTTP method: {}, url: {}, body: {}”
, httpMethod, uri, requestEntity.getBody());
}
ResponseHandler<
T
> responseHandler =
super
.selectResponseHandler(responseType);
HttpClientResponse response =
null
;
try
{
// HttpClientRequest执行远程调用
response =
this
.
requestClient
().
execute
(uri, httpMethod, requestEntity);
return
responseHandler.handle(response);
}
finally
{
if
(response !=
null
) {
response.close();
}
}
}
/**
* 获取HttpClientRequest
*/
private
HttpClientRequest
requestClient
() {
if
(CollectionUtils.
isNotEmpty
(
interceptors
)) {
if
(
logger
.isDebugEnabled()) {
logger
.debug(
“Execute via interceptors :{}”
,
interceptors
);
}
return new
InterceptingHttpClientRequest(
requestClient
,
interceptors
.iterator());
}
return
requestClient
;
}
|
这是远程调用使用的是
JdkHttpClientRequest
发起的远程调用
@Override
public
HttpClientResponse execute(URI uri, String httpMethod, RequestHttpEntity requestHttpEntity)
throws
Exception {
final
Object body = requestHttpEntity.getBody();
final
Header headers = requestHttpEntity.getHeaders();
replaceDefaultConfig(requestHttpEntity.getHttpClientConfig());
HttpURLConnection conn = (HttpURLConnection) uri.toURL().openConnection();
Map<String, String> headerMap = headers.getHeader();
if
(headerMap !=
null
&& headerMap.size() >
0
) {
for
(Map.Entry<String, String> entry : headerMap.entrySet()) {
conn.setRequestProperty(entry.getKey(), entry.getValue());
}
}
conn.setConnectTimeout(
this
.
httpClientConfig
.getConTimeOutMillis());
conn.setReadTimeout(
this
.
httpClientConfig
.getReadTimeOutMillis());
conn.setRequestMethod(httpMethod);
if
(body !=
null
&& !
“”
.equals(body)) {
String contentType = headers.getValue(HttpHeaderConsts.
CONTENT_TYPE
);
String bodyStr = JacksonUtils.
toJson
(body);
if
(MediaType.
APPLICATION_FORM_URLENCODED
.equals(contentType)) {
Map<String, String> map = JacksonUtils.
toObj
(bodyStr, HashMap.
class
);
bodyStr = HttpUtils.
encodingParams
(map, headers.getCharset());
}
if
(bodyStr !=
null
) {
conn.setDoOutput(
true
);
byte
[] b = bodyStr.getBytes();
conn.setRequestProperty(
“Content-Length”
, String.
valueOf
(b.
length
));
// 获取远程网络输入流 执行远程请求
OutputStream outputStream = conn.getOutputStream();
outputStream.write(b,
0
, b.
length
);
outputStream.flush();
IoUtils.
closeQuietly
(outputStream);
}
}
conn.connect();
return new
JdkHttpClientResponse(conn);
}
|
远程调用,调用的是
nacos
的服务信息。远程掉用
url:http://127.0.0.1:8848/nacos/v1/ns/instance
执行
Http
请求/
nacos
为服务根地址,在nacos-console配置文件中配置。
#*************** Spring Boot Related Configurations ***************#
### Default web context path:
默认web服务根地址
server.servlet.contextPath
=
/nacos
### Default web server port:
server.port
=
8848
|
远程调用
post
请求/v1/ns/instance为
nacos
服务地址,在nacos-naming包中。
HealthController
执行心跳检测服务
/**
* Health status related operation controller.
* 心跳相关控制器
*
@author
nkorange
*
@author
nanamikon
*
@since
0.8.0
*/
@RestController
(
“namingHealthController”
)
@RequestMapping
(UtilsAndCommons.
NACOS_NAMING_CONTEXT
+
“/health”
)
public class
HealthController {
… …
}
|
InstanceController
执行服务实例注册服务
/**
* Instance operation controller.
* 服务实例控制
*
@author
nkorange
*/
@RestController
// /vi/ns/instance
@RequestMapping
(UtilsAndCommons.
NACOS_NAMING_CONTEXT
+
“/instance”
)
public class
InstanceController {
/**
* Register new instance.
*
*
@param
request
http request
*
@return
‘ok’ if success
*
@throws
Exception any error during register
*/
@CanDistro
//Distro协议(数据临时一致性协议)
@PostMapping
@Secured
(parser = NamingResourceParser.
class
, action = ActionTypes.
WRITE
)
public
String register(HttpServletRequest request)
throws
Exception {
final
String namespaceId = WebUtils
.
optional
(request, CommonParams.
NAMESPACE_ID
, Constants.
DEFAULT_NAMESPACE_ID
);
final
String serviceName = WebUtils.
required
(request, CommonParams.
SERVICE_NAME
);
NamingUtils.
checkServiceNameFormat
(serviceName);
final
Instance instance = parseInstance(request);
// 注册服务实例
serviceManager
.registerInstance(namespaceId, serviceName, instance);
return
“ok”
;
}
}
|
1.
2
.
3.
服务发现
我们沿着案例中的服务发现方法调用找到
nacos
–
api
中的NamingService.
getAllInstances
() 并找到它的实现类和方法com.alibaba.nacos.client.naming.NacosNamingService.
getAllInstances
() ,代码如下:
@Override
public
List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,
boolean
subscribe)
throws
NacosException {
ServiceInfo serviceInfo;
if
(subscribe) {
// 开启服务订阅则从本地获取服务列表 本地服务列表存储在serviceInfoMap中
// 服务订阅会将服务列表定时更新存储到serviceInfoMao中
serviceInfo =
hostReactor
.
getServiceInfo
(NamingUtils.
getGroupedName
(serviceName, groupName),
StringUtils.
join
(clusters,
“,”
));
}
else
{
// 没有开启服务订阅从远端获取 请求nacos服务发起远程调用获取服务信息
serviceInfo =
hostReactor
.getServiceInfoDirectlyFromServer(NamingUtils.
getGroupedName
(serviceName, groupName),
StringUtils.
join
(clusters,
“,”
));
}
List<Instance> list;
if
(serviceInfo ==
null
|| CollectionUtils.
isEmpty
(list = serviceInfo.getHosts())) {
return new
ArrayList<Instance>();
}
return
list;
}
|
上面的代码调用了hostReactor.
getServiceInfo
() 方法
。
public
ServiceInfo getServiceInfo(
final
String serviceName,
final
String clusters) {
NAMING_LOGGER.debug(
“failover-mode: ”
+ failoverReactor.isFailoverSwitch());
String key = ServiceInfo.getKey(serviceName, clusters);
if
(failoverReactor.isFailoverSwitch()) {
return
failoverReactor.getService(key);
}
/*1。先从本地缓存中获取服务对象,因为启动是第一次进来,所以缓存站不存在*/
ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
if
(
null
== serviceObj) {
/*构建服务实例*/
serviceObj =
new
ServiceInfo(serviceName, clusters);
/*将服务实例存放到缓存中*/
serviceInfoMap.put(serviceObj.getKey(), serviceObj);
/*更新nacos-上的服务*/
updatingMap.put(serviceName,
new
Object());
/*主动获取,并且更新到缓存本地,以及已过期的服务更新等*/
updateServiceNow(serviceName, clusters);
updatingMap.remove(serviceName);
}
else if
(updatingMap.containsKey(serviceName)) {
if
(UPDATE_HOLD_INTERVAL >
0
) {
updateServiceNow(serviceName, clusters);
// hold a moment waiting for update finish
synchronized
(serviceObj) {
try
{
serviceObj.wait(UPDATE_HOLD_INTERVAL);
}
catch
(InterruptedException e) {
NAMING_LOGGER
.error(
“[getServiceInfo] serviceName:”
+ serviceName +
“, clusters:”
+ clusters, e);
}
}
}
}
/*2.开启定时任务*/
scheduleUpdateIfAbsent(serviceName, clusters);
return
serviceInfoMap.get(serviceObj.getKey());
}
|
该方法会先调用
getServiceInfo0
() 方法从本地缓存获取数据
。
private
ServiceInfo getServiceInfo0(String serviceName, String clusters) {
// 获取对应的key值 Group@服务名 格式
String key = ServiceInfo.
getKey
(serviceName, clusters);
// 获取key对应的缓存 这里的值是NamingService执行定时任务获取的
return
serviceInfoMap
.get(key);
}
|
缓存没有数据,就构建实例更新到
Nacos
,并从
Nacos
中获取最新数据
。
updateServiceNow
(serviceName, clusters);
主
动
从远程服务器获取更新数据
。
private void
updateServiceNow(String serviceName, String clusters) {
try
{
updateService(serviceName, clusters);
}
catch
(NacosException e) {
NAMING_LOGGER
.error(
“[NA] failed to update serviceName: ”
+ serviceName, e);
}
}
|
最终会调用
updateService
()方法,在该方法中完成远程请求和数据处理,源码如下:
/**
* Update service now.
*
*
@param
serviceName
service name
*
@param
clusters
clusters
*/
public void
updateService(String serviceName, String clusters)
throws
NacosException {
// 从本地缓存列表获取服务
ServiceInfo oldService =
getServiceInfo0
(serviceName, clusters);
try
{
// 代理发起http请求远程调用 获取服务以及提供者端口信息,端口等
String result =
serverProxy
.queryList(serviceName, clusters,
pushReceiver
.getUdpPort(),
false
);
if
(StringUtils.
isNotEmpty
(result)) {
// 反序列化服务信息 并存储到serviceInfoMap中
processServiceJson
(result);
}
}
finally
{
if
(oldService !=
null
) {
synchronized
(oldService) {
oldService.notifyAll();
}
}
}
}
|
getServiceInfo0
(
)
方法在前面已经介绍过了。
processServiceJson
(
)
方法在服务创建介绍过。
回到开头,没有开启服务订阅会从远端获取,请求
nacos
服务发起远程调用获取服务信息。调用getServiceInfoDirectlyFromServer()方法。
/**
* 直接从nacos服务获取服务信息
*/
public
ServiceInfo getServiceInfoDirectlyFromServer(
final
String serviceName,
final
String clusters)
throws
NacosException {
String result =
serverProxy
.
queryList
(serviceName, clusters,
0
,
false
);
if
(StringUtils.
isNotEmpty
(result)) {
return
JacksonUtils.
toObj
(result, ServiceInfo.
class
);
}
return null
;
}
|
这里仍然是
NamingService
对象创建的代理对象发起远程调用获取服务信息
queryList
(
)
。
/**
* Query instance list.
*/
public
String queryList(String serviceName, String clusters,
int
udpPort,
boolean
healthyOnly)
throws
NacosException {
final
Map<String, String> params =
new
HashMap<String, String>(
8
);
params.put(CommonParams.
NAMESPACE_ID
,
namespaceId
);
params.put(CommonParams.
SERVICE_NAME
, serviceName);
params.put(
“clusters”
, clusters);
params.put(
“udpPort”
, String.
valueOf
(udpPort));
params.put(
“clientIP”
, NetUtils.
localIP
());
params.put(
“healthyOnly”
, String.
valueOf
(healthyOnly));
return
reqApi
(UtilAndComs.
nacosUrlBase
+
“/instance/list”
, params, HttpMethod.
GET
);
}
|
reqApi
(
)
在服务注册环节已经介绍过了。
1.
2
.
4.
服务下线
我们沿着案例中的服务下线方法调用找到
nacos
–
api
中的NamingService.
deregisterInstance
()并找到它的实现类和方法 NacosNamingService.deregisterInstance(),代码如下:
@Override
public void
deregisterInstance(String serviceName, String groupName, String ip,
int
port, String clusterName)
throws
NacosException {
//构建实例信息
Instance instance =
new
Instance();
instance.setIp(ip);
instance.setPort(port);
instance.setClusterName(clusterName);
//服务下线操作
deregisterInstance(serviceName, groupName, instance);
}
@Override
public void
deregisterInstance(String serviceName, String groupName, Instance instance)
throws
NacosException {
if
(instance.isEphemeral()) {
//移除心跳信息监测的定时任务
beatReactor.removeBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), instance.getIp(),
instance.getPort());
}
//发送远程请求执行服务下线销毁操作
serverProxy.deregisterService(NamingUtils.getGroupedName(serviceName, groupName), instance);
}
|
发起远程
delete
请求。
/**
* deregister instance from a service.
*
*
@param
serviceName
name of service
*
@param
instance
instance
*
@throws
NacosException nacos exception
*/
public void
deregisterService(String serviceName, Instance instance)
throws
NacosException {
NAMING_LOGGER
.info(
“[DEREGISTER-SERVICE] {} deregistering service {} with instance: {}”
,
namespaceId
, serviceName,
instance);
final
Map<String, String> params =
new
HashMap<String, String>(
8
);
params.put(CommonParams.
NAMESPACE_ID
,
namespaceId
);
params.put(CommonParams.
SERVICE_NAME
, serviceName);
params.put(CommonParams.
CLUSTER_NAME
, instance.getClusterName());
params.put(
“ip”
, instance.getIp());
params.put(
“port”
, String.
valueOf
(instance.getPort()));
params.put(
“ephemeral”
, String.
valueOf
(instance.isEphemeral()));
reqApi(UtilAndComs.
nacosUrlInstance
, params, HttpMethod.
DELETE
);
}
|
这里会远程调用
nacos
–
naming
服务
InstanceController
接口中的
delete
方法销毁服务。
/**
* Deregister instances.
*
*
@param
request
http request
*
@return
‘ok’ if success
*
@throws
Exception any error during deregister
*/
@CanDistro
@DeleteMapping
@Secured
(parser = NamingResourceParser.
class
, action = ActionTypes.
WRITE
)
public
String deregister(HttpServletRequest request)
throws
Exception {
Instance instance = getIpAddress(request);
String namespaceId = WebUtils.
optional
(request, CommonParams.
NAMESPACE_ID
, Constants.
DEFAULT_NAMESPACE_ID
);
String serviceName = WebUtils.
required
(request, CommonParams.
SERVICE_NAME
);
NamingUtils.
checkServiceNameFormat
(serviceName);
Service service =
serviceManager
.getService(namespaceId, serviceName);
if
(service ==
null
) {
Loggers.
SRV_LOG
.warn(
“remove instance from non-exist service: {}”
, serviceName);
return
“ok”
;
}
// 移除服务数据
serviceManager
.removeInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
return
“ok”
;
}
|
服务下线方法比较简单,和服务注册做的事情正好相反,也做了两件事,第一件事:不在进行心跳检测。 第二件事:请求服务端服务下线接口。
1.
2
.
4.
服务订阅
我们可以查看订阅服务的案例,会先创建一个线程池,接下来会把线程池封装到监听器中,而监听器中可以监听指定实例信息,代码如下:
//服务订阅
Executor executor =
new
ThreadPoolExecutor(
1
,
1
,
0L
, TimeUnit.MILLISECONDS,
new
LinkedBlockingQueue<Runnable>(),
new
ThreadFactory() {
@Override
public
Thread newThread(Runnable r) {
Thread thread =
new
Thread(r);
thread.setName(
“test-thread”
);
return
thread;
}
});
naming.subscribe(
“nacos.test.3”
,
new
AbstractEventListener() {
//EventListener onEvent is sync to handle, If process too low in onEvent,maybe block other onEvent callback.
//So you can override getExecutor() to async handle event.
@Override
public
Executor getExecutor() {
return
executor
;
}
//读取监听到的服务实例
@Override
public void
onEvent(Event event) {
System.
out
.println(((NamingEvent) event).getServiceName());
System.
out
.println(((NamingEvent) event).getInstances());
}
});
|
我们沿着案例中的服务订阅方法调用找到
nacos
–
api
中的NamingService.
subscribe
()并找到它的实现类和方法NacosNamingService.
deregisterInstance
(),代码如下:
public void
subscribe(String serviceName, String clusters, EventListener eventListener) {
//注册监听
notifier.registerListener(serviceName, clusters, eventListener);
//获取并更新服务实例
getServiceInfo(serviceName, clusters);
}
|
此时会注册监听,注册监听就是将当前的监听对象信息注入到
listenerMap
集合中,在监听对象的指定方法
onEvent
中可以读取实例信息,代码如下:
public void
registerListener(String serviceName, String clusters, EventListener listener) {
String key = ServiceInfo.getKey(serviceName, clusters);
ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
if
(eventListeners ==
null
) {
synchronized
(lock) {
eventListeners = listenerMap.get(key);
if
(eventListeners ==
null
) {
eventListeners =
new
ConcurrentHashSet<EventListener>();
listenerMap.put(key, eventListeners);
}
}
}
//将当前监听对象放入到集合中,在监听对象的onEvent中可以读出对应的实例对象
eventListeners.add(listener);
}
|
get
ServiceInfo
(
serviceName
,
clusters
)
获取服务实例,先从本地缓存获取,本地获取不到就从服务器获取,前面服务发现已经介绍过了。
1.
3.
服务端工作流程
注册中心服务端的主要功能包括,接收客户端的
服务注册
,
服务发现
,
服务下线
的功能,但是除了这些和客户端的交互之外,服务端还要做一些更重要的事情,就是我们常常会在分布式系统中听到的
AP
和
CP
,作为一个集群,
nacos
即实现了
AP
也实现了
CP
,其中
AP
使用的自己实现的
Distro
协议
,而
CP
是采用
raft
协议
实现的,这个过程中牵涉到
心跳
、
选主
等操作。
我们来学习一下注册中心服务端接收客户端服务注册的功能。
1.
3
.1
.
注册处理
我们先来学习一下
Nacos
的工具类
WebUtils
,该工具类在
nacos
–
core
工程下,该工具类是用于
处理请求参数转化
的,里面提供了
2
个常被用到的方法
required
()和
optional
():
required
方法通过参数名key,解析HttpServletRequest请求中的参数,并转码为UTF-8编码。
optional
方法在required方法的基础上增加了默认值,如果获取不到,则返回默认值。
|
代码如下:
/**
* required方法通过参数名key,解析HttpServletRequest请求中的参数,并转码为UTF-8编码。
*/
public static
String required(
final
HttpServletRequest req,
final
String key) {
String value = req.getParameter(key);
if
(StringUtils.isEmpty(value)) {
throw new
IllegalArgumentException(
“Param ‘”
+ key +
“‘ is required.”
);
}
String encoding = req.getParameter(
“encoding”
);
return
resolveValue(value, encoding);
}
/**
* optional方法在required方法的基础上增加了默认值,如果获取不到,则返回默认值。
*/
public static
String optional(
final
HttpServletRequest req,
final
String key,
final
String defaultValue) {
if
(!req.getParameterMap().containsKey(key) ||
req.getParameterMap().get(key)[
0
] ==
null
) {
return
defaultValue;
}
String value = req.getParameter(key);
if
(StringUtils.isBlank(value)) {
return
defaultValue;
}
String encoding = req.getParameter(
“encoding”
);
return
resolveValue(value, encoding);
}
|
nacos server
–
client
使用了
http
协议来交互,那么在
server
端必定提供了
http
接口的入口,并且在
core
模块看到其依赖了
spring boot starter
,所以它的
http
接口由集成了
Spring
的
web
服务器支持,简单地说就是像我们平时写的业务服务一样,有
controller
层和
service
层。
以
OpenAPI
作为入口来学习,我们找到 /nacos/v1/ns/instance服务注册接口,在
nacos
–
naming
工程中我们可以看到
InstanceController
正是我们要找的对象,如下图:

处理服务注册,我们直接找对应的
POST
方法即可,代码如下:
/**
* Register new instance.
* 接收客户端注册信息
*
*
@param
request
http request
*
@return
‘ok’ if success
*
@throws
Exception any error during register
*/
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.
class
, action = ActionTypes.WRITE)
public
String register(HttpServletRequest request)
throws
Exception {
//获取namespaceid,该参数是可选参数
final
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
//获取服务名字
final
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
//校验服务的名字,服务的名字格式为groupName@@serviceName
NamingUtils.checkServiceNameFormat(serviceName);
//创建实例
final
Instance instance = parseInstance(request);
//注册服务
serviceManager.registerInstance(namespaceId, serviceName, instance);
return
“ok”
;
}
|
如上图,该方法主要用于接收客户端
注册信息
,并且会校验参数是否存在问题,如果不存在问题就创建服务的实例,服务实例创建后将服务实例注册到
Nacos
中,注册的方法如下:
public void
registerInstance(String namespaceId, String serviceName, Instance instance)
throws
NacosException {
//判断本地缓存中是否存在该命名空间,如果不存在就创建,之后判断该命名空间下是否
//存在该服务,如果不存在就创建空的服务
//如果实例为空,则创建实例,并且会将创建的实例存入到serviceMap集合中
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
//从serviceMap集合中获取创建的实例
Service service = getService(namespaceId, serviceName);
if
(service ==
null
) {
throw new
NacosException(NacosException.INVALID_PARAM,
“service not found, namespace: ”
+ namespaceId +
“, service: ”
+ serviceName);
}
//服务注册,这一步才会把服务的实例信息和服务绑定起来
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
|
注册的方法中会先创建该实例对象,创建前先检查本地缓存是否存在该实例对象,如果不存在就创建,最后注册该服务,并且该服务会和实例信息捆绑到一起,并将信息同步到磁盘,数据同步到磁盘就涉及到数据一致性了,我们接下来讲解
Nacos
的数据一致性。
1.
3
.2
.
一致性算法Distro协议介绍
Distro
是阿里巴巴的
私有协议
,目前流行的
Nacos
服务管理框架就采用了
Distro
协议。
Distro
协议被定位为临时数据的一致性协议:该类型协议,不需要把数据存储到磁盘或者数据库,因为临时数据通常和服务器保持一个
session
会话,该会话只要存在,数据就不会丢失 。
Distro
协议保证
写必须永远是成功
的,即使可能会发生网络分区。当网络恢复时,把各数据分片的数据进行合并。
Distro
协议具有以下特点:
1
:
专门为了注册中心而创造出的协议;
2
:
客户端与服务端有两个重要的交互,服务注册与心跳发送;
3
:
客户端以服务为维度向服务端注册,注册后每隔一段时间向服务端发送一次心跳,心跳包需要带上注册服
务的全部信息,在客户端看来,服务端节点对等,所以请求的节点是随机的;
4
:
客户端请求失败则换一个节点重新发送请求;
5
:
服务端节点都存储所有数据,但每个节点只负责其中一部分服务,在接收到客户端的“写”
(
注册、心
跳、下线等
)
请求后,服务端节点判断请求的服务是否为自己负责,如果是,则处理,否则交由负责的节点
处理;
6
:
每个服务端节点主动发送健康检查到其他节点,响应的节点被该节点视为健康节点;
7
:
服务端在接收到客户端的服务心跳后,如果该服务不存在,则将该心跳请求当做注册请求来处理;
8
:
服务端如果长时间未收到客户端心跳,则下线该服务;
9
:
负责的节点在接收到服务注册、服务心跳等写请求后将数据写入后即返回,后台异步地将数据同步给其他
节点;
10
:
节点在收到读请求后直接从本机获取后返回,无论数据是否为最新。
|
1.
3
.3
.
Distro
服务启动-
寻址模式
Distro
协议服务端节点发现使用
寻址机制
来实现服务端节点的管理。在
Nacos
中,寻址模式有三种:
单机模式
(
StandaloneMemberLookup
)
文件模式
(
FileConfigMemberLookup
)
服务器模式
(
AddressServerMemberLookup
)
|
三种寻址模式如下图:
在com.alibaba.nacos.core.cluster.lookup.LookupFactory中有创建寻址方式,可以创建集群启动方式、单机启动方式,不同启动方式就决定了不同寻址模式。
/**
* Create the target addressing pattern.
* 创建寻址模式
*
*
@param
memberManager
{
@link
ServerMemberManager}
*
@return
{
@link
MemberLookup}
*
@throws
NacosException NacosException
*/
public static
MemberLookup createLookUp(ServerMemberManager memberManager)
throws
NacosException {
//NacosServer 集群方式启动
if
(!EnvUtil.getStandaloneMode()) {
String lookupType = EnvUtil.getProperty(LOOKUP_MODE_TYPE);
//由参数中传入的寻址方式得到LookupType对象
LookupType type = chooseLookup(lookupType);
//选择寻址方式
LOOK_UP =
find
(type);
//设置当前寻址方式
currentLookupType = type;
}
else
{
//NacosServer单机启动
LOOK_UP =
new
StandaloneMemberLookup();
}
LOOK_UP.injectMemberManager(memberManager);
Loggers.CLUSTER.info(
“Current addressing mode selection : {}”
,
LOOK_UP.getClass().getSimpleName());
return
LOOK_UP;
}
/***
* 选择寻址方式
*
@param
type
*
@return
*/
private static
MemberLookup find(LookupType type) {
//文件寻址模式,也就是配置cluster.conf配置文件将多个节点串联起来,
// 通过配置文件寻找其他节点,以达到和其他节点通信的目的
if
(LookupType.FILE_CONFIG.equals(type)) {
LOOK_UP =
new
FileConfigMemberLookup();
return
LOOK_UP;
}
//服务器模式
if
(LookupType.ADDRESS_SERVER.equals(type)) {
LOOK_UP =
new
AddressServerMemberLookup();
return
LOOK_UP;
}
// unpossible to run here
throw new
IllegalArgumentException();
}
|
单节点寻址模式
会直接创建StandaloneMemberLookup对象,而
文件寻址模式
会创建FileConfigMemberLookup对象,
服务器寻址模式
会创建AddressServerMemberLookup;
1.
3
.3.1
.
单机
寻址
模式
单机模式直接寻找自己的IP:PORT地址。
public class
StandaloneMemberLookup
extends
AbstractMemberLookup {
@Override
public void
start() {
if
(
start
.compareAndSet(
false
,
true
)) {
// 获取自己的IP:port
String url = InetUtils.
getSelfIP
() +
“:”
+ EnvUtil.
getPort
();
afterLookup(MemberUtil.
readServerConf
(Collections.
singletonList
(url)));
}
}
}
|
1.
3
.3.2.文件寻址模式

文件寻址模式主要在创建集群的时候,通过
cluster
.conf 来配置集群,程序可以通过监听
cluster
.conf 文件变化实现动态管理节点,FileConfigMemberLookup源码如下:
public class
FileConfigMemberLookup
extends
AbstractMemberLookup {
//创建文件监听器
private
FileWatcher
watcher
=
new
FileWatcher() {
//文件发生变更事件
@Override
public void
onChange(FileChangeEvent event) {
readClusterConfFromDisk();
}
//检查context是否包含cluster.conf
@Override
public boolean
interest(String context) {
return
StringUtils.contains(context,
“cluster.conf”
);
}
};
@Override
public void
start()
throws
NacosException {
if
(start.compareAndSet(
false
,
true
)) {
readClusterConfFromDisk();
// 使用inotify机制来监视文件更改,并自动触发对cluster.conf的读取
try
{
WatchFileCenter.registerWatcher(EnvUtil.getConfPath(),
watcher
);
}
catch
(Throwable e) {
Loggers.CLUSTER.error(
“An exception occurred in the launch file monitor : {}”
, e.getMessage());
}
}
}
@Override
public void
destroy()
throws
NacosException {
WatchFileCenter.deregisterWatcher(EnvUtil.getConfPath(),
watcher
);
}
private void
readClusterConfFromDisk() {
Collection<Member> tmpMembers =
new
ArrayList<>();
try
{
List<String> tmp = EnvUtil.readClusterConf();
tmpMembers = MemberUtil.readServerConf(tmp);
}
catch
(Throwable e) {
Loggers.CLUSTER.error(
“nacos-XXXX [serverlist] failed to get serverlist from disk!, error : {}”
, e.getMessage());
}
afterLookup(tmpMembers);
}
}
|
1.
3
.3.3
.
服务器寻址模式
使用地址服务器存储节点信息,会创建AddressServerMemberLookup,服务端定时拉取信息进行管理;
public class
AddressServerMemberLookup
extends
AbstractMemberLookup {
private final
GenericType<RestResult<String>>
genericType
=
new
GenericType<RestResult<String>>() {
};
public
String
domainName
;
public
String
addressPort
;
public
String
addressUrl
;
public
String
envIdUrl
;
public
String
addressServerUrl
;
private volatile boolean
isAddressServerHealth
=
true
;
private int
addressServerFailCount
=
0
;
private int
maxFailCount
=
12
;
private final
NacosRestTemplate
restTemplate
=
HttpClientBeanHolder.getNacosRestTemplate(Loggers.CORE);
private volatile boolean
shutdown
=
false
;
@Override
public void
start()
throws
NacosException {
if
(start.compareAndSet(
false
,
true
)) {
this
.
maxFailCount
= Integer.
parseInt
(EnvUtil.getProperty(
“maxHealthCheckFailCount”
,
“12”
));
initAddressSys();
run();
}
}
/***
* 获取服务器地址
*/
private void
initAddressSys() {
String envDomainName = System.
getenv
(
“address_server_domain”
);
if
(StringUtils.isBlank(envDomainName)) {
domainName
= EnvUtil.getProperty(
“address.server.domain”
,
“jmenv.tbsite.net”
);
}
else
{
domainName
= envDomainName;
}
String envAddressPort = System.
getenv
(
“address_server_port”
);
if
(StringUtils.isBlank(envAddressPort)) {
addressPort
= EnvUtil.getProperty(
“address.server.port”
,
“8080”
);
}
else
{
addressPort
= envAddressPort;
}
String envAddressUrl = System.
getenv
(
“address_server_url”
);
if
(StringUtils.isBlank(envAddressUrl)) {
addressUrl
= EnvUtil.getProperty(
“address.server.url”
, EnvUtil.getContextPath() +
“/”
+
“serverlist”
);
}
else
{
addressUrl
= envAddressUrl;
}
addressServerUrl
=
“http://”
+
domainName
+
“:”
+
addressPort
+
addressUrl
;
envIdUrl
=
“http://”
+
domainName
+
“:”
+
addressPort
+
“/env”
;
Loggers.CORE.info(
“ServerListService address-server port:”
+
addressPort
);
Loggers.CORE.info(
“ADDRESS_SERVER_URL:”
+
addressServerUrl
);
}
@SuppressWarnings
(
“PMD.UndefineMagicConstantRule”
)
private void
run()
throws
NacosException {
// With the address server, you need to perform a synchronous member node pull at startup
// Repeat three times, successfully jump out
boolean
success =
false
;
Throwable ex =
null
;
int
maxRetry = EnvUtil.getProperty(
“nacos.core.address-server.retry”
, Integer.
class
,
5
);
for
(
int
i =
0
; i < maxRetry; i++) {
try
{
//拉取集群节点信息
syncFromAddressUrl();
success =
true
;
break
;
}
catch
(Throwable e) {
ex = e;
Loggers.CLUSTER.error(
“[serverlist] exception, error : {}”
, ExceptionUtil.getAllExceptionMsg(ex));
}
}
if
(!success) {
throw new
NacosException(NacosException.SERVER_ERROR, ex);
}
//创建定时任务
GlobalExecutor.scheduleByCommon(
new
AddressServerSyncTask(),
5_000L
);
}
@Override
public void
destroy()
throws
NacosException {
shutdown
=
true
;
}
@Override
public
Map<String, Object> info() {
Map<String, Object> info =
new
HashMap<>(
4
);
info.put(
“addressServerHealth”
,
isAddressServerHealth
);
info.put(
“addressServerUrl”
,
addressServerUrl
);
info.put(
“envIdUrl”
,
envIdUrl
);
info.put(
“addressServerFailCount”
,
addressServerFailCount
);
return
info;
}
private void
syncFromAddressUrl()
throws
Exception {
RestResult<String> result =
restTemplate
.get(
addressServerUrl
, Header.EMPTY, Query.EMPTY,
genericType
.getType());
if
(result.ok()) {
isAddressServerHealth
=
true
;
Reader reader =
new
StringReader(result.getData());
try
{
afterLookup(MemberUtil.readServerConf(EnvUtil.analyzeClusterConf(reader)));
}
catch
(Throwable e) {
Loggers.CLUSTER.error(
“[serverlist] exception for analyzeClusterConf, error : {}”
, ExceptionUtil.getAllExceptionMsg(e));
}
addressServerFailCount
=
0
;
}
else
{
addressServerFailCount
++;
if
(
addressServerFailCount
>=
maxFailCount
) {
isAddressServerHealth
=
false
;
}
Loggers.CLUSTER.error(
“[serverlist] failed to get serverlist, error code {}”
, result.getCode());
}
}
// 定时任务
class
AddressServerSyncTask
implements
Runnable {
@Override
public void
run() {
if
(
shutdown
) {
return
;
}
try
{
//拉取服务列表
syncFromAddressUrl();
}
catch
(Throwable ex) {
addressServerFailCount
++;
if
(
addressServerFailCount
>=
maxFailCount
) {
isAddressServerHealth
=
false
;
}
Loggers.CLUSTER.error(
“[serverlist] exception, error : {}”
, ExceptionUtil.getAllExceptionMsg(ex));
}
finally
{
GlobalExecutor.scheduleByCommon(
this
,
5_000L
);
}
}
}
}
|
1.
3
.
5.集群
数据同步
Nacos
数据同步分为
全量同步
和
增量同步
,所谓全量同步就是初始化数据一次性同步,而
增量同步
是指有数据增加的时候,只同步增加的数据。
1.
3
.
5
.1
.
全量同步
全量同步流程比较复杂,流程如上图:
1
:
启动一个定时任务线程DistroLoadDataTask加载数据,调用load()方法加载数据
2
:
调用loadAllDataSnapshotFromRemote()方法从远程机器同步所有的数据
3
:
从namingProxy代理获取所有的数据data
4
:
构造http请求,调用httpGet方法从指定的server获取数据
5
:
从获取的结果result中获取数据bytes
6
:
处理数据processData
7
:
从data反序列化出datumMap
8
:
把数据存储到dataStore,也就是本地缓存dataMap
9
:
监听器不包括key,就创建一个空的service,并且绑定监听器
10
:
监听器listener执行成功后,就更新data store
|
◆
任务启动
在com.alibaba.nacos.core.distributed.distro.
DistroProtocol
的构造函数中调用
startDistroTask
()方法,该方法会执行
startVerifyTask
()和
startLoadTask
() ,我们重点关注
startLoadTask
() ,该方法代码如下:
/***
* 启动DistroTask
*/
private void
startDistroTask() {
if
(EnvUtil.getStandaloneMode()) {
isInitialized =
true
;
return
;
}
//启动startVerifyTask,做数据同步校验
startVerifyTask();
//启动DistroLoadDataTask,批量加载数据
startLoadTask();
}
//启动DistroLoadDataTask
private void
startLoadTask() {
//处理状态回调对象
DistroCallback loadCallback =
new
DistroCallback() {
//处理成功
@Override
public void
onSuccess() {
isInitialized =
true
;
}
//处理失败
@Override
public void
onFailed(Throwable throwable) {
isInitialized =
false
;
}
};
//执行DistroLoadDataTask,是一个多线程
GlobalExecutor.submitLoadDataTask(
new
DistroLoadDataTask(memberManager, distroComponentHolder, distroConfig, loadCallback));
}
/***
* 启动startVerifyTask
* 数据校验
*/
private void
startVerifyTask() {
GlobalExecutor.schedulePartitionDataTimedSync(
new
DistroVerifyTask(memberManager, distroComponentHolder), distroConfig.getVerifyIntervalMillis());
}
|
数据校验
public class
DistroVerifyTask
implements
Runnable {
private final
ServerMemberManager
serverMemberManager
;
private final
DistroComponentHolder
distroComponentHolder
;
public
DistroVerifyTask(ServerMemberManager serverMemberManager, DistroComponentHolder distroComponentHolder) {
this
.
serverMemberManager
= serverMemberManager;
this
.
distroComponentHolder
= distroComponentHolder;
}
@Override
public void
run() {
try
{
// 获取集群中所有节点
List<Member> targetServer =
serverMemberManager
.allMembersWithoutSelf();
if
(Loggers.
DISTRO
.isDebugEnabled()) {
Loggers.
DISTRO
.debug(
“server list is: {}”
, targetServer);
}
for
(String each :
distroComponentHolder
.getDataStorageTypes()) {
// 同步数据
校验
verifyForDataStorage(each, targetServer);
}
}
catch
(Exception e) {
Loggers.
DISTRO
.error(
“[DISTRO-FAILED] verify task failed.”
, e);
}
}
private void
verifyForDataStorage(String type, List<Member> targetServer) {
DistroData distroData =
distroComponentHolder
.findDataStorage(type).getVerifyData();
if
(
null
== distroData) {
return
;
}
distroData.setType(DataOperation.
VERIFY
);
for
(Member member : targetServer) {
try
{
// 同步数据
校验
distroComponentHolder
.findTransportAgent(type).
syncVerifyData
(distroData, member.getAddress());
}
catch
(Exception e) {
Loggers.
DISTRO
.error(String
.
format
(
“[DISTRO-FAILED] verify data for type %s to %s failed.”
, type, member.getAddress()), e);
}
}
}
}
|
执行校验
@Override
public boolean
syncVerifyData(DistroData verifyData, String targetServer) {
if
(!
memberManager
.hasMember(targetServer)) {
return true
;
}
NamingProxy.
syncCheckSums
(verifyData.getContent(), targetServer);
return true
;
}
/**
* 同步检查总结
*/
public static void
syncCheckSums(
byte
[] checksums, String server) {
try
{
Map<String, String> headers =
new
HashMap<>(
128
);
headers.put(HttpHeaderConsts.
CLIENT_VERSION_HEADER
, VersionUtils.
version
);
headers.put(HttpHeaderConsts.
USER_AGENT_HEADER
, UtilsAndCommons.
SERVER_VERSION
);
headers.put(HttpHeaderConsts.
CONNECTION
,
“Keep-Alive”
);
HttpClient.
asyncHttpPutLarge
(
“http://”
+ server + EnvUtil.
getContextPath
() + UtilsAndCommons.
NACOS_NAMING_CONTEXT
+
TIMESTAMP_SYNC_URL
+
“?source=”
+ NetUtils.
localServer
(), headers, checksums,
new
Callback<String>() {
@Override
public void
onReceive(RestResult<String> result) {
if
(!result.ok()) {
Loggers.
DISTRO
.error(
“failed to req API: {}, code: {}, msg: {}”
,
“http://”
+
server
+ EnvUtil.
getContextPath
()
+ UtilsAndCommons.
NACOS_NAMING_CONTEXT
+
TIMESTAMP_SYNC_URL
,
result.getCode(), result.getMessage());
}
}
@Override
public void
onError(Throwable throwable) {
Loggers.
DISTRO
.error(
“failed to req API:”
+
“http://”
+
server
+ EnvUtil.
getContextPath
()
+ UtilsAndCommons.
NACOS_NAMING_CONTEXT
+
TIMESTAMP_SYNC_URL
, throwable);
}
@Override
public void
onCancel() {
}
});
}
catch
(Exception e) {
Loggers.
DISTRO
.warn(
“NamingProxy”
, e);
}
}
|
◆
数据执行加载
上面方法会调用
DistroLoadDataTask
对象,而该对象其实是个线程,因此会执行它的
run
方法,
run
方法会调用
load
()方法实现数据全量加载,代码如下:
/***
* 数据加载过程
*/
@Override
public void
run() {
try
{
//加载数据
load();
if
(!checkCompleted()) {
GlobalExecutor.submitLoadDataTask(
this
, distroConfig.getLoadDataRetryDelayMillis());
}
else
{
loadCallback.onSuccess();
Loggers.DISTRO.info(
“[DISTRO-INIT] load snapshot data success”
);
}
}
catch
(Exception e) {
loadCallback.onFailed(e);
Loggers.DISTRO.error(
“[DISTRO-INIT] load snapshot data failed. ”
, e);
}
}
/***
* 加载数据,并同步
*
@throws
Exception
*/
private void
load()
throws
Exception {
while
(memberManager.allMembersWithoutSelf().isEmpty()) {
Loggers.DISTRO.info(
“[DISTRO-INIT] waiting server list init…”
);
TimeUnit.SECONDS.sleep(
1
);
}
while
(distroComponentHolder.getDataStorageTypes().isEmpty()) {
Loggers.DISTRO.info(
“[DISTRO-INIT] waiting distro data storage register…”
);
TimeUnit.SECONDS.sleep(
1
);
}
//同步数据
for
(String each : distroComponentHolder.getDataStorageTypes()) {
if
(!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {
//从远程机器上同步所有数据
loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));
}
}
}
|
◆
数据同步
数据同步会通过
Http
请求从远程服务器获取数据,并同步到当前服务的缓存中,执行流程如下:
1
:
loadAllDataSnapshotFromRemote()从远程加载所有数据,并处理同步到本机
2
:
transportAgent.getDatumSnapshot()远程加载数据,通过Http请求执行远程加载
3
:
dataProcessor.processSnapshot()处理数据同步到本地
|
数据处理完整逻辑代码如下:loadAllDataSnapshotFromRemote()方法
:
/***
* 从远程机器上同步所有数据
*/
private boolean
loadAllDataSnapshotFromRemote(String resourceType) {
DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
if
(
null
== transportAgent ||
null
== dataProcessor) {
Loggers.DISTRO.warn(
“[DISTRO-INIT] Can’t find component for type {}, transportAgent: {}, dataProcessor: {}”
,
resourceType, transportAgent, dataProcessor);
return false
;
}
//遍历集群成员节点,不包括自己
for
(Member each : memberManager.allMembersWithoutSelf()) {
try
{
Loggers.DISTRO.info(
“[DISTRO-INIT] load snapshot {} from {}”
, resourceType, each.getAddress());
//从远程节点加载数据,调用http请求接口: distro/datums;
DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());
//处理数据
boolean
result = dataProcessor.processSnapshot(distroData);
Loggers.DISTRO.info(
“[DISTRO-INIT] load snapshot {} from {} result: {}”
, resourceType, each.getAddress(), result);
if
(result) {
return true
;
}
}
catch
(Exception e) {
Loggers.DISTRO.error(
“[DISTRO-INIT] load snapshot {} from {} failed.”
, resourceType, each.getAddress(), e);
}
}
return false
;
}
|
远程加载数据代码如下:transportAgent.
getDatumSnapshot
() 方法
/***
* 从namingProxy代理获取所有的数据data,从获取的结果result中获取数据bytes;
*
@param
targetServer
target server.
*
@return
*/
@Override
public
DistroData getDatumSnapshot(String targetServer) {
try
{
//从namingProxy代理获取所有的数据data,从获取的结果result中获取数据bytes;
byte
[] allDatum = NamingProxy.getAllData(targetServer);
//将数据封装成DistroData
return new
DistroData(
new
DistroKey(
“snapshot”
, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), allDatum);
}
catch
(Exception e) {
throw new
DistroException(String.
format
(
“Get snapshot from %s failed.”
, targetServer), e);
}
}
/**
* Get all datum from target server.
* NamingProxy.getAllData
* 执行HttpGet请求,并获取返回数据
*
*
@param
server
target server address
*
@return
all datum byte array
*
@throws
Exception exception
*/
public static byte
[] getAllData(String server)
throws
Exception {
//参数封装
Map<String, String> params =
new
HashMap<>(
8
);
//组装URL,并执行HttpGet请求,获取结果集
RestResult<String> result = HttpClient.httpGet(
“http://”
+ server + EnvUtil.getContextPath() +
UtilsAndCommons.NACOS_NAMING_CONTEXT + ALL_DATA_GET_URL,
new
ArrayList<>(), params);
//返回数据
if
(result.ok()) {
return
result.getData().getBytes();
}
throw new
IOException(
“failed to req API: ”
+
“http://”
+ server + EnvUtil.getContextPath()
+ UtilsAndCommons.NACOS_NAMING_CONTEXT
+ ALL_DATA_GET_URL +
“. code:”
+ result.getCode() +
” msg: ”
+ result.getMessage());
}
|
处理数据同步到本地
@Override
public boolean
processSnapshot(DistroData distroData) {
try
{
return
processData
(distroData.getContent());
}
catch
(Exception e) {
return false
;
}
}
|
dataProcessor.
processSnapshot
()
/**
* 数据处理并更新本地缓存
*
*
@param
data
*
@return
*
@throws
Exception
*/
private boolean
processData(
byte
[] data)
throws
Exception {
if
(data.
length
>
0
) {
//从data反序列化出datumMap
Map<String, Datum<Instances>> datumMap = serializer.deserializeMap(data, Instances.
class
);
// 把数据存储到dataStore,也就是本地缓存dataMap
for
(Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
dataStore.put(entry.getKey(), entry.getValue());
//监听器不包括key,就创建一个空的service,并且绑定监听器
if
(!listeners.containsKey(entry.getKey())) {
// pretty sure the service not exist:
if
(switchDomain.isDefaultInstanceEphemeral()) {
// create empty service
//创建一个空的service
Loggers.DISTRO.info(
“creating service {}”
, entry.getKey());
Service service =
new
Service();
String serviceName = KeyBuilder.getServiceName(entry.getKey());
String namespaceId = KeyBuilder.getNamespace(entry.getKey());
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(Constants.DEFAULT_GROUP);
// now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.
currentTimeMillis
());
service.recalculateChecksum();
// The Listener corresponding to the key value must not be empty
// 与键值对应的监听器不能为空,这里的监听器类型是 ServiceManager
RecordListener listener = listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).peek();
if
(Objects.isNull(listener)) {
return false
;
}
//为空的绑定监听器
listener.onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service);
}
}
}
//循环所有datumMap
for
(Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
if
(!listeners.containsKey(entry.getKey())) {
// Should not happen:
Loggers.DISTRO.warn(
“listener of {} not found.”
, entry.getKey());
continue
;
}
try
{
//执行监听器的onChange监听方法
for
(RecordListener listener : listeners.get(entry.getKey())) {
listener.onChange(entry.getKey(), entry.getValue().value);
}
}
catch
(Exception e) {
Loggers.DISTRO.error(
“[NACOS-DISTRO] error while execute listener of key: {}”
, entry.getKey(), e);
continue
;
}
// Update data store if listener executed successfully:
// 监听器listener执行成功后,就更新dataStore
dataStore.put(entry.getKey(), entry.getValue());
}
}
return true
;
}
|
到此实现数据全量同步,其实全量同步最终封装的协议还是
Http
。
1.
3
.
5
.2
.
增量同步
新增数据使用异步广播同步:
1
:
DistroProtocol 使用 sync() 方法接收增量数据
2
:
向其他节点发布广播任务
调用 distroTaskEngineHolder 发布延迟任务
3
:
调用 DistroDelayTaskProcessor.process() 方法进行任务投递:将延迟任务转换为异步变更任务
4
:
执行变更任务 DistroSyncChangeTask.run() 方法:向指定节点发送消息
调用 DistroHttpAgent.syncData() 方法发送数据
调用 NamingProxy.syncData() 方法发送数据
5
:
异常任务调用 handleFailedTask() 方法进行处理
调用 DistroFailedTaskHandler 处理失败任务
调用 DistroHttpCombinedKeyTaskFailedHandler 将失败任务重新投递成延迟任务。
|
◆
增量数据入口
我们回到服务注册,服务注册的InstanceController.
register
() 就是数据入口,它会调用ServiceManager.
registerInstance
(),执行数据同步的时候,调用
addInstance
() ,在该方法中会执行 DistroConsistencyServiceImpl.
put
(),该方法是增量同步的入口,会调用distroProtocol.
sync
()方法,代码如下:
/***
* 数据保存
*
@param
key
key of data, this key should be globally unique
*
@param
value
value of data
*
@throws
NacosException
*/
@Overridesync()
public void
put(String key, Record value)
throws
NacosException {
//将数据存入到dataStore中
onPut(key, value);
//使用distroProtocol同步数据
distroProtocol.sync(
new
DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX),
DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() /
2
);
}
|
sync
()
方法会执行任务发布,代码如下:
public void
sync(DistroKey distroKey, DataOperation action,
long
delay) {
//向除了自己外的所有节点广播
for
(Member each : memberManager.allMembersWithoutSelf()) {
DistroKey distroKeyWithTarget =
new
DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(), each.getAddress());
DistroDelayTask distroDelayTask =
new
DistroDelayTask(distroKeyWithTarget, action, delay);
//从distroTaskEngineHolder获取延时执行引擎,并将distroDelayTask任务添加进来
//执行延时任务发布
distroTaskEngineHolder.getDelayTaskExecuteEngine().
addTask(distroKeyWithTarget, distroDelayTask);
if
(Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug(
“[DISTRO-SCHEDULE] {} to {}”
, distroKey, each.getAddress());
}
}
}
|
◆
增量同步操作
延迟任务
对象我们可以从
DistroTaskEngineHolder
构造函数中得知是
DistroDelayTaskProcessor
,代码如下:
/***
* 构造函数指定任务处理器
*
@param
distroComponentHolder
*/
public
DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) {
DistroDelayTaskProcessor defaultDelayTaskProcessor =
new
DistroDelayTaskProcessor(
this
, distroComponentHolder);
//指定任务处理器defaultDelayTaskProcessor
delayTaskExecuteEngine.setDefaultTaskProcessor(defaultDelayTaskProcessor);
}
|
它延迟执行的时候会执行
process
方法,该方法正是执行数据同步的地方,它会执行DistroSyncChangeTask任务,代码如下:
/***
* 任务处理过程
*
@param
task
task.
*
@return
*/
@Override
public boolean
process(NacosTask task) {
if
(!(task
instanceof
DistroDelayTask)) {
return true
;
}
DistroDelayTask distroDelayTask = (DistroDelayTask) task;
DistroKey distroKey = distroDelayTask.getDistroKey();
if
(DataOperation.CHANGE.equals(distroDelayTask.getAction())) {
//将延迟任务变更成异步任务,异步任务对象是一个线程
DistroSyncChangeTask syncChangeTask =
new
DistroSyncChangeTask(distroKey, distroComponentHolder);
//将任务添加到NacosExecuteTaskExecuteEngine中,并执行
distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
return true
;
}
return false
;
}
|
DistroSyncChangeTask
实质上是任务的开始,它自身是一个线程,所以会执行它的
run
方法,而
run
方法这是数据同步操作,代码如下:
/***
* 执行数据同步
*/
@Override
public void
run() {
Loggers.DISTRO.info(
“[DISTRO-START] {}”
, toString());
try
{
//获取本地缓存数据
String type = getDistroKey().getResourceType();
DistroData distroData = distroComponentHolder.findDataStorage(type).getDistroData(getDistroKey());
distroData.setType(DataOperation.CHANGE);
//向其他节点同步数据
boolean
result = distroComponentHolder.findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());
if
(!result) {
handleFailedTask();
}
Loggers.DISTRO.info(
“[DISTRO-END] {} result: {}”
, toString(), result);
}
catch
(Exception e) {
Loggers.DISTRO.warn(
“[DISTRO] Sync data change failed.”
, e);
handleFailedTask();
}
}
|
数据同步会执行调用
syncData
,该方法其实就是通过
Http
协议将数据发送到其他节点实现数据同步,代码如下:
/***
* 向其他节点同步数据
*
@param
data
data
*
@param
targetServer
target server
*
@return
*/
@Override
public boolean
syncData(DistroData data, String targetServer) {
if
(!memberManager.hasMember(targetServer)) {
return true
;
}
//获取数据字节数组
byte
[] dataContent = data.getContent();
//通过Http协议同步数据
return
NamingProxy.syncData(dataContent, data.getDistroKey().getTargetServer());
}
|
◆
详细增量数据同步(单机)
Distro
一致性算法,临时数据一致性数据结构
DataStore
,临时数据会被存储到这个数据结构中。
/**
* Store of data.
* Distro 临时数据一致性 数据存储结构
*
@author
nkorange
*
@since
1.0.0
*/
@Component
public class
DataStore {
private
Map<String, Datum>
dataMap
=
new
ConcurrentHashMap<>(
1024
);
public void
put(String key, Datum value) {
dataMap
.put(key, value);
}
public
Datum remove(String key) {
return
dataMap
.remove(key);
}
public
Set<String> keys() {
return
dataMap
.keySet();
}
public
Datum get(String key) {
return
dataMap
.get(key);
}
public boolean
contains(String key) {
return
dataMap
.containsKey(key);
}
/**
* Batch get datum for a list of keys.
*
*
@param
keys
of datum
*
@return
list of datum
*/
public
Map<String, Datum> batchGet(List<String> keys) {
Map<String, Datum> map =
new
HashMap<>(
128
);
for
(String key : keys) {
Datum datum =
dataMap
.get(key);
if
(datum ==
null
) {
continue
;
}
map.put(key, datum);
}
return
map;
}
public int
getInstanceCount() {
int
count =
0
;
for
(Map.Entry<String, Datum> entry :
dataMap
.entrySet()) {
try
{
Datum instancesDatum = entry.getValue();
if
(instancesDatum.
value
instanceof
Instances) {
count += ((Instances) instancesDatum.
value
).getInstanceList().size();
}
}
catch
(Exception ignore) {
}
}
return
count;
}
public
Map<String, Datum> getDataMap() {
return
dataMap
;
}
}
|
1)添加任务
单机数据同步,首先要添加任务。服务注册会请求InstanceController
post
方法注册服务数据。
/**
* Register new instance.
* 服务数据注册
*
@param
request
http request
*
@return
‘ok’ if success
*
@throws
Exception any error during register
*/
@CanDistro
//Distro协议(数据临时一致性协议)
@PostMapping
@Secured
(parser = NamingResourceParser.
class
, action = ActionTypes.
WRITE
)
public
String
register
(HttpServletRequest request)
throws
Exception {
final
String namespaceId = WebUtils
.
optional
(request, CommonParams.
NAMESPACE_ID
, Constants.
DEFAULT_NAMESPACE_ID
);
final
String serviceName = WebUtils.
required
(request, CommonParams.
SERVICE_NAME
);
NamingUtils.
checkServiceNameFormat
(serviceName);
final
Instance instance = parseInstance(request);
// 注册服务实例
serviceManager
.registerInstance(namespaceId, serviceName, instance);
return
“ok”
;
}
|
注册服务数据
public void
registerInstance(String namespaceId, String serviceName, Instance instance)
throws
NacosException {
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
Service service = getService(namespaceId, serviceName);
if
(service ==
null
) {
throw new
NacosException(NacosException.
INVALID_PARAM
,
“service not found, namespace: ”
+ namespaceId +
“, service: ”
+ serviceName);
}
// 添加服务实例
addInstance
(namespaceId, serviceName, instance.isEphemeral(), instance);
}
|
添加服务实例
/**
* Add instance to service.
* 将实例添加到服务
*
@param
namespaceId
namespace
*
@param
serviceName
service name
*
@param
ephemeral
whether instance is ephemeral
*
@param
ips
instances
*
@throws
NacosException nacos exception
*/
public void
addInstance(String namespaceId, String serviceName,
boolean
ephemeral, Instance… ips)
throws
NacosException {
String key = KeyBuilder.
buildInstanceListKey
(namespaceId, serviceName, ephemeral);
Service service = getService(namespaceId, serviceName);
// 同步 防止高并发 这种同步锁效率低
synchronized
(service) {
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
Instances instances =
new
Instances();
instances.setInstanceList(instanceList);
consistencyService
.
put
(key, instances);
}
}
|
Put
到中DistroConsistencyServiceImpl
执行数据同步。
@Override
public void
put(String key, Record value)
throws
NacosException {
onPut(key, value);
// 数据同步
distroProtocol
.
sync
(
new
DistroKey(key, KeyBuilder.
INSTANCE_LIST_KEY_PREFIX
), DataOperation.
CHANGE
,
globalConfig
.getTaskDispatchPeriod() /
2
);
}
|
同步数据到远端的服务
/**
* Start to sync data to all remote server.
* 同步远端服务数据
*
@param
distroKey
distro key of sync data
*
@param
action
the action of data operation
*/
public void
sync(DistroKey distroKey, DataOperation action,
long
delay) {
// 向除了自己以
外的所有
节点广播
for
(Member each :
memberManager
.allMembersWithoutSelf()) {
DistroKey distroKeyWithTarget =
new
DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
each.getAddress());
// 创建 Distro定时任务
DistroDelayTask distroDelayTask =
new
DistroDelayTask(distroKeyWithTarget, action, delay);
// 添加定时任务
distroTaskEngineHolder
// 获取延迟执行引擎
.getDelayTaskExecuteEngine()
// 添加延迟任务
.
addTask
(distroKeyWithTarget, distroDelayTask);
if
(Loggers.
DISTRO
.isDebugEnabled()) {
Loggers.
DISTRO
.debug(
“[DISTRO-SCHEDULE] {} to {}”
, distroKey, each.getAddress());
}
}
}
|
创建
Distro
定时任务,可以看到里面有一些定时任务信息。
public class
DistroDelayTask
extends
AbstractDelayTask {
private final
DistroKey
distroKey
;
private
DataOperation
action
;
private long
createTime
;
public
DistroDelayTask(DistroKey distroKey,
long
delayTime) {
this
(distroKey, DataOperation.
CHANGE
, delayTime);
}
public
DistroDelayTask(DistroKey distroKey, DataOperation action,
long
delayTime) {
this
.
distroKey
= distroKey;
this
.
action
= action;
this
.
createTime
= System.
currentTimeMillis
();
setLastProcessTime(
createTime
);
setTaskInterval(delayTime);
}
public
DistroKey getDistroKey() {
return
distroKey
;
}
public
DataOperation getAction() {
return
action
;
}
public long
getCreateTime() {
return
createTime
;
}
@Override
public void
merge(AbstractDelayTask task) {
if
(!(task
instanceof
DistroDelayTask)) {
return
;
}
DistroDelayTask newTask = (DistroDelayTask) task;
if
(!
action
.equals(newTask.getAction()) &&
createTime
< newTask.getCreateTime()) {
action
= newTask.getAction();
createTime
= newTask.getCreateTime();
}
setLastProcessTime(newTask.getLastProcessTime());
}
}
|
添加延迟任务,将任务存储到
task
(ConcurrentHashMap<Object, AbstractDelayTask>)
中,后面执行任务的时候会从这个
task
中获取任务。
@Override
public void
addTask(Object key, AbstractDelayTask newTask) {
// 同步锁 防止重复添加
lock
.lock();
try
{
AbstractDelayTask existTask =
tasks
.get(key);
if
(
null
!= existTask) {
newTask.merge(existTask);
}
// 任务存储 任务添加结束
tasks
.put(key, newTask);
}
finally
{
lock
.unlock();
}
}
|
2)任务执行
添加任务前DistroTaskEngineHolder会添加定时任务获取延迟执行引擎。
/**
* Start to sync data to all remote server.
* 同步远端服务数据
*
@param
distroKey
distro key of sync data
*
@param
action
the action of data operation
*/
public void
sync(DistroKey distroKey, DataOperation action,
long
delay) {
// 向除了自己以外的所有节点广播
for
(Member each :
memberManager
.allMembersWithoutSelf()) {
DistroKey distroKeyWithTarget =
new
DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
each.getAddress());
// 创建 Distro定时任务
DistroDelayTask distroDelayTask =
new
DistroDelayTask(distroKeyWithTarget, action, delay);
// 添加定时任务
distroTaskEngineHolder
// 获取延迟执行引擎
.get
DelayTask
ExecuteEngine()
// 添加延迟任务
.addTask(distroKeyWithTarget, distroDelayTask);
if
(Loggers.
DISTRO
.isDebugEnabled()) {
Loggers.
DISTRO
.debug(
“[DISTRO-SCHEDULE] {} to {}”
, distroKey, each.getAddress());
}
}
}
|
临时数据同步引擎,延迟执行任务。创建任务处理器并指定其处理任务。
/**
* Distro task engine holder.
* Doistro任务执行者
* 临时数据同步 引擎 延迟执行
*
@author
xiweng.yy
*/
@Component
public class
DistroTaskEngineHolder {
// 延迟执行 引擎
private final
DistroDelayTaskExecuteEngine
delayTaskExecuteEngine
=
new
DistroDelayTaskExecuteEngine();
private final
DistroExecuteTaskExecuteEngine
executeWorkersManager
=
new
DistroExecuteTaskExecuteEngine();
public
DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) {
// 任务处理器
DistroDelayTaskProcessor defaultDelayTaskProcessor =
new
DistroDelayTaskProcessor(
this
, distroComponentHolder);
// 指定任务处理器
delayTaskExecuteEngine
.setDefaultTaskProcessor(defaultDelayTaskProcessor);
}
public
DistroDelayTaskExecuteEngine getDelayTaskExecuteEngine() {
return
delayTaskExecuteEngine
;
}
public
DistroExecuteTaskExecuteEngine getExecuteWorkersManager() {
return
executeWorkersManager
;
}
public void
registerNacosTaskProcessor(Object key, NacosTaskProcessor nacosTaskProcessor) {
this
.
delayTaskExecuteEngine
.addProcessor(key, nacosTaskProcessor);
}
}
|
跟踪延迟执行引擎创建,可以看到其父类
NacosDelayTaskExecuteEngine
创建了延迟执行引擎。
/**
* 创建任务执行器
*/
public
NacosDelayTaskExecuteEngine(String name,
int
initCapacity, Logger logger,
long
processInterval) {
super
(logger);
tasks
=
new
ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity);
processingExecutor
= ExecutorFactory.
newSingleScheduledExecutorService
(
new
NameThreadFactory(name));
processingExecutor
// 定时执行ProcessRunnable任务
.scheduleWithFixedDelay(
new
ProcessRunnable
(), processInterval, processInterval, TimeUnit.
MILLISECONDS
);
}
|
定时执行器中创建了一个执行线程,会定时去执行这个线程
ProcessRunnable
/**
* 定时任务执行的线程
*/
private class
ProcessRunnable
implements
Runnable {
@Override
public void
run() {
try
{
processTasks
();
}
catch
(Throwable e) {
getEngineLog().error(e.toString(), e);
}
}
}
|
线程回调NacosDelayTaskExecuteEngine中的
processTasks
方法
/**
* process tasks in execute engine.
*/
protected void
processTasks() {
Collection<Object> keys = getAllTaskKeys();
for
(Object taskKey : keys) {
// 从task中获取任务 这个任务
在任务添加时
已经添加到了task中
AbstractDelayTask task =
removeTask
(taskKey);
if
(
null
== task) {
continue
;
}
// 获取任务执行对象 DistroDelayTaskProcessor
// 先从缓存中获取 缓存中没有就获取前面创建的对象 这个对象在DistroTaskEngineHolder创建的时候已经被创建了
NacosTaskProcessor processor =
getProcessor
(taskKey);
if
(
null
== processor) {
getEngineLog().error(
“processor not found for task, so discarded. ”
+ task);
continue
;
}
try
{
// ReAdd task if process failed
if
(!processor.
process
(task)) {
// 存储任务
retryFailedTask
(taskKey, task);
}
}
catch
(Throwable e) {
getEngineLog().error(
“Nacos task execute error : ”
+ e.toString(), e);
retryFailedTask(taskKey, task);
}
}
}
|
任务执行成功后会将任务存储到服务器缓存中。
private void
retryFailedTask(Object key, AbstractDelayTask task) {
// 设置任务进程时间
task.setLastProcessTime(System.
currentTimeMillis
());
// 存储任务
addTask(key, task);
}
@Override
public void
addTask(Object key, AbstractDelayTask newTask) {
// 同步锁 防止重复添加
lock
.lock();
try
{
// 先获取任务
AbstractDelayTask existTask =
tasks
.get(key);
// 如果任务不为空 合并更新任务
if
(
null
!= existTask) {
newTask.merge(existTask);
}
// 任务存储 任务添加结束
tasks
.put(key, newTask);
}
finally
{
lock
.unlock();
}
}
|
前面做了
3
件事,获取任务、任务执行对象、执行任务。
获取任务
/**
* 获取一个任务 并将任务从tasks中移除
*/
@Override
public
AbstractDelayTask removeTask(Object key) {
lock
.lock();
try
{
AbstractDelayTask task =
tasks
.get(key);
if
(
null
!= task && task.shouldProcess()) {
return
tasks
.remove(key);
}
else
{
return null
;
}
}
finally
{
lock
.unlock();
}
}
|
获取任务执行对象
private final
ConcurrentHashMap<Object, NacosTaskProcessor>
taskProcessors
=
new
ConcurrentHashMap<Object, NacosTaskProcessor>();
@Override
public
NacosTaskProcessor getProcessor(Object key) {
return
taskProcessors
.containsKey(key) ?
taskProcessors
.get(key) :
defaultTaskProcessor
;
}
|
DistroDelayTaskProcessor执行任务
/**
* 任务执行方法
*/
@Override
public boolean
process(NacosTask task) {
if
(!(task
instanceof
DistroDelayTask)) {
return true
;
}
DistroDelayTask distroDelayTask = (DistroDelayTask) task;
DistroKey distroKey = distroDelayTask.getDistroKey();
if
(DataOperation.
CHANGE
.equals(distroDelayTask.getAction())) {
// 创建数据同步快照
后面回调用它的run方法
DistroSyncChangeTask syncChangeTask =
new
DistroSyncChangeTask(distroKey,
distroComponentHolder
);
// 同步快照任务
distroTaskEngineHolder
.getExecuteWorkersManager().
addTask
(distroKey, syncChangeTask);
return true
;
}
return false
;
}
|
同步任务继续执行同步
/**
* 执行同步任务
*/
@Override
public void
addTask(Object tag, AbstractExecuteTask task) {
// 获取处理对象
NacosTaskProcessor processor = getProcessor(tag);
if
(
null
!= processor) {
// 处理任务
processor.process(task);
return
;
}
// 获取任务
TaskExecuteWorker worker = getWorker(tag);
// 添加任务
worker.process(task);
}
@Override
public boolean
process(NacosTask task) {
if
(task
instanceof
AbstractExecuteTask) {
// 添加任务
putTask((Runnable) task);
}
return true
;
}
/**
* 获取任务执行器
*/
private
TaskExecuteWorker getWorker(Object tag) {
int
idx = (tag.hashCode() & Integer.
MAX_VALUE
) % workersCount();
// 任务执行器在 创建DistroTaskEngineHolder对象时创建DistroExecuteTaskExecuteEngine时被创建
return
executeWorkers
[idx];
}
|
将任务添加到队列中
/**
* 将同步任务对象添加到队列(BlockingQueue)中
*/
private void
putTask(Runnable task) {
try
{
// 添加DistroSyncChangeTask(前面创建的)到队列中
queue
.put(task);
}
catch
(InterruptedException ire) {
log
.error(ire.toString(), ire);
}
}
|
在前面创建DistroTaskEngineHolder对象时,会初始化DistroExecuteTaskExecuteEngine,初始化时会创建其父类
TaskExecuteWorker
。
public
TaskExecuteWorker(
final
String name,
final int
mod,
final int
total,
final
Logger logger) {
this
.
name
= name +
“_”
+ mod +
“%”
+ total;
this
.
queue
=
new
ArrayBlockingQueue<Runnable>(
QUEUE_CAPACITY
);
this
.
closed
=
new
AtomicBoolean(
false
);
this
.
log
=
null
== logger ? LoggerFactory.
getLogger
(TaskExecuteWorker.
class
) : logger;
new
InnerWorker
(name).start();
}
|
这里会创建一个线程,此时从队列中取出前面放进队列中的同步任务对象, 去执行任务对象
DistroSyncChangeTask
数据同步对象的
run
方法。
/**
* Inner execute worker.
*/
private class
InnerWorker
extends
Thread {
InnerWorker(String name) {
setDaemon(
false
);
setName(name);
}
@Override
public void
run() {
while
(!
closed
.get()) {
try
{
// 从队列取出任务
Runnable task =
queue
.take();
long
begin = System.
currentTimeMillis
();
// 执行任务对象 DistroSyncChangeTask 数据同步对象
task.run();
long
duration = System.
currentTimeMillis
() – begin;
if
(duration >
1000L
) {
log
.warn(
“distro task {} takes {}ms”
, task, duration);
}
}
catch
(Throwable e) {
log
.error(
“[DISTRO-FAILED] ”
+ e.toString(), e);
}
}
}
}
|
DistroSyncChangeTask
是一个线程,由
InnerWorker
调用它的
run
方法,执行远程调用同步数据。
@Override
public void
run() {
Loggers.
DISTRO
.info(
“[DISTRO-START] {}”
, toString());
try
{
String type = getDistroKey().getResourceType();
// 查找数据
DistroData distroData =
distroComponentHolder
.findDataStorage(type).getDistroData(getDistroKey());
distroData.setType(DataOperation.
CHANGE
);
// 向其他节点同步数据
boolean
result =
distroComponentHolder
.findTransportAgent(type).
syncData
(distroData, getDistroKey().getTargetServer());
if
(!result) {
handleFailedTask();
}
Loggers.
DISTRO
.info(
“[DISTRO-END] {} result: {}”
, toString(), result);
}
catch
(Exception e) {
Loggers.
DISTRO
.warn(
“[DISTRO] Sync data change failed.”
, e);
handleFailedTask();
}
}
|
同步数据
/**
* 数据同步
*/
@Override
public boolean
syncData(DistroData data, String targetServer) {
if
(!
memberManager
.hasMember(targetServer)) {
return true
;
}
byte
[] dataContent = data.getContent();
// 远程同步请求
return
NamingProxy.
syncData
(dataContent, data.getDistroKey().getTargetServer());
}
|
代理对象远程同步数据
/**
* Synchronize datum to target server.
* 同步数据到目标服务器
*
*
@param
data
datum
*
@param
curServer
target server address
*
@return
true if sync successfully, otherwise false
*/
public static boolean
syncData(
byte
[] data, String curServer) {
Map<String, String> headers =
new
HashMap<>(
128
);
headers.put(HttpHeaderConsts.
CLIENT_VERSION_HEADER
, VersionUtils.
version
);
headers.put(HttpHeaderConsts.
USER_AGENT_HEADER
, UtilsAndCommons.
SERVER_VERSION
);
headers.put(HttpHeaderConsts.
ACCEPT_ENCODING
,
“gzip,deflate,sdch”
);
headers.put(HttpHeaderConsts.
CONNECTION
,
“Keep-Alive”
);
headers.put(HttpHeaderConsts.
CONTENT_ENCODING
,
“gzip”
);
try
{
// http发送同步数据请求 /v1/ns/distro/datum
RestResult<String> result = HttpClient.
httpPutLarge
(
“http://”
+ curServer + EnvUtil.
getContextPath
() + UtilsAndCommons.
NACOS_NAMING_CONTEXT
+
DATA_ON_SYNC_URL
, headers, data);
if
(result.ok()) {
return true
;
}
if
(HttpURLConnection.
HTTP_NOT_MODIFIED
== result.getCode()) {
return true
;
}
throw new
IOException(
“failed to req API:”
+
“http://”
+ curServer + EnvUtil.
getContextPath
()
+ UtilsAndCommons.
NACOS_NAMING_CONTEXT
+
DATA_ON_SYNC_URL
+
“. code:”
+ result.getCode() +
” msg: ”
+ result.getData());
}
catch
(Exception e) {
Loggers.
SRV_LOG
.warn(
“NamingProxy”
, e);
}
return false
;
}
|
H
ttp
发送同步数据请求 /v1/ns/distro/datum调用DistroController开始同步数据。
/**
* Synchronize datum.
* 数据同步(临时)
*
@param
dataMap
data map
*
@return
‘ok’ if success
*
@throws
Exception if failed
*/
@PutMapping
(
“/datum”
)
public
ResponseEntity onSyncDatum(
@RequestBody
Map<String, Datum<Instances>> dataMap)
throws
Exception {
if
(dataMap.isEmpty()) {
Loggers.
DISTRO
.error(
“[onSync] receive empty entity!”
);
throw new
NacosException(NacosException.
INVALID_PARAM
,
“receive empty entity!”
);
}
for
(Map.Entry<String, Datum<Instances>> entry : dataMap.entrySet()) {
if
(KeyBuilder.
matchEphemeralInstanceListKey
(entry.getKey())) {
String namespaceId = KeyBuilder.
getNamespace
(entry.getKey());
String serviceName = KeyBuilder.
getServiceName
(entry.getKey());
if
(!
serviceManager
.containService(namespaceId, serviceName) &&
switchDomain
.isDefaultInstanceEphemeral()) {
serviceManager
.createEmptyService(namespaceId, serviceName,
true
);
}
DistroHttpData distroHttpData =
new
DistroHttpData(createDistroKey(entry.getKey()), entry.getValue());
distroProtocol
.
onReceive
(distroHttpData);
}
}
return
ResponseEntity.
ok
(
“ok”
);
}
|
接收同步的发布数据,找到处理器处理。
/**
* Receive synced distro data, find processor to process.
* 接收同步的发布数据,找到处理器处理
*
@param
distroData
Received data
*
@return
true if handle receive data successfully, otherwise false
*/
public boolean
onReceive(DistroData distroData) {
String resourceType = distroData.getDistroKey().getResourceType();
DistroDataProcessor dataProcessor =
distroComponentHolder
.findDataProcessor(resourceType);
if
(
null
== dataProcessor) {
Loggers.
DISTRO
.warn(
“[DISTRO] Can’t find data process for received data {}”
, resourceType);
return false
;
}
return
dataProcessor.
processData
(distroData);
}
|
数据处理
/**
* 数据处理
*/
@Override
public boolean
processData(DistroData distroData) {
DistroHttpData distroHttpData = (DistroHttpData) distroData;
Datum<Instances> datum = (Datum<Instances>) distroHttpData.getDeserializedContent();
// 数据同步 将数据添加到dataStore中
onPut(datum.
key
, datum.
value
);
return true
;
}
|
数据同步 将数据添加到
dataStore
中
/**
* Put a new record.
*
*
@param
key
key of record
*
@param
value
record
*/
public void
onPut(String key, Record value) {
if
(KeyBuilder.
matchEphemeralInstanceListKey
(key)) {
Datum<Instances> datum =
new
Datum<>();
datum.
value
= (Instances) value;
datum.
key
= key;
datum.
timestamp
.incrementAndGet();
dataStore
.put(key, datum);
}
if
(!
listeners
.containsKey(key)) {
return
;
}
notifier
.addTask(key, DataOperation.
CHANGE
);
}
|
Distro
临时数据一致性 数据存储结构,添加进这个数据模型中。
public void
put(String key, Datum value) {
// key:Group@服务名称
dataMap
.put(key, value);
}
|
2
.0.
Sentinel Dashboard数据持久化
Sentinel
的理念是开发者只需要关注资源的定义,当资源定义成功后可以动态增加各种流控降级规则。
Sentinel
提供两种方式修改规则:
•
通过
API
直接修改( loadRules )
•
通过
DataSource
适配不同数据源修改
手动通过
API
修改比较直观,可以通过以下几个
API
修改不同的规则:
FlowRuleManager.
loadRules
(List<FlowRule> rules); // 修改流控规则
DegradeRuleManager.
loadRules
(List<DegradeRule> rules); // 修改降级规则
|
手动修改规则
(
硬编码方式
)
一般仅用于
测试和演示
,生产上一般通过动态规则源的方式来动态管理规则。
2.1
.
动态配置原理
loadRules
()方法只接受内存态的规则对象,但更多时候规则存储在文件、数据库或者配置中心当中。
DataSource
接口给我们提供了对接任意配置源的能力。相比直接通过
API
修改规则,实现
DataSource
接口是更加可靠的做法。
我们推荐通过控制台设置规则后将规则推送到统一的规则中心,客户端实现
ReadableDataSource
接口端监听规则中心实时获取变更,流程如下:
DataSource
扩展常见的实现方式有:
•
拉模式
:客户端主动向某个规则管理中心定期
轮询拉取
规则,这个规则中心可以是
RDBMS
、文件,甚至是
VCS
等。这样做的方式是
简单
,缺点是
无法及时获取变更
;
•
推模式
:规则中心统一推送,客户端通过注册监听器的方式时刻监听变化,比如使用
Nacos
、
Zookeeper
等配置中心。这种方式有更好的实时性和一致性保证。
Sentinel
目前支持以下数据源扩展:
•
Pull-based: 动态文件数据源、
Consul
,
Eureka
•
Push-based:
ZooKeeper
,
Redis
,
Nacos
,
Apollo
,
etcd
2.2
.
Sentinel+Nacos数据持久化
我们要想实现
Sentinel
+
Nacos
数据持久化,需要下载
Sentinel
控制台源码,关于源码下载我们这里就不再重复了。
在
Sentinel Dashboard
中配置规则之后重启应用就会丢失,所以实际生产环境中需要配置规则的持久化实现,
Sentinel
提供多种不同的数据源来持久化规则配置,包括
file
,
redis
、
nacos
、
zk
。
这就需要涉及到
Sentinel Dashboard
的规则管理及推送功能:集中管理和推送规则。
sentinel
–
core
提供
API
和扩展接口来接收信息。开发者需要根据自己的环境,选取一个可靠的推送规则方式;同时,规则最好在控制台中集中管理。
我们采用
Push
模式,即
Sentinel
–
Dashboard
统一管理配置,然后将规则统一推送到
Nacos
并持久化
(
生成配置文件
)
,最后客户端监听
Nacos
(
这一部了解使用过
Nacos
的话应该很熟,采用ConfifigService.
getConfg
()方法获取配置文件
)
,下发配置生成
Rule
。
这张图的意思我们解释说明一下:
1
:
Sentinel Dashboard界面配置流控规则—发布/推送—>Nacos生成配置文件并持久化;
2
:
通过Nacos配置文件修改流控规则—拉取—>Sentinel Dashboard界面显示最新的流控规则。
|
在
Nacos
控制台上修改流控制,虽然可以同步到
Sentinel Dashboard
,但是
Nacos
此时应该作为一个流控规则的持久化平台,所以正常操作过程应该是开发者在
Sentinel Dashboard
上修改流控规则后同步到
Nacos
,遗憾的是目前
Sentinel Dashboard
不支持该功能。
如果公司没有统一在
Sentinel Dashboard
或
Nacos
中二选一进行配置,而是一会在
Sentinel Dashboard
配置,一会在
Nacos
配置。那么就会出现很严重的问题
(
流控规则达不到预期,配置数据不一致
)
,所以推荐使用
Sentinel Dashboard
统一界面进行配置管理流控规则。
我们接下来基于
Sentinel
1.8.1开始改造
Sentinel Dashboard
,使他能结合
Nacos
实现数据持久化。
2.2.1
.
Dashboard改造分析
Sentinel Dashboard
的流控规则下的所有操作,都会调用
Sentinel
–
Dashboard
源码中的
FlowController
V1类,这个类中包含流控规则本地化的
CRUD
操作;
在com.alibaba.csp.sentinel.dashboard.controller.v2包下存在一个
FlowController
V2;类,这个类同样提供流控规则的
CURD
,与V1不同的是,它可以实现指定数据源的规则拉取和发布。
上面代码就是
FlowController
V2 部分代码,分别实现了拉取规则和推送规则:
1
:
DynamicRuleProvider:动态规则的拉取,从指定数据源中获取控制后在Sentinel Dashboard中展示。
2
:
DynamicRulePublisher:动态规则发布,将在Sentinel Dashboard中修改的规则同步到指定数据源中。
|
我们只需要扩展这两个类,然后集成
Nacos
来实现
Sentinel Dashboard
规则同步。
2.2.2
.
页面改造
在目录resources/app/scripts/directives/sidebar找到
sidebar
.html,里面有关于V1版本的请求入口:
<li
ui-sref-active
=
“active”
ng-if
=
“!entry.isGateway”
>
<a
ui-sref
=
“dashboard.flowV1({app: entry.app})”
>
<i
class
=
“glyphicon glyphicon-filter”
></i>
流控规则
</a>
</li>
|
对应的
JS
(
app.js
)
请求如下,可以看到请求就是
V
1版本的
Controller
,那么之后的改造需要重新对应
V
2版本的
Controller
。
.
state
(
‘dashboard.flowV1’
, {
templateUrl
:
‘app/views/flow_v1.html’
,
url
:
‘/flow/:app’
,
controller
:
‘FlowControllerV
1
‘
,
resolve
: {
loadMyFiles
: [
‘$ocLazyLoad’
,
function
(
$ocLazyLoad
) {
return
$ocLazyLoad
.
load
({
name
:
‘sentinelDashboardApp’
,
files
: [
‘app/scripts/controllers/flow_v1.js’
,
]
});
}]
}
})
|
2.2.3
.
Nacos配置
在源码中虽然官方提供了
test
示例
(
即
test
目录
)
下关于
Nacos
等持久化示例,但是具体的实现还需要一些细节,比如在
Sentinel Dashboard
配置
Nacos
的
serverAddr
、
namespace
、
groupId
,并且通过
Nacos
获取配置文件获取服务列表等。
我们可以打开
NacosConfig
源码,
NacosConfig
中ConfigFactory.
createConfigService
(“localhost”) 并没有实现创建具体的
nacos config
service
,而是默认 localhost ,application.properties文件中也没有
Nacos
的相关配置,这些都需要我们额外配置,
NacosConfig
代码如下:
@Configuration
public class
NacosConfig {
@Bean
public
Converter<List<FlowRuleEntity>, String> flowRuleEntityEncoder() {
return
JSON::toJSONString;
}
@Bean
public
Converter<String, List<FlowRuleEntity>> flowRuleEntityDecoder() {
return
s -> JSON.parseArray(s, FlowRuleEntity.
class
);
}
@Bean
public
ConfigService nacosConfigService()
throws
Exception {
return
ConfigFactory.createConfigService(
“localhost”
);
}
}
|
如果我们需要把数据存储到
Nacos
,在
NacosConfig
U
tils
已经指定了默认的流控规则配置文件的
groupId
等,如果需要指定的话这里也需要修改.
public final class
NacosConfigUtil {
//Nacos中对应的GroupID
public static final
String
GROUP_ID
=
“SENTINEL_GROUP”
;
//文件后半部分
public static final
String
FLOW_DATA_ID_POSTFIX
=
“-flow-rules”
;
public static final
String
PARAM_FLOW_DATA_ID_POSTFIX
=
“-param-rules”
;
public static final
String
CLUSTER_MAP_DATA_ID_POSTFIX
=
“-cluster-map”
;
//略…
}
|
我们在
application
.properties中配置
Nacos
:
spring.cloud.sentinel.datasource.flow.nacos.server-addr
=
nacos
:
8848
spring.cloud.sentinel.datasource.flow.nacos.data-id
=
${spring.application.name}-flow-rules
spring.cloud.sentinel.datasource.flow.nacos.group-id
=
SENTINEL_GROUP
spring.cloud.sentinel.datasource.flow.nacos.data-type
=
json
spring.cloud.sentinel.datasource.flow.nacos.rule-type
=
flow
|
2.3.4
.
Dashboard持久化改造
我们接下来开始改造
Dashboard
源码,官方提供的
Nacos
持久化用例都是在
test
目录下,所以
scope
需要去除
test
,需要sentinel-datasource-nacos包的支持。之后将修改好的源码放在源码主目录com.alibaba.csp.sentinel.dashboard下,而不是继续在
test
目录下。
<!– for Nacos rule publisher sample –>
<dependency>
<groupId>
com.alibaba.csp
</groupId>
<artifactId>
sentinel-datasource-nacos
</artifactId>
<!–<scope>test</scope>–>
</dependency>
|
找到resources/app/scripts/directives/sidebar/
sidebar
.html文件修改,修改flflowV1为flflow,去掉V1,这样的话会调用
FlowController
V2接口
修改前:
<li
ui-sref-active
=
“active”
ng-if
=
“!entry.isGateway”
>
<a
ui-sref
=
“dashboard.flowV1({app: entry.app})”
>
<i
class
=
“glyphicon glyphicon-filter”
>
</i>
流控规则
</a>
</li>
|
修改后:
<li
ui-sref-active
=
“active”
ng-if
=
“!entry.isGateway”
>
<a
ui-sref
=
“dashboard.flow({app: entry.app})”
>
<i
class
=
“glyphicon glyphicon-filter”
>
</i>
流控规则
</a>
</li>
|
这样就可以通过
js
跳转至
FlowControllerV2
了,
app
.js代码如下:
.
state
(
‘dashboard.flow’
, {
templateUrl
:
‘app/views/flow_v2.html’
,
url
:
‘/v2/flow/:app’
,
controller
:
‘FlowControllerV2’
,
resolve
: {
loadMyFiles
: [
‘$ocLazyLoad’
,
function
(
$ocLazyLoad
) {
return
$ocLazyLoad
.
load
({
name
:
‘sentinelDashboardApp’
,
files
: [
‘app/scripts/controllers/flow_v2.js’
,
]
});
}]
}
})
|
2.3.5
.
Nacos配置创建
我们采用官方的约束,即默认
Nacos
适配的
dataId
和
groupId
约定,所以不需要修改
NacosConfigUtil
.java了,配置如下:
groupId
:
SENTINEL_GROUP
流控规则 dataId
:
{appName}-flow-rules,比如应用名为 appA,则 dataId 为 ap
pA-flow- rules
|
我们在
application
.properties 中配置
Nacos
服务信息:
# nacos config server
sentinel.nacos.serverAddr
=
nacos
:
8848
sentinel.nacos.namespace
=
sentinel.nacos.group-id
=
SENTINEL_GROUP
sentinel.nacos.
password=
nacos
sentinel.nacos.
username=
nacos
|
接下来创建读取
nacos
配置的NacosPropertiesConfiguration文件并且
application
.properties指定配置
。
@ConfigurationProperties
(
prefix
=
“sentinel.nacos”
)
public class
NacosPropertiesConfiguration
{
private
String
serverAddr
;
private
String
dataId
;
private
String
groupId
=
“SENTINEL_GROUP”
;
// 默认分组
private
String
namespace
;
private
String
user
name
;
private
String
password
;
//get set
}
|
2.3.6
.
改造源码
2.3.6.1
.
改造NacosConfifig
我们最后改造
NacosConfig
,让
NacosConfig
做两件事:
1
)
注入
Convert
转换器,将
FlowRuleEntity
转化成
FlowRule
,以及反向转化
2
)
注入
Nacos
配置服务
ConfifigService
@EnableConfigurationProperties(NacosPropertiesConfiguration.
class
)
@Configuration
public class
NacosConfig {
@Bean
public
Converter<List<FlowRuleEntity>, String> flowRuleEntityEncoder() {
return
JSON::toJSONString;
}
@Bean
public
Converter<String, List<FlowRuleEntity>> flowRuleEntityDecoder() {
return
s -> JSON.parseArray(s, FlowRuleEntity.
class
);
}
@Bean
public
ConfigService nacosConfigService(NacosPropertiesConfiguration nacosPropertiesConfiguration)
throws
Exception {
Properties properties =
new
Properties();
properties.put(PropertyKeyConst.SERVER_ADDR,
nacosPropertiesConfiguration.getServerAddr());
properties.put(PropertyKeyConst.NAMESPACE,
nacosPropertiesConfiguration.getNamespace());
properties.put(PropertyKeyConst.USERNAME,
nacosPropertiesConfiguration.getUwername());
properties.put(PropertyKeyConst.PASSWORD,
nacosPropertiesConfiguration.getPassword());
return
ConfigFactory.createConfigService(properties);
// return ConfigFactory.createConfigService(“localhost”);
}
}
|
2.3.6.2
.
动态获取流控规则
动态实现从
Nacos
配置中心获取流控规则需要重写
FlowRuleNacosProvider
与
FlowRuleNacosPublisher
类。
1)重写
FlowRuleNacosProvider
类
@Service(
“flowRuleNacosProvider”
)
public class
FlowRuleNacosProvider
implements
DynamicRuleProvider<List<FlowRuleEntity>> {
public static final
Logger
log
= LoggerFactory.getLogger(FlowRuleNacosProvider.
class
);
@Autowired
private
ConfigService
configService
;
@Autowired
private
Converter<String, List<FlowRuleEntity>>
converter
;
/**
* 1)通过ConfigService的getConfig()方法从Nacos Config Server读取指定配置信息
* 2)通过转为converter转化为FlowRule规则
*
@param
appName
*
@return
*
@throws
Exception
*/
@Override
public
List<FlowRuleEntity> getRules(String appName)
throws
Exception {
String rules =
configService
.getConfig(appName + NacosConfigUtil.FLOW_DATA_ID_POSTFIX, NacosConfigUtil.GROUP_ID,
3000
);
log
.info(
“obtain flow rules from nacos config:{}”
, rules);
if
(StringUtil.isEmpty(rules)) {
return new
ArrayList<>();
}
return
converter
.convert(rules);
}
}
|
2)重写
FlowRuleNacosPublisher
类:
@Service(
“flowRuleNacosPublisher”
)
public class
FlowRuleNacosPublisher
implements
DynamicRulePublisher<List<FlowRuleEntity>> {
public static final
Logger
log
= LoggerFactory.getLogger(FlowRuleNacosPublisher.
class
);
@Autowired
private
ConfigService
configService
;
@Autowired
private
Converter<List<FlowRuleEntity>, String>
converter
;
/**
* 通过configService的publishConfig()方法将rules发布到nacos
*
@param
app
app name
*
@param
rules
list of rules to push
*
@throws
Exception
*/
@Override
public void
publish(String app, List<FlowRuleEntity> rules)
throws
Exception {
AssertUtil.notEmpty(app,
“app name cannot be empty”
);
if
(rules ==
null
) {
return
;
}
log
.info(
“sentinel dashboard push rules: {}”
, rules);
configService
.publishConfig(app + NacosConfigUtil.FLOW_DATA_ID_POSTFIX, NacosConfigUtil.GROUP_ID,
converter
.convert(rules));
}
}
|
3)替换默认对象
修改
FlowController
V2类,使用@
Qulififier
将上面配置的两个类注入进来
2.3.6.3.
数据持久化测试
我们接下来将程序打包,如下图:
打包好程序后,再将程序运行起来:
java -Dserver.port=8858 -Dcsp.sentinel.dashboard.server=localhost:8858 -Dproject.name=sentinel-dashboard -jar sentinel-dashboard.jar
|
运行起来后,我们可以发现在
Nacos
中多了一个服务,如下图:
我们随意增加一个流控规则,如下图:
点击新增之后,我们可以发现
SENTILE
_
GROUP
组下多了一个文件,
sentinel
–
flow
–
rule
文件,效果如下:
2.3.系统规则定义
上面改造样子使用的限流规则,还有其他一些规则也可以去配置。
我们来配置一个系统服务规则:
在
NacosConfigUtil
中新增配置定义为系统规则配置”-system-rules”
public final class
NacosConfigUtil {
public static final
String
GROUP_ID
=
“SENTINEL_GROUP”
;
public static final
String
FLOW_DATA_ID_POSTFIX
=
“-flow-rules”
;
public static final
String
PARAM_FLOW_DATA_ID_POSTFIX
=
“-param-rules”
;
public static final
String
CLUSTER_MAP_DATA_ID_POSTFIX
=
“-cluster-map”
;
// 系统规则
public static final
String
SYSTEM_DATA_ID_POSTFIX
=
“-system-rules”
;
/**
* cc for `cluster-client`
*/
public static final
String
CLIENT_CONFIG_DATA_ID_POSTFIX
=
“-cc-config”
;
/**
* cs for `cluster-server`
*/
public static final
String
SERVER_TRANSPORT_CONFIG_DATA_ID_POSTFIX
=
“-cs-transport-config”
;
public static final
String
SERVER_FLOW_CONFIG_DATA_ID_POSTFIX
=
“-cs-flow-config”
;
public static final
String
SERVER_NAMESPACE_SET_DATA_ID_POSTFIX
=
“-cs-namespace-set”
;
private
NacosConfigUtil() {}
}
|
复制
FlowRuleNacosProvider
与
FlowRuleNacosPublisher
类
规则推送,并修改为系统规则。
SystemRuleNacosProvider:
@Component
(
“systemRuleNacosProvider”
)
public class
SystemRuleNacosProvider
implements
DynamicRuleProvider<List<SystemRuleEntity>> {
@Autowired
private
ConfigService
configService
;
@Autowired
private
Converter<String, List<SystemRuleEntity>>
converter
;
@Override
public
List<SystemRuleEntity> getRules(String appName)
throws
Exception {
String rules =
configService
.getConfig(appName + NacosConfigUtil.
SYSTEM_DATA_ID_POSTFIX
,
NacosConfigUtil.
GROUP_ID
,
3000
);
if
(StringUtil.
isEmpty
(rules)) {
return new
ArrayList<>();
}
return
converter
.convert(rules);
}
}
|
SystemRuleNacosPublisher:
@Component
(
“systemRuleNacosPublisher”
)
public class
SystemRuleNacosPublisher
implements
DynamicRulePublisher<List<SystemRuleEntity>> {
@Autowired
private
ConfigService
configService
;
@Autowired
private
Converter<List<SystemRuleEntity>, String>
converter
;
@Override
public void
publish(String app, List<SystemRuleEntity> rules)
throws
Exception {
AssertUtil.
notEmpty
(app,
“app name cannot be empty”
);
if
(rules ==
null
) {
return
;
}
configService
.publishConfig(app + NacosConfigUtil.
SYSTEM_DATA_ID_POSTFIX
,
NacosConfigUtil.
GROUP_ID
,
converter
.convert(rules));
}
}
|
规则转换器:在
NacosConfig
中添加规则转换器,即数据发送给nacos和接收nacos信息需要处理一下。
@Configuration
public class
NacosConfig {
@Value
(
“${nacos.address}”
)
private
String
address
;
@Value
(
“${nacos.namespace}”
)
private
String
namespace
;
@Value
(
“${nacos.username}”
)
private
String
username
;
@Value
(
“${nacos.password}”
)
private
String
password
;
@Bean
public
ConfigService nacosConfigService()
throws
Exception {
// nacos配置
Properties properties =
new
Properties();
properties.put(PropertyKeyConst.
SERVER_ADDR
,
address
);
properties.put(PropertyKeyConst.
NAMESPACE
,
namespace
);
properties.put(PropertyKeyConst.
USERNAME
,
username
);
properties.put(PropertyKeyConst.
PASSWORD
,
password
);
//return ConfigFactory.createConfigService(“localhost”);
return
ConfigFactory.
createConfigService
(properties);
}
/**
* 编码 sentinel将数据发送给nacos要编码
*/
@Bean
public
Converter<List<FlowRuleEntity>, String> flowRuleEntityEncoder() {
return
JSON::
toJSONString
;
}
/**
* 解码 sentinel从nacos获取数据要解码
*/
@Bean
public
Converter<String, List<FlowRuleEntity>> RuleEntityDecoder() {
return
s -> JSON.
parseArray
(s, FlowRuleEntity.
class
);
}
/**
* 规则转换器 推送 将队则转成JSON字符串
*/
@Bean
public
Converter<List<SystemRuleEntity>, String> systemRuleEntityEncoder() {
return
JSON::
toJSONString
;
}
/**
* 规则转换器 读取 将nacos中的JSON规则转换成SystemRuleEntity
*/
@Bean
public
Converter<String, List<SystemRuleEntity>> systemRuleEntityDecoder() {
return
s -> JSON.
parseArray
(s, SystemRuleEntity.
class
);
}
}
|
修改
SystemController
@RestController
@RequestMapping
(
“/system”
)
public class
SystemController {
private final
Logger
logger
= LoggerFactory.
getLogger
(SystemController.
class
);
@Autowired
private
RuleRepository<SystemRuleEntity, Long>
repository
;
@Autowired
@Qualifier
(
“systemRuleNacosProvider”
)
private
DynamicRuleProvider<List<SystemRuleEntity>>
ruleProvider
;
@Autowired
@Qualifier
(
“systemRuleNacosPublisher”
)
private
DynamicRulePublisher<List<SystemRuleEntity>>
rulePublisher
;
private
<
R
> Result<
R
> checkBasicParams(String app, String ip, Integer port) {
if
(StringUtil.
isEmpty
(app)) {
return
Result.
ofFail
(-
1
,
“app can’t be null or empty”
);
}
if
(port <=
0
|| port >
65535
) {
return
Result.
ofFail
(-
1
,
“port should be in (0, 65535)”
);
}
return null
;
}
@GetMapping
(
“/rules.json”
)
@AuthAction
(PrivilegeType.
READ_RULE
)
public
Result<List<SystemRuleEntity>> apiQueryMachineRules(String app, String ip,
Integer port) {
Result<List<SystemRuleEntity>> checkResult = checkBasicParams(app, ip, port);
if
(checkResult !=
null
) {
return
checkResult;
}
try
{
List<SystemRuleEntity> rules =
ruleProvider
.getRules(app);
rules =
repository
.saveAll(rules);
return
Result.
ofSuccess
(rules);
}
catch
(Throwable throwable) {
logger
.error(
“Query machine system rules error”
, throwable);
return
Result.
ofThrowable
(-
1
, throwable);
}
}
private int
countNotNullAndNotNegative(Number… values) {
int
notNullCount =
0
;
for
(
int
i =
0
; i < values.
length
; i++) {
if
(values[i] !=
null
&& values[i].doubleValue() >=
0
) {
notNullCount++;
}
}
return
notNullCount;
}
@RequestMapping
(
“/new.json”
)
@AuthAction
(PrivilegeType.
WRITE_RULE
)
public
Result<SystemRuleEntity> apiAdd(String app, String ip, Integer port,
Double highestSystemLoad, Double highestCpuUsage, Long avgRt,
Long maxThread, Double qps) {
Result<SystemRuleEntity> checkResult = checkBasicParams(app, ip, port);
if
(checkResult !=
null
) {
return
checkResult;
}
int
notNullCount = countNotNullAndNotNegative(highestSystemLoad, avgRt, maxThread, qps, highestCpuUsage);
if
(notNullCount !=
1
) {
return
Result.
ofFail
(-
1
,
“only one of [highestSystemLoad, avgRt, maxThread, qps,highestCpuUsage] ”
+
“value must be set > 0, but ”
+ notNullCount +
” values get”
);
}
if
(
null
!= highestCpuUsage && highestCpuUsage >
1
) {
return
Result.
ofFail
(-
1
,
“highestCpuUsage must between [0.0, 1.0]”
);
}
SystemRuleEntity entity =
new
SystemRuleEntity();
entity.setApp(app.trim());
entity.setIp(ip.trim());
entity.setPort(port);
// -1 is a fake value
if
(
null
!= highestSystemLoad) {
entity.setHighestSystemLoad(highestSystemLoad);
}
else
{
entity.setHighestSystemLoad(-
1D
);
}
if
(
null
!= highestCpuUsage) {
entity.setHighestCpuUsage(highestCpuUsage);
}
else
{
entity.setHighestCpuUsage(-
1D
);
}
if
(avgRt !=
null
) {
entity.setAvgRt(avgRt);
}
else
{
entity.setAvgRt(-
1L
);
}
if
(maxThread !=
null
) {
entity.setMaxThread(maxThread);
}
else
{
entity.setMaxThread(-
1L
);
}
if
(qps !=
null
) {
entity.setQps(qps);
}
else
{
entity.setQps(-
1D
);
}
Date date =
new
Date();
entity.setGmtCreate(date);
entity.setGmtModified(date);
try
{
entity =
repository
.save(entity);
}
catch
(Throwable throwable) {
logger
.error(
“Add SystemRule error”
, throwable);
return
Result.
ofThrowable
(-
1
, throwable);
}
publishRules(app);
return
Result.
ofSuccess
(entity);
}
@GetMapping
(
“/save.json”
)
@AuthAction
(PrivilegeType.
WRITE_RULE
)
public
Result<SystemRuleEntity> apiUpdateIfNotNull(Long id, String app, Double highestSystemLoad,
Double highestCpuUsage, Long avgRt, Long maxThread, Double qps) {
if
(id ==
null
) {
return
Result.
ofFail
(-
1
,
“id can’t be null”
);
}
SystemRuleEntity entity =
repository
.findById(id);
if
(entity ==
null
) {
return
Result.
ofFail
(-
1
,
“id ”
+ id +
” dose not exist”
);
}
if
(StringUtil.
isNotBlank
(app)) {
entity.setApp(app.trim());
}
if
(highestSystemLoad !=
null
) {
if
(highestSystemLoad <
0
) {
return
Result.
ofFail
(-
1
,
“highestSystemLoad must >= 0”
);
}
entity.setHighestSystemLoad(highestSystemLoad);
}
if
(highestCpuUsage !=
null
) {
if
(highestCpuUsage <
0
) {
return
Result.
ofFail
(-
1
,
“highestCpuUsage must >= 0”
);
}
if
(highestCpuUsage >
1
) {
return
Result.
ofFail
(-
1
,
“highestCpuUsage must <= 1”
);
}
entity.setHighestCpuUsage(highestCpuUsage);
}
if
(avgRt !=
null
) {
if
(avgRt <
0
) {
return
Result.
ofFail
(-
1
,
“avgRt must >= 0”
);
}
entity.setAvgRt(avgRt);
}
if
(maxThread !=
null
) {
if
(maxThread <
0
) {
return
Result.
ofFail
(-
1
,
“maxThread must >= 0”
);
}
entity.setMaxThread(maxThread);
}
if
(qps !=
null
) {
if
(qps <
0
) {
return
Result.
ofFail
(-
1
,
“qps must >= 0”
);
}
entity.setQps(qps);
}
Date date =
new
Date();
entity.setGmtModified(date);
try
{
entity =
repository
.save(entity);
}
catch
(Throwable throwable) {
logger
.error(
“save error:”
, throwable);
return
Result.
ofThrowable
(-
1
, throwable);
}
publishRules(app);
return
Result.
ofSuccess
(entity);
}
@RequestMapping
(
“/delete.json”
)
@AuthAction
(PrivilegeType.
DELETE_RULE
)
public
Result<?> delete(Long id) {
if
(id ==
null
) {
return
Result.
ofFail
(-
1
,
“id can’t be null”
);
}
SystemRuleEntity oldEntity =
repository
.findById(id);
if
(oldEntity ==
null
) {
return
Result.
ofSuccess
(
null
);
}
try
{
repository
.delete(id);
}
catch
(Throwable throwable) {
logger
.error(
“delete error:”
, throwable);
return
Result.
ofThrowable
(-
1
, throwable);
}
publishRules(oldEntity.getApp());
return
Result.
ofSuccess
(id);
}
private void
publishRules(String app) {
List<SystemRuleEntity> rules =
repository
.findAllByApp(app);
try
{
rulePublisher
.publish(app, rules);
}
catch
(Exception e) {
e.printStackTrace();
}
}
}
|
重启服务,系统规则就配置好了。
服务端拉取规则配置需要引入相关依赖包,配置相关
sentinel
配置。
在服务端引入相关依赖包。
<!– sentinel nacos数据持久化–>
<
dependency
>
<
groupId
>
com.alibaba.csp
</
groupId
>
<
artifactId
>
sentinel-datasource-nacos
</
artifactId
>
<
version
>
1.8.0
</
version
>
</
dependency
>
|
配置
sentinel
支持
nacos
配置,从
nacos
中加载对应的配置规则。
spring
:
application
:
name
: hailtaxi-driver
cloud
:
sentinel
:
transport
:
port
: 8719
dashboard
: 127.0.0.1:8080
datasource
:
nacos
:
server-addr
: 127.0.0.1:8848
username
: nacos
password
: nacos
namespace
: 1cc805fd-009f-4ca9-8ceb-f424c9b8babd
groupId
: DEFAULT_GROUP
dataId
: ${
spring.application.name
}-flow-rules
rule-type
: flow
system
:
nacos
:
server-addr
: 127.0.0.1:8848
username
: nacos
password
: nacos
namespace
: 1cc805fd-009f-4ca9-8ceb-f424c9b8babd
groupId
: DEFAULT_GROUP
dataId
: ${
spring.application.name
}-system-rules
rule-type
: system
|
启动服务,新增系统规则,配置
CPU
使用率。
访问测试接口:http://127.0.0.1:18888/driver/info/1
当
CUP
使用率低于10%,接口才能被调用。
当
CPU
使用率高于10%,系统将会被熔断,拒绝被访问。