Skip to content

Commit

Permalink
Add deadlines to schedule local activities (#4078)
Browse files Browse the repository at this point in the history
  • Loading branch information
dnr authored and wxing1292 committed Apr 14, 2023
1 parent 7e61ce8 commit d83f065
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 6 deletions.
29 changes: 26 additions & 3 deletions service/worker/scheduler/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"go.temporal.io/server/api/historyservice/v1"
schedspb "go.temporal.io/server/api/schedule/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/quotas"
Expand Down Expand Up @@ -138,6 +139,7 @@ func (a *activities) tryWatchWorkflow(ctx context.Context, req *schedspb.WatchWo
// just turn this into a success, with unspecified status
return &schedspb.WatchWorkflowResponse{Status: enumspb.WORKFLOW_EXECUTION_STATUS_UNSPECIFIED}, nil
}
a.Logger.Error("error from PollMutableState", tag.Error(err), tag.WorkflowID(req.Execution.WorkflowId))
return nil, err
}

Expand Down Expand Up @@ -180,11 +182,13 @@ func (a *activities) tryWatchWorkflow(ctx context.Context, req *schedspb.WatchWo
histRes, err := a.FrontendClient.GetWorkflowExecutionHistory(ctx, histReq)

if err != nil {
a.Logger.Error("error from GetWorkflowExecutionHistory", tag.Error(err), tag.WorkflowID(req.Execution.WorkflowId))
return nil, err
}

events := histRes.GetHistory().GetEvents()
if len(events) < 1 {
a.Logger.Error("GetWorkflowExecutionHistory returned no events", tag.WorkflowID(req.Execution.WorkflowId))
return nil, errNoEvents
}
lastEvent := events[0]
Expand Down Expand Up @@ -229,12 +233,21 @@ func (a *activities) tryWatchWorkflow(ctx context.Context, req *schedspb.WatchWo
}

func (a *activities) WatchWorkflow(ctx context.Context, req *schedspb.WatchWorkflowRequest) (*schedspb.WatchWorkflowResponse, error) {
if !req.LongPoll {
// Go SDK currently doesn't set context timeout based on local activity
// StartToCloseTimeout if ScheduleToCloseTimeout is set, so add a timeout here.
// TODO: remove after https://github.com/temporalio/sdk-go/issues/1066
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, defaultLocalActivityOptions.StartToCloseTimeout)
defer cancel()
}

for ctx.Err() == nil {
activity.RecordHeartbeat(ctx)
res, err := a.tryWatchWorkflow(ctx, req)
// long poll should return before our deadline, but even if it doesn't,
// we can still try again within the same activity
if err == errTryAgain || common.IsContextDeadlineExceededErr(err) {
if req.LongPoll && (err == errTryAgain || common.IsContextDeadlineExceededErr(err)) {
continue
}
if newRunID, ok := err.(errFollow); ok {
Expand All @@ -243,10 +256,15 @@ func (a *activities) WatchWorkflow(ctx context.Context, req *schedspb.WatchWorkf
}
return res, translateError(err, "WatchWorkflow")
}
return nil, ctx.Err()
return nil, translateError(ctx.Err(), "WatchWorkflow")
}

func (a *activities) CancelWorkflow(ctx context.Context, req *schedspb.CancelWorkflowRequest) error {
// TODO: remove after https://github.com/temporalio/sdk-go/issues/1066
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, defaultLocalActivityOptions.StartToCloseTimeout)
defer cancel()

rreq := &historyservice.RequestCancelWorkflowExecutionRequest{
NamespaceId: a.namespaceID.String(),
CancelRequest: &workflowservice.RequestCancelWorkflowExecutionRequest{
Expand All @@ -265,6 +283,11 @@ func (a *activities) CancelWorkflow(ctx context.Context, req *schedspb.CancelWor
}

func (a *activities) TerminateWorkflow(ctx context.Context, req *schedspb.TerminateWorkflowRequest) error {
// TODO: remove after https://github.com/temporalio/sdk-go/issues/1066
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, defaultLocalActivityOptions.StartToCloseTimeout)
defer cancel()

rreq := &historyservice.TerminateWorkflowExecutionRequest{
NamespaceId: a.namespaceID.String(),
TerminateRequest: &workflowservice.TerminateWorkflowExecutionRequest{
Expand All @@ -290,7 +313,7 @@ func translateError(err error, msgPrefix string) error {
return nil
}
message := fmt.Sprintf("%s: %s", msgPrefix, err.Error())
if common.IsServiceTransientError(err) {
if common.IsServiceTransientError(err) || common.IsContextDeadlineExceededErr(err) {
return temporal.NewApplicationErrorWithCause(message, errType(err), err)
}
return temporal.NewNonRetryableApplicationError(message, errType(err), err)
Expand Down
6 changes: 3 additions & 3 deletions service/worker/scheduler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,10 @@ type (

var (
defaultLocalActivityOptions = workflow.LocalActivityOptions{
// This applies to poll, cancel, and terminate. Start workflow overrides this.
// This applies to watch, cancel, and terminate. Start workflow overrides this.
ScheduleToCloseTimeout: 1 * time.Hour,
// We're using the default workflow task timeout of 10s, so limit local activities to 5s.
// We might do up to two per workflow task (cancel previous and start new workflow).
// Each local activity is one or a few local RPCs.
// Currently this is applied manually, see https://github.com/temporalio/sdk-go/issues/1066
StartToCloseTimeout: 5 * time.Second,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: 1 * time.Second,
Expand Down

0 comments on commit d83f065

Please sign in to comment.