// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvserver

import (
	"context"
	"time"

	"github.com/cockroachdb/cockroach/pkg/keys"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"github.com/cockroachdb/cockroach/pkg/util/limit"
	"github.com/cockroachdb/cockroach/pkg/util/log"
	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
	"github.com/cockroachdb/errors"
)

// Send fetches a range based on the header's replica, assembles method, args &
// reply into a Raft Cmd struct and executes the command using the fetched
// range.
//
// An incoming request may be transactional or not. If it is not transactional,
// the timestamp at which it executes may be higher than that optionally
// specified through the incoming BatchRequest, and it is not guaranteed that
// all operations are written at the same timestamp. If it is transactional, a
// timestamp must not be set - it is deduced automatically from the
// transaction. In particular, the read timestamp will be used for
// all reads and the write (provisional commit) timestamp will be used for
// all writes. See the comments on txn.TxnMeta.Timestamp and txn.ReadTimestamp
// for more details.
//
// Should a transactional operation be forced to a higher timestamp (for
// instance due to the timestamp cache or finding a committed value in the path
// of one of its writes), the response will have a transaction set which should
// be used to update the client transaction object.
func (s *Store) Send(
	ctx context.Context, ba roachpb.BatchRequest,
) (br *roachpb.BatchResponse, pErr *roachpb.Error) {
	// Attach any log tags from the store to the context (which normally
	// comes from gRPC).
	ctx = s.AnnotateCtx(ctx)
	for _, union := range ba.Requests {
		arg := union.GetInner()
		header := arg.Header()
		if err := verifyKeys(header.Key, header.EndKey, roachpb.IsRange(arg)); err != nil {
			return nil, roachpb.NewError(err)
		}
	}

	if res, err := s.maybeThrottleBatch(ctx, ba); err != nil {
		return nil, roachpb.NewError(err)
	} else if res != nil {
		defer res.Release()
	}

	if ba.BoundedStaleness != nil {
		newBa, pErr := s.executeServerSideBoundedStalenessNegotiation(ctx, ba)
		if pErr != nil {
			return nil, pErr
		}
		ba = newBa
	}

	if err := ba.SetActiveTimestamp(s.Clock().Now); err != nil {
		return nil, roachpb.NewError(err)
	}

	// Update our clock with the incoming request timestamp. This advances the
	// local node's clock to a high water mark from all nodes with which it has
	// interacted.
	if baClockTS, ok := ba.Timestamp.TryToClockTimestamp(); ok {
		if s.cfg.TestingKnobs.DisableMaxOffsetCheck {
			s.cfg.Clock.Update(baClockTS)
		} else {
			// If the command appears to come from a node with a bad clock,
			// reject it instead of updating the local clock and proceeding.
			if err := s.cfg.Clock.UpdateAndCheckMaxOffset(ctx, baClockTS); err != nil {
				return nil, roachpb.NewError(err)
			}
		}
	}

	defer func() {
		if r := recover(); r != nil {
			// On panic, don't run the rest of this deferred function. It's
			// probably just going to panic again due to undefined state.
			// Re-panic instead.
			panic(r)
		}
		if ba.Txn != nil {
			// We're in a Txn, so we can reduce uncertainty restarts by attaching
			// the above timestamp to the returned response or error. The caller
			// can use it to shorten its uncertainty interval when it comes back to
			// this node.
			if pErr != nil {
				pErr.OriginNode = s.NodeID()
				if txn := pErr.GetTxn(); txn == nil {
					pErr.SetTxn(ba.Txn)
				}
			} else {
				if br.Txn == nil {
					br.Txn = ba.Txn
				}
				// Update our clock with the outgoing response txn timestamp
				// (if the timestamp has been forwarded).
				if ba.Timestamp.Less(br.Txn.WriteTimestamp) {
					if clockTS, ok := br.Txn.WriteTimestamp.TryToClockTimestamp(); ok {
						s.cfg.Clock.Update(clockTS)
					}
				}
			}
		} else {
			if pErr == nil {
				// Update our clock with the outgoing response timestamp
				// (if the timestamp has been forwarded).
				if ba.Timestamp.Less(br.Timestamp) {
					if clockTS, ok := br.Timestamp.TryToClockTimestamp(); ok {
						s.cfg.Clock.Update(clockTS)
					}
				}
			}
		}

		// We get the latest timestamp - we know that any
		// write with a higher timestamp we run into later must
		// have started after this point in (absolute) time.
		now := s.cfg.Clock.NowAsClockTimestamp()
		if pErr != nil {
			pErr.Now = now
		} else {
			br.Now = now
		}
	}()

	if ba.Txn != nil {
		// We make our transaction aware that no other operation that causally
		// precedes it could have started after `now`. This is important: If we
		// wind up pushing a value, it will be in our immediate future, and not
		// updating the top end of our uncertainty timestamp would lead to a
		// restart (at least in the absence of a prior observed timestamp from
		// this node, in which case the following is a no-op).
		if _, ok := ba.Txn.GetObservedTimestamp(s.NodeID()); !ok {
			txnClone := ba.Txn.Clone()
			txnClone.UpdateObservedTimestamp(s.NodeID(), s.Clock().NowAsClockTimestamp())
			ba.Txn = txnClone
		}
	}

	if log.ExpensiveLogEnabled(ctx, 1) {
		log.Eventf(ctx, "executing %s", ba)
	}

	// Get range and add command to the range for execution.
	repl, err := s.GetReplica(ba.RangeID)
	if err != nil {
		return nil, roachpb.NewError(err)
	}
	if !repl.IsInitialized() {
		repl.mu.RLock()
		replicaID := repl.mu.replicaID
		repl.mu.RUnlock()

		// If we have an uninitialized copy of the range, then we are
		// probably a valid member of the range, we're just in the
		// process of getting our snapshot. If we returned
		// RangeNotFoundError, the client would invalidate its cache,
		// but we can be smarter: the replica that caused our
		// uninitialized replica to be created is most likely the
		// leader.
		return nil, roachpb.NewError(&roachpb.NotLeaseHolderError{
			RangeID:     ba.RangeID,
			LeaseHolder: repl.creatingReplica,
			// The replica doesn't have a range descriptor yet, so we have to build
			// a ReplicaDescriptor manually.
			Replica: roachpb.ReplicaDescriptor{
				NodeID:    repl.store.nodeDesc.NodeID,
				StoreID:   repl.store.StoreID(),
				ReplicaID: replicaID,
			},
		})
	}

	br, pErr = repl.Send(ctx, ba)
	if pErr == nil {
		return br, nil
	}

	// Augment error if necessary and return.
	switch t := pErr.GetDetail().(type) {
	case *roachpb.RangeKeyMismatchError:
		// TODO(andrei): It seems silly that, if the client specified a RangeID that
		// doesn't match the keys it wanted to access, but this node can serve those
		// keys anyway, we still return a RangeKeyMismatchError to the client
		// instead of serving the request. Particularly since we have the mechanism
		// to communicate correct range information to the client when returning a
		// successful response (i.e. br.RangeInfos).

		// On a RangeKeyMismatchError where the batch didn't even overlap
		// the start of the mismatched Range, try to suggest a more suitable
		// Range from this Store.
		rSpan, err := keys.Range(ba.Requests)
		if err != nil {
			return nil, roachpb.NewError(err)
		}

		// The kvclient thought that a particular range id covers rSpan. It was
		// wrong; the respective range doesn't cover all of rSpan, or perhaps it
		// doesn't even overlap it. Clearly the client has a stale range cache.
		// We'll return info on the range that the request ended up being routed to
		// and, to the extent that we have the info, the ranges containing the keys
		// that the client requested, and all the ranges in between.
		ri, err := t.MismatchedRange()
		if err != nil {
			return nil, roachpb.NewError(err)
		}
		skipRID := ri.Desc.RangeID // We already have info on one range, so don't add it again below.
		startKey := ri.Desc.StartKey
		if rSpan.Key.Less(startKey) {
			startKey = rSpan.Key
		}
		endKey := ri.Desc.EndKey
		if endKey.Less(rSpan.EndKey) {
			endKey = rSpan.EndKey
		}
		var ris []roachpb.RangeInfo
		if err := s.visitReplicasByKey(ctx, startKey, endKey, AscendingKeyOrder, func(ctx context.Context, repl *Replica) error {
			// Note that we return the lease even if it's expired. The kvclient can
			// use it as it sees fit.
			ri := repl.GetRangeInfo(ctx)
			if ri.Desc.RangeID == skipRID {
				return nil
			}
			ris = append(ris, ri)
			return nil
		}); err != nil {
			// Errors here should not be possible, but if there is one, it is ignored
			// as attaching RangeInfo is optional.
			log.Warningf(ctx, "unexpected error visiting replicas: %s", err)
			ris = nil // just to be safe
		}
		for _, ri := range ris {
			t.AppendRangeInfo(ctx, ri.Desc, ri.Lease)
		}
		// We have to write `t` back to `pErr` so that it picks up the changes.
		pErr = roachpb.NewError(t)
	case *roachpb.RaftGroupDeletedError:
		// This error needs to be converted appropriately so that clients
		// will retry.
		err := roachpb.NewRangeNotFoundError(repl.RangeID, repl.store.StoreID())
		pErr = roachpb.NewError(err)
	}
	return nil, pErr
}
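
// As a hedged illustration of the contract documented on Send (not part of
// this package's API), a non-transactional point read routed to this store
// could be assembled roughly as follows; the RangeID and ReplicaDescriptor
// values are hypothetical placeholders that a real client would obtain from
// its range cache:
//
//	var ba roachpb.BatchRequest
//	ba.RangeID = 42 // hypothetical
//	ba.Replica = roachpb.ReplicaDescriptor{NodeID: 1, StoreID: 1, ReplicaID: 1}
//	ba.Add(roachpb.NewGet(roachpb.Key("a"), false /* forUpdate */))
//	br, pErr := store.Send(ctx, ba)
//
// Because the batch is non-transactional and sets no timestamp, Send assigns
// one via ba.SetActiveTimestamp and may execute the read at a higher timestamp
// than a client-supplied one.
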
// maybeThrottleBatch inspects the provided batch and determines whether
// throttling should be applied to avoid overloading the Store. If so, the
// method blocks and returns a reservation that must be released after the
// request has completed.
//
// Of note is that request throttling is all performed above evaluation and
// before a request acquires latches on a range. Otherwise, the request could
// inadvertently block others while being throttled.
func (s *Store) maybeThrottleBatch(
	ctx context.Context, ba roachpb.BatchRequest,
) (limit.Reservation, error) {
	if !ba.IsSingleRequest() {
		return nil, nil
	}
	switch t := ba.Requests[0].GetInner().(type) {
	case *roachpb.AddSSTableRequest:
		limiter := s.limiters.ConcurrentAddSSTableRequests
		if t.IngestAsWrites {
			limiter = s.limiters.ConcurrentAddSSTableAsWritesRequests
		}
		before := timeutil.Now()
		res, err := limiter.Begin(ctx)
		if err != nil {
			return nil, err
		}

		beforeEngineDelay := timeutil.Now()
		s.engine.PreIngestDelay(ctx)
		after := timeutil.Now()

		waited, waitedEngine := after.Sub(before), after.Sub(beforeEngineDelay)
		s.metrics.AddSSTableProposalTotalDelay.Inc(waited.Nanoseconds())
		s.metrics.AddSSTableProposalEngineDelay.Inc(waitedEngine.Nanoseconds())
		if waited > time.Second {
			log.Infof(ctx, "SST ingestion was delayed by %v (%v for storage engine back-pressure)",
				waited, waitedEngine)
		}
		return res, nil

	case *roachpb.ExportRequest:
		// Limit the number of concurrent Export requests, as these often scan an
		// entire Range at a time and place significant read load on a Store.
		before := timeutil.Now()
		res, err := s.limiters.ConcurrentExportRequests.Begin(ctx)
		if err != nil {
			return nil, err
		}

		waited := timeutil.Since(before)
		s.metrics.ExportRequestProposalTotalDelay.Inc(waited.Nanoseconds())
		if waited > time.Second {
			log.Infof(ctx, "Export request was delayed by %v", waited)
		}
		return res, nil

	case *roachpb.ScanInterleavedIntentsRequest:
		before := timeutil.Now()
		res, err := s.limiters.ConcurrentScanInterleavedIntents.Begin(ctx)
		if err != nil {
			return nil, err
		}

		waited := timeutil.Since(before)
		if waited > time.Second {
			log.Infof(ctx, "ScanInterleavedIntents request was delayed by %v", waited)
		}
		return res, nil

	default:
		return nil, nil
	}
}
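
// A minimal sketch of the reservation pattern used by maybeThrottleBatch,
// assuming a hypothetical limiter variable (the real limiters live in
// s.limiters and are configured from cluster settings):
//
//	res, err := limiter.Begin(ctx) // may block while the limiter is at capacity
//	if err != nil {
//		return nil, err // e.g. the context was canceled while waiting
//	}
//	defer res.Release() // release the reservation once the request completes
//
// Store.Send follows exactly this shape: it calls maybeThrottleBatch and, if
// a non-nil Reservation comes back, defers its Release.
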
// executeServerSideBoundedStalenessNegotiation performs the server-side
// negotiation fast-path for bounded staleness read requests. This fast-path
// allows a bounded staleness read request that lands on a single range to
// perform its negotiation phase and execution phase in a single RPC.
//
// The server-side negotiation fast-path provides two benefits:
// 1. it avoids two network hops in the common-case where a bounded staleness
//    read is targeting a single range. This is an important performance
//    optimization for single-row point lookups.
// 2. it provides stronger guarantees around minimizing staleness during bounded
//    staleness reads. Bounded staleness reads that hit the server-side
//    fast-path use their target replica's most up-to-date resolved timestamp,
//    so they are as fresh as possible. Bounded staleness reads that miss the
//    fast-path and perform explicit negotiation (see below) consult a cache, so
//    they may use an out-of-date, suboptimal resolved timestamp, as long as it
//    is fresh enough to satisfy the staleness bound of the request.
//
// The method should be called for requests that have their MinTimestampBound
// field set, which indicates that the request wants a dynamic timestamp equal
// to the resolved timestamp over its key span on the local replica. Setting the
// request timestamp to the local resolved timestamp ensures that the request
// will not block on replication or on conflicting transactions when executed.
// If the method returns successfully, the new request will have its
// MinTimestampBound field unset and its Timestamp field set to the negotiated
// timestamp.
//
// If the local resolved timestamp is below the request's MinTimestampBound,
// then a MinTimestampBoundUnsatisfiableError will be returned if the request
// has its MinTimestampBoundStrict flag set to true. Otherwise, the request's
// timestamp will be set to the MinTimestampBound and allowed to execute.
//
// For more information, see the "Server-side negotiation fast-path" section of
// docs/RFCS/20210519_bounded_staleness_reads.md.
func (s *Store) executeServerSideBoundedStalenessNegotiation(
	ctx context.Context, ba roachpb.BatchRequest,
) (roachpb.BatchRequest, *roachpb.Error) {
	if ba.BoundedStaleness == nil {
		log.Fatal(ctx, "BoundedStaleness header required for server-side negotiation fast-path")
	}
	cfg := ba.BoundedStaleness
	if cfg.MinTimestampBound.IsEmpty() {
		return ba, roachpb.NewError(errors.AssertionFailedf(
			"MinTimestampBound must be set in batch"))
	}
	if !cfg.MaxTimestampBound.IsEmpty() && cfg.MaxTimestampBound.LessEq(cfg.MinTimestampBound) {
		return ba, roachpb.NewError(errors.AssertionFailedf(
			"MaxTimestampBound, if set in batch, must be greater than MinTimestampBound"))
	}
	if !ba.Timestamp.IsEmpty() {
		return ba, roachpb.NewError(errors.AssertionFailedf(
			"MinTimestampBound and Timestamp cannot both be set in batch"))
	}
	if ba.Txn != nil {
		return ba, roachpb.NewError(errors.AssertionFailedf(
			"MinTimestampBound and Txn cannot both be set in batch"))
	}

	// Use one or more QueryResolvedTimestampRequests to compute a resolved
	// timestamp over the read spans on the local replica.
	var queryResBa roachpb.BatchRequest
	queryResBa.RangeID = ba.RangeID
	queryResBa.Replica = ba.Replica
	queryResBa.ClientRangeInfo = ba.ClientRangeInfo
	queryResBa.ReadConsistency = roachpb.INCONSISTENT
	for _, ru := range ba.Requests {
		span := ru.GetInner().Header().Span()
		if len(span.EndKey) == 0 {
			// QueryResolvedTimestamp is a ranged operation, so expand point
			// requests to the smallest span that contains their key.
			span.EndKey = span.Key.Next()
		}
		queryResBa.Add(&roachpb.QueryResolvedTimestampRequest{
			RequestHeader: roachpb.RequestHeaderFromSpan(span),
		})
	}

	br, pErr := s.Send(ctx, queryResBa)
	if pErr != nil {
		return ba, pErr
	}

	// Merge the resolved timestamps together by taking their minimum, then
	// verify that the bounded staleness read can be satisfied by the local
	// replica, according to its minimum timestamp bound.
	var resTS hlc.Timestamp
	for _, ru := range br.Responses {
		ts := ru.GetQueryResolvedTimestamp().ResolvedTS
		if resTS.IsEmpty() {
			resTS = ts
		} else {
			resTS.Backward(ts)
		}
	}
	if resTS.Less(cfg.MinTimestampBound) {
		// The local resolved timestamp was below the request's minimum timestamp
		// bound. If the minimum timestamp bound should be strictly obeyed, reject
		// the batch. Otherwise, consider the minimum timestamp bound to be the
		// request timestamp and let the request proceed. On follower replicas, this
		// may result in the request being redirected (with a NotLeaseHolderError)
		// to the current leaseholder. On the leaseholder, this may result in the
		// request blocking on conflicting transactions.
		if cfg.MinTimestampBoundStrict {
			return ba, roachpb.NewError(roachpb.NewMinTimestampBoundUnsatisfiableError(
				cfg.MinTimestampBound, resTS,
			))
		}
		resTS = cfg.MinTimestampBound
	}
	if !cfg.MaxTimestampBound.IsEmpty() && cfg.MaxTimestampBound.LessEq(resTS) {
		// The local resolved timestamp was at or above the request's maximum
		// timestamp bound, which is exclusive. Drop the request timestamp to
		// just below the maximum timestamp bound.
		resTS = cfg.MaxTimestampBound.Prev()
	}
	ba.Timestamp = resTS
	ba.BoundedStaleness = nil
	return ba, nil
}
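
// As a hedged sketch of the client side of this fast-path (the field values
// are hypothetical), a bounded staleness read arrives at Store.Send shaped
// roughly like:
//
//	var ba roachpb.BatchRequest
//	ba.BoundedStaleness = &roachpb.BoundedStalenessHeader{
//		MinTimestampBound:       minTS, // reads must observe state at least this fresh
//		MinTimestampBoundStrict: true,  // error rather than block if unsatisfiable
//		MaxTimestampBound:       maxTS, // optional exclusive upper bound
//	}
//	ba.Add(roachpb.NewGet(roachpb.Key("a"), false /* forUpdate */))
//	br, pErr := store.Send(ctx, ba) // negotiation happens inside Send
//
// Send detects the BoundedStaleness header, negotiates a timestamp via the
// method above, and then executes the batch at that fixed timestamp.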