diff --git a/service/history/api/signal_workflow_util.go b/service/history/api/signal_workflow_util.go index 1b8e0febd90..6a5a2fbc78e 100644 --- a/service/history/api/signal_workflow_util.go +++ b/service/history/api/signal_workflow_util.go @@ -83,5 +83,14 @@ func ValidateSignal( return consts.ErrSignalsLimitExceeded } + if mutableState.IsWorkflowCloseAttempted() && mutableState.HasStartedWorkflowTask() { + shard.GetThrottledLogger().Info("Signal rejected because workflow is closing", + tag.WorkflowNamespaceID(namespaceID), + tag.WorkflowID(workflowID), + tag.WorkflowRunID(runID), + ) + return consts.ErrWorkflowClosing + } + return nil } diff --git a/service/history/api/signalwithstartworkflow/signal_with_start_workflow.go b/service/history/api/signalwithstartworkflow/signal_with_start_workflow.go index 321043b91e3..349ff8a14d6 100644 --- a/service/history/api/signalwithstartworkflow/signal_with_start_workflow.go +++ b/service/history/api/signalwithstartworkflow/signal_with_start_workflow.go @@ -264,11 +264,17 @@ func signalWorkflow( request.GetSignalInput().Size(), "SignalWithStartWorkflowExecution", ); err != nil { + // in-memory mutable state is still clean, release the lock with nil error to prevent + // clearing and reloading mutable state + workflowContext.GetReleaseFn()(nil) return err } if request.GetRequestId() != "" && mutableState.IsSignalRequested(request.GetRequestId()) { // duplicate signal + // in-memory mutable state is still clean, release the lock with nil error to prevent + // clearing and reloading mutable state + workflowContext.GetReleaseFn()(nil) return nil } if request.GetRequestId() != "" { diff --git a/service/history/api/signalwithstartworkflow/signal_with_start_workflow_test.go b/service/history/api/signalwithstartworkflow/signal_with_start_workflow_test.go index ce2441e3683..4acec96afc8 100644 --- a/service/history/api/signalwithstartworkflow/signal_with_start_workflow_test.go +++ b/service/history/api/signalwithstartworkflow/signal_with_start_workflow_test.go @@ -43,6 +43,7 @@ import ( "go.temporal.io/server/common/clock" "go.temporal.io/server/common/log" "go.temporal.io/server/service/history/api" + "go.temporal.io/server/service/history/consts" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tests" "go.temporal.io/server/service/history/workflow" @@ -109,6 +110,27 @@ func (s *signalWithStartWorkflowSuite) TearDownTest() { s.controller.Finish() } +func (s *signalWithStartWorkflowSuite) TestSignalWorkflow_WorkflowCloseAttempted() { + ctx := context.Background() + currentWorkflowContext := api.NewWorkflowContext( + s.currentContext, + wcache.NoopReleaseFn, + s.currentMutableState, + ) + request := s.randomRequest() + + s.currentMutableState.EXPECT().IsWorkflowCloseAttempted().Return(true) + s.currentMutableState.EXPECT().HasStartedWorkflowTask().Return(true) + + err := signalWorkflow( + ctx, + s.shardContext, + currentWorkflowContext, + request, + ) + s.Error(consts.ErrWorkflowClosing, err) +} + func (s *signalWithStartWorkflowSuite) TestSignalWorkflow_Dedup() { ctx := context.Background() currentWorkflowContext := api.NewWorkflowContext( @@ -118,6 +140,7 @@ func (s *signalWithStartWorkflowSuite) TestSignalWorkflow_Dedup() { ) request := s.randomRequest() + s.currentMutableState.EXPECT().IsWorkflowCloseAttempted().Return(false) s.currentMutableState.EXPECT().IsSignalRequested(request.GetRequestId()).Return(true) err := signalWorkflow( @@ -139,6 +162,7 @@ func (s *signalWithStartWorkflowSuite) TestSignalWorkflow_NewWorkflowTask() { request := s.randomRequest() request.SkipGenerateWorkflowTask = false + s.currentMutableState.EXPECT().IsWorkflowCloseAttempted().Return(false) s.currentMutableState.EXPECT().IsSignalRequested(request.GetRequestId()).Return(false) s.currentMutableState.EXPECT().AddSignalRequested(request.GetRequestId()) s.currentMutableState.EXPECT().AddWorkflowExecutionSignaled( @@ -170,6 +194,7 @@ func (s *signalWithStartWorkflowSuite) TestSignalWorkflow_NoNewWorkflowTask() { ) request := s.randomRequest() + s.currentMutableState.EXPECT().IsWorkflowCloseAttempted().Return(false) s.currentMutableState.EXPECT().IsSignalRequested(request.GetRequestId()).Return(false) s.currentMutableState.EXPECT().AddSignalRequested(request.GetRequestId()) s.currentMutableState.EXPECT().AddWorkflowExecutionSignaled( diff --git a/service/history/api/signalworkflow/api.go b/service/history/api/signalworkflow/api.go index 744de8a8af3..b8884020e56 100644 --- a/service/history/api/signalworkflow/api.go +++ b/service/history/api/signalworkflow/api.go @@ -69,15 +69,14 @@ func Invoke( }, nil } + releaseFn := workflowContext.GetReleaseFn() if !mutableState.IsWorkflowExecutionRunning() { + // in-memory mutable state is still clean, release the lock with nil error to prevent + // clearing and reloading mutable state + releaseFn(nil) return nil, consts.ErrWorkflowCompleted } - executionInfo := mutableState.GetExecutionInfo() - - // Do not create workflow task when the workflow has first workflow task backoff and execution is not started yet - createWorkflowTask := !mutableState.IsWorkflowPendingOnWorkflowTaskBackoff() && !request.GetSkipGenerateWorkflowTask() - if err := api.ValidateSignal( ctx, shard, @@ -85,14 +84,21 @@ func Invoke( request.GetInput().Size(), "SignalWorkflowExecution", ); err != nil { + releaseFn(nil) return nil, err } + executionInfo := mutableState.GetExecutionInfo() + + // Do not create workflow task when the workflow has first workflow task backoff and execution is not started yet + createWorkflowTask := !mutableState.IsWorkflowPendingOnWorkflowTaskBackoff() && !request.GetSkipGenerateWorkflowTask() + if childWorkflowOnly { parentWorkflowID := executionInfo.ParentWorkflowId parentRunID := executionInfo.ParentRunId if parentExecution.GetWorkflowId() != parentWorkflowID || parentExecution.GetRunId() != parentRunID { + releaseFn(nil) return nil, consts.ErrWorkflowParent } } diff --git a/service/history/api/signalworkflow/api_test.go b/service/history/api/signalworkflow/api_test.go new file mode 100644 index 00000000000..03ecd39fedd --- /dev/null +++ b/service/history/api/signalworkflow/api_test.go @@ -0,0 +1,149 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package signalworkflow + +import ( + "context" + "math/rand" + "testing" + "time" + + commonpb "go.temporal.io/api/common/v1" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/server/api/historyservice/v1" + "go.temporal.io/server/api/persistence/v1" + "go.temporal.io/server/common/clock" + "go.temporal.io/server/common/cluster" + "go.temporal.io/server/common/log" + "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/namespace" + "go.temporal.io/server/service/history/api" + "go.temporal.io/server/service/history/consts" + "go.temporal.io/server/service/history/shard" + "go.temporal.io/server/service/history/tests" + "go.temporal.io/server/service/history/workflow" + wcache "go.temporal.io/server/service/history/workflow/cache" +) + +type ( + signalWorkflowSuite struct { + suite.Suite + *require.Assertions + + controller *gomock.Controller + shardContext *shard.MockContext + namespaceRegistry *namespace.MockRegistry + + workflowCache *wcache.MockCache + workflowConsistencyChecker api.WorkflowConsistencyChecker + + currentContext *workflow.MockContext + currentMutableState *workflow.MockMutableState + } +) + +func TestSignalWorkflowSuite(t *testing.T) { + s := new(signalWorkflowSuite) + suite.Run(t, s) +} + +func (s *signalWorkflowSuite) SetupSuite() { + rand.Seed(time.Now().UnixNano()) +} + +func (s *signalWorkflowSuite) TearDownSuite() { +} + +func (s *signalWorkflowSuite) SetupTest() { + s.Assertions = require.New(s.T()) + + s.controller = gomock.NewController(s.T()) + s.namespaceRegistry = namespace.NewMockRegistry(s.controller) + s.namespaceRegistry.EXPECT().GetNamespaceByID(tests.GlobalNamespaceEntry.ID()).Return(tests.GlobalNamespaceEntry, nil).AnyTimes() + + s.shardContext = shard.NewMockContext(s.controller) + s.shardContext.EXPECT().GetConfig().Return(tests.NewDynamicConfig()).AnyTimes() + s.shardContext.EXPECT().GetLogger().Return(log.NewTestLogger()).AnyTimes() + s.shardContext.EXPECT().GetThrottledLogger().Return(log.NewTestLogger()).AnyTimes() + s.shardContext.EXPECT().GetMetricsHandler().Return(metrics.NoopMetricsHandler).AnyTimes() + s.shardContext.EXPECT().GetTimeSource().Return(clock.NewRealTimeSource()).AnyTimes() + s.shardContext.EXPECT().GetNamespaceRegistry().Return(s.namespaceRegistry).AnyTimes() + s.shardContext.EXPECT().GetClusterMetadata().Return(cluster.NewMetadataForTest(cluster.NewTestClusterMetadataConfig(true, true))).AnyTimes() + + s.currentMutableState = workflow.NewMockMutableState(s.controller) + s.currentMutableState.EXPECT().GetNamespaceEntry().Return(tests.GlobalNamespaceEntry).AnyTimes() + s.currentMutableState.EXPECT().GetExecutionInfo().Return(&persistence.WorkflowExecutionInfo{ + WorkflowId: tests.WorkflowID, + }).AnyTimes() + s.currentMutableState.EXPECT().GetExecutionState().Return(&persistence.WorkflowExecutionState{ + RunId: tests.RunID, + }).AnyTimes() + + s.currentContext = workflow.NewMockContext(s.controller) + s.currentContext.EXPECT().LoadMutableState(gomock.Any()).Return(s.currentMutableState, nil).AnyTimes() + + s.workflowCache = wcache.NewMockCache(s.controller) + s.workflowCache.EXPECT().GetOrCreateWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any(), workflow.LockPriorityHigh). + Return(s.currentContext, wcache.NoopReleaseFn, nil).AnyTimes() + + s.workflowConsistencyChecker = api.NewWorkflowConsistencyChecker( + s.shardContext, + s.workflowCache, + ) +} + +func (s *signalWorkflowSuite) TearDownTest() { + s.controller.Finish() +} + +func (s *signalWorkflowSuite) TestSignalWorkflow_WorkflowCloseAttempted() { + s.currentMutableState.EXPECT().IsWorkflowExecutionRunning().Return(true) + s.currentMutableState.EXPECT().IsWorkflowCloseAttempted().Return(true) + s.currentMutableState.EXPECT().HasStartedWorkflowTask().Return(true) + + resp, err := Invoke( + context.Background(), + &historyservice.SignalWorkflowExecutionRequest{ + NamespaceId: tests.NamespaceID.String(), + SignalRequest: &workflowservice.SignalWorkflowExecutionRequest{ + Namespace: tests.Namespace.String(), + WorkflowExecution: &commonpb.WorkflowExecution{ + WorkflowId: tests.WorkflowID, + RunId: tests.RunID, + }, + SignalName: "signal-name", + Input: nil, + }, + }, + s.shardContext, + s.workflowConsistencyChecker, + ) + s.Nil(resp) + s.Error(consts.ErrWorkflowClosing, err) +} diff --git a/service/history/consts/const.go b/service/history/consts/const.go index e23bd6d2bae..db2cf88d9c3 100644 --- a/service/history/consts/const.go +++ b/service/history/consts/const.go @@ -69,6 +69,8 @@ var ( ErrDeserializingToken = serviceerror.NewInvalidArgument("error deserializing task token") // ErrSignalsLimitExceeded is the error indicating limit reached for maximum number of signal events ErrSignalsLimitExceeded = serviceerror.NewInvalidArgument("exceeded workflow execution limit for signal events") + // ErrWorkflowClosing is the error indicating requests to workflow got rejected due to workflow is closing + ErrWorkflowClosing = serviceerror.NewUnavailable("workflow operation rejected because workflow is closing") // ErrEventsAterWorkflowFinish is the error indicating server error trying to write events after workflow finish event ErrEventsAterWorkflowFinish = serviceerror.NewInternal("error validating last event being workflow finish event") // ErrQueryEnteredInvalidState is error indicating query entered invalid state diff --git a/service/history/workflow/mutable_state.go b/service/history/workflow/mutable_state.go index a574ad3a89e..de84c52e3b6 100644 --- a/service/history/workflow/mutable_state.go +++ b/service/history/workflow/mutable_state.go @@ -153,7 +153,6 @@ type ( CloneToProto() *persistencespb.WorkflowMutableState RetryActivity(ai *persistencespb.ActivityInfo, failure *failurepb.Failure) (enumspb.RetryState, error) GetTransientWorkflowTaskInfo(workflowTask *WorkflowTaskInfo, identity string) *historyspb.TransientWorkflowTaskInfo - DeleteWorkflowTask() DeleteSignalRequested(requestID string) FlushBufferedEvents() GetAcceptedWorkflowExecutionUpdateIDs(context.Context) []string @@ -209,6 +208,7 @@ type ( HasPendingWorkflowTask() bool HadOrHasWorkflowTask() bool IsCancelRequested() bool + IsWorkflowCloseAttempted() bool IsCurrentWorkflowGuaranteed() bool IsSignalRequested(requestID string) bool GetApproximatePersistedSize() int diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index 57fb1bb6aa9..afb232eaee3 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -166,6 +166,11 @@ type ( // record if a event has been applied to mutable state // TODO: persist this to db appliedEvents map[string]struct{} + // a flag indicating if workflow has attempted to close (complete/cancel/continue as new) + // but failed due to undelievered buffered events + // the flag will be unset whenever workflow task successfully completed, timedout or failed + // due to cause other than UnhandledCommand + workflowCloseAttempted bool InsertTasks map[tasks.Category][]tasks.Task @@ -1501,11 +1506,6 @@ func (ms *MutableStateImpl) HasAnyBufferedEvent(filter BufferedEventFilter) bool return ms.hBuilder.HasAnyBufferedEvent(filter) } -// DeleteWorkflowTask deletes a workflow task. -func (ms *MutableStateImpl) DeleteWorkflowTask() { - ms.workflowTaskManager.DeleteWorkflowTask() -} - // GetLastFirstEventIDTxnID returns last first event ID and corresponding transaction ID // first event ID is the ID of a batch of events in a single history events record func (ms *MutableStateImpl) GetLastFirstEventIDTxnID() (int64, int64) { @@ -1543,6 +1543,10 @@ func (ms *MutableStateImpl) IsCancelRequested() bool { return ms.executionInfo.CancelRequested } +func (ms *MutableStateImpl) IsWorkflowCloseAttempted() bool { + return ms.workflowCloseAttempted +} + func (ms *MutableStateImpl) IsSignalRequested( requestID string, ) bool { diff --git a/service/history/workflow/mutable_state_mock.go b/service/history/workflow/mutable_state_mock.go index 4d43043668e..3f3b3c1af12 100644 --- a/service/history/workflow/mutable_state_mock.go +++ b/service/history/workflow/mutable_state_mock.go @@ -971,18 +971,6 @@ func (mr *MockMutableStateMockRecorder) DeleteSignalRequested(requestID interfac return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteSignalRequested", reflect.TypeOf((*MockMutableState)(nil).DeleteSignalRequested), requestID) } -// DeleteWorkflowTask mocks base method. -func (m *MockMutableState) DeleteWorkflowTask() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "DeleteWorkflowTask") -} - -// DeleteWorkflowTask indicates an expected call of DeleteWorkflowTask. -func (mr *MockMutableStateMockRecorder) DeleteWorkflowTask() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteWorkflowTask", reflect.TypeOf((*MockMutableState)(nil).DeleteWorkflowTask)) -} - // FlushBufferedEvents mocks base method. func (m *MockMutableState) FlushBufferedEvents() { m.ctrl.T.Helper() @@ -1877,6 +1865,20 @@ func (mr *MockMutableStateMockRecorder) IsTransientWorkflowTask() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsTransientWorkflowTask", reflect.TypeOf((*MockMutableState)(nil).IsTransientWorkflowTask)) } +// IsWorkflowCloseAttempted mocks base method. +func (m *MockMutableState) IsWorkflowCloseAttempted() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsWorkflowCloseAttempted") + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsWorkflowCloseAttempted indicates an expected call of IsWorkflowCloseAttempted. +func (mr *MockMutableStateMockRecorder) IsWorkflowCloseAttempted() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsWorkflowCloseAttempted", reflect.TypeOf((*MockMutableState)(nil).IsWorkflowCloseAttempted)) +} + // IsWorkflowExecutionRunning mocks base method. func (m *MockMutableState) IsWorkflowExecutionRunning() bool { m.ctrl.T.Helper() diff --git a/service/history/workflow/workflow_task_state_machine.go b/service/history/workflow/workflow_task_state_machine.go index 229201471a7..5854f79fd0d 100644 --- a/service/history/workflow/workflow_task_state_machine.go +++ b/service/history/workflow/workflow_task_state_machine.go @@ -216,7 +216,7 @@ func (m *workflowTaskStateMachine) ReplicateWorkflowTaskCompletedEvent( } func (m *workflowTaskStateMachine) ReplicateWorkflowTaskFailedEvent() error { - m.FailWorkflowTask(true) + m.failWorkflowTask(true) return nil } @@ -228,7 +228,7 @@ func (m *workflowTaskStateMachine) ReplicateWorkflowTaskTimedOutEvent( if timeoutType == enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START { incrementAttempt = false } - m.FailWorkflowTask(incrementAttempt) + m.failWorkflowTask(incrementAttempt) return nil } @@ -621,10 +621,14 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskFailedEvent( return nil, err } - // always clear workflow task attempt for reset - if cause == enumspb.WORKFLOW_TASK_FAILED_CAUSE_RESET_WORKFLOW || - cause == enumspb.WORKFLOW_TASK_FAILED_CAUSE_FAILOVER_CLOSE_COMMAND { + switch cause { + case enumspb.WORKFLOW_TASK_FAILED_CAUSE_RESET_WORKFLOW, + enumspb.WORKFLOW_TASK_FAILED_CAUSE_FAILOVER_CLOSE_COMMAND: + // always clear workflow task attempt for reset and failover close command m.ms.executionInfo.WorkflowTaskAttempt = 1 + case enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_COMMAND: + // workflow attempted to close but failed due to unhandled buffer events + m.ms.workflowCloseAttempted = true } // Attempt counter was incremented directly in mutable state. Current WT attempt counter needs to be updated. @@ -676,7 +680,7 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskTimedOutEvent( return event, nil } -func (m *workflowTaskStateMachine) FailWorkflowTask( +func (m *workflowTaskStateMachine) failWorkflowTask( incrementAttempt bool, ) { // Increment attempts only if workflow task is failing on non-sticky task queue. @@ -707,8 +711,8 @@ func (m *workflowTaskStateMachine) FailWorkflowTask( m.UpdateWorkflowTask(failWorkflowTaskInfo) } -// DeleteWorkflowTask deletes a workflow task. -func (m *workflowTaskStateMachine) DeleteWorkflowTask() { +// deleteWorkflowTask deletes a workflow task. +func (m *workflowTaskStateMachine) deleteWorkflowTask() { resetWorkflowTaskInfo := &WorkflowTaskInfo{ Version: common.EmptyVersion, ScheduledEventID: common.EmptyEventID, @@ -733,6 +737,23 @@ func (m *workflowTaskStateMachine) DeleteWorkflowTask() { func (m *workflowTaskStateMachine) UpdateWorkflowTask( workflowTask *WorkflowTaskInfo, ) { + if m.HasStartedWorkflowTask() && workflowTask.StartedEventID == common.EmptyEventID { + // reset the flag whenever started workflow task closes, there could be three cases: + // 1. workflow task completed: + // a. workflow task contains close workflow command, the fact that workflow task + // completes successfully means workflow will also close and the value of the + // flag doesn't matter. + // b. workflow task doesn't contain close workflow command, then by definition, + // workflow is not trying to close, so unset the flag. + // 2. workflow task timedout: we don't know if workflow is trying to close or not, + // reset the flag to be safe. It's possible that workflow task is trying to signal + // itself within a local activity when this flag is set, which may result in timeout. + // reset the flag will allow the workflow to proceed. + // 3. workflow failed: always reset the flag here. If failure is due to unhandled command, + // AddWorkflowTaskFailedEvent will set the flag. + m.ms.workflowCloseAttempted = false + } + m.ms.executionInfo.WorkflowTaskVersion = workflowTask.Version m.ms.executionInfo.WorkflowTaskScheduledEventId = workflowTask.ScheduledEventID m.ms.executionInfo.WorkflowTaskStartedEventId = workflowTask.StartedEventID @@ -908,7 +929,7 @@ func (m *workflowTaskStateMachine) getWorkflowTaskInfo() *WorkflowTaskInfo { func (m *workflowTaskStateMachine) beforeAddWorkflowTaskCompletedEvent() { // Make sure to delete workflow task before adding events. Otherwise they are buffered rather than getting appended. - m.DeleteWorkflowTask() + m.deleteWorkflowTask() } func (m *workflowTaskStateMachine) afterAddWorkflowTaskCompletedEvent( diff --git a/service/history/workflowTaskHandlerCallbacks.go b/service/history/workflowTaskHandlerCallbacks.go index 9e57f938412..2013307ac3d 100644 --- a/service/history/workflowTaskHandlerCallbacks.go +++ b/service/history/workflowTaskHandlerCallbacks.go @@ -726,10 +726,14 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( if workflowTaskHeartbeatTimeout { // at this point, update is successful, but we still return an error to client so that the worker will give up this workflow + // release workflow lock with nil error to prevent mutable state from being cleared and reloaded + workflowContext.GetReleaseFn()(nil) return nil, serviceerror.NewNotFound("workflow task heartbeat timeout") } if wtFailedCause != nil { + // release workflow lock with nil error to prevent mutable state from being cleared and reloaded + workflowContext.GetReleaseFn()(nil) return nil, serviceerror.NewInvalidArgument(wtFailedCause.Message()) } diff --git a/tests/signal_workflow_test.go b/tests/signal_workflow_test.go index bc8673faf5b..03e0e473f44 100644 --- a/tests/signal_workflow_test.go +++ b/tests/signal_workflow_test.go @@ -798,6 +798,92 @@ func (s *integrationSuite) TestSignalWorkflow_NoWorkflowTaskCreated() { 10 WorkflowExecutionCompleted`, historyResponse.GetHistory()) } +func (s *integrationSuite) TestSignalWorkflow_WorkflowCloseAttempted() { + id := "integration-signal-workflow-workflow-close-attempted-test" + wt := "integration-signal-workflow-workflow-close-attempted-test-type" + tl := "integration-signal-workflow-workflow-close-attempted-test-taskqueue" + identity := "worker1" + workflowType := &commonpb.WorkflowType{Name: wt} + taskQueue := &taskqueuepb.TaskQueue{Name: tl} + + we, err := s.engine.StartWorkflowExecution(NewContext(), &workflowservice.StartWorkflowExecutionRequest{ + RequestId: uuid.New(), + Namespace: s.namespace, + WorkflowId: id, + WorkflowType: workflowType, + TaskQueue: taskQueue, + Input: nil, + WorkflowRunTimeout: timestamp.DurationPtr(100 * time.Second), + WorkflowTaskTimeout: timestamp.DurationPtr(3 * time.Second), + Identity: identity, + }) + s.NoError(err) + + attemptCount := 1 + wtHandler := func( + execution *commonpb.WorkflowExecution, + wt *commonpb.WorkflowType, + previousStartedEventID, startedEventID int64, + history *historypb.History, + ) ([]*commandpb.Command, error) { + if attemptCount == 1 { + _, err := s.engine.SignalWorkflowExecution(NewContext(), &workflowservice.SignalWorkflowExecutionRequest{ + Namespace: s.namespace, + WorkflowExecution: &commonpb.WorkflowExecution{ + WorkflowId: id, + RunId: we.RunId, + }, + SignalName: "buffered-signal", + Identity: identity, + RequestId: uuid.New(), + }) + s.NoError(err) + } + + if attemptCount == 2 { + ctx, _ := rpc.NewContextWithTimeoutAndVersionHeaders(time.Second) + _, err := s.engine.SignalWorkflowExecution(ctx, &workflowservice.SignalWorkflowExecutionRequest{ + Namespace: s.namespace, + WorkflowExecution: &commonpb.WorkflowExecution{ + WorkflowId: id, + RunId: we.RunId, + }, + SignalName: "rejected-signal", + Identity: identity, + RequestId: uuid.New(), + }) + s.Error(err) + s.IsType(&serviceerror.Unavailable{}, err) + } + + attemptCount++ + return []*commandpb.Command{{ + CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION, + Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{ + Result: payloads.EncodeString("Done"), + }}, + }}, nil + } + + poller := &TaskPoller{ + Engine: s.engine, + Namespace: s.namespace, + TaskQueue: taskQueue, + Identity: identity, + WorkflowTaskHandler: wtHandler, + Logger: s.Logger, + T: s.T(), + } + + _, err = poller.PollAndProcessWorkflowTask(false, false) + s.Logger.Info("PollAndProcessWorkflowTask", tag.Error(err)) + s.Error(err) + + _, err = poller.PollAndProcessWorkflowTask(false, false) + s.Logger.Info("PollAndProcessWorkflowTask", tag.Error(err)) + s.NoError(err) +} + func (s *integrationSuite) TestSignalExternalWorkflowCommand_WithoutRunID() { id := "integration-signal-external-workflow-test-without-run-id" wt := "integration-signal-external-workflow-test-without-run-id-type"