Skip to content

Commit

Permalink
add initial memo on schedule workflows
Browse files Browse the repository at this point in the history
  • Loading branch information
dnr committed Jan 26, 2023
1 parent b494736 commit ff76e9b
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 1 deletion.
18 changes: 18 additions & 0 deletions service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2982,6 +2982,8 @@ func (wh *WorkflowHandler) CreateSchedule(ctx context.Context, request *workflow
if err != nil {
return nil, err
}
// Add initial memo for list schedules
wh.addInitialScheduleMemo(request, input)
// Add namespace division
searchattribute.AddSearchAttribute(&request.SearchAttributes, searchattribute.TemporalNamespaceDivision, payload.EncodeString(scheduler.NamespaceDivision))
// Create StartWorkflowExecutionRequest
Expand Down Expand Up @@ -4837,6 +4839,22 @@ func (wh *WorkflowHandler) cleanScheduleMemo(memo *commonpb.Memo) *commonpb.Memo
return memo
}

// This mutates request (but idempotent so safe for retries)
func (wh *WorkflowHandler) addInitialScheduleMemo(request *workflowservice.CreateScheduleRequest, args *schedspb.StartScheduleArgs) {
info := scheduler.GetListInfoFromStartArgs(args)
p, err := sdk.PreferProtoDataConverter.ToPayload(info)
if err != nil {
wh.logger.Error("encoding initial schedule memo failed", tag.Error(err))
}
if request.Memo == nil {
request.Memo = &commonpb.Memo{}
}
if request.Memo.Fields == nil {
request.Memo.Fields = make(map[string]*commonpb.Payload)
}
request.Memo.Fields[scheduler.MemoFieldInfo] = p
}

func getBatchOperationState(workflowState enumspb.WorkflowExecutionStatus) enumspb.BatchOperationState {
var operationState enumspb.BatchOperationState
switch workflowState {
Expand Down
21 changes: 20 additions & 1 deletion service/worker/scheduler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,9 @@ func (s *scheduler) ensureFields() {
func (s *scheduler) compileSpec() {
cspec, err := NewCompiledSpec(s.Schedule.Spec)
if err != nil {
s.logger.Error("Invalid schedule", "error", err)
if s.logger != nil {
s.logger.Error("Invalid schedule", "error", err)
}
s.Info.InvalidScheduleError = err.Error()
s.cspec = nil
} else {
Expand Down Expand Up @@ -516,6 +518,8 @@ func (s *scheduler) processSignals() bool {
}

func (s *scheduler) getFutureActionTimes(n int) []*time.Time {
// Note that `s` may be a fake scheduler used to compute list info at creation time.

if s.cspec == nil {
return nil
}
Expand Down Expand Up @@ -571,6 +575,9 @@ func (s *scheduler) incSeqNo() {
}

func (s *scheduler) getListInfo() *schedpb.ScheduleListInfo {
// Note that `s` may be a fake scheduler used to compute list info at creation time, before
// the first workflow task. This function and anything it calls should not use s.ctx.

// make shallow copy
spec := *s.Schedule.Spec
// clear fields that are too large/not useful for the list view
Expand Down Expand Up @@ -676,6 +683,7 @@ func (s *scheduler) addStart(nominalTime, actualTime time.Time, overlapPolicy en
}

// processBuffer should return true if there might be more work to do right now.
//
//nolint:revive
func (s *scheduler) processBuffer() bool {
s.logger.Debug("processBuffer", "buffer", len(s.State.BufferedStarts), "running", len(s.Info.RunningWorkflows), "needRefresh", s.State.NeedRefresh)
Expand Down Expand Up @@ -941,3 +949,14 @@ func panicIfErr(err error) {
panic(err)
}
}

func GetListInfoFromStartArgs(args *schedspb.StartScheduleArgs) *schedpb.ScheduleListInfo {
// note that this does not take into account InitialPatch
fakeScheduler := &scheduler{
StartScheduleArgs: *args,
tweakables: currentTweakablePolicies,
}
fakeScheduler.ensureFields()
fakeScheduler.compileSpec()
return fakeScheduler.getListInfo()
}

0 comments on commit ff76e9b

Please sign in to comment.