Reading the code of a hand-written RPC framework: the Socket version
Project repository: https://github.com/Snailclimb/guide-rpc-framework
[Figure: overall flow diagram]
Naming conventions
1. RPC service name = interface name + group + version
2. The ZooKeeper service path is defined as:
"/my-rpc/" + service name + "/" + IP address + ":" + port
For example:
"/my-rpc/github.javaguide.HelloService/172.20.170.166:9998"
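Known issue
1. If the server's IP address changes while it is running, the shutdown hook can no longer delete the server's nodes from ZooKeeper.
Cause: the hook recomputes inetSocketAddress at shutdown, so it reflects the new IP address, while the static REGISTERED_PATH_SET still holds paths that end with the old address. The endsWith check therefore never matches.
Fix: delete every path in REGISTERED_PATH_SET directly, without the if check: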
public static void clearRegistry(CuratorFramework zkClient, InetSocketAddress inetSocketAddress) {
System.out.println(REGISTERED_PATH_SET);
REGISTERED_PATH_SET.stream().parallel().forEach(p -> {
try {
// if (p.endsWith(inetSocketAddress.toString())) {
System.out.println(zkClient.getChildren().forPath("/my-rpc/github.javaguide.HelloService"));
zkClient.delete().forPath(p);
// System.out.println(p);
System.out.println("---");
System.out.println(zkClient.getChildren().forPath("/my-rpc/github.javaguide.HelloService"));
// }
} catch (Exception e) {
log.error("clear registry for path [{}] fail", p);
}
});
log.info("All registered services on the server are cleared:[{}]", REGISTERED_PATH_SET.toString());
}
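To see why the original endsWith filter misses stale paths, consider the minimal sketch below. It is not the project's code: `StalePathSketch`, its field and method names, and the address 10.0.0.5 are hypothetical, and a plain in-memory set stands in for REGISTERED_PATH_SET.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the stale-path problem; not part of the project.
class StalePathSketch {
    // Stands in for REGISTERED_PATH_SET: filled at registration time, never rewritten.
    static final Set<String> REGISTERED = ConcurrentHashMap.newKeySet();

    // Mirrors the original filter: only paths ending with the current address match.
    static long countMatching(String currentAddressSuffix) {
        return REGISTERED.stream().filter(p -> p.endsWith(currentAddressSuffix)).count();
    }

    public static void main(String[] args) {
        REGISTERED.add("/my-rpc/github.javaguide.HelloService/172.20.170.166:9998");
        // Address unchanged since registration: the path is found.
        System.out.println(countMatching("/172.20.170.166:9998"));
        // Address changed before shutdown (assumed new IP): nothing matches, nothing is deleted.
        System.out.println(countMatching("/10.0.0.5:9998"));
    }
}
```

With the filter removed, every path recorded at registration time is deleted, whatever address is computed at shutdown.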
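1. Socket-based
1.1 Server
1. Create the service implementation, the socket server, and the service configuration object. Set the service implementation on the configuration, register the configuration with the socket server, and start the server.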
public static void main(String[] args) {
HelloService helloService = new HelloServiceImpl();
SocketRpcServer socketRpcServer = new SocketRpcServer();
RpcServiceConfig rpcServiceConfig = new RpcServiceConfig();
rpcServiceConfig.setService(helloService);
socketRpcServer.registerService(rpcServiceConfig);
socketRpcServer.start();
}
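1.1.1 Setting the service
1. The service configuration class has a service property; set it to the service that should be published to the registry.
rpcServiceConfig.setService(helloService);
1.1.2 Registering the service
socketRpcServer.registerService(rpcServiceConfig);
1. registerService simply calls publishService: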
public void registerService(RpcServiceConfig rpcServiceConfig) {
serviceProvider.publishService(rpcServiceConfig);
}
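2. publishService first obtains the server's IP address, adds the service configuration to ZkServiceProviderImpl via addService, and then registers the service name and address with the service registry.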
@Override
public void publishService(RpcServiceConfig rpcServiceConfig) {
try {
String host = InetAddress.getLocalHost().getHostAddress();
this.addService(rpcServiceConfig);
serviceRegistry.registerService(rpcServiceConfig.getRpcServiceName(), new InetSocketAddress(host, NettyRpcServer.PORT));
} catch (UnknownHostException e) {
log.error("occur exception when getHostAddress", e);
}
}
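3. ZkServiceProviderImpl.addService returns immediately if registeredService already contains the RPC service name (interface name + group + version). Otherwise it adds the name to registeredService and puts an entry into serviceMap, with the service name as the key and the service object as the value.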
@Override
public void addService(RpcServiceConfig rpcServiceConfig) {
String rpcServiceName = rpcServiceConfig.getRpcServiceName();
if (registeredService.contains(rpcServiceName)) {
return;
}
registeredService.add(rpcServiceName);
serviceMap.put(rpcServiceName, rpcServiceConfig.getService());
log.info("Add service: {} and interfaces:{}", rpcServiceName, rpcServiceConfig.getService().getClass().getInterfaces());
}
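A minimal in-memory model of this bookkeeping is shown below. It assumes that both collections are concurrent, since the field declarations are not shown above. `ServiceTableSketch` is a hypothetical class, and unlike the quoted method it relies on the return value of `Set.add`, so the check and the insertion happen in one step.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the provider's service table; not the project's class.
class ServiceTableSketch {
    private final Set<String> registeredService = ConcurrentHashMap.newKeySet();
    private final Map<String, Object> serviceMap = new ConcurrentHashMap<>();

    // Returns true only when the name was not registered before.
    boolean addService(String rpcServiceName, Object service) {
        if (!registeredService.add(rpcServiceName)) {
            return false; // already registered: keep the first instance
        }
        serviceMap.put(rpcServiceName, service);
        return true;
    }

    // Looks up a service by name and fails loudly when it is missing.
    Object getService(String rpcServiceName) {
        Object service = serviceMap.get(rpcServiceName);
        if (service == null) {
            throw new IllegalStateException("service not found: " + rpcServiceName);
        }
        return service;
    }
}
```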
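4. ZkServiceRegistryImpl.registerService is called next. It first builds the service path, which is defined as:
"/my-rpc/" + service name + "/" + IP address + ":" + port
For example:
"/my-rpc/github.javaguide.HelloService/172.20.170.166:9998"
The "/" before the IP address is not written out in the code; it comes from inetSocketAddress.toString(), which prints an address built from an IP literal as "/ip:port". The method then obtains the ZooKeeper client and creates a persistent node.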
@Override
public void registerService(String rpcServiceName, InetSocketAddress inetSocketAddress) {
String servicePath = CuratorUtils.ZK_REGISTER_ROOT_PATH + "/" + rpcServiceName + inetSocketAddress.toString();
CuratorFramework zkClient = CuratorUtils.getZkClient();
CuratorUtils.createPersistentNode(zkClient, servicePath);
}
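The path convention can be sketched as follows. `RpcNamingSketch` and its methods are hypothetical names used for illustration only, and the group and version are assumed to be plain strings that may be empty, as in the example path.

```java
// Hypothetical helper illustrating the naming conventions; not part of the project.
class RpcNamingSketch {
    static final String ROOT = "/my-rpc";

    // RPC service name = interface name + group + version
    static String serviceName(String interfaceName, String group, String version) {
        return interfaceName + group + version;
    }

    // ZooKeeper path = root + "/" + service name + "/" + ip + ":" + port
    static String servicePath(String serviceName, String ip, int port) {
        return ROOT + "/" + serviceName + "/" + ip + ":" + port;
    }

    public static void main(String[] args) {
        String name = serviceName("github.javaguide.HelloService", "", "");
        System.out.println(servicePath(name, "172.20.170.166", 9998));
    }
}
```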
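5. getZkClient returns the existing client if it has already been started. Otherwise it builds a new client with an exponential-backoff retry policy, starts it, and waits up to 30 s for the connection. If the connection is not established in time, it throws a RuntimeException.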
public static CuratorFramework getZkClient() {
// check if user has set zk address
Properties properties = PropertiesFileUtil.readPropertiesFile(RpcConfigEnum.RPC_CONFIG_PATH.getPropertyValue());
String zookeeperAddress = properties != null && properties.getProperty(RpcConfigEnum.ZK_ADDRESS.getPropertyValue()) != null ? properties.getProperty(RpcConfigEnum.ZK_ADDRESS.getPropertyValue()) : DEFAULT_ZOOKEEPER_ADDRESS;
// if zkClient has been started, return directly
if (zkClient != null && zkClient.getState() == CuratorFrameworkState.STARTED) {
return zkClient;
}
// Retry strategy. Retry 3 times, and will increase the sleep time between retries.
RetryPolicy retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME, MAX_RETRIES);
zkClient = CuratorFrameworkFactory.builder()
// the server to connect to (can be a server list)
.connectString(zookeeperAddress)
.retryPolicy(retryPolicy)
.build();
zkClient.start();
try {
// wait 30s until connect to the zookeeper
if (!zkClient.blockUntilConnected(30, TimeUnit.SECONDS)) {
throw new RuntimeException("Time out waiting to connect to ZK!");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
return zkClient;
}
6. createPersistentNode does two things: it creates a PERSISTENT node in ZooKeeper unless the node already exists, and it adds the service path to REGISTERED_PATH_SET.
public static void createPersistentNode(CuratorFramework zkClient, String path) {
try {
if (REGISTERED_PATH_SET.contains(path) || zkClient.checkExists().forPath(path) != null) {
log.info("The node already exists. The node is:[{}]", path);
} else {
//eg: /my-rpc/github.javaguide.HelloService/127.0.0.1:9999
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path);
log.info("The node was created successfully. The node is:[{}]", path);
}
REGISTERED_PATH_SET.add(path);
} catch (Exception e) {
log.error("create persistent node for path [{}] fail", path);
}
}
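1.1.3 Startup
Call the start method:
socketRpcServer.start();
1. start creates a ServerSocket and binds it to the configured port. accept() blocks until a client connects; each accepted connection is handed to the thread pool for processing, and the loop then waits for the next connection.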
public void start() {
try (ServerSocket server = new ServerSocket()) {
String host = InetAddress.getLocalHost().getHostAddress();
server.bind(new InetSocketAddress(host, PORT));
CustomShutdownHook.getCustomShutdownHook().clearAll();
Socket socket;
while ((socket = server.accept()) != null) {
log.info("client connected [{}]", socket.getInetAddress());
threadPool.execute(new SocketRpcRequestHandlerRunnable(socket));
}
threadPool.shutdown();
} catch (IOException e) {
log.error("occur IOException:", e);
}
}
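2. SocketRpcRequestHandlerRunnable implements Runnable, so threadPool.execute runs its run method. It opens the socket's input and output streams, reads an object from the input stream and casts it to RpcRequest, passes the request to the handler, then writes the resulting response to the output stream and flushes it.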
@Override
public void run() {
log.info("server handle message from client by thread: [{}]", Thread.currentThread().getName());
try (ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream())) {
RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
Object result = rpcRequestHandler.handle(rpcRequest);
objectOutputStream.writeObject(RpcResponse.success(result, rpcRequest.getRequestId()));
objectOutputStream.flush();
} catch (IOException | ClassNotFoundException e) {
log.error("occur exception:", e);
}
}
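The accept, read, reply cycle described above can be reproduced with plain JDK classes. The sketch below is an illustration under simplifying assumptions, not the framework's implementation: `SocketRoundTripSketch` is a hypothetical class, a `String` stands in for RpcRequest and RpcResponse, the server handles a single connection, and a small client is included only so the exchange can run end to end on the loopback interface.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical single-request echo exchange over Java serialization.
class SocketRoundTripSketch {
    // Server side: accept one connection, read one object, write one reply.
    static void serveOnce(ServerSocket server) {
        try (Socket socket = server.accept();
             ObjectInputStream in = new ObjectInputStream(socket.getInputStream());
             ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream())) {
            String request = (String) in.readObject();
            out.writeObject("echo:" + request);
            out.flush();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Client side: write the request first, then open the input stream and read the reply.
    static Object call(InetSocketAddress address, Object request)
            throws IOException, ClassNotFoundException {
        try (Socket socket = new Socket()) {
            socket.connect(address);
            ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
            out.writeObject(request);
            out.flush();
            ObjectInputStream in = new ObjectInputStream(socket.getInputStream());
            return in.readObject();
        }
    }

    // Runs server and client in one process; port 0 lets the OS pick a free port.
    static Object demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try (ServerSocket server = new ServerSocket(0, 50, InetAddress.getLoopbackAddress())) {
            pool.execute(() -> serveOnce(server));
            return call(new InetSocketAddress(server.getInetAddress(), server.getLocalPort()), "hello");
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The order in which the streams are opened matters: the `ObjectInputStream` constructor blocks until it has read the stream header written by the peer's `ObjectOutputStream`, so one side has to open its output stream first. Here the client does, which matches the order used in the quoted server and client code.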
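3. handle takes the service name (interface name, group, and version) from the request and uses it to look up the service. getService throws an exception if no such service exists and returns the service otherwise. invokeTargetMethod then uses reflection to find the requested method on the service, invokes it, and returns the method's return value.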
public Object handle(RpcRequest rpcRequest) {
Object service = serviceProvider.getService(rpcRequest.getRpcServiceName());
return invokeTargetMethod(rpcRequest, service);
}
@Override
public Object getService(String rpcServiceName) {
Object service = serviceMap.get(rpcServiceName);
if (null == service) {
throw new RpcException(RpcErrorMessageEnum.SERVICE_CAN_NOT_BE_FOUND);
}
return service;
}
private Object invokeTargetMethod(RpcRequest rpcRequest, Object service) {
Object result;
try {
Method method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamTypes());
result = method.invoke(service, rpcRequest.getParameters());
log.info("service:[{}] successful invoke method:[{}]", rpcRequest.getInterfaceName(), rpcRequest.getMethodName());
} catch (NoSuchMethodException | IllegalArgumentException | InvocationTargetException | IllegalAccessException e) {
throw new RpcException(e.getMessage(), e);
}
return result;
}
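The reflective dispatch in invokeTargetMethod boils down to `getMethod` followed by `invoke`. The sketch below shows that pair in isolation. `ReflectiveInvokeSketch`, `Greeter`, and `GreeterImpl` are hypothetical stand-ins for the handler and a registered service, and a plain IllegalStateException replaces the project's RpcException.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

// Hypothetical illustration of name-based method dispatch via reflection.
class ReflectiveInvokeSketch {
    interface Greeter {
        String greet(String name);
    }

    static class GreeterImpl implements Greeter {
        @Override
        public String greet(String name) {
            return "Hello, " + name;
        }
    }

    // Finds a public method by name and parameter types, then calls it on the service.
    static Object invoke(Object service, String methodName, Class<?>[] paramTypes, Object[] args) {
        try {
            Method method = service.getClass().getMethod(methodName, paramTypes);
            return method.invoke(service, args);
        } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Object result = invoke(new GreeterImpl(), "greet",
                new Class<?>[]{String.class}, new Object[]{"rpc"});
        System.out.println(result);
    }
}
```

Because the method is resolved from the name and parameter types carried in the request, the server needs no compile-time knowledge of which method a client will call.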
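1.1.3.1 Shutdown hook
The start method above registers a shutdown hook:
CustomShutdownHook.getCustomShutdownHook().clearAll();
1. The hook runs when the JVM shuts down. It first determines the local IP address and port, then deletes the nodes for that address from ZooKeeper.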
public void clearAll() {
log.info("addShutdownHook for clearAll");
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
InetSocketAddress inetSocketAddress = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), NettyRpcServer.PORT);
CuratorUtils.clearRegistry(CuratorUtils.getZkClient(), inetSocketAddress);
} catch (UnknownHostException ignored) {
}
ThreadPoolFactoryUtil.shutDownAllThreadPool();
}));
}
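2. clearRegistry removes this server's service addresses from ZooKeeper. For every path in REGISTERED_PATH_SET that ends with the IP address and port, it deletes the corresponding node.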
public static void clearRegistry(CuratorFramework zkClient, InetSocketAddress inetSocketAddress) {
REGISTERED_PATH_SET.stream().parallel().forEach(p -> {
try {
if (p.endsWith(inetSocketAddress.toString())) {
zkClient.delete().forPath(p);
}
} catch (Exception e) {
log.error("clear registry for path [{}] fail", p);
}
});
log.info("All registered services on the server are cleared:[{}]", REGISTERED_PATH_SET.toString());
}
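3. clearAll also calls shutDownAllThreadPool, which shuts down every thread pool on the server. The map's keys are thread pool names and its values are the pools themselves (ExecutorService). For each pool it calls shutdown, after which the pool accepts no new tasks but still finishes the tasks already submitted. It then calls awaitTermination, which blocks for at most 10 s and returns earlier if the pool terminates first. awaitTermination does not stop anything by itself; in this code shutdownNow is called only when the waiting thread is interrupted.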
public static void shutDownAllThreadPool() {
log.info("call shutDownAllThreadPool method");
THREAD_POOLS.entrySet().parallelStream().forEach(entry -> {
ExecutorService executorService = entry.getValue();
executorService.shutdown();
log.info("shut down thread pool [{}] [{}]", entry.getKey(), executorService.isTerminated());
try {
executorService.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.error("Thread pool never terminated");
executorService.shutdownNow();
}
});
}
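The shutdown/awaitTermination pattern can be tried out in isolation. The sketch below is a common variant rather than the project's code: `PoolShutdownSketch` is a hypothetical class, and, as an assumption about what one might want, it also calls `shutdownNow` when the timeout elapses, which the quoted method does not do.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper showing a two-phase thread pool shutdown.
class PoolShutdownSketch {
    // Returns true if the pool terminated within the timeout.
    static boolean shutdownGracefully(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // stop accepting new tasks; submitted tasks still run
        try {
            if (!pool.awaitTermination(timeout, unit)) {
                pool.shutdownNow(); // timeout elapsed: interrupt whatever is still running
                return false;
            }
            return true;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(() -> System.out.println("task ran"));
        System.out.println(shutdownGracefully(pool, 10, TimeUnit.SECONDS));
    }
}
```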
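1.2 Client
Create the client transport and the configuration object, construct the client proxy from both, and call getProxy to obtain the proxy object.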
public static void main(String[] args) {
RpcRequestTransport rpcRequestTransport = new SocketRpcClient();
RpcServiceConfig rpcServiceConfig = new RpcServiceConfig();
RpcClientProxy rpcClientProxy = new RpcClientProxy(rpcRequestTransport, rpcServiceConfig);
HelloService helloService = rpcClientProxy.getProxy(HelloService.class);
String hello = helloService.hello(new Hello("111", "222"));
System.out.println(hello);
}
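1.2.1 Obtaining the client proxy
1. getProxy uses Java dynamic proxies. newProxyInstance takes three arguments: a class loader, the array of service interfaces, and an InvocationHandler instance (here the RpcClientProxy itself). Every method call on the generated proxy is routed to that handler's invoke method.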
@SuppressWarnings("unchecked")
public <T> T getProxy(Class<T> clazz) {
return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this);
}
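A stripped-down version of this mechanism is sketched below. `ProxySketch` and `HelloApi` are hypothetical names, and the handler only returns a description of the call where a real client would build and send a request.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;

// Hypothetical handler demonstrating how proxy calls reach invoke().
class ProxySketch implements InvocationHandler {
    interface HelloApi {
        String hello(String name);
    }

    // Creates a proxy that implements clazz and forwards every call to this handler.
    @SuppressWarnings("unchecked")
    <T> T getProxy(Class<T> clazz) {
        return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this);
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {
        // A real RPC client would turn this information into a request and send it.
        return method.getDeclaringClass().getSimpleName() + "." + method.getName() + Arrays.toString(args);
    }

    public static void main(String[] args) {
        HelloApi api = new ProxySketch().getProxy(HelloApi.class);
        System.out.println(api.hello("rpc"));
    }
}
```

The caller only sees the interface; the method name, declaring interface, and arguments all arrive in `invoke`, which is the information the framework needs to assemble an RPC request.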
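1.2.2 Invoking the service
Call the hello method as if it were a local method:
String hello = helloService.hello(new Hello("111", "222"));
1. The call is routed to invoke. It first builds the RPC request, which carries the method name, parameters, interface name, parameter types, request ID, group, and version. Socket and Netty transports are handled differently. For Socket, it calls the client's sendRpcRequest directly, which sends the request and returns the response.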
@SneakyThrows
@SuppressWarnings("unchecked")
@Override
public Object invoke(Object proxy, Method method, Object[] args) {
log.info("invoked method: [{}]", method.getName());
RpcRequest rpcRequest = RpcRequest.builder().methodName(method.getName())
.parameters(args)
.interfaceName(method.getDeclaringClass().getName())
.paramTypes(method.getParameterTypes())
.requestId(UUID.randomUUID().toString())
.group(rpcServiceConfig.getGroup())
.version(rpcServiceConfig.getVersion())
.build();
RpcResponse<Object> rpcResponse = null;
if (rpcRequestTransport instanceof NettyRpcClient) {
// System.out.println(rpcRequest.getGroup() + " " + rpcRequest.getVersion());
CompletableFuture<RpcResponse<Object>> completableFuture = (CompletableFuture<RpcResponse<Object>>) rpcRequestTransport.sendRpcRequest(rpcRequest);
rpcResponse = completableFuture.get();
}
if (rpcRequestTransport instanceof SocketRpcClient) {
rpcResponse = (RpcResponse<Object>) rpcRequestTransport.sendRpcRequest(rpcRequest);
}
this.check(rpcResponse, rpcRequest);
return rpcResponse.getData();
}
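2. sendRpcRequest first looks up the server address and connects to it, then writes the RPC request to the output stream and reads the RPC response from the input stream.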
@Override
public Object sendRpcRequest(RpcRequest rpcRequest) {
InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcRequest);
try (Socket socket = new Socket()) {
socket.connect(inetSocketAddress);
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
// Send data to the server through the output stream
objectOutputStream.writeObject(rpcRequest);
ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
// Read RpcResponse from the input stream
return objectInputStream.readObject();
} catch (IOException | ClassNotFoundException e) {
throw new RpcException("调用服务失败:", e);
}
}
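3. lookupService takes the service name from the RPC request, obtains the ZooKeeper client (Curator), and fetches the child nodes of the service path. If there are none, it throws a service-not-found exception. Otherwise the load balancer picks a server address, which is split into IP address and port and returned as an InetSocketAddress.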
@Override
public InetSocketAddress lookupService(RpcRequest rpcRequest) {
String rpcServiceName = rpcRequest.getRpcServiceName();
CuratorFramework zkClient = CuratorUtils.getZkClient();
List<String> serviceUrlList = CuratorUtils.getChildrenNodes(zkClient, rpcServiceName);
if (CollectionUtil.isEmpty(serviceUrlList)) {
throw new RpcException(RpcErrorMessageEnum.SERVICE_CAN_NOT_BE_FOUND, rpcServiceName);
}
// load balancing
String targetServiceUrl = loadBalance.selectServiceAddress(serviceUrlList, rpcRequest);
log.info("Successfully found the service address:[{}]", targetServiceUrl);
String[] socketAddressArray = targetServiceUrl.split(":");
String host = socketAddressArray[0];
int port = Integer.parseInt(socketAddressArray[1]);
return new InetSocketAddress(host, port);
}
4. selectServiceAddress handles the edge cases (empty list, single address) and otherwise calls doSelect.
@Override
public String selectServiceAddress(List<String> serviceAddresses, RpcRequest rpcRequest) {
if (CollectionUtil.isEmpty(serviceAddresses)) {
return null;
}
if (serviceAddresses.size() == 1) {
return serviceAddresses.get(0);
}
return doSelect(serviceAddresses, rpcRequest);
}
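5. doSelect chooses the service address with a consistent hashing algorithm modeled on Dubbo. It first calls System.identityHashCode on the address list. This returns the value that the default Object.hashCode would return, whether or not the class overrides hashCode, so it identifies the list instance rather than its contents. It then looks up the selector for the service; if there is none, or the existing selector was built from a different list instance, it creates a new selector. Finally it calls the selector's select method.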
@Override
protected String doSelect(List<String> serviceAddresses, RpcRequest rpcRequest) {
int identityHashCode = System.identityHashCode(serviceAddresses);
// build rpc service name by rpcRequest
String rpcServiceName = rpcRequest.getRpcServiceName();
ConsistentHashSelector selector = selectors.get(rpcServiceName);
// check for updates
if (selector == null || selector.identityHashCode != identityHashCode) {
selectors.put(rpcServiceName, new ConsistentHashSelector(serviceAddresses, 160, identityHashCode));
selector = selectors.get(rpcServiceName);
}
return selector.select(rpcServiceName + Arrays.stream(rpcRequest.getParameters()));
}
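The difference between the two hashes can be shown with a small sketch. `IdentityHashSketch` and `sameInstance` are hypothetical names; the addresses are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of content hash versus identity hash for a list.
class IdentityHashSketch {
    // True when the cached value was taken from this exact list instance.
    static boolean sameInstance(int cachedIdentityHash, List<String> addresses) {
        return cachedIdentityHash == System.identityHashCode(addresses);
    }

    public static void main(String[] args) {
        List<String> addresses = new ArrayList<>();
        addresses.add("10.0.0.1:9998");
        int identityHash = System.identityHashCode(addresses);

        // An equal copy has the same content-based hashCode (List contract) ...
        List<String> copy = new ArrayList<>(addresses);
        System.out.println(addresses.hashCode() == copy.hashCode());

        // ... while mutating the original leaves its identity hash untouched.
        addresses.add("10.0.0.2:9998");
        System.out.println(sameInstance(identityHash, addresses));
    }
}
```

In other words, the identity check in doSelect notices when the address list object is replaced, not when an existing list is modified in place. One further observation about the quoted doSelect, offered as a reading of the code rather than a change made in the project: the key passed to select concatenates a Stream object, and JDK streams do not override toString, so that key differs from call to call even for identical parameters. A content-based key, for example one built with Arrays.toString, would be needed for equal arguments to map to the same server.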
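6. select picks the server address. It computes the MD5 digest of the given string and derives a hash from the digest. tailMap(hashCode, true) returns the entries whose keys are greater than or equal to that hash, and the first of them is chosen. If every key is smaller than the hash, the lookup wraps around to the first entry of the map.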
public String select(String rpcServiceKey) {
byte[] digest = md5(rpcServiceKey);
return selectForKey(hash(digest, 0));
}
static long hash(byte[] digest, int idx) {
return ((long) (digest[3 + idx * 4] & 255) << 24 | (long) (digest[2 + idx * 4] & 255) << 16 | (long) (digest[1 + idx * 4] & 255) << 8 | (long) (digest[idx * 4] & 255)) & 4294967295L;
}
public String selectForKey(long hashCode) {
Map.Entry<Long, String> entry = virtualInvokers.tailMap(hashCode, true).firstEntry();
if (entry == null) {
entry = virtualInvokers.firstEntry();
}
return entry.getValue();
}
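A self-contained hash ring along these lines is sketched below. The selector's constructor is not shown above, so the way virtual nodes are placed here (one MD5 digest yielding four positions, following the Dubbo-style scheme) is an assumption, and `HashRingSketch` is a hypothetical class. `ceilingEntry` is used as the equivalent of `tailMap(key, true).firstEntry()`.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical consistent-hash ring with virtual nodes.
class HashRingSketch {
    private final TreeMap<Long, String> ring = new TreeMap<>();

    // Each address gets replicaNumber virtual nodes; one digest supplies four of them.
    HashRingSketch(List<String> addresses, int replicaNumber) {
        if (addresses.isEmpty()) {
            throw new IllegalArgumentException("at least one address is required");
        }
        for (String address : addresses) {
            for (int i = 0; i < replicaNumber / 4; i++) {
                byte[] digest = md5(address + i);
                for (int h = 0; h < 4; h++) {
                    ring.put(hash(digest, h), address);
                }
            }
        }
    }

    static byte[] md5(String key) {
        try {
            return MessageDigest.getInstance("MD5").digest(key.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Reads four digest bytes in little-endian order as an unsigned 32-bit value.
    static long hash(byte[] digest, int idx) {
        return ((long) (digest[3 + idx * 4] & 255) << 24
                | (long) (digest[2 + idx * 4] & 255) << 16
                | (long) (digest[1 + idx * 4] & 255) << 8
                | (long) (digest[idx * 4] & 255)) & 4294967295L;
    }

    // Picks the first virtual node at or after the key's position, wrapping around if needed.
    String select(String key) {
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(md5(key), 0));
        if (entry == null) {
            entry = ring.firstEntry();
        }
        return entry.getValue();
    }
}
```

The point of the ring is stability: when a server is removed, only keys that mapped to that server's virtual nodes move, while all other keys keep their server.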
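7. check validates the response. It throws an exception if the response is null, if the request ID in the response does not match the ID of the request, or if the status code is null or not the success code. Each exception carries a specific error message together with the interface name.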
private void check(RpcResponse<Object> rpcResponse, RpcRequest rpcRequest) {
if (rpcResponse == null) {
throw new RpcException(RpcErrorMessageEnum.SERVICE_INVOCATION_FAILURE, INTERFACE_NAME + ":" + rpcRequest.getInterfaceName());
}
if (!rpcRequest.getRequestId().equals(rpcResponse.getRequestId())) {
throw new RpcException(RpcErrorMessageEnum.REQUEST_NOT_MATCH_RESPONSE, INTERFACE_NAME + ":" + rpcRequest.getInterfaceName());
}
if (rpcResponse.getCode() == null || !rpcResponse.getCode().equals(RpcResponseCodeEnum.SUCCESS.getCode())) {
throw new RpcException(RpcErrorMessageEnum.SERVICE_INVOCATION_FAILURE, INTERFACE_NAME + ":" + rpcRequest.getInterfaceName());
}
}
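8. Return the data carried in the response:
return rpcResponse.getData();
2. ZooKeeper
Deleting a node together with all of its children through the ZooKeeper client:
zkClient.delete().deletingChildrenIfNeeded().forPath("/my-rpc/" + rpcServiceName);
2.1 Client access to ZooKeeper
1. Obtain the ZooKeeper client: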
public static CuratorFramework getZkClient() {
// check if user has set zk address
Properties properties = PropertiesFileUtil.readPropertiesFile(RpcConfigEnum.RPC_CONFIG_PATH.getPropertyValue());
String zookeeperAddress = properties != null && properties.getProperty(RpcConfigEnum.ZK_ADDRESS.getPropertyValue()) != null ? properties.getProperty(RpcConfigEnum.ZK_ADDRESS.getPropertyValue()) : DEFAULT_ZOOKEEPER_ADDRESS;
// if zkClient has been started, return directly
if (zkClient != null && zkClient.getState() == CuratorFrameworkState.STARTED) {
return zkClient;
}
// Retry strategy. Retry 3 times, and will increase the sleep time between retries.
RetryPolicy retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME, MAX_RETRIES);
zkClient = CuratorFrameworkFactory.builder()
// the server to connect to (can be a server list)
.connectString(zookeeperAddress)
.retryPolicy(retryPolicy)
.build();
zkClient.start();
try {
// wait 30s until connect to the zookeeper
if (!zkClient.blockUntilConnected(30, TimeUnit.SECONDS)) {
throw new RuntimeException("Time out waiting to connect to ZK!");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
return zkClient;
}
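2. getChildrenNodes fetches the child nodes through the ZooKeeper client. It first checks whether the map already contains the service name and, if so, returns the cached list. Otherwise it queries ZooKeeper for the children of the service path, stores the service name and the resulting address list in the map, and registers a watcher. The key call is zkClient.getChildren().forPath(servicePath).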
public static List<String> getChildrenNodes(CuratorFramework zkClient, String rpcServiceName) {
if (SERVICE_ADDRESS_MAP.containsKey(rpcServiceName)) {
return SERVICE_ADDRESS_MAP.get(rpcServiceName);
}
List<String> result = null;
String servicePath = ZK_REGISTER_ROOT_PATH + "/" + rpcServiceName;
try {
result = zkClient.getChildren().forPath(servicePath);
SERVICE_ADDRESS_MAP.put(rpcServiceName, result);
registerWatcher(rpcServiceName, zkClient);
} catch (Exception e) {
log.error("get children nodes for path [{}] fail", servicePath);
}
return result;
}
3. Register a watcher that observes changes to the service's child nodes and refreshes the cached address list:
private static void registerWatcher(String rpcServiceName, CuratorFramework zkClient) throws Exception {
String servicePath = ZK_REGISTER_ROOT_PATH + "/" + rpcServiceName;
PathChildrenCache pathChildrenCache = new PathChildrenCache(zkClient, servicePath, true);
PathChildrenCacheListener pathChildrenCacheListener = (curatorFramework, pathChildrenCacheEvent) -> {
List<String> serviceAddresses = curatorFramework.getChildren().forPath(servicePath);
SERVICE_ADDRESS_MAP.put(rpcServiceName, serviceAddresses);
};
pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);
pathChildrenCache.start();
}
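The interplay between the cache and the watcher can be modeled without ZooKeeper. In the sketch below, which is an in-memory illustration and not the project's code, `AddressCacheSketch` is a hypothetical class, a `Function` stands in for the ZooKeeper children query, and `onChildrenChanged` plays the role of the listener callback.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical model of a lookup cache that is refreshed by change notifications.
class AddressCacheSketch {
    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();
    // Stands in for the registry query; assumed to return a non-null list.
    private final Function<String, List<String>> fetchChildren;

    AddressCacheSketch(Function<String, List<String>> fetchChildren) {
        this.fetchChildren = fetchChildren;
    }

    // The first lookup queries the registry; later lookups are served from the cache.
    List<String> getChildren(String serviceName) {
        return cache.computeIfAbsent(serviceName, fetchChildren);
    }

    // What a children-changed listener would do: re-read and overwrite the cached list.
    void onChildrenChanged(String serviceName) {
        cache.put(serviceName, fetchChildren.apply(serviceName));
    }
}
```

Without the refresh step, a client would keep using the address list from its first lookup and never notice servers joining or leaving.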