From d19c0d812a6932d4835328b76ce08ebe95692973 Mon Sep 17 00:00:00 2001 From: David Reiss Date: Mon, 23 Jan 2023 21:12:10 -0800 Subject: [PATCH] add test and fix --- service/frontend/workflow_handler.go | 8 ++- tests/schedule_test.go | 87 +++++++++++++++++++++++++++- 2 files changed, 93 insertions(+), 2 deletions(-) diff --git a/service/frontend/workflow_handler.go b/service/frontend/workflow_handler.go index cced6043db7..bccfad4f8a2 100644 --- a/service/frontend/workflow_handler.go +++ b/service/frontend/workflow_handler.go @@ -4842,9 +4842,15 @@ func (wh *WorkflowHandler) cleanScheduleMemo(memo *commonpb.Memo) *commonpb.Memo // This mutates request (but idempotent so safe for retries) func (wh *WorkflowHandler) addInitialScheduleMemo(request *workflowservice.CreateScheduleRequest, args *schedspb.StartScheduleArgs) { info := scheduler.GetListInfoFromStartArgs(args) - p, err := sdk.PreferProtoDataConverter.ToPayload(info) + infoBytes, err := info.Marshal() if err != nil { wh.logger.Error("encoding initial schedule memo failed", tag.Error(err)) + return + } + p, err := sdk.PreferProtoDataConverter.ToPayload(infoBytes) + if err != nil { + wh.logger.Error("encoding initial schedule memo failed", tag.Error(err)) + return } if request.Memo == nil { request.Memo = &commonpb.Memo{} diff --git a/tests/schedule_test.go b/tests/schedule_test.go index 5eed6467935..163ba359fae 100644 --- a/tests/schedule_test.go +++ b/tests/schedule_test.go @@ -46,6 +46,7 @@ import ( "go.temporal.io/sdk/worker" "go.temporal.io/sdk/workflow" + "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/payload" "go.temporal.io/server/common/persistence/sql/sqlplugin/sqlite" @@ -259,7 +260,7 @@ func (s *scheduleIntegrationSuite) TestBasics() { Namespace: s.namespace, MaximumPageSize: 5, }) - if err != nil || len(listResp.Schedules) != 1 || len(listResp.Schedules[0].GetInfo().GetRecentActions()) < 2 { + if err != nil || len(listResp.Schedules) != 1 || listResp.Schedules[0].ScheduleId != sid || len(listResp.Schedules[0].GetInfo().GetRecentActions()) < 2 { return false } s.NoError(err) @@ -466,6 +467,14 @@ func (s *scheduleIntegrationSuite) TestInput() { _, err = s.engine.CreateSchedule(NewContext(), req) s.NoError(err) s.Eventually(func() bool { return atomic.LoadInt32(&runs) == 1 }, 5*time.Second, 200*time.Millisecond) + + // cleanup + _, err = s.engine.DeleteSchedule(NewContext(), &workflowservice.DeleteScheduleRequest{ + Namespace: s.namespace, + ScheduleId: sid, + Identity: "test", + }) + s.NoError(err) } func (s *scheduleIntegrationSuite) TestRefresh() { @@ -546,4 +555,80 @@ func (s *scheduleIntegrationSuite) TestRefresh() { // scheduler has done some stuff events3 := s.getHistory(s.namespace, &commonpb.WorkflowExecution{WorkflowId: scheduler.WorkflowIDPrefix + sid}) s.Greater(len(events3), len(events2)) + + // cleanup + _, err = s.engine.DeleteSchedule(NewContext(), &workflowservice.DeleteScheduleRequest{ + Namespace: s.namespace, + ScheduleId: sid, + Identity: "test", + }) + s.NoError(err) +} + +func (s *scheduleIntegrationSuite) TestListBeforeRun() { + sid := "sched-test-list-before-run" + wid := "sched-test-list-before-run-wf" + wt := "sched-test-list-before-run-wt" + + // disable per-ns worker so that the schedule workflow never runs + dc := s.testCluster.host.dcClient + dc.OverrideValue(dynamicconfig.WorkerPerNamespaceWorkerCount, 0) + s.testCluster.host.workerService.RefreshPerNSWorkerManager() + + schedule := &schedulepb.Schedule{ + Spec: &schedulepb.ScheduleSpec{ + Interval: []*schedulepb.IntervalSpec{ + {Interval: timestamp.DurationPtr(3 * time.Second)}, + }, + }, + Action: &schedulepb.ScheduleAction{ + Action: &schedulepb.ScheduleAction_StartWorkflow{ + StartWorkflow: &workflowpb.NewWorkflowExecutionInfo{ + WorkflowId: wid, + WorkflowType: &commonpb.WorkflowType{Name: wt}, + TaskQueue: &taskqueuepb.TaskQueue{Name: s.taskQueue}, + }, + }, + }, + } + req := &workflowservice.CreateScheduleRequest{ + Namespace: s.namespace, + ScheduleId: sid, + Schedule: schedule, + Identity: "test", + RequestId: uuid.New(), + } + + _, err := s.engine.CreateSchedule(NewContext(), req) + s.NoError(err) + + s.Eventually(func() bool { // wait for visibility + listResp, err := s.engine.ListSchedules(NewContext(), &workflowservice.ListSchedulesRequest{ + Namespace: s.namespace, + MaximumPageSize: 5, + }) + if err != nil || len(listResp.Schedules) != 1 || listResp.Schedules[0].ScheduleId != sid { + return false + } + s.NoError(err) + entry := listResp.Schedules[0] + s.Equal(sid, entry.ScheduleId) + s.NotNil(entry.Info) + s.Equal(schedule.Spec, entry.Info.Spec) + s.Equal(wt, entry.Info.WorkflowType.Name) + s.False(entry.Info.Paused) + s.Greater(len(entry.Info.FutureActionTimes), 1) + return true + }, 10*time.Second, 1*time.Second) + + // cleanup + _, err = s.engine.DeleteSchedule(NewContext(), &workflowservice.DeleteScheduleRequest{ + Namespace: s.namespace, + ScheduleId: sid, + Identity: "test", + }) + s.NoError(err) + + dc.RemoveOverride(dynamicconfig.WorkerPerNamespaceWorkerCount) + s.testCluster.host.workerService.RefreshPerNSWorkerManager() }