kv/kvserver: acquire read latches up to Txn.MaxTimestamp #46830

Merged
48 changes: 41 additions & 7 deletions pkg/kv/kvserver/batcheval/declare.go
@@ -26,11 +26,9 @@ func DefaultDeclareKeys(
     req roachpb.Request,
     latchSpans, _ *spanset.SpanSet,
 ) {
-    var access spanset.SpanAccess
+    access := spanset.SpanReadWrite
     if roachpb.IsReadOnly(req) && !roachpb.IsLocking(req) {
         access = spanset.SpanReadOnly
-    } else {
-        access = spanset.SpanReadWrite
     }
     latchSpans.AddMVCC(access, req.Header().Span(), header.Timestamp)
 }
@@ -46,13 +44,49 @@ func DefaultDeclareIsolatedKeys(
     req roachpb.Request,
     latchSpans, lockSpans *spanset.SpanSet,
 ) {
-    var access spanset.SpanAccess
+    access := spanset.SpanReadWrite
+    timestamp := header.Timestamp
     if roachpb.IsReadOnly(req) && !roachpb.IsLocking(req) {
         access = spanset.SpanReadOnly
-    } else {
-        access = spanset.SpanReadWrite
+        if header.Txn != nil {
+            // For transactional reads, acquire read latches all the way up to
+            // the transaction's MaxTimestamp, because reads may observe locks
+            // all the way up to this timestamp.
+            //
+            // TODO(nvanbenschoten): this parallels similar logic in
+            // concurrency.Request.readConflictTimestamp, which indicates that
+            // there is almost certainly a better way to structure this. There
+            // are actually two issues here that lead to this duplication:
+            //
+            // 1. latch spans and lock spans are declared separately. While these
+            // concepts are not identical, it appears that lock spans are always
+            // a subset of latch spans, which means that we can probably unify
+            // the concepts more closely than we have thus far. This would
+            // probably also have positive performance implications, as the
+            // duplication mandates extra memory allocations.
+            //
+            // 2. latch spans can each be assigned unique MVCC timestamps but lock
+            // spans inherit the timestamp of their request's transaction (see
+            // lockTable and concurrency.Request.{read,write}ConflictTimestamp).
+            // This difference is strange and confusing. It's not clear that the
+            // generality of latches each being declared at their own timestamp
+            // is useful. There may be an emergent pattern that arises here when
+            // we unify latch and lock spans (see part 1) where latches that are
+            // in the lock span subset can inherit their request's transaction's
+            // timestamp and latches that are not are non-MVCC latches.
+            //
+            // Note that addressing these issues does not necessarily need to
+            // lead to the timestamp that MVCC spans are interpreted at being
+            // the same for the purposes of the latch manager and lock-table.
+            // For instance, once the lock-table is segregated and all logic
+            // relating to "lock discovery" is removed, we no longer need to
+            // acquire read latches up to a txn's max timestamp, just to its
+            // read timestamp. However, we will still need to search the
+            // lock-table up to a txn's max timestamp.
+            timestamp.Forward(header.Txn.MaxTimestamp)
+        }
     }
-    latchSpans.AddMVCC(access, req.Header().Span(), header.Timestamp)
+    latchSpans.AddMVCC(access, req.Header().Span(), timestamp)
     lockSpans.AddNonMVCC(access, req.Header().Span())
 }

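To see why declaring the read latch at the transaction's MaxTimestamp matters, here is a minimal, self-contained sketch. The `Timestamp` type, its `Forward` method, and the `readBlocksWrite` predicate are simplified stand-ins for CockroachDB's `hlc.Timestamp` and the spanlatch MVCC conflict rule, not the real APIs; the numbers mirror the scenario in the new test file further down (read at 10, MaxTimestamp 20, intent resolution at 15).

```go
package main

import "fmt"

// Timestamp is a toy stand-in for hlc.Timestamp.
type Timestamp struct{ WallTime int64 }

// Forward ratchets ts up to other, mirroring hlc.Timestamp.Forward.
func (ts *Timestamp) Forward(other Timestamp) {
    if other.WallTime > ts.WallTime {
        ts.WallTime = other.WallTime
    }
}

// readBlocksWrite models the MVCC latch rule: a read latch declared at readTs
// conflicts with a write latch at writeTs only if writeTs <= readTs.
func readBlocksWrite(readTs, writeTs Timestamp) bool {
    return writeTs.WallTime <= readTs.WallTime
}

func main() {
    readTs := Timestamp{WallTime: 10}    // txn2's read timestamp
    maxTs := Timestamp{WallTime: 20}     // txn2's MaxTimestamp (uncertainty limit)
    resolveTs := Timestamp{WallTime: 15} // resolution of txn1's intent

    // Latch declared only at the read timestamp: the resolution does not
    // conflict and can remove the lock before txn2 reports it as discovered.
    fmt.Println(readBlocksWrite(readTs, resolveTs)) // false

    // Latch forwarded to MaxTimestamp (this change): the resolution must wait.
    latchTs := readTs
    latchTs.Forward(maxTs)
    fmt.Println(readBlocksWrite(latchTs, resolveTs)) // true
}
```

Under this rule, a read declared at its uncertainty limit is ordered after any overlapping write below that limit, which is what the test scenario below relies on.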
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/concurrency/lock_table.go
@@ -1138,7 +1138,8 @@ func (l *lockState) acquireLock(
     // Already held.
     beforeTxn, beforeTs, _ := l.getLockerInfo()
     if txn.ID != beforeTxn.ID {
-        return errors.Errorf("caller violated contract: existing lock cannot be acquired by different transaction")
+        return errors.Errorf("caller violated contract: " +
+            "existing lock cannot be acquired by different transaction")
     }
     seqs := l.holder.holder[durability].seqs
     if l.holder.holder[durability].txn != nil && l.holder.holder[durability].txn.Epoch < txn.Epoch {
@@ -1248,7 +1249,8 @@ func (l *lockState) discoveredLock(

     if l.holder.locked {
         if !l.isLockedBy(txn.ID) {
-            panic("bug in caller or lockTable")
+            return errors.Errorf("caller violated contract: " +
+                "discovered lock by different transaction than existing lock")
         }
     } else {
         l.holder.locked = true
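The error paths above guard an invariant the lock table relies on: whoever informs it about a lock on a key must agree with the holder it already has recorded. The following rough sketch of that contract check is hypothetical and heavily simplified (`lockTable` here is just a map, not the real `concurrency.lockTable`); it only shows how the interleaving reproduced by the new test below used to trip the check when read latches stopped short of MaxTimestamp.

```go
package main

import "fmt"

// lockTable is a toy stand-in for concurrency.lockTable: at most one recorded
// holder per key.
type lockTable struct {
    holder map[string]string // key -> transaction name
}

// acquire records txn as the holder of key, enforcing the caller contract that
// an existing lock is never acquired by a different transaction.
func (lt *lockTable) acquire(key, txn string) error {
    if cur, ok := lt.holder[key]; ok && cur != txn {
        return fmt.Errorf("caller violated contract: lock on %q held by %s, not %s", key, cur, txn)
    }
    lt.holder[key] = txn
    return nil
}

// discovered reports a lock found during evaluation; it is held to the same
// contract as acquire.
func (lt *lockTable) discovered(key, txn string) error {
    return lt.acquire(key, txn)
}

func main() {
    lt := &lockTable{holder: map[string]string{}}

    // With read latches declared only at txn2's read timestamp, txn1's intent
    // on k can be resolved and txn3 can lock k while txn2's report of the
    // stale txn1 lock is still in flight. The second informer then violates
    // the contract.
    fmt.Println(lt.acquire("k", "txn3"))    // <nil>
    fmt.Println(lt.discovered("k", "txn1")) // caller violated contract: ...
}
```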
166 additions & 0 deletions (new data-driven test file)
@@ -0,0 +1,166 @@
# -------------------------------------------------------------
# The following sequence of events used to trigger an assertion
# in lock-table that a lock was acquired by a different
# transaction than the current lock holder. This was due to an
# interleaving of concurrent requests holding compatible latches
# that allowed the lock-table state to get out of sync. If scans
# do not acquire latches all the way up to their max timestamp
# (i.e. the max time that they can observe conflicting locks),
# we risk hitting this race condition.
#
# Setup: txn1 acquires lock k at ts 15
#        lock-table is cleared, lock k removed
#
# Test: txn2 sequences to scan at ts 10 with max ts 20
#       txn2 observes txn1's lock during evaluation
#       txn2 does not immediately handle-write-intent-error
#       txn1 attempts to commit and resolve its lock
#       txn1 blocks while acquiring latches [*]
#       txn3 blocks while acquiring latches
#       txn2 "discovers" and adds txn1's lock to lock-table
#       txn2 re-sequences and blocks while acquiring latches
#       txn1 proceeds in clearing its lock from the lock-table
#       txn2 proceeds in reading
#       txn3 proceeds in acquiring lock k at ts 25
#
# [*] if txn2 was only holding latches up to its read timestamp
#     and not up to its max timestamp then this wouldn't block,
#     allowing the lock to be removed. txn3 could then proceed
#     after this. Regardless of whether txn2 informed the
#     lock-table about the discovered lock it found from txn1
#     first or txn3 informed the lock-table about the lock it
#     acquired first, the second request would hit an assertion.
# -------------------------------------------------------------

new-txn name=txn1 ts=15 epoch=0
----

new-txn name=txn2 ts=10 epoch=0 maxts=20
----

new-txn name=txn3 ts=25 epoch=0
----

new-request name=req1 txn=txn1 ts=15
put key=k value=v
----

new-request name=req2 txn=txn2 ts=10
get key=k
----

new-request name=req3 txn=txn3 ts=25
put key=k value=v2
----

sequence req=req1
----
[1] sequence req1: sequencing request
[1] sequence req1: acquiring latches
[1] sequence req1: scanning lock table for conflicting locks
[1] sequence req1: sequencing complete, returned guard

on-lock-acquired req=req1 key=k
----
[-] acquire lock: txn 00000001 @ k

finish req=req1
----
[-] finish req1: finishing request

debug-lock-table
----
global: num=1
lock: "k"
holder: txn: 00000001-0000-0000-0000-000000000000, ts: 0.000000015,0, info: unrepl epoch: 0, seqs: [0]
local: num=0

on-split
----
[-] split range: complete

debug-lock-table
----
global: num=0
local: num=0

# --------------------------------
# Setup complete, test starts here
# --------------------------------

sequence req=req2
----
[2] sequence req2: sequencing request
[2] sequence req2: acquiring latches
[2] sequence req2: scanning lock table for conflicting locks
[2] sequence req2: sequencing complete, returned guard

# txn2 observes txn1's lock during evaluation, but it does not immediately
# handle-write-intent-error. Maybe its goroutine was unscheduled for a while. In
# the interim period, requests come in to try to resolve the lock and to replace
# the lock. Neither should be allowed to proceed.

on-txn-updated txn=txn1 status=committed
----
[-] update txn: committing txn1

new-request name=reqRes1 txn=none ts=15
resolve-intent txn=txn1 key=k status=committed
----

sequence req=reqRes1
----
[3] sequence reqRes1: sequencing request
[3] sequence reqRes1: acquiring latches
[3] sequence reqRes1: blocked on select in spanlatch.(*Manager).waitForSignal

sequence req=req3
----
[4] sequence req3: sequencing request
[4] sequence req3: acquiring latches
[4] sequence req3: blocked on select in spanlatch.(*Manager).waitForSignal

handle-write-intent-error req=req2 txn=txn1 key=k
----
[3] sequence reqRes1: sequencing complete, returned guard
[5] handle write intent error req2: handled conflicting intents on "k", released latches

sequence req=req2
----
[6] sequence req2: re-sequencing request
[6] sequence req2: acquiring latches
[6] sequence req2: blocked on select in spanlatch.(*Manager).waitForSignal

on-lock-updated req=reqRes1 txn=txn1 key=k status=committed
----
[-] update lock: committing txn 00000001 @ k

finish req=reqRes1
----
[-] finish reqRes1: finishing request
[4] sequence req3: scanning lock table for conflicting locks
[4] sequence req3: sequencing complete, returned guard
[6] sequence req2: scanning lock table for conflicting locks
[6] sequence req2: sequencing complete, returned guard

finish req=req2
----
[-] finish req2: finishing request

on-lock-acquired req=req3 key=k
----
[-] acquire lock: txn 00000003 @ k

finish req=req3
----
[-] finish req3: finishing request

debug-lock-table
----
global: num=1
lock: "k"
holder: txn: 00000003-0000-0000-0000-000000000000, ts: 0.000000025,0, info: unrepl epoch: 0, seqs: [0]
local: num=0

reset
----
24 changes: 15 additions & 9 deletions pkg/kv/kvserver/replica_send.go
@@ -161,17 +161,9 @@ var _ batchExecutionFn = (*Replica).executeReadOnlyBatch
 func (r *Replica) executeBatchWithConcurrencyRetries(
     ctx context.Context, ba *roachpb.BatchRequest, fn batchExecutionFn,
 ) (br *roachpb.BatchResponse, pErr *roachpb.Error) {
-    // Determine the maximal set of key spans that the batch will operate on.
-    latchSpans, lockSpans, err := r.collectSpans(ba)
-    if err != nil {
-        return nil, roachpb.NewError(err)
-    }
-
-    // Handle load-based splitting.
-    r.recordBatchForLoadBasedSplitting(ctx, ba, latchSpans)
-
     // Try to execute command; exit retry loop on success.
     var g *concurrency.Guard
+    var latchSpans, lockSpans *spanset.SpanSet
     defer func() {
         // NB: wrapped to delay g evaluation to its value when returning.
         if g != nil {
@@ -205,6 +197,20 @@ func (r *Replica) executeBatchWithConcurrencyRetries(
         // Limit the transaction's maximum timestamp using observed timestamps.
         r.limitTxnMaxTimestamp(ctx, ba, status)

+        // Determine the maximal set of key spans that the batch will operate
+        // on. We only need to do this once and we make sure to do so after we
+        // have limited the transaction's maximum timestamp.
+        if latchSpans == nil {
+            var err error
+            latchSpans, lockSpans, err = r.collectSpans(ba)
+            if err != nil {
+                return nil, roachpb.NewError(err)
+            }
+
+            // Handle load-based splitting.
+            r.recordBatchForLoadBasedSplitting(ctx, ba, latchSpans)
+        }
+
         // Acquire latches to prevent overlapping requests from executing until
         // this request completes. After latching, wait on any conflicting locks
         // to ensure that the request has full isolation during evaluation. This
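Deferring collectSpans until after limitTxnMaxTimestamp matters because, with the declare.go change above, read latch timestamps are derived from MaxTimestamp. The sketch below uses the same kind of toy types as the earlier examples (simplified stand-ins rather than the real hlc/spanset APIs) to show how an observed timestamp that lowers MaxTimestamp also narrows the timestamp at which the read latch is declared, provided spans are collected afterwards.

```go
package main

import "fmt"

// Timestamp is a toy stand-in for hlc.Timestamp.
type Timestamp struct{ WallTime int64 }

// declareReadLatchTs mirrors the declare.go change: a transactional read's
// latch timestamp is its read timestamp forwarded to its MaxTimestamp.
func declareReadLatchTs(readTs, maxTs Timestamp) Timestamp {
    if maxTs.WallTime > readTs.WallTime {
        return maxTs
    }
    return readTs
}

// limitMaxTimestamp models limitTxnMaxTimestamp in spirit: an observed
// timestamp from this node can only lower MaxTimestamp, never raise it.
func limitMaxTimestamp(maxTs, observed Timestamp) Timestamp {
    if observed.WallTime < maxTs.WallTime {
        return observed
    }
    return maxTs
}

func main() {
    readTs := Timestamp{WallTime: 10}
    maxTs := Timestamp{WallTime: 20}
    observed := Timestamp{WallTime: 12} // observed timestamp for this node

    // Spans collected before limiting: latch declared at the full MaxTimestamp.
    fmt.Println(declareReadLatchTs(readTs, maxTs)) // {20}

    // Spans collected after limiting (as in this change): the latch covers a
    // smaller window, so the read conflicts with fewer overlapping writes.
    limited := limitMaxTimestamp(maxTs, observed)
    fmt.Println(declareReadLatchTs(readTs, limited)) // {12}
}
```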