Skip to content

Commit

Permalink
statistics: update the global stats correctly after truncate a partit…
Browse files Browse the repository at this point in the history
…ion (#49362)

close #39681
  • Loading branch information
Rustin170506 authored Dec 13, 2023
1 parent 3379450 commit 619d5de
Show file tree
Hide file tree
Showing 5 changed files with 305 additions and 24 deletions.
3 changes: 2 additions & 1 deletion pkg/statistics/handle/ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
"ddl.go",
"drop_partition.go",
"exchange_partition.go",
"truncate_partition.go",
],
importpath = "github.com/pingcap/tidb/pkg/statistics/handle/ddl",
visibility = ["//visibility:public"],
Expand All @@ -29,7 +30,7 @@ go_test(
timeout = "short",
srcs = ["ddl_test.go"],
flaky = True,
shard_count = 6,
shard_count = 8,
deps = [
"//pkg/parser/model",
"//pkg/planner/cardinality",
Expand Down
7 changes: 2 additions & 5 deletions pkg/statistics/handle/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,8 @@ func (h *ddlHandlerImpl) HandleDDLEvent(t *util.DDLEvent) error {
}
}
case model.ActionTruncateTablePartition:
globalTableInfo, addedPartInfo, _ := t.GetTruncatePartitionInfo()
for _, def := range addedPartInfo.Definitions {
if err := h.statsWriter.InsertTableStats2KV(globalTableInfo, def.ID); err != nil {
return err
}
if err := h.onTruncatePartitions(t); err != nil {
return err
}
case model.ActionDropTablePartition:
if err := h.onDropPartitions(t); err != nil {
Expand Down
153 changes: 138 additions & 15 deletions pkg/statistics/handle/ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,21 +255,6 @@ PARTITION BY RANGE ( a ) (
require.False(t, statsTbl.Pseudo)
}

truncatePartition := "alter table t truncate partition p4"
testKit.MustExec(truncatePartition)
is = do.InfoSchema()
tbl, err = is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)
tableInfo = tbl.Meta()
err = h.HandleDDLEvent(<-h.DDLEventCh())
require.NoError(t, err)
require.Nil(t, h.Update(is))
pi = tableInfo.GetPartitionInfo()
for _, def := range pi.Definitions {
statsTbl := h.GetPartitionStats(tableInfo, def.ID)
require.False(t, statsTbl.Pseudo)
}

reorganizePartition := "alter table t reorganize partition p0,p1 into (partition p0 values less than (11))"
testKit.MustExec(reorganizePartition)
is = do.InfoSchema()
Expand All @@ -287,6 +272,144 @@ PARTITION BY RANGE ( a ) (
}
}

// TestTruncateAPartition checks the stats bookkeeping performed when a single
// partition is truncated: the global stats_meta row must lose the truncated
// partition's rows (while recording them as modifications), and the stats_meta
// row of the truncated partition must receive a new version.
func TestTruncateAPartition(t *testing.T) {
	store, dom := testkit.CreateMockStoreAndDomain(t)
	tk := testkit.NewTestKit(t, store)
	statsHandle := dom.StatsHandle()
	tk.MustExec("use test")
	tk.MustExec("drop table if exists t")
	tk.MustExec(`
create table t (
a int,
b int,
primary key(a),
index idx(b)
)
partition by range (a) (
partition p0 values less than (6),
partition p1 values less than (11),
partition p2 values less than (16),
partition p3 values less than (21)
)
`)
	// Five rows in total; two of them (a = 1 and a = 2) fall into p0.
	tk.MustExec("insert into t values (1,2),(2,2),(6,2),(11,2),(16,2)")
	tk.MustExec("analyze table t")
	infoSchema := dom.InfoSchema()
	tbl, err := infoSchema.TableByName(
		model.NewCIStr("test"), model.NewCIStr("t"),
	)
	require.NoError(t, err)
	tableInfo := tbl.Meta()
	partInfo := tableInfo.GetPartitionInfo()
	// After ANALYZE no partition should be served by pseudo stats.
	for _, def := range partInfo.Definitions {
		require.False(t, statsHandle.GetPartitionStats(tableInfo, def.ID).Pseudo)
	}
	require.NoError(t, statsHandle.Update(infoSchema))

	// Remember the stats version of partition p0 as stored in stats_meta
	// before the truncate, so it can be compared afterwards.
	p0ID := partInfo.Definitions[0].ID
	metaRows := tk.MustQuery(
		"select version from mysql.stats_meta where table_id = ?", p0ID,
	).Rows()
	require.Len(t, metaRows, 1)
	versionBefore := metaRows[0][0].(string)

	tk.MustExec("alter table t truncate partition p0")
	// Pick the truncate-partition event out of the DDL event channel and
	// feed it to the stats handle.
	event := findEvent(statsHandle.DDLEventCh(), model.ActionTruncateTablePartition)
	require.NoError(t, statsHandle.HandleDDLEvent(event))

	// Global stats meta: p0 held 2 of the 5 rows, so the expected count is
	// 5 - 2 = 3 and the expected modify count is 2.
	tk.MustQuery(
		"select count, modify_count from mysql.stats_meta where table_id = ?", tableInfo.ID,
	).Check(
		testkit.Rows("3 2"),
	)

	// The version of the truncated partition's stats_meta row must change.
	metaRows = tk.MustQuery(
		"select version from mysql.stats_meta where table_id = ?", p0ID,
	).Rows()
	require.Len(t, metaRows, 1)
	require.NotEqual(t, versionBefore, metaRows[0][0].(string))
}

// TestTruncatePartitions checks the stats bookkeeping performed when several
// partitions are truncated by one statement: the global stats_meta row must
// lose the rows of every truncated partition (while recording them as
// modifications), and each truncated partition's stats_meta row must receive
// a new version.
func TestTruncatePartitions(t *testing.T) {
	store, dom := testkit.CreateMockStoreAndDomain(t)
	tk := testkit.NewTestKit(t, store)
	statsHandle := dom.StatsHandle()
	tk.MustExec("use test")
	tk.MustExec("drop table if exists t")
	tk.MustExec(`
create table t (
a int,
b int,
primary key(a),
index idx(b)
)
partition by range (a) (
partition p0 values less than (6),
partition p1 values less than (11),
partition p2 values less than (16),
partition p3 values less than (21)
)
`)
	// Five rows in total: two land in p0 (a = 1, 2) and one in p1 (a = 6).
	tk.MustExec("insert into t values (1,2),(2,2),(6,2),(11,2),(16,2)")
	tk.MustExec("analyze table t")
	infoSchema := dom.InfoSchema()
	tbl, err := infoSchema.TableByName(
		model.NewCIStr("test"), model.NewCIStr("t"),
	)
	require.NoError(t, err)
	tableInfo := tbl.Meta()
	partInfo := tableInfo.GetPartitionInfo()
	// After ANALYZE no partition should be served by pseudo stats.
	for _, def := range partInfo.Definitions {
		require.False(t, statsHandle.GetPartitionStats(tableInfo, def.ID).Pseudo)
	}
	require.NoError(t, statsHandle.Update(infoSchema))

	// Remember the stats versions of p0 and p1 as stored in stats_meta before
	// the truncate. Rows are ordered by table_id so the two queries line up.
	p0ID := partInfo.Definitions[0].ID
	p1ID := partInfo.Definitions[1].ID
	metaRows := tk.MustQuery(
		"select version from mysql.stats_meta where table_id in (?, ?) order by table_id", p0ID, p1ID,
	).Rows()
	require.Len(t, metaRows, 2)
	firstVersionBefore := metaRows[0][0].(string)
	secondVersionBefore := metaRows[1][0].(string)

	// Truncate both partitions in a single statement.
	tk.MustExec("alter table t truncate partition p0, p1")
	// Pick the truncate-partition event out of the DDL event channel and
	// feed it to the stats handle.
	event := findEvent(statsHandle.DDLEventCh(), model.ActionTruncateTablePartition)
	require.NoError(t, statsHandle.HandleDDLEvent(event))

	// Global stats meta: 2 + 1 rows were removed, so the expected count is
	// 5 - 2 - 1 = 2 and the expected modify count is 3.
	tk.MustQuery(
		"select count, modify_count from mysql.stats_meta where table_id = ?", tableInfo.ID,
	).Check(
		testkit.Rows("2 3"),
	)

	// The versions of both truncated partitions' stats_meta rows must change.
	metaRows = tk.MustQuery(
		"select version from mysql.stats_meta where table_id in (?, ?) order by table_id", p0ID, p1ID,
	).Rows()
	require.Len(t, metaRows, 2)
	require.NotEqual(t, firstVersionBefore, metaRows[0][0].(string))
	require.NotEqual(t, secondVersionBefore, metaRows[1][0].(string))
}

func TestDropAPartition(t *testing.T) {
store, do := testkit.CreateMockStoreAndDomain(t)
testKit := testkit.NewTestKit(t, store)
Expand Down
161 changes: 161 additions & 0 deletions pkg/statistics/handle/ddl/truncate_partition.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ddl

import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/statistics/handle/lockstats"
"github.com/pingcap/tidb/pkg/statistics/handle/logutil"
"github.com/pingcap/tidb/pkg/statistics/handle/storage"
"github.com/pingcap/tidb/pkg/statistics/handle/util"
"go.uber.org/zap"
)

// onTruncatePartitions updates the stats metadata after an
// ALTER TABLE ... TRUNCATE PARTITION event. It works in three steps:
//
//  1. insert a stats meta record for every newly created partition;
//  2. inside a single transaction, sum the row counts recorded for the
//     truncated (old) partitions and apply that sum to the global table's
//     stats meta;
//  3. reset the stats meta of every truncated partition.
//
// The first error encountered is returned; later steps are then not run.
func (h *ddlHandlerImpl) onTruncatePartitions(t *util.DDLEvent) error {
	globalTableInfo, addedPartInfo, droppedPartInfo := t.GetTruncatePartitionInfo()

	// Step 1: register stats meta for the partitions created by the truncate.
	for _, added := range addedPartInfo.Definitions {
		if err := h.statsWriter.InsertTableStats2KV(globalTableInfo, added.ID); err != nil {
			return err
		}
	}

	// Step 2: remove the truncated partitions' rows from the global stats
	// meta. This callback is run with util.FlagWrapTxn so that reading the
	// partition counts and updating the global record happen in one transaction.
	updateGlobalMeta := func(sctx sessionctx.Context) error {
		numDropped := len(droppedPartInfo.Definitions)
		partitionIDs := make([]int64, 0, numDropped)
		partitionNames := make([]string, 0, numDropped)
		var removedRows int64
		for _, dropped := range droppedPartInfo.Definitions {
			// Only the row count is needed; the other returned values are ignored.
			rows, _, _, err := storage.StatsMetaCountAndModifyCount(sctx, dropped.ID)
			if err != nil {
				return err
			}
			removedRows += rows
			partitionIDs = append(partitionIDs, dropped.ID)
			partitionNames = append(partitionNames, dropped.Name.O)
		}

		// Nothing was recorded for the truncated partitions, so the global
		// stats meta does not need to change.
		if removedRows == 0 {
			return nil
		}

		is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
		globalTableSchema, found := is.SchemaByTable(globalTableInfo)
		if !found {
			return errors.Errorf("schema not found for table %s", globalTableInfo.Name.O)
		}
		lockedTables, err := lockstats.QueryLockedTables(sctx)
		if err != nil {
			return errors.Trace(err)
		}
		_, isLocked := lockedTables[globalTableInfo.ID]
		startTS, err := util.GetStartTS(sctx)
		if err != nil {
			return errors.Trace(err)
		}

		// The truncated rows are gone, so they are subtracted from the global
		// count. The global modify count is deliberately NOT reduced.
		// For example:
		// 1. The partition has 100 rows.
		// 2. We deleted 100 rows from the partition.
		// 3. The global stats has `count - 100 rows` and 100 modify count.
		// 4. We truncate the partition.
		// 5. The global stats should not be `count` and 0 modify count. We need to keep the modify count.
		delta := -removedRows
		logFields := truncatePartitionsLogFields(
			globalTableSchema,
			globalTableInfo,
			partitionIDs,
			partitionNames,
			removedRows,
			delta,
			startTS,
			isLocked,
		)
		err = storage.UpdateStatsMeta(
			sctx,
			startTS,
			variable.TableDelta{Count: removedRows, Delta: delta},
			globalTableInfo.ID,
			isLocked,
		)
		if err != nil {
			logutil.StatsLogger().Error("Update global stats after truncate partition failed",
				append(logFields, zap.Error(err))...,
			)
			return err
		}

		logutil.StatsLogger().Info("Update global stats after truncate partition",
			logFields...,
		)
		return nil
	}
	if err := util.CallWithSCtx(h.statsHandler.SPool(), updateGlobalMeta, util.FlagWrapTxn); err != nil {
		return err
	}

	// Step 3: reset the stats meta of the truncated partitions. Separate
	// transactions are acceptable here because it does not affect correctness.
	for _, dropped := range droppedPartInfo.Definitions {
		if err := h.statsWriter.ResetTableStats2KVForDrop(dropped.ID); err != nil {
			return err
		}
	}

	return nil
}

// truncatePartitionsLogFields assembles the structured log fields describing
// a global-stats update triggered by truncating partitions: the schema and
// table being updated, the IDs and names of the truncated partitions, the
// count and delta applied, the start timestamp used, and whether the table's
// stats are locked.
func truncatePartitionsLogFields(
	globalTableSchema *model.DBInfo,
	globalTableInfo *model.TableInfo,
	partitionIDs []int64,
	partitionNames []string,
	count int64,
	delta int64,
	startTS uint64,
	isLocked bool,
) []zap.Field {
	// Sized for exactly the nine fields appended below.
	const numFields = 9
	fields := make([]zap.Field, 0, numFields)

	// Which table is being updated.
	fields = append(fields,
		zap.String("schema", globalTableSchema.Name.O),
		zap.Int64("tableID", globalTableInfo.ID),
		zap.String("tableName", globalTableInfo.Name.O),
	)
	// Which partitions were truncated.
	fields = append(fields,
		zap.Int64s("partitionIDs", partitionIDs),
		zap.Strings("partitionNames", partitionNames),
	)
	// What was applied, and under which conditions.
	fields = append(fields,
		zap.Int64("count", count),
		zap.Int64("delta", delta),
		zap.Uint64("startTS", startTS),
		zap.Bool("isLocked", isLocked),
	)
	return fields
}
5 changes: 2 additions & 3 deletions pkg/statistics/handle/globalstats/global_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,10 +598,9 @@ func TestDDLPartition4GlobalStats(t *testing.T) {
require.NoError(t, h.DumpStatsDeltaToKV(true))
require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh()))
require.NoError(t, h.Update(is))
// The value of global.count will not be updated automatically when we truncate the table partition.
// Because the partition-stats in the partition table which have been truncated has not been updated.
// We will update the global-stats after the truncate operation.
globalStats = h.GetTableStats(tableInfo)
require.Equal(t, int64(15), globalStats.RealtimeCount)
require.Equal(t, int64(11), globalStats.RealtimeCount)

tk.MustExec("analyze table t;")
result = tk.MustQuery("show stats_meta where table_name = 't';").Rows()
Expand Down

0 comments on commit 619d5de

Please sign in to comment.