前面我们分析了Codis各组成部件,其中Proxy是用来处理客户端请求的,今天我们具体分析下一次请求在Codis内部是如何处理的。
一、Proxy启动函数
前面我们讲了Proxy启动是通过以下这行代码来启动的:
go s.serveProxy()
这个里面会有接受连接,并处理连接的代码:
go func(l net.Listener) (err error) { defer func() { eh
NewSession会返回一个Session的数据结构,重点看下Start方法:
//此处省略无关紧要代码go func() { s.loopWriter(tasks) decrSessions() }() go func() { s.loopReader(tasks, d) tasks.Close() }()
一个Session重点就是上面两个协程,其中一个处理写事件,另一个处理读事件,读、写是相对于数据流的方向的,针对Codis来说,从客户端读取请求数据就是读,把响应返回给客户端就是写。
其中两个协程的函数都有个tasks的参数,这个tasks初始化代码如下:
tasks := NewRequestChanBuffer(1024)func NewRequestChanBuffer(n int) *RequestChan { if n <= 0 { n = DefaultRequestChanBuffer } var ch = &RequestChan{ buff: make([]*Request, n), } ch.cond = sync.NewCond(&ch.lock) return ch}
即tasks是一个RequestChan的结构,其核心就是一个buff的数组,读和写的协程就是通过这个来交换数据,作为任务队列来使用的,即从客户端读取响应后发送给后端Redis Server,并且读取后端Redis Server返回的响应后再将请求写回到这个队列,然后由写的协程将响应写回给客户端。
二、细节分析
下面我们来具体分析实现,先看loopReader,前面讲过这个里面要读取客户端请求过来的命令,并且转发到后端Redis Server:
for !s.quit { //处理客户端发送过来的数据 multi, err := s.Conn.DecodeMultiBulk() if err != nil { return err } if len(multi) == 0 { continue } //省略一些代码 r := &Request{} r.Multi = multi r.Batch = &sync.WaitGroup{} r.Database = s.database r.UnixNano = start.UnixNano() //转发请求 if err := s.handleRequest(r, d); err != nil { r.Resp = redis.NewErrorf("ERR handle request, %s", err) tasks.PushBack(r) if breakOnFailure { return err } } else { tasks.PushBack(r) } }
其中s.Conn.DecodeMultiBulk即将客户端请求的数据解码后以数组格式返回,举个例子,客户端发送请求:
get ok
则multi是这样的:
可以看到multi第0项成员为get,第1项为ok。
读取到客户端原始请求数据后,Codis然后调用s.handleRequest将数据发送给后端Redis Server,handleRequest里面就是具体的命令转发了:
switch opstr { case "SELECT": return s.handleSelect(r) case "PING": return s.handleRequestPing(r, d) case "INFO": return s.handleRequestInfo(r, d) case "MGET": return s.handleRequestMGet(r, d) case "MSET": return s.handleRequestMSet(r, d) case "DEL": return s.handleRequestDel(r, d) case "EXISTS": return s.handleRequestExists(r, d) case "SLOTSINFO": return s.handleRequestSlotsInfo(r, d) case "SLOTSSCAN": return s.handleRequestSlotsScan(r, d) case "SLOTSMAPPING": return s.handleRequestSlotsMapping(r, d) default: return d.dispatch(r)
以一个默认的GET命令来说,会走到dispatch这里,
func (s *Router) dispatch(r *Request) error { hkey := getHashKey(r.Multi, r.OpStr) var id = Hash(hkey) % MaxSlotNum slot := &s.slots[id] return slot.forward(r, hkey)}
dispatch调用slot.forward(r, hkey),然后调用到forwardSync::Forward
func (d *forwardSync) Forward(s *Slot, r *Request, hkey []byte) error { s.lock.RLock() bc, err := d.process(s, r, hkey) s.lock.RUnlock() if err != nil { return err } bc.PushBack(r) return nil}
d.process只是返回后端的连接,当然还有一些判断是否迁移的逻辑,我们先跳过,返回类型是BackendConn指针,然后又将请求r通过PushBack发送给b.input这个通道:
func (bc *BackendConn) PushBack(r *Request) { if r.Batch != nil { r.Batch.Add(1) } bc.input
请求发给input通道后,又是哪里在处理input通道上的数据呢?BackendConn也有个loopWriter会一直处理input通道中的数据:
c, tasks, err := bc.newBackendReader(round, bc.config)//省略一些代码for r := range bc.input { if r.IsReadOnly() && r.IsBroken() { bc.setResponse(r, nil, ErrRequestIsBroken) continue } if err := p.EncodeMultiBulk(r.Multi); err != nil { return bc.setResponse(r, nil, fmt.Errorf("backend conn failure, %s", err)) } if err := p.Flush(len(bc.input) == 0); err != nil { return bc.setResponse(r, nil, fmt.Errorf("backend conn failure, %s", err)) } else { tasks
这个协程会处理input中通道的请求发送给后端的Redis Server,处理完后,然后丢给tasks通道,tasks通道又有一个协程在处理,就是BackendConn的loopReader:
for r := range tasks { resp, err := c.Decode() if err != nil { return bc.setResponse(r, nil, fmt.Errorf("backend conn failure, %s", err)) } //省略一些代码 bc.setResponse(r, resp, nil)
它会处理后端的响应,然后设置到请求的相应字段。
所有这些处理完成后,就是Session的loopWriter将数据发送给客户端了:
return tasks.PopFrontAll(func(r *Request) error { resp, err := s.handleResponse(r) if err := p.Encode(resp); err != nil { return s.incrOpFails(r, err) } fflush := tasks.IsEmpty() //将响应发送给客户端 if err := p.Flush(fflush); err != nil { return s.incrOpFails(r, err) } else { s.incrOpStats(r, resp.Type) } return nil })
三、总结
我们来总结下一个请求处理过程:
1、Session的loopReader协程读取客户端发送过来的数据;
2、在上面读取完数据后,通过slot.forward转发到相应的Server,Codis用BackendConn来表示一个后端连接;
3、BackendConn也有专门处理读、写请求的协程,先由BackendConn::loopWriter将请求发往后端Redis Server;
4、再由BackendConn::loopReader处理后端Redis Server的处理结果;
5、上面处理完后由Session的loopWriter将处理结果发送给客户端。
Proxy请求处理分了2层,一层是前端客户端的连接,由Session模块处理;
第2层是处理与后端Codis Server的连接,由BackendConn处理;
两者都实现了基于读、写事件驱动的异步编程来提高系统的吞吐率,Session是通过队列+锁的方式来传递任务,典型的生产者、消费者模型;而BackendConn则是由通道的方式实现。