前情提要
上一篇
【Zookeeper 源码解读系列, 单机模式(三)】
我们讲了单机模式下服务器的启动,配置参数的解析,ZooKeeperServerMain的启动,服务端启动的步骤,启动时加载数据,最后是ZookeeperServer的启动。由于篇幅太长,考虑到大家阅读起来比较困难,就把单机模式下最重要也是最不好理解的一篇拆分出来,单独做为一篇以降低读者阅读时的疲劳感。那么我们就接着上一次的话题继续讲ZookeeperServer启动结束以后,又做了什么事情。本篇也会被收录到
【Zookeeper 源码解读系列目录】
中。
请求处理器链的概念
现在我们先把代码放一边,先总结下到目前为止服务器都做了什么。刚才我们说到服务端启动的时候,在
NIOServerCxnFactory.startup()
的方法里启动了一个线程,就是
NIOServerCxnFactory
这个NIO的线程,这是第一件事情;第二件事情就是加载数据;然后又做了第三件事情,在
ZookeeperServer.startup()
方法里启动了一个session的跟踪器,这个也是一个线程,我们刚刚说国;最后就是
setupRequestProcessors()
,这里看名字是在设置请求处理器(
RequestProcessors
)。顾名思义就是要处理请求的,这里其实一个非常重要的点,为什么说很重要呢?
因为这几乎就是Zookeeper处理事务的核心逻辑,在讲解代码之前,需要先从概念上理清楚逻辑,否则直接看代码真的是一场噩梦。我们先进入
setupRequestProcessors();
看看这里又写了什么事情:
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
((SyncRequestProcessor)syncProcessor).start();
firstProcessor = new PrepRequestProcessor(this, syncProcessor);
//这里启动的是PrepRequestProcessor首个处理器
((PrepRequestProcessor)firstProcessor).start();
}
进去以后看到这里面有三个处理器Processor的类而且,全都new了出来,这些是什么呢?那么就要重点介绍一下ZooKeeper的请求处理器(
RequestProcessors
)的逻辑,这个逻辑和我们一般意义上的链表很像,所以权且把这个逻辑叫做处理器链。每当一个请求过来的时候,就会通过不同的请求处理器去处理不同的逻辑。在单机模式下有三种处理器(也是三个线程类):
PrepRequestProcessor、SyncRequestProcessor、FinalRequestProcessor
,这些处理器分开来说,就是做下面这些任务的:
PrepRequestProcessor
->做认证,生成txn事务对象,完成后转交给
SyncRP
SyncRequestProcessor
->事务对象持久化(生成事务文件),打快照,完成后转交给
FinalRP
FinalRequestProcessor
->做内存更改,处理事件响应啥的都是在这里弄的,返回客户端响应
所以整条链就是这样的:
PrepRequestProcessor.next=SyncRequestProcessor.next=FinalRequestProcessor
为什么可以这样写呢?因为这些处理器里面都有一个next的属性,指向下一下处理器。此处要注意记住一点:第一个请求
firstProcessor
是
NIOServerCnxn
中被调用的,也就是说处理器链的实例化以及启动都是在服务端。
介绍完什么是处理器链的概念,我们现在说到这里,其实遗留了两大问题:
-
NIOServerCxnFactory
这个线程做了什么事情? -
我们根据NIO的思想从
socket-channel
取出数据以后,谁去做了处理?
根据我们之前的分析,NIO是处理
Socket
连接的,所以问题1取数据应当是
NIOServerCxnFactory
做的。而问题2呢,当然应该是我们的处理器链做的。那么后面我们看下代码里面是不是和我们的猜想一样。
NIOServerCnxnFactory线程
如刚才所说先去看run()方法,其实从道理上讲,
NIOServerCnxnFactory
的逻辑应该和我们客户端
NIOClientCnxn
的逻辑应该是镜像相反的。所以这里面也应该有读和写,那让我们看下代码:
public void run() {
while (!ss.socket().isClosed()) {
try {
/**构建选择器,从channel里面拿出数据,封装为selectedList,略过**/
for (SelectionKey k : selectedList) {
//如果取到的是一个connection的请求,则建立连接
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
SocketChannel sc = ((ServerSocketChannel) k .channel()).accept();
InetAddress ia = sc.socket().getInetAddress();
int cnxncount = getClientCnxnCount(ia);
if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
LOG.warn("Too many connections from " + ia + " - max is " + maxClientCnxns );
sc.close();
} else {
LOG.info("Accepted socket connection from "
+ sc.socket().getRemoteSocketAddress());
sc.configureBlocking(false);
SelectionKey sk = sc.register(selector, SelectionKey.OP_READ);
NIOServerCnxn cnxn = createConnection(sc, sk);//建立连接
sk.attach(cnxn);
addCnxn(cnxn);
}
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
//如果不是建立连接的,就是一些写或者读的数据
NIOServerCnxn c = (NIOServerCnxn) k.attachment();
c.doIO(k);//和客户端一样开始doIO
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Unexpected ops in select " + k.readyOps());
}
}
}
selected.clear();
} catch (***Exception e) {
/**异常略**/
}
}
closeAll();
LOG.info("NIOServerCnxn factory exited run method");
}
我们略过选择器的部分,到
for (SelectionKey k : selectedList)
循环这里,碰见了第一个判断条件
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0)
,这里的连接是和channel连接,所以是
OP_ACCEPT
字段,所以说取到的是一个
Connection
的请求,那么就会走进去,做一些绑定地址,校验参数之类的工作,然后建立连接NIOServerCnxn cnxn = createConnection(sc, sk);,这里没什么好说的。我们走到
else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0)
里面,如果不是建立连接的,就是一些写或者读的数据,当然也会间歇性的收到客户端的ping。那么我们是不是找到了一个很熟悉的方法
c.doIO(k);
呢,所以我们就着重去里面看:
void doIO(SelectionKey k) throws InterruptedException {
try {
/**验证socket没有打开,return;,略过**/
if (k.isReadable()) {//读取准备完毕
int rc = sock.read(incomingBuffer);
if (rc < 0) {//剩下的数据小于0,报错
/**抛出EndOfStreamException数据异常**/
}
if (incomingBuffer.remaining() == 0) {
boolean isPayload;
if (incomingBuffer == lenBuffer) {
incomingBuffer.flip();
isPayload = readLength(k);
incomingBuffer.clear();
} else {
isPayload = true;
}
if (isPayload) {
readPayload();//正式加载数据
}
else { return; }
}
}
if (k.isWritable()) {
/**写就绪,略过**/
}
} catch (***Exception e) {
/**各种异常的逻辑,略过**/
}
}
我们进入以后看到如果
k.isReadable()
都就绪了,那么就读取客户端发来的数据,这里没什么可说的,如果一切都做好了那么就开始
readPayload();
正式加载数据了:
private void readPayload() throws IOException, InterruptedException {
if (incomingBuffer.remaining() != 0) {//是否还有遗留的数据
int rc = sock.read(incomingBuffer);//接着读剩下的数据
if (rc < 0) {//剩下的数据小于0,报错
/**抛出EndOfStreamException数据异常**/
}
}
if (incomingBuffer.remaining() == 0) { //如果全部读完了
packetReceived();//计数的方法
incomingBuffer.flip();
if (!initialized) {//初始化Y/N
readConnectRequest();//N:重新连接
} else {
readRequest(); //Y:开始读取请求
}
lenBuffer.clear();
incomingBuffer = lenBuffer;
}
}
进入以后又开始判断是不是还有遗留的数据
if (incomingBuffer.remaining() != 0)
,如果有那就继续把剩余的
sock.read(incomingBuffer)
数据都读出来,如果
if (incomingBuffer.remaining() == 0)
如果全部读完了,就可以请求了,
packetReceived();
这是个计数器没什么用略过,继续判断
if (!initialized)
有没有初始化成功,如果没有重新连接
readConnectRequest();
,如果已经就绪开始读取请求
readRequest();
,所以我们还得去
readRequest();
里面看内容:
private void readRequest() throws IOException {
zkServer.processPacket(this, incomingBuffer);
}
里面只有一句话,那我们还得接着往里面走,但是要说明一点:到了这里服务器实例已经开始处理收到的packet了,
incomingBuffer
就是
socket
里面的数据,下面进入
processPacket(this, incomingBuffer);
public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
InputStream bais = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
RequestHeader h = new RequestHeader();//构造请求头
h.deserialize(bia, "header");
incomingBuffer = incomingBuffer.slice();
//根据header的不同类型进行处理
if (h.getType() == OpCode.auth) {//这个auth其实就是addauth命令
/**这里我们先不看,后面ACL会仔细讲解**/
} else {//不是auth
if (h.getType() == OpCode.sasl) {//如果是sasl认证
/**sasl命令认证逻辑也先略过**/
}
else {//如果不是auth也不是sasl,到这里来
Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
h.getType(), incomingBuffer, cnxn.getAuthInfo());//直接从socket里面拿出一个request
si.setOwner(ServerCnxn.me);
submitRequest(si);//拿出后,提交request
}
}
cnxn.incrOutstandingRequests(h);
}
进入以后发现,首先还是解析数据
incomingBuffer
包装成
BinaryInputArchive
,然后
RequestHeader h = new RequestHeader();
构造请求头,这里在客户端似曾相识对吧,还有更熟悉的,后面都是根据
RequestHeader
的类型
getType()
来判断传递过来的是什么命令,几乎和客户端是一样的逻辑。我们这里先略过auth和sasl,直接到命令的分支
else
中,第一件事情就是取出一个请求
si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo());
,这里要注意
cnxn
这个参数,当初传入的是可是this,所以这个其实传入的是
NIOServerCnxn
这个类,那么这个类的实例又是被
NIOServerCnxnFactory
实现的,绕了一圈最终还是回到了这里。那么拿出来了
Request
以后还要提交这个请求
submitRequest(si);
,下面就进入这个方法:
public void submitRequest(Request si) {
if (firstProcessor == null) { //firstProcessor出现了
synchronized (this) {
try {
while (state == State.INITIAL) {//如果正在初始化,那就等着初始化完毕
wait(1000);
}
} catch (InterruptedException e) {
LOG.warn("Unexpected interruption", e);
}
if (firstProcessor == null || state != State.RUNNING) {
throw new RuntimeException("Not started");
}
}
}
try {
touch(si.cnxn);
boolean validpacket = Request.isValid(si.type);//判断request的类型是不是合法的
if (validpacket) {
firstProcessor.processRequest(si); //判断完毕以后到这里来
if (si.cnxn != null) {
incInProcess();
}
} else {
LOG.warn("Received packet at server of unknown type " + si.type);
new UnimplementedRequestProcessor().processRequest(si);
}
} catch (***Exception e) {
/**多个异常,略**/
}
}
一进来就看到一个老熟人儿
firstProcessor
,接着判断如果
firstProcessor
没有初始化好,或者正在初始化,那就等着初始化结束,因为正常情况下
firstProcessor
在刚才
setupRequestProcessors()
方法里已经初始化好了的,而且立刻就开启了。正常情况下是不会走进这里来的,我们跳过到下面的
try-catch
中,首先
validpacket = Request.isValid(si.type)
判断
request
的类型是不是合法的,如果不合法怎么处理的有兴趣的同学可以自己研究下,那么我们只看正常情况,到了
firstProcessor.processRequest(si);
这里终于和我们的处理器链接上头了。
解读处理器链
firstProcessor
有印象的同学一定还记得这个是
PrepRequestProcessor
的实例对象,其实
processRequest(si)
这个方法是一个接口,实现的Processor类有很多,但是这里我们就先去
PrepRequestProcessor.processRequest(si)
看下内容:
public void processRequest(Request request) {
submittedRequests.add(request);
}
进入以后发现里面什么逻辑都没有,只是把这个request加到了一个List里面,这是个什么东西?这其实是一个队列
queue
。到这里我们必须先停一下,捋一捋目前为止客户端和服务端都做了什么,否则我们的逻辑就乱了,那么请大家带着NIO的思想和笔者一起回顾一下之前都讲了什么:
讲解完什么是处理器链以后我留了两问题:
[1.NIOServerCnxnFactory这个线程做了什么事情?]
首先这个里面有一个
NIOServerCnxn
这个和客户端的
NIOClientCnxn
是一样的,所以这俩其实是配套使用的,那么
NIOServerCnxn
这里面就也有一个
doIO()
方法。顾名思义
doIO()
里面既有读数据,也有写数据(这里我们没有讲,有兴趣自己看一下,不影响流程)。那么连接的时候,我们客户端连接完了会发送一个
ConnectRequest
的请求过来,所以读取的时候服务端也会先去看下是不是读取到了
ConnectRequest
请求,读到了就回去进行一个连接处理。如果说不是这个那就是其他命令请求(比如说
create
),如果说读到了这种请求,那么就会调用
ZookeeperServer.processPacket()
构造出
request
,但是这里要提醒一点,客户端发的是什么?是
packet
,所以服务端首先要做的也是读取出来
packet
然后再构造
request
。再然后就
ZookeeperServer.submitRequest()
提交请求。其实这里使用的也是NIO的思想,这个方法我们刚刚看过,也不是同步的取出里请求,而是放到一个队列里,这个队列就是上面
processRequest()
方法里面的
submittedRequests
,放到队列里面以后谁去处理呢?我们一开始讲处理器链的时候就说过,处理器类都是线程,而这个线程早就已经开启了。那么当这个
submittedRequests
队列里有数据的时候,
firstProcessor
这个线程就会从队列
queue
里面拿出
Request
进行处理,然后再去执行整个请求处理链的逻辑,最终执行到
finalProcessor
把结果返回出去。通过以上整理我们画个简图加深一下理解:
客户端
request--包装-->packet--存入-->outgoingQueue--发送-->socket
服务端:
socket--取出包装-->packet--构造-->request--存入-->submittedRequests--取出-->firstProcessor.run()处理request,走处理器链逻辑
可以看出client和server两个地方的逻辑差不多就是镜像的,但是都深入的利用了NIO的思想。
PrepRequestProcessor
现在我们可以去看处理器链是怎么玩的,并且把
[问题2:取出数据以后,谁去做了处理?]
一起解决。所以说我们现在要去的地方就是
PrepRequestProcessor.run()
,看看我们之前的分析对不对:
public void run() {
try {
while (true) {
Request request = submittedRequests.take();//取出请求
/**日志相关,不重要,略**/
pRequest(request);//这里重点,开始处理
}
} catch (***Exception e) {
/**异常相关,不重要,略**/
}
LOG.info("PrepRequestProcessor exited loop!");
}
我们进入以后第一眼看到了
Request request = submittedRequests.take()
,拿出请求封装成
request
,这就说明我们分析的没错。既然取出来了,那就要开始处理,所以还得进入
pRequest(request);
看一看,这个方法很长会酌情删除大部分不相干的代码:
protected void pRequest(Request request) throws RequestProcessorException {
request.hdr = null;
request.txn = null;
try {//判断是什么命令
switch (request.type) {
case OpCode.create:
CreateRequest createRequest = new CreateRequest();//构造create request
pRequest2Txn(request.type, zks.getNextZxid(), request, createRequest, true);
break;
case OpCode.***:
/**这里case有很多比如delete,setData,getACL,ping之类的,全部略过**/
break;
default:
LOG.warn("unknown type " + request.type);
break;
}
} catch (***Exception e) {
/**跳过exception**/
}
request.zxid = zks.getZxid();
nextProcessor.processRequest(request);//指向下一个处理器Sync
}
我先解释一下,这里基本上把代码删除的每几行了,只保留了我们需要讲解的部分,其实这个方法很长,全部贴出会对写文章造成很大干扰,也不方便大家阅读,有兴趣可以自己使用源码看下,下面我们继续。
进入以后可以看到首先
switch (request.type)
判断是什么命令,我们还是以Create命令作为例子,所以只保留了
case OpCode.create:
相关的代码。进入
case
首先就构造了一个
CreateRequest
的实例,然后传入了这样一个方法里
pRequest2Txn(request.type, zks.getNextZxid(), request, createRequest, true);
,那么我们打开
pRequest2Txn(***)
看看里面写了什么,老规矩,我们还是只保留create相关的:
protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize)
throws KeeperException, IOException, RequestProcessorException
{
request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,
Time.currentWallTime(), type);
switch (type) { //匹配type
case OpCode.create: //如果是createRequest
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());//验证session
CreateRequest createRequest = (CreateRequest)record; //构造createRequest,此时为null
if(deserialize) //从request里面反序列化出来createRequest
ByteBufferInputStream.byteBuffer2Record(request.request, createRequest);
String path = createRequest.getPath();//拿到path
int lastSlash = path.lastIndexOf('/');//下面是一些路径的验证
if (lastSlash == -1 || path.indexOf('\0') != -1 || failCreate) {
/**不通过,抛KeeperException.BadArgumentsException**/
}
List<ACL> listACL = removeDuplicates(createRequest.getAcl());//ACL的验证
if (!fixupACL(request.authInfo, listACL)) {
throw new KeeperException.InvalidACLException(path);
}
String parentPath = path.substring(0, lastSlash);//拿出父节点的路径
ChangeRecord parentRecord = getRecordForPath(parentPath);//根据路径找到父节点记录record
checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE,
request.authInfo);//验证创建权限
int parentCVersion = parentRecord.stat.getCversion();//取父节点的Cversion
CreateMode createMode =
CreateMode.fromFlag(createRequest.getFlags());//取出来createMode
//如果createMode是序列化的,那么取出parentCVersion,这里是上面parentPath取出来的
if (createMode.isSequential()) {//如果是顺序节点
path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
}
validatePath(path, request.sessionId);
try {
if (getRecordForPath(path) != null) {
throw new KeeperException.NodeExistsException(path);
}
} catch (KeeperException.NoNodeException e) {
// ignore this one
}
//看下父节点是不是临时节点,getEphemeralOwner()存的是Sessionid,如果不是0那就肯定是临时节点
boolean ephemeralParent = parentRecord.stat.getEphemeralOwner() != 0;
if (ephemeralParent) {
//如果是true,父节点就是临时节点抛出异常
throw new KeeperException.NoChildrenForEphemeralsException(path);
}
//newCversion就是要真正创建的节点,取了当前父节点的子节点的版本+1
int newCversion = parentRecord.stat.getCversion()+1;
//txn就是事务
request.txn = new CreateTxn(path, createRequest.getData(),
listACL,
createMode.isEphemeral(), newCversion);
StatPersisted s = new StatPersisted();
if (createMode.isEphemeral()) {
s.setEphemeralOwner(request.sessionId);
}
//刷新父节点记录
parentRecord = parentRecord.duplicate(request.hdr.getZxid());
parentRecord.childCount++;
parentRecord.stat.setCversion(newCversion);
//第一个修改添加父节点修改记录
addChangeRecord(parentRecord);
//第二个修改添加子节点修改记录
addChangeRecord(new ChangeRecord(request.hdr.getZxid(), path, s,
0, listACL));
break;
case OpCode.****:
break;
default:
LOG.error("Invalid OpCode: {} received by PrepRequestProcessor", type);
}
}
进入方法以后,首先我们还是去
case OpCode.create:
这里case里面,首先强制转换一个
record
对象为
CreateRequest
对象,
CreateRequest
继承自
Record
强转没问题,但是看清楚这个时候这个对象是空的,然后从request里反序列化
ByteBufferInputStream.byteBuffer2Record(request.request, createRequest)
出来读出
createRequest
,此时
createRequest
就又值了。接着拿到命令里
path = createRequest.getPath();
路径,这个路径就是输入create命令后的那个路径。然后是验证路径,验证acl,不多说。再往后是拿出
parentPath = path.substring(0, lastSlash)
父节点的路径,又根据路径
getRecordForPath(parentPath)
找到父节点记录
record
,这个方法就是从ZKDatabase中拿出最近的一次修改记录,因为一个节点目前的值只和最近的一次修改有关系,所以这里取出的最近的一条修改记录,就相当于取出了父节点最新的记录。再次验证父节点的acl权限,又取
parentCVersion = parentRecord.stat.getCversion();
父节点的Cversion,这个Cversion的意思就是子节点的版本。再后面构造创建模式
CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags());
这是做什么用的呢?这个就是取出创建的是什么样的节点。如果说创建的是
if (createMode.isSequential())
顺序节点,这里对路径做了一个格式操作,再往下判断拿出的父节点是不是临时节点
if (ephemeralParent)
,如说是临时节点就抛出异常,这里的输出就是试图给临时节点创建子节点的时候打印出来的异常信息。再往下更新版本
newCversion = parentRecord.stat.getCversion()+1;
取得就是父节点的版本+1,
newCversion
就是要真正创建的节点的版本。再往下就是
request.txn
终于我们到了和事务有关系的地方了,记住这里,这里给
request
中的
txn
赋值。然后刷新父节点记录,因为每创建了一个子节点就会进行两次修改,第一个修改是修改添加父节点记录,第二个修改是当前要创建的子节点的记录。这里我们点击去,看看添加到哪里去了
addChangeRecord(parentRecord);
,进入:
void addChangeRecord(ChangeRecord c) {
synchronized (zks.outstandingChanges) {
zks.outstandingChanges.add(c);
zks.outstandingChangesForPath.put(c.path, c);
}
}
这里发现这些修改记录被添加到
outstandingChanges
这个
List
里面了,这其实也是一个NIO中的
queue
,那么就肯定有东西,把修改记录从这个list里面取出来,做持久化啊,更新内存等等。
那这些方法都执行完了以后,我们一级一级往上跳出还是回到
pRequest(request);
这个方法里来:
protected void pRequest(Request request) throws RequestProcessorException {
request.hdr = null;
request.txn = null;
try {//判断是什么命令
switch (request.type) {
/**走完了,略过**/
}
} catch (***Exception e) {
/**跳过exception**/
}
request.zxid = zks.getZxid();
nextProcessor.processRequest(request);//指向下一个处理器Sync
}
现在就剩下最后两行了,其实我们的
PrepRequestProcessor
这第一个处理器也已经执行完了,所以我们该调用下一个处理器了
nextProcessor.processRequest(request);
。先不忙去
SyncRequestProcessor
里面去看,到这里或许会有些同学很迷茫怎么
nextProcessor
就是
SyncRequestProcessor
呢?这里我们就得先回到
ZookeeperServer
中的
setupRequestProcessors()
里面:
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
((SyncRequestProcessor)syncProcessor).start();
firstProcessor = new PrepRequestProcessor(this, syncProcessor);
((PrepRequestProcessor)firstProcessor).start();
}
当我们
new PrepRequestProcessor
的时候,我们传入了两个参数,一个是
this
肯定就是
PrepRequestProcessor
,第二个就是
syncProcessor
。聪明的同学肯定想到了我们传递的
syncProcessor
就是在这个构造方法里赋值了,没错就是这样:
public PrepRequestProcessor(ZooKeeperServer zks,
RequestProcessor nextProcessor) {
super("ProcessThread(sid:" + zks.getServerId() + " cport:"
+ zks.getClientPort() + "):", zks.getZooKeeperServerListener());
this.nextProcessor = nextProcessor;//赋值传递
this.zks = zks;
}
看这句话
this.nextProcessor = nextProcessor;
这个就是赋值了,我们可以看到后续
SyncRequestProcessor
也是一样,都是把后面的
Processor
对象传递进来然后直接赋值给
nextProcessor
,完成了传递的过程。好那现在我们去看
SyncRequestProcessor.processRequest(request)
:
public void processRequest(Request request) {
queuedRequests.add(request);
}
SyncRequestProcessor
到这里来,看到也是把当前的请求加入到一个队列
queuedRequests
里,不用说一定是
SyncRequestProcessor.run()
里面在使用,所以我们过去看:
public void run() {
try {
int logCount = 0;
setRandRoll(r.nextInt(snapCount/2));
while (true) {
Request si = null;
if (toFlush.isEmpty()) {
si = queuedRequests.take();
} else {
si = queuedRequests.poll();
if (si == null) {
flush(toFlush);
continue;
}
}
if (si == requestOfDeath) {
break;
}
if (si != null) {
if (zks.getZKDatabase().append(si)) {//如果流放成功了
logCount++;
if (logCount > (snapCount / 2 + randRoll)) {
/**打快照,暂时略**/
}
} else if (toFlush.isEmpty()) {
/**nextProcessor暂时略**/
}
toFlush.add(si);//到这里最终把request都加到toFlush这个list里面
if (toFlush.size() > 1000) {//如果总数大于1000则把流都flush到文件里
flush(toFlush);
}
}
}
} catch (Throwable t) {
handleException(this.getName(), t);
running = false;
}
LOG.info("SyncRequestProcessor exited!");
}
到这里来,这里就是取出queuedRequests里的请求,这里也是做持久化的处理器。首先也是取数据
si = queuedRequests.take();
如果
toFlush.isEmpty()==false
说明没有取完,接着取。取完了以后,发现取出来的
request
不是空的
if (si != null)
就把请求加载
Database
类上,这里我们到
zks.getZKDatabase().append()
方法里:
public boolean append(Request si) throws IOException {
return this.snapLog.append(si);//snapLog事务快照
}
snapLog
我们之前介绍过,这个就是
FileTxnSnapLog
事务快照的工具类,我们接着进
this.snapLog.append()
:
public boolean append(Request si) throws IOException {
return txnLog.append(si.hdr, si.txn);//txnLog 事务log
}
发现最终被加入事务log的工具类
TxnLog
中,那传递进来的
si.hdr
和
si.txn
是什么呢?这
si.hdr
这个一看就知道是
RequestHeader
嘛,第二个
si.txn
则是我们在
PrepRequestProcessor
中的
pRequest2Txn(***)
里面new出来的:
request.txn = new CreateTxn(...)
,就是我们创建的
create
命令的事务。我们接着
txnLog.append()
进入
public synchronized boolean append(TxnHeader hdr, Record txn) throws IOException
{
/**抛开验证往下走**/
if (logStream==null) {//logStream log流对象
if(LOG.isInfoEnabled()){
LOG.info("Creating new log file: " + Util.makeLogName(hdr.getZxid()));
}
logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid())); //生成文件对象
//放到输出流对象里
fos = new FileOutputStream(logFileWrite);
logStream=new BufferedOutputStream(fos);
oa = BinaryOutputArchive.getArchive(logStream);
FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
fhdr.serialize(oa, "fileheader");
logStream.flush();
filePadding.setCurrentSize(fos.getChannel().position());
streamsToFlush.add(fos);
}
filePadding.padFile(fos.getChannel());
byte[] buf = Util.marshallTxnEntry(hdr, txn);
if (buf == null || buf.length == 0) {
throw new IOException("Faulty serialization for header " +
"and txn");
}
Checksum crc = makeChecksumAlgorithm();
crc.update(buf, 0, buf.length);
oa.writeLong(crc.getValue(), "txnEntryCRC");
Util.writeTxnBytes(oa, buf);
return true;
}
看到一个关键的对象
logStream
,log流对象,紧接着就是一个
File
对象
logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
把配置的路径
logDir
和事务id(
Zxid
)传进去。再把文件对象
logFileWrite
放到文件输出流对象里
fos = new FileOutputStream(logFileWrite);
,然后再包装为
BufferedOutputStream
对象
logStream
,后面都是文件的标准流程不多解释,做完所有的工作以后
return true
出去,但是直到
return
都没有写文件的步骤,所以这里面只是把事务给放到流里面去,所以写文件步骤一定在外面,那么得一路跳出去了,还是回到
run()
方法里。
持久化事物
————————————————————–回到
run()
方法————————————————————————-
这里就又回到
run()
方法的解读了,如果流放成功了,但是要记得我们在里面就没还有看到
flush()
,说明还没有存到文件里,继续进入
if
语句
logCount++
日志计数器加1,然后
if (logCount > (snapCount / 2 + randRoll))
这里是打快照的一会儿再说,我们假定是false跳过。直接到
toFlush.add(si);
这句话来,这里就把请求
Request
加到了
toFlush
这个
List
里面,然后如果
if (toFlush.size() > 1000)
如果总数大于1000则把流都
flush(toFlush);
出去,那么
flush
的是什么东西呢?我们进入看:
private void flush(LinkedList<Request> toFlush) throws IOException, RequestProcessorException
{
/**略**/
zks.getZKDatabase().commit();
/**略**/
}
这里就
commit()
提交到了到了
ZKDatabase()
,我们接着进入
commit()
看看提交的是什么:
public void commit() throws IOException {
this.snapLog.commit();
}
接着进入
snapLog.commit();
:
public void commit() throws IOException {
txnLog.commit();
}
继续进入
txnLog.commit();
:
public synchronized void commit() throws IOException {
if (logStream != null) {
logStream.flush();
}
/**略**/
}
这里我们就看到了IO接口里的flush(),这里就是写出去文件的地方了,写的是什么呢就是logStream这个刚才我们一层又一层的封装的流对象。OK,那我们再回到run()方法里。
打快照SnapLog
————————————————————–回到
run()
方法——————————————————
那么我们回到
if (logCount > (snapCount / 2 + randRoll))
这里,我把这一块单独拿出来:
public void run() {
try {
/**略**/
if (si != null) {
if (zks.getZKDatabase().append(si)) {//如果流放成功了
logCount++;//+1记录一下
if (logCount > (snapCount / 2 + randRoll)) {//这里是什么样的数量下要做快照
setRandRoll(r.nextInt(snapCount/2));
zks.getZKDatabase().rollLog();//滚动并生成新的快照log文件
if (snapInProcess != null && snapInProcess.isAlive()) {
LOG.warn("Too busy to snap, skipping");
} else {
snapInProcess = new ZooKeeperThread("Snapshot Thread") {//启动一个线程打快照
public void run() {
try {
zks.takeSnapshot();//打快照
} catch(Exception e) {
LOG.warn("Unexpected exception", e);
}
}
};
snapInProcess.start();
}
logCount = 0;
}
} else if (toFlush.isEmpty()) {
/**nextProcessor暂时略**/
}
/**flush已经讲过,略**/
}
} catch (Throwable t) {
handleException(this.getName(), t);
running = false;
}
LOG.info("SyncRequestProcessor exited!");
}
这个
if
块里的内容其实就是我们一直再说的打快照。这里是单独启动了一个线程打快照
snapInProcess
。我们接着说,如果刚才的流放成功了,那么
logCount+1
记录下来,那么就会判断多少个日志才会打一个快照,这里
snapCount
是可以在配置文件中配置的,然后根据配置的数量进行一个随机数运算,如果说大于这个数量了,首先先把随机数重置
setRandRoll(r.nextInt(snapCount/2));
,然后滚动并生成新的快照文件,把目前所有的事物都存成文件出去。我们点进去
zks.getZKDatabase().rollLog();
public void rollLog() throws IOException {
this.snapLog.rollLog();
}
接着进入
this.snapLog.rollLog();
public void rollLog() throws IOException {
txnLog.rollLog();
}
在进入
txnLog.rollLog();
public synchronized void rollLog() throws IOException {
if (logStream != null) {
this.logStream.flush();//写入文件
this.logStream = null;//置空,相当于生成新文件
oa = null;
}
}
如果
if (logStream != null)
流不是空,就把
logStream
写入文件,然后重新生成一个新的文件
this.logStream = null;
。这里就完成了一个滚动并生成新的快照的逻辑。
到现在我们就剩下了一个事情打快照,来我们看下快照是怎么打的
zks.takeSnapshot();
:
public void takeSnapshot(){
try {
txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts());
} catch (IOException e) {
LOG.error("Severe unrecoverable error, exiting", e);
System.exit(10);
}
}
这里没什么东西,直接进入
save(***)
:
public void save(DataTree dataTree,
ConcurrentHashMap<Long, Integer> sessionsWithTimeouts)
throws IOException {
long lastZxid = dataTree.lastProcessedZxid;
File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));
LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid),
snapshotFile);
snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile);//序列化到文件
}
这里面其实也没有很复杂的逻辑就是直接把整个
dataTree
直接序列化到文件里面,那么这里打快照也讲完了。所以这里
SyncRequestProcessor
的作用就已经讲完了,它就是做持久化事物以及打快照用的。那么最终所有的逻辑都会走到
run()
方法的
nextProcessor
这里,也就是
FinalRequestProcessor.processRequest(si)
:
public void run() {
try {
/**略**/
if (si != null) {
if (zks.getZKDatabase().append(si)) {
/**已经讲过,略**/
} else if (toFlush.isEmpty()) {
if (nextProcessor != null) {//最终都会走到这里来发给下一个处理器
nextProcessor.processRequest(si);
if (nextProcessor instanceof Flushable) {
((Flushable)nextProcessor).flush();
}
}
continue;
}
/**flush已经讲过,略**/
}
} catch (Throwable t) {
/**略**/
}
LOG.info("SyncRequestProcessor exited!");
}
FinalRequestProcessor
我们之前已经说过这个
Processor
是更新内存用的,所以我们预期
processRequest(si)
这个方法里面应该有很多代码,大多都是和我们主题不相关的以及
switch
分支,所以我们还是挑选
create
命令作为讲解点,暂时无关的代码则会被忽略掉:
public void processRequest(Request request) {
/**暂时无关代码略**/
ProcessTxnResult rc = null;
synchronized (zks.outstandingChanges) {//获取修改记录
while (!zks.outstandingChanges.isEmpty()
&& zks.outstandingChanges.get(0).zxid <= request.zxid) {
ChangeRecord cr = zks.outstandingChanges.remove(0);//拿出修改记录cr,往下
/**跳过检测**/
}
if (request.hdr != null) {
TxnHeader hdr = request.hdr;
Record txn = request.txn;
//把head和txn传进去,更新内存
rc = zks.processTxn(hdr, txn);
}
/**集群相关先不看**/
}
/**closeSession相关先不看**/
/**暂时无关代码略**/
try {
/**暂时无关代码略**/
switch (request.type) {//最后就是返回结果了,所以这里又有了switch
case OpCode.create: {//如果是create
lastOp = "CREA";
rsp = new CreateResponse(rc.path);//把创建的路径传进去
err = Code.get(rc.err);//如果有问题,传入error
break;
}
case OpCode.****: {
/**略**/
break;
}
}
} catch (****Exception e) {
/**Exception 略**/
}
/**暂时无关代码略**/
try {//最终在这里Response数据
cnxn.sendResponse(hdr, rsp, "response");
if (closeSession) {
cnxn.sendCloseSession();
}
} catch (IOException e) {
LOG.error("FIXMSG",e);
}
}
进入以后我们就看到了一个很熟悉的东西
outstandingChanges
这个队列,这个在PrepRequestProcessor里用来记录父节点和子节点的修改记录,如果里面有记录
outstandingChanges
不是空的,就进入
while
循环,拿出
ChangeRecord cr = zks.outstandingChanges.remove(0);
修改记录cr。往下看,如果
RequestHeader
存在
if (request.hdr != null)
,那么就把head和txn传进
zks.processTxn(hdr, txn);
去更新内存。如果更新内存也没有问题,下一步就是返回结果了,所以这里又有了
switch
用来匹配返回的结果是什么类型的,我们到
case OpCode.create:
发现这里面直接new了一个
CreateResponse(rc.path)
,并且把修改记录的路径
rc.path
传进去了,break出去,就到最后了
cnxn.sendResponse(hdr, rsp, "response");
最终在这里把结果发送到
socket
里面去。我们进入这个方法看下,这是一个接口,所以我们还是去实现方法
NIOServerCnxn.sendResponse(***)
里面:
synchronized public void sendResponse(ReplyHeader h, Record r, String tag) {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
try {
baos.write(fourBytes);
bos.writeRecord(h, "header");
if (r != null) {
bos.writeRecord(r, tag);
}
baos.close();
} catch (IOException e) {
LOG.error("Error serializing response");
}
byte b[] = baos.toByteArray();
ByteBuffer bb = ByteBuffer.wrap(b);
bb.putInt(b.length - 4).rewind();
sendBuffer(bb);//发送到buffer里
if (h.getXid() > 0) {
synchronized(this){
outstandingRequests--;
}
synchronized (this.factory) {
if (zkServer.getInProcess() < outstandingLimit
|| outstandingRequests < 1) {
sk.selector().wakeup();
enableRecv();
}
}
}
} catch(Exception e) {
LOG.warn("Unexpected exception. Destruction averted.", e);
}
}
这里很明显,把我们的
ReplyHeader
和
Record
转化为字节对象,然后创建
ByteBuffer
,最后
sendBuffer(bb);
发送到
Buffer
里面去,一路点进去就会看到
sock.write(bb);
最后还是用过
socket
发送的。
更新内存
到这里就剩最后一个问题了,内存是怎么更新的,我们回到
zks.processTxn(hdr, txn);
,这个方法传入的参数是Header和Txn事物做什么呢:
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
ProcessTxnResult rc;
int opCode = hdr.getType();
long sessionId = hdr.getClientId();
rc = getZKDatabase().processTxn(hdr, txn);//走进这里来
/**暂时无关代码略**/
return rc;
}
继续走到
getZKDatabase().processTxn(hdr, txn);
这个方法里去:
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
return dataTree.processTxn(hdr, txn);
}
接着到
dataTree.processTxn(hdr, txn);
里面,我们也是只看
create
命令的内容:
case OpCode.create:
CreateTxn createTxn = (CreateTxn) txn;
rc.path = createTxn.getPath();
//看createNode方法
createNode(
createTxn.getPath(),
createTxn.getData(),
createTxn.getAcl(),
createTxn.getEphemeral() ? header.getClientId() : 0,
createTxn.getParentCVersion(),
header.getZxid(), header.getTime());
break;
这里创建了一个createNode,并且传入了路径, 数据,acl,是不是临时节点,父节点的Cversion等等内容。如果点进去这里,其实我们只有两个地方要关注的:
//修改内存中的DataNode
DataNode child = new DataNode(parent, data, longval, stat);
//抛出事件,一个是节点创建的事件,一个是孩子节点改变的事件
dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,
Event.EventType.NodeChildrenChanged);
return path;
第一修改内存中的数据,也就是DataNode,到了这里整个事物已经持久化好了,所以只要修改内存里面的数据就好了,便于使用。第二create命令完成以后其实就是一个
NodeCreated
的事件,所以就会抛出两个事件出去:1. 节点创建的事件;2. 孩子节点改变
NodeChildrenChanged
的事件。至于这个事件怎么用,我们下节课讲Watch的时候再说。最后
return path;
把修改的路径传递出去。
总结
这样单机模式下服务器的流程就结束了,按照惯例,我会画流程图帮助大家理解这个流程,那么我们的本系列的下一篇
Zookeeper 源码解读系列, 单机模式(五)
就要介绍EventThread这个线程在客户端做了什么事情。
Zookeeper单机模式:服务器启动流程图
Zookeeper单机模式:处理器链执行逻辑图