From 2189e39de62b118056ffbfe2e8642ba2290756d1 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Wed, 3 Aug 2022 18:33:25 +0800 Subject: [PATCH 1/6] fake(engine): add cached checkpoint in job master --- engine/framework/fake/fake_master.go | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/engine/framework/fake/fake_master.go b/engine/framework/fake/fake_master.go index 544d50c4e5e..765aa43c1ab 100644 --- a/engine/framework/fake/fake_master.go +++ b/engine/framework/fake/fake_master.go @@ -102,6 +102,8 @@ type Master struct { config *Config bStatus *businessStatus + cachedCheckpoint *Checkpoint + // workerID stores the ID of the Master AS A WORKER. workerID frameModel.WorkerID @@ -356,7 +358,7 @@ func (m *Master) tickedCheckStatus(ctx context.Context) error { log.Info("FakeMaster: Tick", zap.Any("status", m.bStatus.status)) m.bStatus.RUnlock() // save checkpoint, which is used in business only - _, metaErr := m.MetaKVClient().Put(ctx, CheckpointKey(m.workerID), m.genCheckpoint().String()) + _, metaErr := m.MetaKVClient().Put(ctx, CheckpointKey(m.workerID), m.genCheckpoint()) if metaErr != nil { log.Warn("update checkpoint with error", zap.Error(metaErr)) } @@ -539,26 +541,20 @@ func CheckpointKey(id frameModel.MasterID) string { return strings.Join([]string{"fake-master", "checkpoint", id}, "/") } -func (m *Master) genCheckpoint() *Checkpoint { +func (m *Master) genCheckpoint() string { m.workerListMu.Lock() defer m.workerListMu.Unlock() - cp := &Checkpoint{ - Ticks: make(map[int]int64), - Checkpoints: make(map[int]workerCheckpoint), - } m.bStatus.RLock() defer m.bStatus.RUnlock() for wid, status := range m.bStatus.status { if businessID, ok := m.workerID2BusinessID[wid]; ok { - cp.Ticks[businessID] = status.Tick + m.cachedCheckpoint.Ticks[businessID] = status.Tick if status.Checkpoint != nil { - cp.Checkpoints[businessID] = *status.Checkpoint - } else { - cp.Checkpoints[businessID] = workerCheckpoint{} + m.cachedCheckpoint.Checkpoints[businessID] = *status.Checkpoint } } } - return cp + return m.cachedCheckpoint.String() } func (m *Master) genWorkerConfig(index int, checkpoint workerCheckpoint) *WorkerConfig { @@ -580,6 +576,10 @@ func (m *Master) genWorkerConfig(index int, checkpoint workerCheckpoint) *Worker // NewFakeMaster creates a new fake master instance func NewFakeMaster(ctx *dcontext.Context, workerID frameModel.WorkerID, masterID frameModel.MasterID, masterConfig *Config) *Master { log.Info("new fake master", zap.Any("config", masterConfig)) + ckpt := &Checkpoint{ + Ticks: make(map[int]int64), + Checkpoints: make(map[int]workerCheckpoint), + } ret := &Master{ workerID: workerID, pendingWorkerSet: make(map[frameModel.WorkerID]int), @@ -592,6 +592,7 @@ func NewFakeMaster(ctx *dcontext.Context, workerID frameModel.WorkerID, masterID ctx: ctx.Context, clocker: clock.New(), initialized: atomic.NewBool(false), + cachedCheckpoint: ckpt, } ret.setStatusCode(frameModel.WorkerStatusNormal) return ret From df023db4fd6feafeee810a08706b51331d4ec868 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Thu, 4 Aug 2022 11:44:57 +0800 Subject: [PATCH 2/6] add more guard --- engine/framework/fake/fake_master.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/engine/framework/fake/fake_master.go b/engine/framework/fake/fake_master.go index 765aa43c1ab..25f2313aa5f 100644 --- a/engine/framework/fake/fake_master.go +++ b/engine/framework/fake/fake_master.go @@ -353,6 +353,9 @@ func (m *Master) tickedCheckWorkers(ctx context.Context) error { } func (m *Master) tickedCheckStatus(ctx context.Context) error { + if !m.initialized.Load() { + return nil + } if m.statusRateLimiter.Allow() { m.bStatus.RLock() log.Info("FakeMaster: Tick", zap.Any("status", m.bStatus.status)) From 33b3b899fe35783da0e66ae066506b6a54e1f751 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Thu, 4 Aug 2022 11:49:44 +0800 Subject: [PATCH 3/6] add comment --- engine/framework/fake/fake_master.go | 1 + 1 file changed, 1 insertion(+) diff --git a/engine/framework/fake/fake_master.go b/engine/framework/fake/fake_master.go index 25f2313aa5f..4a5b52cf221 100644 --- a/engine/framework/fake/fake_master.go +++ b/engine/framework/fake/fake_master.go @@ -353,6 +353,7 @@ func (m *Master) tickedCheckWorkers(ctx context.Context) error { } func (m *Master) tickedCheckStatus(ctx context.Context) error { + // if job master is not initialized, does nothing here if !m.initialized.Load() { return nil } From 003d6dd58a4b0b8639cbcff3e2ddc53132680daa Mon Sep 17 00:00:00 2001 From: amyangfei Date: Thu, 4 Aug 2022 12:19:42 +0800 Subject: [PATCH 4/6] refine checkpoint usage on worker offline --- engine/framework/fake/fake_master.go | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/engine/framework/fake/fake_master.go b/engine/framework/fake/fake_master.go index 4a5b52cf221..7543fef8d41 100644 --- a/engine/framework/fake/fake_master.go +++ b/engine/framework/fake/fake_master.go @@ -458,17 +458,15 @@ func (m *Master) OnWorkerOffline(worker framework.WorkerHandle, reason error) er } workerCkpt := zeroWorkerCheckpoint() - if ws, err := parseExtBytes(worker.Status().ExtBytes); err != nil { - log.Warn("failed to parse worker ext bytes", zap.Error(err)) - workerCkpt.Revision = m.config.EtcdStartRevision + if ckpt, ok := m.cachedCheckpoint.Checkpoints[businessID]; ok { + workerCkpt = ckpt } else { - workerCkpt.Tick = ws.Tick - if ws.Checkpoint != nil { - workerCkpt.Revision = ws.Checkpoint.Revision - workerCkpt.MvccCount = ws.Checkpoint.MvccCount - workerCkpt.Value = ws.Checkpoint.Value - } + workerCkpt.Revision = m.config.EtcdStartRevision } + if tick, ok := m.cachedCheckpoint.Ticks[businessID]; ok { + workerCkpt.Tick = tick + } + wcfg := m.genWorkerConfig(businessID, workerCkpt) m.workerListMu.Lock() defer m.workerListMu.Unlock() From d95b50286aaaa85f458092a8c447b3369d6c7968 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Thu, 4 Aug 2022 12:29:28 +0800 Subject: [PATCH 5/6] fix onworker offline --- engine/framework/fake/fake_master.go | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/engine/framework/fake/fake_master.go b/engine/framework/fake/fake_master.go index 7543fef8d41..88e2ce57917 100644 --- a/engine/framework/fake/fake_master.go +++ b/engine/framework/fake/fake_master.go @@ -458,13 +458,28 @@ func (m *Master) OnWorkerOffline(worker framework.WorkerHandle, reason error) er } workerCkpt := zeroWorkerCheckpoint() - if ckpt, ok := m.cachedCheckpoint.Checkpoints[businessID]; ok { - workerCkpt = ckpt + checkpointLoaded := false + if ws, err := parseExtBytes(worker.Status().ExtBytes); err != nil { + log.Warn("failed to parse worker ext bytes", zap.Error(err)) } else { - workerCkpt.Revision = m.config.EtcdStartRevision + workerCkpt.Tick = ws.Tick + if ws.Checkpoint != nil { + workerCkpt.Revision = ws.Checkpoint.Revision + workerCkpt.MvccCount = ws.Checkpoint.MvccCount + workerCkpt.Value = ws.Checkpoint.Value + checkpointLoaded = true + } } - if tick, ok := m.cachedCheckpoint.Ticks[businessID]; ok { - workerCkpt.Tick = tick + // can't load worker status from worker manager, try to load checkpoint from + // cached value. + if !checkpointLoaded { + log.Warn("try to load checkpoint from cached value", + zap.Any("checkpoint", m.cachedCheckpoint.Checkpoints[businessID])) + if ckpt, ok := m.cachedCheckpoint.Checkpoints[businessID]; ok { + workerCkpt = ckpt + } else { + workerCkpt.Revision = m.config.EtcdStartRevision + } } wcfg := m.genWorkerConfig(businessID, workerCkpt) From bb5cf1e34b40cb6f964cdb75efad43dcb015cbe3 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Thu, 4 Aug 2022 12:31:49 +0800 Subject: [PATCH 6/6] fix ckpt copy --- engine/framework/fake/fake_master.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/engine/framework/fake/fake_master.go b/engine/framework/fake/fake_master.go index 88e2ce57917..9e68a115e69 100644 --- a/engine/framework/fake/fake_master.go +++ b/engine/framework/fake/fake_master.go @@ -476,7 +476,9 @@ func (m *Master) OnWorkerOffline(worker framework.WorkerHandle, reason error) er log.Warn("try to load checkpoint from cached value", zap.Any("checkpoint", m.cachedCheckpoint.Checkpoints[businessID])) if ckpt, ok := m.cachedCheckpoint.Checkpoints[businessID]; ok { - workerCkpt = ckpt + workerCkpt.Revision = ckpt.Revision + workerCkpt.MvccCount = ckpt.MvccCount + workerCkpt.Value = ckpt.Value } else { workerCkpt.Revision = m.config.EtcdStartRevision }