Add metrics + max buffer size to schedule workflow (#3872)
dnr authored Feb 3, 2023
1 parent c7d7a89 commit a4943bb
Showing 2 changed files with 58 additions and 20 deletions.
13 changes: 13 additions & 0 deletions common/metrics/metric_defs.go
@@ -1311,6 +1311,12 @@ const (
TaskTypeTimerStandbyTaskDeleteHistoryEvent = "TimerStandbyTaskDeleteHistoryEvent"
)

// Schedule action types
const (
ScheduleActionTypeTag = "schedule_action"
ScheduleActionStartWorkflow = "start_workflow"
)

var (
ServiceRequests = NewCounterDef("service_requests")
ServicePendingRequests = NewGaugeDef("service_pending_requests")
@@ -1714,6 +1720,13 @@ var (
NamespaceReplicationEnqueueDLQCount = NewCounterDef("namespace_replication_dlq_enqueue_requests")
ParentClosePolicyProcessorSuccess = NewCounterDef("parent_close_policy_processor_requests")
ParentClosePolicyProcessorFailures = NewCounterDef("parent_close_policy_processor_errors")
ScheduleMissedCatchupWindow = NewCounterDef("schedule_missed_catchup_window")
ScheduleRateLimited = NewCounterDef("schedule_rate_limited")
ScheduleBufferOverruns = NewCounterDef("schedule_buffer_overruns")
ScheduleActionSuccess = NewCounterDef("schedule_action_success")
ScheduleActionErrors = NewCounterDef("schedule_action_errors")
ScheduleCancelWorkflowErrors = NewCounterDef("schedule_cancel_workflow_errors")
ScheduleTerminateWorkflowErrors = NewCounterDef("schedule_terminate_workflow_errors")

// Replication
NamespaceReplicationTaskAckLevelGauge = NewGaugeDef("namespace_replication_task_ack_level")
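The definitions above only register metric names; they produce data once workflow code increments them through a tagged metrics handler. As a minimal, hypothetical sketch of that emission pattern (the standalone helper and its name are illustrative only, not part of this commit), the usage introduced below in workflow.go boils down to:

package scheduler

import (
	"go.temporal.io/sdk/workflow"

	"go.temporal.io/server/common/metrics"
)

// emitMissedCatchupWindow is a hypothetical helper (not part of this commit) showing
// the emission pattern: the workflow's metrics handler is tagged with the schedule's
// namespace once, then individual counters are addressed by the names registered in
// metric_defs.go.
func emitMissedCatchupWindow(ctx workflow.Context, namespace string) {
	handler := workflow.GetMetricsHandler(ctx).WithTags(map[string]string{"namespace": namespace})
	handler.Counter(metrics.ScheduleMissedCatchupWindow.GetMetricName()).Inc(1)
}

The same handler, further tagged with ScheduleActionTypeTag, is what processBuffer uses below to split action successes and errors by action type.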
65 changes: 45 additions & 20 deletions service/worker/scheduler/workflow.go
@@ -27,8 +27,10 @@ package scheduler
import (
"errors"
"fmt"
"strings"
"time"

"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"
"github.com/google/uuid"
"golang.org/x/exp/slices"
@@ -38,11 +40,13 @@ import (
schedpb "go.temporal.io/api/schedule/v1"
workflowpb "go.temporal.io/api/workflow/v1"
"go.temporal.io/api/workflowservice/v1"
sdkclient "go.temporal.io/sdk/client"
sdklog "go.temporal.io/sdk/log"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"

schedspb "go.temporal.io/server/api/schedule/v1"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/payload"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/searchattribute"
@@ -81,9 +85,10 @@ type (
scheduler struct {
schedspb.StartScheduleArgs

ctx workflow.Context
a *activities
logger sdklog.Logger
ctx workflow.Context
a *activities
logger sdklog.Logger
metrics sdkclient.MetricsHandler

cspec *CompiledSpec

@@ -111,6 +116,9 @@ type (
RecentActionCountForList int // The number of recent actual action results to include in List (search attr).
IterationsBeforeContinueAsNew int
SleepWhilePaused bool // If true, don't set timers while paused/out of actions
// MaxBufferSize limits the number of buffered starts. This also limits the number of
// workflows that can be backfilled at once (since they all have to fit in the buffer).
MaxBufferSize int
}
)

@@ -141,6 +149,7 @@ var (
RecentActionCountForList: 5,
IterationsBeforeContinueAsNew: 500,
SleepWhilePaused: true,
MaxBufferSize: 1000,
}

errUpdateConflict = errors.New("conflicting concurrent update")
@@ -151,14 +160,13 @@ func SchedulerWorkflow(ctx workflow.Context, args *schedspb.StartScheduleArgs) e
StartScheduleArgs: *args,
ctx: ctx,
a: nil,
logger: sdklog.With(workflow.GetLogger(ctx), "schedule-id", args.State.ScheduleId),
logger: sdklog.With(workflow.GetLogger(ctx), "wf-namespace", args.State.Namespace, "schedule-id", args.State.ScheduleId),
metrics: workflow.GetMetricsHandler(ctx).WithTags(map[string]string{"namespace": args.State.Namespace}),
}
return scheduler.run()
}

func (s *scheduler) run() error {
s.logger.Info("Schedule starting", "schedule", s.Schedule)

s.updateTweakables()
s.ensureFields()
s.compileSpec()
@@ -171,7 +179,13 @@ func (s *scheduler) run() error {
}

if s.State.LastProcessedTime == nil {
s.logger.Debug("Initializing internal state")
// log these as json since it's more readable than the Go representation
var m jsonpb.Marshaler
var specJson, policiesJson strings.Builder
_ = m.Marshal(&specJson, s.Schedule.Spec)
_ = m.Marshal(&policiesJson, s.Schedule.Policies)
s.logger.Info("Starting schedule", "spec", specJson.String(), "policies", policiesJson.String())

s.State.LastProcessedTime = timestamp.TimePtr(s.now())
s.State.ConflictToken = InitialConflictToken
s.Info.CreateTime = s.State.LastProcessedTime
@@ -218,7 +232,7 @@ func (s *scheduler) run() error {

// Any watcher activities will get cancelled automatically if running.

s.logger.Info("Schedule doing continue-as-new")
s.logger.Debug("Schedule doing continue-as-new")
return workflow.NewContinueAsNewError(s.ctx, WorkflowType, &s.StartScheduleArgs)
}

Expand Down Expand Up @@ -287,7 +301,7 @@ func (s *scheduler) now() time.Time {
}

func (s *scheduler) processPatch(patch *schedpb.SchedulePatch) {
s.logger.Info("Schedule patch", "patch", patch.String())
s.logger.Debug("Schedule patch", "patch", patch.String())

if trigger := patch.TriggerImmediately; trigger != nil {
now := s.now()
@@ -320,7 +334,7 @@ func (s *scheduler) processTimeRange(
overlapPolicy enumspb.ScheduleOverlapPolicy,
manual bool,
) time.Duration {
s.logger.Debug("processTimeRange", "t1", t1, "t2", t2, "overlapPolicy", overlapPolicy, "manual", manual)
s.logger.Debug("processTimeRange", "t1", t1, "t2", t2, "overlap-policy", overlapPolicy, "manual", manual)

if s.cspec == nil {
return invalidDuration
@@ -343,6 +357,7 @@ }
}
if !manual && t2.Sub(t1) > catchupWindow {
s.logger.Warn("Schedule missed catchup window", "now", t2, "time", t1)
s.metrics.Counter(metrics.ScheduleMissedCatchupWindow.GetMetricName()).Inc(1)
s.Info.MissedCatchupWindow++
continue
}
@@ -410,7 +425,7 @@ func (s *scheduler) sleep(nextSleep time.Duration) {
sel.AddFuture(s.watchingFuture, s.wfWatcherReturned)
}

s.logger.Debug("sleeping", "nextSleep", nextSleep, "watching", s.watchingFuture != nil)
s.logger.Debug("sleeping", "next-sleep", nextSleep, "watching", s.watchingFuture != nil)
sel.Select(s.ctx)
for sel.HasPending() {
sel.Select(s.ctx)
@@ -456,10 +471,10 @@ func (s *scheduler) processWatcherResult(id string, f workflow.Future) {
s.Schedule.State.Paused = true
if res.Status == enumspb.WORKFLOW_EXECUTION_STATUS_FAILED {
s.Schedule.State.Notes = fmt.Sprintf("paused due to workflow failure: %s: %s", id, res.GetFailure().GetMessage())
s.logger.Info("paused due to workflow failure", "workflow", id, "message", res.GetFailure().GetMessage())
s.logger.Debug("paused due to workflow failure", "workflow", id, "message", res.GetFailure().GetMessage())
} else if res.Status == enumspb.WORKFLOW_EXECUTION_STATUS_TIMED_OUT {
s.Schedule.State.Notes = fmt.Sprintf("paused due to workflow timeout: %s", id)
s.logger.Info("paused due to workflow timeout", "workflow", id)
s.logger.Debug("paused due to workflow timeout", "workflow", id)
}
s.incSeqNo()
}
@@ -473,7 +488,7 @@ func (s *scheduler) processWatcherResult(id string, f workflow.Future) {
s.State.ContinuedFailure = res.GetFailure()
}

s.logger.Info("started workflow finished", "workflow", id, "status", res.Status, "pause-after-failure", pauseOnFailure)
s.logger.Debug("started workflow finished", "workflow", id, "status", res.Status, "pause-after-failure", pauseOnFailure)
}

func (s *scheduler) processUpdate(req *schedspb.FullUpdateRequest) {
@@ -482,7 +497,7 @@ func (s *scheduler) processUpdate(req *schedspb.FullUpdateRequest) {
return
}

s.logger.Info("Schedule update", "new-schedule", req.Schedule.String())
s.logger.Debug("Schedule update", "new-schedule", req.Schedule.String())

s.Schedule.Spec = req.Schedule.GetSpec()
s.Schedule.Action = req.Schedule.GetAction()
@@ -672,7 +687,12 @@ func (s *scheduler) resolveOverlapPolicy(overlapPolicy enumspb.ScheduleOverlapPo
}

func (s *scheduler) addStart(nominalTime, actualTime time.Time, overlapPolicy enumspb.ScheduleOverlapPolicy, manual bool) {
s.logger.Debug("addStart", "nominal", nominalTime, "actual", actualTime, "overlapPolicy", overlapPolicy, "manual", manual)
s.logger.Debug("addStart", "start-time", nominalTime, "actual-start-time", actualTime, "overlap-policy", overlapPolicy, "manual", manual)
if s.tweakables.MaxBufferSize > 0 && len(s.State.BufferedStarts) >= s.tweakables.MaxBufferSize {
s.logger.Warn("Buffer too large", "start-time", nominalTime, "overlap-policy", overlapPolicy, "manual", manual)
s.metrics.Counter(metrics.ScheduleBufferOverruns.GetMetricName()).Inc(1)
return
}
s.State.BufferedStarts = append(s.State.BufferedStarts, &schedspb.BufferedStart{
NominalTime: timestamp.TimePtr(nominalTime),
ActualTime: timestamp.TimePtr(actualTime),
@@ -688,7 +708,7 @@ func (s *scheduler) addStart(nominalTime, actualTime time.Time, overlapPolicy en
//
//nolint:revive
func (s *scheduler) processBuffer() bool {
s.logger.Debug("processBuffer", "buffer", len(s.State.BufferedStarts), "running", len(s.Info.RunningWorkflows), "needRefresh", s.State.NeedRefresh)
s.logger.Debug("processBuffer", "buffer", len(s.State.BufferedStarts), "running", len(s.Info.RunningWorkflows), "need-refresh", s.State.NeedRefresh)

// TODO: consider doing this always and removing needRefresh? we only end up here without
// needRefresh in the case of update, or patch without an immediate run, so it's not much
@@ -727,14 +747,18 @@ func (s *scheduler) processBuffer() bool {
continue
}
result, err := s.startWorkflow(start, req)
metricsWithTag := s.metrics.WithTags(map[string]string{
metrics.ScheduleActionTypeTag: metrics.ScheduleActionStartWorkflow})
if err != nil {
s.logger.Error("Failed to start workflow", "error", err)
metricsWithTag.Counter(metrics.ScheduleActionErrors.GetMetricName()).Inc(1)
// TODO: we could put this back in the buffer and retry (after a delay) up until
// the catchup window. of course, it's unlikely that this workflow would be making
// progress while we're unable to start a new one, so maybe it's not that valuable.
tryAgain = true
continue
}
metricsWithTag.Counter(metrics.ScheduleActionSuccess.GetMetricName()).Inc(1)
s.recordAction(result)
}

@@ -826,9 +850,8 @@ func (s *scheduler) startWorkflow(

var appErr *temporal.ApplicationError
var details rateLimitedDetails
if errors.As(err, &appErr) &&
appErr.Type() == rateLimitedErrorType &&
appErr.Details(&details) == nil {
if errors.As(err, &appErr) && appErr.Type() == rateLimitedErrorType && appErr.Details(&details) == nil {
s.metrics.Counter(metrics.ScheduleRateLimited.GetMetricName()).Inc(1)
workflow.Sleep(s.ctx, details.Delay)
req.CompletedRateLimitSleep = true // only use rate limiter once
continue
Expand Down Expand Up @@ -920,6 +943,7 @@ func (s *scheduler) cancelWorkflow(ex *commonpb.WorkflowExecution) {
err := workflow.ExecuteLocalActivity(ctx, s.a.CancelWorkflow, areq).Get(s.ctx, nil)
if err != nil {
s.logger.Error("cancel workflow failed", "workflow", ex.WorkflowId, "error", err)
s.metrics.Counter(metrics.ScheduleCancelWorkflowErrors.GetMetricName()).Inc(1)
}
// Note: the local activity has completed (or failed) here but the workflow might take time
// to close since a cancel is only a request.
@@ -937,6 +961,7 @@ func (s *scheduler) terminateWorkflow(ex *commonpb.WorkflowExecution) {
err := workflow.ExecuteLocalActivity(ctx, s.a.TerminateWorkflow, areq).Get(s.ctx, nil)
if err != nil {
s.logger.Error("terminate workflow failed", "workflow", ex.WorkflowId, "error", err)
s.metrics.Counter(metrics.ScheduleTerminateWorkflowErrors.GetMetricName()).Inc(1)
}
// Note: the local activity has completed (or failed) here but we'll still wait until we
// observe the workflow close (with a watcher) to start the next one.
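For context on the MaxBufferSize guard added to addStart above, the shape of the check reduces to a small standalone helper. The following is a hypothetical sketch, not server code: appendBounded and onOverrun are illustrative names, and treating a non-positive cap as "unlimited" is an assumption of the sketch. The idea is that once the buffer already holds the maximum number of starts, the new start is dropped and counted as an overrun instead of growing the buffer without bound.

package scheduler

// appendBounded drops the new item and reports an overrun once the buffer already
// holds maxBufferSize entries; in this sketch a non-positive cap disables the limit.
func appendBounded[T any](buffer []T, item T, maxBufferSize int, onOverrun func()) []T {
	if maxBufferSize > 0 && len(buffer) >= maxBufferSize {
		onOverrun() // e.g. bump the schedule_buffer_overruns counter and log a warning
		return buffer
	}
	return append(buffer, item)
}

Dropping (rather than blocking) keeps the workflow's mutable state bounded, which, per the MaxBufferSize comment in the tweakables struct, also caps how many backfilled starts can be pending at once.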
