Skip to content

Commit

Permalink
Merge pull request #20855 from richardwu/fix-err-indexes-1.1
Browse files Browse the repository at this point in the history
1.1 fix/cherry-pick: fix error index alignment in multiple partial distSender batches
  • Loading branch information
richardwu authored Dec 19, 2017
2 parents c035110 + f362da8 commit 5fe4455
Show file tree
Hide file tree
Showing 6 changed files with 244 additions and 7 deletions.
7 changes: 2 additions & 5 deletions pkg/internal/client/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1063,7 +1063,6 @@ func (txn *Txn) Send(
// verifies that if an EndTransactionRequest is included, then it is the last
// request in the batch.
func firstWriteIndex(ba roachpb.BatchRequest) (int, *roachpb.Error) {
firstWriteIdx := -1
for i, ru := range ba.Requests {
args := ru.GetInner()
if i < len(ba.Requests)-1 /* if not last*/ {
Expand All @@ -1072,12 +1071,10 @@ func firstWriteIndex(ba roachpb.BatchRequest) (int, *roachpb.Error) {
}
}
if roachpb.IsTransactionWrite(args) {
if firstWriteIdx == -1 {
firstWriteIdx = i
}
return i, nil
}
}
return firstWriteIdx, nil
return -1, nil
}

// UpdateStateOnRemoteRetryableErr updates the Txn, and the Transaction proto
Expand Down
18 changes: 18 additions & 0 deletions pkg/kv/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,8 @@ func (ds *DistSender) Send(
// Such a batch should never need splitting.
panic("batch with MaxSpanRequestKeys needs splitting")
}

errIdxOffset := 0
for len(parts) > 0 {
part := parts[0]
ba.Requests = part
Expand All @@ -594,6 +596,7 @@ func (ds *DistSender) Send(
if err != nil {
return nil, roachpb.NewError(err)
}

rpl, pErr := ds.divideAndSendBatchToRanges(ctx, ba, rs, 0 /* batchIdx */)

if pErr == errNo1PCTxn {
Expand All @@ -607,11 +610,20 @@ func (ds *DistSender) Send(
if len(parts) != 2 {
panic("split of final EndTransaction chunk resulted in != 2 parts")
}
// Restart transaction of the last chunk as two parts
// with EndTransaction in the second part.
continue
}
if pErr != nil {
if pErr.Index != nil && pErr.Index.Index != -1 {
pErr.Index.Index += int32(errIdxOffset)
}

return nil, pErr
}

errIdxOffset += len(ba.Requests)

// Propagate transaction from last reply to next request. The final
// update is taken and put into the response's main header.
ba.UpdateTxn(rpl.Txn)
Expand Down Expand Up @@ -934,6 +946,12 @@ func (ds *DistSender) sendPartialBatch(
return response{reply: reply, positions: positions}
}

// Re-map the error index within this partial batch back
// to its position in the encompassing batch.
if pErr.Index != nil && pErr.Index.Index != -1 && positions != nil {
pErr.Index.Index = int32(positions[pErr.Index.Index])
}

log.ErrEventf(ctx, "reply error %s: %s", ba, pErr)

// Error handling: If the error indicates that our range
Expand Down
177 changes: 175 additions & 2 deletions pkg/kv/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"reflect"
"sort"
"strconv"
"sync/atomic"
"testing"
"time"
Expand All @@ -45,10 +46,12 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/tracing"
)

var testMetaEndKey = roachpb.RKey(keys.SystemPrefix)

var testMetaRangeDescriptor = roachpb.RangeDescriptor{
RangeID: 1,
StartKey: testutils.MakeKey(keys.Meta2Prefix, roachpb.RKey(roachpb.KeyMin)),
EndKey: testutils.MakeKey(keys.Meta2Prefix, roachpb.RKey(roachpb.KeyMax)),
StartKey: roachpb.RKeyMin,
EndKey: testMetaEndKey,
Replicas: []roachpb.ReplicaDescriptor{
{
NodeID: 1,
Expand Down Expand Up @@ -382,6 +385,29 @@ func (mdb MockRangeDescriptorDB) FirstRange() (*roachpb.RangeDescriptor, error)
return &rs[0], nil
}

func mockRangeDescriptorDBForDescs(descs ...roachpb.RangeDescriptor) MockRangeDescriptorDB {
return MockRangeDescriptorDB(func(key roachpb.RKey, useReverseScan bool) ([]roachpb.RangeDescriptor, []roachpb.RangeDescriptor, *roachpb.Error) {
var matchingDescs []roachpb.RangeDescriptor
for _, desc := range descs {
contains := desc.ContainsKey
if useReverseScan {
contains = desc.ContainsKeyInverted
}
if contains(key) {
matchingDescs = append(matchingDescs, desc)
}
}
switch len(matchingDescs) {
case 0:
panic(fmt.Sprintf("found no matching descriptors for key %s", key))
case 1:
return matchingDescs, nil, nil
default:
panic(fmt.Sprintf("found multiple matching descriptors for key %s: %v", key, matchingDescs))
}
})
}

var defaultMockRangeDescriptorDB = MockRangeDescriptorDB(func(key roachpb.RKey, _ bool) ([]roachpb.RangeDescriptor, []roachpb.RangeDescriptor, *roachpb.Error) {
if bytes.HasPrefix(key, keys.Meta2Prefix) {
return []roachpb.RangeDescriptor{testMetaRangeDescriptor}, nil, nil
Expand Down Expand Up @@ -2214,3 +2240,150 @@ func TestGatewayNodeID(t *testing.T) {
t.Errorf("got GatewayNodeID=%d, want %d", observedNodeID, expNodeID)
}
}

// Regression test for #20067.
// If a batch is partitioned into multiple partial batches, the
// roachpb.Error.Index of each batch should correspond to its original index in
// the overall batch.
func TestErrorIndexAlignment(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper := stop.NewStopper()
defer stopper.Stop(context.TODO())

g, clock := makeGossip(t, stopper)

if err := g.SetNodeDescriptor(&roachpb.NodeDescriptor{NodeID: 1}); err != nil {
t.Fatal(err)
}
nd := &roachpb.NodeDescriptor{
NodeID: roachpb.NodeID(1),
Address: util.MakeUnresolvedAddr(testAddress.Network(), testAddress.String()),
}
if err := g.AddInfoProto(gossip.MakeNodeIDKey(roachpb.NodeID(1)), nd, time.Hour); err != nil {
t.Fatal(err)

}

// Fill MockRangeDescriptorDB with two descriptors.
var descriptor1 = roachpb.RangeDescriptor{
RangeID: 2,
StartKey: testMetaEndKey,
EndKey: roachpb.RKey("b"),
Replicas: []roachpb.ReplicaDescriptor{
{
NodeID: 1,
StoreID: 1,
},
},
}
var descriptor2 = roachpb.RangeDescriptor{
RangeID: 3,
StartKey: roachpb.RKey("b"),
EndKey: roachpb.RKey("c"),
Replicas: []roachpb.ReplicaDescriptor{
{
NodeID: 1,
StoreID: 1,
},
},
}
var descriptor3 = roachpb.RangeDescriptor{
RangeID: 4,
StartKey: roachpb.RKey("c"),
EndKey: roachpb.RKeyMax,
Replicas: []roachpb.ReplicaDescriptor{
{
NodeID: 1,
StoreID: 1,
},
},
}

// The 1st partial batch has 1 request.
// The 2nd partial batch has 2 requests.
// The 3rd partial batch has 1 request.
// Each test case returns an error for the first request of the nth
// partial batch.
testCases := []struct {
// The nth request to return an error.
nthPartialBatch int
expectedFinalIdx int32
}{
{0, 0},
{1, 1},
{2, 3},
}

descDB := mockRangeDescriptorDBForDescs(
testMetaRangeDescriptor,
descriptor1,
descriptor2,
descriptor3,
)

for i, tc := range testCases {
t.Run(strconv.Itoa(i), func(t *testing.T) {
nthRequest := 0

var testFn rpcSendFn = func(
_ context.Context,
_ SendOptions,
_ ReplicaSlice,
ba roachpb.BatchRequest,
_ *rpc.Context,
) (*roachpb.BatchResponse, error) {
reply := ba.CreateReply()
if nthRequest == tc.nthPartialBatch {
reply.Error = &roachpb.Error{
// The relative index is always 0 since
// we return an error for the first
// request of the nthPartialBatch.
Index: &roachpb.ErrPosition{Index: 0},
}
}
nthRequest++
return reply, nil
}

cfg := DistSenderConfig{
AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()},
Clock: clock,
TestingKnobs: DistSenderTestingKnobs{
TransportFactory: adaptLegacyTransport(testFn),
},
RangeDescriptorDB: descDB,
}
ds := NewDistSender(cfg, g)

var ba roachpb.BatchRequest
ba.Txn = &roachpb.Transaction{Name: "test"}
// First batch has 1 request.
val := roachpb.MakeValueFromString("val")
ba.Add(roachpb.NewPut(roachpb.Key("a"), val))

// Second batch has 2 requests.
val = roachpb.MakeValueFromString("val")
ba.Add(roachpb.NewPut(roachpb.Key("b"), val))
val = roachpb.MakeValueFromString("val")
ba.Add(roachpb.NewPut(roachpb.Key("bb"), val))

// Third batch has 1 request.
val = roachpb.MakeValueFromString("val")
ba.Add(roachpb.NewPut(roachpb.Key("c"), val))

_, pErr := ds.Send(context.Background(), ba)
if pErr == nil {
t.Fatalf("expected an error to be returned from distSender")
}

if pErr.Index == nil {
t.Fatalf("expected error index to be set")
}

if pErr.Index.Index != tc.expectedFinalIdx {
t.Errorf("expected error index to be %d, instead got %d", tc.expectedFinalIdx, pErr.Index.Index)
}
})
}

}
7 changes: 7 additions & 0 deletions pkg/roachpb/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -1212,6 +1212,13 @@ func (rs RSpan) ContainsKey(key RKey) bool {
return bytes.Compare(key, rs.Key) >= 0 && bytes.Compare(key, rs.EndKey) < 0
}

// ContainsKeyInverted returns whether this span contains the specified key. The
// receiver span is considered inverted, meaning that instead of containing the
// range ["key","endKey"), it contains the range ("key","endKey"].
func (rs RSpan) ContainsKeyInverted(key RKey) bool {
return bytes.Compare(key, rs.Key) > 0 && bytes.Compare(key, rs.EndKey) <= 0
}

// ContainsExclusiveEndKey returns whether this span contains the specified key.
// A span is considered to include its EndKey (e.g., span ["a", b") contains
// "b" according to this function, but does not contain "a").
Expand Down
6 changes: 6 additions & 0 deletions pkg/roachpb/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@ func (r RangeDescriptor) ContainsKey(key RKey) bool {
return r.RSpan().ContainsKey(key)
}

// ContainsKeyInverted returns whether this RangeDescriptor contains the
// specified key using an inverted range. See RSpan.ContainsKeyInverted.
func (r RangeDescriptor) ContainsKeyInverted(key RKey) bool {
return r.RSpan().ContainsKeyInverted(key)
}

// ContainsExclusiveEndKey returns whether this RangeDescriptor contains the specified end key.
func (r RangeDescriptor) ContainsExclusiveEndKey(key RKey) bool {
return r.RSpan().ContainsExclusiveEndKey(key)
Expand Down
36 changes: 36 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/interleaved
Original file line number Diff line number Diff line change
Expand Up @@ -318,3 +318,39 @@ DROP INDEX t1@c;

statement ok
DROP TABLE t1

# Regression test for #20067.

statement ok
CREATE TABLE p20067 (
p_id INT PRIMARY KEY,
name STRING NOT NULL
)

statement ok
CREATE TABLE c20067 (
p_id INT,
c_id INT,
name STRING NOT NULL,
PRIMARY KEY (p_id, c_id),
CONSTRAINT uq_name UNIQUE(name)
) INTERLEAVE IN PARENT p20067 (p_id)

statement ok
BEGIN;
INSERT INTO p20067 VALUES (1, 'John Doe');
INSERT INTO c20067 VALUES (1, 1, 'John Doe Junior');
COMMIT;

statement error duplicate key value \(name\)=\('John Doe Junior'\) violates unique constraint "uq_name"
INSERT INTO c20067 VALUES (2, 1, 'John Doe Junior')

statement error duplicate key value \(name\)=\('John Doe Junior'\) violates unique constraint "uq_name"
BEGIN; INSERT INTO p20067 VALUES (2, 'John Doe'); INSERT INTO c20067 VALUES (2, 1, 'John Doe Junior'); END;

# End the last transaction.
statement ok
END

statement error duplicate key value \(p_id,c_id\)=\(1,1\) violates unique constraint "primary"
INSERT INTO c20067 VALUES (1, 1, 'John Doe')

0 comments on commit 5fe4455

Please sign in to comment.