Skip to content

Commit

Permalink
Merge #52313 #52349
Browse files Browse the repository at this point in the history
52313: colexec: add default comparison operator r=yuzefovich a=yuzefovich

Depends on #52315.

**colexec: add default comparison operators**

This commit introduces default comparison projection and selection
operators that handle all `tree.ComparisonExpr`s for which we don't
have optimized implementations. The main operators are very similar
to `defaultBuiltinFuncOperator`, but we also introduce optimized
adapter implementations from `tree.ComparisonExpr` to a
vectorized-friendly model. Quick benchmarks show about a 3.5x improvement in
speed of `IS DISTINCT FROM` projection operator against the wrapped
post-processing spec.

Addresses: #49781.

Release note (sql change): Vectorized execution engine now fully
supports comparison operators (things like `ILIKE`, `IS NOT DISTINCT FROM`,
`SIMILAR TO`, and several others).

**colexec: add support for Tuple expressions and clean up tests**

This commit adds support for `tree.Tuple` expressions which are
supported either by pre-evaluation during planning (and handling the
result as other constant values) if the tuple is constant or by planning
projection operators for each expression in the tuple and then using
newly-introduced `tupleProjOp` if the tuple expression is not constant.

The support required refactoring of IsNull* operators to template out special
"is tuple null" variants (because tuples have very peculiar null-handling
semantics). It also required that we fall back to the default comparison
operators on things that we have optimized support for (for example,
EQ), also because of the null-handling semantics.

It also improves the tests to handle datum-backed types better. Note
that a single unit test in `aggregators_test.go` has been switched to use
`types.TimeTZ` instead of `types.Jsonb` because of the differences in
order of `string`s and `json.JSON.String()` (the former is the
"expected" value whereas the latter is the actual; it was simpler to
switch the test to use a different datum-backed type).

Release note: None

52349: sql: implement `ALTER SCHEMA RENAME TO` r=rohany a=rohany

Fixes #50880.

This commit implements the `ALTER SCHEMA RENAME TO` command. It reuses
the existing schema change infrastructure to handle a more general case
of an arbitrary descriptor that just needs names and existing leases
drained.

Release note (sql change): Implement the `ALTER SCHEMA RENAME TO`
command.

Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Rohan Yadav <[email protected]>
  • Loading branch information
3 people committed Aug 13, 2020
3 parents 822e3fa + b3f2f73 + c644dc0 commit b1359ad
Show file tree
Hide file tree
Showing 60 changed files with 8,819 additions and 6,569 deletions.
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -856,6 +856,9 @@ EXECGEN_TARGETS = \
pkg/sql/colexec/and_or_projection.eg.go \
pkg/sql/colexec/cast.eg.go \
pkg/sql/colexec/const.eg.go \
pkg/sql/colexec/default_cmp_expr.eg.go \
pkg/sql/colexec/default_cmp_proj_ops.eg.go \
pkg/sql/colexec/default_cmp_sel_ops.eg.go \
pkg/sql/colexec/distinct.eg.go \
pkg/sql/colexec/hashjoiner.eg.go \
pkg/sql/colexec/hashtable_distinct.eg.go \
Expand All @@ -872,6 +875,7 @@ EXECGEN_TARGETS = \
pkg/sql/colexec/hash_sum_agg.eg.go \
pkg/sql/colexec/hash_sum_int_agg.eg.go \
pkg/sql/colexec/hash_utils.eg.go \
pkg/sql/colexec/is_null_ops.eg.go \
pkg/sql/colexec/like_ops.eg.go \
pkg/sql/colexec/mergejoinbase.eg.go \
pkg/sql/colexec/mergejoiner_exceptall.eg.go \
Expand Down
8 changes: 4 additions & 4 deletions pkg/ccl/backupccl/restore_schema_change_creation.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,10 @@ func createSchemaChangeJobsFromMutations(
Username: username,
DescriptorIDs: descpb.IDs{tableDesc.GetID()},
Details: jobspb.SchemaChangeDetails{
TableID: tableDesc.ID,
MutationID: mutationID,
ResumeSpanList: spanList,
FormatVersion: jobspb.JobResumerFormatVersion,
DescID: tableDesc.ID,
TableMutationID: mutationID,
ResumeSpanList: spanList,
FormatVersion: jobspb.JobResumerFormatVersion,
},
Progress: jobspb.SchemaChangeProgress{},
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/col/coldataext/datum_vec.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ func newDatumVec(t *types.T, n int, evalCtx *tree.EvalContext) coldata.DatumVec
// BinFn evaluates the provided binary function between the receiver and other.
// other can either be nil, tree.Datum, or *Datum.
func (d *Datum) BinFn(
binFn *tree.BinOp, evalCtx *tree.EvalContext, other interface{},
binFn tree.TwoArgFn, evalCtx *tree.EvalContext, other interface{},
) (tree.Datum, error) {
return binFn.Fn(evalCtx, d.Datum, maybeUnwrapDatum(other))
return binFn(evalCtx, d.Datum, maybeUnwrapDatum(other))
}

// CompareDatum returns the comparison between d and other. The other is
Expand Down
588 changes: 298 additions & 290 deletions pkg/jobs/jobspb/jobs.pb.go

Large diffs are not rendered by default.

12 changes: 10 additions & 2 deletions pkg/jobs/jobspb/jobs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,16 @@ message SchemaChangeDetails {
(gogoproto.customname) = "DroppedDatabaseID",
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.ID"
];
uint32 table_id = 5 [(gogoproto.customname) = "TableID", (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.ID"];
uint32 mutation_id = 6 [(gogoproto.customname) = "MutationID", (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.MutationID"];
// desc_id is the target descriptor for this schema change. Note that this ID
// is not always a table ID! We allow referencing any descriptor here to allow
// generic schema changes on descriptors whose schema change process involves
// only draining names and existing leases. This allows us to implement the
// simple schema changes on SchemaDescriptors and DatabaseDescriptors without
// implementing a new job for each.
uint32 desc_id = 5 [(gogoproto.customname) = "DescID", (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.ID"];
// table_mutation_id is the mutation ID that the schema changer is to process. It is
// only set when desc_id references a TableDescriptor.
uint32 table_mutation_id = 6 [(gogoproto.customname) = "TableMutationID", (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.MutationID"];
// The format version of the schema change job details. This is used to
// distinguish between jobs as they existed in 19.2 and earlier versions
// (controlled and updated by a SchemaChanger) and jobs as they exist in 20.1
Expand Down
118 changes: 114 additions & 4 deletions pkg/sql/alter_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,134 @@ package sql
import (
"context"

"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)

type alterSchemaNode struct {
n *tree.AlterSchema
n *tree.AlterSchema
db *sqlbase.MutableDatabaseDescriptor
desc *sqlbase.MutableSchemaDescriptor
}

// Used to satisfy the linter.
var _ planNode = &alterSchemaNode{n: nil}

func (p *planner) AlterSchema(ctx context.Context, n *tree.AlterSchema) (planNode, error) {
return nil, unimplemented.NewWithIssue(50880, "ALTER SCHEMA")
// TODO (rohany, lucy): There should be an API to get a MutableSchemaDescriptor
// by name from the descs.Collection.
db, err := p.ResolveUncachedDatabaseByName(ctx, p.CurrentDatabase(), true /* required */)
if err != nil {
return nil, err
}
mutDB := sqlbase.NewMutableExistingDatabaseDescriptor(*db.DatabaseDesc())
found, schema, err := p.LogicalSchemaAccessor().GetSchema(ctx, p.txn, p.ExecCfg().Codec, db.ID, n.Schema)
if err != nil {
return nil, err
}
if !found {
return nil, pgerror.Newf(pgcode.InvalidSchemaName, "schema %q does not exist", n.Schema)
}
switch schema.Kind {
case sqlbase.SchemaPublic, sqlbase.SchemaVirtual, sqlbase.SchemaTemporary:
return nil, pgerror.Newf(pgcode.InvalidSchemaName, "cannot modify schema %q", n.Schema)
case sqlbase.SchemaUserDefined:
// TODO (rohany): Check permissions here.
desc, err := p.Descriptors().GetMutableSchemaDescriptorByID(ctx, schema.ID, p.txn)
if err != nil {
return nil, err
}
return &alterSchemaNode{n: n, db: mutDB, desc: desc}, nil
default:
return nil, errors.AssertionFailedf("unknown schema kind")
}
}

func (n *alterSchemaNode) startExec(params runParams) error {
return errors.AssertionFailedf("unimplemented")
switch t := n.n.Cmd.(type) {
case *tree.AlterSchemaRename:
return params.p.renameSchema(params.ctx, n.db, n.desc, t.NewName, tree.AsStringWithFQNames(n.n, params.Ann()))
default:
return errors.AssertionFailedf("unknown schema cmd %T", t)
}
}

// renameSchema renames the schema described by desc, which belongs to the
// database db, to newName.
//
// It returns a DuplicateSchema error if schemaExists reports that newName is
// already taken in db, and the error from sqlbase.IsSchemaNameValid if newName
// is rejected by that check. Otherwise it:
//   - sets the new name on desc,
//   - writes a namespace entry mapping the new name to desc.ID,
//   - marks the old name as dropped and adds the new name in db.Schemas,
//   - writes the database change and then the schema descriptor change.
//
// jobDesc is handed unchanged to writeSchemaDescChange.
func (p *planner) renameSchema(
	ctx context.Context,
	db *sqlbase.MutableDatabaseDescriptor,
	desc *sqlbase.MutableSchemaDescriptor,
	newName string,
	jobDesc string,
) error {
	// Refuse to rename onto a name that is already in use.
	exists, err := p.schemaExists(ctx, db.ID, newName)
	if err != nil {
		return err
	}
	if exists {
		return pgerror.Newf(pgcode.DuplicateSchema, "schema %q already exists", newName)
	}

	// The target must itself be an acceptable schema name.
	if err = sqlbase.IsSchemaNameValid(newName); err != nil {
		return err
	}

	// Remember the previous name before overwriting it on the descriptor; it
	// is needed below to update the parent database's schema mapping.
	oldName := desc.Name
	desc.SetName(newName)

	// Add a namespace entry for the new name.
	newKey := sqlbase.NewSchemaKey(desc.ParentID, newName).Key(p.execCfg.Codec)
	batch := p.txn.NewBatch()
	if p.ExtendedEvalContext().Tracing.KVTracingEnabled() {
		log.VEventf(ctx, 2, "CPut %s -> %d", newKey, desc.ID)
	}
	batch.CPut(newKey, desc.ID, nil)
	if err = p.txn.Run(ctx, batch); err != nil {
		return err
	}

	// Update the parent database's schema mapping. The mapping is expected to
	// contain the old name and not the new one; anything else is reported as
	// an assertion failure.
	if _, ok := db.Schemas[oldName]; !ok {
		return errors.AssertionFailedf(
			"old name %q not present in database schema mapping",
			oldName,
		)
	}
	if _, ok := db.Schemas[newName]; ok {
		return errors.AssertionFailedf(
			"new name %q already present in database schema mapping",
			newName,
		)
	}

	// Both names point at the same descriptor ID: the old entry is flagged as
	// dropped, the new entry is not.
	schemaID := desc.ID
	db.Schemas[oldName] = descpb.DatabaseDescriptor_SchemaInfo{ID: schemaID, Dropped: true}
	db.Schemas[newName] = descpb.DatabaseDescriptor_SchemaInfo{ID: schemaID, Dropped: false}
	if err = p.writeDatabaseChange(ctx, db); err != nil {
		return err
	}

	// Finally, write the change to the schema descriptor itself.
	return p.writeSchemaDescChange(ctx, desc, jobDesc)
}

func (n *alterSchemaNode) Next(params runParams) (bool, error) { return false, nil }
Expand Down
40 changes: 20 additions & 20 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,21 +312,21 @@ func (sc *SchemaChanger) dropConstraints(
for i := range constraints {
c := &constraints[i]
if c.ConstraintType == descpb.ConstraintToUpdate_FOREIGN_KEY &&
c.ForeignKey.ReferencedTableID != sc.tableID {
c.ForeignKey.ReferencedTableID != sc.descID {
fksByBackrefTable[c.ForeignKey.ReferencedTableID] = append(fksByBackrefTable[c.ForeignKey.ReferencedTableID], c)
}
}
tableIDsToUpdate := make([]descpb.ID, 0, len(fksByBackrefTable)+1)
tableIDsToUpdate = append(tableIDsToUpdate, sc.tableID)
tableIDsToUpdate = append(tableIDsToUpdate, sc.descID)
for id := range fksByBackrefTable {
tableIDsToUpdate = append(tableIDsToUpdate, id)
}

// Create update closure for the table and all other tables with backreferences.
update := func(_ *kv.Txn, descs map[descpb.ID]catalog.MutableDescriptor) error {
scDesc, ok := descs[sc.tableID]
scDesc, ok := descs[sc.descID]
if !ok {
return errors.AssertionFailedf("required table with ID %d not provided to update closure", sc.tableID)
return errors.AssertionFailedf("required table with ID %d not provided to update closure", sc.descID)
}
scTable := scDesc.(*MutableTableDescriptor)
for i := range constraints {
Expand Down Expand Up @@ -356,7 +356,7 @@ func (sc *SchemaChanger) dropConstraints(
if def.Name == constraint.Name {
backrefDesc, ok := descs[constraint.ForeignKey.ReferencedTableID]
if !ok {
return errors.AssertionFailedf("required table with ID %d not provided to update closure", sc.tableID)
return errors.AssertionFailedf("required table with ID %d not provided to update closure", sc.descID)
}
backrefTable := backrefDesc.(*MutableTableDescriptor)
if err := removeFKBackReferenceFromTable(backrefTable, def.Name, scTable.TableDesc()); err != nil {
Expand Down Expand Up @@ -384,7 +384,7 @@ func (sc *SchemaChanger) dropConstraints(
if err != nil {
return nil, err
}
if err := WaitToUpdateLeases(ctx, sc.leaseMgr, sc.tableID); err != nil {
if err := WaitToUpdateLeases(ctx, sc.leaseMgr, sc.descID); err != nil {
return nil, err
}
for id := range fksByBackrefTable {
Expand Down Expand Up @@ -414,21 +414,21 @@ func (sc *SchemaChanger) addConstraints(
for i := range constraints {
c := &constraints[i]
if c.ConstraintType == descpb.ConstraintToUpdate_FOREIGN_KEY &&
c.ForeignKey.ReferencedTableID != sc.tableID {
c.ForeignKey.ReferencedTableID != sc.descID {
fksByBackrefTable[c.ForeignKey.ReferencedTableID] = append(fksByBackrefTable[c.ForeignKey.ReferencedTableID], c)
}
}
tableIDsToUpdate := make([]descpb.ID, 0, len(fksByBackrefTable)+1)
tableIDsToUpdate = append(tableIDsToUpdate, sc.tableID)
tableIDsToUpdate = append(tableIDsToUpdate, sc.descID)
for id := range fksByBackrefTable {
tableIDsToUpdate = append(tableIDsToUpdate, id)
}

// Create update closure for the table and all other tables with backreferences
update := func(_ *kv.Txn, descs map[descpb.ID]catalog.MutableDescriptor) error {
scDesc, ok := descs[sc.tableID]
scDesc, ok := descs[sc.descID]
if !ok {
return errors.AssertionFailedf("required table with ID %d not provided to update closure", sc.tableID)
return errors.AssertionFailedf("required table with ID %d not provided to update closure", sc.descID)
}
scTable := scDesc.(*MutableTableDescriptor)
for i := range constraints {
Expand Down Expand Up @@ -478,7 +478,7 @@ func (sc *SchemaChanger) addConstraints(
scTable.OutboundFKs = append(scTable.OutboundFKs, constraint.ForeignKey)
backrefDesc, ok := descs[constraint.ForeignKey.ReferencedTableID]
if !ok {
return errors.AssertionFailedf("required table with ID %d not provided to update closure", sc.tableID)
return errors.AssertionFailedf("required table with ID %d not provided to update closure", sc.descID)
}
backrefTable := backrefDesc.(*MutableTableDescriptor)
backrefTable.InboundFKs = append(backrefTable.InboundFKs, constraint.ForeignKey)
Expand All @@ -491,7 +491,7 @@ func (sc *SchemaChanger) addConstraints(
if _, err := sc.leaseMgr.PublishMultiple(ctx, tableIDsToUpdate, update, nil); err != nil {
return err
}
if err := WaitToUpdateLeases(ctx, sc.leaseMgr, sc.tableID); err != nil {
if err := WaitToUpdateLeases(ctx, sc.leaseMgr, sc.descID); err != nil {
return err
}
for id := range fksByBackrefTable {
Expand Down Expand Up @@ -531,7 +531,7 @@ func (sc *SchemaChanger) validateConstraints(
var tableDesc *sqlbase.ImmutableTableDescriptor

if err := sc.fixedTimestampTxn(ctx, readAsOf, func(ctx context.Context, txn *kv.Txn) error {
tableDesc, err = catalogkv.MustGetTableDescByID(ctx, txn, sc.execCfg.Codec, sc.tableID)
tableDesc, err = catalogkv.MustGetTableDescByID(ctx, txn, sc.execCfg.Codec, sc.descID)
return err
}); err != nil {
return err
Expand Down Expand Up @@ -609,7 +609,7 @@ func (sc *SchemaChanger) validateConstraints(
func (sc *SchemaChanger) getTableVersion(
ctx context.Context, txn *kv.Txn, tc *descs.Collection, version descpb.DescriptorVersion,
) (*sqlbase.ImmutableTableDescriptor, error) {
tableDesc, err := tc.GetTableVersionByID(ctx, txn, sc.tableID, tree.ObjectLookupFlags{})
tableDesc, err := tc.GetTableVersionByID(ctx, txn, sc.descID, tree.ObjectLookupFlags{})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -639,7 +639,7 @@ func (sc *SchemaChanger) truncateIndexes(
resumeAt := resume
if log.V(2) {
log.Infof(ctx, "drop index (%d, %d) at row: %d, span: %s",
sc.tableID, sc.mutationID, rowIdx, resume)
sc.descID, sc.mutationID, rowIdx, resume)
}

// Make a new txn just to drop this chunk.
Expand Down Expand Up @@ -696,7 +696,7 @@ func (sc *SchemaChanger) truncateIndexes(
// All the data chunks have been removed. Now also remove the
// zone configs for the dropped indexes, if any.
if err := sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return RemoveIndexZoneConfigs(ctx, txn, sc.execCfg, sc.tableID, dropped)
return RemoveIndexZoneConfigs(ctx, txn, sc.execCfg, sc.descID, dropped)
}); err != nil {
return err
}
Expand Down Expand Up @@ -816,7 +816,7 @@ func (sc *SchemaChanger) distBackfill(
if err := sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
var err error
todoSpans, _, mutationIdx, err = rowexec.GetResumeSpans(
ctx, sc.jobRegistry, txn, sc.execCfg.Codec, sc.tableID, sc.mutationID, filter)
ctx, sc.jobRegistry, txn, sc.execCfg.Codec, sc.descID, sc.mutationID, filter)
return err
}); err != nil {
return err
Expand Down Expand Up @@ -903,7 +903,7 @@ func (sc *SchemaChanger) distBackfill(
if err := sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
var err error
resumeSpans, _, _, err = rowexec.GetResumeSpans(
ctx, sc.jobRegistry, txn, sc.execCfg.Codec, sc.tableID, sc.mutationID, filter)
ctx, sc.jobRegistry, txn, sc.execCfg.Codec, sc.descID, sc.mutationID, filter)
return err
}); err != nil {
return err
Expand Down Expand Up @@ -935,7 +935,7 @@ func (sc *SchemaChanger) updateJobRunningStatus(
) (*sqlbase.MutableTableDescriptor, error) {
var tableDesc *sqlbase.MutableTableDescriptor
err := sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
desc, err := catalogkv.GetDescriptorByID(ctx, txn, sc.execCfg.Codec, sc.tableID, catalogkv.Mutable,
desc, err := catalogkv.GetDescriptorByID(ctx, txn, sc.execCfg.Codec, sc.descID, catalogkv.Mutable,
catalogkv.TableDescriptorKind, true /* required */)
if err != nil {
return err
Expand Down Expand Up @@ -1002,7 +1002,7 @@ func (sc *SchemaChanger) validateIndexes(ctx context.Context) error {
readAsOf := sc.clock.Now()
var tableDesc *sqlbase.ImmutableTableDescriptor
if err := sc.fixedTimestampTxn(ctx, readAsOf, func(ctx context.Context, txn *kv.Txn) (err error) {
tableDesc, err = catalogkv.MustGetTableDescByID(ctx, txn, sc.execCfg.Codec, sc.tableID)
tableDesc, err = catalogkv.MustGetTableDescByID(ctx, txn, sc.execCfg.Codec, sc.descID)
return err
}); err != nil {
return err
Expand Down
18 changes: 18 additions & 0 deletions pkg/sql/catalog/descs/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,17 @@ func (tc *Collection) getMutableDescriptorByID(
return desc.(catalog.MutableDescriptor), nil
}

// GetMutableSchemaDescriptorByID fetches the mutable descriptor with ID scID
// through getMutableDescriptorByID and returns it as a
// *sqlbase.MutableSchemaDescriptor. Any lookup error is returned as-is.
//
// The conversion is an unchecked type assertion, so this panics if the
// descriptor found for scID is not a *sqlbase.MutableSchemaDescriptor.
func (tc *Collection) GetMutableSchemaDescriptorByID(
	ctx context.Context, scID descpb.ID, txn *kv.Txn,
) (*sqlbase.MutableSchemaDescriptor, error) {
	mutDesc, err := tc.getMutableDescriptorByID(ctx, scID, txn)
	if err != nil {
		return nil, err
	}
	schemaDesc := mutDesc.(*sqlbase.MutableSchemaDescriptor)
	return schemaDesc, nil
}

// hydrateTypesInTableDesc installs user defined type metadata in all types.T
// present in the input TableDescriptor. It always returns the same type of
// TableDescriptor that was passed in. It ensures that ImmutableTableDescriptors
Expand Down Expand Up @@ -1157,6 +1168,13 @@ func (tc *Collection) ResetDatabaseCache(dbCache *database.Cache) {
tc.databaseCache = dbCache
}

// ResetSchemaCache resets the table collection's schema cache by replacing it
// with a fresh, empty sync.Map.
// TODO (rohany): This can be removed once we use the collection's descriptors
// to manage schemas.
func (tc *Collection) ResetSchemaCache() {
tc.schemaCache = sync.Map{}
}

// MigrationSchemaChangeRequiredContext flags a schema change as necessary to
// run even in a mixed-version 19.2/20.1 state where schema changes are normally
// banned, because the schema change is being run in a startup migration. It's
Expand Down
Loading

0 comments on commit b1359ad

Please sign in to comment.