Skip to content

Commit

Permalink
fake(engine): add cached checkpoint in job master (#6599)
Browse files Browse the repository at this point in the history
close #6598
  • Loading branch information
amyangfei authored Aug 4, 2022
1 parent 6ff0ac6 commit 5bd84ae
Showing 1 changed file with 32 additions and 12 deletions.
44 changes: 32 additions & 12 deletions engine/framework/fake/fake_master.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ type Master struct {
config *Config
bStatus *businessStatus

cachedCheckpoint *Checkpoint

// workerID stores the ID of the Master AS A WORKER.
workerID frameModel.WorkerID

Expand Down Expand Up @@ -351,12 +353,16 @@ func (m *Master) tickedCheckWorkers(ctx context.Context) error {
}

func (m *Master) tickedCheckStatus(ctx context.Context) error {
// if job master is not initialized, does nothing here
if !m.initialized.Load() {
return nil
}
if m.statusRateLimiter.Allow() {
m.bStatus.RLock()
log.Info("FakeMaster: Tick", zap.Any("status", m.bStatus.status))
m.bStatus.RUnlock()
// save checkpoint, which is used in business only
_, metaErr := m.MetaKVClient().Put(ctx, CheckpointKey(m.workerID), m.genCheckpoint().String())
_, metaErr := m.MetaKVClient().Put(ctx, CheckpointKey(m.workerID), m.genCheckpoint())
if metaErr != nil {
log.Warn("update checkpoint with error", zap.Error(metaErr))
}
Expand Down Expand Up @@ -452,17 +458,32 @@ func (m *Master) OnWorkerOffline(worker framework.WorkerHandle, reason error) er
}

workerCkpt := zeroWorkerCheckpoint()
checkpointLoaded := false
if ws, err := parseExtBytes(worker.Status().ExtBytes); err != nil {
log.Warn("failed to parse worker ext bytes", zap.Error(err))
workerCkpt.Revision = m.config.EtcdStartRevision
} else {
workerCkpt.Tick = ws.Tick
if ws.Checkpoint != nil {
workerCkpt.Revision = ws.Checkpoint.Revision
workerCkpt.MvccCount = ws.Checkpoint.MvccCount
workerCkpt.Value = ws.Checkpoint.Value
checkpointLoaded = true
}
}
// can't load worker status from worker manager, try to load checkpoint from
// cached value.
if !checkpointLoaded {
log.Warn("try to load checkpoint from cached value",
zap.Any("checkpoint", m.cachedCheckpoint.Checkpoints[businessID]))
if ckpt, ok := m.cachedCheckpoint.Checkpoints[businessID]; ok {
workerCkpt.Revision = ckpt.Revision
workerCkpt.MvccCount = ckpt.MvccCount
workerCkpt.Value = ckpt.Value
} else {
workerCkpt.Revision = m.config.EtcdStartRevision
}
}

wcfg := m.genWorkerConfig(businessID, workerCkpt)
m.workerListMu.Lock()
defer m.workerListMu.Unlock()
Expand Down Expand Up @@ -539,26 +560,20 @@ func CheckpointKey(id frameModel.MasterID) string {
return strings.Join([]string{"fake-master", "checkpoint", id}, "/")
}

func (m *Master) genCheckpoint() *Checkpoint {
func (m *Master) genCheckpoint() string {
m.workerListMu.Lock()
defer m.workerListMu.Unlock()
cp := &Checkpoint{
Ticks: make(map[int]int64),
Checkpoints: make(map[int]workerCheckpoint),
}
m.bStatus.RLock()
defer m.bStatus.RUnlock()
for wid, status := range m.bStatus.status {
if businessID, ok := m.workerID2BusinessID[wid]; ok {
cp.Ticks[businessID] = status.Tick
m.cachedCheckpoint.Ticks[businessID] = status.Tick
if status.Checkpoint != nil {
cp.Checkpoints[businessID] = *status.Checkpoint
} else {
cp.Checkpoints[businessID] = workerCheckpoint{}
m.cachedCheckpoint.Checkpoints[businessID] = *status.Checkpoint
}
}
}
return cp
return m.cachedCheckpoint.String()
}

func (m *Master) genWorkerConfig(index int, checkpoint workerCheckpoint) *WorkerConfig {
Expand All @@ -580,6 +595,10 @@ func (m *Master) genWorkerConfig(index int, checkpoint workerCheckpoint) *Worker
// NewFakeMaster creates a new fake master instance
func NewFakeMaster(ctx *dcontext.Context, workerID frameModel.WorkerID, masterID frameModel.MasterID, masterConfig *Config) *Master {
log.Info("new fake master", zap.Any("config", masterConfig))
ckpt := &Checkpoint{
Ticks: make(map[int]int64),
Checkpoints: make(map[int]workerCheckpoint),
}
ret := &Master{
workerID: workerID,
pendingWorkerSet: make(map[frameModel.WorkerID]int),
Expand All @@ -592,6 +611,7 @@ func NewFakeMaster(ctx *dcontext.Context, workerID frameModel.WorkerID, masterID
ctx: ctx.Context,
clocker: clock.New(),
initialized: atomic.NewBool(false),
cachedCheckpoint: ckpt,
}
ret.setStatusCode(frameModel.WorkerStatusNormal)
return ret
Expand Down

0 comments on commit 5bd84ae

Please sign in to comment.