Skip to content

Commit

Permalink
Skip over entire time range if paused and batch and cache time queries (
Browse files Browse the repository at this point in the history
#4215)

* Skip over entier time range if paused

* Refactor

* versioning using tweakables

* batch next time calculation in side effect

* Add version and a time generator to cache time results

* Update service/worker/scheduler/workflow.go

Co-authored-by: David Reiss <[email protected]>

* Refactor based on PR comments

* Update service/worker/scheduler/workflow.go

Co-authored-by: David Reiss <[email protected]>

* Update service/worker/scheduler/workflow.go

Co-authored-by: David Reiss <[email protected]>

* Update service/worker/scheduler/workflow.go

Co-authored-by: David Reiss <[email protected]>

* Update service/worker/scheduler/workflow.go

Co-authored-by: David Reiss <[email protected]>

* Add enum for versioning

* Fix a bug with skipping schedules

* Additional check for Equal

* Test cache

* Update service/worker/scheduler/workflow.go

Co-authored-by: David Reiss <[email protected]>

* Use map and add backfill to test

* Update service/worker/scheduler/workflow.go

Co-authored-by: David Reiss <[email protected]>

* Update service/worker/scheduler/workflow.go

Co-authored-by: David Reiss <[email protected]>

* Update service/worker/scheduler/workflow.go

Co-authored-by: David Reiss <[email protected]>

* Update name

* Improve backfill injection and add some randomness

* Clear the map before re-populating

* set map to nil

* update tests to refill cache

---------

Co-authored-by: David Reiss <[email protected]>
  • Loading branch information
samanbarghi and dnr authored May 10, 2023
1 parent 807b791 commit dc7f28d
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 10 deletions.
79 changes: 69 additions & 10 deletions service/worker/scheduler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,15 @@ import (
"go.temporal.io/server/common/util"
)

type SchedulerWorkflowVersion int64

const (
// represents the state before Version is introduced
InitialVersion SchedulerWorkflowVersion = iota
// skip over entire time range if paused and batch and cache getNextTime queries
BatchAndCacheTimeQueries
)

const (
// Schedules are implemented by a workflow whose ID is this string plus the schedule ID.
WorkflowIDPrefix = "temporal-sys-scheduler:"
Expand All @@ -77,6 +86,8 @@ const (
maxListMatchingTimesCount = 1000

rateLimitedErrorType = "RateLimited"

maxNextTimeResultCacheSize = 10
)

type (
Expand Down Expand Up @@ -104,6 +115,10 @@ type (
pendingUpdate *schedspb.FullUpdateRequest

uuidBatch []string

// This cache is used to store time results after batching getNextTime queries
// in a single SideEffect
nextTimeResultCache map[time.Time]getNextTimeResult
}

tweakablePolicies struct {
Expand All @@ -120,8 +135,10 @@ type (
// MaxBufferSize limits the number of buffered starts. This also limits the number of
// workflows that can be backfilled at once (since they all have to fit in the buffer).
MaxBufferSize int
AllowZeroSleep bool // Whether to allow a zero-length timer. Used for workflow compatibility.
ReuseTimer bool // Whether to reuse timer. Used for workflow compatibility.
AllowZeroSleep bool // Whether to allow a zero-length timer. Used for workflow compatibility.
ReuseTimer bool // Whether to reuse timer. Used for workflow compatibility.
Version SchedulerWorkflowVersion // Used to keep track of schedules version to release new features and for backward compatibility
// version 0 corresponds to the schedule version that comes before introducing the Version parameter
}
)

Expand Down Expand Up @@ -155,6 +172,7 @@ var (
MaxBufferSize: 1000,
AllowZeroSleep: true,
ReuseTimer: true,
Version: BatchAndCacheTimeQueries,
}

errUpdateConflict = errors.New("conflicting concurrent update")
Expand Down Expand Up @@ -202,6 +220,7 @@ func (s *scheduler) run() error {
s.InitialPatch = nil

for iters := s.tweakables.IterationsBeforeContinueAsNew; iters > 0 || s.pendingUpdate != nil || s.pendingPatch != nil; iters-- {

t1 := timestamp.TimeValue(s.State.LastProcessedTime)
t2 := s.now()
if t2.Before(t1) {
Expand Down Expand Up @@ -271,6 +290,9 @@ func (s *scheduler) ensureFields() {
}

func (s *scheduler) compileSpec() {
// if spec changes invalidate current nextTimeResult cache
s.nextTimeResultCache = nil

cspec, err := NewCompiledSpec(s.Schedule.Spec)
if err != nil {
if s.logger != nil {
Expand Down Expand Up @@ -334,6 +356,29 @@ func (s *scheduler) processPatch(patch *schedpb.SchedulePatch) {
}
}

func (s *scheduler) getNextTime(after time.Time) getNextTimeResult {

// we populate the map sequentially, if after is not in the map, it means we either exhausted
// all items, or we jumped through time (forward or backward), in either case, refresh the cache
next, ok := s.nextTimeResultCache[after]
if ok {
return next
}
s.nextTimeResultCache = nil
// Run this logic in a SideEffect so that we can fix bugs there without breaking
// existing schedule workflows.
panicIfErr(workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} {
results := make(map[time.Time]getNextTimeResult)
for t := after; !t.IsZero() && len(results) < maxNextTimeResultCacheSize; {
next := s.cspec.getNextTime(t)
results[t] = next
t = next.Next
}
return results
}).Get(&s.nextTimeResultCache))
return s.nextTimeResultCache[after]
}

func (s *scheduler) processTimeRange(
t1, t2 time.Time,
overlapPolicy enumspb.ScheduleOverlapPolicy,
Expand All @@ -347,20 +392,34 @@ func (s *scheduler) processTimeRange(

catchupWindow := s.getCatchupWindow()

// A previous version would record a marker for each time which could make a workflow
// fail. With the new version, the entire time range is skipped if the workflow is paused
// or we are not going to take an action now
if s.tweakables.Version >= BatchAndCacheTimeQueries {
// Peek at paused/remaining actions state and don't bother if we're not going to
// take an action now. (Don't count as missed catchup window either.)
// Skip over entire time range if paused or no actions can be taken
if !s.canTakeScheduledAction(manual, false) {
return s.getNextTime(t2).Next
}
}

for {
// Run this logic in a SideEffect so that we can fix bugs there without breaking
// existing schedule workflows.
var next getNextTimeResult
panicIfErr(workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} {
return s.cspec.getNextTime(t1)
}).Get(&next))
if s.tweakables.Version < BatchAndCacheTimeQueries {
// Run this logic in a SideEffect so that we can fix bugs there without breaking
// existing schedule workflows.
panicIfErr(workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} {
return s.cspec.getNextTime(t1)
}).Get(&next))
} else {
next = s.getNextTime(t1)
}
t1 = next.Next
if t1.IsZero() || t1.After(t2) {
return t1
}
// Peek at paused/remaining actions state and don't bother if we're not going to
// take an action now. (Don't count as missed catchup window either.)
if !s.canTakeScheduledAction(manual, false) {
if s.tweakables.Version < BatchAndCacheTimeQueries && !s.canTakeScheduledAction(manual, false) {
continue
}
if !manual && t2.Sub(t1) > catchupWindow {
Expand Down
72 changes: 72 additions & 0 deletions service/worker/scheduler/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package scheduler
import (
"context"
"errors"
"math/rand"
"testing"
"time"

Expand Down Expand Up @@ -1402,3 +1403,74 @@ func (s *workflowSuite) TestLimitedActions() {
s.True(s.env.IsWorkflowCompleted())
// doesn't end properly since it sleeps forever after pausing
}

func (s *workflowSuite) TestLotsOfIterations() {
// This is mostly testing getNextTime caching logic.
const runIterations = 30
const backfillIterations = 15

runs := make([]workflowRun, runIterations)
for i := range runs {
t := time.Date(2022, 6, 1, i, 27+i%2, 0, 0, time.UTC)
runs[i] = workflowRun{
id: "myid-" + t.Format(time.RFC3339),
start: t,
end: t.Add(time.Duration(5+i%7) * time.Minute),
result: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
}
}

delayedCallbacks := make([]delayedCallback, backfillIterations)

expected := runIterations
// schedule a call back every hour to spray backfills among scheduled runs
// each call back adds random number of backfills in
// [maxNextTimeResultCacheSize, 2*maxNextTimeResultCacheSize) range
for i := range delayedCallbacks {

maxRuns := rand.Intn(maxNextTimeResultCacheSize) + maxNextTimeResultCacheSize
expected += maxRuns
// a point in time to send the callback request
callbackTime := time.Date(2022, 6, 1, i+15, 2, 0, 0, time.UTC)
// start time for callback request
callBackRangeStartTime := time.Date(2022, 5, i, 0, 0, 0, 0, time.UTC)

// add/process maxRuns schedules
for j := 0; j < maxRuns; j++ {
runStartTime := time.Date(2022, 5, i, j, 27+j%2, 0, 0, time.UTC)
runs = append(runs, workflowRun{
id: "myid-" + runStartTime.Format(time.RFC3339),
start: callbackTime.Add(time.Duration(j) * time.Minute),
end: callbackTime.Add(time.Duration(j+1) * time.Minute),
result: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
})
}

delayedCallbacks[i] = delayedCallback{
at: callbackTime,
f: func() {
s.env.SignalWorkflow(SignalNamePatch, &schedpb.SchedulePatch{
BackfillRequest: []*schedpb.BackfillRequest{{
StartTime: timestamp.TimePtr(callBackRangeStartTime),
EndTime: timestamp.TimePtr(callBackRangeStartTime.Add(time.Duration(maxRuns) * time.Hour)),
OverlapPolicy: enumspb.SCHEDULE_OVERLAP_POLICY_BUFFER_ALL,
}},
})
},
}
}

s.runAcrossContinue(
runs,
delayedCallbacks,
&schedpb.Schedule{
Spec: &schedpb.ScheduleSpec{
Calendar: []*schedpb.CalendarSpec{
{Minute: "27", Hour: "0/2"},
{Minute: "28", Hour: "1/2"},
},
},
},
expected+1,
)
}

0 comments on commit dc7f28d

Please sign in to comment.