From 5a83f71b286f4629d38467bbf723f3790fe7e39a Mon Sep 17 00:00:00 2001 From: David Reiss Date: Tue, 21 Mar 2023 19:52:52 -0700 Subject: [PATCH] Add deadlines to schedule local activities (#4078) --- service/worker/scheduler/activities.go | 29 +++++++++++++++++++++++--- service/worker/scheduler/workflow.go | 6 +++--- 2 files changed, 29 insertions(+), 6 deletions(-) diff --git a/service/worker/scheduler/activities.go b/service/worker/scheduler/activities.go index d6703ffbb59..35b86304fec 100644 --- a/service/worker/scheduler/activities.go +++ b/service/worker/scheduler/activities.go @@ -42,6 +42,7 @@ import ( "go.temporal.io/server/api/historyservice/v1" schedspb "go.temporal.io/server/api/schedule/v1" "go.temporal.io/server/common" + "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/primitives/timestamp" "go.temporal.io/server/common/quotas" @@ -138,6 +139,7 @@ func (a *activities) tryWatchWorkflow(ctx context.Context, req *schedspb.WatchWo // just turn this into a success, with unspecified status return &schedspb.WatchWorkflowResponse{Status: enumspb.WORKFLOW_EXECUTION_STATUS_UNSPECIFIED}, nil } + a.Logger.Error("error from PollMutableState", tag.Error(err), tag.WorkflowID(req.Execution.WorkflowId)) return nil, err } @@ -180,11 +182,13 @@ func (a *activities) tryWatchWorkflow(ctx context.Context, req *schedspb.WatchWo histRes, err := a.FrontendClient.GetWorkflowExecutionHistory(ctx, histReq) if err != nil { + a.Logger.Error("error from GetWorkflowExecutionHistory", tag.Error(err), tag.WorkflowID(req.Execution.WorkflowId)) return nil, err } events := histRes.GetHistory().GetEvents() if len(events) < 1 { + a.Logger.Error("GetWorkflowExecutionHistory returned no events", tag.WorkflowID(req.Execution.WorkflowId)) return nil, errNoEvents } lastEvent := events[0] @@ -229,12 +233,21 @@ func (a *activities) tryWatchWorkflow(ctx context.Context, req *schedspb.WatchWo } func (a *activities) WatchWorkflow(ctx context.Context, req *schedspb.WatchWorkflowRequest) (*schedspb.WatchWorkflowResponse, error) { + if !req.LongPoll { + // Go SDK currently doesn't set context timeout based on local activity + // StartToCloseTimeout if ScheduleToCloseTimeout is set, so add a timeout here. + // TODO: remove after https://github.com/temporalio/sdk-go/issues/1066 + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, defaultLocalActivityOptions.StartToCloseTimeout) + defer cancel() + } + for ctx.Err() == nil { activity.RecordHeartbeat(ctx) res, err := a.tryWatchWorkflow(ctx, req) // long poll should return before our deadline, but even if it doesn't, // we can still try again within the same activity - if err == errTryAgain || common.IsContextDeadlineExceededErr(err) { + if req.LongPoll && (err == errTryAgain || common.IsContextDeadlineExceededErr(err)) { continue } if newRunID, ok := err.(errFollow); ok { @@ -243,10 +256,15 @@ func (a *activities) WatchWorkflow(ctx context.Context, req *schedspb.WatchWorkf } return res, translateError(err, "WatchWorkflow") } - return nil, ctx.Err() + return nil, translateError(ctx.Err(), "WatchWorkflow") } func (a *activities) CancelWorkflow(ctx context.Context, req *schedspb.CancelWorkflowRequest) error { + // TODO: remove after https://github.com/temporalio/sdk-go/issues/1066 + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, defaultLocalActivityOptions.StartToCloseTimeout) + defer cancel() + rreq := &historyservice.RequestCancelWorkflowExecutionRequest{ NamespaceId: a.namespaceID.String(), CancelRequest: &workflowservice.RequestCancelWorkflowExecutionRequest{ @@ -265,6 +283,11 @@ func (a *activities) CancelWorkflow(ctx context.Context, req *schedspb.CancelWor } func (a *activities) TerminateWorkflow(ctx context.Context, req *schedspb.TerminateWorkflowRequest) error { + // TODO: remove after https://github.com/temporalio/sdk-go/issues/1066 + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, defaultLocalActivityOptions.StartToCloseTimeout) + defer cancel() + rreq := &historyservice.TerminateWorkflowExecutionRequest{ NamespaceId: a.namespaceID.String(), TerminateRequest: &workflowservice.TerminateWorkflowExecutionRequest{ @@ -290,7 +313,7 @@ func translateError(err error, msgPrefix string) error { return nil } message := fmt.Sprintf("%s: %s", msgPrefix, err.Error()) - if common.IsServiceTransientError(err) { + if common.IsServiceTransientError(err) || common.IsContextDeadlineExceededErr(err) { return temporal.NewApplicationErrorWithCause(message, errType(err), err) } return temporal.NewNonRetryableApplicationError(message, errType(err), err) diff --git a/service/worker/scheduler/workflow.go b/service/worker/scheduler/workflow.go index f1133f9dc15..c3073174592 100644 --- a/service/worker/scheduler/workflow.go +++ b/service/worker/scheduler/workflow.go @@ -124,10 +124,10 @@ type ( var ( defaultLocalActivityOptions = workflow.LocalActivityOptions{ - // This applies to poll, cancel, and terminate. Start workflow overrides this. + // This applies to watch, cancel, and terminate. Start workflow overrides this. ScheduleToCloseTimeout: 1 * time.Hour, - // We're using the default workflow task timeout of 10s, so limit local activities to 5s. - // We might do up to two per workflow task (cancel previous and start new workflow). + // Each local activity is one or a few local RPCs. + // Currently this is applied manually, see https://github.com/temporalio/sdk-go/issues/1066 StartToCloseTimeout: 5 * time.Second, RetryPolicy: &temporal.RetryPolicy{ InitialInterval: 1 * time.Second,