Add metrics + max buffer size to schedule workflow #3872

Merged (2 commits) on Feb 3, 2023
13 changes: 13 additions & 0 deletions common/metrics/metric_defs.go
@@ -1311,6 +1311,12 @@ const (
TaskTypeTimerStandbyTaskDeleteHistoryEvent = "TimerStandbyTaskDeleteHistoryEvent"
)

// Schedule action types
const (
ScheduleActionTypeTag = "schedule_action"
ScheduleActionStartWorkflow = "start_workflow"
)

var (
ServiceRequests = NewCounterDef("service_requests")
ServicePendingRequests = NewGaugeDef("service_pending_requests")
@@ -1703,6 +1709,13 @@ var (
NamespaceReplicationEnqueueDLQCount = NewCounterDef("namespace_replication_dlq_enqueue_requests")
ParentClosePolicyProcessorSuccess = NewCounterDef("parent_close_policy_processor_requests")
ParentClosePolicyProcessorFailures = NewCounterDef("parent_close_policy_processor_errors")
ScheduleMissedCatchupWindow = NewCounterDef("schedule_missed_catchup_window")
ScheduleRateLimited = NewCounterDef("schedule_rate_limited")
ScheduleBufferOverruns = NewCounterDef("schedule_buffer_overruns")
ScheduleActionSuccess = NewCounterDef("schedule_action_success")
ScheduleActionErrors = NewCounterDef("schedule_action_errors")
ScheduleCancelWorkflowErrors = NewCounterDef("schedule_cancel_workflow_errors")
ScheduleTerminateWorkflowErrors = NewCounterDef("schedule_terminate_workflow_errors")

// Replication
NamespaceReplicationTaskAckLevelGauge = NewGaugeDef("namespace_replication_task_ack_level")
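As a hedged illustration of how these counter definitions and the new schedule_action tag are consumed (the real emission sites are in the workflow.go diff below; this standalone helper and its wiring are assumptions, not part of the change):

package example

import (
	"go.temporal.io/sdk/workflow"

	"go.temporal.io/server/common/metrics"
)

// recordScheduleAction emits one of the new counters through the SDK
// metrics handler, tagged with namespace and action type the same way
// this PR tags them. Illustrative helper only.
func recordScheduleAction(ctx workflow.Context, namespace string, ok bool) {
	h := workflow.GetMetricsHandler(ctx).WithTags(map[string]string{
		"namespace":                   namespace,
		metrics.ScheduleActionTypeTag: metrics.ScheduleActionStartWorkflow,
	})
	if ok {
		h.Counter(metrics.ScheduleActionSuccess.GetMetricName()).Inc(1)
	} else {
		h.Counter(metrics.ScheduleActionErrors.GetMetricName()).Inc(1)
	}
}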
47 changes: 33 additions & 14 deletions service/worker/scheduler/workflow.go
@@ -38,11 +38,13 @@ import (
schedpb "go.temporal.io/api/schedule/v1"
workflowpb "go.temporal.io/api/workflow/v1"
"go.temporal.io/api/workflowservice/v1"
sdkclient "go.temporal.io/sdk/client"
sdklog "go.temporal.io/sdk/log"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"

schedspb "go.temporal.io/server/api/schedule/v1"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/payload"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/searchattribute"
@@ -81,9 +83,10 @@ type (
scheduler struct {
schedspb.StartScheduleArgs

ctx     workflow.Context
a       *activities
logger  sdklog.Logger
metrics sdkclient.MetricsHandler

cspec *CompiledSpec

@@ -111,6 +114,9 @@ type (
RecentActionCountForList int // The number of recent actual action results to include in List (search attr).
IterationsBeforeContinueAsNew int
SleepWhilePaused bool // If true, don't set timers while paused/out of actions
// MaxBufferSize limits the number of buffered starts. This also limits the number of
// workflows that can be backfilled at once (since they all have to fit in the buffer).
MaxBufferSize int
}
)
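A minimal sketch of the cap semantics described in the MaxBufferSize comment above (hypothetical helper, not part of this change; a non-positive max means unbounded, matching the guard added in addStart below):

// appendBounded drops the item once the buffer is full; the caller is
// expected to log the drop and count it as a buffer overrun.
func appendBounded[T any](buf []T, item T, max int) ([]T, bool) {
	if max > 0 && len(buf) >= max {
		return buf, false
	}
	return append(buf, item), true
}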

@@ -141,6 +147,7 @@ var (
RecentActionCountForList: 5,
IterationsBeforeContinueAsNew: 500,
SleepWhilePaused: true,
MaxBufferSize: 1000,
}

errUpdateConflict = errors.New("conflicting concurrent update")
@@ -151,13 +158,14 @@ func SchedulerWorkflow(ctx workflow.Context, args *schedspb.StartScheduleArgs) error {
StartScheduleArgs: *args,
ctx: ctx,
a: nil,
logger: sdklog.With(workflow.GetLogger(ctx), "wf-namespace", args.State.Namespace, "schedule-id", args.State.ScheduleId),
metrics: workflow.GetMetricsHandler(ctx).WithTags(map[string]string{"namespace": args.State.Namespace}),
}
return scheduler.run()
}

func (s *scheduler) run() error {
s.logger.Info("Schedule starting", "schedule", s.Schedule)
s.logger.Debug("Schedule starting", "schedule", s.Schedule)

s.updateTweakables()
s.ensureFields()
@@ -218,7 +226,7 @@ func (s *scheduler) run() error {

// Any watcher activities will get cancelled automatically if running.

s.logger.Info("Schedule doing continue-as-new")
s.logger.Debug("Schedule doing continue-as-new")
return workflow.NewContinueAsNewError(s.ctx, WorkflowType, &s.StartScheduleArgs)
}

@@ -287,7 +295,7 @@ func (s *scheduler) now() time.Time {
}

func (s *scheduler) processPatch(patch *schedpb.SchedulePatch) {
s.logger.Info("Schedule patch", "patch", patch.String())
s.logger.Debug("Schedule patch", "patch", patch.String())

if trigger := patch.TriggerImmediately; trigger != nil {
now := s.now()
@@ -343,6 +351,7 @@ func (s *scheduler) processTimeRange(
}
if !manual && t2.Sub(t1) > catchupWindow {
s.logger.Warn("Schedule missed catchup window", "now", t2, "time", t1)
s.metrics.Counter(metrics.ScheduleMissedCatchupWindow.GetMetricName()).Inc(1)
s.Info.MissedCatchupWindow++
continue
}
@@ -456,10 +465,10 @@ func (s *scheduler) processWatcherResult(id string, f workflow.Future) {
s.Schedule.State.Paused = true
if res.Status == enumspb.WORKFLOW_EXECUTION_STATUS_FAILED {
s.Schedule.State.Notes = fmt.Sprintf("paused due to workflow failure: %s: %s", id, res.GetFailure().GetMessage())
s.logger.Info("paused due to workflow failure", "workflow", id, "message", res.GetFailure().GetMessage())
s.logger.Debug("paused due to workflow failure", "workflow", id, "message", res.GetFailure().GetMessage())
} else if res.Status == enumspb.WORKFLOW_EXECUTION_STATUS_TIMED_OUT {
s.Schedule.State.Notes = fmt.Sprintf("paused due to workflow timeout: %s", id)
s.logger.Info("paused due to workflow timeout", "workflow", id)
s.logger.Debug("paused due to workflow timeout", "workflow", id)
}
s.incSeqNo()
}
@@ -473,7 +482,7 @@ func (s *scheduler) processWatcherResult(id string, f workflow.Future) {
s.State.ContinuedFailure = res.GetFailure()
}

s.logger.Info("started workflow finished", "workflow", id, "status", res.Status, "pause-after-failure", pauseOnFailure)
s.logger.Debug("started workflow finished", "workflow", id, "status", res.Status, "pause-after-failure", pauseOnFailure)
}

func (s *scheduler) processUpdate(req *schedspb.FullUpdateRequest) {
@@ -482,7 +491,7 @@ func (s *scheduler) processUpdate(req *schedspb.FullUpdateRequest) {
return
}

s.logger.Info("Schedule update", "new-schedule", req.Schedule.String())
s.logger.Debug("Schedule update", "new-schedule", req.Schedule.String())

s.Schedule.Spec = req.Schedule.GetSpec()
s.Schedule.Action = req.Schedule.GetAction()
@@ -673,6 +682,11 @@ func (s *scheduler) resolveOverlapPolicy(overlapPolicy enumspb.ScheduleOverlapPolicy)

func (s *scheduler) addStart(nominalTime, actualTime time.Time, overlapPolicy enumspb.ScheduleOverlapPolicy, manual bool) {
s.logger.Debug("addStart", "nominal", nominalTime, "actual", actualTime, "overlapPolicy", overlapPolicy, "manual", manual)
if s.tweakables.MaxBufferSize > 0 && len(s.State.BufferedStarts) >= s.tweakables.MaxBufferSize {
s.logger.Error("Buffer too large")
[Review thread on the line above]

Contributor: nit: add some detail for better debugging?

Member (author): what do you think would be useful here? it's tagged with namespace and schedule id already, which should be enough for someone to find the workload that's causing this. I could add the start time, overlap policy, manual?

Member: could add schedule spec, start time, buffer size etc.

Member (author): I'm a little reluctant to just dump the spec in here since it might be large, and this log might end up repeated a lot. Maybe we should log it once per schedule start so it can be referenced? Buffer size will always just be the limit here so that's not that useful. I can log the other args I guess.
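A hedged sketch of what that extra detail might look like (not part of the merged change; the key names are illustrative, the values are addStart's arguments):

	s.logger.Error("Buffer too large, dropping start",
		"nominal-time", nominalTime,
		"overlap-policy", overlapPolicy,
		"manual", manual)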

s.metrics.Counter(metrics.ScheduleBufferOverruns.GetMetricName()).Inc(1)
return
}
s.State.BufferedStarts = append(s.State.BufferedStarts, &schedspb.BufferedStart{
NominalTime: timestamp.TimePtr(nominalTime),
ActualTime: timestamp.TimePtr(actualTime),
@@ -727,14 +741,18 @@ func (s *scheduler) processBuffer() bool {
continue
}
result, err := s.startWorkflow(start, req)
metricsWithTag := s.metrics.WithTags(map[string]string{
metrics.ScheduleActionTypeTag: metrics.ScheduleActionStartWorkflow})
if err != nil {
s.logger.Error("Failed to start workflow", "error", err)
metricsWithTag.Counter(metrics.ScheduleActionErrors.GetMetricName()).Inc(1)
// TODO: we could put this back in the buffer and retry (after a delay) up until
// the catchup window. of course, it's unlikely that this workflow would be making
// progress while we're unable to start a new one, so maybe it's not that valuable.
tryAgain = true
continue
}
metricsWithTag.Counter(metrics.ScheduleActionSuccess.GetMetricName()).Inc(1)
s.recordAction(result)
}
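For completeness, a hedged sketch of the retry idea in that TODO (explicitly not implemented in this PR; the helper name and wiring are assumptions):

// requeueWithinCatchup would re-buffer a failed start only while its
// nominal time is still inside the catchup window; manual starts are
// exempt from the window, mirroring processTimeRange above.
func (s *scheduler) requeueWithinCatchup(start *schedspb.BufferedStart, catchupWindow time.Duration) {
	if start.Manual || s.now().Sub(timestamp.TimeValue(start.NominalTime)) <= catchupWindow {
		s.State.BufferedStarts = append(s.State.BufferedStarts, start)
	}
}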

@@ -826,9 +844,8 @@ func (s *scheduler) startWorkflow(

var appErr *temporal.ApplicationError
var details rateLimitedDetails
if errors.As(err, &appErr) &&
appErr.Type() == rateLimitedErrorType &&
appErr.Details(&details) == nil {
if errors.As(err, &appErr) && appErr.Type() == rateLimitedErrorType && appErr.Details(&details) == nil {
s.metrics.Counter(metrics.ScheduleRateLimited.GetMetricName()).Inc(1)
workflow.Sleep(s.ctx, details.Delay)
req.CompletedRateLimitSleep = true // only use rate limiter once
continue
@@ -920,6 +937,7 @@ func (s *scheduler) cancelWorkflow(ex *commonpb.WorkflowExecution) {
err := workflow.ExecuteLocalActivity(ctx, s.a.CancelWorkflow, areq).Get(s.ctx, nil)
if err != nil {
s.logger.Error("cancel workflow failed", "workflow", ex.WorkflowId, "error", err)
s.metrics.Counter(metrics.ScheduleCancelWorkflowErrors.GetMetricName()).Inc(1)
}
// Note: the local activity has completed (or failed) here but the workflow might take time
// to close since a cancel is only a request.
@@ -937,6 +955,7 @@ func (s *scheduler) terminateWorkflow(ex *commonpb.WorkflowExecution) {
err := workflow.ExecuteLocalActivity(ctx, s.a.TerminateWorkflow, areq).Get(s.ctx, nil)
if err != nil {
s.logger.Error("terminate workflow failed", "workflow", ex.WorkflowId, "error", err)
s.metrics.Counter(metrics.ScheduleTerminateWorkflowErrors.GetMetricName()).Inc(1)
}
// Note: the local activity has completed (or failed) here but we'll still wait until we
// observe the workflow close (with a watcher) to start the next one.