Skip to content

Commit

Permalink
blockmanager: Rework chain current logic.
Browse files Browse the repository at this point in the history
This reworks that logic that determines whether or not the block manager
believes the chain is current (synced) to use a flag that is protected
by a separate mutex and is updated on the fly versus needing to go
through a channel and using the current sync peer.

While the primary objective is to decouple the block manager from the
server so it can be split out into a separate internal netsync package,
this also has the benefit of much faster state querying and allowing
looser coupling of block processing without the potential of deadlocks.
  • Loading branch information
davecgh committed Dec 10, 2020
1 parent 46b8ea1 commit b329bdc
Showing 1 changed file with 43 additions and 40 deletions.
83 changes: 43 additions & 40 deletions blockmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,13 +219,6 @@ type processTransactionMsg struct {
reply chan processTransactionResponse
}

// isCurrentMsg is a message type to be sent across the message channel for
// requesting whether or not the block manager believes it is synced with
// the currently connected peers.
type isCurrentMsg struct {
reply chan bool
}

// headerNode is used as a node in a list of headers that are linked together
// between checkpoints.
type headerNode struct {
Expand Down Expand Up @@ -354,6 +347,11 @@ type blockManager struct {
// peers.
syncHeightMtx sync.Mutex
syncHeight int64

// The following fields are used to track whether or not the manager
// believes it is fully synced to the network.
isCurrentMtx sync.RWMutex
isCurrent bool
}

// resetHeaderState sets the headers-first mode state to values appropriate for
Expand Down Expand Up @@ -466,6 +464,15 @@ func (b *blockManager) startSync() {
}
}

// Update the state of whether or not the manager believes the chain is
// fully synced to whatever the chain believes when there is no candidate
// for a sync peer.
if bestPeer == nil {
b.isCurrentMtx.Lock()
b.isCurrent = b.cfg.Chain.IsCurrent()
b.isCurrentMtx.Unlock()
}

// Start syncing from the best peer if one was selected.
if bestPeer != nil {
// Clear the requestedBlocks if the sync peer changes, otherwise
Expand All @@ -484,6 +491,14 @@ func (b *blockManager) startSync() {
bmgrLog.Infof("Syncing to block height %d from peer %v",
bestPeer.LastBlock(), bestPeer.Addr())

// The chain is not synced whenever the current best height is less than
// the height to sync to.
b.isCurrentMtx.Lock()
if best.Height < bestPeer.LastBlock() {
b.isCurrent = false
}
b.isCurrentMtx.Unlock()

// When the current height is less than a known checkpoint we
// can use block headers to learn about which blocks comprise
// the chain up to the checkpoint and perform less validation
Expand Down Expand Up @@ -1015,29 +1030,17 @@ func (b *blockManager) processBlockAndOrphans(block *dcrutil.Block, flags blockc
return 0, false, err
}

return forkLen, false, nil
}

// current returns true if we believe we are synced with our peers, false if we
// still have blocks to check
func (b *blockManager) current() bool {
if !b.cfg.Chain.IsCurrent() {
return false
}

// if blockChain thinks we are current and we have no syncPeer it
// is probably right.
if b.syncPeer == nil {
return true
}

// No matter what chain thinks, if we are below the block we are syncing
// to we are not current.
if b.cfg.Chain.BestSnapshot().Height < b.syncPeer.LastBlock() {
return false
// The chain is considered synced once both the blockchain believes it is
// current and the sync height is reached or exceeded.
best := b.cfg.Chain.BestSnapshot()
syncHeight := b.SyncHeight()
if best.Height >= syncHeight && b.cfg.Chain.IsCurrent() {
b.isCurrentMtx.Lock()
b.isCurrent = true
b.isCurrentMtx.Unlock()
}

return true
return forkLen, false, nil
}

// handleBlockMsg handles block messages from all peers.
Expand Down Expand Up @@ -1173,7 +1176,7 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
// chain was not yet current or lost the lock announcement race.
blockHeight := int64(bmsg.block.MsgBlock().Header.Height)
peer.UpdateLastBlockHeight(blockHeight)
if isOrphan || (onMainChain && b.current()) {
if isOrphan || (onMainChain && b.IsCurrent()) {
go b.cfg.PeerNotifier.UpdatePeerHeights(blockHash, blockHeight,
bmsg.peer)
}
Expand Down Expand Up @@ -1209,9 +1212,8 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
"peer %s: %v", peer.Addr(), err)
return
}
bmgrLog.Infof("Downloading headers for blocks %d to %d from "+
"peer %s", prevHeight+1, b.nextCheckpoint.Height,
b.syncPeer.Addr())
bmgrLog.Infof("Downloading headers for blocks %d to %d from peer %s",
prevHeight+1, b.nextCheckpoint.Height, b.syncPeer.Addr())
return
}

Expand Down Expand Up @@ -1488,7 +1490,7 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
}

fromSyncPeer := peer == b.syncPeer
isCurrent := b.current()
isCurrent := b.IsCurrent()

// If this inv contains a block announcement, and this isn't coming from
// our current sync peer or we're current, then update the last
Expand Down Expand Up @@ -1801,9 +1803,6 @@ out:
err: err,
}

case isCurrentMsg:
msg.reply <- b.current()

default:
bmgrLog.Warnf("Invalid message type in block handler: %T", msg)
}
Expand Down Expand Up @@ -1882,7 +1881,7 @@ func (b *blockManager) handleBlockchainNotification(notification *blockchain.Not
// are not current. Other peers that are current should already
// know about it and clients, such as wallets, shouldn't be voting on
// old blocks.
if !b.current() {
if !b.IsCurrent() {
return
}

Expand Down Expand Up @@ -2481,10 +2480,13 @@ func (b *blockManager) ProcessTransaction(tx *dcrutil.Tx, allowOrphans bool,

// IsCurrent returns whether or not the block manager believes it is synced with
// the connected peers.
//
// This function is safe for concurrent access.
func (b *blockManager) IsCurrent() bool {
reply := make(chan bool)
b.msgChan <- isCurrentMsg{reply: reply}
return <-reply
b.isCurrentMtx.Lock()
isCurrent := b.isCurrent
b.isCurrentMtx.Unlock()
return isCurrent
}

// TicketPoolValue returns the current value of the total stake in the ticket
Expand All @@ -2508,6 +2510,7 @@ func newBlockManager(config *blockManagerConfig) (*blockManager, error) {
quit: make(chan struct{}),
orphans: make(map[chainhash.Hash]*orphanBlock),
prevOrphans: make(map[chainhash.Hash][]*orphanBlock),
isCurrent: config.Chain.IsCurrent(),
}

best := bm.cfg.Chain.BestSnapshot()
Expand Down

0 comments on commit b329bdc

Please sign in to comment.