tendermint, switch & reactor
These two parts demonstrate how tendermint handles the underlying p2p and upper layer logics.
The arch looks like below:
Switch is a part of tendermint P2P layer.
Switch handles peer connections and exposes an API to receive incoming messages on Reactors
. Each Reactor
is responsible for handling incoming messages of one or more Channels
. So while sending outgoing messages is typically performed on the peer, incoming messages are received on the reactor.
type Switch struct {
config *config.P2PConfig
reactors map[string]Reactor
chDescs []*conn.ChannelDescriptor
reactorsByCh map[byte]Reactor
peers *PeerSet
dialing *cmap.CMap
reconnecting *cmap.CMap
nodeInfo NodeInfo // our node info
nodeKey *NodeKey // our node privkey
addrBook AddrBook
// peers addresses with whom we'll maintain constant connection
persistentPeersAddrs []*NetAddress
unconditionalPeerIDs map[ID]struct{}
transport Transport
filterTimeout time.Duration
peerFilters []PeerFilterFunc
rng *rand.Rand // seed for randomizing dial times and orders
metrics *Metrics
Switch is a base service
. It is started when node starts. Evenmore, Switch starts all reactors registered.
// NewNode
sw := createSwitch(
config, transport, p2pMetrics, peerFilters, mempoolReactor, bcReactor,
consensusReactor, evidenceReactor, nodeInfo, nodeKey, p2pLogger,
// Node.Start
// Start the switch (the P2P server).
err = n.sw.Start()
if err != nil {
return err
Switch takes transport as a input parameter, which is the source of all messages from p2p network.
Switch starts all registered Reactors:
// OnStart implements BaseService. It starts all the reactors and peers.
func (sw *Switch) OnStart() error {
// Start reactors
for _, reactor := range sw.reactors {
err := reactor.Start()
if err != nil {
return errors.Wrapf(err, "failed to start %v", reactor)
// Start accepting Peers.
go sw.acceptRoutine()
return nil
In addition, Switch starts a go routine to accept incoming peers.
acceptRoutine – inbound
It waits for any new peer connection from underlying MultiplexTransport. Once upon success, it will call addPeer().
func (sw *Switch) acceptRoutine() {
for {
p, err := sw.transport.Accept(peerConfig{
chDescs: sw.chDescs,
onPeerError: sw.StopPeerForError,
reactorsByCh: sw.reactorsByCh,
metrics: sw.metrics,
isPersistent: sw.IsPeerPersistent,
if err != nil {
switch err := err.(type) {
case ErrRejected:
if err.IsSelf() {
// Remove the given address from the address book and add to our addresses
// to avoid dialing in the future.
addr := err.Addr()
"Inbound Peer rejected",
"err", err,
"numPeers", sw.peers.Size(),
case ErrFilterTimeout:
"Peer filter timed out",
"err", err,
case ErrTransportClosed:
"Stopped accept routine, as transport is closed",
"numPeers", sw.peers.Size(),
"Accept on transport errored",
"err", err,
"numPeers", sw.peers.Size(),
// We could instead have a retry loop around the acceptRoutine,
// but that would need to stop and let the node shutdown eventually.
// So might as well panic and let process managers restart the node.
// There's no point in letting the node run without the acceptRoutine,
// since it won't be able to accept new connections.
panic(fmt.Errorf("accept routine exited: %v", err))
if !sw.IsPeerUnconditional(p.NodeInfo().ID()) {
// Ignore connection if we already have enough peers.
_, in, _ := sw.NumPeers()
if in >= sw.config.MaxNumInboundPeers {
"Ignoring inbound connection: already have enough inbound peers",
"address", p.SocketAddr(),
"have", in,
"max", sw.config.MaxNumInboundPeers,
if err := sw.addPeer(p); err != nil {
if p.IsRunning() {
_ = p.Stop()
"Ignoring inbound connection: error while adding peer",
"err", err,
"id", p.ID(),
DialPeerWithAddress – outbound
It dials the given peer and runs sw.addPeer if it connects and authenticates successfully.
Whenever a peer connection is established, peer will be started up and added to Switch peerset map.
Reactors will be invoked for the peer added: Reactor.InitPeer -> Reactor.AddPeer
// addPeer starts up the Peer and adds it to the Switch. Error is returned if
// the peer is filtered out or failed to start or can't be added.
func (sw *Switch) addPeer(p Peer) error {
if err := sw.filterPeer(p); err != nil {
return err
p.SetLogger(sw.Logger.With("peer", p.SocketAddr()))
// Handle the shut down case where the switch has stopped but we're
// concurrently trying to add a peer.
if !sw.IsRunning() {
// XXX should this return an error or just log and terminate?
sw.Logger.Error("Won't start a peer - switch is not running", "peer", p)
return nil
// Add some data to the peer, which is required by reactors.
for _, reactor := range sw.reactors {
p = reactor.InitPeer(p)
// Start the peer's send/recv routines.
// Must start it before adding it to the peer set
// to prevent Start and Stop from being called concurrently.
err := p.Start()
if err != nil {
// Should never happen
sw.Logger.Error("Error starting peer", "err", err, "peer", p)
return err
// Add the peer to PeerSet. Do this before starting the reactors
// so that if Receive errors, we will find the peer and remove it.
// Add should not err since we already checked peers.Has().
if err := sw.peers.Add(p); err != nil {
return err
// Start all the reactor protocols on the peer.
for _, reactor := range sw.reactors {
sw.Logger.Info("Added peer", "peer", p)
return nil
Reactor is responsible for handling incoming messages from Switch on one or more channels.
// Reactor is responsible for handling incoming messages on one or more
// Channel. Switch calls GetChannels when reactor is added to it. When a new
// peer joins our node, InitPeer and AddPeer are called. RemovePeer is called
// when the peer is stopped. Receive is called when a message is received on a
// channel associated with this reactor.
// Peer#Send or Peer#TrySend should be used to send the message to a peer.
type Reactor interface {
service.Service // Start, Stop
// SetSwitch allows setting a switch.
// GetChannels returns the list of MConnection.ChannelDescriptor. Make sure
// that each ID is unique across all the reactors added to the switch.
GetChannels() []*conn.ChannelDescriptor
// InitPeer is called by the switch before the peer is started. Use it to
// initialize data for the peer (e.g. peer state).
// NOTE: The switch won't call AddPeer nor RemovePeer if it fails to start
// the peer. Do not store any data associated with the peer in the reactor
// itself unless you don't want to have a state, which is never cleaned up.
InitPeer(peer Peer) Peer
// AddPeer is called by the switch after the peer is added and successfully
// started. Use it to start goroutines communicating with the peer.
AddPeer(peer Peer)
// RemovePeer is called by the switch when the peer is stopped (due to error
// or other reason).
RemovePeer(peer Peer, reason interface{})
// Receive is called by the switch when msgBytes is received from the peer.
// NOTE reactor can not keep msgBytes around after Receive completes without
// copying.
// CONTRACT: msgBytes are not nil.
Receive(chID byte, peer Peer, msgBytes []byte)
There are a few Reactors registered to P2P Switch to watch the peer connection lifecycle.
- Consensus
- BlockChain
- Mempool
- Evidence
It handles the consensus service.
StateChannel = byte(0x20)
DataChannel = byte(0x21)
VoteChannel = byte(0x22)
VoteSetBitsChannel = byte(0x23)
It handles long-term catchup syncing. there are v0, v1 and v2 in tendermint. Here let’s take v0 as example.
// BlockchainChannel is a channel for blocks and status updates (`BlockStore` height)
BlockchainChannel = byte(0x40)
It handles PEX (peer exchange) and ensures that an adequate number of peers are connected to the switch. It uses AddrBook
(address book) to store NetAddress
es of the peers.
// PexChannel is a channel for PEX messages
PexChannel = byte(0x00)
It handles mempool tx broadcasting amongst peers.
MempoolChannel = byte(0x30)
It handles evpool evidence broadcasting amongst peers.
EvidenceChannel = byte(0x38)