diff --git a/service/history/workflow/update/registry.go b/service/history/workflow/update/registry.go index 540aee3d62a..3ffcef58839 100644 --- a/service/history/workflow/update/registry.go +++ b/service/history/workflow/update/registry.go @@ -57,9 +57,10 @@ type ( // messages and returns them. ReadOutgoingMessages(startedEventID int64) ([]*protocolpb.Message, error) - // HasIncomplete returns true if the registry has Updates that have not - // yet completed or are not at least provisionally complete. - HasIncomplete() bool + // HasUndeliveredUpdates returns true if the registry has Updates that + // are not known to have been seen by user workflow code. In practice + // this means updates that have not yet been accepted or rejected. + HasUndeliveredUpdates() bool // HasOutgoing returns true if the registry has any Updates that want to // sent messages to a worker. @@ -135,7 +136,7 @@ func NewRegistry(store UpdateStore, opts ...regOpt) *RegistryImpl { opt(r) } - // need to eager load here so that HasIncomplete is correct. + // need to eager load here so that Len and admit are correct. for _, id := range store.GetAcceptedWorkflowExecutionUpdateIDs(context.Background()) { r.updates[id] = newAccepted(id, r.remover(id), withInstrumentation(&r.instrumentation)) } @@ -162,11 +163,11 @@ func (r *RegistryImpl) Find(ctx context.Context, id string) (*Update, bool) { return r.findLocked(ctx, id) } -func (r *RegistryImpl) HasIncomplete() bool { +func (r *RegistryImpl) HasUndeliveredUpdates() bool { r.mu.RLock() defer r.mu.RUnlock() for _, upd := range r.updates { - if !upd.completedOrProvisionallyCompleted() { + if !upd.hasBeenSeenByWorkflowExecution() { return true } } diff --git a/service/history/workflow/update/registry_test.go b/service/history/workflow/update/registry_test.go index c63270f60a7..c7eb3840c08 100644 --- a/service/history/workflow/update/registry_test.go +++ b/service/history/workflow/update/registry_test.go @@ -104,6 +104,7 @@ func TestFind(t *testing.T) { _, found, err := reg.FindOrCreate(ctx, updateID) require.NoError(t, err) require.False(t, found) + require.True(t, reg.HasUndeliveredUpdates()) _, ok = reg.Find(ctx, updateID) require.True(t, ok) @@ -194,7 +195,7 @@ func TestFindOrCreate(t *testing.T) { reg = update.NewRegistry(store) ) - require.True(t, reg.HasIncomplete()) + require.False(t, reg.HasUndeliveredUpdates()) t.Run("new update", func(t *testing.T) { updateID := "a completely new update ID" @@ -252,7 +253,7 @@ func TestUpdateRemovalFromRegistry(t *testing.T) { evStore := mockEventStore{Controller: &effects} meta := updatepb.Meta{UpdateId: storedAcceptedUpdateID} outcome := successOutcome(t, "success!") - require.True(t, reg.HasIncomplete(), "accepted update counts as incomplete") + require.False(t, reg.HasUndeliveredUpdates(), "accepted is not undelivered") err = upd.OnMessage( ctx, @@ -261,7 +262,7 @@ func TestUpdateRemovalFromRegistry(t *testing.T) { ) require.NoError(t, err) - require.False(t, reg.HasIncomplete(), "updates should be ProvisionallyCompleted") + require.False(t, reg.HasUndeliveredUpdates(), "updates should be ProvisionallyCompleted") require.Equal(t, 1, reg.Len(), "update should still be present in map") effects.Apply(ctx) require.Equal(t, 0, reg.Len(), "update should have been removed") diff --git a/service/history/workflow/update/update.go b/service/history/workflow/update/update.go index d0616dd7eae..7b8c8f8ebe2 100644 --- a/service/history/workflow/update/update.go +++ b/service/history/workflow/update/update.go @@ -342,8 +342,9 @@ func (u *Update) onResponseMsg( return nil } -func (u *Update) completedOrProvisionallyCompleted() bool { - return u.state.Matches(stateSet(stateCompleted | stateProvisionallyCompleted)) +func (u *Update) hasBeenSeenByWorkflowExecution() bool { + const unseen = stateAdmitted | stateProvisionallyRequested | stateRequested + return !u.state.Matches(stateSet(unseen)) } func (u *Update) hasOutgoingMessage() bool { diff --git a/service/history/workflowTaskHandler.go b/service/history/workflowTaskHandler.go index f4058ec39cf..fef4619ff52 100644 --- a/service/history/workflowTaskHandler.go +++ b/service/history/workflowTaskHandler.go @@ -70,7 +70,6 @@ type ( // internal state hasBufferedEvents bool - hasPendingUpdates bool workflowTaskFailedCause *workflowTaskFailedCause activityNotStartedCancelled bool newMutableState workflow.MutableState @@ -553,7 +552,7 @@ func (handler *workflowTaskHandlerImpl) handleCommandCompleteWorkflow( return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_COMMAND, nil) } - if handler.hasPendingUpdates { + if handler.updateRegistry.HasUndeliveredUpdates() { return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_UPDATE, nil) } @@ -615,7 +614,7 @@ func (handler *workflowTaskHandlerImpl) handleCommandFailWorkflow( return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_COMMAND, nil) } - if handler.hasPendingUpdates { + if handler.updateRegistry.HasUndeliveredUpdates() { return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_UPDATE, nil) } @@ -721,7 +720,7 @@ func (handler *workflowTaskHandlerImpl) handleCommandCancelWorkflow( return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_COMMAND, nil) } - if handler.hasPendingUpdates { + if handler.updateRegistry.HasUndeliveredUpdates() { return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_UPDATE, nil) } @@ -828,7 +827,7 @@ func (handler *workflowTaskHandlerImpl) handleCommandContinueAsNewWorkflow( return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_COMMAND, nil) } - if handler.hasPendingUpdates { + if handler.updateRegistry.HasUndeliveredUpdates() { return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_UPDATE, nil) } diff --git a/tests/update_workflow_test.go b/tests/update_workflow_test.go index 055a637592e..b69cc9d6b22 100644 --- a/tests/update_workflow_test.go +++ b/tests/update_workflow_test.go @@ -1885,7 +1885,7 @@ func (s *integrationSuite) TestUpdateWorkflow_FailWorkflowTask() { s.NoError(err) // Wait for UpdateWorkflowExecution to timeout. - // This will also remove update from registry and next WT won't have any updates attached to it. + // This does NOT remove update from registry <-updateResultCh // Try to accept update in workflow 3rd time: get error. Poller will fail WT. @@ -1895,7 +1895,7 @@ func (s *integrationSuite) TestUpdateWorkflow_FailWorkflowTask() { // Complete workflow. _, err = poller.PollAndProcessWorkflowTask(false, false) - s.NoError(err) + s.Error(err, "update was never successfully accepted so it prevents completion") s.Equal(5, wtHandlerCalls) s.Equal(5, msgHandlerCalls) @@ -1909,11 +1909,7 @@ func (s *integrationSuite) TestUpdateWorkflow_FailWorkflowTask() { 5 ActivityTaskScheduled 6 WorkflowTaskScheduled 7 WorkflowTaskStarted - 8 WorkflowTaskFailed - 9 WorkflowTaskScheduled - 10 WorkflowTaskStarted - 11 WorkflowTaskCompleted - 12 WorkflowExecutionCompleted`, events) + 8 WorkflowTaskFailed`, events) } func (s *integrationSuite) TestUpdateWorkflow_ConvertSpeculativeWorkflowTaskToNormal_BecauseOfBufferedSignal() {