Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Workflow task state machine: minor renames #3735

Merged
merged 2 commits into from
Dec 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion service/history/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ type Config struct {
// Workflow task settings
// DefaultWorkflowTaskTimeout the default workflow task timeout
DefaultWorkflowTaskTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter
// WorkflowTaskHeartbeatTimeout is to timeout behavior of: RespondWorkflowTaskComplete with ForceCreateNewWorkflowTask == true without any workflow tasks
// WorkflowTaskHeartbeatTimeout is to timeout behavior of: RespondWorkflowTaskComplete with ForceCreateNewWorkflowTask == true without any commands
// So that workflow task will be scheduled to another worker(by clear stickyness)
WorkflowTaskHeartbeatTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter
WorkflowTaskCriticalAttempts dynamicconfig.IntPropertyFn
Expand Down
8 changes: 4 additions & 4 deletions service/history/workflow/history_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,9 @@ func (b *HistoryBuilder) AddWorkflowTaskScheduledEvent(
taskQueue *taskqueuepb.TaskQueue,
startToCloseTimeout *time.Duration,
attempt int32,
now time.Time,
scheduleTime time.Time,
) *historypb.HistoryEvent {
event := b.createNewHistoryEvent(enumspb.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED, now)
event := b.createNewHistoryEvent(enumspb.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED, scheduleTime)
event.Attributes = &historypb.HistoryEvent_WorkflowTaskScheduledEventAttributes{
WorkflowTaskScheduledEventAttributes: &historypb.WorkflowTaskScheduledEventAttributes{
TaskQueue: taskQueue,
Expand All @@ -216,9 +216,9 @@ func (b *HistoryBuilder) AddWorkflowTaskStartedEvent(
scheduledEventID int64,
requestID string,
identity string,
now time.Time,
startTime time.Time,
) *historypb.HistoryEvent {
event := b.createNewHistoryEvent(enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED, now)
event := b.createNewHistoryEvent(enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED, startTime)
event.Attributes = &historypb.HistoryEvent_WorkflowTaskStartedEventAttributes{
WorkflowTaskStartedEventAttributes: &historypb.WorkflowTaskStartedEventAttributes{
ScheduledEventId: scheduledEventID,
Expand Down
3 changes: 2 additions & 1 deletion service/history/workflow/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ type (
CheckResettable() error
CloneToProto() *persistencespb.WorkflowMutableState
RetryActivity(ai *persistencespb.ActivityInfo, failure *failurepb.Failure) (enumspb.RetryState, error)
CreateTransientWorkflowTask(workflowTask *WorkflowTaskInfo, identity string) *historyspb.TransientWorkflowTaskInfo
GetTransientWorkflowTaskInfo(workflowTask *WorkflowTaskInfo, identity string) *historyspb.TransientWorkflowTaskInfo
DeleteWorkflowTask()
DeleteSignalRequested(requestID string)
FlushBufferedEvents()
Expand Down Expand Up @@ -193,6 +193,7 @@ type (
IsCurrentWorkflowGuaranteed() bool
IsSignalRequested(requestID string) bool
IsStickyTaskQueueEnabled() bool
TaskQueue() *taskqueuepb.TaskQueue
IsWorkflowExecutionRunning() bool
IsResourceDuplicated(resourceDedupKey definition.DeduplicationID) bool
IsWorkflowPendingOnWorkflowTaskBackoff() bool
Expand Down
21 changes: 17 additions & 4 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,19 @@ func (ms *MutableStateImpl) IsStickyTaskQueueEnabled() bool {
return ms.executionInfo.StickyTaskQueue != ""
}

func (e *MutableStateImpl) TaskQueue() *taskqueuepb.TaskQueue {
if e.IsStickyTaskQueueEnabled() {
return &taskqueuepb.TaskQueue{
Name: e.executionInfo.StickyTaskQueue,
Kind: enumspb.TASK_QUEUE_KIND_STICKY,
}
}
return &taskqueuepb.TaskQueue{
Name: e.executionInfo.TaskQueue,
Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
}
}

func (ms *MutableStateImpl) GetWorkflowType() *commonpb.WorkflowType {
wType := &commonpb.WorkflowType{}
wType.Name = ms.executionInfo.WorkflowTypeName
Expand Down Expand Up @@ -1304,7 +1317,7 @@ func (ms *MutableStateImpl) ClearTransientWorkflowTask() error {
return serviceerror.NewInternal("cannot clear transient workflow task when there are buffered events")
}
// no buffered event
resetWorkflowTaskInfo := &WorkflowTaskInfo{
emptyWorkflowTaskInfo := &WorkflowTaskInfo{
Version: common.EmptyVersion,
ScheduledEventID: common.EmptyEventID,
StartedEventID: common.EmptyEventID,
Expand All @@ -1317,7 +1330,7 @@ func (ms *MutableStateImpl) ClearTransientWorkflowTask() error {
TaskQueue: nil,
OriginalScheduledTime: timestamp.UnixOrZeroTimePtr(0),
}
ms.workflowTaskManager.UpdateWorkflowTask(resetWorkflowTaskInfo)
ms.workflowTaskManager.UpdateWorkflowTask(emptyWorkflowTaskInfo)
return nil
}

Expand Down Expand Up @@ -1774,11 +1787,11 @@ func (ms *MutableStateImpl) ReplicateWorkflowTaskStartedEvent(
return ms.workflowTaskManager.ReplicateWorkflowTaskStartedEvent(workflowTask, version, scheduledEventID, startedEventID, requestID, timestamp)
}

func (ms *MutableStateImpl) CreateTransientWorkflowTask(
func (ms *MutableStateImpl) GetTransientWorkflowTaskInfo(
workflowTask *WorkflowTaskInfo,
identity string,
) *historyspb.TransientWorkflowTaskInfo {
return ms.workflowTaskManager.CreateTransientWorkflowTaskEvents(workflowTask, identity)
return ms.workflowTaskManager.GetTransientWorkflowTaskInfo(workflowTask, identity)
}

// add BinaryCheckSum for the first workflowTaskCompletedID for auto-reset
Expand Down
42 changes: 28 additions & 14 deletions service/history/workflow/mutable_state_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

53 changes: 18 additions & 35 deletions service/history/workflow/workflow_task_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,16 +250,6 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskScheduledEventAsHeartbeat(
return nil, m.ms.createInternalServerError(opTag)
}

// Task queue and workflow task timeout should already be set from workflow execution started event
taskQueue := &taskqueuepb.TaskQueue{}
if m.ms.IsStickyTaskQueueEnabled() {
taskQueue.Name = m.ms.executionInfo.StickyTaskQueue
taskQueue.Kind = enumspb.TASK_QUEUE_KIND_STICKY
} else {
taskQueue.Name = m.ms.executionInfo.TaskQueue
taskQueue.Kind = enumspb.TASK_QUEUE_KIND_NORMAL
}

// Flush any buffered events before creating the workflow task, otherwise it will result in invalid IDs for transient
// workflow task and will cause in timeout processing to not work for transient workflow tasks
if m.ms.HasBufferedEvents() {
Expand All @@ -280,21 +270,27 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskScheduledEventAsHeartbeat(
}
}

var newWorkflowTaskEvent *historypb.HistoryEvent
scheduledEventID := m.ms.GetNextEventID() // we will generate the schedule event later for repeatedly failing workflow tasks
// Avoid creating new history events when workflow tasks are continuously failing
scheduledTime := m.ms.timeSource.Now().UTC()
scheduleTime := m.ms.timeSource.Now().UTC()
attempt := m.ms.executionInfo.WorkflowTaskAttempt
// TaskQueue should already be set from workflow execution started event.
taskQueue := m.ms.TaskQueue()
// DefaultWorkflowTaskTimeout should already be set from workflow execution started event.
startToCloseTimeout := m.getStartToCloseTimeout(m.ms.executionInfo.DefaultWorkflowTaskTimeout, attempt)

var scheduledEvent *historypb.HistoryEvent
var scheduledEventID int64

if attempt == 1 {
newWorkflowTaskEvent = m.ms.hBuilder.AddWorkflowTaskScheduledEvent(
scheduledEvent = m.ms.hBuilder.AddWorkflowTaskScheduledEvent(
taskQueue,
startToCloseTimeout,
attempt,
m.ms.timeSource.Now(),
scheduleTime,
)
scheduledEventID = newWorkflowTaskEvent.GetEventId()
scheduledTime = timestamp.TimeValue(newWorkflowTaskEvent.GetEventTime())
scheduledEventID = scheduledEvent.GetEventId()
} else {
// WorkflowTaskScheduledEvent will be created later.
scheduledEventID = m.ms.GetNextEventID()
}

workflowTask, err := m.ReplicateWorkflowTaskScheduledEvent(
Expand All @@ -303,7 +299,7 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskScheduledEventAsHeartbeat(
taskQueue,
startToCloseTimeout,
attempt,
&scheduledTime,
&scheduleTime,
originalScheduledTimestamp,
)
if err != nil {
Expand Down Expand Up @@ -446,12 +442,8 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskCompletedEvent(
m.beforeAddWorkflowTaskCompletedEvent()
if workflowTask.Attempt > 1 {
// Create corresponding WorkflowTaskSchedule and WorkflowTaskStarted events for workflow tasks we have been retrying
taskQueue := &taskqueuepb.TaskQueue{
Name: m.ms.executionInfo.TaskQueue,
Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
}
scheduledEvent := m.ms.hBuilder.AddWorkflowTaskScheduledEvent(
taskQueue,
m.ms.TaskQueue(),
workflowTask.WorkflowTaskTimeout,
workflowTask.Attempt,
timestamp.TimeValue(workflowTask.ScheduledTime).UTC(),
Expand Down Expand Up @@ -684,7 +676,7 @@ func (m *workflowTaskStateMachine) GetWorkflowTaskInfo(
return nil, false
}

func (m *workflowTaskStateMachine) CreateTransientWorkflowTaskEvents(
func (m *workflowTaskStateMachine) GetTransientWorkflowTaskInfo(
workflowTask *WorkflowTaskInfo,
identity string,
) *historyspb.TransientWorkflowTaskInfo {
Expand Down Expand Up @@ -735,15 +727,6 @@ func (m *workflowTaskStateMachine) CreateTransientWorkflowTaskEvents(
}

func (m *workflowTaskStateMachine) getWorkflowTaskInfo() *WorkflowTaskInfo {
taskQueue := &taskqueuepb.TaskQueue{}
if m.ms.IsStickyTaskQueueEnabled() {
taskQueue.Name = m.ms.executionInfo.StickyTaskQueue
taskQueue.Kind = enumspb.TASK_QUEUE_KIND_STICKY
} else {
taskQueue.Name = m.ms.executionInfo.TaskQueue
taskQueue.Kind = enumspb.TASK_QUEUE_KIND_NORMAL
}

return &WorkflowTaskInfo{
Version: m.ms.executionInfo.WorkflowTaskVersion,
ScheduledEventID: m.ms.executionInfo.WorkflowTaskScheduledEventId,
Expand All @@ -753,7 +736,7 @@ func (m *workflowTaskStateMachine) getWorkflowTaskInfo() *WorkflowTaskInfo {
Attempt: m.ms.executionInfo.WorkflowTaskAttempt,
StartedTime: m.ms.executionInfo.WorkflowTaskStartedTime,
ScheduledTime: m.ms.executionInfo.WorkflowTaskScheduledTime,
TaskQueue: taskQueue,
TaskQueue: m.ms.TaskQueue(),
OriginalScheduledTime: m.ms.executionInfo.WorkflowTaskOriginalScheduledTime,
}
}
Expand Down
3 changes: 2 additions & 1 deletion service/history/workflowTaskHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,15 @@ func newWorkflowTaskHandler(
config *configs.Config,
shard shard.Context,
searchAttributesMapper searchattribute.Mapper,
hasBufferedEvents bool,
) *workflowTaskHandlerImpl {

return &workflowTaskHandlerImpl{
identity: identity,
workflowTaskCompletedID: workflowTaskCompletedID,

// internal state
hasBufferedEvents: mutableState.HasBufferedEvents(),
hasBufferedEvents: hasBufferedEvents,
workflowTaskFailedCause: nil,
activityNotStartedCancelled: false,
newMutableState: nil,
Expand Down
3 changes: 2 additions & 1 deletion service/history/workflowTaskHandlerCallbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted(
handler.config,
handler.shard,
handler.searchAttributesMapper,
hasUnhandledEvents,
)

if responseMutations, err = workflowTaskHandler.handleCommands(
Expand Down Expand Up @@ -726,7 +727,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) createRecordWorkflowTaskStarted
response.ScheduledTime = workflowTask.ScheduledTime
response.StartedTime = workflowTask.StartedTime

response.TransientWorkflowTask = ms.CreateTransientWorkflowTask(workflowTask, identity)
response.TransientWorkflowTask = ms.GetTransientWorkflowTaskInfo(workflowTask, identity)

currentBranchToken, err := ms.GetCurrentBranchToken()
if err != nil {
Expand Down