Leader选举中涉及的概念
服务器角色
- leader 处理读写请求
- follower 参与leader选举,只能处理读请求并可将事务请求转交给leader来处理
- observer 不参与leader选举,只处理读请求
服务器状态
- LOOKING 服务器正处在选举状态 说明集群还在投票选举 还没有选出leader
- LEADING 服务器处于leading状态,当前server角色是leader
- FOLLOWING 服务器处于following状态,表示当前server的角色是follower
- OBSERVING 服务器处于observing状态,当前server角色是observer
几个属性
myid
这个代表服务器ID,其值例如1、2、3等,在$ZK_HOME/data/myid文件中配置。myid值越大 在leader选举过程中的权重越大
zxid
最近一次处理成功的事务ID,zxid越大 说明数据越新,在leader选举过程中的权重越大
例如下面例子中的 mZxid
[zk: localhost:2181(CONNECTED) 3] create /wojiushiwo 123
Created /wojiushiwo
[zk: localhost:2181(CONNECTED) 7] stat /wojiushiwo
cZxid = 0x1400000002
ctime = Wed Aug 03 10:55:58 CST 2022
mZxid = 0x1400000002
mtime = Wed Aug 03 10:55:58 CST 2022
pZxid = 0x1400000002
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 3
numChildren = 0
[zk: localhost:2181(CONNECTED) 8] set /wojiushiwo 1234
cZxid = 0x1400000002
ctime = Wed Aug 03 10:55:58 CST 2022
mZxid = 0x1400000003
mtime = Wed Aug 03 10:56:59 CST 2022
pZxid = 0x1400000002
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 4
numChildren = 0
epoch
代表投票轮数,从0开始,每投完一次票 投票轮数+1。一般而言 同一轮投票的epoch是相同的,当然也有不同的情况,那就是有节点宕机后重新参与leader选举。
源码分析
以zookeeper3.6.1版本为例 分析下Leader选举过程
zookeeper3.6.1 Leader选举算法的实现类为FastLeaderElection
对流程图一些调用进行分析
1、QuorumPeer是个线程实现类,其run方法中有leader选举的操作
2、WorkerSender#run 将当前节点票据从sendqueue队列取出 广播给集群中的节点
3、WorkerReceiver#run 收到集群中其他节点的票据 将其放到recvqueue队列 用于PK投票
FastLeaderElection
// 与集群中其他节点进行通信
QuorumCnxManager manager;
//发送票据的队列
LinkedBlockingQueue<ToSend> sendqueue;
//接受票据的队列
LinkedBlockingQueue<Notification> recvqueue;
//当前节点
QuorumPeer self;
Messenger messenger;
//逻辑时钟 代表选举轮数
AtomicLong logicalclock = new AtomicLong(); /* Election instance */
//数据id 对应myid
long proposedLeader;
//事务id
long proposedZxid;
//选举轮数
long proposedEpoch;
volatile boolean stop;
private SyncedLearnerTracker leadingVoteSet;
public Vote lookForLeader() throws InterruptedException {
//省略无关代码
self.start_fle = Time.currentElapsedTime();
try {
/*
* 存储接收到的投票 key是节点id,即myid
*/
Map<Long, Vote> recvset = new HashMap<Long, Vote>();
int notTimeout = minNotificationInterval;
synchronized (this) {
//逻辑时钟+1 表示开启新一轮投票
logicalclock.incrementAndGet();
//更新当前节点的票据
//getInitId 从myid中取值
//getInitLastLoggedZxid 获得上一次成功执行的事务id
// getPeerEpoch 从currentEpoch文件中取值
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
//省略日志
//发送票据 这里是自己给自己发 先投自己一票
sendNotifications();
SyncedLearnerTracker voteSet;
/*
* Loop in which we exchange notifications until we find a leader
*/
//当集群刚启动时,集群节点状态是LOOKING 正忙着leader选举
while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
//前面说过 recvqueue队列存储其他节点广播过来的票据
//这里 取出其他节点的票据
Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
/*
* Sends more notifications if haven't received enough.
* Otherwise processes new notification.
*/
//当没有取到票据时
if (n == null) {
//如果queueSendMap集合为空 说明票据都发送出去了
if (manager.haveDelivered()) {
//发送一次票据
//笔者不太理解这里,为什么要再发一次票据 而且这票据还是当前节点的票据,前面不是发过了吗?
sendNotifications();
} else {
//如果queueSendMap集合中还有数据 说明还有票据没有发送出去 可能集群连接断开了 需要重新连接
manager.connectAll();
}
/*
* 由于没有从recvqueue队列取到票据 这里适当延长超时时间,再次尝试
*/
int tmpTimeOut = notTimeout * 2;
notTimeout = Math.min(tmpTimeOut, maxNotificationInterval);
LOG.info("Notification time out: {}", notTimeout);
} else if (validVoter(n.sid) && validVoter(n.leader)) {
//当取到了票据后 ,这里else if中的逻辑判断是:当前节点的myid是否属于集群节点
switch (n.state) {
//集群启动时 只有节点状态是LOOKING的 才有资格参与选举
case LOOKING:
if (getInitLastLoggedZxid() == -1) {
LOG.debug("Ignoring notification as our zxid is -1");
break;
}
if (n.zxid == -1) {
LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
break;
}
//当接收到的选票 轮次高于当前节点选票轮次 说明当前节点"落后了"
if (n.electionEpoch > logicalclock.get()) {
System.out.println(">");
//重新设置当前节点选票轮次
logicalclock.set(n.electionEpoch);
//并清空接受到的票据集合(改朝换代了 从头开始)
recvset.clear();
//比较当前票据与其他节点的票据 比出高低后 重新为当前票据赋值
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
//然后重新发送最新票据
sendNotifications();
} else if (n.electionEpoch < logicalclock.get()) {
//当接收到的选票 轮次低于当前节点选票轮次 说明别的那个节点"落后了",选票直接丢弃不管了
//省略日志
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
//当前节点与别的节点在同一个选举轮次 大多数情况下 会走到这个循环里
//比出高低后 重新为当前票据赋值
updateProposal(n.leader, n.zxid, n.peerEpoch);
//然后重新发送最新票据
sendNotifications();
}
//省略日志
//记录接收到的票据
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
//构造 SyncedLearnerTracker 查看收到的票据中 与当前机器票据一样的票据 并存储在SyncedLearnerTracker里
//统计选票
voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));
//判断投票结果 如果当前节点获取了一半以上的投票
if (voteSet.hasAllQuorums()) {
//如果在finalizeWait时间后再没有投票出现 则认为本轮选举结束 会设置LEADER
//如果依然有投票出现 即下面n不为null,则再次执行上面的大while循环 再次比较票据、PK等
while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
recvqueue.put(n);
break;
}
}
if (n == null) {
//统计的选票中 当前节点票据超过总节点数量票据的一半 则当前节点是LEADER 状态是LEADING
setPeerState(proposedLeader, voteSet);
Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
//清空接受到的票据集合
leaveInstance(endVote);
return endVote;
}
}
break;
//省略无关代码
}
票据比较
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
//省略无关代码
return ((newEpoch > curEpoch)
|| ((newEpoch == curEpoch)
&& ((newZxid > curZxid)
|| ((newZxid == curZxid)
&& (newId > curId)))));
}
票据PK的比较流程:
1、选举轮数较大的直接获胜
2、选举轮数相同的,事务id较大的 直接获胜
3、选举轮数、事务id都相同的,myid值较大的 获胜
投票结果 确认
zk 有两种leader选举衡量方式:基于权重和基于投票数,默认基于投票数
public QuorumMaj(Map<Long, QuorumServer> allMembers) {
this.allMembers = allMembers;
for (QuorumServer qs : allMembers.values()) {
//参与投票的节点
if (qs.type == LearnerType.PARTICIPANT) {
votingMembers.put(Long.valueOf(qs.id), qs);
} else {
observingMembers.put(Long.valueOf(qs.id), qs);
}
}
//half=参与投票节点数的一半
half = votingMembers.size() / 2;
}
public boolean containsQuorum(Set<Long> ackSet) {
//ackSet 是 投给当前节点的节点数
return (ackSet.size() > half);
}
针对前面的代码分析基础,这里简述下3个节点的zk集群启动时选主流程
zk节点1 myid=1、zk节点2 myid=2、zk节点3 myid=3
集群启动时 假设其投票轮数是一致的,并且由于数据同步的存在 其zxid也是一致的
1、节点1启动后 先给自己投一票
2、节点2启动后 先给自己投一票,并且与节点1 交换票据。
- 节点1 收到节点2的票据后 由于自己的myid较小 pk失败 更新自己的票据(投节点2一票)广播票据
- 节点2 收到节点1的票据后 由于自己的myid较大 完胜 广播票据。
- 此时节点2 获得两票 > (3/2=1) 因此节点2 晋升为leader,节点1成为follower
3、节点3启动后 先给自己投一票,并且与节点1、节点2交换票据。虽然节点3的myid比较大,但是节点2已经是leader了,因此节点3也就变成follower了。