Zookeeper中leader选举原理分析

  • Post author:
  • Post category:其他


Leader选举中涉及的概念



服务器角色
  • leader 处理读写请求
  • follower 参与leader选举,只能处理读请求并可将事务请求转交给leader来处理
  • observer 不参与leader选举,只处理读请求


服务器状态
  • LOOKING 服务器正处在选举状态 说明集群还在投票选举 还没有选出leader
  • LEADING 服务器处于leading状态,当前server角色是leader
  • FOLLOWING 服务器处于following状态,表示当前server的角色是follower
  • OBSERVING 服务器处于observing状态,当前server角色是observer


几个属性


myid

这个代表服务器ID,其值例如1、2、3等,在$ZK_HOME/data/myid文件中配置。myid值越大 在leader选举过程中的权重越大


zxid

最近一次处理成功的事务ID,zxid越大 说明数据越新,在leader选举过程中的权重越大

例如下面例子中的 mZxid

[zk: localhost:2181(CONNECTED) 3] create /wojiushiwo 123
Created /wojiushiwo
[zk: localhost:2181(CONNECTED) 7] stat /wojiushiwo
cZxid = 0x1400000002
ctime = Wed Aug 03 10:55:58 CST 2022
mZxid = 0x1400000002
mtime = Wed Aug 03 10:55:58 CST 2022
pZxid = 0x1400000002
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 3
numChildren = 0
[zk: localhost:2181(CONNECTED) 8] set /wojiushiwo 1234
cZxid = 0x1400000002
ctime = Wed Aug 03 10:55:58 CST 2022
mZxid = 0x1400000003
mtime = Wed Aug 03 10:56:59 CST 2022
pZxid = 0x1400000002
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 4
numChildren = 0


epoch

代表投票轮数,从0开始,每投完一次票 投票轮数+1。一般而言 同一轮投票的epoch是相同的,当然也有不同的情况,那就是有节点宕机后重新参与leader选举。



源码分析

以zookeeper3.6.1版本为例 分析下Leader选举过程

zookeeper3.6.1 Leader选举算法的实现类为FastLeaderElection

在这里插入图片描述

对流程图一些调用进行分析

1、QuorumPeer是个线程实现类,其run方法中有leader选举的操作

2、WorkerSender#run 将当前节点票据从sendqueue队列取出 广播给集群中的节点

3、WorkerReceiver#run 收到集群中其他节点的票据 将其放到recvqueue队列 用于PK投票



FastLeaderElection
    // 与集群中其他节点进行通信
		QuorumCnxManager manager;
    //发送票据的队列
    LinkedBlockingQueue<ToSend> sendqueue;
		//接受票据的队列
    LinkedBlockingQueue<Notification> recvqueue;
		//当前节点
    QuorumPeer self;
    Messenger messenger;
		//逻辑时钟 代表选举轮数 
    AtomicLong logicalclock = new AtomicLong(); /* Election instance */
		//数据id 对应myid
    long proposedLeader;
		//事务id
    long proposedZxid;
		//选举轮数
    long proposedEpoch;
    volatile boolean stop;
    private SyncedLearnerTracker leadingVoteSet;
public Vote lookForLeader() throws InterruptedException {
       //省略无关代码

        self.start_fle = Time.currentElapsedTime();
        try {
            /*
             * 存储接收到的投票 key是节点id,即myid
             */
            Map<Long, Vote> recvset = new HashMap<Long, Vote>();
           
            int notTimeout = minNotificationInterval;

            synchronized (this) {
                //逻辑时钟+1 表示开启新一轮投票
                logicalclock.incrementAndGet();
                //更新当前节点的票据
                //getInitId 从myid中取值
                //getInitLastLoggedZxid 获得上一次成功执行的事务id
                // getPeerEpoch 从currentEpoch文件中取值
                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
            }

           //省略日志
           //发送票据 这里是自己给自己发 先投自己一票
            sendNotifications();

            SyncedLearnerTracker voteSet;

            /*
             * Loop in which we exchange notifications until we find a leader
             */
						//当集群刚启动时,集群节点状态是LOOKING 正忙着leader选举
            while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {

                //前面说过 recvqueue队列存储其他节点广播过来的票据
                //这里 取出其他节点的票据
                Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
               
                /*
                 * Sends more notifications if haven't received enough.
                 * Otherwise processes new notification.
                 */
                //当没有取到票据时 
                if (n == null) {
                  	//如果queueSendMap集合为空 说明票据都发送出去了
                    if (manager.haveDelivered()) {
                      //发送一次票据
                      //笔者不太理解这里,为什么要再发一次票据 而且这票据还是当前节点的票据,前面不是发过了吗?
                        sendNotifications();
                    } else {
                      //如果queueSendMap集合中还有数据 说明还有票据没有发送出去 可能集群连接断开了 需要重新连接
                        manager.connectAll();
                    }

                    /*
                     * 由于没有从recvqueue队列取到票据 这里适当延长超时时间,再次尝试
                     */
                    int tmpTimeOut = notTimeout * 2;
                    notTimeout = Math.min(tmpTimeOut, maxNotificationInterval);
                    LOG.info("Notification time out: {}", notTimeout);
                } else if (validVoter(n.sid) && validVoter(n.leader)) {
                  //当取到了票据后 ,这里else if中的逻辑判断是:当前节点的myid是否属于集群节点
                    
                    switch (n.state) {
                        //集群启动时 只有节点状态是LOOKING的 才有资格参与选举
                        case LOOKING:
                            if (getInitLastLoggedZxid() == -1) {
                                LOG.debug("Ignoring notification as our zxid is -1");
                                break;
                            }
                            if (n.zxid == -1) {
                                LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
                                break;
                            }
                            //当接收到的选票 轮次高于当前节点选票轮次 说明当前节点"落后了"
                            if (n.electionEpoch > logicalclock.get()) {
                                System.out.println(">");
                                //重新设置当前节点选票轮次
                                logicalclock.set(n.electionEpoch);
                                //并清空接受到的票据集合(改朝换代了 从头开始)
                                recvset.clear();
                                //比较当前票据与其他节点的票据 比出高低后 重新为当前票据赋值
                                if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                    updateProposal(n.leader, n.zxid, n.peerEpoch);
                                } else {
                                    updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                                }
                                //然后重新发送最新票据
                                sendNotifications();
                            } else if (n.electionEpoch < logicalclock.get()) {
                                //当接收到的选票 轮次低于当前节点选票轮次 说明别的那个节点"落后了",选票直接丢弃不管了
                                //省略日志
                                break;
                            } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                                //当前节点与别的节点在同一个选举轮次 大多数情况下 会走到这个循环里
                                //比出高低后 重新为当前票据赋值
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
                                //然后重新发送最新票据
                                sendNotifications();
                            }

                           //省略日志
                         
                            //记录接收到的票据
                            recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                            //构造 SyncedLearnerTracker 查看收到的票据中 与当前机器票据一样的票据 并存储在SyncedLearnerTracker里
                            //统计选票
                            voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));
                            //判断投票结果 如果当前节点获取了一半以上的投票
                            if (voteSet.hasAllQuorums()) {

                                //如果在finalizeWait时间后再没有投票出现 则认为本轮选举结束 会设置LEADER
                                //如果依然有投票出现 即下面n不为null,则再次执行上面的大while循环 再次比较票据、PK等
                                while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                                    if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                                        recvqueue.put(n);
                                        break;
                                    }
                                }


                                if (n == null) {
                                    //统计的选票中 当前节点票据超过总节点数量票据的一半 则当前节点是LEADER 状态是LEADING
                                    setPeerState(proposedLeader, voteSet);
                                    Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                                  	 //清空接受到的票据集合
                                    leaveInstance(endVote);
                                    return endVote;
                                }
                            }
                            break;
                        //省略无关代码
    }


票据比较
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
        //省略无关代码
      
        return ((newEpoch > curEpoch)
                || ((newEpoch == curEpoch)
                && ((newZxid > curZxid)
                || ((newZxid == curZxid)
                && (newId > curId)))));
    }

票据PK的比较流程:

1、选举轮数较大的直接获胜

2、选举轮数相同的,事务id较大的 直接获胜

3、选举轮数、事务id都相同的,myid值较大的 获胜



投票结果 确认

zk 有两种leader选举衡量方式:基于权重和基于投票数,默认基于投票数

public QuorumMaj(Map<Long, QuorumServer> allMembers) {
    this.allMembers = allMembers;
    for (QuorumServer qs : allMembers.values()) {
        //参与投票的节点
        if (qs.type == LearnerType.PARTICIPANT) {
            votingMembers.put(Long.valueOf(qs.id), qs);
        } else {
            observingMembers.put(Long.valueOf(qs.id), qs);
        }
    }
    //half=参与投票节点数的一半
    half = votingMembers.size() / 2;
}
public boolean containsQuorum(Set<Long> ackSet) {
        //ackSet 是 投给当前节点的节点数 
        return (ackSet.size() > half);
    }

针对前面的代码分析基础,这里简述下3个节点的zk集群启动时选主流程

zk节点1 myid=1、zk节点2 myid=2、zk节点3 myid=3

集群启动时 假设其投票轮数是一致的,并且由于数据同步的存在 其zxid也是一致的

1、节点1启动后 先给自己投一票

2、节点2启动后 先给自己投一票,并且与节点1 交换票据。

  • 节点1 收到节点2的票据后 由于自己的myid较小 pk失败 更新自己的票据(投节点2一票)广播票据
  • 节点2 收到节点1的票据后 由于自己的myid较大 完胜 广播票据。
  • 此时节点2 获得两票 > (3/2=1) 因此节点2 晋升为leader,节点1成为follower

3、节点3启动后 先给自己投一票,并且与节点1、节点2交换票据。虽然节点3的myid比较大,但是节点2已经是leader了,因此节点3也就变成follower了。



版权声明:本文为zyxwvuuvwxyz原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。