From c2c4ecd85531e4189dfc8dde8c507a8edc496081 Mon Sep 17 00:00:00 2001 From: lklimek <842586+lklimek@users.noreply.github.com> Date: Wed, 13 Mar 2024 13:26:43 +0100 Subject: [PATCH] refactor(mempool): use tmsync.Waker when txs are available (#761) * refactor(mempool): use tmsync.Waker for available txs notifications * fix(mempool): panic due to uninitialized txsAvailable * chore: formatting * chore(mempool): remove not needed notifiedTxsAvailable * Revert "chore(mempool): remove not needed notifiedTxsAvailable" This reverts commit cfcd45ccb4ee45ce4712086fc4b44174afe3dd01. * chore: reset notifiedTxsAvailable only when height is changed * revert: need locking on notifyTxsAvailable --- internal/consensus/state.go | 2 ++ internal/consensus/vote_signer.go | 4 ++- internal/mempool/mempool.go | 48 +++++++++++++++++++++++-------- internal/mempool/reactor.go | 2 +- 4 files changed, 42 insertions(+), 14 deletions(-) diff --git a/internal/consensus/state.go b/internal/consensus/state.go index edf21edf1..48dc56934 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -768,6 +768,8 @@ func (cs *State) handleTimeout( } func (cs *State) handleTxsAvailable(ctx context.Context, stateData *StateData) { + // TODO: Change to trace + cs.logger.Debug("new transactions are available", "height", stateData.Height, "round", stateData.Round, "step", stateData.Step) // We only need to do this for round 0. if stateData.Round != 0 { return diff --git a/internal/consensus/vote_signer.go b/internal/consensus/vote_signer.go index 62ef6cee3..e64926114 100644 --- a/internal/consensus/vote_signer.go +++ b/internal/consensus/vote_signer.go @@ -34,7 +34,9 @@ func (s *voteSigner) signAddVote( } // If the node not in the validator set, do nothing. if !stateData.Validators.HasProTxHash(s.privValidator.ProTxHash) { - s.logger.Error("do nothing, node is not a part of validator set") + s.logger.Error("do nothing, node is not a part of validator set", + "protxhash", s.privValidator.ProTxHash.ShortString(), + "validators", stateData.Validators) return nil } keyVals := []any{"height", stateData.Height, "round", stateData.Round, "quorum_hash", stateData.Validators.QuorumHash} diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index 899dbd3f6..92f788373 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -16,6 +16,7 @@ import ( "github.com/dashpay/tenderdash/config" "github.com/dashpay/tenderdash/internal/libs/clist" tmstrings "github.com/dashpay/tenderdash/internal/libs/strings" + tmsync "github.com/dashpay/tenderdash/internal/libs/sync" "github.com/dashpay/tenderdash/libs/log" "github.com/dashpay/tenderdash/types" ) @@ -48,10 +49,12 @@ type TxMempool struct { // Synchronized fields, protected by mtx. mtx *sync.RWMutex notifiedTxsAvailable bool - txsAvailable chan struct{} // one value sent per height when mempool is not empty - preCheck PreCheckFunc - postCheck PostCheckFunc - height int64 // the latest height passed to Update + // txsAvailable is a waker that triggers when transactions are available in the mempool. + // Can be nil if not enabled with EnableTxsAvailable. + txsAvailable *tmsync.Waker + preCheck PreCheckFunc + postCheck PostCheckFunc + height int64 // the latest height passed to Update txs *clist.CList // valid transactions (passed CheckTx) txByKey map[types.TxKey]*clist.CElement @@ -81,6 +84,7 @@ func NewTxMempool( txByKey: make(map[types.TxKey]*clist.CElement), txBySender: make(map[string]*clist.CElement), } + if cfg.CacheSize > 0 { txmp.cache = NewLRUTxCache(cfg.CacheSize) } @@ -149,12 +153,26 @@ func (txmp *TxMempool) EnableTxsAvailable() { txmp.mtx.Lock() defer txmp.mtx.Unlock() - txmp.txsAvailable = make(chan struct{}, 1) + if txmp.txsAvailable != nil { + if err := txmp.txsAvailable.Close(); err != nil { + txmp.logger.Error("failed to close txsAvailable", "err", err) + } + } + txmp.txsAvailable = tmsync.NewWaker() } // TxsAvailable returns a channel which fires once for every height, and only // when transactions are available in the mempool. It is thread-safe. -func (txmp *TxMempool) TxsAvailable() <-chan struct{} { return txmp.txsAvailable } +// +// Note: returned channel might never close if EnableTxsAvailable() was not called before +// calling this function. +func (txmp *TxMempool) TxsAvailable() <-chan struct{} { + if txmp.txsAvailable == nil { + return make(<-chan struct{}) + } + + return txmp.txsAvailable.Sleep() +} // CheckTx adds the given transaction to the mempool if it fits and passes the // application's ABCI CheckTx method. @@ -397,8 +415,10 @@ func (txmp *TxMempool) Update( len(blockTxs), len(deliverTxResponses))) } - txmp.height = blockHeight - txmp.notifiedTxsAvailable = false + if txmp.height != blockHeight { + txmp.height = blockHeight + txmp.notifiedTxsAvailable = false + } if newPreFn != nil { txmp.preCheck = newPreFn @@ -774,8 +794,10 @@ func (txmp *TxMempool) recheckTransactions(ctx context.Context) { // When recheck is complete, trigger a notification for more transactions. _ = g.Wait() + txmp.mtx.Lock() defer txmp.mtx.Unlock() + txmp.notifyTxsAvailable() }() } @@ -830,6 +852,11 @@ func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) { } } +// notifyTxsAvailable triggers a notification that transactions are available in +// the mempool. It is a no-op if the mempool is empty or if a notification has +// already been sent. +// +// No locking is required to call this method. func (txmp *TxMempool) notifyTxsAvailable() { if txmp.Size() == 0 { return // nothing to do @@ -839,9 +866,6 @@ func (txmp *TxMempool) notifyTxsAvailable() { // channel cap is 1, so this will send once txmp.notifiedTxsAvailable = true - select { - case txmp.txsAvailable <- struct{}{}: - default: - } + txmp.txsAvailable.Wake() } } diff --git a/internal/mempool/reactor.go b/internal/mempool/reactor.go index 83c9de61b..b653f46d9 100644 --- a/internal/mempool/reactor.go +++ b/internal/mempool/reactor.go @@ -29,7 +29,7 @@ type Reactor struct { cfg *config.MempoolConfig mempool *TxMempool - ids *IDs + ids *IDs // Peer IDs assigned for peers peerEvents p2p.PeerEventSubscriber p2pClient *client.Client