Skip to content

Commit

Permalink
Merge pull request #4431 from oasisprotocol/kostko/feature/rt-fullq-p…
Browse files Browse the repository at this point in the history
…riority

go/runtime/scheduling: Allow a higher priority tx even if queue full
  • Loading branch information
kostko authored Jan 18, 2022
2 parents 60ac7ae + 1ef5b76 commit 19e6fd8
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 18 deletions.
1 change: 1 addition & 0 deletions .changelog/4431.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/runtime/scheduling: Allow a higher priority tx even if queue full
63 changes: 47 additions & 16 deletions go/runtime/scheduling/simple/txpool/priorityqueue/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type item struct {
func (i item) Less(other btree.Item) bool {
i2 := other.(*item)
if p1, p2 := i.tx.Priority(), i2.tx.Priority(); p1 != p2 {
return p1 > p2
return p1 < p2
}
	// If transactions have the same priority, sort arbitrarily.
h1 := i.tx.Hash()
Expand All @@ -41,6 +41,8 @@ type priorityQueue struct {

poolWeights map[transaction.Weight]uint64
weightLimits map[transaction.Weight]uint64

lowestPriority uint64
}

// Implements api.TxPool.
Expand All @@ -54,20 +56,36 @@ func (q *priorityQueue) Add(tx *transaction.CheckedTransaction) error {
defer q.Unlock()

// Check if there is room in the queue.
var needsPop bool
if q.poolWeights[transaction.WeightCount] >= q.maxTxPoolSize {
return api.ErrFull
needsPop = true

if tx.Priority() <= q.lowestPriority {
return api.ErrFull
}
}

if err := q.checkTxLocked(tx); err != nil {
return err
}

// Remove the lowest priority transaction when queue is full.
if needsPop {
lpi := q.priorityIndex.Min()
if lpi != nil {
q.removeTxsLocked([]*item{lpi.(*item)})
}
}

item := &item{tx: tx}
q.priorityIndex.ReplaceOrInsert(item)
q.transactions[tx.Hash()] = item
for k, v := range tx.Weights() {
q.poolWeights[k] += v
}
if tx.Priority() < q.lowestPriority {
q.lowestPriority = tx.Priority()
}

if mlen, qlen := len(q.transactions), q.priorityIndex.Len(); mlen != qlen {
panic(fmt.Errorf("inconsistent sizes of the underlying index (%v) and map (%v) after Add", mlen, qlen))
Expand Down Expand Up @@ -102,7 +120,7 @@ func (q *priorityQueue) GetBatch(force bool) []*transaction.CheckedTransaction {
batchWeights[w] = 0
}
toRemove := []*item{}
q.priorityIndex.Ascend(func(i btree.Item) bool {
q.priorityIndex.Descend(func(i btree.Item) bool {
item := i.(*item)

// Check if the call fits into the batch.
Expand Down Expand Up @@ -138,15 +156,35 @@ func (q *priorityQueue) GetBatch(force bool) []*transaction.CheckedTransaction {
// Remove transactions discovered to be too big to even fit the batch.
// This can happen if weight limits changed after the transaction was
// already set to be scheduled.
for _, item := range toRemove {
q.removeTxsLocked(toRemove)

return batch
}

// removeTxsLocked removes the given items from the pool, keeping the
// transaction map, the priority index and the pool weights in sync.
//
// After any removal the cached lowest priority is refreshed from the new
// minimum of the priority index, or reset to zero when the queue is empty.
//
// NOTE: Assumes lock is held.
func (q *priorityQueue) removeTxsLocked(items []*item) {
	for _, item := range items {
		// Drop the transaction from all internal indices.
		delete(q.transactions, item.tx.Hash())
		q.priorityIndex.Delete(item)
		for k, v := range item.tx.Weights() {
			q.poolWeights[k] -= v
		}
	}

	// Update the cached lowest priority.
	if len(items) > 0 {
		if lpi := q.priorityIndex.Min(); lpi != nil {
			q.lowestPriority = lpi.(*item).tx.Priority()
		} else {
			q.lowestPriority = 0
		}
	}

	// Sanity-check internal consistency after removal.
	if mlen, qlen := len(q.transactions), q.priorityIndex.Len(); mlen != qlen {
		panic(fmt.Errorf("inconsistent sizes of the underlying index (%v) and map (%v) after removal", mlen, qlen))
	}
	if mlen, plen := uint64(len(q.transactions)), q.poolWeights[transaction.WeightCount]; mlen != plen {
		panic(fmt.Errorf("inconsistent sizes of the map (%v) and pool weight count (%v) after removal", mlen, plen))
	}
}

// Implements api.TxPool.
Expand Down Expand Up @@ -192,21 +230,13 @@ func (q *priorityQueue) RemoveBatch(batch []hash.Hash) {
q.Lock()
defer q.Unlock()

items := make([]*item, 0, len(batch))
for _, txHash := range batch {
if item, ok := q.transactions[txHash]; ok {
q.priorityIndex.Delete(item)
delete(q.transactions, txHash)
for k, v := range item.tx.Weights() {
q.poolWeights[k] -= v
}
items = append(items, item)
}
}
if mlen, qlen := len(q.transactions), q.priorityIndex.Len(); mlen != qlen {
panic(fmt.Errorf("inconsistent sizes of the underlying index (%v) and map (%v) after RemoveBatch", mlen, qlen))
}
if mlen, plen := uint64(len(q.transactions)), q.poolWeights[transaction.WeightCount]; mlen != plen {
panic(fmt.Errorf("inconsistent sizes of the map (%v) and pool weight count (%v) after RemoveBatch", mlen, plen))
}
q.removeTxsLocked(items)
}

// Implements api.TxPool.
Expand Down Expand Up @@ -244,6 +274,7 @@ func (q *priorityQueue) Clear() {
q.priorityIndex.Clear(true)
q.transactions = make(map[hash.Hash]*item)
q.poolWeights = make(map[transaction.Weight]uint64)
q.lowestPriority = 0
}

// NOTE: Assumes lock is held.
Expand Down
36 changes: 34 additions & 2 deletions go/runtime/scheduling/simple/txpool/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,9 +303,9 @@ func testPriority(t *testing.T, pool api.TxPool) {
pool.Clear()

pool.UpdateConfig(api.Config{
MaxPoolSize: 50,
MaxPoolSize: 3,
WeightLimits: map[transaction.Weight]uint64{
transaction.WeightCount: 10,
transaction.WeightCount: 3,
transaction.WeightSizeBytes: 100,
},
})
Expand Down Expand Up @@ -343,6 +343,38 @@ func testPriority(t *testing.T, pool api.TxPool) {
batch,
"elements should be returned by priority",
)

// When the pool is full, a higher priority transaction should still get queued.
highTx := transaction.NewCheckedTransaction(
[]byte("hello world 6"),
6,
nil,
)
err := pool.Add(highTx)
require.NoError(t, err, "higher priority transaction should still get queued")

batch = pool.GetBatch(true)
require.Len(t, batch, 3, "three transactions should be returned")
require.EqualValues(
t,
[]*transaction.CheckedTransaction{
txs[2], // 20
txs[0], // 10
highTx, // 6
},
batch,
"elements should be returned by priority",
)

// A lower priority transaction should not get queued.
lowTx := transaction.NewCheckedTransaction(
[]byte("hello world 3"),
3,
nil,
)
err = pool.Add(lowTx)
require.Error(t, err, "lower priority transaction should not get queued")
require.ErrorIs(t, err, api.ErrFull)
}

// TxPoolImplementationBenchmarks runs the tx pool implementation benchmarks.
Expand Down

0 comments on commit 19e6fd8

Please sign in to comment.