diff --git a/pkg/kv/kvserver/txnwait/queue.go b/pkg/kv/kvserver/txnwait/queue.go index fda0c7fb0528..b09558d3604c 100644 --- a/pkg/kv/kvserver/txnwait/queue.go +++ b/pkg/kv/kvserver/txnwait/queue.go @@ -84,8 +84,18 @@ func ShouldPushImmediately(req *roachpb.PushTxnRequest) bool { // CanPushWithPriority returns true if the given pusher can push the pushee // based on its priority. func CanPushWithPriority(pusher, pushee enginepb.TxnPriority) bool { - return pusher > pushee && - (pusher == enginepb.MaxTxnPriority || pushee == enginepb.MinTxnPriority) + // Normalize the transaction priorities so that normal user priorities are + // considered equal for the purposes of pushing. + normalize := func(p enginepb.TxnPriority) enginepb.TxnPriority { + switch p { + case enginepb.MinTxnPriority, enginepb.MaxTxnPriority: + return p + default /* normal txn priorities */ : + return enginepb.MinTxnPriority + 1 + } + } + pusher, pushee = normalize(pusher), normalize(pushee) + return pusher > pushee } // isPushed returns whether the PushTxn request has already been diff --git a/pkg/kv/kvserver/txnwait/queue_test.go b/pkg/kv/kvserver/txnwait/queue_test.go index 131bdef457e2..11fc1fce4877 100644 --- a/pkg/kv/kvserver/txnwait/queue_test.go +++ b/pkg/kv/kvserver/txnwait/queue_test.go @@ -12,6 +12,7 @@ package txnwait import ( "context" + "fmt" "sync" "sync/atomic" "testing" @@ -33,7 +34,8 @@ func TestShouldPushImmediately(t *testing.T) { min := enginepb.MinTxnPriority max := enginepb.MaxTxnPriority - mid := enginepb.TxnPriority(1) + mid1 := enginepb.TxnPriority(1) + mid2 := enginepb.TxnPriority(2) testCases := []struct { force bool typ roachpb.PushTxnType @@ -42,59 +44,101 @@ func TestShouldPushImmediately(t *testing.T) { shouldPush bool }{ {false, roachpb.PUSH_ABORT, min, min, false}, - {false, roachpb.PUSH_ABORT, min, mid, false}, + {false, roachpb.PUSH_ABORT, min, mid1, false}, + {false, roachpb.PUSH_ABORT, min, mid2, false}, {false, roachpb.PUSH_ABORT, min, max, false}, - {false, roachpb.PUSH_ABORT, mid, min, true}, - {false, roachpb.PUSH_ABORT, mid, mid, false}, - {false, roachpb.PUSH_ABORT, mid, max, false}, + {false, roachpb.PUSH_ABORT, mid1, min, true}, + {false, roachpb.PUSH_ABORT, mid1, mid1, false}, + {false, roachpb.PUSH_ABORT, mid1, mid2, false}, + {false, roachpb.PUSH_ABORT, mid1, max, false}, + {false, roachpb.PUSH_ABORT, mid2, min, true}, + {false, roachpb.PUSH_ABORT, mid2, mid1, false}, + {false, roachpb.PUSH_ABORT, mid2, mid2, false}, + {false, roachpb.PUSH_ABORT, mid2, max, false}, {false, roachpb.PUSH_ABORT, max, min, true}, - {false, roachpb.PUSH_ABORT, max, mid, true}, + {false, roachpb.PUSH_ABORT, max, mid1, true}, + {false, roachpb.PUSH_ABORT, max, mid2, true}, {false, roachpb.PUSH_ABORT, max, max, false}, {false, roachpb.PUSH_TIMESTAMP, min, min, false}, - {false, roachpb.PUSH_TIMESTAMP, min, mid, false}, + {false, roachpb.PUSH_TIMESTAMP, min, mid1, false}, + {false, roachpb.PUSH_TIMESTAMP, min, mid2, false}, {false, roachpb.PUSH_TIMESTAMP, min, max, false}, - {false, roachpb.PUSH_TIMESTAMP, mid, min, true}, - {false, roachpb.PUSH_TIMESTAMP, mid, mid, false}, - {false, roachpb.PUSH_TIMESTAMP, mid, max, false}, + {false, roachpb.PUSH_TIMESTAMP, mid1, min, true}, + {false, roachpb.PUSH_TIMESTAMP, mid1, mid1, false}, + {false, roachpb.PUSH_TIMESTAMP, mid1, mid2, false}, + {false, roachpb.PUSH_TIMESTAMP, mid1, max, false}, + {false, roachpb.PUSH_TIMESTAMP, mid2, min, true}, + {false, roachpb.PUSH_TIMESTAMP, mid2, mid1, false}, + {false, roachpb.PUSH_TIMESTAMP, mid2, mid2, false}, + {false, roachpb.PUSH_TIMESTAMP, mid2, max, false}, {false, roachpb.PUSH_TIMESTAMP, max, min, true}, - {false, roachpb.PUSH_TIMESTAMP, max, mid, true}, + {false, roachpb.PUSH_TIMESTAMP, max, mid1, true}, + {false, roachpb.PUSH_TIMESTAMP, max, mid2, true}, {false, roachpb.PUSH_TIMESTAMP, max, max, false}, {false, roachpb.PUSH_TOUCH, min, min, true}, - {false, roachpb.PUSH_TOUCH, min, mid, true}, + {false, roachpb.PUSH_TOUCH, min, mid1, true}, + {false, roachpb.PUSH_TOUCH, min, mid2, true}, {false, roachpb.PUSH_TOUCH, min, max, true}, - {false, roachpb.PUSH_TOUCH, mid, min, true}, - {false, roachpb.PUSH_TOUCH, mid, mid, true}, - {false, roachpb.PUSH_TOUCH, mid, max, true}, + {false, roachpb.PUSH_TOUCH, mid1, min, true}, + {false, roachpb.PUSH_TOUCH, mid1, mid1, true}, + {false, roachpb.PUSH_TOUCH, mid1, mid2, true}, + {false, roachpb.PUSH_TOUCH, mid1, max, true}, + {false, roachpb.PUSH_TOUCH, mid2, min, true}, + {false, roachpb.PUSH_TOUCH, mid2, mid1, true}, + {false, roachpb.PUSH_TOUCH, mid2, mid2, true}, + {false, roachpb.PUSH_TOUCH, mid2, max, true}, {false, roachpb.PUSH_TOUCH, max, min, true}, - {false, roachpb.PUSH_TOUCH, max, mid, true}, + {false, roachpb.PUSH_TOUCH, max, mid1, true}, + {false, roachpb.PUSH_TOUCH, max, mid2, true}, {false, roachpb.PUSH_TOUCH, max, max, true}, // Force pushes always push immediately. {true, roachpb.PUSH_ABORT, min, min, true}, - {true, roachpb.PUSH_ABORT, min, mid, true}, + {true, roachpb.PUSH_ABORT, min, mid1, true}, + {true, roachpb.PUSH_ABORT, min, mid2, true}, {true, roachpb.PUSH_ABORT, min, max, true}, - {true, roachpb.PUSH_ABORT, mid, min, true}, - {true, roachpb.PUSH_ABORT, mid, mid, true}, - {true, roachpb.PUSH_ABORT, mid, max, true}, + {true, roachpb.PUSH_ABORT, mid1, min, true}, + {true, roachpb.PUSH_ABORT, mid1, mid1, true}, + {true, roachpb.PUSH_ABORT, mid1, mid2, true}, + {true, roachpb.PUSH_ABORT, mid1, max, true}, + {true, roachpb.PUSH_ABORT, mid2, min, true}, + {true, roachpb.PUSH_ABORT, mid2, mid1, true}, + {true, roachpb.PUSH_ABORT, mid2, mid2, true}, + {true, roachpb.PUSH_ABORT, mid2, max, true}, {true, roachpb.PUSH_ABORT, max, min, true}, - {true, roachpb.PUSH_ABORT, max, mid, true}, + {true, roachpb.PUSH_ABORT, max, mid1, true}, + {true, roachpb.PUSH_ABORT, max, mid2, true}, {true, roachpb.PUSH_ABORT, max, max, true}, {true, roachpb.PUSH_TIMESTAMP, min, min, true}, - {true, roachpb.PUSH_TIMESTAMP, min, mid, true}, + {true, roachpb.PUSH_TIMESTAMP, min, mid1, true}, + {true, roachpb.PUSH_TIMESTAMP, min, mid2, true}, {true, roachpb.PUSH_TIMESTAMP, min, max, true}, - {true, roachpb.PUSH_TIMESTAMP, mid, min, true}, - {true, roachpb.PUSH_TIMESTAMP, mid, mid, true}, - {true, roachpb.PUSH_TIMESTAMP, mid, max, true}, + {true, roachpb.PUSH_TIMESTAMP, mid1, min, true}, + {true, roachpb.PUSH_TIMESTAMP, mid1, mid1, true}, + {true, roachpb.PUSH_TIMESTAMP, mid1, mid2, true}, + {true, roachpb.PUSH_TIMESTAMP, mid1, max, true}, + {true, roachpb.PUSH_TIMESTAMP, mid2, min, true}, + {true, roachpb.PUSH_TIMESTAMP, mid2, mid1, true}, + {true, roachpb.PUSH_TIMESTAMP, mid2, mid2, true}, + {true, roachpb.PUSH_TIMESTAMP, mid2, max, true}, {true, roachpb.PUSH_TIMESTAMP, max, min, true}, - {true, roachpb.PUSH_TIMESTAMP, max, mid, true}, + {true, roachpb.PUSH_TIMESTAMP, max, mid1, true}, + {true, roachpb.PUSH_TIMESTAMP, max, mid2, true}, {true, roachpb.PUSH_TIMESTAMP, max, max, true}, {true, roachpb.PUSH_TOUCH, min, min, true}, - {true, roachpb.PUSH_TOUCH, min, mid, true}, + {true, roachpb.PUSH_TOUCH, min, mid1, true}, + {true, roachpb.PUSH_TOUCH, min, mid2, true}, {true, roachpb.PUSH_TOUCH, min, max, true}, - {true, roachpb.PUSH_TOUCH, mid, min, true}, - {true, roachpb.PUSH_TOUCH, mid, mid, true}, - {true, roachpb.PUSH_TOUCH, mid, max, true}, + {true, roachpb.PUSH_TOUCH, mid1, min, true}, + {true, roachpb.PUSH_TOUCH, mid1, mid1, true}, + {true, roachpb.PUSH_TOUCH, mid1, mid2, true}, + {true, roachpb.PUSH_TOUCH, mid1, max, true}, + {true, roachpb.PUSH_TOUCH, mid2, min, true}, + {true, roachpb.PUSH_TOUCH, mid2, mid1, true}, + {true, roachpb.PUSH_TOUCH, mid2, mid2, true}, + {true, roachpb.PUSH_TOUCH, mid2, max, true}, {true, roachpb.PUSH_TOUCH, max, min, true}, - {true, roachpb.PUSH_TOUCH, max, mid, true}, + {true, roachpb.PUSH_TOUCH, max, mid1, true}, + {true, roachpb.PUSH_TOUCH, max, mid2, true}, {true, roachpb.PUSH_TOUCH, max, max, true}, } for _, test := range testCases { @@ -111,9 +155,46 @@ func TestShouldPushImmediately(t *testing.T) { Priority: test.pusheePri, }, } - if shouldPush := ShouldPushImmediately(&req); shouldPush != test.shouldPush { - t.Errorf("expected %t; got %t", test.shouldPush, shouldPush) - } + shouldPush := ShouldPushImmediately(&req) + require.Equal(t, test.shouldPush, shouldPush) + }) + } +} + +func TestCanPushWithPriority(t *testing.T) { + defer leaktest.AfterTest(t)() + + min := enginepb.MinTxnPriority + max := enginepb.MaxTxnPriority + mid1 := enginepb.TxnPriority(1) + mid2 := enginepb.TxnPriority(2) + testCases := []struct { + pusher enginepb.TxnPriority + pushee enginepb.TxnPriority + exp bool + }{ + {min, min, false}, + {min, mid1, false}, + {min, mid2, false}, + {min, max, false}, + {mid1, min, true}, + {mid1, mid1, false}, + {mid1, mid2, false}, + {mid1, max, false}, + {mid2, min, true}, + {mid2, mid1, false}, + {mid2, mid2, false}, + {mid2, max, false}, + {max, min, true}, + {max, mid1, true}, + {max, mid2, true}, + {max, max, false}, + } + for _, test := range testCases { + name := fmt.Sprintf("pusher=%d/pushee=%d", test.pusher, test.pushee) + t.Run(name, func(t *testing.T) { + canPush := CanPushWithPriority(test.pusher, test.pushee) + require.Equal(t, test.exp, canPush) }) } } @@ -158,9 +239,8 @@ func TestIsPushed(t *testing.T) { WriteTimestamp: test.txnTimestamp, }, } - if isPushed := isPushed(&req, &txn); isPushed != test.isPushed { - t.Errorf("expected %t; got %t", test.isPushed, isPushed) - } + isPushed := isPushed(&req, &txn) + require.Equal(t, test.isPushed, isPushed) }) } }