The consensus reactor is the service for consensus p2p messages.
Channel
StateChannel = byte(0x20)
DataChannel = byte(0x21)
VoteChannel = byte(0x22)
VoteSetBitsChannel = byte(0x23)
- State, HasVoteMessage , VoteSetMaj23Message, NewValidBlockMessage, NewRoundStepMessage
- Data, ProposalMessage, ProposalPOLMessage, BlockPartMessage
- Vote, VoteMessage
- VoteSetBits, VoteSetBitsMessage
Messages
// NewRoundStepMessage is sent for every step taken in the ConsensusState.
// For every height/round/step transition
type NewRoundStepMessage struct {
Height int64
Round int
Step cstypes.RoundStepType
SecondsSinceStartTime int
LastCommitRound int
}
// NewValidBlockMessage is sent when a validator observes a valid block B in some round r,
//i.e., there is a Proposal for block B and 2/3+ prevotes for the block B in the round r.
// In case the block is also committed, then IsCommit flag is set to true.
type NewValidBlockMessage struct {
Height int64
Round int
BlockPartsHeader types.PartSetHeader
BlockParts *bits.BitArray
IsCommit bool
}
// ProposalMessage is sent when a new block is proposed.
type ProposalMessage struct {
Proposal *types.Proposal
}
// ProposalPOLMessage is sent when a previous proposal is re-proposed.
type ProposalPOLMessage struct {
Height int64
ProposalPOLRound int
ProposalPOL *bits.BitArray
}
// BlockPartMessage is sent when gossipping a piece of the proposed block.
type BlockPartMessage struct {
Height int64
Round int
Part *types.Part
}
// VoteMessage is sent when voting for a proposal (or lack thereof).
type VoteMessage struct {
Vote *types.Vote
}
// HasVoteMessage is sent to indicate that a particular vote has been received.
type HasVoteMessage struct {
Height int64
Round int
Type types.SignedMsgType
Index int
}
// VoteSetMaj23Message is sent to indicate that a given BlockID has seen +2/3 votes.
type VoteSetMaj23Message struct {
Height int64
Round int
Type types.SignedMsgType
BlockID types.BlockID
}
// VoteSetBitsMessage is sent to communicate the bit-array of votes seen for the BlockID.
type VoteSetBitsMessage struct {
Height int64
Round int
Type types.SignedMsgType
BlockID types.BlockID
Votes *bits.BitArray
}
OnStart
Start a stats go routine and subsribe to eventSwitch for EventNewRoundStep, EventValidBlock and EventVote. These events will be broadcasted to p2p network.
// OnStart implements BaseService by subscribing to events, which later will be
// broadcasted to other peers and starting state if we're not in fast sync.
func (conR *Reactor) OnStart() error {
conR.Logger.Info("Reactor ", "fastSync", conR.FastSync())
// start routine that computes peer statistics for evaluating peer quality
go conR.peerStatsRoutine()
conR.subscribeToBroadcastEvents()
if !conR.FastSync() {
err := conR.conS.Start()
if err != nil {
return err
}
}
return nil
}
PeerState
When a peer is about to be added, a PeerState will be created to keep track of it.
// PeerState contains the known state of a peer, including its connection and
// threadsafe access to its PeerRoundState.
// NOTE: THIS GETS DUMPED WITH rpc/core/consensus.go.
// Be mindful of what you Expose.
type PeerState struct {
peer p2p.Peer
logger log.Logger
mtx sync.Mutex // NOTE: Modify below using setters, never directly.
PRS cstypes.PeerRoundState `json:"round_state"` // Exposed.
Stats *peerStateStats `json:"stats"` // Exposed.
}
AddPeer
A few go routines will be created for a newly added peer.
// AddPeer implements Reactor by spawning multiple gossiping goroutines for the
// peer.
func (conR *Reactor) AddPeer(peer p2p.Peer) {
if !conR.IsRunning() {
return
}
peerState, ok := peer.Get(types.PeerStateKey).(*PeerState)
if !ok {
panic(fmt.Sprintf("peer %v has no state", peer))
}
// Begin routines for this peer.
go conR.gossipDataRoutine(peer, peerState)
go conR.gossipVotesRoutine(peer, peerState)
go conR.queryMaj23Routine(peer, peerState)
// Send our state to peer.
// If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
if !conR.FastSync() {
conR.sendNewRoundStepMessage(peer)
}
}
- gossipDataRoutine
- Send block parts
- Help peer to catch up with my height, when peer.Height < state.Height
- Send Proposal or ProposalPOL to peer
- gossipVotesRoutine
- Gossip Vote to peer if Height match
- NewHeight, vote for rs.LastCommit
- Send POL prevotes if need
- Send prevotes if need
- Send precommit if need
- If peer is lagging by height 1, send LastCommit
- If peer is lagging by more than 1, send Commit at prs.Height
- Gossip Vote to peer if Height match
- queryMaj23Routine
- send Height/Round/Prevotes
- send Height/Round/Precommits
- send Height/Round/ProposalPOL
- send Height/CatchupCommitRound/CatchupCommit
Receive
It handles messages from p2p Switch.
// Receive implements Reactor
// NOTE: We process these messages even when we're fast_syncing.
// Messages affect either a peer state or the consensus state.
// Peer state updates can happen in parallel, but processing of
// proposals, block parts, and votes are ordered by the receiveRoutine
// NOTE: blocks on consensus state for proposals, block parts, and votes
func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
if !conR.IsRunning() {
conR.Logger.Debug("Receive", "src", src, "chId", chID, "bytes", msgBytes)
return
}
msg, err := decodeMsg(msgBytes)
if err != nil {
conR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
conR.Switch.StopPeerForError(src, err)
return
}
if err = msg.ValidateBasic(); err != nil {
conR.Logger.Error("Peer sent us invalid msg", "peer", src, "msg", msg, "err", err)
conR.Switch.StopPeerForError(src, err)
return
}
conR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)
// Get peer states
ps, ok := src.Get(types.PeerStateKey).(*PeerState)
if !ok {
panic(fmt.Sprintf("Peer %v has no state", src))
}
switch chID {
case StateChannel:
switch msg := msg.(type) {
case *NewRoundStepMessage:
ps.ApplyNewRoundStepMessage(msg)
case *NewValidBlockMessage:
ps.ApplyNewValidBlockMessage(msg)
case *HasVoteMessage:
ps.ApplyHasVoteMessage(msg)
case *VoteSetMaj23Message:
cs := conR.conS
cs.mtx.Lock()
height, votes := cs.Height, cs.Votes
cs.mtx.Unlock()
if height != msg.Height {
return
}
// Peer claims to have a maj23 for some BlockID at H,R,S,
err := votes.SetPeerMaj23(msg.Round, msg.Type, ps.peer.ID(), msg.BlockID)
if err != nil {
conR.Switch.StopPeerForError(src, err)
return
}
// Respond with a VoteSetBitsMessage showing which votes we have.
// (and consequently shows which we don't have)
var ourVotes *bits.BitArray
switch msg.Type {
case types.PrevoteType:
ourVotes = votes.Prevotes(msg.Round).BitArrayByBlockID(msg.BlockID)
case types.PrecommitType:
ourVotes = votes.Precommits(msg.Round).BitArrayByBlockID(msg.BlockID)
default:
panic("Bad VoteSetBitsMessage field Type. Forgot to add a check in ValidateBasic?")
}
src.TrySend(VoteSetBitsChannel, cdc.MustMarshalBinaryBare(&VoteSetBitsMessage{
Height: msg.Height,
Round: msg.Round,
Type: msg.Type,
BlockID: msg.BlockID,
Votes: ourVotes,
}))
default:
conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
}
case DataChannel:
if conR.FastSync() {
conR.Logger.Info("Ignoring message received during fastSync", "msg", msg)
return
}
switch msg := msg.(type) {
case *ProposalMessage:
ps.SetHasProposal(msg.Proposal)
conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()}
case *ProposalPOLMessage:
ps.ApplyProposalPOLMessage(msg)
case *BlockPartMessage:
ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index)
conR.metrics.BlockParts.With("peer_id", string(src.ID())).Add(1)
conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()}
default:
conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
}
case VoteChannel:
if conR.FastSync() {
conR.Logger.Info("Ignoring message received during fastSync", "msg", msg)
return
}
switch msg := msg.(type) {
case *VoteMessage:
cs := conR.conS
cs.mtx.RLock()
height, valSize, lastCommitSize := cs.Height, cs.Validators.Size(), cs.LastCommit.Size()
cs.mtx.RUnlock()
ps.EnsureVoteBitArrays(height, valSize)
ps.EnsureVoteBitArrays(height-1, lastCommitSize)
ps.SetHasVote(msg.Vote)
cs.peerMsgQueue <- msgInfo{msg, src.ID()}
default:
// don't punish (leave room for soft upgrades)
conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
}
case VoteSetBitsChannel:
if conR.FastSync() {
conR.Logger.Info("Ignoring message received during fastSync", "msg", msg)
return
}
switch msg := msg.(type) {
case *VoteSetBitsMessage:
cs := conR.conS
cs.mtx.Lock()
height, votes := cs.Height, cs.Votes
cs.mtx.Unlock()
if height == msg.Height {
var ourVotes *bits.BitArray
switch msg.Type {
case types.PrevoteType:
ourVotes = votes.Prevotes(msg.Round).BitArrayByBlockID(msg.BlockID)
case types.PrecommitType:
ourVotes = votes.Precommits(msg.Round).BitArrayByBlockID(msg.BlockID)
default:
panic("Bad VoteSetBitsMessage field Type. Forgot to add a check in ValidateBasic?")
}
ps.ApplyVoteSetBitsMessage(msg, ourVotes)
} else {
ps.ApplyVoteSetBitsMessage(msg, nil)
}
default:
// don't punish (leave room for soft upgrades)
conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
}
default:
conR.Logger.Error(fmt.Sprintf("Unknown chId %X", chID))
}
}
SwitchToConsensus
It will be called from BlockChain reactor to switch from FastSync to consensus mode, starting the consensus state.
// SwitchToConsensus switches from fast_sync mode to consensus mode.
// It resets the state, turns off fast_sync, and starts the consensus state-machine
func (conR *Reactor) SwitchToConsensus(state sm.State, blocksSynced int) {
conR.Logger.Info("SwitchToConsensus")
conR.conS.reconstructLastCommit(state)
// NOTE: The line below causes broadcastNewRoundStepRoutine() to
// broadcast a NewRoundStepMessage.
conR.conS.updateToState(state)
conR.mtx.Lock()
conR.fastSync = false
conR.mtx.Unlock()
conR.metrics.FastSyncing.Set(0)
if blocksSynced > 0 {
// dont bother with the WAL if we fast synced
conR.conS.doWALCatchup = false
}
err := conR.conS.Start()
if err != nil {
panic(fmt.Sprintf(`Failed to start consensus state: %v
conS:
%+v
conR:
%+v`, err, conR.conS, conR))
}
}
版权声明:本文为m0_37889044原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。