diff --git a/service/history/api/updateworkflow/api.go b/service/history/api/updateworkflow/api.go index f065714d19f..c5b1fee5c69 100644 --- a/service/history/api/updateworkflow/api.go +++ b/service/history/api/updateworkflow/api.go @@ -75,7 +75,7 @@ func Invoke( return nil, consts.ErrWorkflowExecutionNotFound } - upd, duplicate, removeFn := ms.UpdateRegistry().Add(req.GetRequest().GetRequest()) + upd, duplicate, removeFn := weCtx.GetContext().UpdateRegistry().Add(req.GetRequest().GetRequest()) if removeFn != nil { defer removeFn() } diff --git a/service/history/workflow/cache/cache_test.go b/service/history/workflow/cache/cache_test.go index 78cbc44b5c4..d38612e0db7 100644 --- a/service/history/workflow/cache/cache_test.go +++ b/service/history/workflow/cache/cache_test.go @@ -45,7 +45,6 @@ import ( "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tests" "go.temporal.io/server/service/history/workflow" - "go.temporal.io/server/service/history/workflow/update" ) type ( @@ -226,7 +225,6 @@ func (s *workflowCacheSuite) TestHistoryCacheClear() { s.NotNil(ctx.(*workflow.ContextImpl).MutableState) mock.EXPECT().GetQueryRegistry().Return(workflow.NewQueryRegistry()) - mock.EXPECT().UpdateRegistry().Return(update.NewRegistry()) release(errors.New("some random error message")) // since last time, the release function receive a non-nil error @@ -279,7 +277,6 @@ func (s *workflowCacheSuite) TestHistoryCacheConcurrentAccess_Release() { // all we need is a fake MutableState mock := workflow.NewMockMutableState(s.controller) mock.EXPECT().GetQueryRegistry().Return(workflow.NewQueryRegistry()) - mock.EXPECT().UpdateRegistry().Return(update.NewRegistry()) ctx.(*workflow.ContextImpl).MutableState = mock release(errors.New("some random error message")) } diff --git a/service/history/workflow/context.go b/service/history/workflow/context.go index a9d39f82a62..344f31b9f6c 100644 --- a/service/history/workflow/context.go +++ b/service/history/workflow/context.go @@ -54,6 +54,7 @@ import ( "go.temporal.io/server/service/history/consts" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tasks" + "go.temporal.io/server/service/history/workflow/update" ) const ( @@ -144,6 +145,8 @@ type ( ctx context.Context, now time.Time, ) error + // TODO (alex-update): move this from workflow context. + UpdateRegistry() update.Registry } ) @@ -158,9 +161,10 @@ type ( config *configs.Config transaction Transaction - mutex locks.PriorityMutex - MutableState MutableState - stats *persistencespb.ExecutionStats + mutex locks.PriorityMutex + MutableState MutableState + updateRegistry update.Registry + stats *persistencespb.ExecutionStats } ) @@ -180,6 +184,7 @@ func NewContext( timeSource: shard.GetTimeSource(), config: shard.GetConfig(), mutex: locks.NewPriorityMutex(), + updateRegistry: update.NewRegistry(), transaction: NewTransaction(shard), stats: &persistencespb.ExecutionStats{ HistorySize: 0, @@ -218,10 +223,6 @@ func (c *ContextImpl) Clear() { c.metricsHandler.Counter(metrics.WorkflowContextCleared.GetMetricName()).Record(1) if c.MutableState != nil { c.MutableState.GetQueryRegistry().Clear() - // Updates are stored in-memory in mutable state. When mutable state is unload due to the error, - // all pending updates must be cleared to notify UpdateWorkflowExecution API callers immediately - // without waiting for timeout. - c.MutableState.UpdateRegistry().Clear() } c.MutableState = nil c.stats = &persistencespb.ExecutionStats{ @@ -872,6 +873,10 @@ func (c *ContextImpl) ReapplyEvents( return err } +func (c *ContextImpl) UpdateRegistry() update.Registry { + return c.updateRegistry +} + // Returns true if execution is forced terminated func (c *ContextImpl) enforceSizeCheck( ctx context.Context, diff --git a/service/history/workflow/context_mock.go b/service/history/workflow/context_mock.go index 6ce51318a96..7bf0c068490 100644 --- a/service/history/workflow/context_mock.go +++ b/service/history/workflow/context_mock.go @@ -37,6 +37,7 @@ import ( v1 "go.temporal.io/server/api/persistence/v1" definition "go.temporal.io/server/common/definition" persistence "go.temporal.io/server/common/persistence" + update "go.temporal.io/server/service/history/workflow/update" ) // MockContext is a mock of Context interface. @@ -241,6 +242,20 @@ func (mr *MockContextMockRecorder) Unlock(caller interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Unlock", reflect.TypeOf((*MockContext)(nil).Unlock), caller) } +// UpdateRegistry mocks base method. +func (m *MockContext) UpdateRegistry() update.Registry { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateRegistry") + ret0, _ := ret[0].(update.Registry) + return ret0 +} + +// UpdateRegistry indicates an expected call of UpdateRegistry. +func (mr *MockContextMockRecorder) UpdateRegistry() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateRegistry", reflect.TypeOf((*MockContext)(nil).UpdateRegistry)) +} + // UpdateWorkflowExecutionAsActive mocks base method. func (m *MockContext) UpdateWorkflowExecutionAsActive(ctx context.Context, now time.Time) error { m.ctrl.T.Helper() diff --git a/service/history/workflow/mutable_state.go b/service/history/workflow/mutable_state.go index d25e75a0f23..3c374d11256 100644 --- a/service/history/workflow/mutable_state.go +++ b/service/history/workflow/mutable_state.go @@ -49,7 +49,6 @@ import ( "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/persistence" "go.temporal.io/server/service/history/tasks" - "go.temporal.io/server/service/history/workflow/update" ) type TransactionPolicy int @@ -198,8 +197,6 @@ type ( GetWorkflowStateStatus() (enumsspb.WorkflowExecutionState, enumspb.WorkflowExecutionStatus) GetQueryRegistry() QueryRegistry IsTransientWorkflowTask() bool - // TODO (alex-update): move this out from mutable state. - UpdateRegistry() update.Registry ClearTransientWorkflowTask() error HasBufferedEvents() bool HasInFlightWorkflowTask() bool diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index 69cc630db75..c5b5620a647 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -70,7 +70,6 @@ import ( "go.temporal.io/server/service/history/events" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tasks" - "go.temporal.io/server/service/history/workflow/update" ) const ( @@ -171,7 +170,6 @@ type ( taskGenerator TaskGenerator workflowTaskManager *workflowTaskStateMachine QueryRegistry QueryRegistry - updateRegistry update.Registry shard shard.Context clusterMetadata cluster.Metadata @@ -230,8 +228,7 @@ func NewMutableState( appliedEvents: make(map[string]struct{}), InsertTasks: make(map[tasks.Category][]tasks.Task), - QueryRegistry: NewQueryRegistry(), - updateRegistry: update.NewRegistry(), + QueryRegistry: NewQueryRegistry(), shard: shard, clusterMetadata: shard.GetClusterMetadata(), @@ -636,10 +633,6 @@ func (ms *MutableStateImpl) GetQueryRegistry() QueryRegistry { return ms.QueryRegistry } -func (ms *MutableStateImpl) UpdateRegistry() update.Registry { - return ms.updateRegistry -} - func (ms *MutableStateImpl) GetActivityScheduledEvent( ctx context.Context, scheduledEventID int64, diff --git a/service/history/workflow/mutable_state_mock.go b/service/history/workflow/mutable_state_mock.go index dae33054762..465a5a07c36 100644 --- a/service/history/workflow/mutable_state_mock.go +++ b/service/history/workflow/mutable_state_mock.go @@ -52,7 +52,6 @@ import ( namespace "go.temporal.io/server/common/namespace" persistence "go.temporal.io/server/common/persistence" tasks "go.temporal.io/server/service/history/tasks" - update "go.temporal.io/server/service/history/workflow/update" ) // MockMutableState is a mock of MutableState interface. @@ -2549,20 +2548,6 @@ func (mr *MockMutableStateMockRecorder) UpdateDuplicatedResource(resourceDedupKe return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateDuplicatedResource", reflect.TypeOf((*MockMutableState)(nil).UpdateDuplicatedResource), resourceDedupKey) } -// UpdateRegistry mocks base method. -func (m *MockMutableState) UpdateRegistry() update.Registry { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UpdateRegistry") - ret0, _ := ret[0].(update.Registry) - return ret0 -} - -// UpdateRegistry indicates an expected call of UpdateRegistry. -func (mr *MockMutableStateMockRecorder) UpdateRegistry() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateRegistry", reflect.TypeOf((*MockMutableState)(nil).UpdateRegistry)) -} - // UpdateUserTimer mocks base method. func (m *MockMutableState) UpdateUserTimer(arg0 *v112.TimerInfo) error { m.ctrl.T.Helper() diff --git a/service/history/workflow/update/registry.go b/service/history/workflow/update/registry.go index a59565d0d94..3e45b533cdd 100644 --- a/service/history/workflow/update/registry.go +++ b/service/history/workflow/update/registry.go @@ -28,7 +28,6 @@ import ( "sync" "github.com/gogo/protobuf/types" - failurepb "go.temporal.io/api/failure/v1" protocolpb "go.temporal.io/api/protocol/v1" updatepb "go.temporal.io/api/update/v1" ) @@ -41,8 +40,6 @@ type ( HasPending(filterMessages []*protocolpb.Message) bool ProcessIncomingMessages(messages []*protocolpb.Message) error - - Clear() } Duplicate bool @@ -170,15 +167,6 @@ func (r *RegistryImpl) ProcessIncomingMessages(messages []*protocolpb.Message) e return nil } -func (r *RegistryImpl) Clear() { - r.Lock() - defer r.Unlock() - for _, upd := range r.updates { - upd.sendReject(r.clearFailure()) - } - r.updates = make(map[string]*Update) -} - func (r *RegistryImpl) getPendingUpdateNoLock(protocolInstanceID string) *Update { if upd, ok := r.updates[protocolInstanceID]; ok && upd.state == statePending { return upd @@ -193,17 +181,6 @@ func (r *RegistryImpl) getAcceptedUpdateNoLock(protocolInstanceID string) *Updat return nil } -func (r *RegistryImpl) clearFailure() *failurepb.Failure { - return &failurepb.Failure{ - Message: "update cleared, please retry", - FailureInfo: &failurepb.Failure_ServerFailureInfo{ - ServerFailureInfo: &failurepb.ServerFailureInfo{ - NonRetryable: false, - }, - }, - } -} - func (r *RegistryImpl) remove(id string) { r.Lock() defer r.Unlock() diff --git a/service/history/workflow/workflow_task_state_machine.go b/service/history/workflow/workflow_task_state_machine.go index 1b0c9efc7b3..f00341ccd4c 100644 --- a/service/history/workflow/workflow_task_state_machine.go +++ b/service/history/workflow/workflow_task_state_machine.go @@ -578,7 +578,6 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskFailedEvent( workflowTask.SuggestContinueAsNew, workflowTask.HistorySizeBytes, ) - // TODO (alex-update): Do we need to call next line here same as in AddWorkflowTaskCompletedEvent? m.ms.hBuilder.FlushAndCreateNewBatch() workflowTask.StartedEventID = startedEvent.GetEventId() } @@ -638,9 +637,11 @@ func (m *workflowTaskStateMachine) FailWorkflowTask( incrementAttempt bool, ) { // Increment attempts only if workflow task is failing on non-sticky task queue. - // If it was stick task queue, clear stickiness first and try again before creating transient workflow tas. - incrementAttempt = incrementAttempt && !m.ms.IsStickyTaskQueueEnabled() - m.ms.ClearStickyness() + // If it was stick task queue, clear stickiness first and try again before creating transient workflow task. + if m.ms.IsStickyTaskQueueEnabled() { + incrementAttempt = false + m.ms.ClearStickyness() + } failWorkflowTaskInfo := &WorkflowTaskInfo{ Version: common.EmptyVersion, diff --git a/service/history/workflowTaskHandlerCallbacks.go b/service/history/workflowTaskHandlerCallbacks.go index fb048eed51e..84aa2ea7980 100644 --- a/service/history/workflowTaskHandlerCallbacks.go +++ b/service/history/workflowTaskHandlerCallbacks.go @@ -56,6 +56,7 @@ import ( "go.temporal.io/server/service/history/consts" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/workflow" + "go.temporal.io/server/service/history/workflow/update" ) type ( @@ -212,7 +213,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskStarted( if workflowTask.StartedEventID != common.EmptyEventID { // If workflow task is started as part of the current request scope then return a positive response if workflowTask.RequestID == requestID { - resp, err = handler.createRecordWorkflowTaskStartedResponse(mutableState, workflowTask, req.PollRequest.GetIdentity()) + resp, err = handler.createRecordWorkflowTaskStartedResponse(mutableState, workflowContext.GetContext().UpdateRegistry(), workflowTask, req.PollRequest.GetIdentity()) if err != nil { return nil, err } @@ -249,7 +250,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskStarted( metrics.TaskQueueTypeTag(enumspb.TASK_QUEUE_TYPE_WORKFLOW), ) - resp, err = handler.createRecordWorkflowTaskStartedResponse(mutableState, workflowTask, req.PollRequest.GetIdentity()) + resp, err = handler.createRecordWorkflowTaskStartedResponse(mutableState, workflowContext.GetContext().UpdateRegistry(), workflowTask, req.PollRequest.GetIdentity()) if err != nil { return nil, err } @@ -450,7 +451,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( newMutableState workflow.MutableState ) // hasPendingUpdates indicates if there are more pending updates (excluding those which are accepted/rejected by this workflow task). - hasPendingUpdates := ms.UpdateRegistry().HasPending(request.GetMessages()) + hasPendingUpdates := weContext.UpdateRegistry().HasPending(request.GetMessages()) hasBufferedEvents := ms.HasBufferedEvents() if err := namespaceEntry.VerifyBinaryChecksum(request.GetBinaryChecksum()); err != nil { wtFailedCause = newWorkflowTaskFailedCause( @@ -671,7 +672,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( } // Send update results to gRPC API callers. - err = ms.UpdateRegistry().ProcessIncomingMessages(req.GetCompleteRequest().GetMessages()) + err = weContext.UpdateRegistry().ProcessIncomingMessages(req.GetCompleteRequest().GetMessages()) if err != nil { return nil, err } @@ -690,7 +691,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( resp := &historyservice.RespondWorkflowTaskCompletedResponse{} if request.GetReturnNewWorkflowTask() && createNewWorkflowTask { workflowTask, _ := ms.GetWorkflowTaskInfo(newWorkflowTaskScheduledEventID) - resp.StartedResponse, err = handler.createRecordWorkflowTaskStartedResponse(ms, workflowTask, request.GetIdentity()) + resp.StartedResponse, err = handler.createRecordWorkflowTaskStartedResponse(ms, weContext.UpdateRegistry(), workflowTask, request.GetIdentity()) if err != nil { return nil, err } @@ -754,6 +755,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) verifyFirstWorkflowTaskSchedule func (handler *workflowTaskHandlerCallbacksImpl) createRecordWorkflowTaskStartedResponse( ms workflow.MutableState, + updateRegistry update.Registry, workflowTask *workflow.WorkflowTaskInfo, identity string, ) (*historyservice.RecordWorkflowTaskStartedResponse, error) { @@ -801,7 +803,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) createRecordWorkflowTaskStarted } } - response.Messages, err = ms.UpdateRegistry().CreateOutgoingMessages(workflowTask.StartedEventID) + response.Messages, err = updateRegistry.CreateOutgoingMessages(workflowTask.StartedEventID) if err != nil { return nil, err } diff --git a/tests/update_workflow_test.go b/tests/update_workflow_test.go index 20325dde2d3..eaf1f11975c 100644 --- a/tests/update_workflow_test.go +++ b/tests/update_workflow_test.go @@ -25,6 +25,7 @@ package tests import ( + "context" "errors" "strconv" "testing" @@ -41,7 +42,7 @@ import ( updatepb "go.temporal.io/api/update/v1" "go.temporal.io/api/workflowservice/v1" - "go.temporal.io/server/common/debug" + "go.temporal.io/server/common" "go.temporal.io/server/common/payloads" "go.temporal.io/server/common/primitives/timestamp" ) @@ -1328,7 +1329,7 @@ func (s *integrationSuite) TestUpdateWorkflow_FirstWorkflowTask_1stAccept_2ndRej 17 WorkflowExecutionCompleted`, events) } -func (s *integrationSuite) TestUpdateWorkflow_FirstWorkflowTask_BadAcceptMessage() { +func (s *integrationSuite) TestUpdateWorkflow_FailWorkflowTask() { id := "integration-update-workflow-test-7" wt := "integration-update-workflow-test-7-type" tq := "integration-update-workflow-test-7-task-queue" @@ -1342,8 +1343,6 @@ func (s *integrationSuite) TestUpdateWorkflow_FirstWorkflowTask_BadAcceptMessage WorkflowId: id, WorkflowType: workflowType, TaskQueue: taskQueue, - // Some short but reasonable timeout because there is a wait for it in the test. - WorkflowTaskTimeout: timestamp.DurationPtr(1 * time.Second * debug.TimeoutMultiplier), } startResp, err := s.engine.StartWorkflowExecution(NewContext(), request) @@ -1382,6 +1381,9 @@ func (s *integrationSuite) TestUpdateWorkflow_FirstWorkflowTask_BadAcceptMessage s.Fail("should not be called because messageHandler returns error") return nil, nil case 4: + s.Fail("should not be called because messageHandler returns error") + return nil, nil + case 5: s.EqualHistory(` 1 WorkflowExecutionStarted 2 WorkflowTaskScheduled @@ -1429,13 +1431,21 @@ func (s *integrationSuite) TestUpdateWorkflow_FirstWorkflowTask_BadAcceptMessage }, }, nil case 3: - // 2nd attempt doesn't have any updates attached to it. - s.Empty(task.Messages) + // 2nd attempt has same updates attached to it. + updRequestMsg := task.Messages[0] + s.EqualValues(9, updRequestMsg.GetEventId()) wtHandlerCalls++ // because it won't be called for case 3 but counter should be in sync. // Fail WT one more time. Although 2nd attempt is normal WT, it is also transient and shouldn't appear in the history. // Returning error will cause the poller to fail WT. return nil, errors.New("malformed request") case 4: + // 3rd attempt doesn't have any updates attached to it because UpdateWorkflowExecution call has timed out. + s.Empty(task.Messages) + wtHandlerCalls++ // because it won't be called for case 4 but counter should be in sync. + // Fail WT one more time. This is transient WT and shouldn't appear in the history. + // Returning error will cause the poller to fail WT. + return nil, errors.New("malformed request") + case 5: return nil, nil default: s.Failf("msgHandler called too many times", "msgHandler shouldn't be called %d times", msgHandlerCalls) @@ -1457,13 +1467,11 @@ func (s *integrationSuite) TestUpdateWorkflow_FirstWorkflowTask_BadAcceptMessage _, err = poller.PollAndProcessWorkflowTask(true, false) s.NoError(err) - type UpdateResult struct { - Response *workflowservice.UpdateWorkflowExecutionResponse - Err error - } - updateResultCh := make(chan UpdateResult) + updateResultCh := make(chan struct{}) updateWorkflowFn := func() { - updateResponse, err1 := s.engine.UpdateWorkflowExecution(NewContext(), &workflowservice.UpdateWorkflowExecutionRequest{ + ctx1, cancel := context.WithTimeout(NewContext(), 2*time.Second) + defer cancel() + updateResponse, err1 := s.engine.UpdateWorkflowExecution(ctx1, &workflowservice.UpdateWorkflowExecutionRequest{ Namespace: s.namespace, WorkflowExecution: we, Request: &updatepb.Request{ @@ -1474,11 +1482,13 @@ func (s *integrationSuite) TestUpdateWorkflow_FirstWorkflowTask_BadAcceptMessage }, }, }) - s.NoError(err1) - updateResultCh <- UpdateResult{Response: updateResponse, Err: err1} + s.Error(err1) + // UpdateWorkflowExecution is timed out after 2 seconds. + s.True(common.IsContextDeadlineExceededErr(err1)) + s.Nil(updateResponse) + updateResultCh <- struct{}{} } go updateWorkflowFn() - time.Sleep(500 * time.Millisecond) // This is to make sure that update gets to the server. // Try to accept update in workflow: get malformed response. _, err = poller.PollAndProcessWorkflowTask(false, false) @@ -1486,22 +1496,26 @@ func (s *integrationSuite) TestUpdateWorkflow_FirstWorkflowTask_BadAcceptMessage s.Equal(err.Error(), "BadUpdateWorkflowExecutionMessage: accepted_request is not set on update.Acceptance message body.") // New normal (but transient) WT will be created but not returned. - updateResult := <-updateResultCh - s.NoError(updateResult.Err) - // TODO (alex-update): this is wrong. Caller shouldn't get this error if WT failed. - s.Equal("update cleared, please retry", updateResult.Response.GetOutcome().GetFailure().GetMessage()) - // Try to accept update in workflow 2nd time: get error. Poller will fail WT. _, err = poller.PollAndProcessWorkflowTask(false, false) // The error is from RespondWorkflowTaskFailed, which should go w/o error. s.NoError(err) + // Wait for UpdateWorkflowExecution to timeout. + // This will also remove update from registry and next WT won't have any updates attached to it. + <-updateResultCh + + // Try to accept update in workflow 3rd time: get error. Poller will fail WT. + _, err = poller.PollAndProcessWorkflowTask(false, false) + // The error is from RespondWorkflowTaskFailed, which should go w/o error. + s.NoError(err) + // Complete workflow. _, err = poller.PollAndProcessWorkflowTask(false, false) s.NoError(err) - s.Equal(4, wtHandlerCalls) - s.Equal(4, msgHandlerCalls) + s.Equal(5, wtHandlerCalls) + s.Equal(5, msgHandlerCalls) events := s.getHistory(s.namespace, we) s.EqualHistoryEvents(`