diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go
index 288598ad72c..d96d27ecf9a 100644
--- a/common/dynamicconfig/constants.go
+++ b/common/dynamicconfig/constants.go
@@ -2072,6 +2072,11 @@ the number of children greater than or equal to this threshold`,
 		time.Minute*10,
 		`WorkflowTaskRetryMaxInterval is the maximum interval added to a workflow task's startToClose timeout for slowing down retry`,
 	)
+	DiscardSpeculativeWorkflowTaskMaximumEventsCount = NewGlobalIntSetting(
+		"history.discardSpeculativeWorkflowTaskMaximumEventsCount",
+		10,
+		`If a speculative workflow task shipped more than DiscardSpeculativeWorkflowTaskMaximumEventsCount events, it can't be discarded`,
+	)
 	DefaultWorkflowTaskTimeout = NewNamespaceDurationSetting(
 		"history.defaultWorkflowTaskTimeout",
 		primitives.DefaultWorkflowTaskTimeout,
diff --git a/docs/architecture/speculative-workflow-task.md b/docs/architecture/speculative-workflow-task.md
index 0561827df33..e109d223313 100644
--- a/docs/architecture/speculative-workflow-task.md
+++ b/docs/architecture/speculative-workflow-task.md
@@ -39,7 +39,7 @@ event.
 - its `WorkflowTaskInfo.Type` is `WORKFLOW_TASK_TYPE_SPECULATIVE`
 
 Similar to a CPU's *speculative execution* (which gives this Workflow Task its name) where a branch
-execution can be thrown away, a speculative Workflow Task can be dropped as if it never existed.
+execution can be thrown away, a speculative Workflow Task can be discarded as if it never existed.
 The overall strategy is to optimistically assume the speculative Workflow Task will go through, but
 if anything goes wrong, give up quickly and convert the speculative Workflow Task to a normal one.
 
@@ -98,23 +98,35 @@ a new speculative Workflow Task can be created after the first one is lost, but
 try to complete the first one. To prevent this, `StartedTime` was added to the Workflow Task token
 and if it doesn't match the start time in mutable state, the Workflow Task can't be completed.
 
-### Persist or Drop
+### Persist or Discard
 
 While completing a speculative Workflow Task, the server makes a decision to either write the
-speculative events followed by a `WorkflowTaskCompleted` event - or drop the speculative events and
-make the speculative Workflow Task disappear. The latter can only happen, if the server knows that
+speculative events followed by a `WorkflowTaskCompleted` event - or discard the speculative events and
+make the speculative Workflow Task disappear. The latter can only happen if the server knows that
 this Workflow Task didn't change the Workflow state. Currently, the conditions are
 (check `skipWorkflowTaskCompletedEvent()` func):
 - response doesn't have any commands,
 - response has only Update rejection messages.
 
-> #### TODO
-> There is one more special case: when the speculative Workflow Task contained other events
-> (e.g. activity scheduled), then it can't be dropped because they would need to be sent *again* in the
-> next Workflow Task - but older SDK versions don't support receiving the same events twice. A
-> compatibility flag is needed to safely allow SDKs to opt-in to this optimization.
+The speculative Workflow Task can also ship other events (e.g. `ActivityTaskScheduled` or `TimerStarted`)
+that were generated from previous Workflow Task commands (also known as command-events).
+Unfortunately, older SDKs don't support receiving the same events more than once.
+If the SDK supports this, it sets the `DiscardSpeculativeWorkflowTaskWithEvents` flag to `true`,
+and the server will discard a speculative Workflow Task even if it shipped events. These events can be
+shipped multiple times if Updates keep being rejected. To prevent sending a large set of events to the
+worker over and over again, the server persists the speculative Workflow Task if the number of events
+exceeds the `DiscardSpeculativeWorkflowTaskMaximumEventsCount` threshold.
 
-When the server decides to drop a speculative Workflow Task, it needs to communicate this decision to
-the worker - because the SDK needs to roll back to a previous history event and drop all events after
+> #### NOTE
+> This is possible because of an important server invariant: the Workflow history can only end with:
+> - a Workflow Task event (Scheduled, Started, Completed, Failed, Timeout),
+>
+> or
+> - a command-event, generated from a previous Workflow Task command.
+> None of these events change the Workflow state on the worker side. This invariant must not be
+> broken by other features.
+
+When the server decides to discard a speculative Workflow Task, it needs to communicate this decision to
+the worker - because the SDK needs to roll back to a previous history event and discard all events after
 that one. To do that, the server will set the `ResetHistoryEventId` field on the
 `RespondWorkflowTaskCompletedResponse` to the mutable state's `LastCompletedWorkflowTaskStartedEventId`
 (since the SDK uses `WorkflowTaskStartedEventID` as its history checkpoint).
@@ -128,7 +140,7 @@ rejection or acceptance message. If it does happen, the server will persist all
 events and create a new Workflow Task as normal.
 
 > #### NOTE
-> This is a design decision, which could be changed later: instead, the server could drop the
+> This is a design decision, which could be changed later: instead, the server could discard the
 > speculative Workflow Task when it heartbeats and create a new speculative Workflow Task. No
 > new events would be added to the history - but heartbeats would not be visible anymore.
 
diff --git a/docs/architecture/workflow-update.md b/docs/architecture/workflow-update.md
index 5bb931a5b1b..5add511648f 100644
--- a/docs/architecture/workflow-update.md
+++ b/docs/architecture/workflow-update.md
@@ -214,7 +214,7 @@ Update to the worker. This Workflow Task is always speculative, unless there is
 already-scheduled-but-not-yet-started Workflow Task present.
 
 Later, when handling a worker response in the `RespondWorkflowTaskCompleted` API handler, the server
-might write or drop events for this Workflow Task. Read
+might write or discard events for this Workflow Task. Read
 [Speculative Workflow Tasks](./speculative-workflow-task.md) for more details.
 
 ### Lifecycle Stage
diff --git a/service/history/api/respondworkflowtaskcompleted/api_test.go b/service/history/api/respondworkflowtaskcompleted/api_test.go
index 1215ccb484c..210b31c1890 100644
--- a/service/history/api/respondworkflowtaskcompleted/api_test.go
+++ b/service/history/api/respondworkflowtaskcompleted/api_test.go
@@ -28,11 +28,13 @@ import (
 	"context"
 	"errors"
 	"maps"
+	"strconv"
 	"testing"
 	"time"
 
 	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"
+	commandpb "go.temporal.io/api/command/v1"
 	enumspb "go.temporal.io/api/enums/v1"
 	historypb "go.temporal.io/api/history/v1"
 	protocolpb "go.temporal.io/api/protocol/v1"
@@ -159,7 +161,6 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestUpdateWorkflow() {
 
 	createWrittenHistoryCh := func(expectedUpdateWorkflowExecutionCalls int) <-chan []*historypb.HistoryEvent {
 		writtenHistoryCh := make(chan []*historypb.HistoryEvent, expectedUpdateWorkflowExecutionCalls)
-		var historyEvents []*historypb.HistoryEvent
 		s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, request *persistence.UpdateWorkflowExecutionRequest) (*persistence.UpdateWorkflowExecutionResponse, error) {
 			var wfEvents []*persistence.WorkflowEvents
 			if len(request.UpdateWorkflowEvents) > 0 {
@@ -168,6 +169,7 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestUpdateWorkflow() {
 				wfEvents = request.NewWorkflowEvents
 			}
 
+			var historyEvents []*historypb.HistoryEvent
 			for _, uwe := range wfEvents {
 				for _, event := range uwe.Events {
 					historyEvents = append(historyEvents, event)
@@ -187,10 +189,13 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestUpdateWorkflow() {
 		wfContext := s.createStartedWorkflow(tv)
 		writtenHistoryCh := createWrittenHistoryCh(1)
 
+		_, err := wfContext.LoadMutableState(context.Background(), s.workflowTaskCompletedHandler.shardContext)
+		s.NoError(err)
+
 		updRequestMsg, upd, serializedTaskToken := s.createSentUpdate(tv, "1", wfContext)
 		s.NotNil(upd)
 
-		_, err := s.workflowTaskCompletedHandler.Invoke(context.Background(), &historyservice.RespondWorkflowTaskCompletedRequest{
+		_, err = s.workflowTaskCompletedHandler.Invoke(context.Background(), &historyservice.RespondWorkflowTaskCompletedRequest{
 			NamespaceId: tv.NamespaceID().String(),
 			CompleteRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{
 				TaskToken: serializedTaskToken,
@@ -338,6 +343,129 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestUpdateWorkflow() {
   7 WorkflowTaskScheduled
   8 WorkflowTaskStarted`, <-writtenHistoryCh)
 	})
+
+	s.Run("Discard speculative WFT with events", func() {
+		tv := testvars.New(s.T())
+		tv = tv.WithRunID(tv.Any().RunID())
+		s.mockNamespaceCache.EXPECT().GetNamespaceByID(tv.NamespaceID()).Return(tv.Namespace(), nil).AnyTimes()
+		wfContext := s.createStartedWorkflow(tv)
+		// Expect only 2 calls to UpdateWorkflowExecution: for timer started and timer fired events but not Update or WFT events.
+		writtenHistoryCh := createWrittenHistoryCh(2)
+		ms, err := wfContext.LoadMutableState(context.Background(), s.workflowTaskCompletedHandler.shardContext)
+		s.NoError(err)
+
+		_, _, err = ms.AddTimerStartedEvent(
+			1,
+			&commandpb.StartTimerCommandAttributes{
+				TimerId:            tv.TimerID(),
+				StartToFireTimeout: tv.InfiniteTimeout(),
+			},
+		)
+		s.NoError(err)
+		err = wfContext.UpdateWorkflowExecutionAsActive(context.Background(), s.workflowTaskCompletedHandler.shardContext)
+		s.NoError(err)
+
+		s.EqualHistoryEvents(`
+  2 TimerStarted
+`, <-writtenHistoryCh)
+
+		updRequestMsg, upd, serializedTaskToken := s.createSentUpdate(tv, "1", wfContext)
+		s.NotNil(upd)
+
+		_, err = s.workflowTaskCompletedHandler.Invoke(context.Background(), &historyservice.RespondWorkflowTaskCompletedRequest{
+			NamespaceId: tv.NamespaceID().String(),
+			CompleteRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{
+				TaskToken: serializedTaskToken,
+				Messages:  s.UpdateRejectMessages(tv, updRequestMsg, "1"),
+				Identity:  tv.Any().String(),
+				Capabilities: &workflowservice.RespondWorkflowTaskCompletedRequest_Capabilities{
+					DiscardSpeculativeWorkflowTaskWithEvents: true,
+				},
+			},
+		})
+		s.NoError(err)
+
+		updStatus, err := upd.WaitLifecycleStage(context.Background(), enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_UNSPECIFIED, time.Duration(0))
+		s.NoError(err)
+		s.Equal(enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED.String(), updStatus.Stage.String())
+		s.Equal("rejection-of-"+tv.UpdateID("1"), updStatus.Outcome.GetFailure().GetMessage())
+
+		ms, err = wfContext.LoadMutableState(context.Background(), s.workflowTaskCompletedHandler.shardContext)
+		s.NoError(err)
+		_, err = ms.AddTimerFiredEvent(tv.TimerID())
+		s.NoError(err)
+		err = wfContext.UpdateWorkflowExecutionAsActive(context.Background(), s.workflowTaskCompletedHandler.shardContext)
+		s.NoError(err)
+
+		s.EqualHistoryEvents(`
+  3 TimerFired // No WFT events in between 2 and 3.
+`, <-writtenHistoryCh)
+	})
+
+	s.Run("Do not discard speculative WFT with more than 10 events", func() {
+		tv := testvars.New(s.T())
+		tv = tv.WithRunID(tv.Any().RunID())
+		s.mockNamespaceCache.EXPECT().GetNamespaceByID(tv.NamespaceID()).Return(tv.Namespace(), nil).AnyTimes()
+		wfContext := s.createStartedWorkflow(tv)
+		// Expect 2 calls to UpdateWorkflowExecution: for timer started and WFT events.
+		writtenHistoryCh := createWrittenHistoryCh(2)
+		ms, err := wfContext.LoadMutableState(context.Background(), s.workflowTaskCompletedHandler.shardContext)
+		s.NoError(err)
+
+		for i := 0; i < 11; i++ {
+			_, _, err = ms.AddTimerStartedEvent(
+				1,
+				&commandpb.StartTimerCommandAttributes{
+					TimerId:            tv.TimerID(strconv.Itoa(i)),
+					StartToFireTimeout: tv.InfiniteTimeout(),
+				},
+			)
+			s.NoError(err)
+		}
+		err = wfContext.UpdateWorkflowExecutionAsActive(context.Background(), s.workflowTaskCompletedHandler.shardContext)
+		s.NoError(err)
+
+		s.EqualHistoryEvents(`
+  2 TimerStarted
+  3 TimerStarted
+  4 TimerStarted
+  5 TimerStarted
+  6 TimerStarted
+  7 TimerStarted
+  8 TimerStarted
+  9 TimerStarted
+ 10 TimerStarted
+ 11 TimerStarted
+ 12 TimerStarted
+`, <-writtenHistoryCh)
+
+		updRequestMsg, upd, serializedTaskToken := s.createSentUpdate(tv, "1", wfContext)
+		s.NotNil(upd)
+
+		_, err = s.workflowTaskCompletedHandler.Invoke(context.Background(), &historyservice.RespondWorkflowTaskCompletedRequest{
+			NamespaceId: tv.NamespaceID().String(),
+			CompleteRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{
+				TaskToken: serializedTaskToken,
+				Messages:  s.UpdateRejectMessages(tv, updRequestMsg, "1"),
+				Identity:  tv.Any().String(),
+				Capabilities: &workflowservice.RespondWorkflowTaskCompletedRequest_Capabilities{
+					DiscardSpeculativeWorkflowTaskWithEvents: true,
+				},
+			},
+		})
+		s.NoError(err)
+
+		updStatus, err := upd.WaitLifecycleStage(context.Background(), enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_UNSPECIFIED, time.Duration(0))
+		s.NoError(err)
+		s.Equal(enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED.String(), updStatus.Stage.String())
+		s.Equal("rejection-of-"+tv.UpdateID("1"), updStatus.Outcome.GetFailure().GetMessage())
+
+		s.EqualHistoryEvents(`
+ 13 WorkflowTaskScheduled // WFT events were created even if it was a rejection (because number of events > 10).
+ 14 WorkflowTaskStarted
+ 15 WorkflowTaskCompleted
+`, <-writtenHistoryCh)
+	})
 }
 
 func (s *WorkflowTaskCompletedHandlerSuite) TestHandleBufferedQueries() {
@@ -420,7 +548,7 @@ func (s *WorkflowTaskCompletedHandlerSuite) createStartedWorkflow(tv *testvars.T
 		WorkflowExecutionTimeout: durationpb.New(tv.InfiniteTimeout().AsDuration()),
 		WorkflowRunTimeout:       durationpb.New(tv.InfiniteTimeout().AsDuration()),
 		WorkflowTaskTimeout:      durationpb.New(tv.InfiniteTimeout().AsDuration()),
-		Identity:                 tv.Any().String(),
+		Identity:                 tv.ClientIdentity(),
 	}
 
 	_, _ = ms.AddWorkflowExecutionStartedEvent(
diff --git a/service/history/configs/config.go b/service/history/configs/config.go
index d0558876e99..d35cf150cd5 100644
--- a/service/history/configs/config.go
+++ b/service/history/configs/config.go
@@ -255,9 +255,10 @@ type Config struct {
 	DefaultWorkflowTaskTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter
 	// WorkflowTaskHeartbeatTimeout is to timeout behavior of: RespondWorkflowTaskComplete with ForceCreateNewWorkflowTask == true
 	// without any commands or messages. After this timeout workflow task will be scheduled to another worker(by clear stickyness).
-	WorkflowTaskHeartbeatTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter
-	WorkflowTaskCriticalAttempts dynamicconfig.IntPropertyFn
-	WorkflowTaskRetryMaxInterval dynamicconfig.DurationPropertyFn
+	WorkflowTaskHeartbeatTimeout                     dynamicconfig.DurationPropertyFnWithNamespaceFilter
+	WorkflowTaskCriticalAttempts                     dynamicconfig.IntPropertyFn
+	WorkflowTaskRetryMaxInterval                     dynamicconfig.DurationPropertyFn
+	DiscardSpeculativeWorkflowTaskMaximumEventsCount dynamicconfig.IntPropertyFn
 
 	// The following is used by the new RPC replication stack
 	ReplicationTaskApplyTimeout dynamicconfig.DurationPropertyFn
@@ -586,11 +587,12 @@ func NewConfig(
 		ThrottledLogRPS:   dynamicconfig.HistoryThrottledLogRPS.Get(dc),
 		EnableStickyQuery: dynamicconfig.EnableStickyQuery.Get(dc),
 
-		DefaultActivityRetryPolicy:   dynamicconfig.DefaultActivityRetryPolicy.Get(dc),
-		DefaultWorkflowRetryPolicy:   dynamicconfig.DefaultWorkflowRetryPolicy.Get(dc),
-		WorkflowTaskHeartbeatTimeout: dynamicconfig.WorkflowTaskHeartbeatTimeout.Get(dc),
-		WorkflowTaskCriticalAttempts: dynamicconfig.WorkflowTaskCriticalAttempts.Get(dc),
-		WorkflowTaskRetryMaxInterval: dynamicconfig.WorkflowTaskRetryMaxInterval.Get(dc),
+		DefaultActivityRetryPolicy:                       dynamicconfig.DefaultActivityRetryPolicy.Get(dc),
+		DefaultWorkflowRetryPolicy:                       dynamicconfig.DefaultWorkflowRetryPolicy.Get(dc),
+		WorkflowTaskHeartbeatTimeout:                     dynamicconfig.WorkflowTaskHeartbeatTimeout.Get(dc),
+		WorkflowTaskCriticalAttempts:                     dynamicconfig.WorkflowTaskCriticalAttempts.Get(dc),
+		WorkflowTaskRetryMaxInterval:                     dynamicconfig.WorkflowTaskRetryMaxInterval.Get(dc),
+		DiscardSpeculativeWorkflowTaskMaximumEventsCount: dynamicconfig.DiscardSpeculativeWorkflowTaskMaximumEventsCount.Get(dc),
 
 		ReplicationTaskApplyTimeout:       dynamicconfig.ReplicationTaskApplyTimeout.Get(dc),
 		ReplicationTaskFetcherParallelism: dynamicconfig.ReplicationTaskFetcherParallelism.Get(dc),
diff --git a/service/history/workflow/workflow_task_state_machine.go b/service/history/workflow/workflow_task_state_machine.go
index c566360ceb9..f43344f796c 100644
--- a/service/history/workflow/workflow_task_state_machine.go
+++ b/service/history/workflow/workflow_task_state_machine.go
@@ -624,28 +624,46 @@ func (m *workflowTaskStateMachine) skipWorkflowTaskCompletedEvent(workflowTaskTy
 
 	if request.GetForceCreateNewWorkflowTask() {
 		// If ForceCreateNewWorkflowTask is set to true, then this is a heartbeat response.
-		// New WT will be created as Normal and WorkflowTaskCompletedEvent for this WT is also must be written.
-		// In the future, if we decide not to write heartbeat of speculative WT to the history, this check should be removed,
-		// and extra logic should be added to create next WT as Speculative. Currently, new heartbeat WT is always created as Normal.
+		// New WFT will be created as Normal and the WorkflowTaskCompletedEvent for this WFT must also be written.
+		// In the future, if we decide not to write the heartbeat of a speculative WFT to the history, this check should be removed,
+		// and extra logic should be added to create the next WFT as Speculative. Currently, a new heartbeat WFT is always created as Normal.
 		metrics.SpeculativeWorkflowTaskCommits.With(m.metricsHandler).Record(1,
 			metrics.ReasonTag("force_create_task"))
 		return false
 	}
 
-	// Speculative WFT can be dropped only if there are no events after previous WFTCompleted event,
+	// Speculative WFT that has only Update rejection messages should be discarded (this function returns `true`).
+	// If the speculative WFT also shipped events to the worker and was discarded, then
+	// the next WFT will ship these events again. Unfortunately, old SDKs don't support receiving the same events more than once.
+	// If the SDK supports this, it will set DiscardSpeculativeWorkflowTaskWithEvents to `true`,
+	// and the server can discard the speculative WFT even if it had events.
+
+	// Otherwise, the server needs to determine if there were events in this speculative WFT,
 	// i.e. last event in the history is WFTCompleted event.
 	// It is guaranteed that WFTStarted event is followed by WFTCompleted event and history tail might look like:
 	// previous WFTStarted
 	// previous WFTCompleted
 	// --> NextEventID points here because it doesn't move for speculative WFT.
-	// In this case difference between NextEventID and LastCompletedWorkflowTaskStartedEventId is 2.
-	// If there are other events after WFTCompleted event, then difference is > 2 and speculative WFT can't be dropped.
-	if m.ms.GetNextEventID() != m.ms.GetLastCompletedWorkflowTaskStartedEventId()+2 {
+	// In this case, the difference between NextEventID and LastCompletedWorkflowTaskStartedEventId is 2.
+	// If there are other events after the WFTCompleted event, then the difference is > 2 and the speculative WFT can't be discarded.
+	if !request.GetCapabilities().GetDiscardSpeculativeWorkflowTaskWithEvents() &&
+		m.ms.GetNextEventID() > m.ms.GetLastCompletedWorkflowTaskStartedEventId()+2 {
 		metrics.SpeculativeWorkflowTaskCommits.With(m.metricsHandler).Record(1,
 			metrics.ReasonTag("interleaved_events"))
 		return false
 	}
 
+	// Even if the worker supports receiving the same events more than once,
+	// the server still writes the speculative WFT if it had too many events.
+	// This is to prevent shipping a large set of events to the worker over and over again,
+	// in case Updates are constantly rejected.
+	if request.GetCapabilities().GetDiscardSpeculativeWorkflowTaskWithEvents() &&
+		m.ms.GetNextEventID() > m.ms.GetLastCompletedWorkflowTaskStartedEventId()+2+int64(m.ms.config.DiscardSpeculativeWorkflowTaskMaximumEventsCount()) {
+		metrics.SpeculativeWorkflowTaskCommits.With(m.metricsHandler).Record(1,
+			metrics.ReasonTag("too_many_interleaved_events"))
+		return false
+	}
+
 	for _, message := range request.Messages {
 		if !message.GetBody().MessageIs((*updatepb.Rejection)(nil)) {
 			metrics.SpeculativeWorkflowTaskCommits.With(m.metricsHandler).Record(1,
@@ -654,9 +672,9 @@ func (m *workflowTaskStateMachine) skipWorkflowTaskCompletedEvent(workflowTaskTy
 		}
 	}
 
-	// Speculative WT can be dropped when response contains only rejection messages.
+	// Speculative WFT can be discarded when the response contains only rejection messages.
 	// Empty messages list is equivalent to only rejection messages because server will reject all sent updates (if any).
-	//
+
 	// TODO: We should perform a shard ownership check here to prevent the case where the entire speculative workflow task
 	// is done on a stale mutable state and the fact that mutable state is stale caused workflow update requests to be rejected.
 	// NOTE: The AssertShardOwnership persistence API is not implemented in the repo.