Skip to content

Commit

Permalink
[Scheduled Actions] Limit the length of the ScheduleInfo Spec fields …
Browse files Browse the repository at this point in the history
…in the memo block (ListSchedules) (temporalio#7356)

## What changed?
- The Spec fields on `ScheduleInfo` have an upper bound applied to their
length with the LimitMemoSpecSize version applied.

## Why?
- Large ScheduleInfo blocks in the memo lead to unnecessarily large
database transactions. Customers can still use `DescribeSchedule` to get
the full spec.

## How did you test it?
- New functional test.
- `go test -tags test_dep -v ./tests/ -run
ScheduleFunctionalSuite/TestLimitMemoSpecSize && make lint`

## Potential risks
- We break a customer who'd been relying on an exhaustive ScheduleInfo
block in ListSchedules responses upon rollout.
  • Loading branch information
lina-temporal authored Feb 24, 2025
1 parent 274a064 commit 8def786
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 0 deletions.
12 changes: 12 additions & 0 deletions service/worker/scheduler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ const (
AccurateFutureActionTimes = 9
// include WorkflowExecutionStatus in ScheduleActionResult
ActionResultIncludesStatus = 10
// limit the ScheduleSpec specs and exclusions to only 10 entries
LimitMemoSpecSize = 11
)

const (
Expand Down Expand Up @@ -172,6 +174,7 @@ type (
AllowZeroSleep bool // Whether to allow a zero-length timer. Used for workflow compatibility.
ReuseTimer bool // Whether to reuse timer. Used for workflow compatibility.
NextTimeCacheV2Size int // Size of next time cache (v2)
SpecFieldLengthLimit int // item limit per spec field on the ScheduleInfo memo
Version SchedulerWorkflowVersion // Used to keep track of schedules version to release new features and for backward compatibility
// version 0 corresponds to the schedule version that comes before introducing the Version parameter

Expand Down Expand Up @@ -221,6 +224,7 @@ var (
AllowZeroSleep: true,
ReuseTimer: true,
NextTimeCacheV2Size: 14, // see note below
SpecFieldLengthLimit: 10,
Version: ActionResultIncludesStatus,
}

Expand Down Expand Up @@ -1038,6 +1042,14 @@ func (s *scheduler) getListInfo(inWorkflowContext bool) *schedulepb.ScheduleList
// clear fields that are too large/not useful for the list view
spec.TimezoneData = nil

if s.hasMinVersion(LimitMemoSpecSize) {
// Limit the number of specs and exclusions stored on the memo.
limit := s.tweakables.SpecFieldLengthLimit
spec.ExcludeStructuredCalendar = util.SliceHead(spec.ExcludeStructuredCalendar, limit)
spec.Interval = util.SliceHead(spec.Interval, limit)
spec.StructuredCalendar = util.SliceHead(spec.StructuredCalendar, limit)
}

return &schedulepb.ScheduleListInfo{
Spec: spec,
WorkflowType: s.Schedule.Action.GetStartWorkflow().GetWorkflowType(),
Expand Down
75 changes: 75 additions & 0 deletions tests/schedule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -981,6 +981,81 @@ func (s *ScheduleFunctionalSuite) TestListSchedulesReturnsWorkflowStatus() {
s.assertSameRecentActions(descResp, listResp)
}

// A schedule's memo should have an upper bound on the number of spec items stored.
func (s *ScheduleFunctionalSuite) TestLimitMemoSpecSize() {
// TODO - remove when MemoSpecFieldLimit becomes the default.
prevTweakables := scheduler.CurrentTweakablePolicies
scheduler.CurrentTweakablePolicies.Version = scheduler.LimitMemoSpecSize
defer func() { scheduler.CurrentTweakablePolicies = prevTweakables }()

expectedLimit := scheduler.CurrentTweakablePolicies.SpecFieldLengthLimit

sid := "sched-test-limit-memo-size"
wid := "sched-test-limit-memo-size-wf"
wt := "sched-test-limit-memo-size-wt"

schedule := &schedulepb.Schedule{
Spec: &schedulepb.ScheduleSpec{},
Action: &schedulepb.ScheduleAction{
Action: &schedulepb.ScheduleAction_StartWorkflow{
StartWorkflow: &workflowpb.NewWorkflowExecutionInfo{
WorkflowId: wid,
WorkflowType: &commonpb.WorkflowType{Name: wt},
TaskQueue: &taskqueuepb.TaskQueue{Name: s.taskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
},
},
},
}

// Set up a schedule with a large number of spec items that should be trimmed in
// the memo block.
for i := 0; i < expectedLimit*2; i++ {
schedule.Spec.Interval = append(schedule.Spec.Interval, &schedulepb.IntervalSpec{
Interval: durationpb.New(time.Duration(i+1) * time.Second),
})
schedule.Spec.StructuredCalendar = append(schedule.Spec.StructuredCalendar, &schedulepb.StructuredCalendarSpec{
Minute: []*schedulepb.Range{
{
Start: int32(i + 1),
End: int32(i + 1),
},
},
})
schedule.Spec.ExcludeStructuredCalendar = append(schedule.Spec.ExcludeStructuredCalendar, &schedulepb.StructuredCalendarSpec{
Second: []*schedulepb.Range{
{
Start: int32(i + 1),
End: int32(i + 1),
},
},
})
}

// Create the schedule.
req := &workflowservice.CreateScheduleRequest{
Namespace: s.Namespace().String(),
ScheduleId: sid,
Schedule: schedule,
Identity: "test",
RequestId: uuid.New(),
}
s.worker.RegisterWorkflowWithOptions(
func(ctx workflow.Context) error { return nil },
workflow.RegisterOptions{Name: wt},
)
_, err := s.FrontendClient().CreateSchedule(testcore.NewContext(), req)
s.NoError(err)
s.cleanup(sid)

// Verify the memo field length limit was enforced.
entry := s.getScheduleEntryFomVisibility(sid, nil)
s.Require().NotNil(entry)
spec := entry.GetInfo().GetSpec()
s.Require().Equal(expectedLimit, len(spec.GetInterval()))
s.Require().Equal(expectedLimit, len(spec.GetStructuredCalendar()))
s.Require().Equal(expectedLimit, len(spec.GetExcludeStructuredCalendar()))
}

func (s *ScheduleFunctionalSuite) TestNextTimeCache() {
sid := "sched-test-next-time-cache"
wid := "sched-test-next-time-cache-wf"
Expand Down

0 comments on commit 8def786

Please sign in to comment.