Skip to content

Commit

Permalink
Reject signal if workflow attempted to close (#4395)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored May 26, 2023
1 parent 01034e4 commit 759aa2c
Show file tree
Hide file tree
Showing 12 changed files with 346 additions and 32 deletions.
9 changes: 9 additions & 0 deletions service/history/api/signal_workflow_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,5 +83,14 @@ func ValidateSignal(
return consts.ErrSignalsLimitExceeded
}

if mutableState.IsWorkflowCloseAttempted() && mutableState.HasStartedWorkflowTask() {
shard.GetThrottledLogger().Info("Signal rejected because workflow is closing",
tag.WorkflowNamespaceID(namespaceID),
tag.WorkflowID(workflowID),
tag.WorkflowRunID(runID),
)
return consts.ErrWorkflowClosing
}

return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -264,11 +264,17 @@ func signalWorkflow(
request.GetSignalInput().Size(),
"SignalWithStartWorkflowExecution",
); err != nil {
// in-memory mutable state is still clean, release the lock with nil error to prevent
// clearing and reloading mutable state
workflowContext.GetReleaseFn()(nil)
return err
}

if request.GetRequestId() != "" && mutableState.IsSignalRequested(request.GetRequestId()) {
// duplicate signal
// in-memory mutable state is still clean, release the lock with nil error to prevent
// clearing and reloading mutable state
workflowContext.GetReleaseFn()(nil)
return nil
}
if request.GetRequestId() != "" {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/log"
"go.temporal.io/server/service/history/api"
"go.temporal.io/server/service/history/consts"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tests"
"go.temporal.io/server/service/history/workflow"
Expand Down Expand Up @@ -109,6 +110,27 @@ func (s *signalWithStartWorkflowSuite) TearDownTest() {
s.controller.Finish()
}

func (s *signalWithStartWorkflowSuite) TestSignalWorkflow_WorkflowCloseAttempted() {
ctx := context.Background()
currentWorkflowContext := api.NewWorkflowContext(
s.currentContext,
wcache.NoopReleaseFn,
s.currentMutableState,
)
request := s.randomRequest()

s.currentMutableState.EXPECT().IsWorkflowCloseAttempted().Return(true)
s.currentMutableState.EXPECT().HasStartedWorkflowTask().Return(true)

err := signalWorkflow(
ctx,
s.shardContext,
currentWorkflowContext,
request,
)
s.Error(consts.ErrWorkflowClosing, err)
}

func (s *signalWithStartWorkflowSuite) TestSignalWorkflow_Dedup() {
ctx := context.Background()
currentWorkflowContext := api.NewWorkflowContext(
Expand All @@ -118,6 +140,7 @@ func (s *signalWithStartWorkflowSuite) TestSignalWorkflow_Dedup() {
)
request := s.randomRequest()

s.currentMutableState.EXPECT().IsWorkflowCloseAttempted().Return(false)
s.currentMutableState.EXPECT().IsSignalRequested(request.GetRequestId()).Return(true)

err := signalWorkflow(
Expand All @@ -139,6 +162,7 @@ func (s *signalWithStartWorkflowSuite) TestSignalWorkflow_NewWorkflowTask() {
request := s.randomRequest()
request.SkipGenerateWorkflowTask = false

s.currentMutableState.EXPECT().IsWorkflowCloseAttempted().Return(false)
s.currentMutableState.EXPECT().IsSignalRequested(request.GetRequestId()).Return(false)
s.currentMutableState.EXPECT().AddSignalRequested(request.GetRequestId())
s.currentMutableState.EXPECT().AddWorkflowExecutionSignaled(
Expand Down Expand Up @@ -170,6 +194,7 @@ func (s *signalWithStartWorkflowSuite) TestSignalWorkflow_NoNewWorkflowTask() {
)
request := s.randomRequest()

s.currentMutableState.EXPECT().IsWorkflowCloseAttempted().Return(false)
s.currentMutableState.EXPECT().IsSignalRequested(request.GetRequestId()).Return(false)
s.currentMutableState.EXPECT().AddSignalRequested(request.GetRequestId())
s.currentMutableState.EXPECT().AddWorkflowExecutionSignaled(
Expand Down
16 changes: 11 additions & 5 deletions service/history/api/signalworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,30 +69,36 @@ func Invoke(
}, nil
}

releaseFn := workflowContext.GetReleaseFn()
if !mutableState.IsWorkflowExecutionRunning() {
// in-memory mutable state is still clean, release the lock with nil error to prevent
// clearing and reloading mutable state
releaseFn(nil)
return nil, consts.ErrWorkflowCompleted
}

executionInfo := mutableState.GetExecutionInfo()

// Do not create workflow task when the workflow has first workflow task backoff and execution is not started yet
createWorkflowTask := !mutableState.IsWorkflowPendingOnWorkflowTaskBackoff() && !request.GetSkipGenerateWorkflowTask()

if err := api.ValidateSignal(
ctx,
shard,
mutableState,
request.GetInput().Size(),
"SignalWorkflowExecution",
); err != nil {
releaseFn(nil)
return nil, err
}

executionInfo := mutableState.GetExecutionInfo()

// Do not create workflow task when the workflow has first workflow task backoff and execution is not started yet
createWorkflowTask := !mutableState.IsWorkflowPendingOnWorkflowTaskBackoff() && !request.GetSkipGenerateWorkflowTask()

if childWorkflowOnly {
parentWorkflowID := executionInfo.ParentWorkflowId
parentRunID := executionInfo.ParentRunId
if parentExecution.GetWorkflowId() != parentWorkflowID ||
parentExecution.GetRunId() != parentRunID {
releaseFn(nil)
return nil, consts.ErrWorkflowParent
}
}
Expand Down
149 changes: 149 additions & 0 deletions service/history/api/signalworkflow/api_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package signalworkflow

import (
"context"
"math/rand"
"testing"
"time"

commonpb "go.temporal.io/api/common/v1"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/service/history/api"
"go.temporal.io/server/service/history/consts"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tests"
"go.temporal.io/server/service/history/workflow"
wcache "go.temporal.io/server/service/history/workflow/cache"
)

type (
signalWorkflowSuite struct {
suite.Suite
*require.Assertions

controller *gomock.Controller
shardContext *shard.MockContext
namespaceRegistry *namespace.MockRegistry

workflowCache *wcache.MockCache
workflowConsistencyChecker api.WorkflowConsistencyChecker

currentContext *workflow.MockContext
currentMutableState *workflow.MockMutableState
}
)

func TestSignalWorkflowSuite(t *testing.T) {
s := new(signalWorkflowSuite)
suite.Run(t, s)
}

func (s *signalWorkflowSuite) SetupSuite() {
rand.Seed(time.Now().UnixNano())
}

func (s *signalWorkflowSuite) TearDownSuite() {
}

func (s *signalWorkflowSuite) SetupTest() {
s.Assertions = require.New(s.T())

s.controller = gomock.NewController(s.T())
s.namespaceRegistry = namespace.NewMockRegistry(s.controller)
s.namespaceRegistry.EXPECT().GetNamespaceByID(tests.GlobalNamespaceEntry.ID()).Return(tests.GlobalNamespaceEntry, nil).AnyTimes()

s.shardContext = shard.NewMockContext(s.controller)
s.shardContext.EXPECT().GetConfig().Return(tests.NewDynamicConfig()).AnyTimes()
s.shardContext.EXPECT().GetLogger().Return(log.NewTestLogger()).AnyTimes()
s.shardContext.EXPECT().GetThrottledLogger().Return(log.NewTestLogger()).AnyTimes()
s.shardContext.EXPECT().GetMetricsHandler().Return(metrics.NoopMetricsHandler).AnyTimes()
s.shardContext.EXPECT().GetTimeSource().Return(clock.NewRealTimeSource()).AnyTimes()
s.shardContext.EXPECT().GetNamespaceRegistry().Return(s.namespaceRegistry).AnyTimes()
s.shardContext.EXPECT().GetClusterMetadata().Return(cluster.NewMetadataForTest(cluster.NewTestClusterMetadataConfig(true, true))).AnyTimes()

s.currentMutableState = workflow.NewMockMutableState(s.controller)
s.currentMutableState.EXPECT().GetNamespaceEntry().Return(tests.GlobalNamespaceEntry).AnyTimes()
s.currentMutableState.EXPECT().GetExecutionInfo().Return(&persistence.WorkflowExecutionInfo{
WorkflowId: tests.WorkflowID,
}).AnyTimes()
s.currentMutableState.EXPECT().GetExecutionState().Return(&persistence.WorkflowExecutionState{
RunId: tests.RunID,
}).AnyTimes()

s.currentContext = workflow.NewMockContext(s.controller)
s.currentContext.EXPECT().LoadMutableState(gomock.Any()).Return(s.currentMutableState, nil).AnyTimes()

s.workflowCache = wcache.NewMockCache(s.controller)
s.workflowCache.EXPECT().GetOrCreateWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any(), workflow.LockPriorityHigh).
Return(s.currentContext, wcache.NoopReleaseFn, nil).AnyTimes()

s.workflowConsistencyChecker = api.NewWorkflowConsistencyChecker(
s.shardContext,
s.workflowCache,
)
}

func (s *signalWorkflowSuite) TearDownTest() {
s.controller.Finish()
}

func (s *signalWorkflowSuite) TestSignalWorkflow_WorkflowCloseAttempted() {
s.currentMutableState.EXPECT().IsWorkflowExecutionRunning().Return(true)
s.currentMutableState.EXPECT().IsWorkflowCloseAttempted().Return(true)
s.currentMutableState.EXPECT().HasStartedWorkflowTask().Return(true)

resp, err := Invoke(
context.Background(),
&historyservice.SignalWorkflowExecutionRequest{
NamespaceId: tests.NamespaceID.String(),
SignalRequest: &workflowservice.SignalWorkflowExecutionRequest{
Namespace: tests.Namespace.String(),
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: tests.WorkflowID,
RunId: tests.RunID,
},
SignalName: "signal-name",
Input: nil,
},
},
s.shardContext,
s.workflowConsistencyChecker,
)
s.Nil(resp)
s.Error(consts.ErrWorkflowClosing, err)
}
2 changes: 2 additions & 0 deletions service/history/consts/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ var (
ErrDeserializingToken = serviceerror.NewInvalidArgument("error deserializing task token")
// ErrSignalsLimitExceeded is the error indicating limit reached for maximum number of signal events
ErrSignalsLimitExceeded = serviceerror.NewInvalidArgument("exceeded workflow execution limit for signal events")
// ErrWorkflowClosing is the error indicating requests to workflow got rejected due to workflow is closing
ErrWorkflowClosing = serviceerror.NewUnavailable("workflow operation rejected because workflow is closing")
// ErrEventsAterWorkflowFinish is the error indicating server error trying to write events after workflow finish event
ErrEventsAterWorkflowFinish = serviceerror.NewInternal("error validating last event being workflow finish event")
// ErrQueryEnteredInvalidState is error indicating query entered invalid state
Expand Down
2 changes: 1 addition & 1 deletion service/history/workflow/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ type (
CloneToProto() *persistencespb.WorkflowMutableState
RetryActivity(ai *persistencespb.ActivityInfo, failure *failurepb.Failure) (enumspb.RetryState, error)
GetTransientWorkflowTaskInfo(workflowTask *WorkflowTaskInfo, identity string) *historyspb.TransientWorkflowTaskInfo
DeleteWorkflowTask()
DeleteSignalRequested(requestID string)
FlushBufferedEvents()
GetAcceptedWorkflowExecutionUpdateIDs(context.Context) []string
Expand Down Expand Up @@ -209,6 +208,7 @@ type (
HasPendingWorkflowTask() bool
HadOrHasWorkflowTask() bool
IsCancelRequested() bool
IsWorkflowCloseAttempted() bool
IsCurrentWorkflowGuaranteed() bool
IsSignalRequested(requestID string) bool
GetApproximatePersistedSize() int
Expand Down
14 changes: 9 additions & 5 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,11 @@ type (
// record if a event has been applied to mutable state
// TODO: persist this to db
appliedEvents map[string]struct{}
// a flag indicating if workflow has attempted to close (complete/cancel/continue as new)
// but failed due to undelievered buffered events
// the flag will be unset whenever workflow task successfully completed, timedout or failed
// due to cause other than UnhandledCommand
workflowCloseAttempted bool

InsertTasks map[tasks.Category][]tasks.Task

Expand Down Expand Up @@ -1501,11 +1506,6 @@ func (ms *MutableStateImpl) HasAnyBufferedEvent(filter BufferedEventFilter) bool
return ms.hBuilder.HasAnyBufferedEvent(filter)
}

// DeleteWorkflowTask deletes a workflow task.
func (ms *MutableStateImpl) DeleteWorkflowTask() {
ms.workflowTaskManager.DeleteWorkflowTask()
}

// GetLastFirstEventIDTxnID returns last first event ID and corresponding transaction ID
// first event ID is the ID of a batch of events in a single history events record
func (ms *MutableStateImpl) GetLastFirstEventIDTxnID() (int64, int64) {
Expand Down Expand Up @@ -1543,6 +1543,10 @@ func (ms *MutableStateImpl) IsCancelRequested() bool {
return ms.executionInfo.CancelRequested
}

func (ms *MutableStateImpl) IsWorkflowCloseAttempted() bool {
return ms.workflowCloseAttempted
}

func (ms *MutableStateImpl) IsSignalRequested(
requestID string,
) bool {
Expand Down
26 changes: 14 additions & 12 deletions service/history/workflow/mutable_state_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 759aa2c

Please sign in to comment.