Skip to content

Commit

Permalink
catkv,sql: remove catkv.Direct interface
Browse files Browse the repository at this point in the history
This can now be replaced by calls to descs.Collection methods.

Informs cockroachdb#64089.

Release note: None
  • Loading branch information
Marius Posta committed Dec 15, 2022
1 parent 571257b commit 5cc0140
Show file tree
Hide file tree
Showing 31 changed files with 324 additions and 307 deletions.
26 changes: 21 additions & 5 deletions pkg/ccl/backupccl/restore_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func allocateDescriptorRewrites(
}

// See if there is an existing schema with the same name.
id, err := col.Direct().LookupSchemaID(ctx, txn, parentID, sc.Name)
id, err := col.LookupSchemaID(ctx, txn, parentID, sc.Name)
if err != nil {
return err
}
Expand All @@ -335,7 +335,11 @@ func allocateDescriptorRewrites(
} else {
// If we found an existing schema, then we need to remap all references
// to this schema to the existing one.
desc, err := col.Direct().MustGetSchemaDescByID(ctx, txn, id)
desc, err := col.GetImmutableSchemaByID(ctx, txn, id, tree.SchemaLookupFlags{
AvoidLeased: true,
IncludeDropped: true,
IncludeOffline: true,
})
if err != nil {
return err
}
Expand Down Expand Up @@ -384,7 +388,11 @@ func allocateDescriptorRewrites(
}

// Check privileges.
parentDB, err := col.Direct().MustGetDatabaseDescByID(ctx, txn, parentID)
_, parentDB, err := col.GetImmutableDatabaseByID(ctx, txn, parentID, tree.DatabaseLookupFlags{
AvoidLeased: true,
IncludeDropped: true,
IncludeOffline: true,
})
if err != nil {
return errors.Wrapf(err,
"failed to lookup parent DB %d", errors.Safe(parentID))
Expand Down Expand Up @@ -449,7 +457,11 @@ func allocateDescriptorRewrites(
targetDB, typ.Name)
}
// Check privileges on the parent DB.
parentDB, err := col.Direct().MustGetDatabaseDescByID(ctx, txn, parentID)
_, parentDB, err := col.GetImmutableDatabaseByID(ctx, txn, parentID, tree.DatabaseLookupFlags{
AvoidLeased: true,
IncludeDropped: true,
IncludeOffline: true,
})
if err != nil {
return errors.Wrapf(err,
"failed to lookup parent DB %d", errors.Safe(parentID))
Expand Down Expand Up @@ -697,7 +709,11 @@ func getDatabaseIDAndDesc(
return dbID, nil, errors.Errorf("a database named %q needs to exist", targetDB)
}
// Check privileges on the parent DB.
dbDesc, err = col.Direct().MustGetDatabaseDescByID(ctx, txn, dbID)
_, dbDesc, err = col.GetImmutableDatabaseByID(ctx, txn, dbID, tree.DatabaseLookupFlags{
AvoidLeased: true,
IncludeDropped: true,
IncludeOffline: true,
})
if err != nil {
return 0, nil, errors.Wrapf(err,
"failed to lookup parent DB %d", errors.Safe(dbID))
Expand Down
8 changes: 7 additions & 1 deletion pkg/ccl/backupccl/targets.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,13 @@ func checkMultiRegionCompatible(
// For REGION BY TABLE IN <region> tables, allow the restore if the
// database has the region.
regionEnumID := database.GetRegionConfig().RegionEnumID
regionEnum, err := col.Direct().MustGetTypeDescByID(ctx, txn, regionEnumID)
regionEnum, err := col.GetImmutableTypeByID(ctx, txn, regionEnumID, tree.ObjectLookupFlags{
CommonLookupFlags: tree.CommonLookupFlags{
AvoidLeased: true,
IncludeDropped: true,
IncludeOffline: true,
},
})
if err != nil {
return err
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1180,7 +1180,11 @@ func getQualifiedTableNameObj(
ctx context.Context, execCfg *sql.ExecutorConfig, txn *kv.Txn, desc catalog.TableDescriptor,
) (tree.TableName, error) {
col := execCfg.CollectionFactory.NewCollection(ctx)
dbDesc, err := col.Direct().MustGetDatabaseDescByID(ctx, txn, desc.GetParentID())
_, dbDesc, err := col.GetImmutableDatabaseByID(ctx, txn, desc.GetParentID(), tree.DatabaseLookupFlags{
AvoidLeased: true,
IncludeDropped: true,
IncludeOffline: true,
})
if err != nil {
return tree.TableName{}, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduledjobs/schedulebase/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func FullyQualifyTables(
if err != nil {
return err
}
schemaID, err = col.Direct().LookupSchemaID(ctx, txn, dbDesc.GetID(), tp.SchemaName.String())
schemaID, err = col.LookupSchemaID(ctx, txn, dbDesc.GetID(), tp.SchemaName.String())
return err
}); err != nil {
return nil, err
Expand Down
16 changes: 14 additions & 2 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -1379,7 +1379,13 @@ func (sc *SchemaChanger) updateJobRunningStatus(
) (tableDesc catalog.TableDescriptor, err error) {
err = DescsTxn(ctx, sc.execCfg, func(ctx context.Context, txn *kv.Txn, col *descs.Collection) (err error) {
// Read table descriptor without holding a lease.
tableDesc, err = col.Direct().MustGetTableDescByID(ctx, txn, sc.descID)
tableDesc, err = col.GetImmutableTableByID(ctx, txn, sc.descID, tree.ObjectLookupFlags{
CommonLookupFlags: tree.CommonLookupFlags{
AvoidLeased: true,
IncludeDropped: true,
IncludeOffline: true,
},
})
if err != nil {
return err
}
Expand Down Expand Up @@ -2635,7 +2641,13 @@ func getTargetTablesAndFk(
if fk == nil {
return nil, nil, nil, errors.AssertionFailedf("foreign key %s does not exist", fkName)
}
targetTable, err = descsCol.Direct().MustGetTableDescByID(ctx, txn, fk.ReferencedTableID)
targetTable, err = descsCol.GetImmutableTableByID(ctx, txn, fk.ReferencedTableID, tree.ObjectLookupFlags{
CommonLookupFlags: tree.CommonLookupFlags{
AvoidLeased: true,
IncludeDropped: true,
IncludeOffline: true,
},
})
if err != nil {
return nil, nil, nil, err
}
Expand Down
8 changes: 0 additions & 8 deletions pkg/sql/catalog/descs/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -1206,14 +1206,6 @@ func (tc *Collection) GetConstraintComment(
return tc.GetComment(catalogkeys.MakeCommentKey(uint32(tableID), uint32(constraintID), catalogkeys.ConstraintCommentType))
}

// Direct exports the catkv.Direct interface.
type Direct = catkv.Direct

// Direct provides direct access to the underlying KV-storage.
func (tc *Collection) Direct() Direct {
return catkv.MakeDirect(tc.codec(), tc.version, tc.validationModeProvider)
}

// MakeTestCollection makes a Collection that can be used for tests.
func MakeTestCollection(ctx context.Context, leaseManager *lease.Manager) Collection {
settings := cluster.MakeTestingClusterSettings()
Expand Down
71 changes: 57 additions & 14 deletions pkg/sql/catalog/desctestutils/descriptor_test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,17 @@ func TestingGetDatabaseDescriptorWithVersion(
) catalog.DatabaseDescriptor {
ctx := context.Background()
var desc catalog.Descriptor
direct := catkv.MakeDirect(
codec, version, catkv.DefaultDescriptorValidationModeProvider,
cr := catkv.NewCatalogReader(
codec, version, nil /* systemDatabaseCache */, nil, /* maybeMonitor */
)
if err := kvDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
id, err := direct.LookupDescriptorID(ctx, txn, keys.RootNamespaceID, keys.RootNamespaceID, database)
id, err := lookupDescriptorID(ctx, cr, txn, keys.RootNamespaceID, keys.RootNamespaceID, database)
if err != nil {
panic(err)
} else if id == descpb.InvalidID {
panic(fmt.Sprintf("database %s not found", database))
}
desc, err = direct.MustGetDescriptorByID(ctx, txn, id, catalog.Database)
desc, err = mustGetDescriptorByID(ctx, version, cr, txn, id, catalog.Database)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -76,17 +76,17 @@ func TestingGetSchemaDescriptorWithVersion(
) catalog.SchemaDescriptor {
ctx := context.Background()
var desc catalog.Descriptor
direct := catkv.MakeDirect(
codec, version, catkv.DefaultDescriptorValidationModeProvider,
cr := catkv.NewCatalogReader(
codec, version, nil /* systemDatabaseCache */, nil, /* maybeMonitor */
)
if err := kvDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
schemaID, err := direct.LookupDescriptorID(ctx, txn, dbID, keys.RootNamespaceID, schemaName)
schemaID, err := lookupDescriptorID(ctx, cr, txn, dbID, keys.RootNamespaceID, schemaName)
if err != nil {
panic(err)
} else if schemaID == descpb.InvalidID {
panic(fmt.Sprintf("schema %s not found", schemaName))
}
desc, err = direct.MustGetDescriptorByID(ctx, txn, schemaID, catalog.Schema)
desc, err = mustGetDescriptorByID(ctx, version, cr, txn, schemaID, catalog.Schema)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -174,39 +174,82 @@ func testingGetObjectDescriptor(
object string,
) (desc catalog.Descriptor) {
ctx := context.Background()
direct := catkv.MakeDirect(
codec, version, catkv.DefaultDescriptorValidationModeProvider,
cr := catkv.NewCatalogReader(
codec, version, nil /* systemDatabaseCache */, nil, /* maybeMonitor */
)
if err := kvDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
dbID, err := direct.LookupDescriptorID(ctx, txn, keys.RootNamespaceID, keys.RootNamespaceID, database)
dbID, err := lookupDescriptorID(ctx, cr, txn, keys.RootNamespaceID, keys.RootNamespaceID, database)
if err != nil {
return err
}
if dbID == descpb.InvalidID {
return errors.Errorf("database %s not found", database)
}
schemaID, err := direct.LookupDescriptorID(ctx, txn, dbID, keys.RootNamespaceID, schema)
schemaID, err := lookupDescriptorID(ctx, cr, txn, dbID, keys.RootNamespaceID, schema)
if err != nil {
return err
}
if schemaID == descpb.InvalidID {
return errors.Errorf("schema %s not found", schema)
}
objectID, err := direct.LookupDescriptorID(ctx, txn, dbID, schemaID, object)
objectID, err := lookupDescriptorID(ctx, cr, txn, dbID, schemaID, object)
if err != nil {
return err
}
if objectID == descpb.InvalidID {
return errors.Errorf("object %s not found", object)
}
desc, err = direct.MustGetDescriptorByID(ctx, txn, objectID, catalog.Any)
desc, err = mustGetDescriptorByID(ctx, version, cr, txn, objectID, catalog.Any)
return err
}); err != nil {
panic(err)
}
return desc
}

func lookupDescriptorID(
ctx context.Context,
cr catkv.CatalogReader,
txn *kv.Txn,
dbID descpb.ID,
schemaID descpb.ID,
objectName string,
) (descpb.ID, error) {
key := descpb.NameInfo{ParentID: dbID, ParentSchemaID: schemaID, Name: objectName}
c, err := cr.GetByNames(ctx, txn, []descpb.NameInfo{key})
if err != nil {
return descpb.InvalidID, err
}
if e := c.LookupNamespaceEntry(&key); e != nil {
return e.GetID(), nil
}
return descpb.InvalidID, nil
}

func mustGetDescriptorByID(
ctx context.Context,
version clusterversion.ClusterVersion,
cr catkv.CatalogReader,
txn *kv.Txn,
id descpb.ID,
expectedType catalog.DescriptorType,
) (catalog.Descriptor, error) {
const isDescriptorRequired = true
c, err := cr.GetByIDs(ctx, txn, []descpb.ID{id}, isDescriptorRequired, expectedType)
if err != nil {
return nil, err
}
desc := c.LookupDescriptor(id)
vd := catkv.NewCatalogReaderBackedValidationDereferencer(cr, txn, nil /* dvmpMaybe */)
ve := validate.Validate(
ctx, version, vd, catalog.ValidationReadTelemetry, validate.ImmutableRead, desc,
)
if err := ve.CombinedError(); err != nil {
return nil, err
}
return desc, nil
}

// TestingValidateSelf is a convenience function for internal descriptor
// validation.
func TestingValidateSelf(desc catalog.Descriptor) error {
Expand Down
13 changes: 10 additions & 3 deletions pkg/sql/catalog/ingesting/privileges.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,11 @@ func getIngestingPrivilegesForTableOrSchema(
privilege.ListFromBitField(u.Privileges, privilegeType).ToBitField()
}
} else if descCoverage == tree.RequestedDescriptors {
parentDB, err := descsCol.Direct().MustGetDatabaseDescByID(ctx, txn, desc.GetParentID())
_, parentDB, err := descsCol.GetImmutableDatabaseByID(ctx, txn, desc.GetParentID(), tree.DatabaseLookupFlags{
AvoidLeased: true,
IncludeDropped: true,
IncludeOffline: true,
})
if err != nil {
return nil, errors.Wrapf(err, "failed to lookup parent DB %d", errors.Safe(desc.GetParentID()))
}
Expand All @@ -141,8 +145,11 @@ func getIngestingPrivilegesForTableOrSchema(
} else {
// If we are restoring into an existing schema, resolve it, and fetch
// its default privileges.
parentSchema, err := descsCol.Direct().MustGetSchemaDescByID(ctx, txn,
desc.GetParentSchemaID())
parentSchema, err := descsCol.GetImmutableSchemaByID(ctx, txn, desc.GetParentSchemaID(), tree.SchemaLookupFlags{
AvoidLeased: true,
IncludeDropped: true,
IncludeOffline: true,
})
if err != nil {
return nil,
errors.Wrapf(err, "failed to lookup parent schema %d", errors.Safe(desc.GetParentSchemaID()))
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/catalog/internal/catkv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ go_library(
"catalog_query.go",
"catalog_reader.go",
"catalog_reader_cached.go",
"direct.go",
"system_database_cache.go",
"validate.go",
],
Expand Down
Loading

0 comments on commit 5cc0140

Please sign in to comment.