1. peer.go
首先定位到 tendermint/p2p/peer.go 文件,来看 peer 结构体,peer 实现了 Peer。Peer 负责发现节点,广播内容, 接收内容。
// peer 实现了 Peer,Peer是一个接口,表示连接在 reactor 上的 peer
// 需要执行一次握手连接,在使用 peer 之前.
type peer struct {
// 实现了服务的启动、停止、重启
service.BaseService
// 原始的 peerConn 和 multiplex 连接
// 因为 peer 既可能是一个客户端的连接也可能是一个服务端的连接,所以设计了一个 peerConn 结构
// peerConn 的创建使用 newOutboundPeerConn 和 newInboundPeerConn。这两个函数是在switch组件中被调用的
peerConn
// mconn 的创建使用 createMConnection,这个函数是在 newPeer() 中调用的
mconn *tmconn.MConnection
// peer 的节点信息
// channels = nodeInfo.
// channels 将被缓存,以避免在 hasChannel 方法中复制
nodeInfo NodeInfo
channels []byte
// 用户数据
Data *cmap.CMap
// 程序包的标识和计时器
metrics *Metrics
metricsTicker *time.Ticker
}
来看 service.BaseService.
type BaseService struct {
Logger log.Logger
name string // 新创建的服务名
// 启动和停止的标识
// started = 0 : 未开始;started = 1 : 已开始
// stopped = 0 : 未停止;stopped = 1 : 已停止
started uint32 // atomic
stopped uint32 // atomic
quit chan struct{} // 控制退出
// Service 定义了启动、停止和重启
impl Service
}
peer实现的 ID() 返回 peerID,IsOutbound() 判断连接是否为出站链接,IsPersistent() 判断perr是否为持久的,NodeInfo() 返回 peer 的节点信息,SocketAddr() 返回 socket 的地址,Status() 返回 peer 的链接状态,Send() 通过 channelID 发送字节数据到 channel 中等等,这些方法都是简单实现的。直接看启动方法 OnStart().
func (p *peer) OnStart() error {
// 不需要调用 BaseService.OnStart(),所以直接返回了 nil
if err := p.BaseService.OnStart(); err != nil {
return err
}
// 判断服务是否已经在运行或已停止,是则返回错误;
// 并且调用对应的 OnStart() 方法
if err := p.mconn.Start(); err != nil {
return err
}
// 心跳检测:
// 每隔 10s 获取链接状态,并且将发送数据的通道大小与 peerID 关联起来
go p.metricsReporter()
return nil
}
根据 p.mconn.Start(),找到 MConnection.OnStart(),此方法再次调用了 BaseService.OnStart(),无错则初始化链接的默认配置,并且此方法开启了两个线程,sendRoutine() 轮询发送数据包到 sending(发送缓冲区),一个 PacketMsg 包中可能并没有包含完整的内容,只有EOF为1才标识发送完成了;recvRoutine() 轮询读取数据包并使用通道 recving(接收缓冲区)重建消息(就是先将接收到的内容放入缓存区,只有所有内容都收到之后才会组装成一个完整的内容)。由于TCP是流式的,需要正确的协议才能解析出来正确的报文,所以读取报文是和发送报文必须相同的协议。
1.1 创建连接
newPeer() 负责构造一个新的 peer,并创建连接 createMConnection()。newPeer() 在 transport.go 文件中被 wrapPeer() 函数调用,负责后面的拨号 Dial() 与接收 Accept()。
func createMConnection(
conn net.Conn,
p *peer,
reactorsByCh map[byte]Reactor,
chDescs []*tmconn.ChannelDescriptor,
onPeerError func(Peer, interface{}),
config tmconn.MConnConfig,
) *tmconn.MConnection {
onReceive := func(chID byte, msgBytes []byte) {
reactor := reactorsByCh[chID]
if reactor == nil {
// 这里 panic 是因为 net.Conn 会使用 reactor,并在 onPeerError 函数中执行
panic(fmt.Sprintf("Unknown channel %X", chID))
}
labels := []string{
"peer_id", string(p.ID()), // 默认的节点ID
"chID", fmt.Sprintf("%#x", chID),
}
p.metrics.PeerReceiveBytesTotal.With(labels...).Add(float64(len(msgBytes)))
// 当来自 peer 的 msgBytes 被接收完毕时,Receive() 被 switch 调用
reactor.Receive(chID, p, msgBytes)
}
onError := func(r interface{}) {
onPeerError(p, r)
}
// MConnection 封装了 net.Conn,并通过配置创建多元连接
return tmconn.NewMConnectionWithConfig(
conn,
chDescs,
onReceive,
onError,
config,
)
}
2. transport.go
创建节点时会创建一个 Transport。Transport 发出并连接到 Peers,Peer 的实现留给 Transport,每个 Transport 还负责过滤特定于其域的 Peers,来看 Transport 结构体。
type Transport interface {
// 监听地址.
NetAddress() NetAddress
// 接收一个新连接好的 Peer.
Accept(peerConfig) (Peer, error)
// 根据配置地址使用 Dial 连接到 Peer.
Dial(NetAddress, peerConfig) (Peer, error)
// 清除与 Peer 有关联的资源.
Cleanup(Peer)
}
MultiplexTransport 接受并拨号tcp连接并将其升级到多路复用。
Accept() 方法内部是一个 select 实现,一个 case 等待接收 peer,若可以接收且没有错误,则创建并返回一个有加入数据包标识的 peer;一个 case 等待 peer 关闭,返回关闭后的信息 ErrTransportClosed{}。Accept() 被 switch.go 中的 acceptRoutine() 函数调用。
来看 Dial().
func (mt *MultiplexTransport) Dial(
addr NetAddress,
cfg peerConfig,
) (Peer, error) {
// 超时机制的连接
c, err := addr.DialTimeout(mt.dialTimeout)
if err != nil {
return nil, err
}
// 判断是否已连接,然后获取连接的 ip 地址,并开启一个协程将它保存到连接池中
// filterConn() 是一个验证器,可以扩展验证条件(扩展位置:node/node.go --> createTransport() --> func(_ p2p.ConnSet, c net.Conn, _ []net.IP) error)
if err := mt.filterConn(c); err != nil {
return nil, err
}
// upgradeSecretConn(): 设置连接读写的关闭时间,然后执行一次握手返回已验证的标识,最后再次设置关闭时间,此时间值为 nil
// PubKeyToID(): 对于传出连接,确保连接密钥与已拨密钥匹配
// handshake(): 同样是两次设置连接读写的关闭时间,中间开启两个协程,一个对默认节点信息编码,一个对传过来的节点信息解码
// Validate(): 检查本身报告的默认节点是否是安全的
// 确保连接的秘钥与本身报告的秘钥相匹配
// 如果本身的节点ID与传过来的节点Id相等,则根据传过来的节点id和已有的TCP地址,返回一个新的 NewNetAddress
// CompatibleWith(): 判断两个节点是否相兼容
secretConn, nodeInfo, err := mt.upgrade(c, &addr)
if err != nil {
return nil, err
}
cfg.outbound = true
// 根据新的配置信息,创建一个新的 peer 并返回
p := mt.wrapPeer(secretConn, nodeInfo, cfg, &addr)
return p, nil
}
Listen() 方法监听TCP连接地址,然后开启了一个协程 acceptPeers(),循环执行 Accept() 方式,将返回的结果传入到另一个匿名函数协程中。匿名协程若 panic(),则调用 recover() 恢复并将错误信息返回;匿名协程会调用 filterConn(),filterConn()方法没有错则调用upgrade(),upgrade()方法没有出错,则获取连接地址、秘钥、NetAddress,根据这些信息创建一个 accept{} 并将它放到 acceptc 通道中。
3. switch.go
创建节点时,先创建 Transport,然后创建 Switch。Switch 监听TCP连接,维护新peer的创建和删除,处理对等连接并公开 API 以接收“reactor”上的传入消息。每个“Reactor”负责处理一个/或多个“Channels”的传入消息。因此,当发送传出消息通常在对等机上执行时,传入消息在 reactor 上接收。
来看 OnStart() 方法
func (sw *Switch) OnStart() error {
// 首先调用Reactor 启动所有的Reactor.
// 在创建Switch中 sw.reactors 里面是空的,通过 AddReactor 专门添加Reactor
// 在 tendermint 里面有6个 Reactor,它们是在 node/node.go 文件中被添加的 类似于下面这样
// sw.AddReactor("MEMPOOL", mempoolReactor)
// sw.AddReactor("BLOCKCHAIN", bcReactor)
// sw.AddReactor("CONSENSUS", consensusReactor)
// sw.AddReactor("EVIDENCE", evidenceReactor)
for _, reactor := range sw.reactors {
err := reactor.Start()
if err != nil {
return errors.Wrapf(err, "failed to start %v", reactor)
}
}
// Start accepting Peers.
go sw.acceptRoutine()
return nil
}
在启动Switch之前会调用 AddReactor(), 而且应该是将所有的 Reactor 都添加完成再启动,否则的话后面添加的 Reactor 就不会启动了。接下来看看 AddReactor()。
func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
// 调用reactor的GetChannels方法获取相关的通道描述
// 也就是说一个Reactor可以启用好几个通道,但是这个通道ID是所有 Reactor 都不可以重复的
for _, chDesc := range reactor.GetChannels() {
chID := chDesc.ID
// No two reactors can share the same channel.
if sw.reactorsByCh[chID] != nil {
panic(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
}
sw.chDescs = append(sw.chDescs, chDesc)
sw.reactorsByCh[chID] = reactor
}
sw.reactors[name] = reactor
// 这个接口把 Switch 的对象又传递给 Reactor, 这样 Reactor 也可以调用 Switch 的函数了
reactor.SetSwitch(sw)
return reactor
}
Switch 除了实现了 BaseService 的方法外,本身也实现了创建、销毁、获取“Reactor”的方法,和设置、获取节点信息的方法,和创建(AddPersistentPeers)、停止(如果对等连接由于某些原因产生了一个内部错误,并且是持久化的对等连接就会尝试重置连接)、重启、异步连接(DialPeersAsync:按随机顺序异步 Dial 对等连接列表)、连接(DialPeerWithAddress:如果 sw.addPeer 连接并验证成功,则使用 address 拨号给定的对等连接并运行 sw.addPeer)对等连接的方法。
4. PEX
分析到这里,大致明白了tendermint的P2P是如何向对等体发送消息,如何将接收到的消息返回对等体的。可是P2P还有一个非常重要的功能就是进行节点发现——PEX,PEX也是一个 Reactor 的具体实现。先看创建实例 NewReactor 的代码。
// NewReactor creates new PEX reactor.
// 这个函数是在tendermint启动node节点的时候调用的(在调用 NewSwitch后)
// 创建一个AddrBook实例 然后将实例传递给此函数返回一个PEX的 Reactor 实例
func NewReactor(b AddrBook, config *ReactorConfig) *Reactor {
r := &Reactor{
book: b,
config: config,
ensurePeersPeriod: defaultEnsurePeersPeriod,
requestsSent: cmap.NewCMap(),
lastReceivedRequests: cmap.NewCMap(),
crawlPeerInfos: make(map[p2p.ID]crawlPeerInfo),
}
r.BaseReactor = *p2p.NewBaseReactor("Reactor", r)
return r
}
来看 OnStart() 方法
// OnStart implements BaseService
func (r *Reactor) OnStart() error {
// 启动地址簿
err := r.book.Start()
if err != nil && err != service.ErrAlreadyStarted {
return err
}
// 检查配置的种子节点格式是否正确
numOnline, seedAddrs, err := r.checkSeeds()
if err != nil {
return err
} else if numOnline == 0 && r.book.Empty() {
return errors.New("address book is empty and couldn't resolve any seed nodes")
}
r.seedAddrs = seedAddrs
// Check if this node should run
// in seed/crawler mode
// 根据配置文件是否是种子模式来启动不同的 routine
if r.config.SeedMode {
// 每隔30S调用一次 crawlPeers 和 attemptDisconnects
// crawlPeers: 如果上次crawl连接的时间和此处相差不到2分钟,则不进行连接;
// 根据地址簿提供的地址进行拨号的流程——dialPeer,
// 如果连接失败了就继续连接;
// 如果连接成功了,就向这个地址发送一个报文,这个报文的目的就是请求此peer知道的的peer
// dialPeer: 首先从地址簿中找出这个地址最后一次尝试连接的时间和尝试连接的次数
// 如果这个地址不是持久化地址且尝试的次数过多的话,就标记为坏的地址,其实就是从地址簿中移除掉
// 尝试的次数越多 下次尝试连接的间隔越长,然后进行连接
// 如果是校验失败了,就直接从地址簿移除,根本不给尝试的机会,主要目的应该是防止一些恶意的攻击
// attemptDisconnects: 查看每一个peer和自己连接的时长,如果超过28个小时就断开
go r.crawlPeersRoutine()
} else {
// 每个一段时间调用 ensurePeers,时间间隔由我们控制
// ensurePeers: 先获取当前正在连接的peer的信息
// 然后从地址簿中根据偏差值挑出需要数量([10, 90])的peer地址,进行连接
// 然后遍历可以拨号的地址,每个地址开一个go程
// 如果地址簿中存储的peer地址过少,就补充地址簿的数量
// 最后,如果没有进行拨号,则会根据配置的种子地址,调用r.Switch.DialPeerWithAddress(seedAddr, false) 尝试连接peer
go r.ensurePeersRoutine()
}
return nil
}
作为种子模式的pex就是每隔一段时间就会检查当前peer集合中是否有连接时间超过28小时的peer, 如果有,除非设置了持久连接否则就断开, 然后去爬去新的节点, 如果从地址簿中爬去到新的节点就发送一份请求地址交换的报文过去。
作为非种子节点就是一直进行连接,尽量保存足够的连接数量。 如果数量不够就先从地址簿从取一些地址进行连接。如果地址簿发现保存的地址数量太少就会尝试向已知的peer发送请求新peer的报文请求。如果这个时候还是没有足够的peer就只能向种子节点去请求。
来看 Receive(),它主要负责处理发送请求新连接的报文后的响应。
5. 其它
通过 Transport,Switch 这两个文件就能使用 Peer 了,其它的反应器(base_reactor.go)、连接(conn_set.go)、密钥(key.go)、网关信息(netaddress.go)、节点信息(node_info.go)都是通过这两个文件调用的。