diff --git a/go.mod b/go.mod index 713b813d1320..8463c05c3c9d 100644 --- a/go.mod +++ b/go.mod @@ -105,7 +105,7 @@ require ( github.com/mattn/go-runewidth v0.0.14 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect - github.com/pkg/errors v0.9.1 // indirect + github.com/pkg/errors v0.9.1 github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.4.0 github.com/prometheus/common v0.44.0 diff --git a/service/worker/migration/activities.go b/service/worker/migration/activities.go index d2bde33200c2..aa50165a0e0b 100644 --- a/service/worker/migration/activities.go +++ b/service/worker/migration/activities.go @@ -31,6 +31,7 @@ import ( "sort" "time" + "github.com/pkg/errors" commonpb "go.temporal.io/api/common/v1" replicationpb "go.temporal.io/api/replication/v1" "go.temporal.io/api/serviceerror" @@ -112,6 +113,13 @@ func (r VerifyResult) isCompleted() bool { return r.isVerified() || r.isSkipped() } +func (e verifyReplicationTasksTimeoutErr) Error() string { + return fmt.Sprintf("verifyReplicationTasks was not able to make progress for more than %v minutes (retryable). Not found WorkflowExecution: %v,", + e.timeout, + e.details.LastNotFoundWorkflowExecution, + ) +} + // TODO: CallerTypePreemptablee should be set in activity background context for all migration activities. // However, activity background context is per-worker, which means once set, all activities processed by the // worker will use CallerTypePreemptable, including those not related to migration. This is not ideal. @@ -645,20 +653,13 @@ func (a *activities) verifyReplicationTasks( return false, progress, nil default: - return false, progress, err + return false, progress, errors.WithMessage(err, "remoteClient.DescribeMutableState call failed") } } return true, progress, nil } -func (e verifyReplicationTasksTimeoutErr) Error() string { - return fmt.Sprintf("verifyReplicationTasks was not able to make progress for more than %v minutes (retryable). Not found WorkflowExecution: %v,", - e.timeout, - e.details.LastNotFoundWorkflowExecution, - ) -} - const ( defaultNoProgressRetryableTimeout = 5 * time.Minute defaultNoProgressNotRetryableTimeout = 15 * time.Minute diff --git a/service/worker/migration/force_replication_workflow.go b/service/worker/migration/force_replication_workflow.go index b08c40cb7322..501049cba3ae 100644 --- a/service/worker/migration/force_replication_workflow.go +++ b/service/worker/migration/force_replication_workflow.go @@ -371,6 +371,7 @@ func enqueueReplicationTasks(ctx workflow.Context, workflowExecutionsCh workflow var a *activities var futures []workflow.Future var workflowExecutions []commonpb.WorkflowExecution + var lastActivityErr error for workflowExecutionsCh.Receive(ctx, &workflowExecutions) { var replicationTaskFuture workflow.Future @@ -394,10 +395,17 @@ func enqueueReplicationTasks(ctx workflow.Context, workflowExecutionsCh workflow pendingActivities++ selector.AddFuture(replicationTaskFuture, func(f workflow.Future) { pendingActivities-- + + if err := f.Get(ctx, nil); err != nil { + lastActivityErr = err + } }) - if pendingActivities == params.ConcurrentActivityCount { + if pendingActivities >= params.ConcurrentActivityCount { selector.Select(ctx) // this will block until one of the in-flight activities completes + if lastActivityErr != nil { + return lastActivityErr + } } futures = append(futures, replicationTaskFuture) diff --git a/service/worker/migration/force_replication_workflow_test.go b/service/worker/migration/force_replication_workflow_test.go index 94e0ce94849a..cbdfbfd5cea6 100644 --- a/service/worker/migration/force_replication_workflow_test.go +++ b/service/worker/migration/force_replication_workflow_test.go @@ -210,7 +210,7 @@ func TestForceReplicationWorkflow_ListWorkflowsError(t *testing.T) { env.AssertExpectations(t) } -func TestForceReplicationWorkflow_GenerateReplicationTaskError(t *testing.T) { +func TestForceReplicationWorkflow_GenerateReplicationTaskRetryableError(t *testing.T) { testSuite := &testsuite.WorkflowTestSuite{} env := testSuite.NewTestWorkflowEnvironment() @@ -258,6 +258,58 @@ func TestForceReplicationWorkflow_GenerateReplicationTaskError(t *testing.T) { env.AssertExpectations(t) } +func TestForceReplicationWorkflow_GenerateReplicationTaskNonRetryableError(t *testing.T) { + testSuite := &testsuite.WorkflowTestSuite{} + env := testSuite.NewTestWorkflowEnvironment() + + namespaceID := uuid.New() + + var a *activities + env.OnActivity(a.GetMetadata, mock.Anything, metadataRequest{Namespace: "test-ns"}).Return(&metadataResponse{ShardCount: 4, NamespaceID: namespaceID}, nil) + + totalPageCount := 4 + currentPageCount := 0 + env.OnActivity(a.ListWorkflows, mock.Anything, mock.Anything).Return(func(ctx context.Context, request *workflowservice.ListWorkflowExecutionsRequest) (*listWorkflowsResponse, error) { + assert.Equal(t, "test-ns", request.Namespace) + currentPageCount++ + if currentPageCount < totalPageCount { + return &listWorkflowsResponse{ + Executions: []commonpb.WorkflowExecution{}, + NextPageToken: []byte("fake-page-token"), + }, nil + } + // your mock function implementation + return &listWorkflowsResponse{ + Executions: []commonpb.WorkflowExecution{}, + NextPageToken: nil, // last page + }, nil + }) + + // Only expect GenerateReplicationTasks to execute once and workflow will then fail because of + // non-retryable error. + env.OnActivity(a.GenerateReplicationTasks, mock.Anything, mock.Anything).Return( + temporal.NewNonRetryableApplicationError("mock generate replication tasks error", "", nil), + ).Times(1) + + env.RegisterWorkflow(ForceTaskQueueUserDataReplicationWorkflow) + env.OnActivity(a.SeedReplicationQueueWithUserDataEntries, mock.Anything, mock.Anything).Return(nil) + + env.ExecuteWorkflow(ForceReplicationWorkflow, ForceReplicationParams{ + Namespace: "test-ns", + Query: "", + ConcurrentActivityCount: 1, + OverallRps: 10, + ListWorkflowsPageSize: 1, + PageCountPerExecution: 4, + }) + + require.True(t, env.IsWorkflowCompleted()) + err := env.GetWorkflowError() + require.Error(t, err) + require.Contains(t, err.Error(), "mock generate replication tasks error") + env.AssertExpectations(t) +} + func TestForceReplicationWorkflow_TaskQueueReplicationFailure(t *testing.T) { testSuite := &testsuite.WorkflowTestSuite{} env := testSuite.NewTestWorkflowEnvironment()