Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor, infoschema: filter out the PRIMARY index and foreign keys in table_constraints #55255

Merged
merged 1 commit into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 53 additions & 33 deletions pkg/executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ import (
"go.uber.org/zap"
)

var lowerPrimaryKeyName = strings.ToLower(mysql.PrimaryKeyName)

type memtableRetriever struct {
dummyCloser
table *model.TableInfo
Expand Down Expand Up @@ -1170,9 +1172,6 @@ func (e *memtableRetriever) setDataFromPartitions(ctx context.Context, sctx sess
} else {
// needs to update needed partitions for partition table.
for _, pi := range table.GetPartitionInfo().Definitions {
if ex.Filter("partition_name", pi.Name.L) {
continue
}
err := cache.TableRowStatsCache.UpdateByID(sctx, pi.ID)
if err != nil {
return err
Expand All @@ -1187,6 +1186,10 @@ func (e *memtableRetriever) setDataFromPartitions(ctx context.Context, sctx sess
if rowCount != 0 {
avgRowLength = dataLength / rowCount
}
// If there are any condition on the `PARTITION_NAME` in the extractor, this record should be ignored
if len(ex.ColPredicates["partition_name"]) > 0 {
continue
}
record := types.MakeDatums(
infoschema.CatalogVal, // TABLE_CATALOG
schema.O, // TABLE_SCHEMA
Expand Down Expand Up @@ -1818,24 +1821,26 @@ func (e *memtableRetriever) setDataForMetricTables() {
func keyColumnUsageInTable(schema model.CIStr, table *model.TableInfo, extractor *plannercore.InfoSchemaBaseExtractor) [][]types.Datum {
var rows [][]types.Datum
if table.PKIsHandle {
for _, col := range table.Columns {
if mysql.HasPriKeyFlag(col.GetFlag()) {
record := types.MakeDatums(
infoschema.CatalogVal, // CONSTRAINT_CATALOG
schema.O, // CONSTRAINT_SCHEMA
infoschema.PrimaryConstraint, // CONSTRAINT_NAME
infoschema.CatalogVal, // TABLE_CATALOG
schema.O, // TABLE_SCHEMA
table.Name.O, // TABLE_NAME
col.Name.O, // COLUMN_NAME
1, // ORDINAL_POSITION
1, // POSITION_IN_UNIQUE_CONSTRAINT
nil, // REFERENCED_TABLE_SCHEMA
nil, // REFERENCED_TABLE_NAME
nil, // REFERENCED_COLUMN_NAME
)
rows = append(rows, record)
break
if extractor == nil || !extractor.Filter("constraint_name", lowerPrimaryKeyName) {
for _, col := range table.Columns {
if mysql.HasPriKeyFlag(col.GetFlag()) {
record := types.MakeDatums(
infoschema.CatalogVal, // CONSTRAINT_CATALOG
schema.O, // CONSTRAINT_SCHEMA
infoschema.PrimaryConstraint, // CONSTRAINT_NAME
infoschema.CatalogVal, // TABLE_CATALOG
schema.O, // TABLE_SCHEMA
table.Name.O, // TABLE_NAME
col.Name.O, // COLUMN_NAME
1, // ORDINAL_POSITION
1, // POSITION_IN_UNIQUE_CONSTRAINT
nil, // REFERENCED_TABLE_SCHEMA
nil, // REFERENCED_TABLE_NAME
nil, // REFERENCED_COLUMN_NAME
)
rows = append(rows, record)
break
}
}
}
}
Expand All @@ -1845,16 +1850,19 @@ func keyColumnUsageInTable(schema model.CIStr, table *model.TableInfo, extractor
}
for _, index := range table.Indices {
var idxName string
var filterIdxName string
if index.Primary {
idxName = infoschema.PrimaryConstraint
idxName = mysql.PrimaryKeyName
filterIdxName = lowerPrimaryKeyName
} else if index.Unique {
idxName = index.Name.O
filterIdxName = index.Name.L
} else {
// Only handle unique/primary key
continue
}

if extractor != nil && extractor.Filter("constraint_name", idxName) {
if extractor != nil && extractor.Filter("constraint_name", filterIdxName) {
continue
}

Expand All @@ -1881,6 +1889,10 @@ func keyColumnUsageInTable(schema model.CIStr, table *model.TableInfo, extractor
}
}
for _, fk := range table.ForeignKeys {
if extractor != nil && extractor.Filter("constraint_name", fk.Name.L) {
continue
}

for i, key := range fk.Cols {
fkRefCol := ""
if len(fk.RefCols) > i {
Expand Down Expand Up @@ -2146,30 +2158,35 @@ func (e *memtableRetriever) setDataFromTableConstraints(ctx context.Context, sct
}

if tbl.PKIsHandle {
record := types.MakeDatums(
infoschema.CatalogVal, // CONSTRAINT_CATALOG
schema.O, // CONSTRAINT_SCHEMA
mysql.PrimaryKeyName, // CONSTRAINT_NAME
schema.O, // TABLE_SCHEMA
tbl.Name.O, // TABLE_NAME
infoschema.PrimaryKeyType, // CONSTRAINT_TYPE
)
rows = append(rows, record)
if !ok || !extractor.Filter("constraint_name", lowerPrimaryKeyName) {
record := types.MakeDatums(
infoschema.CatalogVal, // CONSTRAINT_CATALOG
schema.O, // CONSTRAINT_SCHEMA
mysql.PrimaryKeyName, // CONSTRAINT_NAME
schema.O, // TABLE_SCHEMA
tbl.Name.O, // TABLE_NAME
infoschema.PrimaryKeyType, // CONSTRAINT_TYPE
)
rows = append(rows, record)
}
}

for _, idx := range tbl.Indices {
var cname, ctype string
var filterName string
if idx.Primary {
cname = mysql.PrimaryKeyName
filterName = lowerPrimaryKeyName
ctype = infoschema.PrimaryKeyType
} else if idx.Unique {
cname = idx.Name.O
filterName = idx.Name.L
ctype = infoschema.UniqueKeyType
} else {
// The index has no constriant.
continue
}
if ok && extractor.Filter("constraint_name", cname) {
if ok && extractor.Filter("constraint_name", filterName) {
continue
}
record := types.MakeDatums(
Expand All @@ -2184,6 +2201,9 @@ func (e *memtableRetriever) setDataFromTableConstraints(ctx context.Context, sct
}
// TiDB includes foreign key information for compatibility but foreign keys are not yet enforced.
for _, fk := range tbl.ForeignKeys {
if ok && extractor.Filter("constraint_name", fk.Name.L) {
continue
}
record := types.MakeDatums(
infoschema.CatalogVal, // CONSTRAINT_CATALOG
schema.O, // CONSTRAINT_SCHEMA
Expand Down
46 changes: 40 additions & 6 deletions pkg/executor/infoschema_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,14 +643,14 @@ func TestInfoSchemaConditionWorks(t *testing.T) {
tk := testkit.NewTestKit(t, store)
for db := 0; db < 2; db++ {
for table := 0; table < 2; table++ {
tk.MustExec(fmt.Sprintf("create database if not exists db%d;", db))
tk.MustExec(fmt.Sprintf(`create table db%d.table%d (id int primary key, data0 varchar(255), data1 varchar(255))
tk.MustExec(fmt.Sprintf("create database if not exists Db%d;", db))
tk.MustExec(fmt.Sprintf(`create table Db%d.Table%d (id int primary key, data0 varchar(255), data1 varchar(255))
partition by range (id) (
partition p0 values less than (10),
partition p1 values less than (20)
);`, db, table))
for index := 0; index < 2; index++ {
tk.MustExec(fmt.Sprintf("create index idx%d on db%d.table%d (data%d);", index, db, table, index))
tk.MustExec(fmt.Sprintf("create unique index Idx%d on Db%d.Table%d (id, data%d);", index, db, table, index))
}
}
}
Expand Down Expand Up @@ -703,12 +703,46 @@ func TestInfoSchemaConditionWorks(t *testing.T) {
colName := cols[i].Column.Name.L
if valPrefix, ok := testColumns[colName]; ok {
for j := 0; j < 2; j++ {
rows := tk.MustQuery(fmt.Sprintf("select * from information_schema.%s where %s = '%s%d';",
table, colName, valPrefix, j)).Rows()
sql := fmt.Sprintf("select * from information_schema.%s where %s = '%s%d';",
table, colName, valPrefix, j)
rows := tk.MustQuery(sql).Rows()
rowCountWithCondition := len(rows)
require.Less(t, rowCountWithCondition, rowCount, "%s has no effect on %s", colName, table)
require.Less(t, rowCountWithCondition, rowCount, "%s has no effect on %s. SQL: %s", colName, table, sql)

// check the condition works as expected
for _, row := range rows {
require.Equal(t, fmt.Sprintf("%s%d", valPrefix, j), strings.ToLower(row[i].(string)),
"%s has no effect on %s. SQL: %s", colName, table, sql)
}
}
}
}
}

// Test the PRIMARY constraint filter
rows := tk.MustQuery("select constraint_name, table_schema from information_schema.table_constraints where constraint_name = 'PRIMARY' and table_schema = 'db0';").Rows()
require.Equal(t, 2, len(rows))
for _, row := range rows {
require.Equal(t, "PRIMARY", row[0].(string))
require.Equal(t, "Db0", row[1].(string))
}
rows = tk.MustQuery("select constraint_name, table_schema from information_schema.key_column_usage where constraint_name = 'PRIMARY' and table_schema = 'db1';").Rows()
require.Equal(t, 2, len(rows))
for _, row := range rows {
require.Equal(t, "PRIMARY", row[0].(string))
require.Equal(t, "Db1", row[1].(string))
}

// Test the `partition_name` filter
tk.MustExec("create database if not exists db_no_partition;")
tk.MustExec("create table db_no_partition.t_no_partition (id int primary key, data0 varchar(255), data1 varchar(255));")
tk.MustExec(`create table db_no_partition.t_partition (id int primary key, data0 varchar(255), data1 varchar(255))
partition by range (id) (
partition p0 values less than (10),
partition p1 values less than (20)
);`)
rows = tk.MustQuery("select * from information_schema.partitions where table_schema = 'db_no_partition' and partition_name is NULL;").Rows()
require.Equal(t, 1, len(rows))
rows = tk.MustQuery("select * from information_schema.partitions where table_schema = 'db_no_partition' and (partition_name is NULL or partition_name = 'p0');").Rows()
require.Equal(t, 2, len(rows))
}