diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go index b1912d38bbf..a514e14e3c6 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -161,8 +161,8 @@ func (m *mounterImpl) collectMetrics(ctx context.Context) { captureID := util.CaptureIDFromCtx(ctx) changefeedID := util.ChangefeedIDFromCtx(ctx) tableIDStr := strconv.FormatInt(util.TableIDFromCtx(ctx), 10) - metricMounterInputChanSize := mounterInputChanSizeGauge.WithLabelValues(captureID, changefeedID, tableIDStr) + for { select { case <-ctx.Done(): diff --git a/cdc/entry/schema_storage.go b/cdc/entry/schema_storage.go index 96553a54180..255b99b56e4 100644 --- a/cdc/entry/schema_storage.go +++ b/cdc/entry/schema_storage.go @@ -544,17 +544,13 @@ type SchemaStorage struct { // NewSchemaStorage creates a new schema storage func NewSchemaStorage(jobs []*timodel.Job) (*SchemaStorage, error) { - snap := newEmptySchemaSnapshot() + schema := &SchemaStorage{} for _, job := range jobs { - if err := snap.handleDDL(job); err != nil { + if err := schema.HandleDDLJob(job); err != nil { return nil, errors.Trace(err) } } - return &SchemaStorage{ - snaps: []*schemaSnapshot{snap}, - gcTs: snap.currentTs, - resolvedTs: snap.currentTs, - }, nil + return schema, nil } func (s *SchemaStorage) getSnapshot(ts uint64) (*schemaSnapshot, error) { @@ -606,18 +602,23 @@ func (s *SchemaStorage) GetLastSnapshot() *schemaSnapshot { // HandleDDLJob creates a new snapshot in storage and handles the ddl job func (s *SchemaStorage) HandleDDLJob(job *timodel.Job) error { - s.snapsMu.Lock() - defer s.snapsMu.Unlock() - lastSnap := s.snaps[len(s.snaps)-1] - if job.BinlogInfo.FinishedTS <= lastSnap.currentTs { - log.Debug("ignore foregone DDL job", zap.Reflect("job", job)) - return nil - } if SkipJob(job) { s.AdvanceResolvedTs(job.BinlogInfo.FinishedTS) return nil } - snap := lastSnap.Clone() + s.snapsMu.Lock() + defer s.snapsMu.Unlock() + var snap *schemaSnapshot + if len(s.snaps) > 0 { + lastSnap := s.snaps[len(s.snaps)-1] + if job.BinlogInfo.FinishedTS <= lastSnap.currentTs { + log.Debug("ignore foregone DDL job", zap.Reflect("job", job)) + return nil + } + snap = lastSnap.Clone() + } else { + snap = newEmptySchemaSnapshot() + } if err := snap.handleDDL(job); err != nil { return errors.Trace(err) } @@ -649,8 +650,12 @@ func (s *SchemaStorage) DoGC(ts uint64) { } startIdx = i } + if startIdx == 0 { + return + } s.snaps = s.snaps[startIdx:] atomic.StoreUint64(&s.gcTs, s.snaps[0].currentTs) + log.Info("finished gc in schema storage", zap.Uint64("gcTs", s.snaps[0].currentTs)) } // SkipJob skip the job should not be executed diff --git a/cdc/entry/schema_storage_test.go b/cdc/entry/schema_storage_test.go index 468139a345b..e1fb0ae9244 100644 --- a/cdc/entry/schema_storage_test.go +++ b/cdc/entry/schema_storage_test.go @@ -466,6 +466,7 @@ func (t *schemaSuite) TestMultiVersionStorage(c *C) { Name: dbName, State: timodel.StatePublic, } + var jobs []*timodel.Job // `createSchema` job1 job := &timodel.Job{ ID: 3, @@ -475,9 +476,7 @@ func (t *schemaSuite) TestMultiVersionStorage(c *C) { BinlogInfo: &timodel.HistoryInfo{SchemaVersion: 1, DBInfo: dbInfo, FinishedTS: 100}, Query: "create database test", } - - storage, err := NewSchemaStorage([]*timodel.Job{job}) - c.Assert(err, IsNil) + jobs = append(jobs, job) // table info tblInfo := &timodel.TableInfo{ @@ -497,8 +496,7 @@ func (t *schemaSuite) TestMultiVersionStorage(c *C) { Query: "create table " + tbName.O, } - err = storage.HandleDDLJob(job) - c.Assert(err, IsNil) + jobs = append(jobs, job) tbName = timodel.NewCIStr("T2") // table info @@ -518,7 +516,8 @@ func (t *schemaSuite) TestMultiVersionStorage(c *C) { Query: "create table " + tbName.O, } - err = storage.HandleDDLJob(job) + jobs = append(jobs, job) + storage, err := NewSchemaStorage(jobs) c.Assert(err, IsNil) // `dropTable` job @@ -591,4 +590,39 @@ func (t *schemaSuite) TestMultiVersionStorage(c *C) { c.Assert(exist, IsFalse) _, exist = snap.TableByID(3) c.Assert(exist, IsFalse) + + storage.DoGC(0) + snap, err = storage.GetSnapshot(100) + c.Assert(err, IsNil) + _, exist = snap.SchemaByID(1) + c.Assert(exist, IsTrue) + _, exist = snap.TableByID(2) + c.Assert(exist, IsFalse) + _, exist = snap.TableByID(3) + c.Assert(exist, IsFalse) + storage.DoGC(115) + _, err = storage.GetSnapshot(100) + c.Assert(err, NotNil) + snap, err = storage.GetSnapshot(115) + c.Assert(err, IsNil) + _, exist = snap.SchemaByID(1) + c.Assert(exist, IsTrue) + _, exist = snap.TableByID(2) + c.Assert(exist, IsTrue) + _, exist = snap.TableByID(3) + c.Assert(exist, IsFalse) + + storage.DoGC(155) + storage.AdvanceResolvedTs(185) + + snap, err = storage.GetSnapshot(180) + c.Assert(err, IsNil) + _, exist = snap.SchemaByID(1) + c.Assert(exist, IsFalse) + _, exist = snap.TableByID(2) + c.Assert(exist, IsFalse) + _, exist = snap.TableByID(3) + c.Assert(exist, IsFalse) + _, err = storage.GetSnapshot(130) + c.Assert(err, NotNil) } diff --git a/cdc/kv/store_op.go b/cdc/kv/store_op.go index c40b93a2423..b933fde9ca7 100644 --- a/cdc/kv/store_op.go +++ b/cdc/kv/store_op.go @@ -60,7 +60,7 @@ func getSnapshotMeta(tiStore tidbkv.Storage) (*meta.Meta, error) { } // LoadHistoryDDLJobs loads all history DDL jobs from TiDB. -func LoadHistoryDDLJobs(kvStore tidbkv.Storage, ts uint64) ([]*model.Job, error) { +func LoadHistoryDDLJobs(kvStore tidbkv.Storage) ([]*model.Job, error) { originalJobs, err := loadHistoryDDLJobs(kvStore) jobs := make([]*model.Job, 0, len(originalJobs)) if err != nil { @@ -77,9 +77,6 @@ func LoadHistoryDDLJobs(kvStore tidbkv.Storage, ts uint64) ([]*model.Job, error) return nil, errors.Trace(err) } } - if job.BinlogInfo.FinishedTS > ts { - break - } jobs = append(jobs, job) } return jobs, nil diff --git a/cdc/kv/store_op_test.go b/cdc/kv/store_op_test.go index 97d9058eb1e..1cf402adae2 100644 --- a/cdc/kv/store_op_test.go +++ b/cdc/kv/store_op_test.go @@ -14,8 +14,6 @@ package kv import ( - "math" - "github.com/pingcap/check" "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/store/mockstore" @@ -44,7 +42,7 @@ func (s *storeSuite) TestLoadHistoryDDLJobs(c *check.C) { domain.SetStatsUpdating(true) oldJobIDs := make(map[int64]struct{}) - oldJobs, err := LoadHistoryDDLJobs(store, math.MaxUint64) + oldJobs, err := LoadHistoryDDLJobs(store) c.Assert(err, check.IsNil) for _, job := range oldJobs { oldJobIDs[job.ID] = struct{}{} @@ -53,7 +51,7 @@ func (s *storeSuite) TestLoadHistoryDDLJobs(c *check.C) { tk := testkit.NewTestKit(c, store) tk.MustExec("create table test.simple_test (id bigint primary key)") - latestJobs, err := LoadHistoryDDLJobs(store, math.MaxUint64) + latestJobs, err := LoadHistoryDDLJobs(store) c.Assert(err, check.IsNil) c.Assert(len(latestJobs)-len(oldJobs), check.Equals, 1) _, ok := oldJobIDs[latestJobs[len(latestJobs)-1].ID] diff --git a/cdc/owner.go b/cdc/owner.go index 273ae5ca1a5..69ba635c1d1 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -135,7 +135,7 @@ func (o *Owner) newChangeFeed( if err != nil { return nil, err } - jobs, err := kv.LoadHistoryDDLJobs(kvStore, checkpointTs) + jobs, err := kv.LoadHistoryDDLJobs(kvStore) if err != nil { return nil, errors.Trace(err) } diff --git a/cdc/processor.go b/cdc/processor.go index 249cd8235eb..85ad6a98d81 100644 --- a/cdc/processor.go +++ b/cdc/processor.go @@ -584,7 +584,7 @@ func createSchemaStorage(pdEndpoints []string, checkpointTs uint64) (*entry.Sche if err != nil { return nil, errors.Trace(err) } - jobs, err := kv.LoadHistoryDDLJobs(kvStore, checkpointTs) + jobs, err := kv.LoadHistoryDDLJobs(kvStore) if err != nil { return nil, errors.Trace(err) } diff --git a/cdc/puller/mock_puller.go b/cdc/puller/mock_puller.go index 8e4832547d0..b4fee5c55cb 100644 --- a/cdc/puller/mock_puller.go +++ b/cdc/puller/mock_puller.go @@ -2,7 +2,6 @@ package puller import ( "context" - "math" "sync" "time" @@ -250,7 +249,7 @@ func (m *MockPullerManager) GetTableInfo(schemaName, tableName string) *entry.Ta // GetDDLJobs returns the ddl jobs func (m *MockPullerManager) GetDDLJobs() []*timodel.Job { - jobs, err := kv.LoadHistoryDDLJobs(m.store, math.MaxUint64) + jobs, err := kv.LoadHistoryDDLJobs(m.store) m.c.Assert(err, check.IsNil) return jobs } diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index bf72ef26c18..ed9cfc1297e 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -93,7 +93,9 @@ func (s *mysqlSink) EmitRowChangedEvent(ctx context.Context, rows ...*model.RowC key := util.QuoteSchema(row.Schema, row.Table) s.unresolvedRows[key] = append(s.unresolvedRows[key], row) } - atomic.StoreUint64(&s.sinkResolvedTs, resolvedTs) + if resolvedTs != 0 { + atomic.StoreUint64(&s.sinkResolvedTs, resolvedTs) + } return nil } diff --git a/cmd/client.go b/cmd/client.go index 58296843a57..57ba1e5bcda 100644 --- a/cmd/client.go +++ b/cmd/client.go @@ -273,7 +273,7 @@ func verifyTables(ctx context.Context, cfg *util.ReplicaConfig) (ineligibleTable if err != nil { return nil, err } - jobs, err := kv.LoadHistoryDDLJobs(kvStore, startTs) + jobs, err := kv.LoadHistoryDDLJobs(kvStore) if err != nil { return nil, errors.Trace(err) }