Tendermint: Switch & Reactor
These two components show how Tendermint connects the underlying P2P networking layer to the upper-layer protocol logic.
The architecture is layered as follows, from top to bottom:
- Reactors
- Switch
- Transport
- Peers, each wrapping its own MConnection
Switch
Switch is part of the Tendermint P2P layer. It handles peer connections and exposes an API for receiving incoming messages on Reactors. Each Reactor is responsible for handling incoming messages on one or more Channels. So while outgoing messages are typically sent directly on the peer, incoming messages are received by the reactor.
```go
type Switch struct {
	service.BaseService

	config       *config.P2PConfig
	reactors     map[string]Reactor
	chDescs      []*conn.ChannelDescriptor
	reactorsByCh map[byte]Reactor
	peers        *PeerSet
	dialing      *cmap.CMap
	reconnecting *cmap.CMap
	nodeInfo     NodeInfo // our node info
	nodeKey      *NodeKey // our node privkey
	addrBook     AddrBook
	// peers addresses with whom we'll maintain constant connection
	persistentPeersAddrs []*NetAddress
	unconditionalPeerIDs map[ID]struct{}

	transport Transport

	filterTimeout time.Duration
	peerFilters   []PeerFilterFunc

	rng *rand.Rand // seed for randomizing dial times and orders

	metrics *Metrics
}
```
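The reactorsByCh field is what lets the Switch hand each incoming message to the right reactor. Below is a minimal sketch of that routing idea, not Tendermint's code: the trimmed Reactor interface, NewSwitch, AddReactor, Dispatch and logReactor are hypothetical simplifications, peers are plain strings, and in this sketch a duplicate channel ID is reported as an error.

```go
package main

import "fmt"

// Reactor is a hypothetical, trimmed-down stand-in for the real interface:
// it only declares its channel IDs and receives messages.
type Reactor interface {
	Channels() []byte
	Receive(chID byte, peerID string, msg []byte)
}

// Switch keeps only a reactorsByCh-style index from the real struct.
type Switch struct {
	reactorsByCh map[byte]Reactor
}

// NewSwitch returns an empty switch (hypothetical helper).
func NewSwitch() *Switch {
	return &Switch{reactorsByCh: make(map[byte]Reactor)}
}

// AddReactor registers r under each of its channel IDs. It refuses an ID that
// another reactor already owns, because one channel must map to one reactor.
func (sw *Switch) AddReactor(r Reactor) error {
	for _, ch := range r.Channels() {
		if _, taken := sw.reactorsByCh[ch]; taken {
			return fmt.Errorf("channel %#x has multiple reactors", ch)
		}
	}
	for _, ch := range r.Channels() {
		sw.reactorsByCh[ch] = r
	}
	return nil
}

// Dispatch routes one incoming message to the reactor that owns chID.
func (sw *Switch) Dispatch(chID byte, peerID string, msg []byte) error {
	r, ok := sw.reactorsByCh[chID]
	if !ok {
		return fmt.Errorf("unknown channel %#x", chID)
	}
	r.Receive(chID, peerID, msg)
	return nil
}

// logReactor records every message it receives as "chID:peer:payload".
type logReactor struct {
	chs  []byte
	seen []string
}

func (l *logReactor) Channels() []byte { return l.chs }

func (l *logReactor) Receive(chID byte, peerID string, msg []byte) {
	l.seen = append(l.seen, fmt.Sprintf("%#x:%s:%s", chID, peerID, msg))
}

func main() {
	sw := NewSwitch()
	mempool := &logReactor{chs: []byte{0x30}}
	if err := sw.AddReactor(mempool); err != nil {
		panic(err)
	}
	// A second reactor claiming 0x30 is refused.
	fmt.Println(sw.AddReactor(&logReactor{chs: []byte{0x30}}))
	_ = sw.Dispatch(0x30, "peerA", []byte("tx1"))
	fmt.Println(mempool.seen) // [0x30:peerA:tx1]
}
```

A single map lookup per message keeps dispatch cheap, which is why the channel-to-reactor index is built once at registration time rather than searched on every receive.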
Initialization
Switch is a BaseService. It is started when the node starts, and it in turn starts all registered reactors.
```go
// NewNode
sw := createSwitch(
	config, transport, p2pMetrics, peerFilters, mempoolReactor, bcReactor,
	consensusReactor, evidenceReactor, nodeInfo, nodeKey, p2pLogger,
)

// Node.Start
// Start the switch (the P2P server).
err = n.sw.Start()
if err != nil {
	return err
}
```
Switch takes the transport as an input parameter; the transport is the source of all messages from the P2P network.
When the Switch starts, it starts all registered Reactors:
```go
// OnStart implements BaseService. It starts all the reactors and peers.
func (sw *Switch) OnStart() error {
	// Start reactors
	for _, reactor := range sw.reactors {
		err := reactor.Start()
		if err != nil {
			return errors.Wrapf(err, "failed to start %v", reactor)
		}
	}

	// Start accepting Peers.
	go sw.acceptRoutine()

	return nil
}
```
In addition, the Switch starts a goroutine that accepts incoming peers.
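To make the start sequence concrete, here is a minimal sketch of the pattern, assuming a simplified BaseService whose Start runs an OnStart hook at most once. BaseService, Service, ErrAlreadyStarted, NewSwitch and fakeReactor below are hypothetical stand-ins written for illustration, not Tendermint's libs/service package; the sketch only shows the ordering: node calls Start, Start calls OnStart, and OnStart starts every reactor, stopping at the first failure.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// ErrAlreadyStarted is returned by a second call to Start.
var ErrAlreadyStarted = errors.New("already started")

// Service is a hypothetical minimal lifecycle interface.
type Service interface {
	Start() error
}

// BaseService runs its onStart hook at most once, guarded by an atomic flag.
type BaseService struct {
	started uint32
	onStart func() error
}

func (bs *BaseService) Start() error {
	if !atomic.CompareAndSwapUint32(&bs.started, 0, 1) {
		return ErrAlreadyStarted
	}
	return bs.onStart()
}

// Switch embeds BaseService and starts its reactors from OnStart.
type Switch struct {
	BaseService
	reactors []Service
}

func NewSwitch(reactors ...Service) *Switch {
	sw := &Switch{reactors: reactors}
	sw.onStart = sw.OnStart
	return sw
}

// OnStart starts reactors in order and stops at the first failure.
// The real OnStart would also launch acceptRoutine here (omitted).
func (sw *Switch) OnStart() error {
	for i, r := range sw.reactors {
		if err := r.Start(); err != nil {
			return fmt.Errorf("failed to start reactor %d: %w", i, err)
		}
	}
	return nil
}

// fakeReactor appends its name to order when it starts, or fails if asked to.
type fakeReactor struct {
	BaseService
}

func newFakeReactor(name string, order *[]string, fail bool) *fakeReactor {
	r := &fakeReactor{}
	r.onStart = func() error {
		if fail {
			return fmt.Errorf("%s: start failed", name)
		}
		*order = append(*order, name)
		return nil
	}
	return r
}

func main() {
	var order []string
	sw := NewSwitch(newFakeReactor("mempool", &order, false), newFakeReactor("consensus", &order, false))
	err := sw.Start()
	fmt.Println(err, order) // <nil> [mempool consensus]
	fmt.Println(sw.Start()) // already started
}
```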
acceptRoutine – inbound
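It waits for new inbound peer connections from the underlying MultiplexTransport. On success, it drops the connection if the peer is not unconditional and the inbound limit (MaxNumInboundPeers) is already reached; otherwise it calls addPeer().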
```go
func (sw *Switch) acceptRoutine() {
	for {
		p, err := sw.transport.Accept(peerConfig{
			chDescs:      sw.chDescs,
			onPeerError:  sw.StopPeerForError,
			reactorsByCh: sw.reactorsByCh,
			metrics:      sw.metrics,
			isPersistent: sw.IsPeerPersistent,
		})
		if err != nil {
			switch err := err.(type) {
			case ErrRejected:
				if err.IsSelf() {
					// Remove the given address from the address book and add to our addresses
					// to avoid dialing in the future.
					addr := err.Addr()
					sw.addrBook.RemoveAddress(&addr)
					sw.addrBook.AddOurAddress(&addr)
				}

				sw.Logger.Info(
					"Inbound Peer rejected",
					"err", err,
					"numPeers", sw.peers.Size(),
				)

				continue
			case ErrFilterTimeout:
				sw.Logger.Error(
					"Peer filter timed out",
					"err", err,
				)

				continue
			case ErrTransportClosed:
				sw.Logger.Error(
					"Stopped accept routine, as transport is closed",
					"numPeers", sw.peers.Size(),
				)
			default:
				sw.Logger.Error(
					"Accept on transport errored",
					"err", err,
					"numPeers", sw.peers.Size(),
				)
				// We could instead have a retry loop around the acceptRoutine,
				// but that would need to stop and let the node shutdown eventually.
				// So might as well panic and let process managers restart the node.
				// There's no point in letting the node run without the acceptRoutine,
				// since it won't be able to accept new connections.
				panic(fmt.Errorf("accept routine exited: %v", err))
			}

			// Only reached for ErrTransportClosed: this break exits the for loop.
			break
		}

		if !sw.IsPeerUnconditional(p.NodeInfo().ID()) {
			// Ignore connection if we already have enough peers.
			_, in, _ := sw.NumPeers()
			if in >= sw.config.MaxNumInboundPeers {
				sw.Logger.Info(
					"Ignoring inbound connection: already have enough inbound peers",
					"address", p.SocketAddr(),
					"have", in,
					"max", sw.config.MaxNumInboundPeers,
				)

				sw.transport.Cleanup(p)

				continue
			}
		}

		if err := sw.addPeer(p); err != nil {
			sw.transport.Cleanup(p)
			if p.IsRunning() {
				_ = p.Stop()
			}
			sw.Logger.Info(
				"Ignoring inbound connection: error while adding peer",
				"err", err,
				"id", p.ID(),
			)
		}
	}
}
```
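The control flow of acceptRoutine can be exercised in isolation. This is a minimal sketch, not the real routine: instead of a blocking Transport.Accept it walks a fixed slice of results, errRejected and errTransportClosed are hypothetical sentinels standing in for ErrRejected/ErrFilterTimeout and ErrTransportClosed, and appending to inbound stands in for addPeer. It keeps the two decisions that matter: which errors keep the loop running, and when the inbound limit applies.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical sentinels standing in for ErrRejected and ErrTransportClosed.
var (
	errRejected        = errors.New("peer rejected")
	errTransportClosed = errors.New("transport closed")
)

// acceptResult is what one hypothetical Accept call returns.
type acceptResult struct {
	peerID string
	err    error
}

// switchSketch keeps only what the inbound decision needs.
type switchSketch struct {
	maxInbound    int
	unconditional map[string]bool
	inbound       []string
}

// acceptLoop mirrors the control flow of acceptRoutine over a fixed list of
// results instead of a blocking transport.
func (sw *switchSketch) acceptLoop(results []acceptResult) (dropped []string, err error) {
	for _, r := range results {
		if r.err != nil {
			switch {
			case errors.Is(r.err, errRejected):
				continue // like ErrRejected/ErrFilterTimeout: log and keep accepting
			case errors.Is(r.err, errTransportClosed):
				return dropped, nil // orderly shutdown: leave the loop
			default:
				// acceptRoutine panics here; the sketch returns the error.
				return dropped, fmt.Errorf("accept routine exited: %w", r.err)
			}
		}
		// Unconditional peers bypass the inbound limit.
		if !sw.unconditional[r.peerID] && len(sw.inbound) >= sw.maxInbound {
			dropped = append(dropped, r.peerID) // stands in for transport.Cleanup
			continue
		}
		sw.inbound = append(sw.inbound, r.peerID) // stands in for addPeer
	}
	return dropped, nil
}

func main() {
	sw := &switchSketch{maxInbound: 1, unconditional: map[string]bool{"validator": true}}
	dropped, err := sw.acceptLoop([]acceptResult{
		{peerID: "a"},
		{err: errRejected},
		{peerID: "b"},         // over the limit: dropped
		{peerID: "validator"}, // unconditional: accepted anyway
		{err: errTransportClosed},
		{peerID: "never-seen"},
	})
	fmt.Println(sw.inbound, dropped, err) // [a validator] [b] <nil>
}
```

Panicking on an unknown error, as the real routine does, is a deliberate choice explained in its comments: a node that can no longer accept connections is better restarted by a process manager than left running.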
DialPeerWithAddress – outbound
It dials the given peer and runs sw.addPeer if it connects and authenticates successfully.
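The Switch struct also keeps a dialing map next to its peer set. Assuming that map exists to stop the Switch from dialing the same peer twice at once, a minimal sketch of such a guard follows. dialer, DialPeer, connect and errAlreadyDialing are hypothetical names; a sync.Mutex-protected map plays the role of cmap.CMap, and connect stands in for "dial, authenticate, then addPeer".

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errAlreadyDialing = errors.New("currently dialing or existing address")

// dialer is a hypothetical stand-in for the Switch's dialing and peers maps.
type dialer struct {
	mu      sync.Mutex
	dialing map[string]bool
	peers   map[string]bool
	connect func(id string) error // placeholder for dial + authenticate + addPeer
}

// DialPeer refuses a peer that is already connected or being dialed, marks
// the peer as "dialing" for the duration of the attempt, and records it as a
// peer on success.
func (d *dialer) DialPeer(id string) error {
	d.mu.Lock()
	if d.dialing[id] || d.peers[id] {
		d.mu.Unlock()
		return errAlreadyDialing
	}
	d.dialing[id] = true
	d.mu.Unlock()

	defer func() {
		d.mu.Lock()
		delete(d.dialing, id)
		d.mu.Unlock()
	}()

	if err := d.connect(id); err != nil {
		return fmt.Errorf("dial %s: %w", id, err)
	}
	d.mu.Lock()
	d.peers[id] = true
	d.mu.Unlock()
	return nil
}

func main() {
	entered := make(chan struct{})
	release := make(chan struct{})
	d := &dialer{
		dialing: map[string]bool{},
		peers:   map[string]bool{},
		connect: func(id string) error {
			close(entered) // signal that the first dial is in progress
			<-release      // hold the dial open until main lets it finish
			return nil
		},
	}
	done := make(chan error, 1)
	go func() { done <- d.DialPeer("node1") }()
	<-entered
	fmt.Println(d.DialPeer("node1")) // refused: dial in progress
	close(release)
	fmt.Println(<-done)              // <nil>
	fmt.Println(d.DialPeer("node1")) // refused: already a peer
}
```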
addPeer
Whenever a peer connection is established, the peer is started and added to the Switch's PeerSet.
Every reactor is notified about the new peer twice: Reactor.InitPeer before the peer is started, and Reactor.AddPeer after it has been started and added to the PeerSet.
```go
// addPeer starts up the Peer and adds it to the Switch. Error is returned if
// the peer is filtered out or failed to start or can't be added.
func (sw *Switch) addPeer(p Peer) error {
	if err := sw.filterPeer(p); err != nil {
		return err
	}

	p.SetLogger(sw.Logger.With("peer", p.SocketAddr()))

	// Handle the shut down case where the switch has stopped but we're
	// concurrently trying to add a peer.
	if !sw.IsRunning() {
		// XXX should this return an error or just log and terminate?
		sw.Logger.Error("Won't start a peer - switch is not running", "peer", p)
		return nil
	}

	// Add some data to the peer, which is required by reactors.
	for _, reactor := range sw.reactors {
		p = reactor.InitPeer(p)
	}

	// Start the peer's send/recv routines.
	// Must start it before adding it to the peer set
	// to prevent Start and Stop from being called concurrently.
	err := p.Start()
	if err != nil {
		// Should never happen
		sw.Logger.Error("Error starting peer", "err", err, "peer", p)
		return err
	}

	// Add the peer to PeerSet. Do this before starting the reactors
	// so that if Receive errors, we will find the peer and remove it.
	// Add should not err since we already checked peers.Has().
	if err := sw.peers.Add(p); err != nil {
		return err
	}
	sw.metrics.Peers.Add(float64(1))

	// Start all the reactor protocols on the peer.
	for _, reactor := range sw.reactors {
		reactor.AddPeer(p)
	}

	sw.Logger.Info("Added peer", "peer", p)

	return nil
}
```
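The callback order in addPeer (InitPeer for every reactor, then Start, then PeerSet.Add, then AddPeer for every reactor) can be checked with a recording reactor. This is a simplified sketch: peer, recorder and sketchSwitch are hypothetical types, filterPeer and the IsRunning check are omitted, and per-peer reactor state is kept in a plain map on the peer. It also shows the rule from the Reactor interface that AddPeer is not called for a peer that failed to start.

```go
package main

import (
	"errors"
	"fmt"
)

// peer is a hypothetical minimal peer whose Start may fail.
type peer struct {
	id        string
	failStart bool
	data      map[string]string // per-peer state set by reactors in InitPeer
}

func (p *peer) Start() error {
	if p.failStart {
		return errors.New("start failed")
	}
	return nil
}

// recorder is a reactor that logs each callback it receives.
type recorder struct {
	name   string
	events *[]string
}

func (r recorder) InitPeer(p *peer) *peer {
	p.data[r.name] = "state" // keep per-peer state on the peer, not the reactor
	*r.events = append(*r.events, r.name+".InitPeer("+p.id+")")
	return p
}

func (r recorder) AddPeer(p *peer) {
	*r.events = append(*r.events, r.name+".AddPeer("+p.id+")")
}

type sketchSwitch struct {
	reactors []recorder
	peers    map[string]*peer
}

// addPeer follows the same order as Switch.addPeer.
func (sw *sketchSwitch) addPeer(p *peer) error {
	for _, r := range sw.reactors {
		p = r.InitPeer(p)
	}
	if err := p.Start(); err != nil {
		return fmt.Errorf("starting peer %s: %w", p.id, err)
	}
	if _, dup := sw.peers[p.id]; dup {
		return fmt.Errorf("duplicate peer %s", p.id)
	}
	sw.peers[p.id] = p
	for _, r := range sw.reactors {
		r.AddPeer(p)
	}
	return nil
}

func main() {
	var events []string
	sw := &sketchSwitch{
		reactors: []recorder{{"mempool", &events}, {"consensus", &events}},
		peers:    map[string]*peer{},
	}
	_ = sw.addPeer(&peer{id: "a", data: map[string]string{}})
	_ = sw.addPeer(&peer{id: "b", failStart: true, data: map[string]string{}})
	for _, e := range events {
		fmt.Println(e)
	}
}
```

Running it prints InitPeer and AddPeer for peer "a" from both reactors, but only InitPeer for peer "b", which is why the interface warns against storing peer data inside the reactor itself.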
Reactor
A Reactor handles the incoming messages that the Switch dispatches to it on one or more channels.
```go
// Reactor is responsible for handling incoming messages on one or more
// Channel. Switch calls GetChannels when reactor is added to it. When a new
// peer joins our node, InitPeer and AddPeer are called. RemovePeer is called
// when the peer is stopped. Receive is called when a message is received on a
// channel associated with this reactor.
//
// Peer#Send or Peer#TrySend should be used to send the message to a peer.
type Reactor interface {
	service.Service // Start, Stop

	// SetSwitch allows setting a switch.
	SetSwitch(*Switch)

	// GetChannels returns the list of MConnection.ChannelDescriptor. Make sure
	// that each ID is unique across all the reactors added to the switch.
	GetChannels() []*conn.ChannelDescriptor

	// InitPeer is called by the switch before the peer is started. Use it to
	// initialize data for the peer (e.g. peer state).
	//
	// NOTE: The switch won't call AddPeer nor RemovePeer if it fails to start
	// the peer. Do not store any data associated with the peer in the reactor
	// itself unless you don't want to have a state, which is never cleaned up.
	InitPeer(peer Peer) Peer

	// AddPeer is called by the switch after the peer is added and successfully
	// started. Use it to start goroutines communicating with the peer.
	AddPeer(peer Peer)

	// RemovePeer is called by the switch when the peer is stopped (due to error
	// or other reason).
	RemovePeer(peer Peer, reason interface{})

	// Receive is called by the switch when msgBytes is received from the peer.
	//
	// NOTE reactor can not keep msgBytes around after Receive completes without
	// copying.
	//
	// CONTRACT: msgBytes are not nil.
	Receive(chID byte, peer Peer, msgBytes []byte)
}
```
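The note on Receive says a reactor must copy msgBytes before keeping it. The sketch below illustrates why, assuming a caller that reuses one read buffer for every message; whether a given connection actually reuses buffers is an assumption of the sketch, not a claim about MConnection. historyReactor and deliver are hypothetical, and the peer is reduced to a string.

```go
package main

import "fmt"

// historyReactor is a hypothetical reactor that remembers every payload it
// receives. copyMsgs controls whether it honours the "copy msgBytes" rule.
type historyReactor struct {
	copyMsgs bool
	history  [][]byte
}

func (r *historyReactor) Receive(chID byte, peerID string, msgBytes []byte) {
	if r.copyMsgs {
		msgBytes = append([]byte(nil), msgBytes...) // private copy
	}
	r.history = append(r.history, msgBytes)
}

func (r *historyReactor) messages() []string {
	out := make([]string, len(r.history))
	for i, m := range r.history {
		out[i] = string(m)
	}
	return out
}

// deliver simulates a connection that reuses one read buffer for every message.
func deliver(r *historyReactor, chID byte, msgs ...string) {
	buf := make([]byte, 0, 64)
	for _, m := range msgs {
		buf = append(buf[:0], m...)
		r.Receive(chID, "peer1", buf)
	}
}

func main() {
	good := &historyReactor{copyMsgs: true}
	bad := &historyReactor{}
	deliver(good, 0x30, "first", "2nd")
	deliver(bad, 0x30, "first", "2nd")
	fmt.Println(good.messages()) // [first 2nd]
	fmt.Println(bad.messages())  // [2ndst 2nd]
}
```

The reactor that keeps msgBytes directly sees its first message silently overwritten by the second one, because both slices share the same backing array.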
Several reactors are registered with the P2P Switch. Each one follows the peer connection lifecycle and handles its own channels:
- Consensus
- BlockChain
- PEX
- Mempool
- Evidence
Consensus
It gossips consensus state, proposals and block parts, and votes between peers, using four channels:
```go
const (
	StateChannel       = byte(0x20)
	DataChannel        = byte(0x21)
	VoteChannel        = byte(0x22)
	VoteSetBitsChannel = byte(0x23)
)
```
Blockchain
It handles long-term catch-up syncing (fast sync). Tendermint has three versions of this reactor, v0, v1 and v2; here we take v0 as the example.
```go
// BlockchainChannel is a channel for blocks and status updates (`BlockStore` height)
const BlockchainChannel = byte(0x40)
```
PEX
It handles PEX (peer exchange) and ensures that an adequate number of peers are connected to the switch. It uses the AddrBook (address book) to store the NetAddresses of peers.
```go
// PexChannel is a channel for PEX messages
const PexChannel = byte(0x00)
```
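"Ensuring an adequate number of peers" boils down to some arithmetic on the current connection counts. The sketch below is an assumption about how such a check could look, not the PEX reactor's code: numToDial and pickCandidates are hypothetical helpers, the target is a maximum number of outbound peers counting in-flight dials, and candidates are taken from the address book in order (a real implementation would pick them randomly and with bias).

```go
package main

import "fmt"

// numToDial returns how many new outbound connections to attempt so that
// outbound + in-progress dials reach maxOutbound. It never returns < 0.
func numToDial(maxOutbound, outbound, dialing int) int {
	n := maxOutbound - (outbound + dialing)
	if n < 0 {
		return 0
	}
	return n
}

// pickCandidates takes up to n addresses from book, skipping any address that
// is already connected or being dialed.
func pickCandidates(book []string, skip map[string]bool, n int) []string {
	var out []string
	for _, addr := range book {
		if len(out) == n {
			break
		}
		if !skip[addr] {
			out = append(out, addr)
		}
	}
	return out
}

func main() {
	book := []string{"a", "b", "c", "d"}
	skip := map[string]bool{"a": true}
	n := numToDial(3, 1, 0)
	fmt.Println(n, pickCandidates(book, skip, n)) // 2 [b c]
}
```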
Mempool
It broadcasts mempool transactions among peers.
```go
const MempoolChannel = byte(0x30)
```
Evidence
It broadcasts evidence from the evidence pool (evpool) among peers.
```go
const EvidenceChannel = byte(0x38)
```
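GetChannels requires every channel ID to be unique across all reactors added to the switch. Collecting the IDs quoted above in one table makes that easy to check. In this sketch, channel, channels and checkUnique are hypothetical helpers; only the names and byte values come from the constants listed in this section.

```go
package main

import "fmt"

type channel struct {
	reactor string
	name    string
	id      byte
}

// channels lists the channel IDs quoted above with the reactor that owns them.
var channels = []channel{
	{"PEX", "PexChannel", 0x00},
	{"Consensus", "StateChannel", 0x20},
	{"Consensus", "DataChannel", 0x21},
	{"Consensus", "VoteChannel", 0x22},
	{"Consensus", "VoteSetBitsChannel", 0x23},
	{"Mempool", "MempoolChannel", 0x30},
	{"Evidence", "EvidenceChannel", 0x38},
	{"Blockchain", "BlockchainChannel", 0x40},
}

// checkUnique reports the first channel ID claimed by two entries, if any.
func checkUnique(chs []channel) error {
	seen := make(map[byte]string)
	for _, c := range chs {
		if prev, ok := seen[c.id]; ok {
			return fmt.Errorf("channel 0x%02X used by %s and %s", c.id, prev, c.name)
		}
		seen[c.id] = c.name
	}
	return nil
}

func main() {
	if err := checkUnique(channels); err != nil {
		fmt.Println(err)
		return
	}
	for _, c := range channels {
		fmt.Printf("0x%02X  %-18s %s\n", c.id, c.name, c.reactor)
	}
}
```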