diff --git a/.changelog/4431.feature.md b/.changelog/4431.feature.md
new file mode 100644
index 00000000000..c7c9f31a33d
--- /dev/null
+++ b/.changelog/4431.feature.md
@@ -0,0 +1 @@
+go/runtime/scheduling: Allow a higher priority tx even if queue full
diff --git a/go/runtime/scheduling/simple/txpool/priorityqueue/priority_queue.go b/go/runtime/scheduling/simple/txpool/priorityqueue/priority_queue.go
index cdd30123117..955b82fd9ca 100644
--- a/go/runtime/scheduling/simple/txpool/priorityqueue/priority_queue.go
+++ b/go/runtime/scheduling/simple/txpool/priorityqueue/priority_queue.go
@@ -23,7 +23,7 @@ type item struct {
 func (i item) Less(other btree.Item) bool {
 	i2 := other.(*item)
 	if p1, p2 := i.tx.Priority(), i2.tx.Priority(); p1 != p2 {
-		return p1 > p2
+		return p1 < p2
 	}
 	// If transactions have same priority, sort arbitrary.
 	h1 := i.tx.Hash()
@@ -41,6 +41,8 @@ type priorityQueue struct {
 
 	poolWeights  map[transaction.Weight]uint64
 	weightLimits map[transaction.Weight]uint64
+
+	lowestPriority uint64
 }
 
 // Implements api.TxPool.
@@ -54,20 +56,36 @@ func (q *priorityQueue) Add(tx *transaction.CheckedTransaction) error {
 	defer q.Unlock()
 
 	// Check if there is room in the queue.
+	var needsPop bool
 	if q.poolWeights[transaction.WeightCount] >= q.maxTxPoolSize {
-		return api.ErrFull
+		needsPop = true
+
+		if tx.Priority() <= q.lowestPriority {
+			return api.ErrFull
+		}
 	}
 
 	if err := q.checkTxLocked(tx); err != nil {
 		return err
 	}
 
+	// Remove the lowest priority transaction when queue is full.
+	if needsPop {
+		lpi := q.priorityIndex.Min()
+		if lpi != nil {
+			q.removeTxsLocked([]*item{lpi.(*item)})
+		}
+	}
+
 	item := &item{tx: tx}
 	q.priorityIndex.ReplaceOrInsert(item)
 	q.transactions[tx.Hash()] = item
 	for k, v := range tx.Weights() {
 		q.poolWeights[k] += v
 	}
+	if tx.Priority() < q.lowestPriority {
+		q.lowestPriority = tx.Priority()
+	}
 
 	if mlen, qlen := len(q.transactions), q.priorityIndex.Len(); mlen != qlen {
 		panic(fmt.Errorf("inconsistent sizes of the underlying index (%v) and map (%v) after Add", mlen, qlen))
@@ -102,7 +120,7 @@ func (q *priorityQueue) GetBatch(force bool) []*transaction.CheckedTransaction {
 		batchWeights[w] = 0
 	}
 	toRemove := []*item{}
-	q.priorityIndex.Ascend(func(i btree.Item) bool {
+	q.priorityIndex.Descend(func(i btree.Item) bool {
 		item := i.(*item)
 
 		// Check if the call fits into the batch.
@@ -138,7 +156,13 @@ func (q *priorityQueue) GetBatch(force bool) []*transaction.CheckedTransaction {
 	// Remove transactions discovered to be too big to even fit the batch.
 	// This can happen if weight limits changed after the transaction was
 	// already set to be scheduled.
-	for _, item := range toRemove {
+	q.removeTxsLocked(toRemove)
+
+	return batch
+}
+
+func (q *priorityQueue) removeTxsLocked(items []*item) {
+	for _, item := range items {
 		delete(q.transactions, item.tx.Hash())
 		q.priorityIndex.Delete(item)
 		for k, v := range item.tx.Weights() {
@@ -146,7 +170,21 @@ func (q *priorityQueue) GetBatch(force bool) []*transaction.CheckedTransaction {
 		}
 	}
 
-	return batch
+	// Update lowest priority.
+	if len(items) > 0 {
+		if lpi := q.priorityIndex.Min(); lpi != nil {
+			q.lowestPriority = lpi.(*item).tx.Priority()
+		} else {
+			q.lowestPriority = 0
+		}
+	}
+
+	if mlen, qlen := len(q.transactions), q.priorityIndex.Len(); mlen != qlen {
+		panic(fmt.Errorf("inconsistent sizes of the underlying index (%v) and map (%v) after removal", mlen, qlen))
+	}
+	if mlen, plen := uint64(len(q.transactions)), q.poolWeights[transaction.WeightCount]; mlen != plen {
+		panic(fmt.Errorf("inconsistent sizes of the map (%v) and pool weight count (%v) after removal", mlen, plen))
+	}
 }
 
 // Implements api.TxPool.
@@ -192,21 +230,13 @@ func (q *priorityQueue) RemoveBatch(batch []hash.Hash) {
 	q.Lock()
 	defer q.Unlock()
 
+	items := make([]*item, 0, len(batch))
 	for _, txHash := range batch {
 		if item, ok := q.transactions[txHash]; ok {
-			q.priorityIndex.Delete(item)
-			delete(q.transactions, txHash)
-			for k, v := range item.tx.Weights() {
-				q.poolWeights[k] -= v
-			}
+			items = append(items, item)
 		}
 	}
-	if mlen, qlen := len(q.transactions), q.priorityIndex.Len(); mlen != qlen {
-		panic(fmt.Errorf("inconsistent sizes of the underlying index (%v) and map (%v) after RemoveBatch", mlen, qlen))
-	}
-	if mlen, plen := uint64(len(q.transactions)), q.poolWeights[transaction.WeightCount]; mlen != plen {
-		panic(fmt.Errorf("inconsistent sizes of the map (%v) and pool weight count (%v) after RemoveBatch", mlen, plen))
-	}
+	q.removeTxsLocked(items)
 }
 
 // Implements api.TxPool.
@@ -244,6 +274,7 @@ func (q *priorityQueue) Clear() {
 	q.priorityIndex.Clear(true)
 	q.transactions = make(map[hash.Hash]*item)
 	q.poolWeights = make(map[transaction.Weight]uint64)
+	q.lowestPriority = 0
 }
 
 // NOTE: Assumes lock is held.
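Aside (not part of the patch): the sketch below reimplements the eviction idea in isolation, so it can be read without the rest of the pool. A full queue rejects an incoming tx unless it strictly outranks the current minimum, in which case the minimum is popped to make room. The names `tx`, `queue`, `errFull`, and `capacity` are invented placeholders; the real pool additionally tracks per-weight limits, locking, and a `lowestPriority` cache, which the sketch replaces with a direct `Min()` lookup.

```go
// Minimal sketch of "evict the lowest-priority item when full".
// NOT the oasis-core implementation: tx, queue, and errFull are
// placeholder names; weights and locking are omitted.
package main

import (
	"errors"
	"fmt"

	"github.com/google/btree"
)

var errFull = errors.New("queue full")

type tx struct {
	hash     string
	priority uint64
}

// Less matches the flipped comparison in the patch: the B-tree's Min() is
// the lowest-priority tx, and Descend() walks from highest to lowest.
func (t *tx) Less(other btree.Item) bool {
	o := other.(*tx)
	if t.priority != o.priority {
		return t.priority < o.priority
	}
	return t.hash < o.hash // same priority: arbitrary but deterministic order
}

type queue struct {
	index    *btree.BTree
	capacity int
}

func (q *queue) add(t *tx) error {
	if q.index.Len() >= q.capacity {
		// A full queue still admits a tx that strictly outranks the
		// current minimum; equal or lower priority is rejected.
		if min := q.index.Min(); min != nil && t.priority <= min.(*tx).priority {
			return errFull
		}
		q.index.DeleteMin() // evict the lowest-priority tx to make room
	}
	q.index.ReplaceOrInsert(t)
	return nil
}

func main() {
	q := &queue{index: btree.New(2), capacity: 3}
	for _, t := range []*tx{{"a", 5}, {"b", 10}, {"c", 20}} {
		_ = q.add(t)
	}
	fmt.Println(q.add(&tx{"d", 6})) // <nil>: evicts "a" (priority 5)
	fmt.Println(q.add(&tx{"e", 3})) // queue full: 3 <= 6
	q.index.Descend(func(i btree.Item) bool { // highest priority first
		t := i.(*tx)
		fmt.Println(t.hash, t.priority)
		return true
	})
}
```

The patch itself goes one step further and caches the minimum in `lowestPriority`, so the common rejection path in `Add` is a single integer comparison rather than a `Min()` tree descent; the price is the cache maintenance visible in `removeTxsLocked` and `Clear` above.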
diff --git a/go/runtime/scheduling/simple/txpool/tester.go b/go/runtime/scheduling/simple/txpool/tester.go
index eeef9410590..333054975b1 100644
--- a/go/runtime/scheduling/simple/txpool/tester.go
+++ b/go/runtime/scheduling/simple/txpool/tester.go
@@ -303,9 +303,9 @@ func testPriority(t *testing.T, pool api.TxPool) {
 	pool.Clear()
 
 	pool.UpdateConfig(api.Config{
-		MaxPoolSize:  50,
+		MaxPoolSize:  3,
 		WeightLimits: map[transaction.Weight]uint64{
-			transaction.WeightCount:     10,
+			transaction.WeightCount:     3,
 			transaction.WeightSizeBytes: 100,
 		},
 	})
@@ -343,6 +343,38 @@ func testPriority(t *testing.T, pool api.TxPool) {
 		batch,
 		"elements should be returned by priority",
 	)
+
+	// When the pool is full, a higher priority transaction should still get queued.
+	highTx := transaction.NewCheckedTransaction(
+		[]byte("hello world 6"),
+		6,
+		nil,
+	)
+	err := pool.Add(highTx)
+	require.NoError(t, err, "higher priority transaction should still get queued")
+
+	batch = pool.GetBatch(true)
+	require.Len(t, batch, 3, "three transactions should be returned")
+	require.EqualValues(
+		t,
+		[]*transaction.CheckedTransaction{
+			txs[2], // 20
+			txs[0], // 10
+			highTx, // 6
+		},
+		batch,
+		"elements should be returned by priority",
+	)
+
+	// A lower priority transaction should not get queued.
+	lowTx := transaction.NewCheckedTransaction(
+		[]byte("hello world 3"),
+		3,
+		nil,
+	)
+	err = pool.Add(lowTx)
+	require.Error(t, err, "lower priority transaction should not get queued")
+	require.ErrorIs(t, err, api.ErrFull)
 }
 
 // TxPoolImplementationBenchmarks runs the tx pool implementation benchmarks.
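A boundary worth noting in the test above: `lowTx` (priority 3) sits strictly below the new minimum (priority 6), so the tie case is not exercised. The guard in `Add` is inclusive, `tx.Priority() <= q.lowestPriority` rather than `<`: a newcomer that merely ties the lowest queued priority is rejected, since admitting ties would let a stream of equal-priority submissions keep displacing one another once the pool is full. A toy check of the boundary follows (`admits` is a hypothetical helper, not an oasis-core function):

```go
// Boundary behaviour of the full-queue admission guard from the patch.
package main

import "fmt"

// A full queue admits an incoming tx only if it strictly outranks the
// lowest-priority tx currently queued (the patch rejects on <=).
func admits(incoming, lowest uint64) bool {
	return incoming > lowest
}

func main() {
	fmt.Println(admits(6, 5)) // true: 6 evicts the priority-5 tx
	fmt.Println(admits(5, 5)) // false: ties are rejected with ErrFull
	fmt.Println(admits(3, 5)) // false: lower priority is rejected with ErrFull
}
```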