Skip to content

Commit

Permalink
dropping non-retryable activity/wf tasks (temporalio#6535)
Browse files Browse the repository at this point in the history
## What changed?
<!-- Describe what has changed in this PR -->
- This PR introduces changes to drop activity + workflow tasks which are
non-retryable due to certain specific errors.

## Why?
<!-- Tell your future self why have you made these changes -->
- Currently, these tasks are being added back to persistence on failure
and end up wasting resources by continuous retries.
- Manual intervention is required to terminate/cancel these workflows. 

## How did you test it?
<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
- Added unit tests 
- Existing suite of tests


## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->
nope
  • Loading branch information
Shivs11 authored Oct 10, 2024
1 parent 83481a8 commit 4976109
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 6 deletions.
3 changes: 3 additions & 0 deletions common/metrics/metric_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1030,6 +1030,9 @@ var (
TaskDispatchLatencyPerTaskQueue = NewTimerDef("task_dispatch_latency")
ApproximateBacklogCount = NewGaugeDef("approximate_backlog_count")
ApproximateBacklogAgeSeconds = NewGaugeDef("approximate_backlog_age_seconds")
NonRetryableTasks = NewCounterDef(
"non_retryable_tasks",
WithDescription("The number of non-retryable matching tasks which are dropped due to specific errors"))

// Versioning and Reachability
ReachabilityExitPointCounter = NewCounterDef("reachability_exit_point_count")
Expand Down
8 changes: 8 additions & 0 deletions common/rpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/persistence/serialization"
"go.temporal.io/server/common/rpc/interceptor"
serviceerrors "go.temporal.io/server/common/serviceerror"
"google.golang.org/grpc"
Expand Down Expand Up @@ -156,6 +157,13 @@ func ServiceErrorInterceptor(
) (interface{}, error) {

resp, err := handler(ctx, req)

var deserializationError *serialization.DeserializationError
var serializationError *serialization.SerializationError
// convert serialization errors to be captured as serviceerrors across gRPC calls
if errors.As(err, &deserializationError) || errors.As(err, &serializationError) {
err = serviceerror.NewDataLoss(err.Error())
}
return resp, serviceerror.ToStatus(err).Err()
}

Expand Down
5 changes: 3 additions & 2 deletions service/history/api/get_history_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/serialization"
"go.temporal.io/server/common/persistence/visibility/manager"
"go.temporal.io/server/common/rpc/interceptor"
"go.temporal.io/server/common/searchattribute"
Expand Down Expand Up @@ -130,9 +131,9 @@ func GetHistory(
switch err.(type) {
case nil:
// noop
case *serviceerror.DataLoss:
case *serviceerror.DataLoss, *serialization.DeserializationError, *serialization.SerializationError:
// log event
shard.GetLogger().Error("encountered data loss event",
shard.GetLogger().Error("encountered data corruption event",
tag.WorkflowNamespaceID(namespaceID.String()),
tag.WorkflowID(execution.GetWorkflowId()),
tag.WorkflowRunID(execution.GetRunId()),
Expand Down
5 changes: 3 additions & 2 deletions service/history/events/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/serialization"
"go.temporal.io/server/service/history/configs"
)

Expand Down Expand Up @@ -212,9 +213,9 @@ func (e *CacheImpl) getHistoryEventFromStore(
switch err.(type) {
case nil:
// noop
case *serviceerror.DataLoss:
case *serviceerror.DataLoss, *serialization.DeserializationError, *serialization.SerializationError:
// log event
e.logger.Error("encounter data loss event",
e.logger.Error("encounter data corruption event",
tag.WorkflowNamespaceID(key.NamespaceID.String()),
tag.WorkflowID(key.WorkflowID),
tag.WorkflowRunID(key.RunID))
Expand Down
2 changes: 1 addition & 1 deletion service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2361,7 +2361,7 @@ func (ms *MutableStateImpl) ApplyWorkflowExecutionStartedEvent(
return nil
}

// AddFirstWorkflowTaskScheduled adds the first workflow task scehduled event unless it should be delayed as indicated
// AddFirstWorkflowTaskScheduled adds the first workflow task scheduled event unless it should be delayed as indicated
// by the startEvent's FirstWorkflowTaskBackoff.
// Returns the workflow task's scheduled event ID if a task was scheduled, 0 otherwise.
func (ms *MutableStateImpl) AddFirstWorkflowTaskScheduled(
Expand Down
24 changes: 24 additions & 0 deletions service/matching/matching_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,10 @@ pollLoop:
resp, err := e.recordWorkflowTaskStarted(ctx, requestClone, task)
if err != nil {
switch err.(type) {
case *serviceerror.Internal, *serviceerror.DataLoss:
e.nonRetryableErrorsDropTask(task, taskQueueName, err)
// drop the task as otherwise task would be stuck in a retry-loop
task.finish(nil)
case *serviceerror.NotFound: // mutable state not found, workflow not running or workflow task not found
e.logger.Info("Workflow task not found",
tag.WorkflowTaskQueueName(taskQueueName),
Expand Down Expand Up @@ -703,6 +707,22 @@ func (e *matchingEngineImpl) getHistoryForQueryTask(
return hist, resp.GetResponse().GetNextPageToken(), err
}

func (e *matchingEngineImpl) nonRetryableErrorsDropTask(task *internalTask, taskQueueName string, err error) {
e.logger.Error("dropping task due to non-nonretryable errors",
tag.WorkflowNamespace(task.namespace.String()),
tag.WorkflowNamespaceID(task.event.Data.GetNamespaceId()),
tag.WorkflowID(task.event.Data.GetWorkflowId()),
tag.WorkflowRunID(task.event.Data.GetRunId()),
tag.WorkflowTaskQueueName(taskQueueName),
tag.TaskID(task.event.GetTaskId()),
tag.WorkflowEventID(task.event.Data.GetScheduledEventId()),
tag.Error(err),
tag.ErrorType(err),
)

metrics.NonRetryableTasks.With(e.metricsHandler).Record(1, metrics.ServiceErrorTypeTag(err))
}

// PollActivityTaskQueue takes one task from the task manager, update workflow execution history, mark task as
// completed and return it to user. If a task from task manager is already started, return an empty response, without
// error. Timeouts handled by the timer queue.
Expand Down Expand Up @@ -760,6 +780,10 @@ pollLoop:
resp, err := e.recordActivityTaskStarted(ctx, requestClone, task)
if err != nil {
switch err.(type) {
case *serviceerror.Internal, *serviceerror.DataLoss:
e.nonRetryableErrorsDropTask(task, taskQueueName, err)
// drop the task as otherwise task would be stuck in a retry-loop
task.finish(nil)
case *serviceerror.NotFound: // mutable state not found, workflow not running or activity info not found
e.logger.Info("Activity task not found",
tag.WorkflowNamespaceID(task.event.Data.GetNamespaceId()),
Expand Down
118 changes: 117 additions & 1 deletion service/matching/matching_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ import (
"google.golang.org/protobuf/types/known/wrapperspb"
)

// static error used in tests
var randomTestError = errors.New("random error")

type (
matchingEngineSuite struct {
suite.Suite
Expand Down Expand Up @@ -470,7 +473,7 @@ func (s *matchingEngineSuite) TestFailAddTaskWithHistoryExhausted() {
func (s *matchingEngineSuite) TestFailAddTaskWithHistoryError() {
historyError := serviceerror.NewInternal("nothing to start")
tqName := "testFailAddTaskWithHistoryError"
s.testFailAddTaskWithHistoryError(tqName, true, historyError, historyError)
s.testFailAddTaskWithHistoryError(tqName, true, historyError, nil) // expectedError shall be nil since history drops the task
}

func (s *matchingEngineSuite) testFailAddTaskWithHistoryError(
Expand Down Expand Up @@ -687,6 +690,119 @@ func (s *matchingEngineSuite) TestPollWorkflowTaskQueues_NamespaceHandover() {
s.Equal(common.ErrNamespaceHandover.Error(), err.Error())
}

func (s *matchingEngineSuite) TestPollActivityTaskQueues_InternalError() {
namespaceId := uuid.New()
tl := "queue"
taskQueue := &taskqueuepb.TaskQueue{Name: "queue", Kind: enumspb.TASK_QUEUE_KIND_NORMAL}

addRequest := matchingservice.AddActivityTaskRequest{
NamespaceId: namespaceId,
Execution: &commonpb.WorkflowExecution{WorkflowId: "workflowID", RunId: uuid.NewRandom().String()},
ScheduledEventId: int64(5),
TaskQueue: taskQueue,
ScheduleToStartTimeout: timestamp.DurationFromSeconds(0),
}

// add an activity task
_, _, err := s.matchingEngine.AddActivityTask(context.Background(), &addRequest)
s.NoError(err)
s.EqualValues(s.taskManager.getTaskCount(newUnversionedRootQueueKey(namespaceId, tl, enumspb.TASK_QUEUE_TYPE_ACTIVITY)), 1)

// task is dropped with no retry; RecordActivityTaskStarted should only be called once
s.mockHistoryClient.EXPECT().RecordActivityTaskStarted(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil, serviceerror.NewInternal("Internal error")).Times(1)
resp, err := s.matchingEngine.PollActivityTaskQueue(context.Background(), &matchingservice.PollActivityTaskQueueRequest{
NamespaceId: namespaceId,
PollRequest: &workflowservice.PollActivityTaskQueueRequest{
TaskQueue: taskQueue,
Identity: "identity",
},
}, metrics.NoopMetricsHandler)
s.EqualValues(emptyPollActivityTaskQueueResponse, resp)
s.NoError(err)
}

func (s *matchingEngineSuite) TestPollActivityTaskQueues_DataLossError() {
namespaceId := uuid.New()
tl := "queue"
taskQueue := &taskqueuepb.TaskQueue{Name: "queue", Kind: enumspb.TASK_QUEUE_KIND_NORMAL}

addRequest := matchingservice.AddActivityTaskRequest{
NamespaceId: namespaceId,
Execution: &commonpb.WorkflowExecution{WorkflowId: "workflowID", RunId: uuid.NewRandom().String()},
ScheduledEventId: int64(5),
TaskQueue: taskQueue,
ScheduleToStartTimeout: timestamp.DurationFromSeconds(0),
}

// add an activity task
_, _, err := s.matchingEngine.AddActivityTask(context.Background(), &addRequest)
s.NoError(err)
s.EqualValues(s.taskManager.getTaskCount(newUnversionedRootQueueKey(namespaceId, tl, enumspb.TASK_QUEUE_TYPE_ACTIVITY)), 1)

// task is dropped with no retry; RecordActivityTaskStarted should only be called once
s.mockHistoryClient.EXPECT().RecordActivityTaskStarted(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil, serviceerror.NewDataLoss("DataLoss Error")).Times(1)

resp, err := s.matchingEngine.PollActivityTaskQueue(context.Background(), &matchingservice.PollActivityTaskQueueRequest{
NamespaceId: namespaceId,
PollRequest: &workflowservice.PollActivityTaskQueueRequest{
TaskQueue: taskQueue,
Identity: "identity",
},
}, metrics.NoopMetricsHandler)
s.EqualValues(emptyPollActivityTaskQueueResponse, resp)
s.NoError(err)
}

func (s *matchingEngineSuite) TestPollWorkflowTaskQueues_InternalError() {
tqName := "queue"
taskQueue := &taskqueuepb.TaskQueue{Name: tqName, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}
wfExecution := &commonpb.WorkflowExecution{WorkflowId: "workflowID", RunId: uuid.NewRandom().String()}

// add a wf task
s.addWorkflowTask(wfExecution, taskQueue)
s.EqualValues(1, s.taskManager.getTaskCount(newUnversionedRootQueueKey(namespaceId, tqName, enumspb.TASK_QUEUE_TYPE_WORKFLOW)))

// task is dropped with no retry; RecordWorkflowTaskStarted should only be called once
s.mockHistoryClient.EXPECT().RecordWorkflowTaskStarted(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil, serviceerror.NewInternal("internal error")).Times(1)

resp, err := s.matchingEngine.PollWorkflowTaskQueue(context.Background(), &matchingservice.PollWorkflowTaskQueueRequest{
NamespaceId: namespaceId,
PollRequest: &workflowservice.PollWorkflowTaskQueueRequest{
TaskQueue: taskQueue,
Identity: "identity",
},
}, metrics.NoopMetricsHandler)
s.EqualValues(emptyPollWorkflowTaskQueueResponse, resp)
s.NoError(err)
}

func (s *matchingEngineSuite) TestPollWorkflowTaskQueues_DataLossError() {
tqName := "queue"
taskQueue := &taskqueuepb.TaskQueue{Name: tqName, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}
wfExecution := &commonpb.WorkflowExecution{WorkflowId: "workflowID", RunId: uuid.NewRandom().String()}

// add a wf task
s.addWorkflowTask(wfExecution, taskQueue)
s.EqualValues(1, s.taskManager.getTaskCount(newUnversionedRootQueueKey(namespaceId, tqName, enumspb.TASK_QUEUE_TYPE_WORKFLOW)))

// task is dropped with no retry; RecordWorkflowTaskStarted should only be called once
s.mockHistoryClient.EXPECT().RecordWorkflowTaskStarted(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil, serviceerror.NewDataLoss("DataLoss error")).Times(1)

resp, err := s.matchingEngine.PollWorkflowTaskQueue(context.Background(), &matchingservice.PollWorkflowTaskQueueRequest{
NamespaceId: namespaceId,
PollRequest: &workflowservice.PollWorkflowTaskQueueRequest{
TaskQueue: taskQueue,
Identity: "identity",
},
}, metrics.NoopMetricsHandler)
s.EqualValues(emptyPollWorkflowTaskQueueResponse, resp)
s.NoError(err)
}

func (s *matchingEngineSuite) TestPollActivityTaskQueues_NamespaceHandover() {
namespaceId := uuid.New()
taskQueue := &taskqueuepb.TaskQueue{Name: "queue", Kind: enumspb.TASK_QUEUE_KIND_NORMAL}
Expand Down

0 comments on commit 4976109

Please sign in to comment.