From 5bd84aea209e9f962e080d2fb2a005a79773591e Mon Sep 17 00:00:00 2001 From: amyangfei Date: Thu, 4 Aug 2022 19:10:06 +0800 Subject: [PATCH] fake(engine): add cached checkpoint in job master (#6599) close pingcap/tiflow#6598 --- engine/framework/fake/fake_master.go | 44 ++++++++++++++++++++-------- 1 file changed, 32 insertions(+), 12 deletions(-) diff --git a/engine/framework/fake/fake_master.go b/engine/framework/fake/fake_master.go index 544d50c4e5e..9e68a115e69 100644 --- a/engine/framework/fake/fake_master.go +++ b/engine/framework/fake/fake_master.go @@ -102,6 +102,8 @@ type Master struct { config *Config bStatus *businessStatus + cachedCheckpoint *Checkpoint + // workerID stores the ID of the Master AS A WORKER. workerID frameModel.WorkerID @@ -351,12 +353,16 @@ func (m *Master) tickedCheckWorkers(ctx context.Context) error { } func (m *Master) tickedCheckStatus(ctx context.Context) error { + // if job master is not initialized, does nothing here + if !m.initialized.Load() { + return nil + } if m.statusRateLimiter.Allow() { m.bStatus.RLock() log.Info("FakeMaster: Tick", zap.Any("status", m.bStatus.status)) m.bStatus.RUnlock() // save checkpoint, which is used in business only - _, metaErr := m.MetaKVClient().Put(ctx, CheckpointKey(m.workerID), m.genCheckpoint().String()) + _, metaErr := m.MetaKVClient().Put(ctx, CheckpointKey(m.workerID), m.genCheckpoint()) if metaErr != nil { log.Warn("update checkpoint with error", zap.Error(metaErr)) } @@ -452,17 +458,32 @@ func (m *Master) OnWorkerOffline(worker framework.WorkerHandle, reason error) er } workerCkpt := zeroWorkerCheckpoint() + checkpointLoaded := false if ws, err := parseExtBytes(worker.Status().ExtBytes); err != nil { log.Warn("failed to parse worker ext bytes", zap.Error(err)) - workerCkpt.Revision = m.config.EtcdStartRevision } else { workerCkpt.Tick = ws.Tick if ws.Checkpoint != nil { workerCkpt.Revision = ws.Checkpoint.Revision workerCkpt.MvccCount = ws.Checkpoint.MvccCount workerCkpt.Value = ws.Checkpoint.Value + checkpointLoaded = true } } + // can't load worker status from worker manager, try to load checkpoint from + // cached value. + if !checkpointLoaded { + log.Warn("try to load checkpoint from cached value", + zap.Any("checkpoint", m.cachedCheckpoint.Checkpoints[businessID])) + if ckpt, ok := m.cachedCheckpoint.Checkpoints[businessID]; ok { + workerCkpt.Revision = ckpt.Revision + workerCkpt.MvccCount = ckpt.MvccCount + workerCkpt.Value = ckpt.Value + } else { + workerCkpt.Revision = m.config.EtcdStartRevision + } + } + wcfg := m.genWorkerConfig(businessID, workerCkpt) m.workerListMu.Lock() defer m.workerListMu.Unlock() @@ -539,26 +560,20 @@ func CheckpointKey(id frameModel.MasterID) string { return strings.Join([]string{"fake-master", "checkpoint", id}, "/") } -func (m *Master) genCheckpoint() *Checkpoint { +func (m *Master) genCheckpoint() string { m.workerListMu.Lock() defer m.workerListMu.Unlock() - cp := &Checkpoint{ - Ticks: make(map[int]int64), - Checkpoints: make(map[int]workerCheckpoint), - } m.bStatus.RLock() defer m.bStatus.RUnlock() for wid, status := range m.bStatus.status { if businessID, ok := m.workerID2BusinessID[wid]; ok { - cp.Ticks[businessID] = status.Tick + m.cachedCheckpoint.Ticks[businessID] = status.Tick if status.Checkpoint != nil { - cp.Checkpoints[businessID] = *status.Checkpoint - } else { - cp.Checkpoints[businessID] = workerCheckpoint{} + m.cachedCheckpoint.Checkpoints[businessID] = *status.Checkpoint } } } - return cp + return m.cachedCheckpoint.String() } func (m *Master) genWorkerConfig(index int, checkpoint workerCheckpoint) *WorkerConfig { @@ -580,6 +595,10 @@ func (m *Master) genWorkerConfig(index int, checkpoint workerCheckpoint) *Worker // NewFakeMaster creates a new fake master instance func NewFakeMaster(ctx *dcontext.Context, workerID frameModel.WorkerID, masterID frameModel.MasterID, masterConfig *Config) *Master { log.Info("new fake master", zap.Any("config", masterConfig)) + ckpt := &Checkpoint{ + Ticks: make(map[int]int64), + Checkpoints: make(map[int]workerCheckpoint), + } ret := &Master{ workerID: workerID, pendingWorkerSet: make(map[frameModel.WorkerID]int), @@ -592,6 +611,7 @@ func NewFakeMaster(ctx *dcontext.Context, workerID frameModel.WorkerID, masterID ctx: ctx.Context, clocker: clock.New(), initialized: atomic.NewBool(false), + cachedCheckpoint: ckpt, } ret.setStatusCode(frameModel.WorkerStatusNormal) return ret