Zookeeper 源码解读系列, 单机模式(四)

  • Post author:
  • Post category:其他




前情提要

上一篇

【Zookeeper 源码解读系列, 单机模式(三)】

我们讲了单机模式下服务器的启动,配置参数的解析,ZooKeeperServerMain的启动,服务端启动的步骤,启动时加载数据,最后是ZookeeperServer的启动。由于篇幅太长,考虑到大家阅读起来比较困难,就把单机模式下最重要也是最不好理解的一篇拆分出来,单独做为一篇以降低读者阅读时的疲劳感。那么我们就接着上一次的话题继续讲ZookeeperServer启动结束以后,又做了什么事情。本篇也会被收录到

【Zookeeper 源码解读系列目录】

中。



请求处理器链的概念

现在我们先把代码放一边,先总结下到目前为止服务器都做了什么。刚才我们说到服务端启动的时候,在

NIOServerCxnFactory.startup()

的方法里启动了一个线程,就是

NIOServerCxnFactory

这个NIO的线程,这是第一件事情;第二件事情就是加载数据;然后又做了第三件事情,在

ZookeeperServer.startup()

方法里启动了一个session的跟踪器,这个也是一个线程,我们刚刚说国;最后就是

setupRequestProcessors()

,这里看名字是在设置请求处理器(

RequestProcessors

)。顾名思义就是要处理请求的,这里其实一个非常重要的点,为什么说很重要呢?

因为这几乎就是Zookeeper处理事务的核心逻辑,在讲解代码之前,需要先从概念上理清楚逻辑,否则直接看代码真的是一场噩梦。我们先进入

setupRequestProcessors();

看看这里又写了什么事情:

protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
    ((SyncRequestProcessor)syncProcessor).start();
    firstProcessor = new PrepRequestProcessor(this, syncProcessor);
    //这里启动的是PrepRequestProcessor首个处理器
    ((PrepRequestProcessor)firstProcessor).start();
}

进去以后看到这里面有三个处理器Processor的类而且,全都new了出来,这些是什么呢?那么就要重点介绍一下ZooKeeper的请求处理器(

RequestProcessors

)的逻辑,这个逻辑和我们一般意义上的链表很像,所以权且把这个逻辑叫做处理器链。每当一个请求过来的时候,就会通过不同的请求处理器去处理不同的逻辑。在单机模式下有三种处理器(也是三个线程类):


PrepRequestProcessor、SyncRequestProcessor、FinalRequestProcessor

,这些处理器分开来说,就是做下面这些任务的:


PrepRequestProcessor

->做认证,生成txn事务对象,完成后转交给

SyncRP



SyncRequestProcessor

->事务对象持久化(生成事务文件),打快照,完成后转交给

FinalRP



FinalRequestProcessor

->做内存更改,处理事件响应啥的都是在这里弄的,返回客户端响应

所以整条链就是这样的:


PrepRequestProcessor.next=SyncRequestProcessor.next=FinalRequestProcessor


为什么可以这样写呢?因为这些处理器里面都有一个next的属性,指向下一下处理器。此处要注意记住一点:第一个请求

firstProcessor



NIOServerCnxn

中被调用的,也就是说处理器链的实例化以及启动都是在服务端。

介绍完什么是处理器链的概念,我们现在说到这里,其实遗留了两大问题:


  1. NIOServerCxnFactory

    这个线程做了什么事情?
  2. 我们根据NIO的思想从

    socket-channel

    取出数据以后,谁去做了处理?

根据我们之前的分析,NIO是处理

Socket

连接的,所以问题1取数据应当是

NIOServerCxnFactory

做的。而问题2呢,当然应该是我们的处理器链做的。那么后面我们看下代码里面是不是和我们的猜想一样。



NIOServerCnxnFactory线程

如刚才所说先去看run()方法,其实从道理上讲,

NIOServerCnxnFactory

的逻辑应该和我们客户端

NIOClientCnxn

的逻辑应该是镜像相反的。所以这里面也应该有读和写,那让我们看下代码:

public void run() {
  while (!ss.socket().isClosed()) {
    try {
      /**构建选择器,从channel里面拿出数据,封装为selectedList,略过**/
      for (SelectionKey k : selectedList) {
          //如果取到的是一个connection的请求,则建立连接
          if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
              SocketChannel sc = ((ServerSocketChannel) k .channel()).accept();
              InetAddress ia = sc.socket().getInetAddress();
              int cnxncount = getClientCnxnCount(ia);
              if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
                  LOG.warn("Too many connections from " + ia + " - max is " + maxClientCnxns );
                  sc.close();
              } else {
                  LOG.info("Accepted socket connection from "
                           + sc.socket().getRemoteSocketAddress());
                  sc.configureBlocking(false);
                  SelectionKey sk = sc.register(selector, SelectionKey.OP_READ);
                  NIOServerCnxn cnxn = createConnection(sc, sk);//建立连接
                  sk.attach(cnxn);
                  addCnxn(cnxn);
              }
          } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
              //如果不是建立连接的,就是一些写或者读的数据
              NIOServerCnxn c = (NIOServerCnxn) k.attachment();
              c.doIO(k);//和客户端一样开始doIO
          } else {
              if (LOG.isDebugEnabled()) {
                  LOG.debug("Unexpected ops in select " + k.readyOps());
              }
          }
      }
      selected.clear();
    } catch (***Exception e) {
    	/**异常略**/
    }
  }
  closeAll();
  LOG.info("NIOServerCnxn factory exited run method");
}

我们略过选择器的部分,到

for (SelectionKey k : selectedList)

循环这里,碰见了第一个判断条件

if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0)

,这里的连接是和channel连接,所以是

OP_ACCEPT

字段,所以说取到的是一个

Connection

的请求,那么就会走进去,做一些绑定地址,校验参数之类的工作,然后建立连接NIOServerCnxn cnxn = createConnection(sc, sk);,这里没什么好说的。我们走到

else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0)

里面,如果不是建立连接的,就是一些写或者读的数据,当然也会间歇性的收到客户端的ping。那么我们是不是找到了一个很熟悉的方法

c.doIO(k);

呢,所以我们就着重去里面看:

void doIO(SelectionKey k) throws InterruptedException {
    try {
		/**验证socket没有打开,return;,略过**/
        if (k.isReadable()) {//读取准备完毕
            int rc = sock.read(incomingBuffer);
            if (rc < 0) {//剩下的数据小于0,报错
            	/**抛出EndOfStreamException数据异常**/
            }
            if (incomingBuffer.remaining() == 0) {
                boolean isPayload;
                if (incomingBuffer == lenBuffer) {
                    incomingBuffer.flip();
                    isPayload = readLength(k);
                    incomingBuffer.clear();
                } else {
                    isPayload = true;
                }
                if (isPayload) {
                    readPayload();//正式加载数据
                }
                else { return; }
            }
        }
        if (k.isWritable()) {
        	/**写就绪,略过**/
        }
    } catch (***Exception e) {
        /**各种异常的逻辑,略过**/
    }
}

我们进入以后看到如果

k.isReadable()

都就绪了,那么就读取客户端发来的数据,这里没什么可说的,如果一切都做好了那么就开始

readPayload();

正式加载数据了:

private void readPayload() throws IOException, InterruptedException {
    if (incomingBuffer.remaining() != 0) {//是否还有遗留的数据
        int rc = sock.read(incomingBuffer);//接着读剩下的数据
        if (rc < 0) {//剩下的数据小于0,报错
           /**抛出EndOfStreamException数据异常**/
        }
    }
    if (incomingBuffer.remaining() == 0) { //如果全部读完了
        packetReceived();//计数的方法
        incomingBuffer.flip();
        if (!initialized) {//初始化Y/N
            readConnectRequest();//N:重新连接
        } else {
            readRequest(); //Y:开始读取请求
        }
        lenBuffer.clear();
        incomingBuffer = lenBuffer;
    }
}

进入以后又开始判断是不是还有遗留的数据

if (incomingBuffer.remaining() != 0)

,如果有那就继续把剩余的

sock.read(incomingBuffer)

数据都读出来,如果

if (incomingBuffer.remaining() == 0)

如果全部读完了,就可以请求了,

packetReceived();

这是个计数器没什么用略过,继续判断

if (!initialized)

有没有初始化成功,如果没有重新连接

readConnectRequest();

,如果已经就绪开始读取请求

readRequest();

,所以我们还得去

readRequest();

里面看内容:

    private void readRequest() throws IOException {
        zkServer.processPacket(this, incomingBuffer);
    }

里面只有一句话,那我们还得接着往里面走,但是要说明一点:到了这里服务器实例已经开始处理收到的packet了,

incomingBuffer

就是

socket

里面的数据,下面进入

processPacket(this, incomingBuffer);

public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
    InputStream bais = new ByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
    RequestHeader h = new RequestHeader();//构造请求头
    h.deserialize(bia, "header");
    incomingBuffer = incomingBuffer.slice();
    //根据header的不同类型进行处理
    if (h.getType() == OpCode.auth) {//这个auth其实就是addauth命令
        /**这里我们先不看,后面ACL会仔细讲解**/
    } else {//不是auth
        if (h.getType() == OpCode.sasl) {//如果是sasl认证
           /**sasl命令认证逻辑也先略过**/
        }
        else {//如果不是auth也不是sasl,到这里来
            Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), 
              h.getType(), incomingBuffer, cnxn.getAuthInfo());//直接从socket里面拿出一个request
            si.setOwner(ServerCnxn.me);
            submitRequest(si);//拿出后,提交request
        }
    }
    cnxn.incrOutstandingRequests(h);
}

进入以后发现,首先还是解析数据

incomingBuffer

包装成

BinaryInputArchive

,然后

RequestHeader h = new RequestHeader();

构造请求头,这里在客户端似曾相识对吧,还有更熟悉的,后面都是根据

RequestHeader

的类型

getType()

来判断传递过来的是什么命令,几乎和客户端是一样的逻辑。我们这里先略过auth和sasl,直接到命令的分支

else

中,第一件事情就是取出一个请求

si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo());

,这里要注意

cnxn

这个参数,当初传入的是可是this,所以这个其实传入的是

NIOServerCnxn

这个类,那么这个类的实例又是被

NIOServerCnxnFactory

实现的,绕了一圈最终还是回到了这里。那么拿出来了

Request

以后还要提交这个请求

submitRequest(si);

,下面就进入这个方法:

public void submitRequest(Request si) {
    if (firstProcessor == null) { //firstProcessor出现了
        synchronized (this) {
            try {
                while (state == State.INITIAL) {//如果正在初始化,那就等着初始化完毕
                    wait(1000);
                }
            } catch (InterruptedException e) {
                LOG.warn("Unexpected interruption", e);
            }
            if (firstProcessor == null || state != State.RUNNING) {
                throw new RuntimeException("Not started");
            }
        }
    }
    try {
        touch(si.cnxn);
        boolean validpacket = Request.isValid(si.type);//判断request的类型是不是合法的
        if (validpacket) {
            firstProcessor.processRequest(si);   //判断完毕以后到这里来
            if (si.cnxn != null) {
                incInProcess();
            }
        } else {
            LOG.warn("Received packet at server of unknown type " + si.type);
            new UnimplementedRequestProcessor().processRequest(si);
        }
    } catch (***Exception e) {
        /**多个异常,略**/
    }
}

一进来就看到一个老熟人儿

firstProcessor

,接着判断如果

firstProcessor

没有初始化好,或者正在初始化,那就等着初始化结束,因为正常情况下

firstProcessor

在刚才

setupRequestProcessors()

方法里已经初始化好了的,而且立刻就开启了。正常情况下是不会走进这里来的,我们跳过到下面的

try-catch

中,首先

validpacket = Request.isValid(si.type)

判断

request

的类型是不是合法的,如果不合法怎么处理的有兴趣的同学可以自己研究下,那么我们只看正常情况,到了

firstProcessor.processRequest(si);

这里终于和我们的处理器链接上头了。



解读处理器链


firstProcessor

有印象的同学一定还记得这个是

PrepRequestProcessor

的实例对象,其实

processRequest(si)

这个方法是一个接口,实现的Processor类有很多,但是这里我们就先去

PrepRequestProcessor.processRequest(si)

看下内容:

   public void processRequest(Request request) {
        submittedRequests.add(request);
    }

进入以后发现里面什么逻辑都没有,只是把这个request加到了一个List里面,这是个什么东西?这其实是一个队列

queue

。到这里我们必须先停一下,捋一捋目前为止客户端和服务端都做了什么,否则我们的逻辑就乱了,那么请大家带着NIO的思想和笔者一起回顾一下之前都讲了什么:

讲解完什么是处理器链以后我留了两问题:

[1.NIOServerCnxnFactory这个线程做了什么事情?]

首先这个里面有一个

NIOServerCnxn

这个和客户端的

NIOClientCnxn

是一样的,所以这俩其实是配套使用的,那么

NIOServerCnxn

这里面就也有一个

doIO()

方法。顾名思义

doIO()

里面既有读数据,也有写数据(这里我们没有讲,有兴趣自己看一下,不影响流程)。那么连接的时候,我们客户端连接完了会发送一个

ConnectRequest

的请求过来,所以读取的时候服务端也会先去看下是不是读取到了

ConnectRequest

请求,读到了就回去进行一个连接处理。如果说不是这个那就是其他命令请求(比如说

create

),如果说读到了这种请求,那么就会调用

ZookeeperServer.processPacket()

构造出

request

,但是这里要提醒一点,客户端发的是什么?是

packet

,所以服务端首先要做的也是读取出来

packet

然后再构造

request

。再然后就

ZookeeperServer.submitRequest()

提交请求。其实这里使用的也是NIO的思想,这个方法我们刚刚看过,也不是同步的取出里请求,而是放到一个队列里,这个队列就是上面

processRequest()

方法里面的

submittedRequests

,放到队列里面以后谁去处理呢?我们一开始讲处理器链的时候就说过,处理器类都是线程,而这个线程早就已经开启了。那么当这个

submittedRequests

队列里有数据的时候,

firstProcessor

这个线程就会从队列

queue

里面拿出

Request

进行处理,然后再去执行整个请求处理链的逻辑,最终执行到

finalProcessor

把结果返回出去。通过以上整理我们画个简图加深一下理解:

客户端


request--包装-->packet--存入-->outgoingQueue--发送-->socket


服务端:


socket--取出包装-->packet--构造-->request--存入-->submittedRequests--取出-->firstProcessor.run()处理request,走处理器链逻辑


可以看出client和server两个地方的逻辑差不多就是镜像的,但是都深入的利用了NIO的思想。



PrepRequestProcessor

现在我们可以去看处理器链是怎么玩的,并且把

[问题2:取出数据以后,谁去做了处理?]

一起解决。所以说我们现在要去的地方就是

PrepRequestProcessor.run()

,看看我们之前的分析对不对:

public void run() {
    try {
        while (true) {
            Request request = submittedRequests.take();//取出请求
            /**日志相关,不重要,略**/
            pRequest(request);//这里重点,开始处理
        }
    } catch (***Exception e) {
        /**异常相关,不重要,略**/
    } 
    LOG.info("PrepRequestProcessor exited loop!");
}

我们进入以后第一眼看到了

Request request = submittedRequests.take()

,拿出请求封装成

request

,这就说明我们分析的没错。既然取出来了,那就要开始处理,所以还得进入

pRequest(request);

看一看,这个方法很长会酌情删除大部分不相干的代码:

protected void pRequest(Request request) throws RequestProcessorException {
    request.hdr = null;
    request.txn = null;
    try {//判断是什么命令
        switch (request.type) {
            case OpCode.create:
            CreateRequest createRequest = new CreateRequest();//构造create request
            pRequest2Txn(request.type, zks.getNextZxid(), request, createRequest, true);
            break;
        case OpCode.***:
        	/**这里case有很多比如delete,setData,getACL,ping之类的,全部略过**/
			break;
        default:
            LOG.warn("unknown type " + request.type);
            break;
        }
    } catch (***Exception e) {
    		/**跳过exception**/
    }
    request.zxid = zks.getZxid();
    nextProcessor.processRequest(request);//指向下一个处理器Sync
}

我先解释一下,这里基本上把代码删除的每几行了,只保留了我们需要讲解的部分,其实这个方法很长,全部贴出会对写文章造成很大干扰,也不方便大家阅读,有兴趣可以自己使用源码看下,下面我们继续。

进入以后可以看到首先

switch (request.type)

判断是什么命令,我们还是以Create命令作为例子,所以只保留了

case OpCode.create:

相关的代码。进入

case

首先就构造了一个

CreateRequest

的实例,然后传入了这样一个方法里

pRequest2Txn(request.type, zks.getNextZxid(), request, createRequest, true);

,那么我们打开

pRequest2Txn(***)

看看里面写了什么,老规矩,我们还是只保留create相关的:

protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize)
    throws KeeperException, IOException, RequestProcessorException
{
    request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,
                                Time.currentWallTime(), type);
    switch (type) { //匹配type
        case OpCode.create:    //如果是createRequest
            zks.sessionTracker.checkSession(request.sessionId, request.getOwner());//验证session
            CreateRequest createRequest = (CreateRequest)record; //构造createRequest,此时为null
            if(deserialize) //从request里面反序列化出来createRequest
                ByteBufferInputStream.byteBuffer2Record(request.request, createRequest);
            String path = createRequest.getPath();//拿到path
            int lastSlash = path.lastIndexOf('/');//下面是一些路径的验证
            if (lastSlash == -1 || path.indexOf('\0') != -1 || failCreate) {
            	/**不通过,抛KeeperException.BadArgumentsException**/
            }
            List<ACL> listACL = removeDuplicates(createRequest.getAcl());//ACL的验证
            if (!fixupACL(request.authInfo, listACL)) {
                throw new KeeperException.InvalidACLException(path);
            }
            String parentPath = path.substring(0, lastSlash);//拿出父节点的路径
            ChangeRecord parentRecord = getRecordForPath(parentPath);//根据路径找到父节点记录record
            checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE,
                    request.authInfo);//验证创建权限
            int parentCVersion = parentRecord.stat.getCversion();//取父节点的Cversion
            CreateMode createMode =
                CreateMode.fromFlag(createRequest.getFlags());//取出来createMode
            //如果createMode是序列化的,那么取出parentCVersion,这里是上面parentPath取出来的
            if (createMode.isSequential()) {//如果是顺序节点
                path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
            }
            validatePath(path, request.sessionId);
            try {
                if (getRecordForPath(path) != null) {
                    throw new KeeperException.NodeExistsException(path);
                }
            } catch (KeeperException.NoNodeException e) {
                // ignore this one
            }
            //看下父节点是不是临时节点,getEphemeralOwner()存的是Sessionid,如果不是0那就肯定是临时节点
            boolean ephemeralParent = parentRecord.stat.getEphemeralOwner() != 0;
            if (ephemeralParent) {
            //如果是true,父节点就是临时节点抛出异常
                throw new KeeperException.NoChildrenForEphemeralsException(path);
            }
            //newCversion就是要真正创建的节点,取了当前父节点的子节点的版本+1
            int newCversion = parentRecord.stat.getCversion()+1;
            //txn就是事务
            request.txn = new CreateTxn(path, createRequest.getData(),
                    listACL,
                    createMode.isEphemeral(), newCversion);
            StatPersisted s = new StatPersisted();
            if (createMode.isEphemeral()) {
                s.setEphemeralOwner(request.sessionId);
            }
            //刷新父节点记录
            parentRecord = parentRecord.duplicate(request.hdr.getZxid()); 
            parentRecord.childCount++;
            parentRecord.stat.setCversion(newCversion);
            //第一个修改添加父节点修改记录
            addChangeRecord(parentRecord);
            //第二个修改添加子节点修改记录
            addChangeRecord(new ChangeRecord(request.hdr.getZxid(), path, s,
                    0, listACL));
            break;
        case OpCode.****:
            break;
        default:
            LOG.error("Invalid OpCode: {} received by PrepRequestProcessor", type);
    }
}

进入方法以后,首先我们还是去

case OpCode.create:

这里case里面,首先强制转换一个

record

对象为

CreateRequest

对象,

CreateRequest

继承自

Record

强转没问题,但是看清楚这个时候这个对象是空的,然后从request里反序列化

ByteBufferInputStream.byteBuffer2Record(request.request, createRequest)

出来读出

createRequest

,此时

createRequest

就又值了。接着拿到命令里

path = createRequest.getPath();

路径,这个路径就是输入create命令后的那个路径。然后是验证路径,验证acl,不多说。再往后是拿出

parentPath = path.substring(0, lastSlash)

父节点的路径,又根据路径

getRecordForPath(parentPath)

找到父节点记录

record

,这个方法就是从ZKDatabase中拿出最近的一次修改记录,因为一个节点目前的值只和最近的一次修改有关系,所以这里取出的最近的一条修改记录,就相当于取出了父节点最新的记录。再次验证父节点的acl权限,又取

parentCVersion = parentRecord.stat.getCversion();

父节点的Cversion,这个Cversion的意思就是子节点的版本。再后面构造创建模式

CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags());

这是做什么用的呢?这个就是取出创建的是什么样的节点。如果说创建的是

if (createMode.isSequential())

顺序节点,这里对路径做了一个格式操作,再往下判断拿出的父节点是不是临时节点

if (ephemeralParent)

,如说是临时节点就抛出异常,这里的输出就是试图给临时节点创建子节点的时候打印出来的异常信息。再往下更新版本

newCversion = parentRecord.stat.getCversion()+1;

取得就是父节点的版本+1,

newCversion

就是要真正创建的节点的版本。再往下就是

request.txn

终于我们到了和事务有关系的地方了,记住这里,这里给

request

中的

txn

赋值。然后刷新父节点记录,因为每创建了一个子节点就会进行两次修改,第一个修改是修改添加父节点记录,第二个修改是当前要创建的子节点的记录。这里我们点击去,看看添加到哪里去了

addChangeRecord(parentRecord);

,进入:

void addChangeRecord(ChangeRecord c) {
    synchronized (zks.outstandingChanges) {
        zks.outstandingChanges.add(c);
        zks.outstandingChangesForPath.put(c.path, c);
    }
}

这里发现这些修改记录被添加到

outstandingChanges

这个

List

里面了,这其实也是一个NIO中的

queue

,那么就肯定有东西,把修改记录从这个list里面取出来,做持久化啊,更新内存等等。

那这些方法都执行完了以后,我们一级一级往上跳出还是回到

pRequest(request);

这个方法里来:

protected void pRequest(Request request) throws RequestProcessorException {
    request.hdr = null;
    request.txn = null;
    try {//判断是什么命令
        switch (request.type) {
           /**走完了,略过**/
        }
    } catch (***Exception e) {
    		/**跳过exception**/
    }
    request.zxid = zks.getZxid();
    nextProcessor.processRequest(request);//指向下一个处理器Sync
}

现在就剩下最后两行了,其实我们的

PrepRequestProcessor

这第一个处理器也已经执行完了,所以我们该调用下一个处理器了

nextProcessor.processRequest(request);

。先不忙去

SyncRequestProcessor

里面去看,到这里或许会有些同学很迷茫怎么

nextProcessor

就是

SyncRequestProcessor

呢?这里我们就得先回到

ZookeeperServer

中的

setupRequestProcessors()

里面:

protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
    ((SyncRequestProcessor)syncProcessor).start();
    firstProcessor = new PrepRequestProcessor(this, syncProcessor);
    ((PrepRequestProcessor)firstProcessor).start();
}

当我们

new PrepRequestProcessor

的时候,我们传入了两个参数,一个是

this

肯定就是

PrepRequestProcessor

,第二个就是

syncProcessor

。聪明的同学肯定想到了我们传递的

syncProcessor

就是在这个构造方法里赋值了,没错就是这样:

public PrepRequestProcessor(ZooKeeperServer zks,
        RequestProcessor nextProcessor) {
    super("ProcessThread(sid:" + zks.getServerId() + " cport:"
            + zks.getClientPort() + "):", zks.getZooKeeperServerListener());
    this.nextProcessor = nextProcessor;//赋值传递
    this.zks = zks;
}

看这句话

this.nextProcessor = nextProcessor;

这个就是赋值了,我们可以看到后续

SyncRequestProcessor

也是一样,都是把后面的

Processor

对象传递进来然后直接赋值给

nextProcessor

,完成了传递的过程。好那现在我们去看

SyncRequestProcessor.processRequest(request)

    public void processRequest(Request request) {
        queuedRequests.add(request);
    }



SyncRequestProcessor

到这里来,看到也是把当前的请求加入到一个队列

queuedRequests

里,不用说一定是

SyncRequestProcessor.run()

里面在使用,所以我们过去看:

public void run() {
    try {
        int logCount = 0;
        setRandRoll(r.nextInt(snapCount/2));
        while (true) {
            Request si = null;
            if (toFlush.isEmpty()) {
                si = queuedRequests.take();
            } else {
                si = queuedRequests.poll();
                if (si == null) {
                    flush(toFlush);
                    continue;
                }
            }
            if (si == requestOfDeath) {
                break;
            }
            if (si != null) {
                if (zks.getZKDatabase().append(si)) {//如果流放成功了
                    logCount++;
                    if (logCount > (snapCount / 2 + randRoll)) {
                    	/**打快照,暂时略**/
                    }
                } else if (toFlush.isEmpty()) {
               		/**nextProcessor暂时略**/
                }
                toFlush.add(si);//到这里最终把request都加到toFlush这个list里面
                if (toFlush.size() > 1000) {//如果总数大于1000则把流都flush到文件里
                    flush(toFlush);
                }
            }
        }
    } catch (Throwable t) {
        handleException(this.getName(), t);
        running = false;
    }
    LOG.info("SyncRequestProcessor exited!");
}

到这里来,这里就是取出queuedRequests里的请求,这里也是做持久化的处理器。首先也是取数据

si = queuedRequests.take();

如果

toFlush.isEmpty()==false

说明没有取完,接着取。取完了以后,发现取出来的

request

不是空的

if (si != null)

就把请求加载

Database

类上,这里我们到

zks.getZKDatabase().append()

方法里:

public boolean append(Request si) throws IOException {
    return this.snapLog.append(si);//snapLog事务快照
}


snapLog

我们之前介绍过,这个就是

FileTxnSnapLog

事务快照的工具类,我们接着进

this.snapLog.append()

:

public boolean append(Request si) throws IOException {
    return txnLog.append(si.hdr, si.txn);//txnLog 事务log
}

发现最终被加入事务log的工具类

TxnLog

中,那传递进来的

si.hdr



si.txn

是什么呢?这

si.hdr

这个一看就知道是

RequestHeader

嘛,第二个

si.txn

则是我们在

PrepRequestProcessor

中的

pRequest2Txn(***)

里面new出来的:

request.txn = new CreateTxn(...)

,就是我们创建的

create

命令的事务。我们接着

txnLog.append()

进入

public synchronized boolean append(TxnHeader hdr, Record txn) throws IOException
{
	/**抛开验证往下走**/
  if (logStream==null) {//logStream log流对象
     if(LOG.isInfoEnabled()){
          LOG.info("Creating new log file: " + Util.makeLogName(hdr.getZxid()));
     }
     logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid())); //生成文件对象
     //放到输出流对象里
     fos = new FileOutputStream(logFileWrite);
     logStream=new BufferedOutputStream(fos);
     oa = BinaryOutputArchive.getArchive(logStream);
     FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
     fhdr.serialize(oa, "fileheader");
     logStream.flush();
     filePadding.setCurrentSize(fos.getChannel().position());
     streamsToFlush.add(fos);
  }
  filePadding.padFile(fos.getChannel());
  byte[] buf = Util.marshallTxnEntry(hdr, txn);
  if (buf == null || buf.length == 0) {
      throw new IOException("Faulty serialization for header " +
              "and txn");
  }
  Checksum crc = makeChecksumAlgorithm();
  crc.update(buf, 0, buf.length);
  oa.writeLong(crc.getValue(), "txnEntryCRC");
  Util.writeTxnBytes(oa, buf);
  return true;
}

看到一个关键的对象

logStream

,log流对象,紧接着就是一个

File

对象

logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));

把配置的路径

logDir

和事务id(

Zxid

)传进去。再把文件对象

logFileWrite

放到文件输出流对象里

fos = new FileOutputStream(logFileWrite);

,然后再包装为

BufferedOutputStream

对象

logStream

,后面都是文件的标准流程不多解释,做完所有的工作以后

return true

出去,但是直到

return

都没有写文件的步骤,所以这里面只是把事务给放到流里面去,所以写文件步骤一定在外面,那么得一路跳出去了,还是回到

run()

方法里。



持久化事物

————————————————————–回到

run()

方法————————————————————————-

这里就又回到

run()

方法的解读了,如果流放成功了,但是要记得我们在里面就没还有看到

flush()

,说明还没有存到文件里,继续进入

if

语句

logCount++

日志计数器加1,然后

if (logCount > (snapCount / 2 + randRoll))

这里是打快照的一会儿再说,我们假定是false跳过。直接到

toFlush.add(si);

这句话来,这里就把请求

Request

加到了

toFlush

这个

List

里面,然后如果

if (toFlush.size() > 1000)

如果总数大于1000则把流都

flush(toFlush);

出去,那么

flush

的是什么东西呢?我们进入看:

private void flush(LinkedList<Request> toFlush) throws IOException, RequestProcessorException
{	
	/**略**/
    zks.getZKDatabase().commit();
    /**略**/
}

这里就

commit()

提交到了到了

ZKDatabase()

,我们接着进入

commit()

看看提交的是什么:

public void commit() throws IOException {
    this.snapLog.commit();
}

接着进入

snapLog.commit();

public void commit() throws IOException {
    txnLog.commit();
}

继续进入

txnLog.commit();

    public synchronized void commit() throws IOException {
        if (logStream != null) {
            logStream.flush();
        }
       /**略**/
    }

这里我们就看到了IO接口里的flush(),这里就是写出去文件的地方了,写的是什么呢就是logStream这个刚才我们一层又一层的封装的流对象。OK,那我们再回到run()方法里。



打快照SnapLog

————————————————————–回到

run()

方法——————————————————

那么我们回到

if (logCount > (snapCount / 2 + randRoll))

这里,我把这一块单独拿出来:

public void run() {
    try {
      /**略**/
      if (si != null) {
          if (zks.getZKDatabase().append(si)) {//如果流放成功了
              logCount++;//+1记录一下
              if (logCount > (snapCount / 2 + randRoll)) {//这里是什么样的数量下要做快照
                  setRandRoll(r.nextInt(snapCount/2));
                  zks.getZKDatabase().rollLog();//滚动并生成新的快照log文件
                  if (snapInProcess != null && snapInProcess.isAlive()) {
                      LOG.warn("Too busy to snap, skipping");
                  } else {
                      snapInProcess = new ZooKeeperThread("Snapshot Thread") {//启动一个线程打快照
                              public void run() {
                                  try {
                                      zks.takeSnapshot();//打快照
                                  } catch(Exception e) {
                                      LOG.warn("Unexpected exception", e);
                                  }
                              }
                          };
                      snapInProcess.start();
                  }
                  logCount = 0;
              }
          } else if (toFlush.isEmpty()) {
              /**nextProcessor暂时略**/
          }
          /**flush已经讲过,略**/
      }
    } catch (Throwable t) {
        handleException(this.getName(), t);
        running = false;
    }
    LOG.info("SyncRequestProcessor exited!");
}

这个

if

块里的内容其实就是我们一直再说的打快照。这里是单独启动了一个线程打快照

snapInProcess

。我们接着说,如果刚才的流放成功了,那么

logCount+1

记录下来,那么就会判断多少个日志才会打一个快照,这里

snapCount

是可以在配置文件中配置的,然后根据配置的数量进行一个随机数运算,如果说大于这个数量了,首先先把随机数重置

setRandRoll(r.nextInt(snapCount/2));

,然后滚动并生成新的快照文件,把目前所有的事物都存成文件出去。我们点进去

zks.getZKDatabase().rollLog();

public void rollLog() throws IOException {
    this.snapLog.rollLog();
}

接着进入

this.snapLog.rollLog();

public void rollLog() throws IOException {
    txnLog.rollLog();
}

在进入

txnLog.rollLog();

    public synchronized void rollLog() throws IOException {
        if (logStream != null) {
            this.logStream.flush();//写入文件
            this.logStream = null;//置空,相当于生成新文件
            oa = null;
        }
    }

如果

if (logStream != null)

流不是空,就把

logStream

写入文件,然后重新生成一个新的文件

this.logStream = null;

。这里就完成了一个滚动并生成新的快照的逻辑。

到现在我们就剩下了一个事情打快照,来我们看下快照是怎么打的

zks.takeSnapshot();

public void takeSnapshot(){
    try {
        txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts());
    } catch (IOException e) {
        LOG.error("Severe unrecoverable error, exiting", e);
        System.exit(10);
    }
}

这里没什么东西,直接进入

save(***)

:

    public void save(DataTree dataTree,
            ConcurrentHashMap<Long, Integer> sessionsWithTimeouts)
        throws IOException {
        long lastZxid = dataTree.lastProcessedZxid;
        File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));
        LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid),
                snapshotFile);
        snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile);//序列化到文件
        
    }

这里面其实也没有很复杂的逻辑就是直接把整个

dataTree

直接序列化到文件里面,那么这里打快照也讲完了。所以这里

SyncRequestProcessor

的作用就已经讲完了,它就是做持久化事物以及打快照用的。那么最终所有的逻辑都会走到

run()

方法的

nextProcessor

这里,也就是

FinalRequestProcessor.processRequest(si)

public void run() {
    try {
      /**略**/
      if (si != null) {
          if (zks.getZKDatabase().append(si)) {
			/**已经讲过,略**/
          } else if (toFlush.isEmpty()) {
              if (nextProcessor != null) {//最终都会走到这里来发给下一个处理器
                  nextProcessor.processRequest(si);
                  if (nextProcessor instanceof Flushable) {
                      ((Flushable)nextProcessor).flush();
                  }
              }
              continue;
          }
          /**flush已经讲过,略**/
      }
    } catch (Throwable t) {
        /**略**/
    }
    LOG.info("SyncRequestProcessor exited!");
}



FinalRequestProcessor

我们之前已经说过这个

Processor

是更新内存用的,所以我们预期

processRequest(si)

这个方法里面应该有很多代码,大多都是和我们主题不相关的以及

switch

分支,所以我们还是挑选

create

命令作为讲解点,暂时无关的代码则会被忽略掉:

public void processRequest(Request request) {
    /**暂时无关代码略**/
    ProcessTxnResult rc = null;
    synchronized (zks.outstandingChanges) {//获取修改记录
        while (!zks.outstandingChanges.isEmpty()
                && zks.outstandingChanges.get(0).zxid <= request.zxid) {
            ChangeRecord cr = zks.outstandingChanges.remove(0);//拿出修改记录cr,往下
            /**跳过检测**/
        }
        if (request.hdr != null) {
           TxnHeader hdr = request.hdr;
           Record txn = request.txn;
           //把head和txn传进去,更新内存
           rc = zks.processTxn(hdr, txn);
        }
        /**集群相关先不看**/
    }
    /**closeSession相关先不看**/
    /**暂时无关代码略**/
    try {
    	/**暂时无关代码略**/
      switch (request.type) {//最后就是返回结果了,所以这里又有了switch
        case OpCode.create: {//如果是create
            lastOp = "CREA";
            rsp = new CreateResponse(rc.path);//把创建的路径传进去
            err = Code.get(rc.err);//如果有问题,传入error
            break;
        }
        case OpCode.****: {
            /**略**/
            break;
        }
      }
    } catch (****Exception e) {
        /**Exception 略**/
    } 
  	/**暂时无关代码略**/
    try {//最终在这里Response数据
        cnxn.sendResponse(hdr, rsp, "response");
        if (closeSession) {
            cnxn.sendCloseSession();
        }
    } catch (IOException e) {
        LOG.error("FIXMSG",e);
    }
}

进入以后我们就看到了一个很熟悉的东西

outstandingChanges

这个队列,这个在PrepRequestProcessor里用来记录父节点和子节点的修改记录,如果里面有记录

outstandingChanges

不是空的,就进入

while

循环,拿出

ChangeRecord cr = zks.outstandingChanges.remove(0);

修改记录cr。往下看,如果

RequestHeader

存在

if (request.hdr != null)

,那么就把head和txn传进

zks.processTxn(hdr, txn);

去更新内存。如果更新内存也没有问题,下一步就是返回结果了,所以这里又有了

switch

用来匹配返回的结果是什么类型的,我们到

case OpCode.create:

发现这里面直接new了一个

CreateResponse(rc.path)

,并且把修改记录的路径

rc.path

传进去了,break出去,就到最后了

cnxn.sendResponse(hdr, rsp, "response");

最终在这里把结果发送到

socket

里面去。我们进入这个方法看下,这是一个接口,所以我们还是去实现方法

NIOServerCnxn.sendResponse(***)

里面:

synchronized public void sendResponse(ReplyHeader h, Record r, String tag) {
    try {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
        try {
            baos.write(fourBytes);
            bos.writeRecord(h, "header");
            if (r != null) {
                bos.writeRecord(r, tag);
            }
            baos.close();
        } catch (IOException e) {
            LOG.error("Error serializing response");
        }
        byte b[] = baos.toByteArray();
        ByteBuffer bb = ByteBuffer.wrap(b);
        bb.putInt(b.length - 4).rewind();
        sendBuffer(bb);//发送到buffer里
        if (h.getXid() > 0) {
            synchronized(this){
                outstandingRequests--;
            }
            synchronized (this.factory) {        
                if (zkServer.getInProcess() < outstandingLimit
                        || outstandingRequests < 1) {
                    sk.selector().wakeup();
                    enableRecv();
                }
            }
        }
     } catch(Exception e) {
        LOG.warn("Unexpected exception. Destruction averted.", e);
     }
}

这里很明显,把我们的

ReplyHeader



Record

转化为字节对象,然后创建

ByteBuffer

,最后

sendBuffer(bb);

发送到

Buffer

里面去,一路点进去就会看到

sock.write(bb);

最后还是用过

socket

发送的。



更新内存

到这里就剩最后一个问题了,内存是怎么更新的,我们回到

zks.processTxn(hdr, txn);

,这个方法传入的参数是Header和Txn事物做什么呢:

    public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
        ProcessTxnResult rc;
        int opCode = hdr.getType();
        long sessionId = hdr.getClientId();
        rc = getZKDatabase().processTxn(hdr, txn);//走进这里来
        /**暂时无关代码略**/
        return rc;
    }

继续走到

getZKDatabase().processTxn(hdr, txn);

这个方法里去:

public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
    return dataTree.processTxn(hdr, txn);
}

接着到

dataTree.processTxn(hdr, txn);

里面,我们也是只看

create

命令的内容:

case OpCode.create: 
    CreateTxn createTxn = (CreateTxn) txn;
    rc.path = createTxn.getPath();
    //看createNode方法
    createNode(
            createTxn.getPath(),
            createTxn.getData(),
            createTxn.getAcl(),
            createTxn.getEphemeral() ? header.getClientId() : 0,
            createTxn.getParentCVersion(),
            header.getZxid(), header.getTime());
    break;

这里创建了一个createNode,并且传入了路径, 数据,acl,是不是临时节点,父节点的Cversion等等内容。如果点进去这里,其实我们只有两个地方要关注的:

//修改内存中的DataNode
DataNode child = new DataNode(parent, data, longval, stat);
//抛出事件,一个是节点创建的事件,一个是孩子节点改变的事件
dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,
                Event.EventType.NodeChildrenChanged);
return path;

第一修改内存中的数据,也就是DataNode,到了这里整个事物已经持久化好了,所以只要修改内存里面的数据就好了,便于使用。第二create命令完成以后其实就是一个

NodeCreated

的事件,所以就会抛出两个事件出去:1. 节点创建的事件;2. 孩子节点改变

NodeChildrenChanged

的事件。至于这个事件怎么用,我们下节课讲Watch的时候再说。最后

return path;

把修改的路径传递出去。



总结

这样单机模式下服务器的流程就结束了,按照惯例,我会画流程图帮助大家理解这个流程,那么我们的本系列的下一篇

Zookeeper 源码解读系列, 单机模式(五)

就要介绍EventThread这个线程在客户端做了什么事情。



Zookeeper单机模式:服务器启动流程图

Zookeeper单机模式:服务器启动流程图



Zookeeper单机模式:处理器链执行逻辑图

处理器链执行逻辑图



版权声明:本文为Smallc0de原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。