From 09842b88b62b8d9ace0a30c974fde4121317b854 Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Thu, 10 Dec 2020 01:30:56 -0600 Subject: [PATCH] multi: Handle chain ntfn callback in server. This further decouples the block manager from the server by moving the chain callback code from the block manager to server where it more naturally belongs since it is not directly related to sync, rather it is in response to it. The following is a high level overview of the changes: - Move blockchain notification callback to server - Use instance vars and methods directly on server - Move funcs and consts only used by callback from blockmanager.go to server.go - Remove no longer needed methods from peerNotifier interface - RelayInventory - TransactionConfirmed - Remove no longer needed fields from blockManagerConfig struct - FeeEstimator - BgBlkTmplGenerator - NotifyWinningTickets - PruneRebroadcastInventory - Move lottery data duplicate filter state to server - Move block announce duplicate filter state to server - Misc consistency nits This is a part of the overall effort to decouple the block manager from the server so it can be split out into a separate internal netsync package. --- blockmanager.go | 463 +---------------------------------------------- server.go | 465 +++++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 446 insertions(+), 482 deletions(-) diff --git a/blockmanager.go b/blockmanager.go index 53b4776b90..1e32961884 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -19,9 +19,7 @@ import ( "github.com/decred/dcrd/chaincfg/v3" "github.com/decred/dcrd/database/v2" "github.com/decred/dcrd/dcrutil/v3" - "github.com/decred/dcrd/internal/fees" "github.com/decred/dcrd/internal/mempool" - "github.com/decred/dcrd/internal/mining" "github.com/decred/dcrd/internal/rpcserver" peerpkg "github.com/decred/dcrd/peer/v2" "github.com/decred/dcrd/txscript/v3" @@ -49,21 +47,6 @@ const ( // maxRequestedTxns is the maximum number of requested transactions // hashes to store in memory. maxRequestedTxns = wire.MaxInvPerMsg - - // maxReorgDepthNotify specifies the maximum reorganization depth for - // which winning ticket notifications will be sent over RPC. The reorg - // depth is the number of blocks that would be reorganized out of the - // current best chain if a side chain being considered for notifications - // were to ultimately be extended to be longer than the current one. - // - // In effect, this helps to prevent large reorgs by refusing to send the - // winning ticket information to RPC clients, such as voting wallets, - // which depend on it to cast votes. - // - // This check also doubles to help reduce exhaustion attacks that could - // otherwise arise from sending old orphan blocks and forcing nodes to - // do expensive lottery data calculations for them. - maxReorgDepthNotify = 6 ) // zeroHash is the zero value hash (all zeros). It is defined as a convenience. @@ -222,14 +205,6 @@ type peerNotifier interface { // UpdatePeerHeights updates the heights of all peers who have // announced the latest connected main chain block, or a recognized orphan. UpdatePeerHeights(latestBlkHash *chainhash.Hash, latestHeight int64, updateSource *peerpkg.Peer) - - // RelayInventory relays the passed inventory vector to all connected peers - // that are not already known to have it. - RelayInventory(invVect *wire.InvVect, data interface{}, immediate bool) - - // TransactionConfirmed marks the provided single confirmation transaction - // as no longer needing rebroadcasting. - TransactionConfirmed(tx *dcrutil.Tx) } // blockManangerConfig is a configuration struct for a blockManager. @@ -244,16 +219,12 @@ type blockManagerConfig struct { // SigCache defines the signature cache to use. SigCache *txscript.SigCache - // The following fields provide access to the fee estimator, mempool and - // the background block template generator. - FeeEstimator *fees.Estimator - TxMemPool *mempool.TxPool - BgBlkTmplGenerator *mining.BgBlkTmplGenerator + // The following field provides access to the mempool. + TxMemPool *mempool.TxPool - // The following fields are blockManager callbacks. - NotifyWinningTickets func(*rpcserver.WinningTicketsNtfnData) - PruneRebroadcastInventory func() - RpcServer func() *rpcserver.Server + // RpcServer returns an instance of an RPC server to use for notifications. + // It may return nil if there is no active RPC server. + RpcServer func() *rpcserver.Server // DisableCheckpoints indicates whether or not the block manager should make // use of checkpoints. @@ -319,17 +290,6 @@ type blockManager struct { prevOrphans map[chainhash.Hash][]*orphanBlock oldestOrphan *orphanBlock - // lotteryDataBroadcastMutex is a mutex protecting the map - // that checks if block lottery data has been broadcasted - // yet for any given block, so notifications are never - // duplicated. - lotteryDataBroadcast map[chainhash.Hash]struct{} - lotteryDataBroadcastMutex sync.RWMutex - - // The following fields are used to filter duplicate block announcements. - announcedBlockMtx sync.Mutex - announcedBlock *chainhash.Hash - // The following fields are used to track the height being synced to from // peers. syncHeightMtx sync.Mutex @@ -1124,8 +1084,7 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { // Notify stake difficulty subscribers and prune invalidated // transactions. best := b.cfg.Chain.BestSnapshot() - r := b.cfg.RpcServer() - if r != nil { + if r := b.cfg.RpcServer(); r != nil { // Update registered websocket clients on the // current stake difficulty. r.NotifyStakeDifficulty( @@ -1706,8 +1665,7 @@ out: // Notify stake difficulty subscribers and prune // invalidated transactions. best := b.cfg.Chain.BestSnapshot() - r := b.cfg.RpcServer() - if r != nil { + if r := b.cfg.RpcServer(); r != nil { r.NotifyStakeDifficulty( &rpcserver.StakeDifficultyNtfnData{ BlockHash: best.Hash, @@ -1743,13 +1701,12 @@ out: continue } - r := b.cfg.RpcServer() onMainChain := !isOrphan && forkLen == 0 if onMainChain { // Notify stake difficulty subscribers and prune // invalidated transactions. best := b.cfg.Chain.BestSnapshot() - if r != nil { + if r := b.cfg.RpcServer(); r != nil { r.NotifyStakeDifficulty( &rpcserver.StakeDifficultyNtfnData{ BlockHash: best.Hash, @@ -1788,409 +1745,6 @@ out: bmgrLog.Trace("Block handler done") } -// notifiedWinningTickets returns whether or not the winning tickets -// notification for the specified block hash has already been sent. -func (b *blockManager) notifiedWinningTickets(hash *chainhash.Hash) bool { - b.lotteryDataBroadcastMutex.Lock() - _, beenNotified := b.lotteryDataBroadcast[*hash] - b.lotteryDataBroadcastMutex.Unlock() - return beenNotified -} - -// headerApprovesParent returns whether or not the vote bits in the passed -// header indicate the regular transaction tree of the parent block should be -// considered valid. -func headerApprovesParent(header *wire.BlockHeader) bool { - return dcrutil.IsFlagSet16(header.VoteBits, dcrutil.BlockValid) -} - -// isDoubleSpendOrDuplicateError returns whether or not the passed error, which -// is expected to have come from mempool, indicates a transaction was rejected -// either due to containing a double spend or already existing in the pool. -func isDoubleSpendOrDuplicateError(err error) bool { - switch { - case errors.Is(err, mempool.ErrDuplicate): - return true - case errors.Is(err, mempool.ErrAlreadyExists): - return true - case errors.Is(err, blockchain.ErrMissingTxOut): - return true - } - - return false -} - -// handleBlockchainNotification handles notifications from blockchain. It does -// things such as request orphan block parents and relay accepted blocks to -// connected peers. -func (b *blockManager) handleBlockchainNotification(notification *blockchain.Notification) { - switch notification.Type { - // A block that intends to extend the main chain has passed all sanity and - // contextual checks and the chain is believed to be current. Relay it to - // other peers. - case blockchain.NTNewTipBlockChecked: - // WARNING: The chain lock is not released before sending this - // notification, so care must be taken to avoid calling chain functions - // which could result in a deadlock. - block, ok := notification.Data.(*dcrutil.Block) - if !ok { - bmgrLog.Warnf("New tip block checked notification is not a block.") - break - } - - // Generate the inventory vector and relay it immediately. - iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash()) - b.cfg.PeerNotifier.RelayInventory(iv, block.MsgBlock().Header, true) - b.announcedBlockMtx.Lock() - b.announcedBlock = block.Hash() - b.announcedBlockMtx.Unlock() - - // A block has been accepted into the block chain. Relay it to other peers - // (will be ignored if already relayed via NTNewTipBlockChecked) and - // possibly notify RPC clients with the winning tickets. - case blockchain.NTBlockAccepted: - // Don't relay or notify RPC clients with winning tickets if we - // are not current. Other peers that are current should already - // know about it and clients, such as wallets, shouldn't be voting on - // old blocks. - if !b.IsCurrent() { - return - } - - band, ok := notification.Data.(*blockchain.BlockAcceptedNtfnsData) - if !ok { - bmgrLog.Warnf("Chain accepted notification is not " + - "BlockAcceptedNtfnsData.") - break - } - block := band.Block - - // Send a winning tickets notification as needed. The notification will - // only be sent when the following conditions hold: - // - // - The RPC server is running - // - The block that would build on this one is at or after the height - // voting begins - // - The block that would build on this one would not cause a reorg - // larger than the max reorg notify depth - // - This block is after the final checkpoint height - // - A notification for this block has not already been sent - // - // To help visualize the math here, consider the following two competing - // branches: - // - // 100 -> 101 -> 102 -> 103 -> 104 -> 105 -> 106 - // \-> 101' -> 102' - // - // Further, assume that this is a notification for block 103', or in - // other words, it is extending the shorter side chain. The reorg depth - // would be 106 - (103 - 3) = 6. This should intuitively make sense, - // because if the side chain were to be extended enough to become the - // best chain, it would result in a reorg that would remove 6 blocks, - // namely blocks 101, 102, 103, 104, 105, and 106. - blockHash := block.Hash() - bestHeight := band.BestHeight - blockHeight := int64(block.MsgBlock().Header.Height) - reorgDepth := bestHeight - (blockHeight - band.ForkLen) - if b.cfg.RpcServer() != nil && - blockHeight >= b.cfg.ChainParams.StakeValidationHeight-1 && - reorgDepth < maxReorgDepthNotify && - blockHeight > b.cfg.ChainParams.LatestCheckpointHeight() && - !b.notifiedWinningTickets(blockHash) { - - // Obtain the winning tickets for this block. handleNotifyMsg - // should be safe for concurrent access of things contained - // within blockchain. - wt, _, _, err := b.cfg.Chain.LotteryDataForBlock(blockHash) - if err != nil { - bmgrLog.Errorf("Couldn't calculate winning tickets for "+ - "accepted block %v: %v", blockHash, err.Error()) - } else { - // Notify registered websocket clients of newly - // eligible tickets to vote on. - b.cfg.NotifyWinningTickets(&rpcserver.WinningTicketsNtfnData{ - BlockHash: *blockHash, - BlockHeight: blockHeight, - Tickets: wt, - }) - - b.lotteryDataBroadcastMutex.Lock() - b.lotteryDataBroadcast[*blockHash] = struct{}{} - b.lotteryDataBroadcastMutex.Unlock() - } - } - - // Generate the inventory vector and relay it immediately if not already - // known to have been sent in NTNewTipBlockChecked. - b.announcedBlockMtx.Lock() - sent := b.announcedBlock != nil && *b.announcedBlock == *blockHash - b.announcedBlock = nil - b.announcedBlockMtx.Unlock() - if !sent { - iv := wire.NewInvVect(wire.InvTypeBlock, blockHash) - b.cfg.PeerNotifier.RelayInventory(iv, block.MsgBlock().Header, true) - } - - // Inform the background block template generator about the accepted - // block. - if b.cfg.BgBlkTmplGenerator != nil { - b.cfg.BgBlkTmplGenerator.BlockAccepted(block) - } - - if !b.cfg.FeeEstimator.IsEnabled() { - // fee estimation can only start after we have performed an initial - // sync, otherwise we'll start adding mempool transactions at the - // wrong height. - b.cfg.FeeEstimator.Enable(block.Height()) - } - - // A block has been connected to the main block chain. - case blockchain.NTBlockConnected: - ntfn, ok := notification.Data.(*blockchain.BlockConnectedNtfnsData) - if !ok { - bmgrLog.Warnf("Block connected notification is not " + - "BlockConnectedNtfnsData.") - break - } - block := ntfn.Block - parentBlock := ntfn.ParentBlock - isTreasuryEnabled := ntfn.IsTreasuryActive - - // Account for transactions mined in the newly connected block for fee - // estimation. This must be done before attempting to remove - // transactions from the mempool because the mempool will alert the - // estimator of the txs that are leaving - b.cfg.FeeEstimator.ProcessBlock(block) - - // TODO: In the case the new tip disapproves the previous block, any - // transactions the previous block contains in its regular tree which - // double spend the same inputs as transactions in either tree of the - // current tip should ideally be tracked in the pool as eligible for - // inclusion in an alternative tip (side chain block) in case the - // current tip block does not get enough votes. However, the - // transaction pool currently does not provide any way to distinguish - // this condition and thus only provides tracking based on the current - // tip. In order to handle this condition, the pool would have to - // provide a way to track and independently query which txns are - // eligible based on the current tip both approving and disapproving the - // previous block as well as the previous block itself. - - // Remove all of the regular and stake transactions in the connected - // block from the transaction pool. Also, remove any transactions which - // are now double spends as a result of these new transactions. - // Finally, remove any transaction that is no longer an orphan. - // Transactions which depend on a confirmed transaction are NOT removed - // recursively because they are still valid. Also, the coinbase of the - // regular tx tree is skipped because the transaction pool doesn't (and - // can't) have regular tree coinbase transactions in it. - // - // Also, in the case the RPC server is enabled, stop rebroadcasting any - // transactions in the block that were setup to be rebroadcast. - txMemPool := b.cfg.TxMemPool - handleConnectedBlockTxns := func(txns []*dcrutil.Tx) { - for _, tx := range txns { - txMemPool.RemoveTransaction(tx, false, isTreasuryEnabled) - txMemPool.MaybeAcceptDependents(tx, isTreasuryEnabled) - txMemPool.RemoveDoubleSpends(tx, isTreasuryEnabled) - txMemPool.RemoveOrphan(tx, isTreasuryEnabled) - acceptedTxs := txMemPool.ProcessOrphans(tx, - isTreasuryEnabled) - b.cfg.PeerNotifier.AnnounceNewTransactions(acceptedTxs) - - // Now that this block is in the blockchain, mark the - // transaction (except the coinbase) as no longer needing - // rebroadcasting. - b.cfg.PeerNotifier.TransactionConfirmed(tx) - } - } - - // Add regular transactions back to the mempool, - // excluding the coinbase since it does not belong in the mempool. - handleConnectedBlockTxns(block.Transactions()[1:]) - if isTreasuryEnabled { - // Skip treasurybase - handleConnectedBlockTxns(block.STransactions()[1:]) - } else { - handleConnectedBlockTxns(block.STransactions()) - } - - // In the case the regular tree of the previous block was disapproved, - // add all of the its transactions, with the exception of the coinbase, - // back to the transaction pool to be mined in a future block. - // - // Notice that some of those transactions might have been included in - // the current block and others might also be spending some of the same - // outputs that transactions in the previous originally block spent. - // This is the expected behavior because disapproval of the regular tree - // of the previous block essentially makes it as if those transactions - // never happened. - // - // Finally, if transactions fail to add to the pool for some reason - // other than the pool already having it (a duplicate) or now being a - // double spend, remove all transactions that depend on it as well. - // The dependents are not removed for double spends because the only - // way a transaction which was not a double spend in the previous block - // to now be one is due to some transaction in the current block - // (probably the same one) also spending those outputs, and, in that - // case, anything that happens to be in the pool which depends on the - // transaction is still valid. - if !headerApprovesParent(&block.MsgBlock().Header) { - for _, tx := range parentBlock.Transactions()[1:] { - _, err := txMemPool.MaybeAcceptTransaction(tx, false, true) - if err != nil && !isDoubleSpendOrDuplicateError(err) { - txMemPool.RemoveTransaction(tx, true, - isTreasuryEnabled) - } - } - } - - if r := b.cfg.RpcServer(); r != nil { - // Filter and update the rebroadcast inventory. - b.cfg.PruneRebroadcastInventory() - - // Notify registered websocket clients of incoming block. - r.NotifyBlockConnected(block) - } - - if b.cfg.BgBlkTmplGenerator != nil { - b.cfg.BgBlkTmplGenerator.BlockConnected(block) - } - - // Stake tickets are spent or missed from the most recently connected block. - case blockchain.NTSpentAndMissedTickets: - tnd, ok := notification.Data.(*blockchain.TicketNotificationsData) - if !ok { - bmgrLog.Warnf("Tickets connected notification is not " + - "TicketNotificationsData") - break - } - - if r := b.cfg.RpcServer(); r != nil { - r.NotifySpentAndMissedTickets(tnd) - } - - // Stake tickets are matured from the most recently connected block. - case blockchain.NTNewTickets: - tnd, ok := notification.Data.(*blockchain.TicketNotificationsData) - if !ok { - bmgrLog.Warnf("Tickets connected notification is not " + - "TicketNotificationsData") - break - } - - if r := b.cfg.RpcServer(); r != nil { - r.NotifyNewTickets(tnd) - } - - // A block has been disconnected from the main block chain. - case blockchain.NTBlockDisconnected: - ntfn, ok := notification.Data.(*blockchain.BlockDisconnectedNtfnsData) - if !ok { - bmgrLog.Warnf("Block disconnected notification is not " + - "BlockDisconnectedNtfnsData.") - break - } - block := ntfn.Block - parentBlock := ntfn.ParentBlock - isTreasuryEnabled := ntfn.IsTreasuryActive - - // In the case the regular tree of the previous block was disapproved, - // disconnecting the current block makes all of those transactions valid - // again. Thus, with the exception of the coinbase, remove all of those - // transactions and any that are now double spends from the transaction - // pool. Transactions which depend on a confirmed transaction are NOT - // removed recursively because they are still valid. - txMemPool := b.cfg.TxMemPool - if !headerApprovesParent(&block.MsgBlock().Header) { - for _, tx := range parentBlock.Transactions()[1:] { - txMemPool.RemoveTransaction(tx, false, isTreasuryEnabled) - txMemPool.MaybeAcceptDependents(tx, isTreasuryEnabled) - txMemPool.RemoveDoubleSpends(tx, isTreasuryEnabled) - txMemPool.RemoveOrphan(tx, isTreasuryEnabled) - txMemPool.ProcessOrphans(tx, isTreasuryEnabled) - } - } - - // Add all of the regular and stake transactions in the disconnected - // block, with the exception of the regular tree coinbase, back to the - // transaction pool to be mined in a future block. - // - // Notice that, in the case the previous block was disapproved, some of - // the transactions in the block being disconnected might have been - // included in the previous block and others might also have been - // spending some of the same outputs. This is the expected behavior - // because disapproval of the regular tree of the previous block - // essentially makes it as if those transactions never happened, so - // disconnecting the block that disapproved those transactions - // effectively revives them. - // - // Finally, if transactions fail to add to the pool for some reason - // other than the pool already having it (a duplicate) or now being a - // double spend, remove all transactions that depend on it as well. - // The dependents are not removed for double spends because the only - // way a transaction which was not a double spend in the block being - // disconnected to now be one is due to some transaction in the previous - // block (probably the same one), which was disapproved, also spending - // those outputs, and, in that case, anything that happens to be in the - // pool which depends on the transaction is still valid. - handleDisconnectedBlockTxns := func(txns []*dcrutil.Tx) { - for _, tx := range txns { - _, err := txMemPool.MaybeAcceptTransaction(tx, false, true) - if err != nil && !isDoubleSpendOrDuplicateError(err) { - txMemPool.RemoveTransaction(tx, true, - isTreasuryEnabled) - } - } - } - handleDisconnectedBlockTxns(block.Transactions()[1:]) - - if isTreasuryEnabled { - // Skip treasurybase - handleDisconnectedBlockTxns(block.STransactions()[1:]) - } else { - handleDisconnectedBlockTxns(block.STransactions()) - } - - if b.cfg.BgBlkTmplGenerator != nil { - b.cfg.BgBlkTmplGenerator.BlockDisconnected(block) - } - - // Notify registered websocket clients. - if r := b.cfg.RpcServer(); r != nil { - // Filter and update the rebroadcast inventory. - b.cfg.PruneRebroadcastInventory() - - // Notify registered websocket clients. - r.NotifyBlockDisconnected(block) - } - - // Chain reorganization has commenced. - case blockchain.NTChainReorgStarted: - if b.cfg.BgBlkTmplGenerator != nil { - b.cfg.BgBlkTmplGenerator.ChainReorgStarted() - } - - // Chain reorganization has concluded. - case blockchain.NTChainReorgDone: - if b.cfg.BgBlkTmplGenerator != nil { - b.cfg.BgBlkTmplGenerator.ChainReorgDone() - } - - // The blockchain is reorganizing. - case blockchain.NTReorganization: - rd, ok := notification.Data.(*blockchain.ReorganizationNtfnsData) - if !ok { - bmgrLog.Warnf("Chain reorganization notification is malformed") - break - } - - // Notify registered websocket clients. - if r := b.cfg.RpcServer(); r != nil { - r.NotifyReorganization(rd) - } - } -} - // NewPeer informs the block manager of a newly active peer. func (b *blockManager) NewPeer(peer *peerpkg.Peer) { // Ignore if we are shutting down. @@ -2484,7 +2038,6 @@ func newBlockManager(config *blockManagerConfig) (*blockManager, error) { bmgrLog.Info("Checkpoints are disabled") } - bm.lotteryDataBroadcast = make(map[chainhash.Hash]struct{}) bm.syncHeightMtx.Lock() bm.syncHeight = best.Height bm.syncHeightMtx.Unlock() diff --git a/server.go b/server.go index 1406cf7682..c5e5c36b4c 100644 --- a/server.go +++ b/server.go @@ -82,6 +82,21 @@ const ( // maxCachedNaSubmissions is the maximum number of network address // submissions cached. maxCachedNaSubmissions = 20 + + // maxReorgDepthNotify specifies the maximum reorganization depth for which + // winning ticket notifications will be sent over RPC. The reorg depth is + // the number of blocks that would be reorganized out of the current best + // chain if a side chain being considered for notifications were to + // ultimately be extended to be longer than the current one. + // + // In effect, this helps to prevent large reorgs by refusing to send the + // winning ticket information to RPC clients, such as voting wallets, which + // depend on it to cast votes. + // + // This check also doubles to help reduce exhaustion attacks that could + // otherwise arise from sending old orphan blocks and forcing nodes to do + // expensive lottery data calculations for them. + maxReorgDepthNotify = 6 ) var ( @@ -475,6 +490,15 @@ type server struct { addrIndex *indexers.AddrIndex existsAddrIndex *indexers.ExistsAddrIndex cfIndex *indexers.CFIndex + + // The following fields are used to filter duplicate block announcements. + announcedBlockMtx sync.Mutex + announcedBlock *chainhash.Hash + + // These following fields are used to filter duplicate block lottery data + // anouncements. + lotteryDataBroadcastMtx sync.RWMutex + lotteryDataBroadcast map[chainhash.Hash]struct{} } // serverPeer extends the peer to maintain state shared by the server and @@ -2582,6 +2606,405 @@ func (s *server) UpdatePeerHeights(latestBlkHash *chainhash.Hash, latestHeight i } } +// notifiedWinningTickets returns whether or not the winning tickets +// notification for the specified block hash has already been sent. +func (s *server) notifiedWinningTickets(hash *chainhash.Hash) bool { + s.lotteryDataBroadcastMtx.Lock() + _, beenNotified := s.lotteryDataBroadcast[*hash] + s.lotteryDataBroadcastMtx.Unlock() + return beenNotified +} + +// headerApprovesParent returns whether or not the vote bits in the passed +// header indicate the regular transaction tree of the parent block should be +// considered valid. +func headerApprovesParent(header *wire.BlockHeader) bool { + return dcrutil.IsFlagSet16(header.VoteBits, dcrutil.BlockValid) +} + +// isDoubleSpendOrDuplicateError returns whether or not the passed error, which +// is expected to have come from mempool, indicates a transaction was rejected +// either due to containing a double spend or already existing in the pool. +func isDoubleSpendOrDuplicateError(err error) bool { + switch { + case errors.Is(err, mempool.ErrDuplicate): + return true + case errors.Is(err, mempool.ErrAlreadyExists): + return true + case errors.Is(err, blockchain.ErrMissingTxOut): + return true + } + + return false +} + +// handleBlockchainNotification handles notifications from blockchain. It does +// things such as request orphan block parents and relay accepted blocks to +// connected peers. +func (s *server) handleBlockchainNotification(notification *blockchain.Notification) { + switch notification.Type { + // A block that intends to extend the main chain has passed all sanity and + // contextual checks and the chain is believed to be current. Relay it to + // other peers. + case blockchain.NTNewTipBlockChecked: + // WARNING: The chain lock is not released before sending this + // notification, so care must be taken to avoid calling chain functions + // which could result in a deadlock. + block, ok := notification.Data.(*dcrutil.Block) + if !ok { + bmgrLog.Warnf("New tip block checked notification is not a block.") + break + } + + // Generate the inventory vector and relay it immediately. + iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash()) + s.RelayInventory(iv, block.MsgBlock().Header, true) + s.announcedBlockMtx.Lock() + s.announcedBlock = block.Hash() + s.announcedBlockMtx.Unlock() + + // A block has been accepted into the block chain. Relay it to other peers + // (will be ignored if already relayed via NTNewTipBlockChecked) and + // possibly notify RPC clients with the winning tickets. + case blockchain.NTBlockAccepted: + // Don't relay or notify RPC clients with winning tickets if we are not + // current. Other peers that are current should already know about it + // and clients, such as wallets, shouldn't be voting on old blocks. + if !s.blockManager.IsCurrent() { + return + } + + band, ok := notification.Data.(*blockchain.BlockAcceptedNtfnsData) + if !ok { + bmgrLog.Warnf("Chain accepted notification is not " + + "BlockAcceptedNtfnsData.") + break + } + block := band.Block + + // Send a winning tickets notification as needed. The notification will + // only be sent when the following conditions hold: + // + // - The RPC server is running + // - The block that would build on this one is at or after the height + // voting begins + // - The block that would build on this one would not cause a reorg + // larger than the max reorg notify depth + // - This block is after the final checkpoint height + // - A notification for this block has not already been sent + // + // To help visualize the math here, consider the following two competing + // branches: + // + // 100 -> 101 -> 102 -> 103 -> 104 -> 105 -> 106 + // \-> 101' -> 102' + // + // Further, assume that this is a notification for block 103', or in + // other words, it is extending the shorter side chain. The reorg depth + // would be 106 - (103 - 3) = 6. This should intuitively make sense, + // because if the side chain were to be extended enough to become the + // best chain, it would result in a reorg that would remove 6 blocks, + // namely blocks 101, 102, 103, 104, 105, and 106. + blockHash := block.Hash() + bestHeight := band.BestHeight + blockHeight := int64(block.MsgBlock().Header.Height) + reorgDepth := bestHeight - (blockHeight - band.ForkLen) + if s.rpcServer != nil && + blockHeight >= s.chainParams.StakeValidationHeight-1 && + reorgDepth < maxReorgDepthNotify && + blockHeight > s.chainParams.LatestCheckpointHeight() && + !s.notifiedWinningTickets(blockHash) { + + // Obtain the winning tickets for this block. handleNotifyMsg + // should be safe for concurrent access of things contained within + // blockchain. + wt, _, _, err := s.chain.LotteryDataForBlock(blockHash) + if err != nil { + bmgrLog.Errorf("Couldn't calculate winning tickets for "+ + "accepted block %v: %v", blockHash, err.Error()) + } else { + // Notify registered websocket clients of newly eligible tickets + // to vote on. + s.rpcServer.NotifyWinningTickets(&rpcserver.WinningTicketsNtfnData{ + BlockHash: *blockHash, + BlockHeight: blockHeight, + Tickets: wt, + }) + + s.lotteryDataBroadcastMtx.Lock() + s.lotteryDataBroadcast[*blockHash] = struct{}{} + s.lotteryDataBroadcastMtx.Unlock() + } + } + + // Generate the inventory vector and relay it immediately if not already + // known to have been sent in NTNewTipBlockChecked. + s.announcedBlockMtx.Lock() + sent := s.announcedBlock != nil && *s.announcedBlock == *blockHash + s.announcedBlock = nil + s.announcedBlockMtx.Unlock() + if !sent { + iv := wire.NewInvVect(wire.InvTypeBlock, blockHash) + s.RelayInventory(iv, block.MsgBlock().Header, true) + } + + // Inform the background block template generator about the accepted + // block. + if s.bg != nil { + s.bg.BlockAccepted(block) + } + + if !s.feeEstimator.IsEnabled() { + // fee estimation can only start after we have performed an initial + // sync, otherwise we'll start adding mempool transactions at the + // wrong height. + s.feeEstimator.Enable(block.Height()) + } + + // A block has been connected to the main block chain. + case blockchain.NTBlockConnected: + ntfn, ok := notification.Data.(*blockchain.BlockConnectedNtfnsData) + if !ok { + bmgrLog.Warnf("Block connected notification is not " + + "BlockConnectedNtfnsData.") + break + } + block := ntfn.Block + parentBlock := ntfn.ParentBlock + isTreasuryEnabled := ntfn.IsTreasuryActive + + // Account for transactions mined in the newly connected block for fee + // estimation. This must be done before attempting to remove + // transactions from the mempool because the mempool will alert the + // estimator of the txs that are leaving + s.feeEstimator.ProcessBlock(block) + + // TODO: In the case the new tip disapproves the previous block, any + // transactions the previous block contains in its regular tree which + // double spend the same inputs as transactions in either tree of the + // current tip should ideally be tracked in the pool as eligible for + // inclusion in an alternative tip (side chain block) in case the + // current tip block does not get enough votes. However, the + // transaction pool currently does not provide any way to distinguish + // this condition and thus only provides tracking based on the current + // tip. In order to handle this condition, the pool would have to + // provide a way to track and independently query which txns are + // eligible based on the current tip both approving and disapproving the + // previous block as well as the previous block itself. + + // Remove all of the regular and stake transactions in the connected + // block from the transaction pool. Also, remove any transactions which + // are now double spends as a result of these new transactions. + // Finally, remove any transaction that is no longer an orphan. + // Transactions which depend on a confirmed transaction are NOT removed + // recursively because they are still valid. Also, the coinbase of the + // regular tx tree is skipped because the transaction pool doesn't (and + // can't) have regular tree coinbase transactions in it. + // + // Also, in the case the RPC server is enabled, stop rebroadcasting any + // transactions in the block that were setup to be rebroadcast. + txMemPool := s.txMemPool + handleConnectedBlockTxns := func(txns []*dcrutil.Tx) { + for _, tx := range txns { + txMemPool.RemoveTransaction(tx, false, isTreasuryEnabled) + txMemPool.MaybeAcceptDependents(tx, isTreasuryEnabled) + txMemPool.RemoveDoubleSpends(tx, isTreasuryEnabled) + txMemPool.RemoveOrphan(tx, isTreasuryEnabled) + acceptedTxs := txMemPool.ProcessOrphans(tx, isTreasuryEnabled) + s.AnnounceNewTransactions(acceptedTxs) + + // Now that this block is in the blockchain, mark the + // transaction (except the coinbase) as no longer needing + // rebroadcasting. + s.TransactionConfirmed(tx) + } + } + + // Add regular transactions back to the mempool, excluding the coinbase + // since it does not belong in the mempool. + handleConnectedBlockTxns(block.Transactions()[1:]) + if isTreasuryEnabled { + // Skip treasurybase + handleConnectedBlockTxns(block.STransactions()[1:]) + } else { + handleConnectedBlockTxns(block.STransactions()) + } + + // In the case the regular tree of the previous block was disapproved, + // add all of the its transactions, with the exception of the coinbase, + // back to the transaction pool to be mined in a future block. + // + // Notice that some of those transactions might have been included in + // the current block and others might also be spending some of the same + // outputs that transactions in the previous originally block spent. + // This is the expected behavior because disapproval of the regular tree + // of the previous block essentially makes it as if those transactions + // never happened. + // + // Finally, if transactions fail to add to the pool for some reason + // other than the pool already having it (a duplicate) or now being a + // double spend, remove all transactions that depend on it as well. + // The dependents are not removed for double spends because the only + // way a transaction which was not a double spend in the previous block + // to now be one is due to some transaction in the current block + // (probably the same one) also spending those outputs, and, in that + // case, anything that happens to be in the pool which depends on the + // transaction is still valid. + if !headerApprovesParent(&block.MsgBlock().Header) { + for _, tx := range parentBlock.Transactions()[1:] { + _, err := txMemPool.MaybeAcceptTransaction(tx, false, true) + if err != nil && !isDoubleSpendOrDuplicateError(err) { + txMemPool.RemoveTransaction(tx, true, isTreasuryEnabled) + } + } + } + + if r := s.rpcServer; r != nil { + // Filter and update the rebroadcast inventory. + s.PruneRebroadcastInventory() + + // Notify registered websocket clients of incoming block. + r.NotifyBlockConnected(block) + } + + if s.bg != nil { + s.bg.BlockConnected(block) + } + + // Stake tickets are spent or missed from the most recently connected block. + case blockchain.NTSpentAndMissedTickets: + tnd, ok := notification.Data.(*blockchain.TicketNotificationsData) + if !ok { + bmgrLog.Warnf("Tickets connected notification is not " + + "TicketNotificationsData") + break + } + + if r := s.rpcServer; r != nil { + r.NotifySpentAndMissedTickets(tnd) + } + + // Stake tickets are matured from the most recently connected block. + case blockchain.NTNewTickets: + tnd, ok := notification.Data.(*blockchain.TicketNotificationsData) + if !ok { + bmgrLog.Warnf("Tickets connected notification is not " + + "TicketNotificationsData") + break + } + + if r := s.rpcServer; r != nil { + r.NotifyNewTickets(tnd) + } + + // A block has been disconnected from the main block chain. + case blockchain.NTBlockDisconnected: + ntfn, ok := notification.Data.(*blockchain.BlockDisconnectedNtfnsData) + if !ok { + bmgrLog.Warnf("Block disconnected notification is not " + + "BlockDisconnectedNtfnsData.") + break + } + block := ntfn.Block + parentBlock := ntfn.ParentBlock + isTreasuryEnabled := ntfn.IsTreasuryActive + + // In the case the regular tree of the previous block was disapproved, + // disconnecting the current block makes all of those transactions valid + // again. Thus, with the exception of the coinbase, remove all of those + // transactions and any that are now double spends from the transaction + // pool. Transactions which depend on a confirmed transaction are NOT + // removed recursively because they are still valid. + txMemPool := s.txMemPool + if !headerApprovesParent(&block.MsgBlock().Header) { + for _, tx := range parentBlock.Transactions()[1:] { + txMemPool.RemoveTransaction(tx, false, isTreasuryEnabled) + txMemPool.MaybeAcceptDependents(tx, isTreasuryEnabled) + txMemPool.RemoveDoubleSpends(tx, isTreasuryEnabled) + txMemPool.RemoveOrphan(tx, isTreasuryEnabled) + txMemPool.ProcessOrphans(tx, isTreasuryEnabled) + } + } + + // Add all of the regular and stake transactions in the disconnected + // block, with the exception of the regular tree coinbase, back to the + // transaction pool to be mined in a future block. + // + // Notice that, in the case the previous block was disapproved, some of + // the transactions in the block being disconnected might have been + // included in the previous block and others might also have been + // spending some of the same outputs. This is the expected behavior + // because disapproval of the regular tree of the previous block + // essentially makes it as if those transactions never happened, so + // disconnecting the block that disapproved those transactions + // effectively revives them. + // + // Finally, if transactions fail to add to the pool for some reason + // other than the pool already having it (a duplicate) or now being a + // double spend, remove all transactions that depend on it as well. + // The dependents are not removed for double spends because the only + // way a transaction which was not a double spend in the block being + // disconnected to now be one is due to some transaction in the previous + // block (probably the same one), which was disapproved, also spending + // those outputs, and, in that case, anything that happens to be in the + // pool which depends on the transaction is still valid. + handleDisconnectedBlockTxns := func(txns []*dcrutil.Tx) { + for _, tx := range txns { + _, err := txMemPool.MaybeAcceptTransaction(tx, false, true) + if err != nil && !isDoubleSpendOrDuplicateError(err) { + txMemPool.RemoveTransaction(tx, true, isTreasuryEnabled) + } + } + } + handleDisconnectedBlockTxns(block.Transactions()[1:]) + + if isTreasuryEnabled { + // Skip treasurybase + handleDisconnectedBlockTxns(block.STransactions()[1:]) + } else { + handleDisconnectedBlockTxns(block.STransactions()) + } + + if s.bg != nil { + s.bg.BlockDisconnected(block) + } + + // Notify registered websocket clients. + if r := s.rpcServer; r != nil { + // Filter and update the rebroadcast inventory. + s.PruneRebroadcastInventory() + + // Notify registered websocket clients. + r.NotifyBlockDisconnected(block) + } + + // Chain reorganization has commenced. + case blockchain.NTChainReorgStarted: + if s.bg != nil { + s.bg.ChainReorgStarted() + } + + // Chain reorganization has concluded. + case blockchain.NTChainReorgDone: + if s.bg != nil { + s.bg.ChainReorgDone() + } + + // The blockchain is reorganizing. + case blockchain.NTReorganization: + rd, ok := notification.Data.(*blockchain.ReorganizationNtfnsData) + if !ok { + bmgrLog.Warnf("Chain reorganization notification is malformed") + break + } + + // Notify registered websocket clients. + if r := s.rpcServer; r != nil { + r.NotifyReorganization(rd) + } + } +} + // rebroadcastHandler keeps track of user submitted inventories that we have // sent out but have not yet made it into a block. We periodically rebroadcast // them in case our peers restarted or otherwise lost track of them. @@ -3073,6 +3496,7 @@ func newServer(ctx context.Context, listenAddrs []string, db database.DB, chainP services: services, sigCache: sigCache, subsidyCache: standalone.NewSubsidyCache(chainParams), + lotteryDataBroadcast: make(map[chainhash.Hash]struct{}), } // Create the transaction and address indexes if needed. @@ -3147,18 +3571,14 @@ func newServer(ctx context.Context, listenAddrs []string, db database.DB, chainP // Create a new block chain instance with the appropriate configuration. s.chain, err = blockchain.New(ctx, &blockchain.Config{ - DB: s.db, - ChainParams: s.chainParams, - Checkpoints: checkpoints, - TimeSource: s.timeSource, - Notifications: func(notification *blockchain.Notification) { - if s.blockManager != nil { - s.blockManager.handleBlockchainNotification(notification) - } - }, - SigCache: s.sigCache, - SubsidyCache: s.subsidyCache, - IndexManager: indexManager, + DB: s.db, + ChainParams: s.chainParams, + Checkpoints: checkpoints, + TimeSource: s.timeSource, + Notifications: s.handleBlockchainNotification, + SigCache: s.sigCache, + SubsidyCache: s.subsidyCache, + IndexManager: indexManager, }) if err != nil { return nil, err @@ -3231,20 +3651,12 @@ func newServer(ctx context.Context, listenAddrs []string, db database.DB, chainP } s.txMemPool = mempool.New(&txC) s.blockManager, err = newBlockManager(&blockManagerConfig{ - PeerNotifier: &s, - Chain: s.chain, - ChainParams: s.chainParams, - SigCache: s.sigCache, - SubsidyCache: s.subsidyCache, - FeeEstimator: s.feeEstimator, - TxMemPool: s.txMemPool, - BgBlkTmplGenerator: nil, // Created later. - NotifyWinningTickets: func(wtnd *rpcserver.WinningTicketsNtfnData) { - if s.rpcServer != nil { - s.rpcServer.NotifyWinningTickets(wtnd) - } - }, - PruneRebroadcastInventory: s.PruneRebroadcastInventory, + PeerNotifier: &s, + Chain: s.chain, + ChainParams: s.chainParams, + SigCache: s.sigCache, + SubsidyCache: s.subsidyCache, + TxMemPool: s.txMemPool, RpcServer: func() *rpcserver.Server { return s.rpcServer }, @@ -3329,7 +3741,6 @@ func newServer(ctx context.Context, listenAddrs []string, db database.DB, chainP s.bg = mining.NewBgBlkTmplGenerator(tg, cfg.miningAddrs, cfg.AllowUnsyncedMining) - s.blockManager.cfg.BgBlkTmplGenerator = s.bg s.cpuMiner = cpuminer.New(&cpuminer.Config{ ChainParams: s.chainParams,