Rpcx实现

  • Post author:
  • Post category:其他


一.rpcx介绍

1.1 rpc是什么

远程过程调用的通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程。如果涉及的软件采用面向对象编程,那么远程过程调用亦可称作远程调用或远程方法调用。简单地说就是能使应用像调用本地方法一样的调用远程的过程或服务。很显然,这是一种client-server的交互形式,调用者是client,执行者是server。

一个完整的rpc的调用过程如下:

一次完整的RPC调用流程(同步调用,异步另说)如下:

  1. 服务消费方(client)调用以本地调用方式调用服务;
  2. client stub接收到调用后负责将方法、参数等组装成能够进行网络传输的消息体;
  3. client stub找到服务地址,并将消息发送到服务端;
  4. server stub收到消息后进行解码;
  5. server stub根据解码结果调用本地的服务;
  6. 本地服务执行并将结果返回给server stub;
  7. server stub将返回结果打包成消息并发送至消费方;
  8. client stub接收到消息,并进行解码;
  9. 服务消费方得到最终结果。

1.2 rpcx介绍

RPC只是描绘了 Client 与 Server 之间的点对点调用流程,包括 stub、通信、RPC 消息解析等部分,在实际应用中,还需要考虑服务的高可用、负载均衡等问题,所以产品级的 RPC 框架除了点对点的 RPC 协议的具体实现外,还应包括服务的发现与注销、提供服务的多台 Server 的负载均衡、服务的高可用等更多的功能。目前的 RPC 框架大致有两种不同的侧重方向,一种偏重于服务治理,另一种偏重于跨语言调用。 然而rpcx 属于服务治理类型,是一个基于 Go 开发的高性能的轻量级 RPC 框架。

1.2.1 rpcx特点

  • 基于net/rpc,可以将net/rpc实现的RPC项目轻松的转换为分布式的RPC
  • 插件式设计,可以配置所需的插件,比如服务发现、日志、统计分析等
  • 基于TCP长连接,只需很小的额外的消息头
  • 支持多种编解码协议,如Gob、Json、MessagePack、gencode、ProtoBuf等
  • 服务发现:服务发布、订阅、通知等,支持多种发现方式如ZooKeeper、Etcd等
  • 高可用策略:失败重试(Failover)、快速失败(Failfast)
  • 负载均衡:支持随机请求、轮询、低并发优先、一致性 Hash等
  • 规模可扩展,可以根据性能的需求增减服务器
  • 其他:调用统计、访问日志等
  • 功能都可以通过插件的方式完成。

1.2.2 rpcx架构

rpcx中有服务提供者 RPC Server,服务调用者 RPC Client 和服务注册中心 Registry 三个角色。

  • Server 向 Registry 注册服务,并向注册中心发送心跳汇报状态(基于不同的registry有不同的实现)。

  • Client 需要向注册中心查询 RPC 服务者列表,Client 根据 Registry 返回的服务者列表,选取其中一个 Sever 进行 RPC 调用。

  • 当 Server 发生宕机时,Registry 会监测到服务者不可用(zookeeper session机制或者手工心跳),Client 感知后会对本地的服务列表作相应调整。client可能被动感知(zookeeper)或者主动定时拉取。

  • 可选地,Server可以定期向Registry汇报调用统计信息,Client可以根据调用次数选择压力最小的Server。

当前rpcx支持zookeeper, etcd等注册中心。rpcx基于Go net/rpc的底层实现, Client和Server之间通讯是通过TCP进行通讯的,它们之间通过Client发送Request,Server返回Response实现。Request和Response消息的格式都是

Header+Body

的格式。Header和Body具体的格式根据编码方式的不同而不同,可以是二进制,也可以是结构化数据如JSON。

1.2.3 容错

Client提供了两种容错方式:

Failfast



Failover



Failtry

:

  • Failfast: 如果Client调用失败,立即返回,不会重试。

  • Failover: 如果Client调用失败,会尝试从服务列表中选择另外一个服务器调用,直到成功或者到达重试次数。

  • Failtry: 如果Client调用失败,会继续这个服务器重试,直到成功或者到达重试次数。

1.2.4 重选算法

  • 随机选择: 随机选择一个服务器并返回,可能和上一次的重复。

  • RoundRobin: 按顺序选择一个服务器。

  • 一致性哈希 [TODO]:使用Jump Consistent Hash algorithm。

  • CallLeast [TODO]: 根据调用次数选择压力最小的服务器。

1.2.5 序列化

gob 官方提供的序列化方式,基于一个包含元数据的流
jsonrpc 也是官方提供的编码库,以JSON格式传输
msgp 类似json格式的编码,但是更小更快,可以直接编码struct
gencode 一个超级快的序列化库,需要定义schema,但是定义方式和struct类似
protobuf Google推出的广受关注的序列化库,推荐使用gogo-protobuf,可以获得更高的性能

二.rpcx-server

2.1 server结构

// Server is rpcx server that use TCP or UDP.
type Server struct {
	ln                 net.Listener   //监听
	readTimeout        time.Duration  //读取client数据的超时时间
	writeTimeout       time.Duration  //写入client数据的超时时间
	gatewayHTTPServer  *http.Server   
	DisableHTTPGateway bool //使用HTTP网关
	DisableJSONRPC     bool //使用json-rpc

	serviceMapMu sync.RWMutex
	serviceMap   map[string]*service   //server端提供的service的记录表

	mu         sync.RWMutex            
	activeConn map[net.Conn]struct{}   // 存活的连接
	doneChan   chan struct{}           // server完成管道
	seq        uint64                  // server端编号

	inShutdown int32
	onShutdown []func(s *Server)      //禁止一个套接字的IO

	// TLSConfig for creating tls tcp connection.
	tlsConfig *tls.Config            // tcp连接的配置
	// BlockCrypt for kcp.BlockCrypt
	options map[string]interface{}  //kip协议时提供的一些限制

	// CORS options
	corsOptions *CORSOptions

	Plugins PluginContainer  //插件管理,通过实现插件注册插件,增加server的特性

	// AuthFunc can be used to auth.
	AuthFunc func(ctx context.Context, req *protocol.Message, token string) error //认证

	handlerMsgNum int32     //处理消息量
}

2.2 server 启动

func (s *Server) Serve(network, address string) (err error) {
	s.startShutdownListener()
	var ln net.Listener
	ln, err = s.makeListener(network, address)
	if err != nil {
		return
	}

	if network == "http" {
		s.serveByHTTP(ln, "")
		return nil
	}

	// try to start gateway
	ln = s.startGateway(network, ln)

	return s.serveListener(ln)
}
  • 创建监听。
  • 根据协议启动serve, 如果是HTTP协议走serveByHTTP函数;如果是TCP协议开启网关,走serveListener函数。
  • 注意:这个过程是阻塞的。

1.首先看一下serveByHTTP函数:

func (s *Server) serveByHTTP(ln net.Listener, rpcPath string) {
	s.ln = ln

	if rpcPath == "" {
		rpcPath = share.DefaultRPCPath //rpc为空,给一个默认路径
	}
	http.Handle(rpcPath, s)
	srv := &http.Server{Handler: nil} //构建server
	srv.Serve(ln)
}
  • 判断rpcPath, 为空赋值”/_rpcx_”。
  • 根据rpcPath,注册handler。
  • 构建server,启动http server。

2.再看一下serveListener函数:

func (s *Server) serveListener(ln net.Listener) error {

	var tempDelay time.Duration

	s.mu.Lock()
	s.ln = ln
	s.mu.Unlock()

	for {
		conn, e := ln.Accept()   //获取socket连接,epoll
		if e != nil {
			select {
			case <-s.getDoneChan():
				return ErrServerClosed //收到关闭信号,退出关闭
			default:
			}

			if ne, ok := e.(net.Error); ok && ne.Temporary() { //网络错误,延迟重新建立连接
				if tempDelay == 0 {
					tempDelay = 5 * time.Millisecond  //5ms
				} else {
					tempDelay *= 2 //设置延迟时间,开始递增,避免无意义的重试
				}

				if max := 1 * time.Second; tempDelay > max {
					tempDelay = max
				} //限制了最大重试时间在1s

				log.Errorf("rpcx: Accept error: %v; retrying in %v", e, tempDelay)
				time.Sleep(tempDelay)
				continue
			}
			return e
		}
        //如果没有网络错误,下次网络错误的重试时间重新开始
		tempDelay = 0

		if tc, ok := conn.(*net.TCPConn); ok { //取出TCP连接
			tc.SetKeepAlive(true)//设置TCP保持长连接
			tc.SetKeepAlivePeriod(3 * time.Minute)设置TCP探测时间间隔时间为3分钟,如果客户端3分钟没有和服务端通信,则开始探测
			tc.SetLinger(10)
		}

		conn, ok := s.Plugins.DoPostConnAccept(conn)
		if !ok {
			closeChannel(s, conn)
			continue
		}

		s.mu.Lock()
		s.activeConn[conn] = struct{}{} //空struct{}占位,连接不可用
		s.mu.Unlock()

		go s.serveConn(conn)//协程处理连接
	}
}

2.3.rpcx-server类图

三.rpcx-client

3.1 xClient 及其初始化

type xClient struct {
	failMode     FailMode
	selectMode   SelectMode
	cachedClient map[string]RPCClient
	breakers     sync.Map
	servicePath  string
	option       Option
	mu        sync.RWMutex
	servers   map[string]string
	discovery ServiceDiscovery
	selector  Selector

	isShutdown bool
	auth string
	Plugins PluginContainer
	ch chan []*KVPair
	serverMessageChan chan<- *protocol.Message
}

客户端在初始化时会根据参数FailMode、SelectMode、Discovery、Option来确定调用失败后处理模式、路由选择的模式、发现服务器列表以及可选配置项。FailMode和SelectMode为服务治理 (失败模式与路由选择)的选项定义。在大规模的RPC系统中,许多服务节点在提供同一个服务。

3.2 FailMode

如果调用失败,客户端应该选择另一个节点或者立即返回错误,失败处理模式FailMode仅对同步调用有效(xClient.Call),而异步调用(xClient.Go)无效,FailMode一共有下面几种值可选择:

type FailMode int

const (
	// 自动选择另一台服务器
	Failover FailMode = iota
	// 立即返回错误
	Failfast
	// 再次使用当前客户端
	Failtry
	// 如果第一台服务器在指定时间内没有快速响应,则选择另一台服务器
	Failbackup
)

3.3 SelectMode

路由选择模式SelectMode则有下面几种情况可选择:

// SelectMode 定义从候选者中选择服务的算法
type SelectMode int
const (
	// 随机选择:从服务节点中随机选择一个节点。由于节点是随机选择,所以并不能保证节点之间负载的均匀
	RandomSelect SelectMode = iota
	// 轮询模式:从服务节点列表中逐个选择依次使用,能保证每个节点均匀被访问,在节点服务能力相差不大时适用。
	RoundRobin
	// 加权轮询模式:使用基于权重的轮询算法
	WeightedRoundRobin
	// 加权网络质量优先,客户端会基于ping(ICMP) 探测各个节点的网络质量,网络质量越好则节点的权重也就越高。
	WeightedICMP
	// 一致性Hash:使用 JumpConsistentHash 选择节点, 相同的servicePath, serviceMethod 和参数会路由到同一个节点上。 JumpConsistentHash 是一个快速计算一致性哈希的算法,但是有一个缺陷是它不能删除节点,如果删除节点,路由需要重新计算一致性哈希。
	ConsistentHash
	// 最近的服务器:它要求服务在注册的时候要设置它所在的地理经纬度
	Closest

	// 通过用户进行选择
	SelectByUser = 1000
)

3.4 Option

一些其他的配置选项:

type Option struct {
	Group string
	Retries int   //重试次数
	TLSConfig *tls.Config
	Block interface{}
	RPCPath string
	ConnectTimeout time.Duration  //超时时间
	ReadTimeout time.Duration
	WriteTimeout time.Duration
	BackupLatency time.Duration
	GenBreaker func() Breaker
	SerializeType protocol.SerializeType  //默认通信协议
	CompressType  protocol.CompressType
	Heartbeat         bool       //是否启动心跳
	HeartbeatInterval time.Duration  //心跳的超时时间
}


注意:

TCP有保活机制,为什么还需要在应用层维持心跳包,这个跟教师端虽然使用的TCP连接IRC服务器,但依然在上层封装Ping和Pong的原理是一样的。tcp的keep-alive默认是7200秒,也就是2小时,首先是检测时间太长,这么长的时间只能检测连接是否存在并不能检测数据是否能正常收发。而且keep-alive的数据包如果碰到四层负载均衡的中继设备,TCP内部的包会被中继设备接收并不会传到对端,TCP内部的数据包才会被转发。最后,由于前些年微信等软件占用运营商过多的网络资源,keep-alive包会被运营商过滤一部分。

3.5 调用

func (c *xClient) Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
func (c *xClient) Go(ctx context.Context, serviceMethod string, args interface{}, reply interface{},done chan *Call) (*Call, error)
func (c *xClient) Fork(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
func (c *xClient) Broadcast(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
  • Go()方法是异步调用,在异步调用中,失败模式FailMode将会不起作用,它即时返回一个Call结构体实例。
  • Call()方法是同步调用,也是最常用的调用方式,它会根据选择器确定服务器,支持失败模式FailMode,可以设置Option可选项,来进行远程调用,直到服务器返回数据或者超时。
  • Broadcast()方法将请求发送到该服务的所有节点。如果所有的节点都正常返回才算成功。只有在所有节点没有错误的情况下, Broadcast()方法将返回其中的一个节点的返回信息。 如果有节点返回错误的话,Broadcast()方法将返回这些错误信息中的一个。失败模式FailMode和路由选择SelectMode在该方法中都不会生效,最好设置超时避免程序挂起。
  • Fork()方法将请求发送到该服务的所有节点。如果有任何一个节点正常返回,则成功,Fork()方法将返回其中的一个节点的返回结果。 如果所有节点返回错误的话,Fork()方法将返回这些错误信息中的一个。失败模式FailMode和路由选择SelectMode在该方法中都不会生效。



版权声明:本文为qq_35703848原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。