手写RPC框架项目代码解读——Socket版本

  • Post author:
  • Post category:其他


项目地址:

https://github.com/Snailclimb/guide-rpc-framework



整体流程图

在这里插入图片描述



约定的名字

1、RPC服务名 = 接口名字 + 组号 + 版本号

2、zookeeper服务路径规定如下:

"/my-rpc/" + 服务名 +  "/" + IP地址 + 端口号

例如:

"/my-rpc/github.javaguide.HelloService/172.20.170.166:9998"



问题描述

1、如果在服务器运行时,IP地址改变,那么钩子函数将不能在zookeeper客户端中删去结点

原因:inetSocketAddress改变,但是静态的REGISTERED_PATH_SET不会改变。

解决:REGISTERED_PATH_SET直接删除,不要加if判断

public static void clearRegistry(CuratorFramework zkClient, InetSocketAddress inetSocketAddress) {
    System.out.println(REGISTERED_PATH_SET);
    REGISTERED_PATH_SET.stream().parallel().forEach(p -> {
        try {
            // if (p.endsWith(inetSocketAddress.toString())) {
            System.out.println(zkClient.getChildren().forPath("/my-rpc/github.javaguide.HelloService"));
            zkClient.delete().forPath(p);
            // System.out.println(p);
            System.out.println("---");
            System.out.println(zkClient.getChildren().forPath("/my-rpc/github.javaguide.HelloService"));
            // }
        } catch (Exception e) {
            log.error("clear registry for path [{}] fail", p);
        }
    });
    log.info("All registered services on the server are cleared:[{}]", REGISTERED_PATH_SET.toString());
}



1、基于Socket



1.1 服务端

1、初始化提供服务的服务类,初始化socket服务器,初始化服务配置类,将服务类添加到服务配置类的属性中,把配置类注册到socket服务器中,并开启服务。

public static void main(String[] args) {
    HelloService helloService = new HelloServiceImpl();
    SocketRpcServer socketRpcServer = new SocketRpcServer();
    RpcServiceConfig rpcServiceConfig = new RpcServiceConfig();
    rpcServiceConfig.setService(helloService);
    socketRpcServer.registerService(rpcServiceConfig);
    socketRpcServer.start();
}



1.1.1 设置服务

1、服务配置类有服务这个属性,设置该属性为需要添加到注册中心的服务。

rpcServiceConfig.setService(helloService);



1.1.2 注册服务

socketRpcServer.registerService(rpcServiceConfig);

1、在registerService方法中,调用publishService方法

public void registerService(RpcServiceConfig rpcServiceConfig) {
    serviceProvider.publishService(rpcServiceConfig);
}

2、publishService方法,首先获得服务器的IP地址,将服务配置添加到ZkServiceProviderImpl中。

@Override
public void publishService(RpcServiceConfig rpcServiceConfig) {
    try {
        String host = InetAddress.getLocalHost().getHostAddress();
        this.addService(rpcServiceConfig);
        serviceRegistry.registerService(rpcServiceConfig.getRpcServiceName(), new InetSocketAddress(host, NettyRpcServer.PORT));
    } catch (UnknownHostException e) {
        log.error("occur exception when getHostAddress", e);
    }
}

3、ZkServiceProviderImpl的addService方法,如果registeredService中包含了RPC服务名(接口名字+组号+版本号),直接返回,如果没有包含则添加进registeredService;并且在serviceMap中添加键值对,键是服务名,值是服务类。

@Override
public void addService(RpcServiceConfig rpcServiceConfig) {
    String rpcServiceName = rpcServiceConfig.getRpcServiceName();
    if (registeredService.contains(rpcServiceName)) {
        return;
    }
    registeredService.add(rpcServiceName);
    serviceMap.put(rpcServiceName, rpcServiceConfig.getService());
    log.info("Add service: {} and interfaces:{}", rpcServiceName, rpcServiceConfig.getService().getClass().getInterfaces());
}

4、调用ZkServiceRegistryImpl的registerService方法,首先拼接服务路径,服务路径规定如下:

"/my-rpc/" + 服务名 +  "/" + IP地址 + 端口号

例如:

"/my-rpc/github.javaguide.HelloService/172.20.170.166:9998"

获取zookeeper客户端,创造永久结点

@Override
public void registerService(String rpcServiceName, InetSocketAddress inetSocketAddress) {
    String servicePath = CuratorUtils.ZK_REGISTER_ROOT_PATH + "/" + rpcServiceName + inetSocketAddress.toString();
    CuratorFramework zkClient = CuratorUtils.getZkClient();
    CuratorUtils.createPersistentNode(zkClient, servicePath);
}

5、获得zookeeper客户端,如果能够连接zk客户端,则返回,如果未连接成功,尝试一次新连接,等待30s失败后抛出异常

public static CuratorFramework getZkClient() {
    // check if user has set zk address
    Properties properties = PropertiesFileUtil.readPropertiesFile(RpcConfigEnum.RPC_CONFIG_PATH.getPropertyValue());
    String zookeeperAddress = properties != null && properties.getProperty(RpcConfigEnum.ZK_ADDRESS.getPropertyValue()) != null ? properties.getProperty(RpcConfigEnum.ZK_ADDRESS.getPropertyValue()) : DEFAULT_ZOOKEEPER_ADDRESS;
    // if zkClient has been started, return directly
    if (zkClient != null && zkClient.getState() == CuratorFrameworkState.STARTED) {
        return zkClient;
    }
    // Retry strategy. Retry 3 times, and will increase the sleep time between retries.
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME, MAX_RETRIES);
    zkClient = CuratorFrameworkFactory.builder()
            // the server to connect to (can be a server list)
            .connectString(zookeeperAddress)
            .retryPolicy(retryPolicy)
            .build();
    zkClient.start();
    try {
        // wait 30s until connect to the zookeeper
        if (!zkClient.blockUntilConnected(30, TimeUnit.SECONDS)) {
            throw new RuntimeException("Time out waiting to connect to ZK!");
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return zkClient;
}

6、createPersistentNode方法,主要做两件事,一是在zookeeper客户端中创建PERSISTENT类型的节点,二是在REGISTERED_PATH_SET中加入服务路径。

public static void createPersistentNode(CuratorFramework zkClient, String path) {
    try {
        if (REGISTERED_PATH_SET.contains(path) || zkClient.checkExists().forPath(path) != null) {
            log.info("The node already exists. The node is:[{}]", path);
        } else {
            //eg: /my-rpc/github.javaguide.HelloService/127.0.0.1:9999
            zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path);
            log.info("The node was created successfully. The node is:[{}]", path);
        }
        REGISTERED_PATH_SET.add(path);
    } catch (Exception e) {
        log.error("create persistent node for path [{}] fail", path);
    }
}



1.1.3 启动

调用start方法

socketRpcServer.start();

1、创建ServerSocket,监听设定好的端口,如果没有消息就处于阻塞,收到消息则执行处理请求,这里不断循环

public void start() {
    try (ServerSocket server = new ServerSocket()) {
        String host = InetAddress.getLocalHost().getHostAddress();
        server.bind(new InetSocketAddress(host, PORT));
        CustomShutdownHook.getCustomShutdownHook().clearAll();
        Socket socket;
        while ((socket = server.accept()) != null) {
            log.info("client connected [{}]", socket.getInetAddress());
            threadPool.execute(new SocketRpcRequestHandlerRunnable(socket));
        }
        threadPool.shutdown();
    } catch (IOException e) {
        log.error("occur IOException:", e);
    }
}

2、SocketRpcRequestHandlerRunnable实现了Runnable接口,因此excute时会执行实现的run方法。获得输入流和输出流,读取输入流,并强转成RpcRequest,使用Handler处理请求,将处理完的响应写回输出流并刷新。

@Override
public void run() {
    log.info("server handle message from client by thread: [{}]", Thread.currentThread().getName());
    try (ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
         ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream())) {
        RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
        Object result = rpcRequestHandler.handle(rpcRequest);
        objectOutputStream.writeObject(RpcResponse.success(result, rpcRequest.getRequestId()));
        objectOutputStream.flush();
    } catch (IOException | ClassNotFoundException e) {
        log.error("occur exception:", e);
    }
}

3、handle方法获得服务名字(接口名,group名和version名),利用服务名字获得服务,如果不存在该服务,则抛出异常,如果存在该服务则返回服务。接着调用服务的指定方法,使用了反射,获取服务的方法,调用方法,并将方法的返回值返回。

public Object handle(RpcRequest rpcRequest) {
    Object service = serviceProvider.getService(rpcRequest.getRpcServiceName());
    return invokeTargetMethod(rpcRequest, service);
}
@Override
public Object getService(String rpcServiceName) {
    Object service = serviceMap.get(rpcServiceName);
    if (null == service) {
        throw new RpcException(RpcErrorMessageEnum.SERVICE_CAN_NOT_BE_FOUND);
    }
    return service;
}
private Object invokeTargetMethod(RpcRequest rpcRequest, Object service) {
    Object result;
    try {
        Method method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamTypes());
        result = method.invoke(service, rpcRequest.getParameters());
        log.info("service:[{}] successful invoke method:[{}]", rpcRequest.getInterfaceName(), rpcRequest.getMethodName());
    } catch (NoSuchMethodException | IllegalArgumentException | InvocationTargetException | IllegalAccessException e) {
        throw new RpcException(e.getMessage(), e);
    }
    return result;
}



1.1.3.1 钩子函数

在上面的start方法中,产生了钩子函数

CustomShutdownHook.getCustomShutdownHook().clearAll();

1、在虚拟机关闭时调用钩子函数,首先获得本机的IP地址和端口号,并在zookeeper客户端中删去这个地址。

public void clearAll() {
    log.info("addShutdownHook for clearAll");
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        try {
            InetSocketAddress inetSocketAddress = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), NettyRpcServer.PORT);
            CuratorUtils.clearRegistry(CuratorUtils.getZkClient(), inetSocketAddress);
        } catch (UnknownHostException ignored) {
        }
        ThreadPoolFactoryUtil.shutDownAllThreadPool();
    }));
}

2、clearRegistry清理服务器在zookeeper客户端中的服务地址。如果Set中以IP地址+端口号结尾,则在zookeeper客户端中删去这个服务名的地址。

public static void clearRegistry(CuratorFramework zkClient, InetSocketAddress inetSocketAddress) {
    REGISTERED_PATH_SET.stream().parallel().forEach(p -> {
        try {
            if (p.endsWith(inetSocketAddress.toString())) {
                zkClient.delete().forPath(p);
            }
        } catch (Exception e) {
            log.error("clear registry for path [{}] fail", p);
        }
    });
    log.info("All registered services on the server are cleared:[{}]", REGISTERED_PATH_SET.toString());
}

3、在clearAll方法中还会调用shutDownAllThreadPool方法,关闭该服务器的所有线程。map中存储的键为线程名字,值为线程,一次获取值(即线程),调用shutdown方法,线程停止接收新任务,在执行完正在执行的任务后被关闭。再调用awaitTermination方法,等待10s后不管是否执行完任务,都会关闭线程,如果提前执行完任务会提前关闭线程。

public static void shutDownAllThreadPool() {
    log.info("call shutDownAllThreadPool method");
    THREAD_POOLS.entrySet().parallelStream().forEach(entry -> {
        ExecutorService executorService = entry.getValue();
        executorService.shutdown();
        log.info("shut down thread pool [{}] [{}]", entry.getKey(), executorService.isTerminated());
        try {
            executorService.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.error("Thread pool never terminated");
            executorService.shutdownNow();
        }
    });
}



1.2 客户端

初始化客户端和配置类,初始化客户代理,传入客户端和配置类,调用获得代理对象的方法

public static void main(String[] args) {
    RpcRequestTransport rpcRequestTransport = new SocketRpcClient();
    RpcServiceConfig rpcServiceConfig = new RpcServiceConfig();
    RpcClientProxy rpcClientProxy = new RpcClientProxy(rpcRequestTransport, rpcServiceConfig);
    HelloService helloService = rpcClientProxy.getProxy(HelloService.class);
    String hello = helloService.hello(new Hello("111", "222"));
    System.out.println(hello);
}



1.2.1 获得客户代理

1、获得代理对象,使用了Java动态代理,newProxyInstance传入三个参数分别是:类加载器,服务的接口类,实现InvocationHandler的类,这样生成代理对象后,调用方法就会调用实现类的invoke方法。

@SuppressWarnings("unchecked")
public <T> T getProxy(Class<T> clazz) {
    return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this);
}



1.2.2 调用服务

直接调用本地方法一样,调用hello方法

String hello = helloService.hello(new Hello("111", "222"));

1、调用invoke方法。首先生成RPC请求,请求包括方法名,参数,接口名,参数类型,请求ID,分组,版本。对于Socket还是Netty不同处理。对于Socket,直接调用客户端的方法sendRpcRequest发送请求,获得响应。

@SneakyThrows
@SuppressWarnings("unchecked")
@Override
public Object invoke(Object proxy, Method method, Object[] args) {
    log.info("invoked method: [{}]", method.getName());
    RpcRequest rpcRequest = RpcRequest.builder().methodName(method.getName())
            .parameters(args)
            .interfaceName(method.getDeclaringClass().getName())
            .paramTypes(method.getParameterTypes())
            .requestId(UUID.randomUUID().toString())
            .group(rpcServiceConfig.getGroup())
            .version(rpcServiceConfig.getVersion())
            .build();
    RpcResponse<Object> rpcResponse = null;
    if (rpcRequestTransport instanceof NettyRpcClient) {
        // System.out.println(rpcRequest.getGroup() + " " + rpcRequest.getVersion());
        CompletableFuture<RpcResponse<Object>> completableFuture = (CompletableFuture<RpcResponse<Object>>) rpcRequestTransport.sendRpcRequest(rpcRequest);
        rpcResponse = completableFuture.get();
    }
    if (rpcRequestTransport instanceof SocketRpcClient) {
        rpcResponse = (RpcResponse<Object>) rpcRequestTransport.sendRpcRequest(rpcRequest);
    }
    this.check(rpcResponse, rpcRequest);
    return rpcResponse.getData();
}

2、发送RPC请求。首先获得服务器地址,然后连接地址,获得输出流,写入RPC请求,获得输入流,读取RPC响应。

@Override
public Object sendRpcRequest(RpcRequest rpcRequest) {
    InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcRequest);
    try (Socket socket = new Socket()) {
        socket.connect(inetSocketAddress);
        ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
        // Send data to the server through the output stream
        objectOutputStream.writeObject(rpcRequest);
        ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
        // Read RpcResponse from the input stream
        return objectInputStream.readObject();
    } catch (IOException | ClassNotFoundException e) {
        throw new RpcException("调用服务失败:", e);
    }
}

3、lookupService方法。首先获得RPC请求中的服务名,Curator(zookeeper client),获得zookeeper客户端,zookeeper孩子结点。如果节点为空,则抛出没有服务异常。如果有服务,利用负载均衡类选择合适的服务器地址,并根据这个地址获得IP地址和端口号,放到InetSocketAddress对象后返回。

@Override
public InetSocketAddress lookupService(RpcRequest rpcRequest) {
    String rpcServiceName = rpcRequest.getRpcServiceName();
    CuratorFramework zkClient = CuratorUtils.getZkClient();
    List<String> serviceUrlList = CuratorUtils.getChildrenNodes(zkClient, rpcServiceName);
    if (CollectionUtil.isEmpty(serviceUrlList)) {
        throw new RpcException(RpcErrorMessageEnum.SERVICE_CAN_NOT_BE_FOUND, rpcServiceName);
    }
    // load balancing
    String targetServiceUrl = loadBalance.selectServiceAddress(serviceUrlList, rpcRequest);
    log.info("Successfully found the service address:[{}]", targetServiceUrl);
    String[] socketAddressArray = targetServiceUrl.split(":");
    String host = socketAddressArray[0];
    int port = Integer.parseInt(socketAddressArray[1]);
    return new InetSocketAddress(host, port);
}

4、selectServiceAddress方法,对边界条件判断,然后调用deSelect方法。

@Override
public String selectServiceAddress(List<String> serviceAddresses, RpcRequest rpcRequest) {
    if (CollectionUtil.isEmpty(serviceAddresses)) {
        return null;
    }
    if (serviceAddresses.size() == 1) {
        return serviceAddresses.get(0);
    }
    return doSelect(serviceAddresses, rpcRequest);
}

5、doelect方法,选择服务地址,参考dubbo使用的是一致性哈希算法,首先identityHashCode获取哈希值(这个函数获得的哈希值和hashCode一样,区别是hashCode可能被改写,但是identityHashCode不会变) ,尝试获取对应服务的selector,如果没有则新建一个selector,并调用selector的select方法。

@Override
protected String doSelect(List<String> serviceAddresses, RpcRequest rpcRequest) {
    int identityHashCode = System.identityHashCode(serviceAddresses);
    // build rpc service name by rpcRequest
    String rpcServiceName = rpcRequest.getRpcServiceName();
    ConsistentHashSelector selector = selectors.get(rpcServiceName);
    // check for updates
    if (selector == null || selector.identityHashCode != identityHashCode) {
        selectors.put(rpcServiceName, new ConsistentHashSelector(serviceAddresses, 160, identityHashCode));
        selector = selectors.get(rpcServiceName);
    }
    return selector.select(rpcServiceName + Arrays.stream(rpcRequest.getParameters()));
}

6、选择服务器地址。首先根据传进来的字符串获得md5码,再计算md5码的哈希值,tailMap返回键大于输入键的Entry,并选取第一个Entry。如果都小于输入的键,则返回第一个Entry。

public String select(String rpcServiceKey) {
    byte[] digest = md5(rpcServiceKey);
    return selectForKey(hash(digest, 0));
}
static long hash(byte[] digest, int idx) {
    return ((long) (digest[3 + idx * 4] & 255) << 24 | (long) (digest[2 + idx * 4] & 255) << 16 | (long) (digest[1 + idx * 4] & 255) << 8 | (long) (digest[idx * 4] & 255)) & 4294967295L;
}
public String selectForKey(long hashCode) {
    Map.Entry<Long, String> entry = virtualInvokers.tailMap(hashCode, true).firstEntry();

    if (entry == null) {
        entry = virtualInvokers.firstEntry();
    }

    return entry.getValue();
}

7、检查响应是否正确,如果响应为空、响应中的请求ID不等于发送请求的ID、响应中的状态码为空、状态码不等于成功状态。这些情况均抛出异常,异常中包含不同信息以及接口名。

private void check(RpcResponse<Object> rpcResponse, RpcRequest rpcRequest) {
    if (rpcResponse == null) {
        throw new RpcException(RpcErrorMessageEnum.SERVICE_INVOCATION_FAILURE, INTERFACE_NAME + ":" + rpcRequest.getInterfaceName());
    }

    if (!rpcRequest.getRequestId().equals(rpcResponse.getRequestId())) {
        throw new RpcException(RpcErrorMessageEnum.REQUEST_NOT_MATCH_RESPONSE, INTERFACE_NAME + ":" + rpcRequest.getInterfaceName());
    }

    if (rpcResponse.getCode() == null || !rpcResponse.getCode().equals(RpcResponseCodeEnum.SUCCESS.getCode())) {
        throw new RpcException(RpcErrorMessageEnum.SERVICE_INVOCATION_FAILURE, INTERFACE_NAME + ":" + rpcRequest.getInterfaceName());
    }
}

8、返回响应中的数据

return rpcResponse.getData();



2、 Zookeeper

zookeeper客户端删除多个结点

zkClient.delete().deletingChildrenIfNeeded().forPath("/my-rpc/" + rpcServiceName);



2.1 客户端访问zookeeper

1、获得zookeeper客户端

public static CuratorFramework getZkClient() {
    // check if user has set zk address
    Properties properties = PropertiesFileUtil.readPropertiesFile(RpcConfigEnum.RPC_CONFIG_PATH.getPropertyValue());
    String zookeeperAddress = properties != null && properties.getProperty(RpcConfigEnum.ZK_ADDRESS.getPropertyValue()) != null ? properties.getProperty(RpcConfigEnum.ZK_ADDRESS.getPropertyValue()) : DEFAULT_ZOOKEEPER_ADDRESS;
    // if zkClient has been started, return directly
    if (zkClient != null && zkClient.getState() == CuratorFrameworkState.STARTED) {
        return zkClient;
    }
    // Retry strategy. Retry 3 times, and will increase the sleep time between retries.
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME, MAX_RETRIES);
    zkClient = CuratorFrameworkFactory.builder()
            // the server to connect to (can be a server list)
            .connectString(zookeeperAddress)
            .retryPolicy(retryPolicy)
            .build();
    zkClient.start();
    try {
        // wait 30s until connect to the zookeeper
        if (!zkClient.blockUntilConnected(30, TimeUnit.SECONDS)) {
            throw new RuntimeException("Time out waiting to connect to ZK!");
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return zkClient;
}

2、通过zookeeper客户端获得孩子结点,首先看map里有没有服务名,有则返回,没有则在zookeeper中获取服务路径,创建服务名和路径集合的键值对,并放到map中。重点是

zkClient.getChildren().forPath(servicePath)

public static List<String> getChildrenNodes(CuratorFramework zkClient, String rpcServiceName) {
    if (SERVICE_ADDRESS_MAP.containsKey(rpcServiceName)) {
        return SERVICE_ADDRESS_MAP.get(rpcServiceName);
    }
    List<String> result = null;
    String servicePath = ZK_REGISTER_ROOT_PATH + "/" + rpcServiceName;
    try {
        result = zkClient.getChildren().forPath(servicePath);
        SERVICE_ADDRESS_MAP.put(rpcServiceName, result);
        registerWatcher(rpcServiceName, zkClient);
    } catch (Exception e) {
        log.error("get children nodes for path [{}] fail", servicePath);
    }
    return result;
}

3、注册watcher,观察服务的变化

private static void registerWatcher(String rpcServiceName, CuratorFramework zkClient) throws Exception {
    String servicePath = ZK_REGISTER_ROOT_PATH + "/" + rpcServiceName;
    PathChildrenCache pathChildrenCache = new PathChildrenCache(zkClient, servicePath, true);
    PathChildrenCacheListener pathChildrenCacheListener = (curatorFramework, pathChildrenCacheEvent) -> {
        List<String> serviceAddresses = curatorFramework.getChildren().forPath(servicePath);
        SERVICE_ADDRESS_MAP.put(rpcServiceName, serviceAddresses);
    };
    pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);
    pathChildrenCache.start();
}



版权声明:本文为qq_41523340原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。