// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvcoord

import (
	"context"
	"sync"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/errors"
)

// lockedSender is like a client.Sender but requires the caller to hold the
// TxnCoordSender lock to send requests.
type lockedSender interface {
	// SendLocked sends the batch request and receives a batch response. It
	// requires that the TxnCoordSender lock be held when called, but this lock
	// is not held for the entire duration of the call. Instead, the lock is
	// released immediately before the batch is sent to a lower-level Sender and
	// is re-acquired when the response is returned.
	//
	// WARNING: because the lock is released when calling this method and
	// re-acquired before it returns, callers cannot rely on a single mutual
	// exclusion zone being maintained across the call.
	SendLocked(context.Context, roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error)
}
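
// A minimal, hypothetical sketch (not part of this file; tcs, ctx, ba, and
// interceptorStack are assumed names, not confirmed by this file) of the
// contract a SendLocked caller relies on: the TxnCoordSender mutex is held at
// the call and again at the return, but may be dropped while the batch is in
// flight.
//
//	tcs.mu.Lock()
//	defer tcs.mu.Unlock()
//	// The gatekeeper at the bottom of the stack drops tcs.mu while the batch
//	// travels through the wrapped Sender, so no single critical section
//	// spans the whole call.
//	br, pErr := tcs.interceptorStack.SendLocked(ctx, ba)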

// txnLockGatekeeper is a lockedSender that sits at the bottom of the
// TxnCoordSender's interceptor stack and handles unlocking the TxnCoordSender's
// mutex when sending a request and locking the TxnCoordSender's mutex when
// receiving a response. It allows the entire txnInterceptor stack to operate
// under lock without needing to worry about unlocking at the correct time.
type txnLockGatekeeper struct {
	wrapped kv.Sender
	mu      sync.Locker // shared with TxnCoordSender
	knobs   *ClientTestingKnobs

	// If set, concurrent requests are allowed. If not set, concurrent requests
	// result in an assertion error. Only leaf transactions are supposed to
	// allow concurrent requests - leaves don't restart the transaction and
	// they don't bump the read timestamp through refreshes.
	allowConcurrentRequests bool
	// requestInFlight is set while a request is being processed by the wrapped
	// sender. Used to detect and prevent concurrent txn use.
	requestInFlight bool
}
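
// A hypothetical wiring sketch (assumed; the construction site is not shown
// in this file, and wrappedSender, tcs, and testingKnobs are illustrative
// names): the TxnCoordSender would build the gatekeeper around the
// lower-level sender and share its own mutex with it, roughly:
//
//	gs := txnLockGatekeeper{
//		wrapped: wrappedSender, // e.g. the DistSender
//		mu:      &tcs.mu,       // the TxnCoordSender's mutex
//		knobs:   testingKnobs,
//	}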

// SendLocked implements the lockedSender interface.
func (gs *txnLockGatekeeper) SendLocked(
	ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
	// If so configured, protect against concurrent use of the txn. Concurrent
	// requests generally don't work because of races between clients sending
	// requests and the TxnCoordSender restarting the transaction. They are
	// also incompatible with the span refresher in particular, since
	// refreshing is invalid if done concurrently with in-flight requests
	// whose spans haven't been accounted for.
	//
	// As a special case, allow async rollbacks and heartbeats to be sent at
	// any time.
	if !gs.allowConcurrentRequests {
		asyncRequest := ba.IsSingleAbortTxnRequest() || ba.IsSingleHeartbeatTxnRequest()
		if !asyncRequest {
			if gs.requestInFlight {
				return nil, roachpb.NewError(
					errors.AssertionFailedf("concurrent txn use detected. ba: %s", ba))
			}
			gs.requestInFlight = true
			defer func() {
				gs.requestInFlight = false
			}()
		}
	}

	// Note the funky locking here: we unlock for the duration of the call and
	// then lock again.
	gs.mu.Unlock()
	defer gs.mu.Lock()

	sender := gs.wrapped
	if intercept := gs.knobs.TxnRequestInterceptorFactory; intercept != nil {
		sender = intercept(sender)
	}
	return sender.Send(ctx, ba)
}
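
// A self-contained sketch (hypothetical; ctx and ba are assumed to be in
// scope, and the stub wiring is not part of this package) of exercising the
// gatekeeper with a stub sender. It illustrates that gs.mu is unlocked while
// the wrapped sender runs and re-acquired before SendLocked returns:
//
//	var mu sync.Mutex
//	stub := kv.SenderFunc(func(
//		ctx context.Context, ba roachpb.BatchRequest,
//	) (*roachpb.BatchResponse, *roachpb.Error) {
//		// mu is NOT held here; the gatekeeper released it before sending.
//		return ba.CreateReply(), nil
//	})
//	gs := &txnLockGatekeeper{
//		wrapped: stub,
//		mu:      &mu,
//		knobs:   &ClientTestingKnobs{}, // non-nil so the knobs check doesn't panic
//	}
//	mu.Lock()
//	br, pErr := gs.SendLocked(ctx, ba) // mu is held again once this returns
//	mu.Unlock()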