-
Notifications
You must be signed in to change notification settings - Fork 3.9k
/
Copy pathmock_transactional_sender.go
242 lines (201 loc) · 7.93 KB
/
mock_transactional_sender.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package kv
import (
"context"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/errors"
)
// MockTransactionalSender allows a function to be used as a TxnSender.
type MockTransactionalSender struct {
senderFunc func(
context.Context, *roachpb.Transaction, roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error)
txn roachpb.Transaction
}
// NewMockTransactionalSender creates a MockTransactionalSender.
// The passed in txn is cloned.
func NewMockTransactionalSender(
f func(
context.Context, *roachpb.Transaction, roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error),
txn *roachpb.Transaction,
) *MockTransactionalSender {
return &MockTransactionalSender{senderFunc: f, txn: *txn}
}
// Send is part of the TxnSender interface.
func (m *MockTransactionalSender) Send(
ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
return m.senderFunc(ctx, &m.txn, ba)
}
// GetLeafTxnInputState is part of the TxnSender interface.
func (m *MockTransactionalSender) GetLeafTxnInputState(
context.Context, TxnStatusOpt,
) (roachpb.LeafTxnInputState, error) {
panic("unimplemented")
}
// GetLeafTxnFinalState is part of the TxnSender interface.
func (m *MockTransactionalSender) GetLeafTxnFinalState(
context.Context, TxnStatusOpt,
) (roachpb.LeafTxnFinalState, error) {
panic("unimplemented")
}
// UpdateRootWithLeafFinalState is part of the TxnSender interface.
func (m *MockTransactionalSender) UpdateRootWithLeafFinalState(
context.Context, *roachpb.LeafTxnFinalState,
) {
panic("unimplemented")
}
// AnchorOnSystemConfigRange is part of the TxnSender interface.
func (m *MockTransactionalSender) AnchorOnSystemConfigRange() error {
return errors.New("unimplemented")
}
// TxnStatus is part of the TxnSender interface.
func (m *MockTransactionalSender) TxnStatus() roachpb.TransactionStatus {
return m.txn.Status
}
// SetUserPriority is part of the TxnSender interface.
func (m *MockTransactionalSender) SetUserPriority(pri roachpb.UserPriority) error {
m.txn.Priority = roachpb.MakePriority(pri)
return nil
}
// SetDebugName is part of the TxnSender interface.
func (m *MockTransactionalSender) SetDebugName(name string) {
m.txn.Name = name
}
// String is part of the TxnSender interface.
func (m *MockTransactionalSender) String() string {
return m.txn.String()
}
// ReadTimestamp is part of the TxnSender interface.
func (m *MockTransactionalSender) ReadTimestamp() hlc.Timestamp {
return m.txn.ReadTimestamp
}
// ProvisionalCommitTimestamp is part of the TxnSender interface.
func (m *MockTransactionalSender) ProvisionalCommitTimestamp() hlc.Timestamp {
return m.txn.WriteTimestamp
}
// CommitTimestamp is part of the TxnSender interface.
func (m *MockTransactionalSender) CommitTimestamp() hlc.Timestamp {
return m.txn.ReadTimestamp
}
// CommitTimestampFixed is part of the TxnSender interface.
func (m *MockTransactionalSender) CommitTimestampFixed() bool {
panic("unimplemented")
}
// SetFixedTimestamp is part of the TxnSender interface.
func (m *MockTransactionalSender) SetFixedTimestamp(_ context.Context, ts hlc.Timestamp) {
m.txn.WriteTimestamp = ts
m.txn.ReadTimestamp = ts
m.txn.MaxTimestamp = ts
m.txn.CommitTimestampFixed = true
// Set the MinTimestamp to the minimum of the existing MinTimestamp and the fixed
// timestamp. This ensures that the MinTimestamp is always <= the other timestamps.
m.txn.MinTimestamp.Backward(ts)
}
// ManualRestart is part of the TxnSender interface.
func (m *MockTransactionalSender) ManualRestart(
ctx context.Context, pri roachpb.UserPriority, ts hlc.Timestamp,
) {
m.txn.Restart(pri, 0 /* upgradePriority */, ts)
}
// IsSerializablePushAndRefreshNotPossible is part of the TxnSender interface.
func (m *MockTransactionalSender) IsSerializablePushAndRefreshNotPossible() bool {
return false
}
// CreateSavepoint is part of the client.TxnSender interface.
func (m *MockTransactionalSender) CreateSavepoint(context.Context) (SavepointToken, error) {
panic("unimplemented")
}
// RollbackToSavepoint is part of the client.TxnSender interface.
func (m *MockTransactionalSender) RollbackToSavepoint(context.Context, SavepointToken) error {
panic("unimplemented")
}
// ReleaseSavepoint is part of the client.TxnSender interface.
func (m *MockTransactionalSender) ReleaseSavepoint(context.Context, SavepointToken) error {
panic("unimplemented")
}
// Epoch is part of the TxnSender interface.
func (m *MockTransactionalSender) Epoch() enginepb.TxnEpoch { panic("unimplemented") }
// TestingCloneTxn is part of the TxnSender interface.
func (m *MockTransactionalSender) TestingCloneTxn() *roachpb.Transaction {
return m.txn.Clone()
}
// Active is part of the TxnSender interface.
func (m *MockTransactionalSender) Active() bool {
panic("unimplemented")
}
// UpdateStateOnRemoteRetryableErr is part of the TxnSender interface.
func (m *MockTransactionalSender) UpdateStateOnRemoteRetryableErr(
ctx context.Context, pErr *roachpb.Error,
) *roachpb.Error {
panic("unimplemented")
}
// DisablePipelining is part of the client.TxnSender interface.
func (m *MockTransactionalSender) DisablePipelining() error { return nil }
// PrepareRetryableError is part of the client.TxnSender interface.
func (m *MockTransactionalSender) PrepareRetryableError(ctx context.Context, msg string) error {
return roachpb.NewTransactionRetryWithProtoRefreshError(msg, m.txn.ID, *m.txn.Clone())
}
// Step is part of the TxnSender interface.
func (m *MockTransactionalSender) Step(_ context.Context) error {
// At least one test (e.g sql/TestPortalsDestroyedOnTxnFinish) requires
// the ability to run simple statements that do not access storage,
// and that requires a non-panicky Step().
return nil
}
// ConfigureStepping is part of the TxnSender interface.
func (m *MockTransactionalSender) ConfigureStepping(context.Context, SteppingMode) SteppingMode {
// See Step() above.
return SteppingDisabled
}
// GetSteppingMode is part of the TxnSender interface.
func (m *MockTransactionalSender) GetSteppingMode(context.Context) SteppingMode {
return SteppingDisabled
}
// NewChildTransaction is part of the TxnSender interface.
func (m *MockTransactionalSender) NewChildTransaction() *roachpb.Transaction {
panic("unimplemented")
}
// MockTxnSenderFactory is a TxnSenderFactory producing MockTxnSenders.
type MockTxnSenderFactory struct {
senderFunc func(context.Context, *roachpb.Transaction, roachpb.BatchRequest) (
*roachpb.BatchResponse, *roachpb.Error)
}
var _ TxnSenderFactory = MockTxnSenderFactory{}
// MakeMockTxnSenderFactory creates a MockTxnSenderFactory from a sender
// function that receives the transaction in addition to the request. The
// function is responsible for putting the txn inside the batch, if needed.
func MakeMockTxnSenderFactory(
senderFunc func(
context.Context, *roachpb.Transaction, roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error),
) MockTxnSenderFactory {
return MockTxnSenderFactory{
senderFunc: senderFunc,
}
}
// RootTransactionalSender is part of TxnSenderFactory.
func (f MockTxnSenderFactory) RootTransactionalSender(
txn *roachpb.Transaction, _ roachpb.UserPriority,
) TxnSender {
return NewMockTransactionalSender(f.senderFunc, txn)
}
// LeafTransactionalSender is part of TxnSenderFactory.
func (f MockTxnSenderFactory) LeafTransactionalSender(tis *roachpb.LeafTxnInputState) TxnSender {
return NewMockTransactionalSender(f.senderFunc, &tis.Txn)
}
// NonTransactionalSender is part of TxnSenderFactory.
func (f MockTxnSenderFactory) NonTransactionalSender() Sender {
return nil
}