Skip to content

Commit

Permalink
Merge #58875 #58947
Browse files Browse the repository at this point in the history
58875: sql: add doctor check for parent schema in parent db r=postamar a=postamar

With this patch, doctor now also checks that the parent id of an object
matches the parent id of the parent schema of the object.

Fixes #55716.

Release note: None

58947: sql: fix index column direction lookup bugs r=postamar a=postamar

This patch fixes a couple of instances in which we look up an index's
column directions using a wrong ordinal when the index has implicit
columns due to it being sharded or partitioned.

Fixes #58945.

Release note (bug fix): The indoption column in pg_catalog.index is now
populated correctly.

Co-authored-by: Marius Posta <[email protected]>
  • Loading branch information
craig[bot] and Marius Posta committed Jan 14, 2021
3 parents 4b79aab + 943ecdd + 91220b7 commit b1f5a62
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 16 deletions.
16 changes: 11 additions & 5 deletions pkg/sql/doctor/doctor.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func ExamineDescriptors(
}

_, parentExists := descGetter[desc.GetParentID()]
_, parentSchemaExists := descGetter[desc.GetParentSchemaID()]
parentSchema, parentSchemaExists := descGetter[desc.GetParentSchemaID()]
switch d := desc.(type) {
case catalog.TableDescriptor:
if err := d.Validate(ctx, descGetter); err != nil {
Expand All @@ -161,15 +161,21 @@ func ExamineDescriptors(
// parent schema id is always 0.
parentSchemaExists = true
}
var invalidParentID bool
if desc.GetParentID() != descpb.InvalidID && !parentExists {
problemsFound = true
invalidParentID = true
fmt.Fprint(stdout, reportMsg(desc, "invalid parent id %d", desc.GetParentID()))
}
if desc.GetParentSchemaID() != descpb.InvalidID &&
desc.GetParentSchemaID() != keys.PublicSchemaID &&
!parentSchemaExists {
problemsFound = true
fmt.Fprint(stdout, reportMsg(desc, "invalid parent schema id %d", desc.GetParentSchemaID()))
desc.GetParentSchemaID() != keys.PublicSchemaID {
if !parentSchemaExists {
problemsFound = true
fmt.Fprint(stdout, reportMsg(desc, "invalid parent schema id %d", desc.GetParentSchemaID()))
} else if !invalidParentID && parentSchema.GetParentID() != desc.GetParentID() {
problemsFound = true
fmt.Fprint(stdout, reportMsg(desc, "invalid parent id of parent schema, expected %d, found %d", desc.GetParentID(), parentSchema.GetParentID()))
}
}

// Process namespace entries pointing to this descriptor.
Expand Down
46 changes: 46 additions & 0 deletions pkg/sql/doctor/doctor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ func TestExamineDescriptors(t *testing.T) {
descpb.TableFromDescriptor(droppedValidTableDesc, hlc.Timestamp{WallTime: 1}).
State = descpb.DescriptorState_DROP

inSchemaValidTableDesc := protoutil.Clone(validTableDesc).(*descpb.Descriptor)
descpb.TableFromDescriptor(inSchemaValidTableDesc, hlc.Timestamp{WallTime: 1}).
UnexposedParentSchemaID = 3

tests := []struct {
descTable doctor.DescriptorTable
namespaceTable doctor.NamespaceTable
Expand Down Expand Up @@ -194,6 +198,48 @@ Database 1: ParentID 0, ParentSchemaID 0, Name 'db': not being dropped but
},
expected: `Examining 1 descriptors and 1 namespace entries...
Type 1: ParentID 0, ParentSchemaID 0, Name 'type': invalid parentID 0
`,
},
{
descTable: doctor.DescriptorTable{
{
ID: 1,
DescBytes: toBytes(t, &descpb.Descriptor{Union: &descpb.Descriptor_Type{
Type: &descpb.TypeDescriptor{Name: "type", ID: 1, ParentSchemaID: 2},
}}),
},
},
namespaceTable: doctor.NamespaceTable{
{NameInfo: descpb.NameInfo{ParentSchemaID: 2, Name: "type"}, ID: 1},
},
expected: `Examining 1 descriptors and 1 namespace entries...
Type 1: ParentID 0, ParentSchemaID 2, Name 'type': invalid parentID 0
Type 1: ParentID 0, ParentSchemaID 2, Name 'type': invalid parent schema id 2
`,
},
{
descTable: doctor.DescriptorTable{
{ID: 1, DescBytes: toBytes(t, inSchemaValidTableDesc)},
{
ID: 2,
DescBytes: toBytes(t, &descpb.Descriptor{Union: &descpb.Descriptor_Database{
Database: &descpb.DatabaseDescriptor{Name: "db", ID: 2},
}}),
},
{
ID: 3,
DescBytes: toBytes(t, &descpb.Descriptor{Union: &descpb.Descriptor_Schema{
Schema: &descpb.SchemaDescriptor{Name: "schema", ID: 3, ParentID: 0},
}}),
},
},
namespaceTable: doctor.NamespaceTable{
{NameInfo: descpb.NameInfo{ParentID: 2, ParentSchemaID: 3, Name: "t"}, ID: 1},
{NameInfo: descpb.NameInfo{Name: "db"}, ID: 2},
{NameInfo: descpb.NameInfo{ParentID: 0, Name: "schema"}, ID: 3},
},
expected: `Examining 3 descriptors and 3 namespace entries...
Table 1: ParentID 2, ParentSchemaID 3, Name 't': invalid parent id of parent schema, expected 2, found 0
`,
},
{
Expand Down
19 changes: 19 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/pg_catalog
Original file line number Diff line number Diff line change
Expand Up @@ -3031,3 +3031,22 @@ SELECT tablename FROM pg_catalog.pg_tables

statement ok
SET DATABASE = test;

subtest 58945

statement ok
SET experimental_enable_hash_sharded_indexes = true

statement ok
CREATE TABLE t_hash (
a INT,
INDEX t_hash_a_idx (a DESC) USING HASH WITH BUCKET_COUNT=8
);

query T colnames
SELECT indoption
FROM pg_catalog.pg_index
WHERE indexrelid IN (SELECT crdb_oid FROM pg_catalog.pg_indexes WHERE indexname = 't_hash_a_idx')
----
indoption
1
20 changes: 9 additions & 11 deletions pkg/sql/pg_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -1453,22 +1453,17 @@ https://www.postgresql.org/docs/9.5/catalog-pg-index.html`,
table.GetIndexMutationCapabilities(index.GetID())
isReady := isMutation && isWriteOnly

colIDs := make([]descpb.ColumnID, 0, index.NumColumns())
for i := index.IndexDesc().ExplicitColumnStartIdx(); i < index.NumColumns(); i++ {
colIDs = append(colIDs, index.GetColumnID(i))
}
indkey, err := colIDArrayToVector(colIDs)
if err != nil {
return err
}
// Get the collations for all of the columns. To do this we require
// the type of the column.
// Also fill in indoption for each column to indicate if the index
// is ASC/DESC and if nulls appear first/last.
collationOids := tree.NewDArray(types.Oid)
indoption := tree.NewDArray(types.Int)

for i, columnID := range colIDs {
colIDs := make([]descpb.ColumnID, 0, index.NumColumns())
for i := index.IndexDesc().ExplicitColumnStartIdx(); i < index.NumColumns(); i++ {
columnID := index.GetColumnID(i)
colIDs = append(colIDs, columnID)
col, err := table.FindColumnByID(columnID)
if err != nil {
return err
Expand All @@ -1488,6 +1483,10 @@ https://www.postgresql.org/docs/9.5/catalog-pg-index.html`,
return err
}
}
indkey, err := colIDArrayToVector(colIDs)
if err != nil {
return err
}
collationOidVector := tree.NewDOidVectorFromDArray(collationOids)
indoptionIntVector := tree.NewDIntVectorFromDArray(indoption)
// TODO(bram): #27763 indclass still needs to be populated but it
Expand Down Expand Up @@ -1562,7 +1561,6 @@ func indexDefFromDescriptor(
tableLookup tableLookupFn,
) (string, error) {
colNames := index.ColumnNames[index.ExplicitColumnStartIdx():]

indexDef := tree.CreateIndex{
Name: tree.Name(index.Name),
Table: tree.MakeTableName(tree.Name(db.GetName()), tree.Name(table.GetName())),
Expand All @@ -1576,7 +1574,7 @@ func indexDefFromDescriptor(
Column: tree.Name(name),
Direction: tree.Ascending,
}
if index.ColumnDirections[i] == descpb.IndexDescriptor_DESC {
if index.ColumnDirections[index.ExplicitColumnStartIdx()+i] == descpb.IndexDescriptor_DESC {
elem.Direction = tree.Descending
}
indexDef.Columns[i] = elem
Expand Down

0 comments on commit b1f5a62

Please sign in to comment.