初始化请求例子_Codis Proxy是如何处理一个请求的

  • Post author:
  • Post category:其他


前面我们分析了Codis各组成部件,其中Proxy是用来处理客户端请求的,今天我们具体分析下一次请求在Codis内部是如何处理的。


一、Proxy启动函数

前面我们讲了Proxy启动是通过以下这行代码来启动的:

go s.serveProxy()

这个里面会有接受连接,并处理连接的代码:

go func(l net.Listener) (err error) {    defer func() {      eh 

NewSession会返回一个Session的数据结构,重点看下Start方法:

//此处省略无关紧要代码go func() {      s.loopWriter(tasks)      decrSessions()    }()    go func() {      s.loopReader(tasks, d)      tasks.Close()    }()

一个Session重点就是上面两个协程,其中一个处理写事件,另一个处理读事件,读、写是相对于数据流的方向的,针对Codis来说,从客户端读取请求数据就是读,把响应返回给客户端就是写。

其中两个协程的函数都有个tasks的参数,这个tasks初始化代码如下:

tasks := NewRequestChanBuffer(1024)func NewRequestChanBuffer(n int) *RequestChan {  if n <= 0 {    n = DefaultRequestChanBuffer  }  var ch = &RequestChan{    buff: make([]*Request, n),  }  ch.cond = sync.NewCond(&ch.lock)  return ch}

即tasks是一个RequestChan的结构,其核心就是一个buff的数组,读和写的协程就是通过这个来交换数据,作为任务队列来使用的,即从客户端读取响应后发送给后端Redis Server,并且读取后端Redis Server返回的响应后再将请求写回到这个队列,然后由写的协程将响应写回给客户端。


二、细节分析

下面我们来具体分析实现,先看loopReader,前面讲过这个里面要读取客户端请求过来的命令,并且转发到后端Redis Server:

for !s.quit {    //处理客户端发送过来的数据    multi, err := s.Conn.DecodeMultiBulk()    if err != nil {      return err    }    if len(multi) == 0 {      continue    }   //省略一些代码    r := &Request{}    r.Multi = multi    r.Batch = &sync.WaitGroup{}    r.Database = s.database    r.UnixNano = start.UnixNano()    //转发请求    if err := s.handleRequest(r, d); err != nil {      r.Resp = redis.NewErrorf("ERR handle request, %s", err)      tasks.PushBack(r)      if breakOnFailure {        return err      }    } else {      tasks.PushBack(r)    }  }

其中s.Conn.DecodeMultiBulk即将客户端请求的数据解码后以数组格式返回,举个例子,客户端发送请求:

get ok

则multi是这样的:

b08dba48bbf8ada14a22c432f5cb67ae.png

可以看到multi第0项成员为get,第1项为ok。

读取到客户端原始请求数据后,Codis然后调用s.handleRequest将数据发送给后端Redis Server,handleRequest里面就是具体的命令转发了:

switch opstr {  case "SELECT":    return s.handleSelect(r)  case "PING":    return s.handleRequestPing(r, d)  case "INFO":    return s.handleRequestInfo(r, d)  case "MGET":    return s.handleRequestMGet(r, d)  case "MSET":    return s.handleRequestMSet(r, d)  case "DEL":    return s.handleRequestDel(r, d)  case "EXISTS":    return s.handleRequestExists(r, d)  case "SLOTSINFO":    return s.handleRequestSlotsInfo(r, d)  case "SLOTSSCAN":    return s.handleRequestSlotsScan(r, d)  case "SLOTSMAPPING":    return s.handleRequestSlotsMapping(r, d)  default:    return d.dispatch(r)

以一个默认的GET命令来说,会走到dispatch这里,

func (s *Router) dispatch(r *Request) error {  hkey := getHashKey(r.Multi, r.OpStr)  var id = Hash(hkey) % MaxSlotNum  slot := &s.slots[id]  return slot.forward(r, hkey)}

dispatch调用slot.forward(r, hkey),然后调用到forwardSync::Forward

func (d *forwardSync) Forward(s *Slot, r *Request, hkey []byte) error {  s.lock.RLock()  bc, err := d.process(s, r, hkey)  s.lock.RUnlock()  if err != nil {    return err  }  bc.PushBack(r)  return nil}

d.process只是返回后端的连接,当然还有一些判断是否迁移的逻辑,我们先跳过,返回类型是BackendConn指针,然后又将请求r通过PushBack发送给b.input这个通道:

func (bc *BackendConn) PushBack(r *Request) {  if r.Batch != nil {    r.Batch.Add(1)  }  bc.input 

请求发给input通道后,又是哪里在处理input通道上的数据呢?BackendConn也有个loopWriter会一直处理input通道中的数据:

c, tasks, err := bc.newBackendReader(round, bc.config)//省略一些代码for r := range bc.input {    if r.IsReadOnly() && r.IsBroken() {      bc.setResponse(r, nil, ErrRequestIsBroken)      continue    }    if err := p.EncodeMultiBulk(r.Multi); err != nil {      return bc.setResponse(r, nil, fmt.Errorf("backend conn failure, %s", err))    }    if err := p.Flush(len(bc.input) == 0); err != nil {      return bc.setResponse(r, nil, fmt.Errorf("backend conn failure, %s", err))    } else {      tasks 

这个协程会处理input中通道的请求发送给后端的Redis Server,处理完后,然后丢给tasks通道,tasks通道又有一个协程在处理,就是BackendConn的loopReader:

for r := range tasks {    resp, err := c.Decode()    if err != nil {      return bc.setResponse(r, nil, fmt.Errorf("backend conn failure, %s", err))    }    //省略一些代码    bc.setResponse(r, resp, nil)

它会处理后端的响应,然后设置到请求的相应字段。

所有这些处理完成后,就是Session的loopWriter将数据发送给客户端了:

return tasks.PopFrontAll(func(r *Request) error {    resp, err := s.handleResponse(r)    if err := p.Encode(resp); err != nil {      return s.incrOpFails(r, err)    }    fflush := tasks.IsEmpty()    //将响应发送给客户端    if err := p.Flush(fflush); err != nil {      return s.incrOpFails(r, err)    } else {      s.incrOpStats(r, resp.Type)    }    return nil  })


三、总结

我们来总结下一个请求处理过程:

1、Session的loopReader协程读取客户端发送过来的数据;

2、在上面读取完数据后,通过slot.forward转发到相应的Server,Codis用BackendConn来表示一个后端连接;

3、BackendConn也有专门处理读、写请求的协程,先由BackendConn::loopWriter将请求发往后端Redis Server;

4、再由BackendConn::loopReader处理后端Redis Server的处理结果;

5、上面处理完后由Session的loopWriter将处理结果发送给客户端。

Proxy请求处理分了2层,一层是前端客户端的连接,由Session模块处理;

第2层是处理与后端Codis Server的连接,由BackendConn处理;

两者都实现了基于读、写事件驱动的异步编程来提高系统的吞吐率,Session是通过队列+锁的方式来传递任务,典型的生产者、消费者模型;而BackendConn则是由通道的方式实现。



版权声明:本文为weixin_29207149原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。