From 7cd0a7b849f8454f8bbfa540d3db1ddf59623a7a Mon Sep 17 00:00:00 2001 From: Lynn Date: Thu, 1 Aug 2019 14:01:34 +0800 Subject: [PATCH 1/7] *: adjust detal schema count and add metrics --- domain/domain.go | 0 domain/schema_validator.go | 48 +++++++++++++-- domain/schema_validator_test.go | 87 +++++++++++++++++++++++++++ executor/seqtest/seq_executor_test.go | 29 +++++++++ metrics/domain.go | 14 +++++ metrics/metrics.go | 1 + session/session.go | 1 + session/txn.go | 0 sessionctx/variable/session.go | 3 + sessionctx/variable/sysvar.go | 1 + sessionctx/variable/tidb_vars.go | 6 ++ sessionctx/variable/varsutil.go | 12 ++++ 12 files changed, 197 insertions(+), 5 deletions(-) mode change 100644 => 100755 domain/domain.go mode change 100644 => 100755 session/session.go mode change 100644 => 100755 session/txn.go diff --git a/domain/domain.go b/domain/domain.go old mode 100644 new mode 100755 diff --git a/domain/schema_validator.go b/domain/schema_validator.go index 6917589720e41..816645baf5853 100644 --- a/domain/schema_validator.go +++ b/domain/schema_validator.go @@ -17,6 +17,8 @@ import ( "sync" "time" + "github.com/pingcap/tidb/metrics" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" @@ -65,6 +67,7 @@ type schemaValidator struct { latestSchemaExpire time.Time // deltaSchemaInfos is a queue that maintain the history of changes. deltaSchemaInfos []deltaSchemaInfo + notMergeCnt int } // NewSchemaValidator returns a SchemaValidator structure. @@ -72,7 +75,7 @@ func NewSchemaValidator(lease time.Duration) SchemaValidator { return &schemaValidator{ isStarted: true, lease: lease, - deltaSchemaInfos: make([]deltaSchemaInfo, 0, maxNumberOfDiffsToLoad), + deltaSchemaInfos: make([]deltaSchemaInfo, 0, variable.DefTiDBMaxDeltaSchemaCount), } } @@ -85,14 +88,17 @@ func (s *schemaValidator) IsStarted() bool { func (s *schemaValidator) Stop() { logutil.BgLogger().Info("the schema validator stops") + metrics.LoadSchemaCounter.WithLabelValues(metrics.SchemaValidatorStop).Inc() s.mux.Lock() defer s.mux.Unlock() s.isStarted = false s.latestSchemaVer = 0 - s.deltaSchemaInfos = make([]deltaSchemaInfo, 0, maxNumberOfDiffsToLoad) + s.notMergeCnt = 0 + s.deltaSchemaInfos = s.deltaSchemaInfos[:0] } func (s *schemaValidator) Restart() { + metrics.LoadSchemaCounter.WithLabelValues(metrics.SchemaValidatorRestart).Inc() logutil.BgLogger().Info("the schema validator restarts") s.mux.Lock() defer s.mux.Unlock() @@ -100,11 +106,13 @@ func (s *schemaValidator) Restart() { } func (s *schemaValidator) Reset() { + metrics.LoadSchemaCounter.WithLabelValues(metrics.SchemaValidatorReset).Inc() s.mux.Lock() defer s.mux.Unlock() s.isStarted = true s.latestSchemaVer = 0 - s.deltaSchemaInfos = make([]deltaSchemaInfo, 0, maxNumberOfDiffsToLoad) + s.notMergeCnt = 0 + s.deltaSchemaInfos = s.deltaSchemaInfos[:0] } func (s *schemaValidator) Update(leaseGrantTS uint64, oldVer, currVer int64, changedTableIDs []int64) { @@ -146,13 +154,15 @@ func hasRelatedTableID(relatedTableIDs, updateTableIDs []int64) bool { // NOTE, this function should be called under lock! func (s *schemaValidator) isRelatedTablesChanged(currVer int64, tableIDs []int64) bool { if len(s.deltaSchemaInfos) == 0 { + metrics.LoadSchemaCounter.WithLabelValues(metrics.SchemaValidatorCacheEmpty).Inc() logutil.BgLogger().Info("schema change history is empty", zap.Int64("currVer", currVer)) return true } newerDeltas := s.findNewerDeltas(currVer) if len(newerDeltas) == len(s.deltaSchemaInfos) { + metrics.LoadSchemaCounter.WithLabelValues(metrics.SchemaValidatorCacheMiss).Inc() logutil.BgLogger().Info("the schema version is much older than the latest version", zap.Int64("currVer", currVer), - zap.Int64("latestSchemaVer", s.latestSchemaVer)) + zap.Int64("latestSchemaVer", s.latestSchemaVer), zap.Reflect("deltas", newerDeltas)) return true } for _, item := range newerDeltas { @@ -209,7 +219,35 @@ func (s *schemaValidator) Check(txnTS uint64, schemaVer int64, relatedTableIDs [ func (s *schemaValidator) enqueue(schemaVersion int64, relatedTableIDs []int64) { s.deltaSchemaInfos = append(s.deltaSchemaInfos, deltaSchemaInfo{schemaVersion, relatedTableIDs}) - if len(s.deltaSchemaInfos) > maxNumberOfDiffsToLoad { + s.notMergeCnt++ + + maxCnt := int(variable.GetMaxDetalSchemaCount()) + if len(s.deltaSchemaInfos) > maxCnt && s.notMergeCnt > maxCnt/2 { + s.merge() + s.notMergeCnt = 1 + } + if len(s.deltaSchemaInfos) > maxCnt { s.deltaSchemaInfos = s.deltaSchemaInfos[1:] } } + +func equal(a, b []int64) bool { + if len(a) != len(b) { + return false + } + for i := 0; i < len(a); i++ { + if a[i] != b[i] { + return false + } + } + return true +} + +func (s *schemaValidator) merge() { + // The first item we needn't to merge, because we hope to cover more versions. + for i := len(s.deltaSchemaInfos) - 1; i > 1; i-- { + if equal(s.deltaSchemaInfos[i].relatedTableIDs, s.deltaSchemaInfos[i-1].relatedTableIDs) { + s.deltaSchemaInfos = append(s.deltaSchemaInfos[:i-1], s.deltaSchemaInfos[i:]...) + } + } +} diff --git a/domain/schema_validator_test.go b/domain/schema_validator_test.go index 7dd59b620de7e..fb482435bee7c 100644 --- a/domain/schema_validator_test.go +++ b/domain/schema_validator_test.go @@ -19,6 +19,7 @@ import ( "time" . "github.com/pingcap/check" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/util/testleak" ) @@ -143,3 +144,89 @@ func serverFunc(lease time.Duration, requireLease chan leaseGrantItem, oracleCh } } } + +func (*testSuite) TestMerge(c *C) { + lease := 10 * time.Millisecond + originalCnt := variable.GetMaxDetalSchemaCount() + variable.SetMaxDetalSchemaCount(10) + defer variable.SetMaxDetalSchemaCount(originalCnt) + + validator := NewSchemaValidator(lease).(*schemaValidator) + c.Assert(validator.IsStarted(), IsTrue) + ds := []deltaSchemaInfo{ + {0, []int64{1}}, + {1, []int64{1}}, + {2, []int64{1}}, + {3, []int64{1, 2}}, + {4, []int64{1}}, + {5, []int64{1, 3}}, + {6, []int64{1, 3}}, + {7, []int64{1, 3}}, + {8, []int64{1, 2, 3}}, + {9, []int64{1, 2, 3}}, + } + for _, d := range ds { + validator.enqueue(d.schemaVersion, d.relatedTableIDs) + } + validator.enqueue(10, []int64{1}) + ret := []deltaSchemaInfo{ + {0, []int64{1}}, + {2, []int64{1}}, + {3, []int64{1, 2}}, + {4, []int64{1}}, + {7, []int64{1, 3}}, + {9, []int64{1, 2, 3}}, + {10, []int64{1}}, + } + c.Assert(validator.deltaSchemaInfos, DeepEquals, ret) + // notMergeCnt <= max deltal schema count + validator.enqueue(11, []int64{1}) + ret = append(ret, deltaSchemaInfo{11, []int64{1}}) + validator.enqueue(12, []int64{1}) + ret = append(ret, deltaSchemaInfo{12, []int64{1}}) + validator.enqueue(13, []int64{1}) + ret = append(ret, deltaSchemaInfo{13, []int64{1}}) + c.Assert(validator.deltaSchemaInfos, DeepEquals, ret) + validator.enqueue(14, []int64{1}) + ret = append(ret, deltaSchemaInfo{14, []int64{1}}) + c.Assert(validator.deltaSchemaInfos, DeepEquals, ret[1:]) + // notMergeCnt > max deltal schema count + validator.enqueue(15, []int64{1}) + ret = []deltaSchemaInfo{ + {2, []int64{1}}, + {3, []int64{1, 2}}, + {4, []int64{1}}, + {7, []int64{1, 3}}, + {9, []int64{1, 2, 3}}, + {15, []int64{1}}, + } + c.Assert(validator.deltaSchemaInfos, DeepEquals, ret) + + validator.notMergeCnt = 0 + validator.deltaSchemaInfos = []deltaSchemaInfo{ + {0, []int64{1, 2, 3}}, + {1, []int64{2}}, + {2, []int64{3}}, + {3, []int64{1, 2, 3}}, + {4, []int64{1, 4}}, + {5, []int64{1, 5}}, + {6, []int64{1, 6}}, + {7, []int64{1, 7}}, + {8, []int64{1, 2, 3}}, + {9, []int64{1, 2}}, + } + validator.enqueue(10, []int64{1}) + ret = []deltaSchemaInfo{ + {1, []int64{2}}, + {2, []int64{3}}, + {3, []int64{1, 2, 3}}, + {4, []int64{1, 4}}, + {5, []int64{1, 5}}, + {6, []int64{1, 6}}, + {7, []int64{1, 7}}, + {8, []int64{1, 2, 3}}, + {9, []int64{1, 2}}, + {10, []int64{1}}, + } + c.Assert(validator.deltaSchemaInfos, DeepEquals, ret) +} diff --git a/executor/seqtest/seq_executor_test.go b/executor/seqtest/seq_executor_test.go index 3936675404586..37d082658e9ee 100644 --- a/executor/seqtest/seq_executor_test.go +++ b/executor/seqtest/seq_executor_test.go @@ -36,6 +36,7 @@ import ( pb "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/parser" "github.com/pingcap/parser/model" + "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/executor" @@ -1050,3 +1051,31 @@ func (s *seqTestSuite) TestAutoIDInRetry(c *C) { tk.MustExec("insert into t values ()") tk.MustQuery(`select * from t`).Check(testkit.Rows("1", "2", "3", "4", "5")) } + +func (s *seqTestSuite) TestMaxDetalSchemaCount(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + c.Assert(variable.GetMaxDetalSchemaCount(), Equals, int64(variable.DefTiDBMaxDeltaSchemaCount)) + gvc := domain.GetDomain(tk.Se).GetGlobalVarsCache() + gvc.Disable() + + tk.MustExec("set @@global.tidb_max_delta_schema_count= -1") + tk.MustQuery("show warnings;").Check(testkit.Rows("Warning 1292 Truncated incorrect tidb_max_delta_schema_count value: '-1'")) + // Make sure a new session will load global variables. + tk.Se = nil + tk.MustExec("use test") + c.Assert(variable.GetMaxDetalSchemaCount(), Equals, int64(100)) + tk.MustExec(fmt.Sprintf("set @@global.tidb_max_delta_schema_count= %v", uint64(math.MaxInt64))) + tk.MustQuery("show warnings;").Check(testkit.Rows(fmt.Sprintf("Warning 1292 Truncated incorrect tidb_max_delta_schema_count value: '%d'", uint64(math.MaxInt64)))) + tk.Se = nil + tk.MustExec("use test") + c.Assert(variable.GetMaxDetalSchemaCount(), Equals, int64(16384)) + _, err := tk.Exec("set @@global.tidb_max_delta_schema_count= invalid_val") + c.Assert(terror.ErrorEqual(err, variable.ErrWrongTypeForVar), IsTrue, Commentf("err %v", err)) + + tk.MustExec("set @@global.tidb_max_delta_schema_count= 2048") + tk.Se = nil + tk.MustExec("use test") + c.Assert(variable.GetMaxDetalSchemaCount(), Equals, int64(2048)) + tk.MustQuery("select @@global.tidb_max_delta_schema_count").Check(testkit.Rows("2048")) +} diff --git a/metrics/domain.go b/metrics/domain.go index 017e007e4fb98..c191d047ce254 100644 --- a/metrics/domain.go +++ b/metrics/domain.go @@ -45,4 +45,18 @@ var ( Name: "load_privilege_total", Help: "Counter of load privilege", }, []string{LblType}) + + SchemaValidatorStop = "stop" + SchemaValidatorRestart = "restart" + SchemaValidatorReset = "reset" + SchemaValidatorCacheEmpty = "cache_empty" + SchemaValidatorCacheMiss = "cache_miss" + // HandleSchemaValidate records the counter of handling schema validate. + HandleSchemaValidate = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "tidb", + Subsystem: "domain", + Name: "handle_schema_validate", + Help: "Counter of handle schema validate", + }, []string{LblType}) ) diff --git a/metrics/metrics.go b/metrics/metrics.go index a31d5ec3837c6..5a6de201e21fe 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -152,4 +152,5 @@ func RegisterMetrics() { prometheus.MustRegister(TiKVBatchClientUnavailable) prometheus.MustRegister(TiKVRangeTaskStats) prometheus.MustRegister(TiKVRangeTaskPushDuration) + prometheus.MustRegister(HandleSchemaValidate) } diff --git a/session/session.go b/session/session.go old mode 100644 new mode 100755 index 84db86e0e6b72..1a345a38ea6c5 --- a/session/session.go +++ b/session/session.go @@ -1745,6 +1745,7 @@ var builtinGlobalVariable = []string{ variable.TiDBExpensiveQueryTimeThreshold, variable.TiDBEnableNoopFuncs, variable.TiDBEnableIndexMerge, + variable.TiDBMaxDeltaSchemaCount, } var ( diff --git a/session/txn.go b/session/txn.go old mode 100644 new mode 100755 diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 715cede8bc46c..89db7229753d3 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -831,6 +831,9 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { s.EnableIndexMerge = TiDBOptOn(val) case TiDBEnableNoopFuncs: s.EnableNoopFuncs = TiDBOptOn(val) + // It's a global variable, but it also wants to be cached in server. + case TiDBMaxDeltaSchemaCount: + SetMaxDetalSchemaCount(tidbOptInt64(val, DefTiDBMaxDeltaSchemaCount)) } s.systems[name] = val return nil diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index f42c04bf3c99e..6f7e12b08d29f 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -693,6 +693,7 @@ var defaultSysVars = []*SysVar{ {ScopeGlobal, TiDBDDLReorgBatchSize, strconv.Itoa(DefTiDBDDLReorgBatchSize)}, {ScopeGlobal, TiDBDDLErrorCountLimit, strconv.Itoa(DefTiDBDDLErrorCountLimit)}, {ScopeSession, TiDBDDLReorgPriority, "PRIORITY_LOW"}, + {ScopeGlobal, TiDBMaxDeltaSchemaCount, strconv.Itoa(DefTiDBMaxDeltaSchemaCount)}, {ScopeSession, TiDBForcePriority, mysql.Priority2Str[DefTiDBForcePriority]}, {ScopeSession, TiDBEnableRadixJoin, BoolToIntStr(DefTiDBUseRadixJoin)}, {ScopeGlobal | ScopeSession, TiDBOptJoinReorderThreshold, strconv.Itoa(DefTiDBOptJoinReorderThreshold)}, diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index f2b7998ab0341..384c8dba978e5 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -244,6 +244,10 @@ const ( // It can be: PRIORITY_LOW, PRIORITY_NORMAL, PRIORITY_HIGH TiDBDDLReorgPriority = "tidb_ddl_reorg_priority" + // tidb_max_delta_schema_count defines the max length of deltaSchemaInfos. + // deltaSchemaInfos is a queue that maintains the history of schema changes. + TiDBMaxDeltaSchemaCount = "tidb_max_delta_schema_count" + // tidb_scatter_region will scatter the regions for DDLs when it is ON. TiDBScatterRegion = "tidb_scatter_region" @@ -336,6 +340,7 @@ const ( DefTiDBDDLReorgWorkerCount = 16 DefTiDBDDLReorgBatchSize = 1024 DefTiDBDDLErrorCountLimit = 512 + DefTiDBMaxDeltaSchemaCount = 1024 DefTiDBHashAggPartialConcurrency = 4 DefTiDBHashAggFinalConcurrency = 4 DefTiDBForcePriority = mysql.NoPriority @@ -359,6 +364,7 @@ var ( maxDDLReorgWorkerCount int32 = 128 ddlReorgBatchSize int32 = DefTiDBDDLReorgBatchSize ddlErrorCountlimit int64 = DefTiDBDDLErrorCountLimit + maxDetalSchemaCount int64 = DefTiDBMaxDeltaSchemaCount // Export for testing. MaxDDLReorgBatchSize int32 = 10240 MinDDLReorgBatchSize int32 = 32 diff --git a/sessionctx/variable/varsutil.go b/sessionctx/variable/varsutil.go index 47a60f1a7b7ed..12dfeb2dee1c4 100644 --- a/sessionctx/variable/varsutil.go +++ b/sessionctx/variable/varsutil.go @@ -73,6 +73,16 @@ func GetDDLErrorCountLimit() int64 { return atomic.LoadInt64(&ddlErrorCountlimit) } +// SetMaxDetalSchemaCount sets maxDetalSchemaCount size. +func SetMaxDetalSchemaCount(cnt int64) { + atomic.StoreInt64(&maxDetalSchemaCount, cnt) +} + +// GetMaxDetalSchemaCount gets maxDetalSchemaCount size. +func GetMaxDetalSchemaCount() int64 { + return atomic.LoadInt64(&maxDetalSchemaCount) +} + // GetSessionSystemVar gets a system variable. // If it is a session only variable, use the default value defined in code. // Returns error if there is no such variable. @@ -320,6 +330,8 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string, return checkUInt64SystemVar(name, value, 0, 4294967295, vars) case OldPasswords: return checkUInt64SystemVar(name, value, 0, 2, vars) + case TiDBMaxDeltaSchemaCount: + return checkInt64SystemVar(name, value, 100, 16384, vars) case SessionTrackGtids: if strings.EqualFold(value, "OFF") || value == "0" { return "OFF", nil From 0fd29cbee4c67039f7884140b8efdd00b527ef29 Mon Sep 17 00:00:00 2001 From: Lynn Date: Thu, 22 Aug 2019 13:39:44 +0800 Subject: [PATCH 2/7] metrics: make golint happy --- metrics/domain.go | 1 + 1 file changed, 1 insertion(+) diff --git a/metrics/domain.go b/metrics/domain.go index c191d047ce254..a8ea4e3d4cbbf 100644 --- a/metrics/domain.go +++ b/metrics/domain.go @@ -17,6 +17,7 @@ import ( "github.com/prometheus/client_golang/prometheus" ) +// Metrics for the domain package. var ( // LoadSchemaCounter records the counter of load schema. LoadSchemaCounter = prometheus.NewCounterVec( From 1e33fc1d1d25fb475d67d84bf8c2bf56456d761a Mon Sep 17 00:00:00 2001 From: Lynn Date: Fri, 23 Aug 2019 13:48:46 +0800 Subject: [PATCH 3/7] domain: address comments --- domain/domain.go | 0 domain/schema_validator.go | 3 ++- 2 files changed, 2 insertions(+), 1 deletion(-) mode change 100755 => 100644 domain/domain.go diff --git a/domain/domain.go b/domain/domain.go old mode 100755 new mode 100644 diff --git a/domain/schema_validator.go b/domain/schema_validator.go index 816645baf5853..06f0e867209ac 100644 --- a/domain/schema_validator.go +++ b/domain/schema_validator.go @@ -67,7 +67,8 @@ type schemaValidator struct { latestSchemaExpire time.Time // deltaSchemaInfos is a queue that maintain the history of changes. deltaSchemaInfos []deltaSchemaInfo - notMergeCnt int + // notMergeCnt is used to record the number of deltaSchemaInfo that have not been merged. + notMergeCnt int } // NewSchemaValidator returns a SchemaValidator structure. From 528db8ccdd1187455c05d9c59db7358598ab97bd Mon Sep 17 00:00:00 2001 From: Lynn Date: Mon, 26 Aug 2019 10:06:30 +0800 Subject: [PATCH 4/7] session: tiny update --- session/session.go | 0 session/txn.go | 0 2 files changed, 0 insertions(+), 0 deletions(-) mode change 100755 => 100644 session/session.go mode change 100755 => 100644 session/txn.go diff --git a/session/session.go b/session/session.go old mode 100755 new mode 100644 diff --git a/session/txn.go b/session/txn.go old mode 100755 new mode 100644 From a98c326229d1575b9313fe503c862686db18ca72 Mon Sep 17 00:00:00 2001 From: Lynn Date: Fri, 27 Sep 2019 18:51:57 +0800 Subject: [PATCH 5/7] domain: address comments --- domain/schema_validator.go | 55 ++++++++++++++++----------- domain/schema_validator_test.go | 66 +++++++++------------------------ 2 files changed, 51 insertions(+), 70 deletions(-) diff --git a/domain/schema_validator.go b/domain/schema_validator.go index 06f0e867209ac..b9f5463bc397c 100644 --- a/domain/schema_validator.go +++ b/domain/schema_validator.go @@ -67,8 +67,6 @@ type schemaValidator struct { latestSchemaExpire time.Time // deltaSchemaInfos is a queue that maintain the history of changes. deltaSchemaInfos []deltaSchemaInfo - // notMergeCnt is used to record the number of deltaSchemaInfo that have not been merged. - notMergeCnt int } // NewSchemaValidator returns a SchemaValidator structure. @@ -94,7 +92,6 @@ func (s *schemaValidator) Stop() { defer s.mux.Unlock() s.isStarted = false s.latestSchemaVer = 0 - s.notMergeCnt = 0 s.deltaSchemaInfos = s.deltaSchemaInfos[:0] } @@ -112,7 +109,6 @@ func (s *schemaValidator) Reset() { defer s.mux.Unlock() s.isStarted = true s.latestSchemaVer = 0 - s.notMergeCnt = 0 s.deltaSchemaInfos = s.deltaSchemaInfos[:0] } @@ -219,36 +215,51 @@ func (s *schemaValidator) Check(txnTS uint64, schemaVer int64, relatedTableIDs [ } func (s *schemaValidator) enqueue(schemaVersion int64, relatedTableIDs []int64) { - s.deltaSchemaInfos = append(s.deltaSchemaInfos, deltaSchemaInfo{schemaVersion, relatedTableIDs}) - s.notMergeCnt++ - maxCnt := int(variable.GetMaxDetalSchemaCount()) - if len(s.deltaSchemaInfos) > maxCnt && s.notMergeCnt > maxCnt/2 { - s.merge() - s.notMergeCnt = 1 + if maxCnt <= 0 { + logutil.BgLogger().Info("the schema validator enqueue", zap.Int("delta max count", maxCnt)) + return + } + + delta := deltaSchemaInfo{schemaVersion, relatedTableIDs} + if len(s.deltaSchemaInfos) == 0 { + s.deltaSchemaInfos = append(s.deltaSchemaInfos, delta) + return } + + lastOffset := len(s.deltaSchemaInfos) - 1 + // The first item we needn't to merge, because we hope to cover more versions. + if lastOffset != 0 && unorderedEqual(s.deltaSchemaInfos[lastOffset].relatedTableIDs, delta.relatedTableIDs) { + s.deltaSchemaInfos[lastOffset] = delta + } else { + s.deltaSchemaInfos = append(s.deltaSchemaInfos, delta) + } + if len(s.deltaSchemaInfos) > maxCnt { + logutil.BgLogger().Info("the schema validator enqueue, queue is too long", + zap.Int("delta max count", maxCnt), zap.Int64("remove schema version", s.deltaSchemaInfos[0].schemaVersion)) s.deltaSchemaInfos = s.deltaSchemaInfos[1:] } } -func equal(a, b []int64) bool { +func unorderedEqual(a, b []int64) bool { if len(a) != len(b) { return false } - for i := 0; i < len(a); i++ { - if a[i] != b[i] { + + var isEqual bool + for _, i := range a { + isEqual = false + for _, j := range b { + if i == j { + isEqual = true + break + } + } + if !isEqual { return false } } - return true -} -func (s *schemaValidator) merge() { - // The first item we needn't to merge, because we hope to cover more versions. - for i := len(s.deltaSchemaInfos) - 1; i > 1; i-- { - if equal(s.deltaSchemaInfos[i].relatedTableIDs, s.deltaSchemaInfos[i-1].relatedTableIDs) { - s.deltaSchemaInfos = append(s.deltaSchemaInfos[:i-1], s.deltaSchemaInfos[i:]...) - } - } + return true } diff --git a/domain/schema_validator_test.go b/domain/schema_validator_test.go index fb482435bee7c..558088d5c9caf 100644 --- a/domain/schema_validator_test.go +++ b/domain/schema_validator_test.go @@ -145,14 +145,20 @@ func serverFunc(lease time.Duration, requireLease chan leaseGrantItem, oracleCh } } -func (*testSuite) TestMerge(c *C) { +func (*testSuite) TestEnqueue(c *C) { lease := 10 * time.Millisecond originalCnt := variable.GetMaxDetalSchemaCount() - variable.SetMaxDetalSchemaCount(10) defer variable.SetMaxDetalSchemaCount(originalCnt) validator := NewSchemaValidator(lease).(*schemaValidator) c.Assert(validator.IsStarted(), IsTrue) + // maxCnt is 0. + variable.SetMaxDetalSchemaCount(0) + validator.enqueue(1, []int64{11}) + c.Assert(validator.deltaSchemaInfos, HasLen, 0) + + // maxCnt is 10. + variable.SetMaxDetalSchemaCount(10) ds := []deltaSchemaInfo{ {0, []int64{1}}, {1, []int64{1}}, @@ -179,54 +185,18 @@ func (*testSuite) TestMerge(c *C) { {10, []int64{1}}, } c.Assert(validator.deltaSchemaInfos, DeepEquals, ret) - // notMergeCnt <= max deltal schema count - validator.enqueue(11, []int64{1}) - ret = append(ret, deltaSchemaInfo{11, []int64{1}}) - validator.enqueue(12, []int64{1}) - ret = append(ret, deltaSchemaInfo{12, []int64{1}}) - validator.enqueue(13, []int64{1}) - ret = append(ret, deltaSchemaInfo{13, []int64{1}}) + // The Items' relatedTableIDs have different order. + validator.enqueue(11, []int64{1, 2, 3, 4}) + validator.enqueue(12, []int64{4, 2, 3, 1}) + validator.enqueue(13, []int64{4, 1, 3, 2}) + ret = append(ret, deltaSchemaInfo{13, []int64{4, 1, 3, 2}}) c.Assert(validator.deltaSchemaInfos, DeepEquals, ret) + // The length of deltaSchemaInfos is greater then maxCnt. validator.enqueue(14, []int64{1}) + validator.enqueue(15, []int64{2}) + validator.enqueue(16, []int64{3}) ret = append(ret, deltaSchemaInfo{14, []int64{1}}) + ret = append(ret, deltaSchemaInfo{15, []int64{2}}) + ret = append(ret, deltaSchemaInfo{16, []int64{3}}) c.Assert(validator.deltaSchemaInfos, DeepEquals, ret[1:]) - // notMergeCnt > max deltal schema count - validator.enqueue(15, []int64{1}) - ret = []deltaSchemaInfo{ - {2, []int64{1}}, - {3, []int64{1, 2}}, - {4, []int64{1}}, - {7, []int64{1, 3}}, - {9, []int64{1, 2, 3}}, - {15, []int64{1}}, - } - c.Assert(validator.deltaSchemaInfos, DeepEquals, ret) - - validator.notMergeCnt = 0 - validator.deltaSchemaInfos = []deltaSchemaInfo{ - {0, []int64{1, 2, 3}}, - {1, []int64{2}}, - {2, []int64{3}}, - {3, []int64{1, 2, 3}}, - {4, []int64{1, 4}}, - {5, []int64{1, 5}}, - {6, []int64{1, 6}}, - {7, []int64{1, 7}}, - {8, []int64{1, 2, 3}}, - {9, []int64{1, 2}}, - } - validator.enqueue(10, []int64{1}) - ret = []deltaSchemaInfo{ - {1, []int64{2}}, - {2, []int64{3}}, - {3, []int64{1, 2, 3}}, - {4, []int64{1, 4}}, - {5, []int64{1, 5}}, - {6, []int64{1, 6}}, - {7, []int64{1, 7}}, - {8, []int64{1, 2, 3}}, - {9, []int64{1, 2}}, - {10, []int64{1}}, - } - c.Assert(validator.deltaSchemaInfos, DeepEquals, ret) } From 407607211a0f4f3269ee12f523aafbcc8c0b4d2d Mon Sep 17 00:00:00 2001 From: Lynn Date: Fri, 27 Sep 2019 19:08:05 +0800 Subject: [PATCH 6/7] *: s/Detal/Delta --- domain/schema_validator.go | 2 +- domain/schema_validator_test.go | 8 ++++---- executor/seqtest/seq_executor_test.go | 10 +++++----- sessionctx/variable/session.go | 2 +- sessionctx/variable/tidb_vars.go | 2 +- sessionctx/variable/varsutil.go | 12 ++++++------ 6 files changed, 18 insertions(+), 18 deletions(-) diff --git a/domain/schema_validator.go b/domain/schema_validator.go index b9f5463bc397c..0e906879b8d96 100644 --- a/domain/schema_validator.go +++ b/domain/schema_validator.go @@ -215,7 +215,7 @@ func (s *schemaValidator) Check(txnTS uint64, schemaVer int64, relatedTableIDs [ } func (s *schemaValidator) enqueue(schemaVersion int64, relatedTableIDs []int64) { - maxCnt := int(variable.GetMaxDetalSchemaCount()) + maxCnt := int(variable.GetMaxDeltaSchemaCount()) if maxCnt <= 0 { logutil.BgLogger().Info("the schema validator enqueue", zap.Int("delta max count", maxCnt)) return diff --git a/domain/schema_validator_test.go b/domain/schema_validator_test.go index 558088d5c9caf..dc95fe1af3b6c 100644 --- a/domain/schema_validator_test.go +++ b/domain/schema_validator_test.go @@ -147,18 +147,18 @@ func serverFunc(lease time.Duration, requireLease chan leaseGrantItem, oracleCh func (*testSuite) TestEnqueue(c *C) { lease := 10 * time.Millisecond - originalCnt := variable.GetMaxDetalSchemaCount() - defer variable.SetMaxDetalSchemaCount(originalCnt) + originalCnt := variable.GetMaxDeltaSchemaCount() + defer variable.SetMaxDeltaSchemaCount(originalCnt) validator := NewSchemaValidator(lease).(*schemaValidator) c.Assert(validator.IsStarted(), IsTrue) // maxCnt is 0. - variable.SetMaxDetalSchemaCount(0) + variable.SetMaxDeltaSchemaCount(0) validator.enqueue(1, []int64{11}) c.Assert(validator.deltaSchemaInfos, HasLen, 0) // maxCnt is 10. - variable.SetMaxDetalSchemaCount(10) + variable.SetMaxDeltaSchemaCount(10) ds := []deltaSchemaInfo{ {0, []int64{1}}, {1, []int64{1}}, diff --git a/executor/seqtest/seq_executor_test.go b/executor/seqtest/seq_executor_test.go index ad272fe12118a..8c38c85011f63 100644 --- a/executor/seqtest/seq_executor_test.go +++ b/executor/seqtest/seq_executor_test.go @@ -1055,10 +1055,10 @@ func (s *seqTestSuite) TestAutoIDInRetry(c *C) { tk.MustQuery(`select * from t`).Check(testkit.Rows("1", "2", "3", "4", "5")) } -func (s *seqTestSuite) TestMaxDetalSchemaCount(c *C) { +func (s *seqTestSuite) TestMaxDeltaSchemaCount(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") - c.Assert(variable.GetMaxDetalSchemaCount(), Equals, int64(variable.DefTiDBMaxDeltaSchemaCount)) + c.Assert(variable.GetMaxDeltaSchemaCount(), Equals, int64(variable.DefTiDBMaxDeltaSchemaCount)) gvc := domain.GetDomain(tk.Se).GetGlobalVarsCache() gvc.Disable() @@ -1067,18 +1067,18 @@ func (s *seqTestSuite) TestMaxDetalSchemaCount(c *C) { // Make sure a new session will load global variables. tk.Se = nil tk.MustExec("use test") - c.Assert(variable.GetMaxDetalSchemaCount(), Equals, int64(100)) + c.Assert(variable.GetMaxDeltaSchemaCount(), Equals, int64(100)) tk.MustExec(fmt.Sprintf("set @@global.tidb_max_delta_schema_count= %v", uint64(math.MaxInt64))) tk.MustQuery("show warnings;").Check(testkit.Rows(fmt.Sprintf("Warning 1292 Truncated incorrect tidb_max_delta_schema_count value: '%d'", uint64(math.MaxInt64)))) tk.Se = nil tk.MustExec("use test") - c.Assert(variable.GetMaxDetalSchemaCount(), Equals, int64(16384)) + c.Assert(variable.GetMaxDeltaSchemaCount(), Equals, int64(16384)) _, err := tk.Exec("set @@global.tidb_max_delta_schema_count= invalid_val") c.Assert(terror.ErrorEqual(err, variable.ErrWrongTypeForVar), IsTrue, Commentf("err %v", err)) tk.MustExec("set @@global.tidb_max_delta_schema_count= 2048") tk.Se = nil tk.MustExec("use test") - c.Assert(variable.GetMaxDetalSchemaCount(), Equals, int64(2048)) + c.Assert(variable.GetMaxDeltaSchemaCount(), Equals, int64(2048)) tk.MustQuery("select @@global.tidb_max_delta_schema_count").Check(testkit.Rows("2048")) } diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 4d10d6f6779b1..05a2b2a1f4d94 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -896,7 +896,7 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { s.AllowRemoveAutoInc = TiDBOptOn(val) // It's a global variable, but it also wants to be cached in server. case TiDBMaxDeltaSchemaCount: - SetMaxDetalSchemaCount(tidbOptInt64(val, DefTiDBMaxDeltaSchemaCount)) + SetMaxDeltaSchemaCount(tidbOptInt64(val, DefTiDBMaxDeltaSchemaCount)) } s.systems[name] = val return nil diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 23b37f3109f07..cf1be91298687 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -380,7 +380,7 @@ var ( maxDDLReorgWorkerCount int32 = 128 ddlReorgBatchSize int32 = DefTiDBDDLReorgBatchSize ddlErrorCountlimit int64 = DefTiDBDDLErrorCountLimit - maxDetalSchemaCount int64 = DefTiDBMaxDeltaSchemaCount + maxDeltaSchemaCount int64 = DefTiDBMaxDeltaSchemaCount // Export for testing. MaxDDLReorgBatchSize int32 = 10240 MinDDLReorgBatchSize int32 = 32 diff --git a/sessionctx/variable/varsutil.go b/sessionctx/variable/varsutil.go index 0f6cb2a5c46c2..a622a0f8558ff 100644 --- a/sessionctx/variable/varsutil.go +++ b/sessionctx/variable/varsutil.go @@ -74,14 +74,14 @@ func GetDDLErrorCountLimit() int64 { return atomic.LoadInt64(&ddlErrorCountlimit) } -// SetMaxDetalSchemaCount sets maxDetalSchemaCount size. -func SetMaxDetalSchemaCount(cnt int64) { - atomic.StoreInt64(&maxDetalSchemaCount, cnt) +// SetMaxDeltaSchemaCount sets maxDeltaSchemaCount size. +func SetMaxDeltaSchemaCount(cnt int64) { + atomic.StoreInt64(&maxDeltaSchemaCount, cnt) } -// GetMaxDetalSchemaCount gets maxDetalSchemaCount size. -func GetMaxDetalSchemaCount() int64 { - return atomic.LoadInt64(&maxDetalSchemaCount) +// GetMaxDeltaSchemaCount gets maxDeltaSchemaCount size. +func GetMaxDeltaSchemaCount() int64 { + return atomic.LoadInt64(&maxDeltaSchemaCount) } // GetSessionSystemVar gets a system variable. From 64bd6d00e7ebb021f36332500af688c775c90792 Mon Sep 17 00:00:00 2001 From: Lynn Date: Fri, 27 Sep 2019 20:03:53 +0800 Subject: [PATCH 7/7] domain: address a comment --- domain/schema_validator.go | 9 ++++++--- domain/schema_validator_test.go | 24 +++++++++++++----------- 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/domain/schema_validator.go b/domain/schema_validator.go index 0e906879b8d96..66d08cc739709 100644 --- a/domain/schema_validator.go +++ b/domain/schema_validator.go @@ -229,7 +229,7 @@ func (s *schemaValidator) enqueue(schemaVersion int64, relatedTableIDs []int64) lastOffset := len(s.deltaSchemaInfos) - 1 // The first item we needn't to merge, because we hope to cover more versions. - if lastOffset != 0 && unorderedEqual(s.deltaSchemaInfos[lastOffset].relatedTableIDs, delta.relatedTableIDs) { + if lastOffset != 0 && ids(s.deltaSchemaInfos[lastOffset].relatedTableIDs).containIn(delta.relatedTableIDs) { s.deltaSchemaInfos[lastOffset] = delta } else { s.deltaSchemaInfos = append(s.deltaSchemaInfos, delta) @@ -242,8 +242,11 @@ func (s *schemaValidator) enqueue(schemaVersion int64, relatedTableIDs []int64) } } -func unorderedEqual(a, b []int64) bool { - if len(a) != len(b) { +type ids []int64 + +// containIn is checks if a is included in b. +func (a ids) containIn(b []int64) bool { + if len(a) > len(b) { return false } diff --git a/domain/schema_validator_test.go b/domain/schema_validator_test.go index dc95fe1af3b6c..08b6aae000e20 100644 --- a/domain/schema_validator_test.go +++ b/domain/schema_validator_test.go @@ -163,11 +163,11 @@ func (*testSuite) TestEnqueue(c *C) { {0, []int64{1}}, {1, []int64{1}}, {2, []int64{1}}, - {3, []int64{1, 2}}, - {4, []int64{1}}, - {5, []int64{1, 3}}, - {6, []int64{1, 3}}, - {7, []int64{1, 3}}, + {3, []int64{2, 2}}, + {4, []int64{2}}, + {5, []int64{1, 4}}, + {6, []int64{1, 4}}, + {7, []int64{3, 1, 3}}, {8, []int64{1, 2, 3}}, {9, []int64{1, 2, 3}}, } @@ -178,25 +178,27 @@ func (*testSuite) TestEnqueue(c *C) { ret := []deltaSchemaInfo{ {0, []int64{1}}, {2, []int64{1}}, - {3, []int64{1, 2}}, - {4, []int64{1}}, - {7, []int64{1, 3}}, + {3, []int64{2, 2}}, + {4, []int64{2}}, + {6, []int64{1, 4}}, {9, []int64{1, 2, 3}}, {10, []int64{1}}, } c.Assert(validator.deltaSchemaInfos, DeepEquals, ret) // The Items' relatedTableIDs have different order. validator.enqueue(11, []int64{1, 2, 3, 4}) - validator.enqueue(12, []int64{4, 2, 3, 1}) - validator.enqueue(13, []int64{4, 1, 3, 2}) - ret = append(ret, deltaSchemaInfo{13, []int64{4, 1, 3, 2}}) + validator.enqueue(12, []int64{4, 1, 2, 3, 1}) + validator.enqueue(13, []int64{4, 1, 3, 2, 5}) + ret[len(ret)-1] = deltaSchemaInfo{13, []int64{4, 1, 3, 2, 5}} c.Assert(validator.deltaSchemaInfos, DeepEquals, ret) // The length of deltaSchemaInfos is greater then maxCnt. validator.enqueue(14, []int64{1}) validator.enqueue(15, []int64{2}) validator.enqueue(16, []int64{3}) + validator.enqueue(17, []int64{4}) ret = append(ret, deltaSchemaInfo{14, []int64{1}}) ret = append(ret, deltaSchemaInfo{15, []int64{2}}) ret = append(ret, deltaSchemaInfo{16, []int64{3}}) + ret = append(ret, deltaSchemaInfo{17, []int64{4}}) c.Assert(validator.deltaSchemaInfos, DeepEquals, ret[1:]) }