sql: implement ALTER SCHEMA RENAME TO
Fixes cockroachdb#50880.

This commit implements the `ALTER SCHEMA RENAME TO` command. It reuses
the existing schema change infrastructure to handle the more general
case of an arbitrary descriptor that only needs its names and existing
leases drained.

Release note (sql change): Implement the `ALTER SCHEMA RENAME TO`
command.
rohany committed Aug 10, 2020
1 parent c285413 commit bcc2776
Showing 17 changed files with 771 additions and 460 deletions.
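
Before the per-file diffs, here is a minimal client-side sketch of how the new command is meant to be used. This is not part of the commit: the connection URL, driver, and schema/table names are illustrative, and on builds where user-defined schemas are still gated behind a session setting, that setting must be enabled first.

package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // assumed Postgres-wire driver; any compatible driver works
)

func main() {
	// Hypothetical local cluster; adjust the URL for your environment.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	stmts := []string{
		`CREATE SCHEMA old_schema`,
		`CREATE TABLE old_schema.t (x INT PRIMARY KEY)`,
		// The statement implemented by this commit: rename the user-defined schema.
		`ALTER SCHEMA old_schema RENAME TO new_schema`,
	}
	for _, stmt := range stmts {
		if _, err := db.Exec(stmt); err != nil {
			log.Fatalf("%s: %v", stmt, err)
		}
	}

	// Objects created under the old name are now addressed via the new schema name.
	var count int
	if err := db.QueryRow(`SELECT count(*) FROM new_schema.t`).Scan(&count); err != nil {
		log.Fatal(err)
	}
	fmt.Println("rows in new_schema.t:", count)
}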
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_schema_change_creation.go
@@ -180,7 +180,7 @@ func createSchemaChangeJobsFromMutations(
Username: username,
DescriptorIDs: descpb.IDs{tableDesc.GetID()},
Details: jobspb.SchemaChangeDetails{
TableID: tableDesc.ID,
DescID: tableDesc.ID,
MutationID: mutationID,
ResumeSpanList: spanList,
FormatVersion: jobspb.JobResumerFormatVersion,
582 changes: 295 additions & 287 deletions pkg/jobs/jobspb/jobs.pb.go

Large diffs are not rendered by default.

10 changes: 9 additions & 1 deletion pkg/jobs/jobspb/jobs.proto
@@ -295,7 +295,15 @@ message SchemaChangeDetails {
(gogoproto.customname) = "DroppedDatabaseID",
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.ID"
];
uint32 table_id = 5 [(gogoproto.customname) = "TableID", (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.ID"];
// desc_id is the target descriptor for this schema change. Note that this ID
// is not always a table ID! We allow referencing any descriptor here to allow
// generic schema changes on descriptors whose schema change process involves
// only draining names and existing leases. This allows us to implement the
// simple schema changes on SchemaDescriptors and DatabaseDescriptors without
// implementing a new job for each.
uint32 desc_id = 5 [(gogoproto.customname) = "DescID", (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.ID"];
// mutation_id is the mutation ID that the schema changer is to process. It is
// only set when desc_id references a TableDescriptor.
uint32 mutation_id = 6 [(gogoproto.customname) = "MutationID", (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.MutationID"];
// The format version of the schema change job details. This is used to
// distinguish between jobs as they existed in 19.2 and earlier versions
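
To make the generalized message concrete, here is a small sketch (not part of this commit) of how a caller might populate SchemaChangeDetails for the two cases the new comment describes. The helper functions are invented for illustration; only the message, field, and constant names (SchemaChangeDetails, DescID, MutationID, FormatVersion, JobResumerFormatVersion) and the descpb types come from the diff above.

package example

import (
	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
)

// makeGenericSchemaChangeDetails sketches the new case: DescID points at a
// non-table descriptor (e.g. a schema being renamed), and MutationID stays
// zero because there is no table mutation to process.
func makeGenericSchemaChangeDetails(schemaID descpb.ID) jobspb.SchemaChangeDetails {
	return jobspb.SchemaChangeDetails{
		DescID:        schemaID,
		FormatVersion: jobspb.JobResumerFormatVersion,
	}
}

// makeTableSchemaChangeDetails sketches the pre-existing table case: DescID is
// a table ID and MutationID selects the mutation the schema changer processes.
func makeTableSchemaChangeDetails(
	tableID descpb.ID, mutationID descpb.MutationID,
) jobspb.SchemaChangeDetails {
	return jobspb.SchemaChangeDetails{
		DescID:        tableID,
		MutationID:    mutationID,
		FormatVersion: jobspb.JobResumerFormatVersion,
	}
}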
118 changes: 114 additions & 4 deletions pkg/sql/alter_schema.go
@@ -13,24 +13,134 @@ package sql
import (
"context"

"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)

type alterSchemaNode struct {
n *tree.AlterSchema
n *tree.AlterSchema
db *sqlbase.MutableDatabaseDescriptor
desc *sqlbase.MutableSchemaDescriptor
}

// Use to satisfy the linter.
var _ planNode = &alterSchemaNode{n: nil}

func (p *planner) AlterSchema(ctx context.Context, n *tree.AlterSchema) (planNode, error) {
return nil, unimplemented.NewWithIssue(50880, "ALTER SCHEMA")
// TODO (rohany, lucy): There should be an API to get a MutableSchemaDescriptor
// by name from the descs.Collection.
db, err := p.ResolveUncachedDatabaseByName(ctx, p.CurrentDatabase(), true /* required */)
if err != nil {
return nil, err
}
mutDB := sqlbase.NewMutableExistingDatabaseDescriptor(*db.DatabaseDesc())
found, schema, err := p.LogicalSchemaAccessor().GetSchema(ctx, p.txn, p.ExecCfg().Codec, db.ID, n.Schema)
if err != nil {
return nil, err
}
if !found {
return nil, pgerror.Newf(pgcode.InvalidSchemaName, "schema %q does not exist", n.Schema)
}
switch schema.Kind {
case sqlbase.SchemaPublic, sqlbase.SchemaVirtual, sqlbase.SchemaTemporary:
return nil, pgerror.Newf(pgcode.InvalidSchemaName, "cannot modify schema %q", n.Schema)
case sqlbase.SchemaUserDefined:
// TODO (rohany): Check permissions here.
desc, err := p.Descriptors().GetMutableSchemaDescriptorByID(ctx, schema.ID, p.txn)
if err != nil {
return nil, err
}
return &alterSchemaNode{n: n, db: mutDB, desc: desc}, nil
default:
return nil, errors.AssertionFailedf("unknown schema kind")
}
}

func (n *alterSchemaNode) startExec(params runParams) error {
return errors.AssertionFailedf("unimplemented")
switch t := n.n.Cmd.(type) {
case *tree.AlterSchemaRename:
return params.p.renameSchema(params.ctx, n.db, n.desc, t.NewName, tree.AsStringWithFQNames(n.n, params.Ann()))
default:
return errors.AssertionFailedf("unknown schema cmd %T", t)
}
}

func (p *planner) renameSchema(
ctx context.Context,
db *sqlbase.MutableDatabaseDescriptor,
desc *sqlbase.MutableSchemaDescriptor,
newName string,
jobDesc string,
) error {
// Check that there isn't a name collision with the new name.
found, err := p.schemaExists(ctx, db.ID, newName)
if err != nil {
return err
}
if found {
return pgerror.Newf(pgcode.DuplicateSchema, "schema %q already exists", newName)
}

// Ensure that the new name is a valid schema name.
if err := sqlbase.IsSchemaNameValid(newName); err != nil {
return err
}

// Set the new name for the descriptor.
oldName := desc.Name
desc.SetName(newName)

// Write a new namespace entry for the new name.
nameKey := sqlbase.NewSchemaKey(desc.ParentID, newName).Key(p.execCfg.Codec)
b := p.txn.NewBatch()
if p.ExtendedEvalContext().Tracing.KVTracingEnabled() {
log.VEventf(ctx, 2, "CPut %s -> %d", nameKey, desc.ID)
}
b.CPut(nameKey, desc.ID, nil)
if err := p.txn.Run(ctx, b); err != nil {
return err
}

// Update the schema mapping in the parent database.

// First, ensure that the new name isn't present, and that we have an entry
// for the old name.
_, oldPresent := db.Schemas[oldName]
_, newPresent := db.Schemas[newName]
if !oldPresent {
return errors.Newf(
"programming error: old name %q not present in database schema mapping",
oldName,
)
}
if newPresent {
return errors.Newf(
"programming error: new name %q already present in database schema mapping",
newName,
)
}

// Mark the old schema name as dropped.
db.Schemas[oldName] = descpb.DatabaseDescriptor_SchemaInfo{
ID: desc.ID,
Dropped: true,
}
// Create an entry for the new schema name.
db.Schemas[newName] = descpb.DatabaseDescriptor_SchemaInfo{
ID: desc.ID,
Dropped: false,
}
if err := p.writeDatabaseChange(ctx, db); err != nil {
return err
}

// Write the change to the schema itself.
return p.writeSchemaDescChange(ctx, desc, jobDesc)
}

func (n *alterSchemaNode) Next(params runParams) (bool, error) { return false, nil }
40 changes: 20 additions & 20 deletions pkg/sql/backfill.go
@@ -312,21 +312,21 @@ func (sc *SchemaChanger) dropConstraints(
for i := range constraints {
c := &constraints[i]
if c.ConstraintType == descpb.ConstraintToUpdate_FOREIGN_KEY &&
c.ForeignKey.ReferencedTableID != sc.tableID {
c.ForeignKey.ReferencedTableID != sc.descID {
fksByBackrefTable[c.ForeignKey.ReferencedTableID] = append(fksByBackrefTable[c.ForeignKey.ReferencedTableID], c)
}
}
tableIDsToUpdate := make([]descpb.ID, 0, len(fksByBackrefTable)+1)
tableIDsToUpdate = append(tableIDsToUpdate, sc.tableID)
tableIDsToUpdate = append(tableIDsToUpdate, sc.descID)
for id := range fksByBackrefTable {
tableIDsToUpdate = append(tableIDsToUpdate, id)
}

// Create update closure for the table and all other tables with backreferences.
update := func(_ *kv.Txn, descs map[descpb.ID]catalog.MutableDescriptor) error {
scDesc, ok := descs[sc.tableID]
scDesc, ok := descs[sc.descID]
if !ok {
return errors.AssertionFailedf("required table with ID %d not provided to update closure", sc.tableID)
return errors.AssertionFailedf("required table with ID %d not provided to update closure", sc.descID)
}
scTable := scDesc.(*MutableTableDescriptor)
for i := range constraints {
@@ -356,7 +356,7 @@ func (sc *SchemaChanger) dropConstraints(
if def.Name == constraint.Name {
backrefDesc, ok := descs[constraint.ForeignKey.ReferencedTableID]
if !ok {
return errors.AssertionFailedf("required table with ID %d not provided to update closure", sc.tableID)
return errors.AssertionFailedf("required table with ID %d not provided to update closure", sc.descID)
}
backrefTable := backrefDesc.(*MutableTableDescriptor)
if err := removeFKBackReferenceFromTable(backrefTable, def.Name, scTable.TableDesc()); err != nil {
@@ -384,7 +384,7 @@ func (sc *SchemaChanger) dropConstraints(
if err != nil {
return nil, err
}
if err := WaitToUpdateLeases(ctx, sc.leaseMgr, sc.tableID); err != nil {
if err := WaitToUpdateLeases(ctx, sc.leaseMgr, sc.descID); err != nil {
return nil, err
}
for id := range fksByBackrefTable {
@@ -414,21 +414,21 @@ func (sc *SchemaChanger) addConstraints(
for i := range constraints {
c := &constraints[i]
if c.ConstraintType == descpb.ConstraintToUpdate_FOREIGN_KEY &&
c.ForeignKey.ReferencedTableID != sc.tableID {
c.ForeignKey.ReferencedTableID != sc.descID {
fksByBackrefTable[c.ForeignKey.ReferencedTableID] = append(fksByBackrefTable[c.ForeignKey.ReferencedTableID], c)
}
}
tableIDsToUpdate := make([]descpb.ID, 0, len(fksByBackrefTable)+1)
tableIDsToUpdate = append(tableIDsToUpdate, sc.tableID)
tableIDsToUpdate = append(tableIDsToUpdate, sc.descID)
for id := range fksByBackrefTable {
tableIDsToUpdate = append(tableIDsToUpdate, id)
}

// Create update closure for the table and all other tables with backreferences
update := func(_ *kv.Txn, descs map[descpb.ID]catalog.MutableDescriptor) error {
scDesc, ok := descs[sc.tableID]
scDesc, ok := descs[sc.descID]
if !ok {
return errors.AssertionFailedf("required table with ID %d not provided to update closure", sc.tableID)
return errors.AssertionFailedf("required table with ID %d not provided to update closure", sc.descID)
}
scTable := scDesc.(*MutableTableDescriptor)
for i := range constraints {
@@ -478,7 +478,7 @@ func (sc *SchemaChanger) addConstraints(
scTable.OutboundFKs = append(scTable.OutboundFKs, constraint.ForeignKey)
backrefDesc, ok := descs[constraint.ForeignKey.ReferencedTableID]
if !ok {
return errors.AssertionFailedf("required table with ID %d not provided to update closure", sc.tableID)
return errors.AssertionFailedf("required table with ID %d not provided to update closure", sc.descID)
}
backrefTable := backrefDesc.(*MutableTableDescriptor)
backrefTable.InboundFKs = append(backrefTable.InboundFKs, constraint.ForeignKey)
@@ -491,7 +491,7 @@ func (sc *SchemaChanger) addConstraints(
if _, err := sc.leaseMgr.PublishMultiple(ctx, tableIDsToUpdate, update, nil); err != nil {
return err
}
if err := WaitToUpdateLeases(ctx, sc.leaseMgr, sc.tableID); err != nil {
if err := WaitToUpdateLeases(ctx, sc.leaseMgr, sc.descID); err != nil {
return err
}
for id := range fksByBackrefTable {
@@ -531,7 +531,7 @@ func (sc *SchemaChanger) validateConstraints(
var tableDesc *sqlbase.ImmutableTableDescriptor

if err := sc.fixedTimestampTxn(ctx, readAsOf, func(ctx context.Context, txn *kv.Txn) error {
tableDesc, err = catalogkv.MustGetTableDescByID(ctx, txn, sc.execCfg.Codec, sc.tableID)
tableDesc, err = catalogkv.MustGetTableDescByID(ctx, txn, sc.execCfg.Codec, sc.descID)
return err
}); err != nil {
return err
@@ -609,7 +609,7 @@ func (sc *SchemaChanger) validateConstraints(
func (sc *SchemaChanger) getTableVersion(
ctx context.Context, txn *kv.Txn, tc *descs.Collection, version descpb.DescriptorVersion,
) (*sqlbase.ImmutableTableDescriptor, error) {
tableDesc, err := tc.GetTableVersionByID(ctx, txn, sc.tableID, tree.ObjectLookupFlags{})
tableDesc, err := tc.GetTableVersionByID(ctx, txn, sc.descID, tree.ObjectLookupFlags{})
if err != nil {
return nil, err
}
@@ -639,7 +639,7 @@ func (sc *SchemaChanger) truncateIndexes(
resumeAt := resume
if log.V(2) {
log.Infof(ctx, "drop index (%d, %d) at row: %d, span: %s",
sc.tableID, sc.mutationID, rowIdx, resume)
sc.descID, sc.mutationID, rowIdx, resume)
}

// Make a new txn just to drop this chunk.
@@ -696,7 +696,7 @@ func (sc *SchemaChanger) truncateIndexes(
// All the data chunks have been removed. Now also removed the
// zone configs for the dropped indexes, if any.
if err := sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return RemoveIndexZoneConfigs(ctx, txn, sc.execCfg, sc.tableID, dropped)
return RemoveIndexZoneConfigs(ctx, txn, sc.execCfg, sc.descID, dropped)
}); err != nil {
return err
}
@@ -816,7 +816,7 @@ func (sc *SchemaChanger) distBackfill(
if err := sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
var err error
todoSpans, _, mutationIdx, err = rowexec.GetResumeSpans(
ctx, sc.jobRegistry, txn, sc.execCfg.Codec, sc.tableID, sc.mutationID, filter)
ctx, sc.jobRegistry, txn, sc.execCfg.Codec, sc.descID, sc.mutationID, filter)
return err
}); err != nil {
return err
@@ -903,7 +903,7 @@ func (sc *SchemaChanger) distBackfill(
if err := sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
var err error
resumeSpans, _, _, err = rowexec.GetResumeSpans(
ctx, sc.jobRegistry, txn, sc.execCfg.Codec, sc.tableID, sc.mutationID, filter)
ctx, sc.jobRegistry, txn, sc.execCfg.Codec, sc.descID, sc.mutationID, filter)
return err
}); err != nil {
return err
@@ -935,7 +935,7 @@ func (sc *SchemaChanger) updateJobRunningStatus(
) (*sqlbase.MutableTableDescriptor, error) {
var tableDesc *sqlbase.MutableTableDescriptor
err := sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
desc, err := catalogkv.GetDescriptorByID(ctx, txn, sc.execCfg.Codec, sc.tableID, catalogkv.Mutable,
desc, err := catalogkv.GetDescriptorByID(ctx, txn, sc.execCfg.Codec, sc.descID, catalogkv.Mutable,
catalogkv.TableDescriptorKind, true /* required */)
if err != nil {
return err
@@ -1002,7 +1002,7 @@ func (sc *SchemaChanger) validateIndexes(ctx context.Context) error {
readAsOf := sc.clock.Now()
var tableDesc *sqlbase.ImmutableTableDescriptor
if err := sc.fixedTimestampTxn(ctx, readAsOf, func(ctx context.Context, txn *kv.Txn) (err error) {
tableDesc, err = catalogkv.MustGetTableDescByID(ctx, txn, sc.execCfg.Codec, sc.tableID)
tableDesc, err = catalogkv.MustGetTableDescByID(ctx, txn, sc.execCfg.Codec, sc.descID)
return err
}); err != nil {
return err
18 changes: 18 additions & 0 deletions pkg/sql/catalog/descs/collection.go
@@ -601,6 +601,17 @@ func (tc *Collection) getMutableDescriptorByID(
return desc.(catalog.MutableDescriptor), nil
}

// GetMutableSchemaDescriptorByID gets a MutableSchemaDescriptor by ID.
func (tc *Collection) GetMutableSchemaDescriptorByID(
ctx context.Context, scID descpb.ID, txn *kv.Txn,
) (*sqlbase.MutableSchemaDescriptor, error) {
desc, err := tc.getMutableDescriptorByID(ctx, scID, txn)
if err != nil {
return nil, err
}
return desc.(*sqlbase.MutableSchemaDescriptor), nil
}

// hydrateTypesInTableDesc installs user defined type metadata in all types.T
// present in the input TableDescriptor. It always returns the same type of
// TableDescriptor that was passed in. It ensures that ImmutableTableDescriptors
@@ -1153,6 +1164,13 @@ func (tc *Collection) ResetDatabaseCache(dbCache *database.Cache) {
tc.databaseCache = dbCache
}

// ResetSchemaCache resets the table collection's schema cache.
// TODO (rohany): This can be removed once we use the collection's descriptors
// to manage schemas.
func (tc *Collection) ResetSchemaCache() {
tc.schemaCache = sync.Map{}
}

// MigrationSchemaChangeRequiredContext flags a schema change as necessary to
// run even in a mixed-version 19.2/20.1 state where schema changes are normally
// banned, because the schema change is being run in a startup migration. It's
2 changes: 1 addition & 1 deletion pkg/sql/conn_executor.go
@@ -1204,8 +1204,8 @@ func (ex *connExecutor) resetExtraTxnState(
}

ex.extraTxnState.descCollection.ReleaseAll(ctx)

ex.extraTxnState.descCollection.ResetDatabaseCache(dbCacheHolder.getDatabaseCache())
ex.extraTxnState.descCollection.ResetSchemaCache()

// Close all portals.
for name, p := range ex.extraTxnState.prepStmtsNamespace.portals {