-
Notifications
You must be signed in to change notification settings - Fork 20.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
core: use a wrapped map w/ sync.RWMutex
for TxPool.all
to remove contention in TxPool.Get
.
#16670
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -200,11 +200,11 @@ type TxPool struct { | |
locals *accountSet // Set of local transaction to exempt from eviction rules | ||
journal *txJournal // Journal of local transaction to back up to disk | ||
|
||
pending map[common.Address]*txList // All currently processable transactions | ||
queue map[common.Address]*txList // Queued but non-processable transactions | ||
beats map[common.Address]time.Time // Last heartbeat from each known account | ||
all map[common.Hash]*types.Transaction // All transactions to allow lookups | ||
priced *txPricedList // All transactions sorted by price | ||
pending map[common.Address]*txList // All currently processable transactions | ||
queue map[common.Address]*txList // Queued but non-processable transactions | ||
beats map[common.Address]time.Time // Last heartbeat from each known account | ||
all *txLookup // All transactions to allow lookups | ||
priced *txPricedList // All transactions sorted by price | ||
|
||
wg sync.WaitGroup // for shutdown sync | ||
|
||
|
@@ -226,12 +226,12 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block | |
pending: make(map[common.Address]*txList), | ||
queue: make(map[common.Address]*txList), | ||
beats: make(map[common.Address]time.Time), | ||
all: make(map[common.Hash]*types.Transaction), | ||
all: newTxLookup(), | ||
chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize), | ||
gasPrice: new(big.Int).SetUint64(config.PriceLimit), | ||
} | ||
pool.locals = newAccountSet(pool.signer) | ||
pool.priced = newTxPricedList(&pool.all) | ||
pool.priced = newTxPricedList(pool.all) | ||
pool.reset(nil, chain.CurrentBlock().Header()) | ||
|
||
// If local transactions and journaling is enabled, load from disk | ||
|
@@ -605,7 +605,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { | |
func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) { | ||
// If the transaction is already known, discard it | ||
hash := tx.Hash() | ||
if pool.all[hash] != nil { | ||
if pool.all.Get(hash) != nil { | ||
log.Trace("Discarding already known transaction", "hash", hash) | ||
return false, fmt.Errorf("known transaction: %x", hash) | ||
} | ||
|
@@ -616,15 +616,15 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) { | |
return false, err | ||
} | ||
// If the transaction pool is full, discard underpriced transactions | ||
if uint64(len(pool.all)) >= pool.config.GlobalSlots+pool.config.GlobalQueue { | ||
if uint64(pool.all.Count()) >= pool.config.GlobalSlots+pool.config.GlobalQueue { | ||
// If the new transaction is underpriced, don't accept it | ||
if !local && pool.priced.Underpriced(tx, pool.locals) { | ||
log.Trace("Discarding underpriced transaction", "hash", hash, "price", tx.GasPrice()) | ||
underpricedTxCounter.Inc(1) | ||
return false, ErrUnderpriced | ||
} | ||
// New transaction is better than our worse ones, make room for it | ||
drop := pool.priced.Discard(len(pool.all)-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals) | ||
drop := pool.priced.Discard(pool.all.Count()-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals) | ||
for _, tx := range drop { | ||
log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice()) | ||
underpricedTxCounter.Inc(1) | ||
|
@@ -642,11 +642,11 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) { | |
} | ||
// New transaction is better, replace old one | ||
if old != nil { | ||
delete(pool.all, old.Hash()) | ||
pool.all.Remove(old.Hash()) | ||
pool.priced.Removed() | ||
pendingReplaceCounter.Inc(1) | ||
} | ||
pool.all[tx.Hash()] = tx | ||
pool.all.Add(tx) | ||
pool.priced.Put(tx) | ||
pool.journalTx(from, tx) | ||
|
||
|
@@ -689,12 +689,12 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, er | |
} | ||
// Discard any previous transaction and mark this | ||
if old != nil { | ||
delete(pool.all, old.Hash()) | ||
pool.all.Remove(old.Hash()) | ||
pool.priced.Removed() | ||
queuedReplaceCounter.Inc(1) | ||
} | ||
if pool.all[hash] == nil { | ||
pool.all[hash] = tx | ||
if pool.all.Get(hash) == nil { | ||
pool.all.Add(tx) | ||
pool.priced.Put(tx) | ||
} | ||
return old != nil, nil | ||
|
@@ -725,22 +725,22 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T | |
inserted, old := list.Add(tx, pool.config.PriceBump) | ||
if !inserted { | ||
// An older transaction was better, discard this | ||
delete(pool.all, hash) | ||
pool.all.Remove(hash) | ||
pool.priced.Removed() | ||
|
||
pendingDiscardCounter.Inc(1) | ||
return | ||
} | ||
// Otherwise discard any previous transaction and mark this | ||
if old != nil { | ||
delete(pool.all, old.Hash()) | ||
pool.all.Remove(old.Hash()) | ||
pool.priced.Removed() | ||
|
||
pendingReplaceCounter.Inc(1) | ||
} | ||
// Failsafe to work around direct pending inserts (tests) | ||
if pool.all[hash] == nil { | ||
pool.all[hash] = tx | ||
if pool.all.Get(hash) == nil { | ||
pool.all.Add(tx) | ||
pool.priced.Put(tx) | ||
} | ||
// Set the potentially new pending nonce and notify any subsystems of the new tx | ||
|
@@ -839,7 +839,7 @@ func (pool *TxPool) Status(hashes []common.Hash) []TxStatus { | |
|
||
status := make([]TxStatus, len(hashes)) | ||
for i, hash := range hashes { | ||
if tx := pool.all[hash]; tx != nil { | ||
if tx := pool.all.Get(hash); tx != nil { | ||
from, _ := types.Sender(pool.signer, tx) // already validated | ||
if pool.pending[from] != nil && pool.pending[from].txs.items[tx.Nonce()] != nil { | ||
status[i] = TxStatusPending | ||
|
@@ -854,24 +854,21 @@ func (pool *TxPool) Status(hashes []common.Hash) []TxStatus { | |
// Get returns a transaction if it is contained in the pool | ||
// and nil otherwise. | ||
func (pool *TxPool) Get(hash common.Hash) *types.Transaction { | ||
pool.mu.RLock() | ||
defer pool.mu.RUnlock() | ||
|
||
return pool.all[hash] | ||
return pool.all.Get(hash) | ||
} | ||
|
||
// removeTx removes a single transaction from the queue, moving all subsequent | ||
// transactions back to the future queue. | ||
func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) { | ||
// Fetch the transaction we wish to delete | ||
tx, ok := pool.all[hash] | ||
if !ok { | ||
tx := pool.all.Get(hash) | ||
if tx == nil { | ||
return | ||
} | ||
addr, _ := types.Sender(pool.signer, tx) // already validated during insertion | ||
|
||
// Remove it from the list of known transactions | ||
delete(pool.all, hash) | ||
pool.all.Remove(hash) | ||
if outofbound { | ||
pool.priced.Removed() | ||
} | ||
|
@@ -924,15 +921,15 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) { | |
for _, tx := range list.Forward(pool.currentState.GetNonce(addr)) { | ||
hash := tx.Hash() | ||
log.Trace("Removed old queued transaction", "hash", hash) | ||
delete(pool.all, hash) | ||
pool.all.Remove(hash) | ||
pool.priced.Removed() | ||
} | ||
// Drop all transactions that are too costly (low balance or out of gas) | ||
drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas) | ||
for _, tx := range drops { | ||
hash := tx.Hash() | ||
log.Trace("Removed unpayable queued transaction", "hash", hash) | ||
delete(pool.all, hash) | ||
pool.all.Remove(hash) | ||
pool.priced.Removed() | ||
queuedNofundsCounter.Inc(1) | ||
} | ||
|
@@ -946,7 +943,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) { | |
if !pool.locals.contains(addr) { | ||
for _, tx := range list.Cap(int(pool.config.AccountQueue)) { | ||
hash := tx.Hash() | ||
delete(pool.all, hash) | ||
pool.all.Remove(hash) | ||
pool.priced.Removed() | ||
queuedRateLimitCounter.Inc(1) | ||
log.Trace("Removed cap-exceeding queued transaction", "hash", hash) | ||
|
@@ -991,7 +988,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) { | |
for _, tx := range list.Cap(list.Len() - 1) { | ||
// Drop the transaction from the global pools too | ||
hash := tx.Hash() | ||
delete(pool.all, hash) | ||
pool.all.Remove(hash) | ||
pool.priced.Removed() | ||
|
||
// Update the account nonce to the dropped transaction | ||
|
@@ -1013,7 +1010,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) { | |
for _, tx := range list.Cap(list.Len() - 1) { | ||
// Drop the transaction from the global pools too | ||
hash := tx.Hash() | ||
delete(pool.all, hash) | ||
pool.all.Remove(hash) | ||
pool.priced.Removed() | ||
|
||
// Update the account nonce to the dropped transaction | ||
|
@@ -1082,15 +1079,15 @@ func (pool *TxPool) demoteUnexecutables() { | |
for _, tx := range list.Forward(nonce) { | ||
hash := tx.Hash() | ||
log.Trace("Removed old pending transaction", "hash", hash) | ||
delete(pool.all, hash) | ||
pool.all.Remove(hash) | ||
pool.priced.Removed() | ||
} | ||
// Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later | ||
drops, invalids := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas) | ||
for _, tx := range drops { | ||
hash := tx.Hash() | ||
log.Trace("Removed unpayable pending transaction", "hash", hash) | ||
delete(pool.all, hash) | ||
pool.all.Remove(hash) | ||
pool.priced.Removed() | ||
pendingNofundsCounter.Inc(1) | ||
} | ||
|
@@ -1162,3 +1159,68 @@ func (as *accountSet) containsTx(tx *types.Transaction) bool { | |
func (as *accountSet) add(addr common.Address) { | ||
as.accounts[addr] = struct{}{} | ||
} | ||
|
||
// txLookup is used to track all transactions to allow lookups without contention | ||
type txLookup struct { | ||
all map[common.Hash]*types.Transaction | ||
lock sync.RWMutex | ||
} | ||
|
||
func newTxLookup() *txLookup { | ||
return &txLookup{ | ||
all: make(map[common.Hash]*types.Transaction), | ||
} | ||
} | ||
|
||
// calls f on each key and value present in the map | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add the method name too when documenting, that's the general Go style. Eg. `// Range calls f on each key and value present in the map. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (Please do this and the punctuation mark for every method). |
||
func (t *txLookup) Range(f func(hash common.Hash, tx *types.Transaction) bool) { | ||
t.lock.RLock() | ||
defer t.lock.RUnlock() | ||
|
||
for key, value := range t.all { | ||
if !f(key, value) { | ||
break | ||
} | ||
} | ||
} | ||
|
||
// returns a transaction if it exists in the lookup, or nil if not found | ||
func (t *txLookup) Get(hash common.Hash) *types.Transaction { | ||
t.lock.RLock() | ||
defer t.lock.RUnlock() | ||
|
||
return t.all[hash] | ||
} | ||
|
||
// returns a transaction if it exists in the lookup, and a bool indicating if it was found | ||
func (t *txLookup) Find(hash common.Hash) (*types.Transaction, bool) { | ||
t.lock.RLock() | ||
defer t.lock.RUnlock() | ||
|
||
value, ok := t.all[hash] | ||
return value, ok | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think we really need this method. |
||
|
||
// returns the current number of items in the lookup | ||
func (t *txLookup) Count() int { | ||
t.lock.RLock() | ||
defer t.lock.RUnlock() | ||
|
||
return len(t.all) | ||
} | ||
|
||
// add a transaction to the lookup | ||
func (t *txLookup) Add(tx *types.Transaction) { | ||
t.lock.Lock() | ||
defer t.lock.Unlock() | ||
|
||
t.all[tx.Hash()] = tx | ||
} | ||
|
||
// remove a transaction from the lookup | ||
func (t *txLookup) Remove(hash common.Hash) { | ||
t.lock.Lock() | ||
defer t.lock.Unlock() | ||
|
||
delete(t.all, hash) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please end method/type docs with punctuation marks, they should be complete sentences.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, I'd add an emphasis as to why this type exists and how it should be used.