Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent workflow close when updates are undelivered #4313

Merged
merged 2 commits into from
May 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions service/history/workflow/update/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,10 @@ type (
// messages and returns them.
ReadOutgoingMessages(startedEventID int64) ([]*protocolpb.Message, error)

// HasIncomplete returns true if the registry has Updates that have not
// yet completed or are not at least provisionally complete.
HasIncomplete() bool
// HasUndeliveredUpdates returns true if the registry has Updates that
// are not known to have been seen by user workflow code. In practice
// this means updates that have not yet been accepted or rejected.
HasUndeliveredUpdates() bool

// HasOutgoing returns true if the registry has any Updates that want to
// sent messages to a worker.
Expand Down Expand Up @@ -135,7 +136,7 @@ func NewRegistry(store UpdateStore, opts ...regOpt) *RegistryImpl {
opt(r)
}

// need to eager load here so that HasIncomplete is correct.
// need to eager load here so that Len and admit are correct.
for _, id := range store.GetAcceptedWorkflowExecutionUpdateIDs(context.Background()) {
r.updates[id] = newAccepted(id, r.remover(id), withInstrumentation(&r.instrumentation))
}
Expand All @@ -162,11 +163,11 @@ func (r *RegistryImpl) Find(ctx context.Context, id string) (*Update, bool) {
return r.findLocked(ctx, id)
}

func (r *RegistryImpl) HasIncomplete() bool {
func (r *RegistryImpl) HasUndeliveredUpdates() bool {
r.mu.RLock()
defer r.mu.RUnlock()
for _, upd := range r.updates {
if !upd.completedOrProvisionallyCompleted() {
if !upd.hasBeenSeenByWorkflowExecution() {
return true
}
}
Expand Down
7 changes: 4 additions & 3 deletions service/history/workflow/update/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func TestFind(t *testing.T) {
_, found, err := reg.FindOrCreate(ctx, updateID)
require.NoError(t, err)
require.False(t, found)
require.True(t, reg.HasUndeliveredUpdates())

_, ok = reg.Find(ctx, updateID)
require.True(t, ok)
Expand Down Expand Up @@ -194,7 +195,7 @@ func TestFindOrCreate(t *testing.T) {
reg = update.NewRegistry(store)
)

require.True(t, reg.HasIncomplete())
require.False(t, reg.HasUndeliveredUpdates())

t.Run("new update", func(t *testing.T) {
updateID := "a completely new update ID"
Expand Down Expand Up @@ -252,7 +253,7 @@ func TestUpdateRemovalFromRegistry(t *testing.T) {
evStore := mockEventStore{Controller: &effects}
meta := updatepb.Meta{UpdateId: storedAcceptedUpdateID}
outcome := successOutcome(t, "success!")
require.True(t, reg.HasIncomplete(), "accepted update counts as incomplete")
require.False(t, reg.HasUndeliveredUpdates(), "accepted is not undelivered")

err = upd.OnMessage(
ctx,
Expand All @@ -261,7 +262,7 @@ func TestUpdateRemovalFromRegistry(t *testing.T) {
)

require.NoError(t, err)
require.False(t, reg.HasIncomplete(), "updates should be ProvisionallyCompleted")
require.False(t, reg.HasUndeliveredUpdates(), "updates should be ProvisionallyCompleted")
require.Equal(t, 1, reg.Len(), "update should still be present in map")
effects.Apply(ctx)
require.Equal(t, 0, reg.Len(), "update should have been removed")
Expand Down
5 changes: 3 additions & 2 deletions service/history/workflow/update/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,9 @@ func (u *Update) onResponseMsg(
return nil
}

func (u *Update) completedOrProvisionallyCompleted() bool {
return u.state.Matches(stateSet(stateCompleted | stateProvisionallyCompleted))
func (u *Update) hasBeenSeenByWorkflowExecution() bool {
const unseen = stateAdmitted | stateProvisionallyRequested | stateRequested
return !u.state.Matches(stateSet(unseen))
}

func (u *Update) hasOutgoingMessage() bool {
Expand Down
9 changes: 4 additions & 5 deletions service/history/workflowTaskHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ type (

// internal state
hasBufferedEvents bool
hasPendingUpdates bool
workflowTaskFailedCause *workflowTaskFailedCause
activityNotStartedCancelled bool
newMutableState workflow.MutableState
Expand Down Expand Up @@ -553,7 +552,7 @@ func (handler *workflowTaskHandlerImpl) handleCommandCompleteWorkflow(
return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_COMMAND, nil)
}

if handler.hasPendingUpdates {
if handler.updateRegistry.HasUndeliveredUpdates() {
return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_UPDATE, nil)
}

Expand Down Expand Up @@ -615,7 +614,7 @@ func (handler *workflowTaskHandlerImpl) handleCommandFailWorkflow(
return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_COMMAND, nil)
}

if handler.hasPendingUpdates {
if handler.updateRegistry.HasUndeliveredUpdates() {
return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_UPDATE, nil)
}

Expand Down Expand Up @@ -721,7 +720,7 @@ func (handler *workflowTaskHandlerImpl) handleCommandCancelWorkflow(
return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_COMMAND, nil)
}

if handler.hasPendingUpdates {
if handler.updateRegistry.HasUndeliveredUpdates() {
return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_UPDATE, nil)
}

Expand Down Expand Up @@ -828,7 +827,7 @@ func (handler *workflowTaskHandlerImpl) handleCommandContinueAsNewWorkflow(
return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_COMMAND, nil)
}

if handler.hasPendingUpdates {
if handler.updateRegistry.HasUndeliveredUpdates() {
return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_UPDATE, nil)
}

Expand Down
10 changes: 3 additions & 7 deletions tests/update_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1885,7 +1885,7 @@ func (s *integrationSuite) TestUpdateWorkflow_FailWorkflowTask() {
s.NoError(err)

// Wait for UpdateWorkflowExecution to timeout.
// This will also remove update from registry and next WT won't have any updates attached to it.
// This does NOT remove update from registry
<-updateResultCh

// Try to accept update in workflow 3rd time: get error. Poller will fail WT.
Expand All @@ -1895,7 +1895,7 @@ func (s *integrationSuite) TestUpdateWorkflow_FailWorkflowTask() {

// Complete workflow.
_, err = poller.PollAndProcessWorkflowTask(false, false)
s.NoError(err)
s.Error(err, "update was never successfully accepted so it prevents completion")

s.Equal(5, wtHandlerCalls)
s.Equal(5, msgHandlerCalls)
Expand All @@ -1909,11 +1909,7 @@ func (s *integrationSuite) TestUpdateWorkflow_FailWorkflowTask() {
5 ActivityTaskScheduled
6 WorkflowTaskScheduled
7 WorkflowTaskStarted
8 WorkflowTaskFailed
9 WorkflowTaskScheduled
10 WorkflowTaskStarted
11 WorkflowTaskCompleted
12 WorkflowExecutionCompleted`, events)
8 WorkflowTaskFailed`, events)
}

func (s *integrationSuite) TestUpdateWorkflow_ConvertSpeculativeWorkflowTaskToNormal_BecauseOfBufferedSignal() {
Expand Down