diff --git a/go.mod b/go.mod index 3da3b577c6d..043f58742cd 100644 --- a/go.mod +++ b/go.mod @@ -44,7 +44,7 @@ require ( go.opentelemetry.io/otel/metric v0.36.0 go.opentelemetry.io/otel/sdk v1.13.0 go.opentelemetry.io/otel/sdk/metric v0.36.0 - go.temporal.io/api v1.19.0 + go.temporal.io/api v1.19.1-0.20230321175928-34293b5611df go.temporal.io/sdk v1.21.2-0.20230313212213-3d009e0d1f24 go.temporal.io/version v0.3.0 go.uber.org/atomic v1.10.0 @@ -128,8 +128,8 @@ require ( golang.org/x/tools v0.6.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect google.golang.org/appengine v1.6.7 // indirect - google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 // indirect - google.golang.org/protobuf v1.29.0 // indirect + google.golang.org/genproto v0.0.0-20230320184635-7606e756e683 // indirect + google.golang.org/protobuf v1.30.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect lukechampine.com/uint128 v1.2.0 // indirect modernc.org/cc/v3 v3.40.0 // indirect diff --git a/go.sum b/go.sum index e0980262453..91733f101f3 100644 --- a/go.sum +++ b/go.sum @@ -1031,8 +1031,9 @@ go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqe go.opentelemetry.io/proto/otlp v0.15.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U= go.opentelemetry.io/proto/otlp v0.19.0 h1:IVN6GR+mhC4s5yfcTbmzHYODqvWAp3ZedA2SJPI1Nnw= go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U= -go.temporal.io/api v1.19.0 h1:/U7H3yh6Ufb0+Q6Ph/QgYwNRIiO2lbbvETz1xjv+4b4= go.temporal.io/api v1.19.0/go.mod h1:cY1oHaK9CmbSPs+pS/kfmYFZc18+Eu7LYTo/mFdTilc= +go.temporal.io/api v1.19.1-0.20230321175928-34293b5611df h1:CSHeGo25aeFSz3MykI9HvFALkmn+Vv68W+K1VejXMQc= +go.temporal.io/api v1.19.1-0.20230321175928-34293b5611df/go.mod h1:cb7wj4dwfSCJmrzAcnkOXrQjFIEh3Ag3qeTurt+mUyw= go.temporal.io/sdk v1.21.2-0.20230313212213-3d009e0d1f24 h1:bDmrH+Biej7YQkCbOL6JRCeeQK+xLLLgvshT6OObF/w= go.temporal.io/sdk v1.21.2-0.20230313212213-3d009e0d1f24/go.mod h1:ZIzYuws6VNiwRPVzkbuV7SqlvK0cZdY/gw+jY+fB9iY= go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig= @@ -1621,8 +1622,9 @@ google.golang.org/genproto v0.0.0-20230127162408-596548ed4efa/go.mod h1:RGgjbofJ google.golang.org/genproto v0.0.0-20230209215440-0dfe4f8abfcc/go.mod h1:RGgjbofJ8xD9Sq1VVhDM1Vok1vRONV+rg+CjzG4SZKM= google.golang.org/genproto v0.0.0-20230216225411-c8e22ba71e44/go.mod h1:8B0gmkoRebU8ukX6HP+4wrVQUY1+6PkQ44BSyIlflHA= google.golang.org/genproto v0.0.0-20230222225845-10f96fb3dbec/go.mod h1:3Dl5ZL0q0isWJt+FVcfpQyirqemEuLAK/iFvg1UP1Hw= -google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 h1:DdoeryqhaXp1LtT/emMP1BRJPHHKFi5akj/nbx/zNTA= google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4/go.mod h1:NWraEVixdDnqcqQ30jipen1STv2r/n24Wb7twVTGR4s= +google.golang.org/genproto v0.0.0-20230320184635-7606e756e683 h1:khxVcsk/FhnzxMKOyD+TDGwjbEOpcPuIpmafPGFmhMA= +google.golang.org/genproto v0.0.0-20230320184635-7606e756e683/go.mod h1:NWraEVixdDnqcqQ30jipen1STv2r/n24Wb7twVTGR4s= google.golang.org/grpc v1.12.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= @@ -1681,8 +1683,9 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= -google.golang.org/protobuf v1.29.0 h1:44S3JjaKmLEE4YIkjzexaP+NzZsudE3Zin5Njn/pYX0= google.golang.org/protobuf v1.29.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= +google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/proto/api b/proto/api index 9206f155646..fb1bc36da55 160000 --- a/proto/api +++ b/proto/api @@ -1 +1 @@ -Subproject commit 9206f1556469515d8e5a82c4d4bf21f2dd9730ce +Subproject commit fb1bc36da55a194d03d0f32f6ba43736d766b653 diff --git a/service/history/transferQueueActiveTaskExecutor.go b/service/history/transferQueueActiveTaskExecutor.go index 488a224cebd..e470b171cae 100644 --- a/service/history/transferQueueActiveTaskExecutor.go +++ b/service/history/transferQueueActiveTaskExecutor.go @@ -656,6 +656,8 @@ func (t *transferQueueActiveTaskExecutor) processSignalExecution( failedCause = enumspb.SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED_CAUSE_EXTERNAL_WORKFLOW_EXECUTION_NOT_FOUND case *serviceerror.NamespaceNotFound: failedCause = enumspb.SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED_CAUSE_NAMESPACE_NOT_FOUND + case *serviceerror.InvalidArgument: + failedCause = enumspb.SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED_CAUSE_SIGNAL_COUNT_LIMIT_EXCEEDED default: t.logger.Error("Unexpected error type returned from SignalWorkflowExecution API call.", tag.ErrorType(err), tag.Error(err)) return err diff --git a/service/history/transferQueueActiveTaskExecutor_test.go b/service/history/transferQueueActiveTaskExecutor_test.go index 55e20d8a898..7aee4a2fcc8 100644 --- a/service/history/transferQueueActiveTaskExecutor_test.go +++ b/service/history/transferQueueActiveTaskExecutor_test.go @@ -1640,69 +1640,27 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessCancelExecution_Duplic } func (s *transferQueueActiveTaskExecutorSuite) TestProcessSignalExecution_Success() { - execution := commonpb.WorkflowExecution{ - WorkflowId: "some random workflow ID", - RunId: uuid.New(), - } - workflowType := "some random workflow type" - taskQueueName := "some random task queue" - - targetExecution := commonpb.WorkflowExecution{ - WorkflowId: "some random target workflow ID", - RunId: uuid.New(), - } - signalName := "some random signal name" - signalInput := payloads.EncodeString("some random signal input") - signalControl := "some random signal control" - signalHeader := &commonpb.Header{ - Fields: map[string]*commonpb.Payload{"signal header key": payload.EncodeString("signal header value")}, - } - - mutableState := workflow.TestGlobalMutableState(s.mockShard, s.mockShard.GetEventsCache(), s.logger, s.version, execution.GetRunId()) - _, err := mutableState.AddWorkflowExecutionStartedEvent( - execution, - &historyservice.StartWorkflowExecutionRequest{ - Attempt: 1, - NamespaceId: s.namespaceID.String(), - StartRequest: &workflowservice.StartWorkflowExecutionRequest{ - WorkflowType: &commonpb.WorkflowType{Name: workflowType}, - TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueueName}, - WorkflowExecutionTimeout: timestamp.DurationPtr(2 * time.Second), - WorkflowTaskTimeout: timestamp.DurationPtr(1 * time.Second), - }, - }, - ) - s.Nil(err) - - wt := addWorkflowTaskScheduledEvent(mutableState) - event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) - wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") - - taskID := int64(59) - event, si := addRequestSignalInitiatedEvent(mutableState, event.GetEventId(), uuid.New(), - tests.TargetNamespace, tests.TargetNamespaceID, targetExecution.GetWorkflowId(), targetExecution.GetRunId(), signalName, signalInput, - signalControl, signalHeader) + mutableState, event, si := s.setupSignalExternalWorkflowInitiated() attributes := event.GetSignalExternalWorkflowExecutionInitiatedEventAttributes() transferTask := &tasks.SignalExecutionTask{ WorkflowKey: definition.NewWorkflowKey( - s.namespaceID.String(), - execution.GetWorkflowId(), - execution.GetRunId(), + mutableState.GetExecutionInfo().NamespaceId, + mutableState.GetExecutionInfo().WorkflowId, + mutableState.GetExecutionState().RunId, ), Version: s.version, - TargetNamespaceID: s.targetNamespaceID.String(), - TargetWorkflowID: targetExecution.GetWorkflowId(), - TargetRunID: targetExecution.GetRunId(), - TaskID: taskID, + TargetNamespaceID: attributes.GetNamespaceId(), + TargetWorkflowID: attributes.WorkflowExecution.GetWorkflowId(), + TargetRunID: attributes.WorkflowExecution.GetRunId(), + TaskID: int64(59), TargetChildWorkflowOnly: true, InitiatedEventID: event.GetEventId(), } persistenceMutableState := s.createPersistenceMutableState(mutableState, event.GetEventId(), event.GetVersion()) s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) - s.mockHistoryClient.EXPECT().SignalWorkflowExecution(gomock.Any(), s.createSignalWorkflowExecutionRequest(s.targetNamespace, transferTask, si, attributes)).Return(nil, nil) + s.mockHistoryClient.EXPECT().SignalWorkflowExecution(gomock.Any(), s.createSignalWorkflowExecutionRequest(namespace.Name(attributes.Namespace), transferTask, si, attributes)).Return(nil, nil) s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any(), gomock.Any()).Return(tests.UpdateWorkflowExecutionResponse, nil) s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(s.namespaceEntry.IsGlobalNamespace(), s.version).Return(cluster.TestCurrentClusterName).AnyTimes() @@ -1715,151 +1673,166 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessSignalExecution_Succes RequestId: si.GetRequestId(), }).Return(nil, nil) - _, _, err = s.transferQueueActiveTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask)) + _, _, err := s.transferQueueActiveTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask)) s.Nil(err) } -func (s *transferQueueActiveTaskExecutorSuite) TestProcessSignalExecution_Failure() { - execution := commonpb.WorkflowExecution{ - WorkflowId: "some random workflow ID", - RunId: uuid.New(), - } - workflowType := "some random workflow type" - taskQueueName := "some random task queue" +func (s *transferQueueActiveTaskExecutorSuite) TestProcessSignalExecution_Failure_TargetWorkflowNotFound() { + mutableState, event, si := s.setupSignalExternalWorkflowInitiated() + attributes := event.GetSignalExternalWorkflowExecutionInitiatedEventAttributes() - targetExecution := commonpb.WorkflowExecution{ - WorkflowId: "some random target workflow ID", - RunId: uuid.New(), - } - signalName := "some random signal name" - signalInput := payloads.EncodeString("some random signal input") - signalControl := "some random signal control" - signalHeader := &commonpb.Header{ - Fields: map[string]*commonpb.Payload{"signal header key": payload.EncodeString("signal header value")}, + transferTask := &tasks.SignalExecutionTask{ + WorkflowKey: definition.NewWorkflowKey( + mutableState.GetExecutionInfo().NamespaceId, + mutableState.GetExecutionInfo().WorkflowId, + mutableState.GetExecutionState().RunId, + ), + Version: s.version, + TargetNamespaceID: attributes.GetNamespaceId(), + TargetWorkflowID: attributes.WorkflowExecution.GetWorkflowId(), + TargetRunID: attributes.WorkflowExecution.GetRunId(), + TaskID: int64(59), + TargetChildWorkflowOnly: true, + InitiatedEventID: event.GetEventId(), } - mutableState := workflow.TestGlobalMutableState(s.mockShard, s.mockShard.GetEventsCache(), s.logger, s.version, execution.GetRunId()) - _, err := mutableState.AddWorkflowExecutionStartedEvent( - execution, - &historyservice.StartWorkflowExecutionRequest{ - Attempt: 1, - NamespaceId: s.namespaceID.String(), - StartRequest: &workflowservice.StartWorkflowExecutionRequest{ - WorkflowType: &commonpb.WorkflowType{Name: workflowType}, - TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueueName}, - WorkflowExecutionTimeout: timestamp.DurationPtr(2 * time.Second), - WorkflowTaskTimeout: timestamp.DurationPtr(1 * time.Second), - }, + persistenceMutableState := s.createPersistenceMutableState(mutableState, event.GetEventId(), event.GetVersion()) + s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) + s.mockHistoryClient.EXPECT().SignalWorkflowExecution(gomock.Any(), s.createSignalWorkflowExecutionRequest(namespace.Name(attributes.Namespace), transferTask, si, attributes)).Return(nil, serviceerror.NewNotFound("")) + s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, request *persistence.UpdateWorkflowExecutionRequest) (*persistence.UpdateWorkflowExecutionResponse, error) { + s.validateUpdateExecutionRequestWithSignalExternalFailedEvent( + si.InitiatedEventId, + enumspb.SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED_CAUSE_EXTERNAL_WORKFLOW_EXECUTION_NOT_FOUND, + request, + ) + return tests.UpdateWorkflowExecutionResponse, nil }, ) - s.Nil(err) + s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(s.namespaceEntry.IsGlobalNamespace(), s.version).Return(cluster.TestCurrentClusterName).AnyTimes() - wt := addWorkflowTaskScheduledEvent(mutableState) - event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) - wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + _, _, err := s.transferQueueActiveTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask)) + s.Nil(err) +} - taskID := int64(59) - event, si := addRequestSignalInitiatedEvent(mutableState, event.GetEventId(), uuid.New(), - tests.TargetNamespace, tests.TargetNamespaceID, targetExecution.GetWorkflowId(), targetExecution.GetRunId(), signalName, signalInput, - signalControl, signalHeader) +func (s *transferQueueActiveTaskExecutorSuite) TestProcessSignalExecution_Failure_TargetNamespaceNotFound() { + mutableState, event, si := s.setupSignalExternalWorkflowInitiated() attributes := event.GetSignalExternalWorkflowExecutionInitiatedEventAttributes() transferTask := &tasks.SignalExecutionTask{ WorkflowKey: definition.NewWorkflowKey( - s.namespaceID.String(), - execution.GetWorkflowId(), - execution.GetRunId(), + mutableState.GetExecutionInfo().NamespaceId, + mutableState.GetExecutionInfo().WorkflowId, + mutableState.GetExecutionState().RunId, ), Version: s.version, - TargetNamespaceID: s.targetNamespaceID.String(), - TargetWorkflowID: targetExecution.GetWorkflowId(), - TargetRunID: targetExecution.GetRunId(), - TaskID: taskID, + TargetNamespaceID: tests.MissedNamespaceID.String(), + TargetWorkflowID: attributes.WorkflowExecution.GetWorkflowId(), + TargetRunID: attributes.WorkflowExecution.GetRunId(), + TaskID: int64(59), TargetChildWorkflowOnly: true, InitiatedEventID: event.GetEventId(), } persistenceMutableState := s.createPersistenceMutableState(mutableState, event.GetEventId(), event.GetVersion()) s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) - s.mockHistoryClient.EXPECT().SignalWorkflowExecution(gomock.Any(), s.createSignalWorkflowExecutionRequest(s.targetNamespace, transferTask, si, attributes)).Return(nil, serviceerror.NewNotFound("")) - s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any(), gomock.Any()).Return(tests.UpdateWorkflowExecutionResponse, nil) + s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, request *persistence.UpdateWorkflowExecutionRequest) (*persistence.UpdateWorkflowExecutionResponse, error) { + s.validateUpdateExecutionRequestWithSignalExternalFailedEvent( + si.InitiatedEventId, + enumspb.SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED_CAUSE_NAMESPACE_NOT_FOUND, + request, + ) + return tests.UpdateWorkflowExecutionResponse, nil + }, + ) s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(s.namespaceEntry.IsGlobalNamespace(), s.version).Return(cluster.TestCurrentClusterName).AnyTimes() - _, _, err = s.transferQueueActiveTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask)) + _, _, err := s.transferQueueActiveTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask)) s.Nil(err) } -func (s *transferQueueActiveTaskExecutorSuite) TestProcessSignalExecution_Failure_TargetNamespaceNotFound() { - execution := commonpb.WorkflowExecution{ - WorkflowId: "some random workflow ID", - RunId: uuid.New(), - } - workflowType := "some random workflow type" - taskQueueName := "some random task queue" +func (s *transferQueueActiveTaskExecutorSuite) TestProcessSignalExecution_Failure_SignalCountLimitExceeded() { + mutableState, event, si := s.setupSignalExternalWorkflowInitiated() + attributes := event.GetSignalExternalWorkflowExecutionInitiatedEventAttributes() - targetExecution := commonpb.WorkflowExecution{ - WorkflowId: "some random target workflow ID", - RunId: uuid.New(), - } - signalName := "some random signal name" - signalInput := payloads.EncodeString("some random signal input") - signalControl := "some random signal control" - signalHeader := &commonpb.Header{ - Fields: map[string]*commonpb.Payload{"signal header key": payload.EncodeString("signal header value")}, + transferTask := &tasks.SignalExecutionTask{ + WorkflowKey: definition.NewWorkflowKey( + mutableState.GetExecutionInfo().NamespaceId, + mutableState.GetExecutionInfo().WorkflowId, + mutableState.GetExecutionState().RunId, + ), + Version: s.version, + TargetNamespaceID: attributes.GetNamespaceId(), + TargetWorkflowID: attributes.WorkflowExecution.GetWorkflowId(), + TargetRunID: attributes.WorkflowExecution.GetRunId(), + TaskID: int64(59), + TargetChildWorkflowOnly: true, + InitiatedEventID: event.GetEventId(), } - mutableState := workflow.TestGlobalMutableState(s.mockShard, s.mockShard.GetEventsCache(), s.logger, s.version, execution.GetRunId()) - _, err := mutableState.AddWorkflowExecutionStartedEvent( - execution, - &historyservice.StartWorkflowExecutionRequest{ - Attempt: 1, - NamespaceId: s.namespaceID.String(), - StartRequest: &workflowservice.StartWorkflowExecutionRequest{ - WorkflowType: &commonpb.WorkflowType{Name: workflowType}, - TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueueName}, - WorkflowExecutionTimeout: timestamp.DurationPtr(2 * time.Second), - WorkflowTaskTimeout: timestamp.DurationPtr(1 * time.Second), - }, + persistenceMutableState := s.createPersistenceMutableState(mutableState, event.GetEventId(), event.GetVersion()) + s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) + s.mockHistoryClient.EXPECT().SignalWorkflowExecution(gomock.Any(), s.createSignalWorkflowExecutionRequest(namespace.Name(attributes.Namespace), transferTask, si, attributes)).Return(nil, consts.ErrSignalsLimitExceeded) + s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, request *persistence.UpdateWorkflowExecutionRequest) (*persistence.UpdateWorkflowExecutionResponse, error) { + s.validateUpdateExecutionRequestWithSignalExternalFailedEvent( + si.InitiatedEventId, + enumspb.SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED_CAUSE_SIGNAL_COUNT_LIMIT_EXCEEDED, + request, + ) + return tests.UpdateWorkflowExecutionResponse, nil }, ) - s.NoError(err) + s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(s.namespaceEntry.IsGlobalNamespace(), s.version).Return(cluster.TestCurrentClusterName).AnyTimes() - wt := addWorkflowTaskScheduledEvent(mutableState) - event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) - wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + _, _, err := s.transferQueueActiveTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask)) + s.Nil(err) +} - taskID := int64(59) - event, _ = addRequestSignalInitiatedEvent(mutableState, event.GetEventId(), uuid.New(), - tests.TargetNamespace, tests.TargetNamespaceID, targetExecution.GetWorkflowId(), targetExecution.GetRunId(), signalName, signalInput, - signalControl, signalHeader) +func (s *transferQueueActiveTaskExecutorSuite) TestProcessSignalExecution_Duplication() { + mutableState, event, _ := s.setupSignalExternalWorkflowInitiated() + attributes := event.GetSignalExternalWorkflowExecutionInitiatedEventAttributes() transferTask := &tasks.SignalExecutionTask{ WorkflowKey: definition.NewWorkflowKey( - s.namespaceID.String(), - execution.GetWorkflowId(), - execution.GetRunId(), + mutableState.GetExecutionInfo().NamespaceId, + mutableState.GetExecutionInfo().WorkflowId, + mutableState.GetExecutionState().RunId, ), Version: s.version, - TargetNamespaceID: tests.MissedNamespaceID.String(), - TargetWorkflowID: targetExecution.GetWorkflowId(), - TargetRunID: targetExecution.GetRunId(), - TaskID: taskID, + TargetNamespaceID: attributes.GetNamespaceId(), + TargetWorkflowID: attributes.WorkflowExecution.GetWorkflowId(), + TargetRunID: attributes.WorkflowExecution.GetRunId(), + TaskID: int64(59), TargetChildWorkflowOnly: true, InitiatedEventID: event.GetEventId(), } + event = addSignaledEvent( + mutableState, + event.GetEventId(), + tests.TargetNamespace, + namespace.ID(transferTask.TargetNamespaceID), + attributes.WorkflowExecution.GetWorkflowId(), + attributes.WorkflowExecution.GetRunId(), + "", + ) + // Flush buffered events so real IDs get assigned + mutableState.FlushBufferedEvents() + persistenceMutableState := s.createPersistenceMutableState(mutableState, event.GetEventId(), event.GetVersion()) s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) - s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any(), gomock.Any()).Return(tests.UpdateWorkflowExecutionResponse, nil) - s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(s.namespaceEntry.IsGlobalNamespace(), s.version).Return(cluster.TestCurrentClusterName).AnyTimes() - _, _, err = s.transferQueueActiveTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask)) + _, _, err := s.transferQueueActiveTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask)) s.Nil(err) } -func (s *transferQueueActiveTaskExecutorSuite) TestProcessSignalExecution_Duplication() { +func (s *transferQueueActiveTaskExecutorSuite) setupSignalExternalWorkflowInitiated() ( + *workflow.MutableStateImpl, + *historypb.HistoryEvent, + *persistencespb.SignalInfo, +) { execution := commonpb.WorkflowExecution{ WorkflowId: "some random workflow ID", RunId: uuid.New(), @@ -1892,42 +1865,40 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessSignalExecution_Duplic }, }, ) - s.Nil(err) + s.NoError(err) wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") - taskID := int64(59) - event, _ = addRequestSignalInitiatedEvent(mutableState, event.GetEventId(), uuid.New(), + event, signalInfo := addRequestSignalInitiatedEvent(mutableState, event.GetEventId(), uuid.New(), tests.TargetNamespace, tests.TargetNamespaceID, targetExecution.GetWorkflowId(), targetExecution.GetRunId(), signalName, signalInput, signalControl, signalHeader) - transferTask := &tasks.SignalExecutionTask{ - WorkflowKey: definition.NewWorkflowKey( - s.namespaceID.String(), - execution.GetWorkflowId(), - execution.GetRunId(), - ), - Version: s.version, - TargetNamespaceID: s.targetNamespaceID.String(), - TargetWorkflowID: targetExecution.GetWorkflowId(), - TargetRunID: targetExecution.GetRunId(), - TaskID: taskID, - TargetChildWorkflowOnly: true, - InitiatedEventID: event.GetEventId(), - } - - event = addSignaledEvent(mutableState, event.GetEventId(), tests.TargetNamespace, tests.TargetNamespaceID, targetExecution.GetWorkflowId(), targetExecution.GetRunId(), "") - // Flush buffered events so real IDs get assigned - mutableState.FlushBufferedEvents() - - persistenceMutableState := s.createPersistenceMutableState(mutableState, event.GetEventId(), event.GetVersion()) - s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) + return mutableState, event, signalInfo +} - _, _, err = s.transferQueueActiveTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask)) - s.Nil(err) +func (s *transferQueueActiveTaskExecutorSuite) validateUpdateExecutionRequestWithSignalExternalFailedEvent( + signalInitiatedEventId int64, + expectedFailedCause enumspb.SignalExternalWorkflowExecutionFailedCause, + request *persistence.UpdateWorkflowExecutionRequest, +) { + s.Len(request.UpdateWorkflowMutation.DeleteSignalInfos, 1) + _, ok := request.UpdateWorkflowMutation.DeleteSignalInfos[signalInitiatedEventId] + s.True(ok) + + numFailedEvent := 0 + s.Len(request.UpdateWorkflowEvents, 1) + for _, event := range request.UpdateWorkflowEvents[0].Events { + if event.EventType != enumspb.EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED { + continue + } + attr := event.GetSignalExternalWorkflowExecutionFailedEventAttributes() + s.Equal(expectedFailedCause, attr.GetCause()) + numFailedEvent++ + } + s.Equal(1, numFailedEvent) } func (s *transferQueueActiveTaskExecutorSuite) TestProcessStartChildExecution_Success() {