Merge pull request #15260 from RaduBerinde/fix-span-panic
distsql: fix non-consecutive span planner crash
RaduBerinde authored Apr 23, 2017
2 parents a9aebe9 + d91a9c3 commit 1227734
Showing 7 changed files with 219 additions and 59 deletions.
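
Background on the crash being fixed: when the range cache is stale (for example, right after a split), the span resolver can hand the planner a range descriptor that starts before the previous range's end key, and the old partitionSpans code fataled on the resulting non-consecutive span pieces. Below is a minimal runnable sketch of the new stitching logic, using hypothetical simplified types rather than the real planner code:

package main

import "fmt"

// rng stands in for a range descriptor's start/end keys.
type rng struct{ start, end string }

// partition mimics the fixed partitionSpans loop: it tracks lastKey (the end
// of the previous piece) and only requires that the next range contain it,
// instead of assuming each range starts exactly where the last one ended.
func partition(key, endKey string, ranges []rng) ([][2]string, error) {
	var pieces [][2]string
	lastKey := key
	for _, r := range ranges {
		// The next range must cover the last end key (log.Fatalf in the real code).
		if !(r.start <= lastKey && lastKey < r.end) {
			return nil, fmt.Errorf("next range %v doesn't cover last end key %q", r, lastKey)
		}
		// Limit the end key to the end of the span we are resolving.
		pieceEnd := r.end
		if endKey < pieceEnd {
			pieceEnd = endKey
		}
		pieces = append(pieces, [2]string{lastKey, pieceEnd})
		if pieceEnd >= endKey {
			break // done
		}
		lastKey = pieceEnd
	}
	return pieces, nil
}

func main() {
	// After a split at "b", a stale cache can yield [a,c) then [b,e):
	// overlapping rather than consecutive. The fix emits [a,c), [c,e)
	// instead of crashing on the non-consecutive pieces.
	pieces, err := partition("a", "e", []rng{{"a", "c"}, {"b", "e"}})
	fmt.Println(pieces, err) // [[a c] [c e]] <nil>
}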
6 changes: 6 additions & 0 deletions pkg/kv/range_iter.go
@@ -155,6 +155,12 @@ func (ri *RangeIterator) Seek(ctx context.Context, key roachpb.RKey, scanDir Sca
 	ri.pErr = nil // clear any prior error
 	ri.key = key  // set the key
 
+	if (scanDir == Ascending && key.Equal(roachpb.RKeyMax)) ||
+		(scanDir == Descending && key.Equal(roachpb.RKeyMin)) {
+		ri.pErr = roachpb.NewErrorf("RangeIterator seek to invalid key %s", key)
+		return
+	}
+
 	// Retry loop for looking up next range in the span. The retry loop
 	// deals with retryable range descriptor lookups.
 	for r := retry.StartWithCtx(ctx, ri.ds.rpcRetryOptions); r.Next(); {
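
A hedged sketch of how a caller observes the new guard (assuming RangeIterator's usual Valid()/Error() accessors; this snippet is not part of the diff): seeking ascending at RKeyMax, or descending at RKeyMin, now surfaces a regular iterator error instead of attempting a descriptor lookup past the end of the key space.

ri.Seek(ctx, roachpb.RKeyMax, kv.Ascending)
if !ri.Valid() {
	// e.g. "RangeIterator seek to invalid key /Max"
	return ri.Error().GoError()
}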
75 changes: 52 additions & 23 deletions pkg/server/testserver.go
@@ -493,47 +493,76 @@ func (ts *TestServer) LookupRange(key roachpb.Key) (roachpb.RangeDescriptor, err
 func (ts *TestServer) SplitRange(
 	splitKey roachpb.Key,
 ) (roachpb.RangeDescriptor, roachpb.RangeDescriptor, error) {
+	ctx := context.Background()
 	splitRKey, err := keys.Addr(splitKey)
 	if err != nil {
 		return roachpb.RangeDescriptor{}, roachpb.RangeDescriptor{}, err
 	}
 	origRangeDesc, err := ts.LookupRange(splitKey)
 	if err != nil {
 		return roachpb.RangeDescriptor{}, roachpb.RangeDescriptor{}, err
 	}
 	if origRangeDesc.StartKey.Equal(splitRKey) {
 		return roachpb.RangeDescriptor{}, roachpb.RangeDescriptor{},
 			errors.Errorf(
 				"cannot split range %+v at start key %q", origRangeDesc, splitKey)
 	}
 	splitReq := roachpb.AdminSplitRequest{
 		Span: roachpb.Span{
 			Key: splitKey,
 		},
 		SplitKey: splitKey,
 	}
-	_, pErr := client.SendWrapped(context.Background(), ts.DistSender(), &splitReq)
+	_, pErr := client.SendWrapped(ctx, ts.DistSender(), &splitReq)
 	if pErr != nil {
 		return roachpb.RangeDescriptor{}, roachpb.RangeDescriptor{},
 			errors.Errorf(
 				"%q: split unexpected error: %s", splitReq.SplitKey, pErr)
 	}
 
+	// The split point may not be exactly at the key we requested (we request
+	// splits at valid table keys, and the split point corresponds to the row's
+	// prefix). We scan for the range that includes the key we requested and the
+	// one that precedes it.
+
+	// We use a transaction so that we get consistent results between the two
+	// scans (in case there are other splits happening).
 	var leftRangeDesc, rightRangeDesc roachpb.RangeDescriptor
-	if err := ts.DB().GetProto(context.TODO(),
-		keys.RangeDescriptorKey(origRangeDesc.StartKey), &leftRangeDesc); err != nil {
-		return roachpb.RangeDescriptor{}, roachpb.RangeDescriptor{},
-			errors.Wrap(err, "could not look up left-hand side descriptor")
-	}
-	// The split point might not be exactly the one we requested (it can be
-	// adjusted slightly so we don't split in the middle of SQL rows). Update it
-	// to the real point.
-	splitRKey = leftRangeDesc.EndKey
-	if err := ts.DB().GetProto(context.TODO(),
-		keys.RangeDescriptorKey(splitRKey), &rightRangeDesc); err != nil {
-		return roachpb.RangeDescriptor{}, roachpb.RangeDescriptor{},
-			errors.Wrap(err, "could not look up right-hand side descriptor")
-	}
+	if err := ts.DB().Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
+		scanMeta := func(key roachpb.RKey, reverse bool) (desc roachpb.RangeDescriptor, err error) {
+			var kvs []client.KeyValue
+			if reverse {
+				// Find the last range that ends at or before key.
+				kvs, err = txn.ReverseScan(
+					ctx, keys.Meta2Prefix, keys.RangeMetaKey(key.Next()), 1, /* one result */
+				)
+			} else {
+				// Find the first range that ends after key.
+				kvs, err = txn.Scan(
+					ctx, keys.RangeMetaKey(key.Next()), keys.Meta2Prefix.PrefixEnd(), 1, /* one result */
+				)
+			}
+			if err != nil {
+				return desc, err
+			}
+			if len(kvs) != 1 {
+				return desc, fmt.Errorf("expected 1 result, got %d", len(kvs))
+			}
+			err = kvs[0].ValueProto(&desc)
+			return desc, err
+		}
+
+		rightRangeDesc, err = scanMeta(splitRKey, false /* !reverse */)
+		if err != nil {
+			return errors.Wrap(err, "could not look up right-hand side descriptor")
+		}
+
+		leftRangeDesc, err = scanMeta(splitRKey, true /* reverse */)
+		if err != nil {
+			return errors.Wrap(err, "could not look up left-hand side descriptor")
+		}
+
+		if !leftRangeDesc.EndKey.Equal(rightRangeDesc.StartKey) {
+			return errors.Errorf(
+				"inconsistent left (%v) and right (%v) descriptors", leftRangeDesc, rightRangeDesc,
+			)
+		}
+		return nil
+	}); err != nil {
+		return roachpb.RangeDescriptor{}, roachpb.RangeDescriptor{}, err
+	}
 
 	return leftRangeDesc, rightRangeDesc, nil
 }

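Why scanMeta is shaped the way it is (a toy model with hypothetical types, not CockroachDB's actual meta2 code): range descriptors are indexed by their end key, so a forward scan from key.Next() finds the range containing key, and a reverse scan from the same point finds its left-hand neighbor; doing both inside one transaction gives a consistent snapshot even while other splits are in flight.

package main

import "fmt"

// desc stands in for a RangeDescriptor; end == "" means the maximum key.
type desc struct{ start, end string }

// meta2 is kept sorted by end key, mirroring how descriptors are indexed.
var meta2 = []desc{{"", "b"}, {"b", "d"}, {"d", ""}}

// scanMeta mimics the closure above: forward finds the first range ending
// after key (the range containing key); reverse finds the last range ending
// at or before key (its left-hand neighbor).
func scanMeta(key string, reverse bool) (desc, error) {
	if reverse {
		for i := len(meta2) - 1; i >= 0; i-- {
			if meta2[i].end != "" && meta2[i].end <= key {
				return meta2[i], nil
			}
		}
	} else {
		for _, d := range meta2 {
			if d.end == "" || d.end > key {
				return d, nil
			}
		}
	}
	return desc{}, fmt.Errorf("no range found for key %q", key)
}

func main() {
	// After a split at "b": the right side starts at "b", the left side ends there.
	right, _ := scanMeta("b", false)
	left, _ := scanMeta("b", true)
	fmt.Println(left, right, left.end == right.start) // { b} {b d} true
}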
36 changes: 20 additions & 16 deletions pkg/sql/distsql_physical_planner.go
@@ -405,6 +405,8 @@ func (dsp *distSQLPlanner) partitionSpans(
 		}
 
 		var lastNodeID roachpb.NodeID
+		// lastKey maintains the EndKey of the last piece of `span`.
+		lastKey := rspan.Key
 		for it.Seek(ctx, span, kv.Ascending); ; it.Next(ctx) {
 			if !it.Valid() {
 				return nil, it.Error()
@@ -415,16 +417,15 @@
 			}
 			desc := it.Desc()
 
-			var trimmedSpan roachpb.Span
-			if rspan.Key.Less(desc.StartKey) {
-				trimmedSpan.Key = desc.StartKey.AsRawKey()
-			} else {
-				trimmedSpan.Key = span.Key
-			}
-			if desc.EndKey.Less(rspan.EndKey) {
-				trimmedSpan.EndKey = desc.EndKey.AsRawKey()
-			} else {
-				trimmedSpan.EndKey = span.EndKey
-			}
+			if !desc.ContainsKey(lastKey) {
+				// This range must contain the last range's EndKey.
+				log.Fatalf(ctx, "next range doesn't cover last end key: %#v %v", splits, desc.RSpan())
+			}
+
+			// Limit the end key to the end of the span we are resolving.
+			endKey := desc.EndKey
+			if rspan.EndKey.Less(endKey) {
+				endKey = rspan.EndKey
+			}
 
 			nodeID := replInfo.NodeDesc.NodeID
@@ -441,18 +442,21 @@

 			if lastNodeID == nodeID {
 				// Two consecutive ranges on the same node, merge the spans.
-				if !split.spans[len(split.spans)-1].EndKey.Equal(trimmedSpan.Key) {
-					log.Fatalf(ctx, "expected consecutive span pieces %v %v", split.spans, trimmedSpan)
-				}
-				split.spans[len(split.spans)-1].EndKey = trimmedSpan.EndKey
+				split.spans[len(split.spans)-1].EndKey = endKey.AsRawKey()
 			} else {
-				split.spans = append(split.spans, trimmedSpan)
+				split.spans = append(split.spans, roachpb.Span{
+					Key:    lastKey.AsRawKey(),
+					EndKey: endKey.AsRawKey(),
+				})
 			}
 
-			lastNodeID = nodeID
-			if !it.NeedAnother() {
+			if !endKey.Less(rspan.EndKey) {
+				// Done.
 				break
 			}
+
+			lastKey = endKey
+			lastNodeID = nodeID
 		}
 	}
 	return splits, nil
130 changes: 127 additions & 3 deletions pkg/sql/distsql_physical_planner_test.go
@@ -19,8 +19,11 @@
 package sql
 
 import (
+	gosql "database/sql"
 	"fmt"
+	"net/url"
 	"strings"
+	"sync"
 	"testing"
 	"time"

@@ -31,12 +34,14 @@
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlrun"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
)

// SplitTable splits a range in the table, creates a replica for the right
@@ -78,6 +83,128 @@ func SplitTable(
 	}
 }
 
+// TestPlanningDuringSplits verifies that table reader planning (resolving
+// spans) tolerates concurrent splits.
+func TestPlanningDuringSplits(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	const n = 100
+	const numNodes = 1
+	tc := serverutils.StartTestCluster(t, numNodes, base.TestClusterArgs{
+		ServerArgs: base.TestServerArgs{UseDatabase: "test"},
+	})
+
+	defer tc.Stopper().Stop(context.TODO())
+
+	sqlutils.CreateTable(
+		t, tc.ServerConn(0), "t", "x INT PRIMARY KEY, xsquared INT",
+		n,
+		sqlutils.ToRowFn(sqlutils.RowIdxFn, func(row int) parser.Datum {
+			return parser.NewDInt(parser.DInt(row * row))
+		}),
+	)
+
+	// Start a worker that continuously performs splits in the background.
+	tc.Stopper().RunWorker(context.TODO(), func(ctx context.Context) {
+		rng, _ := randutil.NewPseudoRand()
+		cdb := tc.Server(0).KVClient().(*client.DB)
+		for {
+			select {
+			case <-tc.Stopper().ShouldStop():
+				return
+			default:
+				// Split the table at a random row.
+				desc := sqlbase.GetTableDescriptor(cdb, "test", "t")
+
+				val := rng.Intn(n)
+				t.Logf("splitting at %d", val)
+				pik, err := sqlbase.MakePrimaryIndexKey(desc, val)
+				if err != nil {
+					panic(err)
+				}
+
+				splitKey := keys.MakeRowSentinelKey(pik)
+				if _, _, err := tc.Server(0).SplitRange(splitKey); err != nil {
+					panic(err)
+				}
+			}
+		}
+	})
+
+	sumX, sumXSquared := 0, 0
+	for x := 1; x <= n; x++ {
+		sumX += x
+		sumXSquared += x * x
+	}
+
+	// Run queries continuously in parallel workers. We need more than one worker
+	// because some queries result in cache updates, and we want to verify
+	// race conditions when planning during cache updates (see #15249).
+	const numQueriers = 4
+
+	var wg sync.WaitGroup
+	wg.Add(numQueriers)
+
+	for i := 0; i < numQueriers; i++ {
+		go func(idx int) {
+			defer wg.Done()
+
+			// Create a gosql.DB for this worker.
+			pgURL, cleanupGoDB := sqlutils.PGUrl(
+				t, tc.Server(0).ServingAddr(), fmt.Sprintf("%d", idx), url.User(security.RootUser),
+			)
+			defer cleanupGoDB()
+
+			pgURL.Path = "test"
+			goDB, err := gosql.Open("postgres", pgURL.String())
+			if err != nil {
+				t.Error(err)
+				return
+			}
+
+			defer func() {
+				if err := goDB.Close(); err != nil {
+					t.Error(err)
+				}
+			}()
+
+			// Limit to 1 connection because we set a session variable.
+			goDB.SetMaxOpenConns(1)
+			if _, err := goDB.Exec("SET DISTSQL = ALWAYS"); err != nil {
+				t.Error(err)
+				return
+			}
+
+			for run := 0; run < 20; run++ {
+				t.Logf("querier %d run %d", idx, run)
+				rows, err := goDB.Query("SELECT SUM(x), SUM(xsquared) FROM t")
+				if err != nil {
+					t.Error(err)
+					return
+				}
+				if !rows.Next() {
+					t.Errorf("no rows")
+					return
+				}
+				var sum, sumSq int
+				if err := rows.Scan(&sum, &sumSq); err != nil {
+					t.Error(err)
+					return
+				}
+				if sum != sumX || sumXSquared != sumSq {
+					t.Errorf("invalid results: expected %d, %d got %d, %d", sumX, sumXSquared, sum, sumSq)
+					return
+				}
+				if rows.Next() {
+					t.Errorf("more than one row")
+					return
+				}
+			}
+		}(i)
+	}
+	wg.Wait()
+}
+
 func TestDistBackfill(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 
@@ -130,9 +257,6 @@ func TestDistBackfill(t *testing.T) {

 	r := sqlutils.MakeSQLRunner(t, tc.ServerConn(0))
 	r.DB.SetMaxOpenConns(1)
-	r.Exec("SET DISTSQL = ALWAYS")
-
-	r = r.Subtest(t)
 	r.Exec("SET DISTSQL = OFF")
 	if _, err := tc.ServerConn(0).Exec("CREATE INDEX foo ON NumToStr (str)"); err != nil {
 		t.Fatal(err)
4 changes: 3 additions & 1 deletion pkg/sql/distsqlplan/span_resolver.go
@@ -99,7 +99,9 @@
 	// that was last Seek()ed.
 	NeedAnother() bool
 
-	// Next advances the iterator to the next range.
+	// Next advances the iterator to the next range. The next range contains the
+	// last range's end key (but it does not necessarily start there, because of
+	// asynchronous range splits and caching effects).
 	// Possible errors encountered should be checked for with Valid().
 	Next(ctx context.Context)
 
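The loop shape this contract implies, mirroring the partitionSpans usage earlier in this commit (error handling abbreviated; treat this as a sketch, not canonical API usage):

// After Next(), the new descriptor contains the previous range's end key
// but may start before it, so consumers must clamp rather than assume
// consecutive descriptors.
for it.Seek(ctx, span, kv.Ascending); ; it.Next(ctx) {
	if !it.Valid() {
		return it.Error()
	}
	desc := it.Desc()
	_ = desc // ... consume desc, clamping to the span being resolved ...
	if !it.NeedAnother() {
		break
	}
}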
19 changes: 11 additions & 8 deletions pkg/sql/distsqlrun/server.go
@@ -154,23 +154,26 @@ func (ds *ServerImpl) setupFlow(
 		return nil, nil, errors.Errorf("setupFlow called before the NodeID was resolved")
 	}
 
+	evalCtx := ds.evalCtx
+
 	monitor := mon.MakeMonitor("flow",
 		ds.Counter, ds.Hist, -1 /* use default block size */, noteworthyMemoryUsageBytes)
 	monitor.Start(ctx, &ds.memMonitor, mon.BoundAccount{})
-	ds.evalCtx.Mon = &monitor
+	evalCtx.Mon = &monitor
 
+	// TODO(andrei): more fields from evalCtx need to be initialized (#13821).
+
 	// TODO(radu): we should sanity check some of these fields (especially
 	// txnProto).
 	flowCtx := FlowCtx{
 		AmbientContext: ds.AmbientContext,
 		id:             req.Flow.FlowID,
-		// TODO(andrei): more fields from evalCtx need to be initialized (#13821).
-		evalCtx:      ds.evalCtx,
-		rpcCtx:       ds.RPCContext,
-		txnProto:     &req.Txn,
-		clientDB:     ds.DB,
-		testingKnobs: ds.TestingKnobs,
-		nodeID:       nodeID,
+		evalCtx:        evalCtx,
+		rpcCtx:         ds.RPCContext,
+		txnProto:       &req.Txn,
+		clientDB:       ds.DB,
+		testingKnobs:   ds.TestingKnobs,
+		nodeID:         nodeID,
 	}
 
 	ctx = flowCtx.AnnotateCtx(ctx)
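Our reading of the evalCtx change (an inference, consistent with the race conditions around #15249 mentioned in the new test): ds.evalCtx is shared by concurrent setupFlow calls, so assigning the per-flow memory monitor directly onto it was a data race; copying the struct first confines the mutation to this flow.

// Before: every flow mutated the shared context (racy).
//   ds.evalCtx.Mon = &monitor
// After: each flow mutates its own copy (assuming evalCtx is a value, not a pointer).
evalCtx := ds.evalCtx
evalCtx.Mon = &monitor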