Skip to content

Commit

Permalink
sql: generate descriptor IDs non-transactionally
Browse files Browse the repository at this point in the history
Before this patch, ids for new table of database descriptors were
created in the same transaction as the SQL statement performing the
creation. This meant that if they encountered a retryable error, the
statement failed. This is a problem for CREATEs performed by an ORM,
which like to perform such statements in explicit transactions (so we
can't automatically retry the statement).
This patch makes the id creation non-transactional, and retryable on
errors.

Fixes #13180.
Touches #16450
  • Loading branch information
andreimatei committed Jun 29, 2017
1 parent 4f11fae commit 74f5728
Show file tree
Hide file tree
Showing 10 changed files with 105 additions and 80 deletions.
43 changes: 19 additions & 24 deletions pkg/ccl/sqlccl/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,32 +194,27 @@ func reassignTableIDs(
var newTableIDs map[sqlbase.ID]sqlbase.ID
var rekeys []roachpb.ImportRequest_TableRekey

if err := db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
newTableIDs = make(map[sqlbase.ID]sqlbase.ID, len(tables))
for _, table := range tables {
newTableID, err := sql.GenerateUniqueDescID(ctx, txn)
if err != nil {
return err
}
newTableIDs[table.ID] = newTableID
oldID := table.ID
table.ID = newTableID
newTableIDs = make(map[sqlbase.ID]sqlbase.ID, len(tables))
for _, table := range tables {
newTableID, err := sql.GenerateUniqueDescID(ctx, &db)
if err != nil {
return nil, nil, nil, err
}
newTableIDs[table.ID] = newTableID
oldID := table.ID
table.ID = newTableID

desc := sqlbase.Descriptor{
Union: &sqlbase.Descriptor_Table{Table: table},
}
newDescBytes, err := desc.Marshal()
if err != nil {
return errors.Wrap(err, "marshalling descriptor")
}
rekeys = append(rekeys, roachpb.ImportRequest_TableRekey{
OldID: uint32(oldID),
NewDesc: newDescBytes,
})
desc := sqlbase.Descriptor{
Union: &sqlbase.Descriptor_Table{Table: table},
}
return nil
}); err != nil {
return nil, nil, nil, err
newDescBytes, err := desc.Marshal()
if err != nil {
return nil, nil, nil, errors.Wrap(err, "marshalling descriptor")
}
rekeys = append(rekeys, roachpb.ImportRequest_TableRekey{
OldID: uint32(oldID),
NewDesc: newDescBytes,
})
}

if err := reassignReferencedTables(tables, newTableIDs, opt); err != nil {
Expand Down
21 changes: 21 additions & 0 deletions pkg/internal/client/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ import (
"github.com/pkg/errors"
"golang.org/x/net/context"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
)

Expand Down Expand Up @@ -594,3 +596,22 @@ func getOneRow(runErr error, b *Batch) (KeyValue, error) {
}
return res.Rows[0], nil
}

// IncrementValRetryable increments a key's value by a specified amount and
// returns the new value.
//
// It performs the increment as a retryable non-transactional increment. The key
// might be incremented multiple times because of the retries.
func IncrementValRetryable(ctx context.Context, db *DB, key roachpb.Key, inc int64) (int64, error) {
var err error
var res KeyValue
for r := retry.Start(base.DefaultRetryOptions()); r.Next(); {
res, err = db.Inc(ctx, key, inc)
switch err.(type) {
case *roachpb.UnhandledRetryableError, *roachpb.AmbiguousResultError:
continue
}
break
}
return res.ValueInt(), err
}
22 changes: 2 additions & 20 deletions pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ type Node struct {
// allocateNodeID increments the node id generator key to allocate
// a new, unique node id.
func allocateNodeID(ctx context.Context, db *client.DB) (roachpb.NodeID, error) {
val, err := incVal(ctx, db, keys.NodeIDGenerator, 1)
val, err := client.IncrementValRetryable(ctx, db, keys.NodeIDGenerator, 1)
if err != nil {
return 0, errors.Wrap(err, "unable to allocate node ID")
}
Expand All @@ -159,31 +159,13 @@ func allocateNodeID(ctx context.Context, db *client.DB) (roachpb.NodeID, error)
func allocateStoreIDs(
ctx context.Context, nodeID roachpb.NodeID, inc int64, db *client.DB,
) (roachpb.StoreID, error) {
val, err := incVal(ctx, db, keys.StoreIDGenerator, inc)
val, err := client.IncrementValRetryable(ctx, db, keys.StoreIDGenerator, inc)
if err != nil {
return 0, errors.Wrapf(err, "unable to allocate %d store IDs for node %d", inc, nodeID)
}
return roachpb.StoreID(val - inc + 1), nil
}

// incVal increments a key's value by a specified amount and returns the new
// value.
// It performs the increment as a retryable non-transactional increment. The key
// might be incremented multiple times because of the retries.
func incVal(ctx context.Context, db *client.DB, key roachpb.Key, inc int64) (int64, error) {
var err error
var res client.KeyValue
for r := retry.Start(base.DefaultRetryOptions()); r.Next(); {
res, err = db.Inc(ctx, key, inc)
switch err.(type) {
case *roachpb.UnhandledRetryableError, *roachpb.AmbiguousResultError:
continue
}
break
}
return res.ValueInt(), err
}

// GetBootstrapSchema returns the schema which will be used to bootstrap a new
// server.
func GetBootstrapSchema() sqlbase.MetadataSchema {
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ func (n *createViewNode) Start(ctx context.Context) error {
return err
}

id, err := GenerateUniqueDescID(ctx, n.p.txn)
id, err := GenerateUniqueDescID(ctx, n.p.session.execCfg.DB)
if err != nil {
return nil
}
Expand Down Expand Up @@ -593,7 +593,7 @@ func (n *createTableNode) Start(ctx context.Context) error {
return err
}

id, err := GenerateUniqueDescID(ctx, n.p.txn)
id, err := GenerateUniqueDescID(ctx, n.p.session.execCfg.DB)
if err != nil {
return err
}
Expand Down
56 changes: 39 additions & 17 deletions pkg/sql/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ func TestDatabaseDescriptor(t *testing.T) {
t.Fatalf("unexpected error %v", err)
}

// Even though the CREATE above failed, the counter is still incremented
// (that's performed non-transactionally).
expectedCounter++

if ir, err := kvDB.Get(ctx, keys.DescIDGenerator); err != nil {
t.Fatal(err)
} else if actual := ir.ValueInt(); actual != expectedCounter {
Expand All @@ -107,6 +111,7 @@ func TestDatabaseDescriptor(t *testing.T) {
t.Fatal(err)
}

dbDescKey = sqlbase.MakeDescMetadataKey(sqlbase.ID(expectedCounter))
if _, err := sqlDB.Exec(`CREATE DATABASE test`); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -194,7 +199,7 @@ func createTestTable(
if testutils.IsSQLRetryableError(err) {
continue
}
t.Errorf("table %d: could not be created: %v", id, err)
t.Errorf("table %d: could not be created: %s", id, err)
return
}
completed <- id
Expand All @@ -210,24 +215,33 @@ func verifyTables(
tc *testcluster.TestCluster,
completed chan int,
expectedNumOfTables int,
descIDStart int64,
descIDStart sqlbase.ID,
) {
descIDEnd := descIDStart + int64(expectedNumOfTables)
usedTableIDs := make(map[sqlbase.ID]string)
var count int
tableIDs := make(map[sqlbase.ID]struct{})
maxID := descIDStart
for id := range completed {
count++
tableName := fmt.Sprintf("table_%d", id)
kvDB := tc.Servers[count%tc.NumServers()].KVClient().(*client.DB)
tableDesc := sqlbase.GetTableDescriptor(kvDB, "test", tableName)
if int64(tableDesc.ID) < descIDStart || int64(tableDesc.ID) >= descIDEnd {
if tableDesc.ID < descIDStart {
t.Fatalf(
"table %s's ID %d is not within the expected range of %d to %d",
"table %s's ID %d is too small. Expected >= %d",
tableName,
tableDesc.ID,
descIDStart,
descIDEnd,
)

if _, ok := tableIDs[tableDesc.ID]; ok {
t.Fatalf("duplicate ID: %d", id)
}
tableIDs[tableDesc.ID] = struct{}{}
if tableDesc.ID > maxID {
maxID = tableDesc.ID
}

}
usedTableIDs[tableDesc.ID] = tableName
}
Expand All @@ -236,12 +250,20 @@ func verifyTables(
t.Fatalf("expected %d tables created, only got %d", e, a)
}

kvDB := tc.Servers[count%tc.NumServers()].KVClient().(*client.DB)
if descID, err := kvDB.Get(context.Background(), keys.DescIDGenerator); err != nil {
t.Fatal(err)
} else {
if e, a := descIDEnd, descID.ValueInt(); e != a {
t.Fatalf("expected next descriptor ID to be %d, got %d", e, a)
// Check that no extra descriptors have been written in the range
// descIDStart..maxID.
kvDB := tc.Servers[0].KVClient().(*client.DB)
for id := descIDStart; id < maxID; id++ {
if _, ok := tableIDs[id]; ok {
continue
}
descKey := sqlbase.MakeDescMetadataKey(id)
desc := &sqlbase.Descriptor{}
if err := kvDB.GetProto(context.TODO(), descKey, desc); err != nil {
t.Fatal(err)
}
if (*desc != sqlbase.Descriptor{}) {
t.Fatalf("extra descriptor with id %d", id)
}
}
}
Expand All @@ -264,11 +286,11 @@ func TestParallelCreateTables(t *testing.T) {
}
// Get the id descriptor generator count.
kvDB := tc.Servers[0].KVClient().(*client.DB)
var descIDStart int64
var descIDStart sqlbase.ID
if descID, err := kvDB.Get(context.Background(), keys.DescIDGenerator); err != nil {
t.Fatal(err)
} else {
descIDStart = descID.ValueInt()
descIDStart = sqlbase.ID(descID.ValueInt())
}

var wgStart sync.WaitGroup
Expand Down Expand Up @@ -318,11 +340,11 @@ func TestParallelCreateConflictingTables(t *testing.T) {

// Get the id descriptor generator count.
kvDB := tc.Servers[0].KVClient().(*client.DB)
var descIDStart int64
var descIDStart sqlbase.ID
if descID, err := kvDB.Get(context.Background(), keys.DescIDGenerator); err != nil {
t.Fatal(err)
} else {
descIDStart = descID.ValueInt()
descIDStart = sqlbase.ID(descID.ValueInt())
}

var wgStart sync.WaitGroup
Expand All @@ -348,7 +370,7 @@ func TestParallelCreateConflictingTables(t *testing.T) {
t,
tc,
completed,
1,
1, /* expectedNumOfTables */
descIDStart,
)
}
11 changes: 6 additions & 5 deletions pkg/sql/descriptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,15 @@ func (d descriptorAlreadyExistsErr) Error() string {
}

// GenerateUniqueDescID returns the next available Descriptor ID and increments
// the counter.
func GenerateUniqueDescID(ctx context.Context, txn *client.Txn) (sqlbase.ID, error) {
// the counter. The incrementing is non-transactional, and the counter could be
// incremented multiple times because of retries.
func GenerateUniqueDescID(ctx context.Context, db *client.DB) (sqlbase.ID, error) {
// Increment unique descriptor counter.
ir, err := txn.Inc(ctx, keys.DescIDGenerator, 1)
newVal, err := client.IncrementValRetryable(ctx, db, keys.DescIDGenerator, 1)
if err != nil {
return 0, err
}
return sqlbase.ID(ir.ValueInt() - 1), nil
return sqlbase.ID(newVal - 1), nil
}

// createDescriptor implements the DescriptorAccessor interface.
Expand Down Expand Up @@ -102,7 +103,7 @@ func (p *planner) createDescriptor(
return false, err
}

id, err := GenerateUniqueDescID(ctx, p.txn)
id, err := GenerateUniqueDescID(ctx, p.session.execCfg.DB)
if err != nil {
return false, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/logictest/testdata/logic_test/interleaved
Original file line number Diff line number Diff line change
Expand Up @@ -290,4 +290,4 @@ EXPLAIN SELECT * FROM orders WHERE customer = 1 AND id = 1000
----
0 scan
0 table orders@primary
0 spans /1/#/57/1/1000-/1/#/57/1/1001
0 spans /1/#/64/1/1000-/1/#/64/1/1001
18 changes: 9 additions & 9 deletions pkg/sql/logictest/testdata/logic_test/show_trace
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ SELECT span, operation, message FROM [SHOW SESSION KV TRACE] WHERE message NOT L
(1,0) sql txn implicit querying next range at /Table/2/1/0/"t"/3/1
(1,0) sql txn implicit r1: sending batch 1 Get to (n1,s1):1
(1,0) sql txn implicit querying next range at /System/"desc-idgen"
(1,0) sql txn implicit r1: sending batch 1 Inc, 1 BeginTxn to (n1,s1):1
(1,0) sql txn implicit r1: sending batch 1 Inc to (n1,s1):1
(1,0) sql txn implicit CPut /Table/2/1/0/"t"/3/1 -> 51
(1,0) sql txn implicit CPut /Table/3/1/51/2/1 -> database:<name:"t" id:51 privileges:<users:<user:"root" privileges:2 > > >
(1,0) sql txn implicit querying next range at /Table/2/1/0/"t"/3/1
(1,0) sql txn implicit r1: sending batch 2 CPut to (n1,s1):1
(1,0) sql txn implicit querying next range at /Table/SystemConfigSpan/Start
(1,0) sql txn implicit r1: sending batch 2 CPut, 1 BeginTxn to (n1,s1):1
(1,0) sql txn implicit querying next range at /Table/2/1/0/"system"/3/1
(1,0) sql txn implicit r1: sending batch 1 Get to (n1,s1):1
(1,0) sql txn implicit querying next range at /Table/3/1/1/2/1
Expand All @@ -74,11 +74,11 @@ SELECT span, operation, message FROM [SHOW KV TRACE FOR CREATE TABLE t.kv(k INT
(0,1) starting plan querying next range at /Table/2/1/51/"kv"/3/1
(0,1) starting plan r1: sending batch 1 Get to (n1,s1):1
(0,1) starting plan querying next range at /System/"desc-idgen"
(0,1) starting plan r1: sending batch 1 Inc, 1 BeginTxn to (n1,s1):1
(0,1) starting plan r1: sending batch 1 Inc to (n1,s1):1
(0,1) starting plan CPut /Table/2/1/51/"kv"/3/1 -> 52
(0,1) starting plan CPut /Table/3/1/52/2/1 -> table:<name:"kv" id:52 parent_id:51 version:1 up_version:false modification_time:<wall_time:0 logical:0 > columns:<name:"k" id:1 type:<semantic_type:INT width:0 precision:0 visible_type:NONE > nullable:false hidden:false > columns:<name:"v" id:2 type:<semantic_type:INT width:0 precision:0 visible_type:NONE > nullable:true hidden:false > next_column_id:3 families:<name:"primary" id:0 column_names:"k" column_names:"v" column_ids:1 column_ids:2 default_column_id:2 > next_family_id:1 primary_index:<name:"primary" id:1 unique:true column_names:"k" column_directions:ASC column_ids:1 foreign_key:<table:0 index:0 name:"" validity:Validated shared_prefix_len:0 > interleave:<> > next_index_id:2 privileges:<users:<user:"root" privileges:2 > > next_mutation_id:1 format_version:InterleavedFormatVersion state:PUBLIC view_query:"" >
(0,1) starting plan querying next range at /Table/2/1/51/"kv"/3/1
(0,1) starting plan r1: sending batch 2 CPut to (n1,s1):1
(0,1) starting plan querying next range at /Table/SystemConfigSpan/Start
(0,1) starting plan r1: sending batch 2 CPut, 1 BeginTxn to (n1,s1):1
(0,1) starting plan querying next range at /Table/3/1/51/2/1
(0,1) starting plan r1: sending batch 1 Get to (n1,s1):1
(0,1) starting plan querying next range at /Table/2/1/0/"system"/3/1
Expand Down Expand Up @@ -196,11 +196,11 @@ SELECT span, operation, regexp_replace(message, '\d\d\d\d\d+', '...PK...') as me
(0,1) starting plan querying next range at /Table/2/1/51/"kv2"/3/1
(0,1) starting plan r1: sending batch 1 Get to (n1,s1):1
(0,1) starting plan querying next range at /System/"desc-idgen"
(0,1) starting plan r1: sending batch 1 Inc, 1 BeginTxn to (n1,s1):1
(0,1) starting plan r1: sending batch 1 Inc to (n1,s1):1
(0,1) starting plan CPut /Table/2/1/51/"kv2"/3/1 -> 53
(0,1) starting plan CPut /Table/3/1/53/2/1 -> table:<name:"kv2" id:53 parent_id:51 version:1 up_version:false modification_time:<wall_time:0 logical:0 > columns:<name:"k" id:1 type:<semantic_type:INT width:0 precision:0 visible_type:NONE > nullable:true hidden:false > columns:<name:"v" id:2 type:<semantic_type:INT width:0 precision:0 visible_type:NONE > nullable:true hidden:false > columns:<name:"rowid" id:3 type:<semantic_type:INT width:0 precision:0 visible_type:NONE > nullable:false default_expr:"unique_rowid()" hidden:true > next_column_id:4 families:<name:"primary" id:0 column_names:"k" column_names:"v" column_names:"rowid" column_ids:1 column_ids:2 column_ids:3 default_column_id:0 > next_family_id:1 primary_index:<name:"primary" id:1 unique:true column_names:"rowid" column_directions:ASC column_ids:3 foreign_key:<table:0 index:0 name:"" validity:Validated shared_prefix_len:0 > interleave:<> > next_index_id:2 privileges:<users:<user:"root" privileges:2 > > next_mutation_id:1 format_version:InterleavedFormatVersion state:PUBLIC view_query:"" >
(0,1) starting plan querying next range at /Table/2/1/51/"kv2"/3/1
(0,1) starting plan r1: sending batch 2 CPut to (n1,s1):1
(0,1) starting plan querying next range at /Table/SystemConfigSpan/Start
(0,1) starting plan r1: sending batch 2 CPut, 1 BeginTxn to (n1,s1):1
(0,1) starting plan querying next range at /Table/3/1/51/2/1
(0,1) starting plan r1: sending batch 1 Get to (n1,s1):1
(0,1) starting plan querying next range at /Table/2/1/0/"system"/3/1
Expand Down
5 changes: 3 additions & 2 deletions pkg/sql/sqlbase/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/util/duration"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

Expand Down Expand Up @@ -59,8 +60,8 @@ func GetTableDescriptor(kvDB *client.DB, database string, table string) *TableDe

descKey := MakeDescMetadataKey(ID(gr.ValueInt()))
desc := &Descriptor{}
if err := kvDB.GetProto(context.TODO(), descKey, desc); err != nil {
panic("proto missing")
if err := kvDB.GetProto(context.TODO(), descKey, desc); err != nil || (*desc == Descriptor{}) {
log.Fatalf(context.TODO(), "proto with id %d missing. err: %v", gr.ValueInt(), err)
}
return desc.GetTable()
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/storage/id_alloc.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ import (

// An idAllocator is used to increment a key in allocation blocks
// of arbitrary size starting at a minimum ID.
//
// Note: if all you want is to increment a key and retry on retryable errors,
// see client.IncrementValRetryable().
type idAllocator struct {
log.AmbientContext

Expand Down

0 comments on commit 74f5728

Please sign in to comment.