基于netty、zookeeper手写RPC框架之五——心跳机制优化及添加负载均衡

  • Post author:
  • Post category:其他


本系列的第二篇,加入netty心跳机制,只是简单地描述了如何监听不活跃的服务端,即自定义一个标记服务是否活跃的规则及空闲链接的监听器,在标记为不活跃后,客户端应该怎么处理,服务端再次活跃了,客户端又如何处理,先看下这个图

monitor负责监控server们的是否活跃,如果不活跃,修改zk上对应节点的值,client则监听zk上的事件,当发生变动的节点的值为不活跃,则加进不活跃的优先队列,这里设置为优先队列是用来定时去除不活跃的节的,在执行任务时,如果不活跃时间最长的节点都没有达到不活跃最长的时长,则不进行删除,如果重新活跃了,就把他加进活跃节点的list里面,现在来看看如何实现。

首先看server和monitor之间的交互,新建一个类用来表示serverchannel的状态

/**
 * 用于表示serverchannel的状态类
 */
@Data
public class ChannelStatus {

    //重新活跃的时间
    private volatile long reActive;
    //是否活跃
    private volatile boolean active = true;
    //持续重新活跃的次数
    private AtomicInteger reActiveCount = new AtomicInteger(0);
    //持续不活跃的次数
    private AtomicInteger inActiveCount = new AtomicInteger(0);
    //对应的channelId
    private String channelId;

    //不活跃的时间
    private volatile long InActive;

    public ChannelStatus() {

    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        ChannelStatus that = (ChannelStatus) o;
        return reActive == that.reActive &&
                active == that.active &&
                InActive == that.InActive &&
                Objects.equals(reActiveCount, that.reActiveCount) &&
                Objects.equals(inActiveCount, that.inActiveCount);
    }

    @Override
    public int hashCode() {
        return Objects.hash(reActive, active, reActiveCount, inActiveCount, InActive);
    }

}

监控server的类,当不活跃时候,通知zk

/**
 * @author lulu
 * @Date 2019/11/18 22:29
 */
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {

    private final static int MAX_IN_ACTIVE_COUNT = 3;
    private final static int COUNT_MINUTE = 2;
    private final static int MIN_RE_ACTIVE_COUNT = 3;

    //维护channelId和具体地址的map,当发生变化时对其进行删除
    private ConcurrentHashMap<String, ChannelStatus> channelUrlMap;


    public HeartbeatHandler(ConcurrentHashMap<String, ChannelStatus> map) {
        channelUrlMap = map;
    }


    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String url = msg.toString();
        String id = ctx.channel().id().asShortText();
        System.out.println("收到channelId:" + id + "发来信息:" + url);
        ChannelStatus status;
        if ((status = channelUrlMap.get(url)) == null) {
            status = new ChannelStatus();
            status.setChannelId(id);
            channelUrlMap.put(url, status);
        } else {
            //如果收到不活跃的节点重连发来的信息,
            if (!status.isActive()) {
                //记录重连的心跳次数
                System.out.println(url + "尝试重连");
                int i = status.getReActiveCount().incrementAndGet();
                //第一次重连的话,记录重连时间
                if (i == 1) {
                    String s = ctx.channel().id().asShortText();
                    status.setChannelId(s);
                    status.setReActive(System.currentTimeMillis());
                    //如果大于最小重连心跳次数
                } else if (i >= MIN_RE_ACTIVE_COUNT) {
                    //计算重连阶段的时间
                    long minute = (System.currentTimeMillis() - status.getReActive()) / (1000 * 60) + 1;
                    //如果大于要求的时间,则是认为活跃
                    if (minute >= COUNT_MINUTE) {
                        status.setActive(true);
                        status.setInActiveCount(new AtomicInteger(0));
                        // 通知连接池重新加入该节点
                        updateOrRemove(url, ctx, true, ZKConsts.REACTIVE);
                    }
                }
            }
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

        if (evt instanceof IdleStateEvent) {

            IdleStateEvent state = (IdleStateEvent) evt;
            //在一定时间内读写空闲才会关闭链接
            if (state.state().equals(IdleState.ALL_IDLE)) {
                String s = ctx.channel().id().asShortText();
                Integer inActiveCount = 0;
                ChannelStatus channelStatus = null;
                String url = null;
                Object[] objects = getStatusValuesByChannelId(s);
                Assert.isTrue(objects != null && objects.length > 0, "该channelId没有东西");
                inActiveCount = (Integer) objects[0];
                channelStatus = (ChannelStatus) objects[1];
                url = (String) objects[2];
                if (inActiveCount == 1) {
                    channelStatus.setInActive(System.currentTimeMillis());
                }
                //1分钟内出现2次以上不活跃现象,有的话就把它去掉
                long minute = (System.currentTimeMillis() - channelStatus.getInActive()) / (1000 * 60) + 1;
                System.out.printf("第%s次不活跃,当前分钟%d%n", channelStatus.getInActiveCount().get(), minute);
                if (inActiveCount >= MAX_IN_ACTIVE_COUNT && minute <= COUNT_MINUTE) {
                    System.out.println("移除不活跃的ip" + channelStatus.toString());
                    //设置不活跃
                    channelStatus.setActive(false);
                    updateOrRemove(url, ctx, true, ZKConsts.INACTIVE);
                } else {
                    //重新计算,是活跃的状态
                    if (minute > COUNT_MINUTE) {
//                        System.out.println("新周期开始");
                        channelStatus.setActive(true);
                        channelStatus.setInActive(0);
                        channelStatus.setInActiveCount(new AtomicInteger(0));
                    }
                }

            }

        }
    }

    /**
     * 通过channelId获取server的状态信息
     *
     * @param channelId
     * @return
     */
    public Object[] getStatusValuesByChannelId(String channelId) {
        Iterator<Map.Entry<String, ChannelStatus>> iterator = channelUrlMap.entrySet().iterator();
        Integer inActiveCount = 0;
        ChannelStatus channelStatus = null;
        String url = null;
        System.out.println();
        while (iterator.hasNext()) {
            Map.Entry<String, ChannelStatus> next = iterator.next();
            ChannelStatus status = next.getValue();
            if (status.getChannelId().equals(channelId)) {
                channelStatus = status;
                url = next.getKey();
                inActiveCount = channelStatus.getInActiveCount().incrementAndGet();
                return new Object[]{inActiveCount, channelStatus, url};
            }
        }
        return null;
    }

    /**
     * 通过ID获取地址,并删除zk上相关的,用于心跳监听的类
     *
     * @param ctx
     */
    private void updateOrRemove(String url, ChannelHandlerContext ctx, Boolean update, String data) {
        //移除不活跃的节点
        RegisterForClient.getInstance().removeOrUpdate(url, update, data);
        //如果不为重新唤醒,则断开连接并且做相应的通知
        if (!data.equals(ZKConsts.REACTIVE)) {
            channelUrlMap.get(url).setChannelId(null);
            ctx.channel().close();
        }

    }


    //当出现异常时关闭链接
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        Object[] values = getStatusValuesByChannelId(ctx.channel().id().asShortText());
        updateOrRemove((String) values[2], ctx, false, null);
    }


}

心跳发送类

/**
 * @author lulu
 * @Date 2019/11/18 23:30
 * 服务端的发送心跳包类
 */
@Getter
public class BeatDataSender {
    //状态
    private String activeStatus;


    //负责定期发送心跳包的线程池
    private ScheduledExecutorService service;
    //失败后重连的线程池
    private ScheduledExecutorService retryConnect;
    private boolean reconnect = false;

    public BeatDataSender(String localAddress, String remoteIp, Integer remotePort, String serviceName) {
        service = Executors.newSingleThreadScheduledExecutor();
        retryConnect = Executors.newSingleThreadScheduledExecutor();
        this.send(localAddress, remoteIp, remotePort, serviceName);
        //如果重连了尝试重新发送心跳包
        retryConnect.scheduleAtFixedRate(() -> {
            if (activeStatus == ZKConsts.INACTIVE) {
                System.out.println("server尝试重连监控器");
                send(localAddress, remoteIp, remotePort, serviceName);
                activeStatus = ZKConsts.REACTIVE;
                reconnect = true;
            }
        }, 3, 3, TimeUnit.MINUTES);
    }

    public void close() {
        this.service.shutdown();
        this.retryConnect.shutdown();
    }


    public void send(String localAddress, String remoteIp, Integer remotePort, String serviceName) {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            ChannelFuture connect = bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new StringEncoder())
                                    .addLast(new StringEncoder())
                                    .addLast(new ChannelInboundHandlerAdapter() {
                                        @Override
                                        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                                            activeStatus = ZKConsts.INACTIVE;
                                            System.out.println("由于不活跃次数在2分钟内超过3次,链接被关闭");
                                            ctx.channel().close();
                                        }
                                    });

                        }
                    })
                    .connect(remoteIp, remotePort).addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (future.isSuccess()) {
                                System.out.println("心跳客户端绑定" + "hostname:" + remoteIp + "remotePort:" + remotePort);
                                future.channel().writeAndFlush(serviceName + "@" + localAddress);
                                //这里只是演示心跳机制不活跃的情况下重连,普通的做法只需要定时发送本机地址即可
                                //进入重连状态后,就稳定发送心跳包
                                service.scheduleAtFixedRate(() -> {
                                    if (future.channel().isActive()) {
                                        if (reconnect) {
                                            future.channel().writeAndFlush(serviceName + "@" + localAddress);
                                        }
                                    }
                                }, 30, 30, TimeUnit.SECONDS);
                            } else {
                                System.out.println("3s后重连");
                                TimeUnit.SECONDS.sleep(3);
                                //重新发送
                                send(localAddress, remoteIp, remotePort, serviceName);
                            }

                        }
                    });


        } catch (Exception e) {
            e.printStackTrace();
        }
    }


}

这里的心跳发送类主要作用是发送心跳,当处于不活跃状态时候,就尝试重新发送

server和monitor的交互大概就到这里

接下来就是zk和client的交互了,client要为每个服务下的节点设置监听并作出相应的处理,这里采用观察者模式对zk和client的连接池进行解耦,这里做了一个代码的拆分,把client的功能(RegisterForClient)和server(RegisterForServer)的功能分开了

首先看事件发布者的接口

/**
 * @author: lele
 * @date: 2019/11/22 下午3:29
 */
public interface NodeChangePublisher {

     /**
      * 事件标识
      */
     int inactive=0;
     int remove=1;
     int add=2;
     int reactive=3;


     void addListener(NodeChangeListener listener) ;

     void removeListener(NodeChangeListener listener) ;

     void notifyListener(int state, String path) ;

}

注册中心给client方的实现类

自定义线程工厂类,建造者模式


/**
 * @author lulu
 * @Date 2019/11/22 23:43
 */
public final class RpcThreadFactoryBuilder {


    private String namePrefix="default";

    private int priority=5;

    private boolean daemon=false;

    private String groupName="rpc";

    public RpcThreadFactoryBuilder setNamePrefix(String namePrefix){
        this.namePrefix=namePrefix;
        return this;
    }

    public RpcThreadFactoryBuilder setPriority(int priority){
        if(priority>10||priority<0){
            throw new UnsupportedOperationException("线程优先级设置不正确");
        }
        this.priority=priority;
        return this;
    }

    public RpcThreadFactoryBuilder setDaemon(boolean daemon){
        this.daemon=daemon;
        return this;
    }

    public RpcThreadFactoryBuilder setGroupName(String groupName){
        this.groupName=groupName;
        return this;
    }

    public ThreadFactory build(){
        return new BaseThreadFactory(this);
    }

    /**
     * 启动客户端链接的自定义线程工厂
     */
    static class BaseThreadFactory implements ThreadFactory {

        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        BaseThreadFactory(RpcThreadFactoryBuilder builder) {
            group = new ThreadGroup(builder.groupName);
            group.setDaemon(builder.daemon);
            group.setMaxPriority(builder.priority);
            namePrefix = builder.namePrefix+"-"+
                    poolNumber.getAndIncrement() +
                    "-thread-";
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                    namePrefix + threadNumber.getAndIncrement(),
                    0);
            return t;
        }
    }

}

,这里主要是结合线程池来提交通知任务

/**
 * @author lulu
 * @Date 2019/11/23 22:49
 *
 */
public class RegisterForClient implements NodeChangePublisher {

    private CuratorFramework client = null;

    private List<PathChildrenCache> nodeListenList = new ArrayList<>();

    private List<NodeChangeListener> nodeChangeListeners = new ArrayList<>();

    private ThreadPoolExecutor notifyPool = new ThreadPoolExecutor(
            16, 16, 5, TimeUnit.MINUTES, new ArrayBlockingQueue<>(1024)
            , new RpcThreadFactoryBuilder().setNamePrefix("notifyPool").build()
    );




    private static class Holder {
        private static final RegisterForClient j = new RegisterForClient();
    }

    public static RegisterForClient getInstance() {
        return Holder.j;
    }

//添加监听者
    private RegisterForClient() {
        client = ZkUtils.getClient();
        this.addListener(new NodeChangeListener.AddServer());
        this.addListener(new NodeChangeListener.RemoveServer());
        this.addListener(new NodeChangeListener.InactiveServer());
        this.addListener(new NodeChangeListener.ReActiveServer());
    }



    /**
     * 获取所有的url
     *
     * @return
     */
    public Map<String, List<URL>> getAllURL() {
        Map<String, List<URL>> mapList = null;
        try {
            List<String> servcieList = client.getChildren().forPath("/");

            mapList = new HashMap<>(servcieList.size());
            for (String s : servcieList) {
                //返回对应的service及其可用的url
                mapList.put(s, getService(s));
                //为每个服务添加监听
                addListenerForService(s);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return mapList;
    }

    private void addListenerForService(String serviceName) throws Exception {
        //设置监听,监听所有服务下的节点变化,连接管理收到通知后移除相应的节点
        final PathChildrenCache childrenCache = new PathChildrenCache(client, ZkUtils.getPath(serviceName), true);
        nodeListenList.add(childrenCache);
        //同步初始监听节点
        childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
        childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {
                    //建立完监听
                    return;
                }
                if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
                    String path = event.getData().getPath();
                    notifyPool.submit(() -> {
                        System.out.println("删除远程服务端节点:" + path);
                        notifyListener(NodeChangePublisher.remove, path);
                    });
                }
                if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
                    String path = event.getData().getPath();
                    notifyPool.submit(() -> {
                        byte[] status = event.getData().getData();
                        String serverStatus= new String(status);
                        if (serverStatus.equals(ACTIVE)) {
                            notifyPool.submit(() -> {
                                System.out.println("远程服务端上线事件:" + NodeChangePublisher.add + path);
                                notifyListener(NodeChangePublisher.add, path);
                            });
                        } else if (serverStatus.equals(ZKConsts.INACTIVE)) {
                            //失效事件
                            notifyPool.submit(() -> {
                                System.out.println("远程服务端下线事件:" + NodeChangePublisher.inactive + path);
                                notifyListener(NodeChangePublisher.inactive, path);
                            });
                        } else if (serverStatus.equals(ZKConsts.REACTIVE)) {
                            notifyPool.submit(() -> {
                                System.out.println("远程服务端重新上线事件:" + path);
                                notifyListener(NodeChangePublisher.reactive, path);
                            });
                        }
                    });

                }
            }
        });
    }


    public List<URL> getService(String serviceName) {
        List<URL> urls = null;
        try {
            List<String> urlList = client.getChildren().forPath(ZkUtils.getPath(serviceName));
            if (urlList != null) {
                urls = new ArrayList<>(urlList.size());
            }
            for (String s : urlList) {
                String[] url = s.split(":");
                urls.add(new URL(url[0], Integer.valueOf(url[1])));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return urls;
    }

    //hostname:port,遍历所有interface节点,把对应的url节点去掉,或者标记为不活跃的状态
    public void removeOrUpdate(String sl, Boolean update, String data) {
        String[] serviceUrl = sl.split("@");
        try {
            String url = serviceUrl[1];
            String anInterface = serviceUrl[0];
            List<String> urlList = client.getChildren().forPath(ZkUtils.getPath(anInterface));
            for (String s : urlList) {
                if (s.equals(url)) {
                    if (update) {
                        client.setData().forPath(ZkUtils.getPath(anInterface, url), data.getBytes());
                    } else {
                        client.delete().forPath(ZkUtils.getPath(anInterface, url));
                    }
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    //同步模式下使用,可以当作废弃
    public URL random(String serviceName) {

        //通过服务名获取具体的url
        try {
            List<String> urlList = client.getChildren().forPath(ZkUtils.getPath(serviceName));
            String[] url = urlList.get(0).split(":");
            return new URL(url[0], Integer.valueOf(url[1]));
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;

    }

    public void close() {
        ZkUtils.closeZKClient(client);
        nodeListenList.forEach(e -> {
            try {
                e.close();
            } catch (IOException e1) {
                e1.printStackTrace();
            }
        });
    }

    @Override
    public void addListener(NodeChangeListener listener) {
        nodeChangeListeners.add(listener);
    }

    @Override
    public void removeListener(NodeChangeListener listener) {
        nodeChangeListeners.remove(listener);
    }

    @Override
    public void notifyListener(int state, String path) {
        int i = path.lastIndexOf("/");
        String serviceName = path.substring(1, i);
        String[] split = path.substring(i + 1).split(":");
        URL url = new URL(split[0], Integer.valueOf(split[1]));
        for (NodeChangeListener nodeChangeListener : nodeChangeListeners) {
            nodeChangeListener.change(state, url, serviceName);
        }
    }
}

然后到监听者类,对应服务上线,服务不活跃移除,服务再次活跃重新加入,下线这四个事件

/**
 * @author: lele
 * @date: 2019/11/22 下午3:26
 */
public interface NodeChangeListener {

    ConnectManager connect=ConnectManager.getInstance();

    //相应的处理
    void change(int state,URL url,String serviceName);




    class AddServer implements NodeChangeListener{

        @Override
        public void change(int state, URL url,String serviceName) {
            if(state==NodeChangePublisher.add){
                System.out.println(Thread.currentThread().getName()+"addNode的listern事件被触发");
                connect.addServerAfter(url,serviceName);
            }
        }
    }

    class ReActiveServer implements NodeChangeListener{

        @Override
        public void change(int state, URL url, String serviceName) {
            if(state==NodeChangePublisher.reactive){
                System.out.println("reActive的listern事件被触发");
                connect.reAddActiveURL(url,serviceName);
            }
        }
    }

    class InactiveServer implements NodeChangeListener{

        @Override
        public void change(int state, URL url, String serviceName) {
            if(state==NodeChangePublisher.inactive){
                System.out.println("InActive的listern事件被触发");

                connect.addInactiveURL(url,serviceName);
            }
        }
    }

    class RemoveServer implements NodeChangeListener{

        @Override
        public void change(int state, URL url, String serviceName) {
            if(state==NodeChangePublisher.remove){
                System.out.println("RemovServer的listern事件被触发");
                connect.removeURL(url,serviceName,true);
            }
        }
    }



}

client的连接池类

/**
 * @author: lele
 * @date: 2019/11/21 上午11:58
 * 管理连接池
 * 服务端在注册后,不一定可以获得,因为还没提供服务,需要zk设置节点 状态为Active
 * //todo 定时更新链接
 */

public class ConnectManager  {


    private Boolean isShutDown = false;

    private ScheduledExecutorService removeInactiveTask;
    private final Random random = new Random();

    /**
     * 客户端链接服务端超时时间
     */
    private long connectTimeoutMillis = 6000;

    /**
     * 不活跃的链接存活时间,单位ms,这里表示5分钟不活跃就去掉
     */
    private long maxInActiveTime = 1000 * 60 * 5;

    /**
     * 自定义6个线程组用于客户端服务
     */
    private EventLoopGroup eventLoopGroup = new NioEventLoopGroup(6);
    /**
     * 标示非init状态下,addServerAfter才能起作用
     */
    private CountDownLatch serverInitCountDownLatch;

    /**
     * 存放服务对应的访问数,用于轮询
     */
    private Map<String, AtomicInteger> pollingMap = new ConcurrentHashMap<>();

    /**
     * 对于每个服务都有一个锁,每个锁都有一个条件队列,用于控制链接获取以及添加链接
     */
    private Map<String, Object[]> serviceCondition = new ConcurrentHashMap<>();


    /**
     * 新增/删除链接时的锁
     */
    private Map<String, ReentrantLock[]> addOrRemoveConnectionLock = new ConcurrentHashMap<>();

    /**
     * 新增/删除不活跃链接时的锁
     */
    private Map<String, ReentrantLock[]> addOrRemoveInactiveLock = new ConcurrentHashMap<>();


    /**
     * 存放服务端地址和handler的关系
     */
    private Map<String, List<NettyAsynHandler>> serverClientMap = new ConcurrentHashMap<>();


    /**
     * 存放不活跃的服务端地址和handler的关系,当活跃时添加回正式的handler
     */
    private Map<String, PriorityQueue<NettyAsynHandler>> inactiveClientMap = new ConcurrentHashMap<>();

    /**
     * 用来初始化客户端
     */
    private ThreadPoolExecutor clientBooter = new ThreadPoolExecutor(
            16, 16, 600, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1024)
            , new RpcThreadFactoryBuilder().setNamePrefix("clientBooter").build(), new ThreadPoolExecutor.AbortPolicy());


    private static class Holder {
        private static final ConnectManager j = new ConnectManager();
    }

    private ConnectManager() {
        //初始化时把所有的url加进去,这里可能没有可用链接,所以需要添加对节点的监听

        Map<String, List<URL>> allURL = RegisterForClient.getInstance().getAllURL();
        for (String s : allURL.keySet()) {
            //为每个服务添加锁和条件队列,通过条件队列控制客户端链接获取
            addLockToService(s);
        }
        addServerInit(allURL);
        //定时清理不用的链接
        removeInactiveTask = Executors.newSingleThreadScheduledExecutor();
        removeInactiveTask.scheduleAtFixedRate(() -> removeInactiveURL(), 10, 10, TimeUnit.MINUTES);
    }

    //为每个服务添加对应的锁
    private void addLockToService(String serviceName) {
        ReentrantLock lock = new ReentrantLock();
        Condition getConnection = lock.newCondition();
        //获取可用客户端的链接及条件队列
        serviceCondition.put(serviceName, new Object[]{lock, getConnection});
        //为创建客户端链接添加锁
        ReentrantLock addConnection = new ReentrantLock();
        ReentrantLock removeConnection = new ReentrantLock();
        addOrRemoveConnectionLock.put(serviceName, new ReentrantLock[]{addConnection, removeConnection});

        ReentrantLock addInactive = new ReentrantLock();
        ReentrantLock removeInactive = new ReentrantLock();
        addOrRemoveInactiveLock.put(serviceName, new ReentrantLock[]{addInactive, removeInactive});
    }

    public static ConnectManager getInstance() {

        return Holder.j;
    }


    /**
     * 添加该服务对应的链接和handler
     *
     * @param serviceName
     * @param handler     由于创建客户端链接的线程都会访问这段代码,这里也会存在并发情况,不然会导致多个server上线后,获取异常
     */
    public void addConnection(String serviceName, NettyAsynHandler handler) {
        ReentrantLock lock = addOrRemoveConnectionLock.get(serviceName)[0];
        lock.lock();
        List<NettyAsynHandler> nettyAsynHandlers;
        if (!serverClientMap.containsKey(serviceName)) {
            nettyAsynHandlers = new ArrayList<>();
        } else {
            nettyAsynHandlers = serverClientMap.get(serviceName);
        }
        nettyAsynHandlers.add(handler);
        //添加服务名和对应的url:客户端链接
        serverClientMap.put(serviceName, nettyAsynHandlers);
        //如果处于初始化状态,则countdown防止新增节点事件再次新增客户端
        if (serverInitCountDownLatch.getCount() != 0) {
            System.out.println("连接池初始化新建客户端链接:" + handler.getUrl());
            serverInitCountDownLatch.countDown();
        } else {
            System.out.println("连接池初始化后新建客户端链接:" + handler.getUrl());
        }

        //唤醒等待客户端链接的线程
        signalAvailableHandler(serviceName);
        lock.unlock();
    }

//通过对应的负载均衡策略挑选可用客户端连接
public NettyAsynHandler chooseHandler(String serviceName,Integer mode){
    List<NettyAsynHandler> handlers = mayWaitBeforeGetConnection(serviceName);
    NettyAsynHandler choose = FetchPolicy.getPolicyMap().get(mode).choose(serviceName, handlers);
    return choose;
}

//等待可用的客户端连接
  private  List<NettyAsynHandler> mayWaitBeforeGetConnection(String serviceName) {
        List<NettyAsynHandler> nettyAsynHandlers = serverClientMap.get(serviceName);
        int size = 0;
        //先尝试获取
        if (nettyAsynHandlers != null) {
            size = nettyAsynHandlers.size();
        }
        //不行就自选等待
        while (!isShutDown && size <= 0) {
            try {
                //自旋等待可用服务出现,因为客户端与服务链接需要一定的时间,如果直接返回会出现空指针异常
                boolean available = waitingForHandler(serviceName);
                if (available) {
                    nettyAsynHandlers = serverClientMap.get(serviceName);
                }
            } catch (InterruptedException e) {
                throw new RuntimeException("出错", e);
            }
        }
        return nettyAsynHandlers;
    }





    /**
     * 等待一定时间,等handler和相应的server建立建立链接,用条件队列控制
     *
     * @param serviceName
     * @return
     * @throws InterruptedException
     */
    private boolean waitingForHandler(String serviceName) throws InterruptedException {
        Object[] objects = serviceCondition.get(serviceName);
        ReentrantLock lock = (ReentrantLock) objects[0];
        lock.lock();
        Condition condition = (Condition) objects[1];
        try {
            return condition.await(this.connectTimeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
            lock.unlock();
        }
    }

    /**
     * 去掉所有与该url链接的客户端,并且关闭客户端链接
     *
     * @param url
     */
    public void removeURL(URL url, String serviceName, boolean close) {
        ReentrantLock lock = addOrRemoveConnectionLock.get(serviceName)[1];
        lock.lock();
        NettyAsynHandler target = null;
        //倒序遍历删除对应的handler
        List<NettyAsynHandler> nettyAsynHandlers = serverClientMap.get(serviceName);
        for (int i = nettyAsynHandlers.size() - 1; i >= 0; i--) {
            if ((target = nettyAsynHandlers.get(i)).getUrl().equals(url)) {
                nettyAsynHandlers.remove(i);
                if (close) {
                    target.close();
                }
            }
        }
        System.out.println("active:" + serverClientMap.get(serviceName).toString());
        lock.unlock();
    }

    /**
     * 定时清除不活跃的链接
     */
    public void removeInactiveURL() {
        /**
         * 移除不活跃列表
         */
        Collection<PriorityQueue<NettyAsynHandler>> values = inactiveClientMap.values();
        Iterator<PriorityQueue<NettyAsynHandler>> iterator = values.iterator();
        while (iterator.hasNext()) {
            PriorityQueue<NettyAsynHandler> list = iterator.next();
            //遍历所有客户端并根据超时时间删除
            NettyAsynHandler target;
            long current = System.currentTimeMillis();
            while ((current - (target = list.peek()).getInActiveTime()) > maxInActiveTime) {
                list.poll();
                target.close();
            }
        }


    }


    /**
     * 去掉可用的服务,把他加入到不活跃的列表
     * 由于是通过线程异步操作,可能存在并发问题
     *
     * @param url
     */
    public void addInactiveURL(URL url, String serviceName) {
        ReentrantLock lock = addOrRemoveInactiveLock.get(serviceName)[0];
        lock.lock();
        System.out.println("不活跃链接加入_" + url.toString());
        List<NettyAsynHandler> nettyAsynHandlers = serverClientMap.get(serviceName);
        NettyAsynHandler inActive = null;
        for (NettyAsynHandler nettyAsynHandler : nettyAsynHandlers) {
            if (nettyAsynHandler.getUrl().equals(url)) {
                nettyAsynHandler.setInActiveTime(System.currentTimeMillis());
                inActive = nettyAsynHandler;
                break;
            }
        }
        PriorityQueue<NettyAsynHandler> inActiveHandlers = null;
        if ((inActiveHandlers = inactiveClientMap.get(serviceName)) == null) {
            inActiveHandlers = new PriorityQueue<>();
        }
        inActiveHandlers.offer(inActive);
        inactiveClientMap.put(serviceName, inActiveHandlers);
        System.out.println("inactive:" + inactiveClientMap.get(serviceName).toString());

        lock.unlock();
        //删除url
        removeURL(url, serviceName, false);
    }

    /**
     * 重新添加进活跃队列
     *
     * @param url
     * @param serviceName
     */
    public void reAddActiveURL(URL url, String serviceName) {
        ReentrantLock lock = addOrRemoveInactiveLock.get(serviceName)[1];
        lock.lock();
        PriorityQueue<NettyAsynHandler> list;
        if ((list = inactiveClientMap.get(serviceName)) != null) {
            Iterator<NettyAsynHandler> iterator = list.iterator();
            NettyAsynHandler nettyAsynHandler;
            while (iterator.hasNext()) {
                nettyAsynHandler = iterator.next();
                if (nettyAsynHandler.getUrl().equals(url)) {
                    nettyAsynHandler.setInActiveTime(0);
                    addConnection(serviceName, nettyAsynHandler);
                    list.remove(nettyAsynHandler);

                    System.out.printf("%s服务下的%s重新添加进活跃队列%n", serviceName, nettyAsynHandler.toString());
                    break;
                }
            }
        }
        lock.unlock();
    }


    /**
     * 释放对应服务的条件队列,代表有客户端链接可用了
     *
     * @param serviceName
     */
    private void signalAvailableHandler(String serviceName) {
        Object[] objects = serviceCondition.get(serviceName);
        ReentrantLock lock = (ReentrantLock) objects[0];
        lock.lock();
        Condition condition = (Condition) objects[1];
        try {
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * 添加server,并启动对应的服务器
     *
     * @param allURL
     */
    public void addServerInit(Map<String, List<URL>> allURL) {
        Collection<List<URL>> values = allURL.values();

        Iterator<List<URL>> iterator = values.iterator();
        int res = 0;
        while (iterator.hasNext()) {
            List<URL> next = iterator.next();
            res += next.size();
        }
        serverInitCountDownLatch = new CountDownLatch(res);
        for (String s : allURL.keySet()) {
            pollingMap.put(s, new AtomicInteger(0));
            List<URL> urls = allURL.get(s);
            for (URL url : urls) {
                //提交创建任务
                clientBooter.submit(new Runnable() {
                    @Override
                    public void run() {
                        createClient(s, eventLoopGroup, url);
                    }
                });
            }
        }
    }

    /**
     * 当新节点出现后添加,但这里有个隐患,就是当client尚未监听完所有节点时
     * addServerAfter是不允许操作的
     *
     * @param url
     * @param serviceName
     */
    public void addServerAfter(URL url, String serviceName) {

        if (serverInitCountDownLatch.getCount() == 0) {
            //如果还没监听完,就不可以加链接
            List<NettyAsynHandler> list = null;
            if ((list = serverClientMap.get(serviceName)) == null) {
                list = new ArrayList<>();
                serverClientMap.put(serviceName, list);
                addLockToService(serviceName);
            } else {
                boolean exists = list.stream().filter(e -> e.getUrl().equals(url)).findFirst().isPresent();
                if (exists) {
                    return;
                }
            }
            clientBooter.submit(new Runnable() {
                @Override
                public void run() {
                    createClient(serviceName, eventLoopGroup, url);
                }
            });
        }

    }


    /**
     * 创建客户端,持久化链接
     *
     * @param serviceName
     * @param eventLoopGroup
     * @param url
     */
    public void createClient(String serviceName, EventLoopGroup eventLoopGroup, URL url) {
        System.out.println(Thread.currentThread().getName() + "准备新建客户端");
        Bootstrap b = new Bootstrap();
        b.group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .handler((new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                                //把request实体变为字节
                                .addLast(new RpcEncoder(RpcRequest.class))
                                //把返回的response字节变为对象
                                .addLast(new RpcDecoder(RpcResponse.class))
                                .addLast(new NettyAsynHandler(url));
                    }
                }));

        ChannelFuture channelFuture = b.connect(url.getHostname(), url.getPort());


        channelFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                //链接成功后的操作,把相应的url地址和客户端链接存入
                if (channelFuture.isSuccess()) {
                    NettyAsynHandler handler = channelFuture.channel().pipeline().get(NettyAsynHandler.class);
                    addConnection(serviceName, handler);
                }
            }
        });
    }


    /**
     * 关闭方法,关闭每个客户端链接,释放所有锁,关掉创建链接的线程池,和客户端的处理器
     */
    public void stop() {
        isShutDown = true;
        serverClientMap.values().forEach(e -> e.forEach(k -> k.close()));
        inactiveClientMap.values().forEach(e -> e.forEach(k -> k.close()));
        for (String s : serviceCondition.keySet()) {
            signalAvailableHandler(s);
        }

        clientBooter.shutdown();
        eventLoopGroup.shutdownGracefully();
    }


}

连接池基本都有注释,所以这里不做过多讲解,下面到负载均衡策略

这里定义了四个:随机、轮询、权重随机、最少请求,具体实现

/**
 * @author: lele
 * @date: 2019/11/22 下午3:24
 * 获取链接机制,轮询、随机、权重
 */
public interface FetchPolicy {
    Random RANDOM = new Random();
    int random = 1;
    int polling = 2;
    int weight = 3;
    int bestRequest = 4;
    //策略类
    Map<Integer, FetchPolicy> policyMap = new HashMap<>();
    static Map<Integer, FetchPolicy> getPolicyMap() {
        policyMap.put(random, new RandomFetch());
        policyMap.put(polling, new PollingFetch());
        policyMap.put(weight, new WeightFetch());
        policyMap.put(bestRequest, new BestRequestFetch());
        return policyMap;
    }

    NettyAsynHandler choose(String serviceName, List<NettyAsynHandler> handlers);


    class WeightFetch implements FetchPolicy {
        @Override
        public NettyAsynHandler choose(String serviceName, List<NettyAsynHandler> handlers) {
            int length = handlers.size();
            //总权重
            int totalWeight = 0;
            //是否权重一致
            boolean sameWeight = true;
            //先把所有权重加起来,并且判断权重是否一致
            for (int i = 0; i < length; i++) {
                int weight = handlers.get(i).getWeight();
                totalWeight += weight;
                if (sameWeight && i > 0
                        && weight != handlers.get(i - 1).getWeight()) {
                    sameWeight = false;
                }
            }
            //不断减去对应权重所在的区间
            if (totalWeight > 0 && !sameWeight) {
                int offset = RANDOM.nextInt(totalWeight);
                for (int i = 0; i < length; i++) {
                    offset -= handlers.get(i).getWeight();
                    if (offset < 0) {
                        return handlers.get(i);
                    }
                }
            }
            // 如果权重都一样,则轮询返回
            return FetchPolicy.getPolicyMap().get(polling).choose(serviceName,handlers);
        }
    }

    /**
     * 主要通过NettyAsynHandler的requestCount属性挑取最小请求的handler进行返回
     */
    class BestRequestFetch implements FetchPolicy {

        @Override
        public NettyAsynHandler choose(String serviceName, List<NettyAsynHandler> handlers) {
            int minRequest = Integer.MAX_VALUE;
            NettyAsynHandler res = null;
            for (NettyAsynHandler handler : handlers) {
                if (handler.getRequestCount().get() < minRequest) {
                    res = handler;
                }
            }

            if(res==null){
                // 如果找不到,则轮询返回
                return FetchPolicy.getPolicyMap().get(polling).choose(serviceName,handlers);
            }
            return res;
        }
    }

    /**
     * 记录每个服务对应的请求次数,并返回对应的handler
     */
    class PollingFetch implements FetchPolicy {
        private static Map<String, AtomicInteger> pollingMap = new ConcurrentHashMap<>();

        @Override
        public NettyAsynHandler choose(String serviceName, List<NettyAsynHandler> handlers) {
            if (pollingMap.get(serviceName) == null) {
                pollingMap.put(serviceName, new AtomicInteger(0));
            }
            int next = pollingMap.get(serviceName).incrementAndGet();
            int index = RANDOM.nextInt(next);
            return handlers.get(index);
        }
    }

    /**
     * 随机
     */
    class RandomFetch implements FetchPolicy {

        @Override
        public NettyAsynHandler choose(String serviceName, List<NettyAsynHandler> handlers) {

            int index = RANDOM.nextInt(handlers.size());
            //取出相应的handler
            NettyAsynHandler nettyAsynHandler = handlers.get(index-1);
           
            return nettyAsynHandler;
        }
    }

}

然后spring扫描相关的注解和代理工厂要做相关的改动

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)

//用于接口上,name为服务名,zk则在注册服务改为 服务名/ip,服务端通过传来的接口名通过反射获取类,或者通过给spring托管获取其class
public @interface RpcStudyClient {
    String name();
    //结果返回是异步还是同步模式
    int mode() default sync;
    int fetch() default FetchPolicy.polling;
    int sync=0;
    int asyn=1;

}

其他的相关改动

//代理工厂的改动 
RpcRequest rpcRequest = new RpcRequest(requestId, interfaceClass.getName(), method.getName(), args, method.getParameterTypes(), annotation.mode());
                //发送请求
                //这里的管理连接池通过服务名去访问zk,获取可用的url
                RpcFuture res = protocol.sendFuture(annotation.fetch(),annotation.name(), rpcRequest);

//发送请求的改动
 public RpcFuture sendFuture(int fetch,String serviceName, RpcRequest request) {
        NettyAsynHandler handler = ConnectManager.getInstance().chooseHandler(serviceName,fetch);
        RpcFuture future = handler.sendRequest(request);
        return future;
    }
//连接池改动
public NettyAsynHandler chooseHandler(String serviceName,Integer mode){
    List<NettyAsynHandler> handlers = mayWaitBeforeGetConnection(serviceName);
    NettyAsynHandler choose = FetchPolicy.getPolicyMap().get(mode).choose(serviceName, handlers);
    return choose;
}

心跳机制演示

这里客户端启动、监控器启动、服务端启动

然后服务端与监控器建立连接后,监控器对不活跃的节点进行记录,然后发送到zk端,client通过zk对相关的客户端链接加入到不活跃节点,而server则尝试重连,当监控器觉得该节点活跃了,就发到zk端让他通知client把他加入到活跃队列

监控器:

后来活跃了

client:

服务端

然后演示不活跃节点移除,这里我直接在zk上修改该server为不活跃,看一段时间后有无移除该服务,这里把检查任务的间隙调小一点,

执行 set /register/user/DESKTOP-QU1B3IU:8081 0 后,客户端把他加入不活跃链接,8082也是同样的操作,因为8081比8082下线时间早,所以先移除8081

负载均衡的就不演示了,有兴趣可以自己尝试,手写RPC系列大概就到这里,后续如果有更新就在github上面提交了,经过这段时间的代码编写,从一开始模仿别人的,到根据实际的情况把自己的想法写进去,还是学到挺多东西的,并发,负载均衡实现,spring相关的注解用法,zk、netty一些简单应用,项目地址:

https://github.com/97lele/rpcstudy

本系列参考资料:

关于netty相关的可以查看相关书籍或博客,书籍比较全一点

整体结构:

https://www.bilibili.com/video/av75673208

连接池和异步:

https://www.cnblogs.com/luxiaoxun/p/5272384.html

spring仿feign:

https://github.com/admin801122/RPCServer

心跳简介:

https://www.cnblogs.com/demingblog/p/9957143.html

负载均衡:

https://blog.csdn.net/wudiyong22/article/details/80829808



版权声明:本文为weixin_37703281原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。