diff --git a/pkg/kv/kvserver/batcheval/declare.go b/pkg/kv/kvserver/batcheval/declare.go
index 7252863b9ed2..85b1d13f4727 100644
--- a/pkg/kv/kvserver/batcheval/declare.go
+++ b/pkg/kv/kvserver/batcheval/declare.go
@@ -26,11 +26,9 @@ func DefaultDeclareKeys(
 	req roachpb.Request,
 	latchSpans, _ *spanset.SpanSet,
 ) {
-	var access spanset.SpanAccess
+	access := spanset.SpanReadWrite
 	if roachpb.IsReadOnly(req) && !roachpb.IsLocking(req) {
 		access = spanset.SpanReadOnly
-	} else {
-		access = spanset.SpanReadWrite
 	}
 	latchSpans.AddMVCC(access, req.Header().Span(), header.Timestamp)
 }
@@ -46,13 +44,49 @@ func DefaultDeclareIsolatedKeys(
 	req roachpb.Request,
 	latchSpans, lockSpans *spanset.SpanSet,
 ) {
-	var access spanset.SpanAccess
+	access := spanset.SpanReadWrite
+	timestamp := header.Timestamp
 	if roachpb.IsReadOnly(req) && !roachpb.IsLocking(req) {
 		access = spanset.SpanReadOnly
-	} else {
-		access = spanset.SpanReadWrite
+		if header.Txn != nil {
+			// For transactional reads, acquire read latches all the way up to
+			// the transaction's MaxTimestamp, because reads may observe locks
+			// all the way up to this timestamp.
+			//
+			// TODO(nvanbenschoten): this parallels similar logic in
+			// concurrency.Request.readConflictTimestamp, which indicates that
+			// there is almost certainly a better way to structure this. There
+			// are actually two issues here that lead to this duplication:
+			//
+			// 1. latch spans and lock spans are declared separately. While these
+			// concepts are not identical, it appears that lock spans are always
+			// a subset of latch spans, which means that we can probably unify
+			// the concepts more closely than we have thus far. This would
+			// probably also have positive performance implications, as the
+			// duplication mandates extra memory allocations.
+			//
+			// 2. latch spans can each be assigned unique MVCC timestamps but lock
+			// spans inherit the timestamp of their request's transaction (see
+			// lockTable and concurrency.Request.{read,write}ConflictTimestamp).
+			// This difference is strange and confusing. It's not clear that the
+			// generality of latches each being declared at their own timestamp
+			// is useful. There may be an emergent pattern that arises here when
+			// we unify latch and lock spans (see part 1) where latches that are
+			// in the lock span subset can inherit their request's transaction's
+			// timestamp and latches that are not are non-MVCC latches.
+			//
+			// Note that addressing these issues does not necessarily need to
+			// lead to the timestamp that MVCC spans are interpreted at being
+			// the same for the purposes of the latch manager and lock-table.
+			// For instance, once the lock-table is segregated and all logic
+			// relating to "lock discovery" is removed, we no longer need to
+			// acquire read latches up to a txn's max timestamp, just to its
+			// read timestamp. However, we will still need to search the
+			// lock-table up to a txn's max timestamp.
+			timestamp.Forward(header.Txn.MaxTimestamp)
+		}
 	}
-	latchSpans.AddMVCC(access, req.Header().Span(), header.Timestamp)
+	latchSpans.AddMVCC(access, req.Header().Span(), timestamp)
 	lockSpans.AddNonMVCC(access, req.Header().Span())
 }
 
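The declare.go change above is the heart of the fix: transactional, non-locking reads now
declare their MVCC latches at max(read timestamp, txn.MaxTimestamp) rather than at the read
timestamp alone. Below is a minimal, self-contained sketch of that timestamp-selection rule;
the Timestamp, Txn, and Header types and the latchTimestamp helper are simplified stand-ins
for illustration, not the CockroachDB API.

package main

import "fmt"

// Timestamp is a simplified stand-in for hlc.Timestamp.
type Timestamp struct{ WallTime int64 }

// Forward ratchets t forward to s if s is larger (a simplified version of
// the Forward semantics relied on by the patch above).
func (t *Timestamp) Forward(s Timestamp) {
	if s.WallTime > t.WallTime {
		t.WallTime = s.WallTime
	}
}

// Txn and Header are simplified stand-ins for the request header types.
type Txn struct{ MaxTimestamp Timestamp }
type Header struct {
	Timestamp Timestamp
	Txn       *Txn
}

// latchTimestamp returns the timestamp at which a request would declare its
// MVCC latches under this rule: writes and non-transactional reads latch at
// the request timestamp, while transactional reads latch all the way up to
// the transaction's MaxTimestamp, since they may observe (and need to report)
// conflicting locks anywhere below that uncertainty limit.
func latchTimestamp(h Header, isReadOnly, isLocking bool) Timestamp {
	ts := h.Timestamp
	if isReadOnly && !isLocking && h.Txn != nil {
		ts.Forward(h.Txn.MaxTimestamp)
	}
	return ts
}

func main() {
	h := Header{
		Timestamp: Timestamp{WallTime: 10},
		Txn:       &Txn{MaxTimestamp: Timestamp{WallTime: 20}},
	}
	fmt.Println(latchTimestamp(h, true, false))  // {20}: read latch covers the uncertainty window
	fmt.Println(latchTimestamp(h, false, false)) // {10}: writes still latch at their own timestamp
}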
diff --git a/pkg/kv/kvserver/concurrency/lock_table.go b/pkg/kv/kvserver/concurrency/lock_table.go
index 5d5e216613a5..cbb704c6d225 100644
--- a/pkg/kv/kvserver/concurrency/lock_table.go
+++ b/pkg/kv/kvserver/concurrency/lock_table.go
@@ -1138,7 +1138,8 @@ func (l *lockState) acquireLock(
 		// Already held.
 		beforeTxn, beforeTs, _ := l.getLockerInfo()
 		if txn.ID != beforeTxn.ID {
-			return errors.Errorf("caller violated contract: existing lock cannot be acquired by different transaction")
+			return errors.Errorf("caller violated contract: " +
+				"existing lock cannot be acquired by different transaction")
 		}
 		seqs := l.holder.holder[durability].seqs
 		if l.holder.holder[durability].txn != nil && l.holder.holder[durability].txn.Epoch < txn.Epoch {
@@ -1248,7 +1249,8 @@ func (l *lockState) discoveredLock(
 
 	if l.holder.locked {
 		if !l.isLockedBy(txn.ID) {
-			panic("bug in caller or lockTable")
+			return errors.Errorf("caller violated contract: " +
+				"discovered lock by different transaction than existing lock")
 		}
 	} else {
 		l.holder.locked = true
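The lock_table.go hunks above also downgrade the "bug in caller or lockTable" panic to a
returned contract-violation error, so an out-of-sync caller surfaces an error instead of
crashing the node. The sketch below shows that pattern under simplified assumptions: the
lockState type, discoveredLock method, and errLockTableContract sentinel are stand-ins, and
the standard library's errors/fmt packages replace the cockroachdb/errors package used by
the real code.

package main

import (
	"errors"
	"fmt"
)

// lockState is a simplified stand-in for the lock-table's per-key state.
type lockState struct {
	holderTxnID string // empty when the key is unlocked
}

// errLockTableContract marks caller-contract violations so callers (or tests)
// can distinguish them from ordinary evaluation errors.
var errLockTableContract = errors.New("caller violated contract")

// discoveredLock records a lock found during evaluation. Instead of panicking
// when the discovered lock disagrees with the current holder, it returns an
// error describing the violated invariant.
func (l *lockState) discoveredLock(txnID string) error {
	if l.holderTxnID != "" && l.holderTxnID != txnID {
		return fmt.Errorf("%w: discovered lock by different transaction (%s) "+
			"than existing lock holder (%s)", errLockTableContract, txnID, l.holderTxnID)
	}
	l.holderTxnID = txnID
	return nil
}

func main() {
	l := &lockState{holderTxnID: "txn1"}
	if err := l.discoveredLock("txn3"); errors.Is(err, errLockTableContract) {
		fmt.Println("surfaced instead of panicking:", err)
	}
}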
diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/acquire_wrong_txn_race b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/acquire_wrong_txn_race
new file mode 100644
index 000000000000..4ed4312dca3f
--- /dev/null
+++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/acquire_wrong_txn_race
@@ -0,0 +1,166 @@
+# -------------------------------------------------------------
+# The following sequence of events used to trigger an assertion
+# in the lock-table that a lock was acquired by a different
+# transaction than the current lock holder. This was due to an
+# interleaving of concurrent requests holding compatible latches
+# that allowed the lock-table state to get out of sync. If scans
+# do not acquire latches all the way up to their max timestamp
+# (i.e. the max time that they can observe conflicting locks),
+# we risk hitting this race condition.
+#
+# Setup: txn1 acquires lock k at ts 15
+#        lock-table is cleared, lock k removed
+#
+# Test: txn2 sequences to scan at ts 10 with max ts 20
+#       txn2 observes txn1's lock while evaluating
+#       txn2 does not immediately handle-write-intent-error
+#       txn1 attempts to commit and resolve its lock
+#       txn1 blocks while acquiring latches [*]
+#       txn3 blocks while acquiring latches
+#       txn2 "discovers" and adds txn1's lock to lock-table
+#       txn2 re-sequences and blocks while acquiring latches
+#       txn1 proceeds in clearing its lock from the lock-table
+#       txn2 proceeds in reading
+#       txn3 proceeds in acquiring lock k at ts 25
+#
+# [*] if txn2 was only holding latches up to its read timestamp
+#     and not up to its max timestamp, then this wouldn't block,
+#     allowing the lock to be removed. txn3 could then proceed
+#     after this. Regardless of whether txn2 informed the
+#     lock-table about the discovered lock it found from txn1
+#     first or txn3 informed the lock-table about the lock it
+#     acquired first, the second request would hit an assertion.
+# -------------------------------------------------------------
+
+new-txn name=txn1 ts=15 epoch=0
+----
+
+new-txn name=txn2 ts=10 epoch=0 maxts=20
+----
+
+new-txn name=txn3 ts=25 epoch=0
+----
+
+new-request name=req1 txn=txn1 ts=15
+  put key=k value=v
+----
+
+new-request name=req2 txn=txn2 ts=10
+  get key=k
+----
+
+new-request name=req3 txn=txn3 ts=25
+  put key=k value=v2
+----
+
+sequence req=req1
+----
+[1] sequence req1: sequencing request
+[1] sequence req1: acquiring latches
+[1] sequence req1: scanning lock table for conflicting locks
+[1] sequence req1: sequencing complete, returned guard
+
+on-lock-acquired req=req1 key=k
+----
+[-] acquire lock: txn 00000001 @ k
+
+finish req=req1
+----
+[-] finish req1: finishing request
+
+debug-lock-table
+----
+global: num=1
+ lock: "k"
+  holder: txn: 00000001-0000-0000-0000-000000000000, ts: 0.000000015,0, info: unrepl epoch: 0, seqs: [0]
+local: num=0
+
+on-split
+----
+[-] split range: complete
+
+debug-lock-table
+----
+global: num=0
+local: num=0
+
+# --------------------------------
+# Setup complete, test starts here
+# --------------------------------
+
+sequence req=req2
+----
+[2] sequence req2: sequencing request
+[2] sequence req2: acquiring latches
+[2] sequence req2: scanning lock table for conflicting locks
+[2] sequence req2: sequencing complete, returned guard
+
+# txn2 observes txn1's lock while evaluating, but it does not immediately
+# handle-write-intent-error. Maybe its goroutine was unscheduled for a while. In
+# the interim period, requests come in to try to resolve the lock and to replace
+# the lock. Neither should be allowed to proceed.
+
+on-txn-updated txn=txn1 status=committed
+----
+[-] update txn: committing txn1
+
+new-request name=reqRes1 txn=none ts=15
+  resolve-intent txn=txn1 key=k status=committed
+----
+
+sequence req=reqRes1
+----
+[3] sequence reqRes1: sequencing request
+[3] sequence reqRes1: acquiring latches
+[3] sequence reqRes1: blocked on select in spanlatch.(*Manager).waitForSignal
+
+sequence req=req3
+----
+[4] sequence req3: sequencing request
+[4] sequence req3: acquiring latches
+[4] sequence req3: blocked on select in spanlatch.(*Manager).waitForSignal
+
+handle-write-intent-error req=req2 txn=txn1 key=k
+----
+[3] sequence reqRes1: sequencing complete, returned guard
+[5] handle write intent error req2: handled conflicting intents on "k", released latches
+
+sequence req=req2
+----
+[6] sequence req2: re-sequencing request
+[6] sequence req2: acquiring latches
+[6] sequence req2: blocked on select in spanlatch.(*Manager).waitForSignal
+
+on-lock-updated req=reqRes1 txn=txn1 key=k status=committed
+----
+[-] update lock: committing txn 00000001 @ k
+
+finish req=reqRes1
+----
+[-] finish reqRes1: finishing request
+[4] sequence req3: scanning lock table for conflicting locks
+[4] sequence req3: sequencing complete, returned guard
+[6] sequence req2: scanning lock table for conflicting locks
+[6] sequence req2: sequencing complete, returned guard
+
+finish req=req2
+----
+[-] finish req2: finishing request
+
+on-lock-acquired req=req3 key=k
+----
+[-] acquire lock: txn 00000003 @ k
+
+finish req=req3
+----
+[-] finish req3: finishing request
+
+debug-lock-table
+----
+global: num=1
+ lock: "k"
+  holder: txn: 00000003-0000-0000-0000-000000000000, ts: 0.000000025,0, info: unrepl epoch: 0, seqs: [0]
+local: num=0
+
+reset
+----
diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go
index d59a6cf92a2e..53577cd2ba50 100644
--- a/pkg/kv/kvserver/replica_send.go
+++ b/pkg/kv/kvserver/replica_send.go
@@ -161,17 +161,9 @@ var _ batchExecutionFn = (*Replica).executeReadOnlyBatch
 func (r *Replica) executeBatchWithConcurrencyRetries(
 	ctx context.Context, ba *roachpb.BatchRequest, fn batchExecutionFn,
 ) (br *roachpb.BatchResponse, pErr *roachpb.Error) {
-	// Determine the maximal set of key spans that the batch will operate on.
-	latchSpans, lockSpans, err := r.collectSpans(ba)
-	if err != nil {
-		return nil, roachpb.NewError(err)
-	}
-
-	// Handle load-based splitting.
-	r.recordBatchForLoadBasedSplitting(ctx, ba, latchSpans)
-
 	// Try to execute command; exit retry loop on success.
 	var g *concurrency.Guard
+	var latchSpans, lockSpans *spanset.SpanSet
 	defer func() {
 		// NB: wrapped to delay g evaluation to its value when returning.
 		if g != nil {
@@ -205,6 +197,20 @@ func (r *Replica) executeBatchWithConcurrencyRetries(
 		// Limit the transaction's maximum timestamp using observed timestamps.
 		r.limitTxnMaxTimestamp(ctx, ba, status)
 
+		// Determine the maximal set of key spans that the batch will operate
+		// on. We only need to do this once and we make sure to do so after we
+		// have limited the transaction's maximum timestamp.
+		if latchSpans == nil {
+			var err error
+			latchSpans, lockSpans, err = r.collectSpans(ba)
+			if err != nil {
+				return nil, roachpb.NewError(err)
+			}
+
+			// Handle load-based splitting.
+			r.recordBatchForLoadBasedSplitting(ctx, ba, latchSpans)
+		}
+
 		// Acquire latches to prevent overlapping requests from executing until
 		// this request completes. After latching, wait on any conflicting locks
 		// to ensure that the request has full isolation during evaluation. This
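Finally, a condensed sketch of the control-flow change in executeBatchWithConcurrencyRetries:
span collection moves inside the retry loop so it runs after limitTxnMaxTimestamp has clamped
the transaction's uncertainty window, while the nil check keeps it to a single execution. All
names and types here (batch, spanSet, collectSpans, executeWithRetries) are simplified
stand-ins for illustration, not the Replica API.

package main

import "fmt"

// spanSet stands in for the declared latch spans; only the timestamp upper
// bound matters for this sketch.
type spanSet struct{ upperBound int64 }

// batch stands in for a BatchRequest with a transactional read and max timestamp.
type batch struct{ readTS, maxTS int64 }

// limitTxnMaxTimestamp mirrors the idea of clamping MaxTimestamp using an
// observed timestamp before spans (and hence latch timestamps) are declared.
func limitTxnMaxTimestamp(ba *batch, observed int64) {
	if observed < ba.maxTS {
		ba.maxTS = observed
	}
}

// collectSpans stands in for Replica.collectSpans: with the declare.go change,
// a read's latch upper bound depends on the (possibly clamped) MaxTimestamp.
func collectSpans(ba *batch) *spanSet {
	return &spanSet{upperBound: ba.maxTS}
}

func executeWithRetries(ba *batch, observed int64, attempts int) *spanSet {
	var latchSpans *spanSet
	for i := 0; i < attempts; i++ {
		limitTxnMaxTimestamp(ba, observed)
		// Collect spans only once, and only after the clamp above, so the
		// declared latches never cover a wider window than necessary.
		if latchSpans == nil {
			latchSpans = collectSpans(ba)
		}
		// ... acquire latches, evaluate, maybe retry ...
	}
	return latchSpans
}

func main() {
	ba := &batch{readTS: 10, maxTS: 20}
	fmt.Println(executeWithRetries(ba, 15, 2).upperBound) // 15, not 20
}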