Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/txpool, eth/catalyst: clear transaction pool in Rollback #30534

Merged
merged 10 commits into from
Nov 19, 2024
50 changes: 50 additions & 0 deletions core/txpool/blobpool/blobpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -1714,3 +1714,53 @@ func (p *BlobPool) Status(hash common.Hash) txpool.TxStatus {
}
return txpool.TxStatusUnknown
}

// Clear implements txpool.SubPool, removing all tracked transactions
// from the blob pool and persistent store.
func (p *BlobPool) Clear() {
p.lock.Lock()
defer p.lock.Unlock()

// manually iterating and deleting every entry is super sub-optimal
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably iterate and unreserve the addresses here. It's less optimal that recreating the reservations map in the parent pool, but that's very sensitive datarace wise, so it's simpler to iterate for now.

// However, Clear is not currently used in production so
// performance is not critical at the moment.
for hash := range p.lookup.txIndex {
id, _ := p.lookup.storeidOfTx(hash)
if err := p.store.Delete(id); err != nil {
log.Warn("failed to delete blob tx from backing store", "err", err)
}
}
for hash := range p.lookup.blobIndex {
id, _ := p.lookup.storeidOfBlob(hash)
if err := p.store.Delete(id); err != nil {
log.Warn("failed to delete blob from backing store", "err", err)
}
}

// unreserve each tracked account. Ideally, we could just clear the
// reservation map in the parent txpool context. However, if we clear in
// parent context, to avoid exposing the subpool lock, we have to lock the
// reservations and then lock each subpool.
//
// This creates the potential for a deadlock situation:
//
// * TxPool.Clear locks the reservations
// * a new transaction is received which locks the subpool mutex
// * TxPool.Clear attempts to lock subpool mutex
//
// The transaction addition may attempt to reserve the sender addr which
// can't happen until Clear releases the reservation lock. Clear cannot
// acquire the subpool lock until the transaction addition is completed.
for acct, _ := range p.index {
p.reserve(acct, false)
}
p.lookup = newLookup()
p.index = make(map[common.Address][]*blobTxMeta)
p.spent = make(map[common.Address]*uint256.Int)

var (
basefee = uint256.MustFromBig(eip1559.CalcBaseFee(p.chain.Config(), p.head))
blobfee = uint256.NewInt(params.BlobTxMinBlobGasprice)
)
p.evict = newPriceHeap(basefee, blobfee, p.index)
}
41 changes: 41 additions & 0 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -1961,3 +1961,44 @@ func (t *lookup) RemotesBelowTip(threshold *big.Int) types.Transactions {
func numSlots(tx *types.Transaction) int {
return int((tx.Size() + txSlotSize - 1) / txSlotSize)
}

// Clear implements txpool.SubPool, removing all tracked txs from the pool
// and rotating the journal.
func (pool *LegacyPool) Clear() {
pool.mu.Lock()
defer pool.mu.Unlock()

// unreserve each tracked account. Ideally, we could just clear the
// reservation map in the parent txpool context. However, if we clear in
// parent context, to avoid exposing the subpool lock, we have to lock the
// reservations and then lock each subpool.
//
// This creates the potential for a deadlock situation:
//
// * TxPool.Clear locks the reservations
// * a new transaction is received which locks the subpool mutex
// * TxPool.Clear attempts to lock subpool mutex
//
// The transaction addition may attempt to reserve the sender addr which
// can't happen until Clear releases the reservation lock. Clear cannot
// acquire the subpool lock until the transaction addition is completed.
for _, tx := range pool.all.remotes {
senderAddr, _ := types.Sender(pool.signer, tx)
pool.reserve(senderAddr, false)
}
for localSender, _ := range pool.locals.accounts {
pool.reserve(localSender, false)
}

pool.all = newLookup()
pool.priced = newPricedList(pool.all)
pool.pending = make(map[common.Address]*list)
pool.queue = make(map[common.Address]*list)

if !pool.config.NoLocals && pool.config.Journal != "" {
pool.journal = newTxJournal(pool.config.Journal)
if err := pool.journal.rotate(pool.local()); err != nil {
log.Warn("Failed to rotate transaction journal", "err", err)
}
}
}
3 changes: 3 additions & 0 deletions core/txpool/subpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,4 +168,7 @@ type SubPool interface {
// Status returns the known status (unknown/pending/queued) of a transaction
// identified by their hashes.
Status(hash common.Hash) TxStatus

// Clear removes all tracked transactions from the pool
Clear()
}
7 changes: 7 additions & 0 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,3 +497,10 @@ func (p *TxPool) Sync() error {
return errors.New("pool already terminated")
}
}

// Clear removes all tracked txs from the subpools.
func (p *TxPool) Clear() {
for _, subpool := range p.subpools {
subpool.Clear()
}
}
jwasinger marked this conversation as resolved.
Show resolved Hide resolved
9 changes: 1 addition & 8 deletions eth/catalyst/simulated_beacon.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"crypto/sha256"
"errors"
"fmt"
"math/big"
"sync"
"time"

Expand All @@ -34,7 +33,6 @@ import (
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc"
)

Expand Down Expand Up @@ -287,12 +285,7 @@ func (c *SimulatedBeacon) Commit() common.Hash {

// Rollback un-sends previously added transactions.
func (c *SimulatedBeacon) Rollback() {
// Flush all transactions from the transaction pools
maxUint256 := new(big.Int).Sub(new(big.Int).Lsh(common.Big1, 256), common.Big1)
c.eth.TxPool().SetGasTip(maxUint256)
// Set the gas tip back to accept new transactions
// TODO (Marius van der Wijden): set gas tip to parameter passed by config
c.eth.TxPool().SetGasTip(big.NewInt(params.GWei))
c.eth.TxPool().Clear()
}

// Fork sets the head to the provided hash.
Expand Down