Skip to content

Commit

Permalink
Revert "kvstreamer: reuse incomplete Get requests on resume batches"
Browse files Browse the repository at this point in the history
This reverts commit 21f2390.

Previously, I didn't realize that the KV layer would modify all requests
included into the BatchRequest in `txnSeqNumAllocator`, so we cannot
reuse even incomplete GetRequests. It is unfortunate, but not a big
deal.

Release note: None
  • Loading branch information
yuzefovich committed Jul 9, 2022
1 parent ddb5c3d commit 516553d
Showing 1 changed file with 16 additions and 12 deletions.
28 changes: 16 additions & 12 deletions pkg/kv/kvclient/kvstreamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1567,6 +1567,14 @@ func buildResumeSingleRangeBatch(
// requests with the ResumeSpans.
resumeReq.reqsReservedBytes = fp.resumeReqsMemUsage
resumeReq.overheadAccountedFor = req.overheadAccountedFor
// Note that due to limitations of the KV layer (#75452) we cannot reuse
// original requests because the KV doesn't allow mutability (and all
// requests are modified by txnSeqNumAllocator, even if they are not
// evaluated due to TargetBytes limit).
gets := make([]struct {
req roachpb.GetRequest
union roachpb.RequestUnion_Get
}, fp.numIncompleteGets)
scans := make([]struct {
req roachpb.ScanRequest
union roachpb.RequestUnion_Scan
Expand All @@ -1583,18 +1591,14 @@ func buildResumeSingleRangeBatch(
emptyResponse = false
continue
}
// This Get wasn't completed - include it into the batch again (we
// can just reuse the original request since it hasn't been
// modified which is also asserted below).
if buildutil.CrdbTestBuild {
if origSpan := req.reqs[i].GetInner().Header().Span(); !get.ResumeSpan.Equal(origSpan) {
panic(errors.AssertionFailedf(
"unexpectedly the ResumeSpan %s on the GetResponse is different from the original span %s",
get.ResumeSpan, origSpan,
))
}
}
resumeReq.reqs[resumeReqIdx] = req.reqs[i]
// This Get wasn't completed - create a new request according to the
// ResumeSpan and include it into the batch.
newGet := gets[0]
gets = gets[1:]
newGet.req.SetSpan(*get.ResumeSpan)
newGet.req.KeyLocking = s.keyLocking
newGet.union.Get = &newGet.req
resumeReq.reqs[resumeReqIdx].Value = &newGet.union
resumeReq.positions = append(resumeReq.positions, position)
if req.subRequestIdx != nil {
resumeReq.subRequestIdx = append(resumeReq.subRequestIdx, req.subRequestIdx[i])
Expand Down

0 comments on commit 516553d

Please sign in to comment.