Skip to content

Commit

Permalink
Update task executor to handle WorkflowAlreadyCompletedError for sign…
Browse files Browse the repository at this point in the history
…al and cancel workflow
  • Loading branch information
Shaddoll committed Apr 29, 2024
1 parent b567432 commit 611dfda
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 10 deletions.
10 changes: 10 additions & 0 deletions service/history/task/cross_cluster_source_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,10 @@ func (t *crossClusterSourceTaskExecutor) executeCancelExecutionTask(

if failedCause != nil {
// remaining errors are non-retryable
cause := types.CancelExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution
if *failedCause == types.CrossClusterTaskFailedCauseWorkflowAlreadyCompleted {
cause = types.CancelExternalWorkflowExecutionFailedCauseWorkflowAlreadyCompleted
}
return requestCancelExternalExecutionFailed(
ctx,
taskInfo,
Expand All @@ -265,6 +269,7 @@ func (t *crossClusterSourceTaskExecutor) executeCancelExecutionTask(
taskInfo.TargetWorkflowID,
taskInfo.TargetRunID,
now,
cause,
)
}
return requestCancelExternalExecutionCompleted(
Expand Down Expand Up @@ -479,6 +484,10 @@ func (t *crossClusterSourceTaskExecutor) executeSignalExecutionTask(

if failedCause != nil {
// remaining errors are non-retryable
cause := types.SignalExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution
if *failedCause == types.CrossClusterTaskFailedCauseWorkflowAlreadyCompleted {
cause = types.SignalExternalWorkflowExecutionFailedCauseWorkflowAlreadyCompleted
}
return signalExternalExecutionFailed(
ctx,
taskInfo,
Expand All @@ -488,6 +497,7 @@ func (t *crossClusterSourceTaskExecutor) executeSignalExecutionTask(
taskInfo.TargetRunID,
signalInfo.Control,
now,
cause,
)
}

Expand Down
36 changes: 26 additions & 10 deletions service/history/task/transfer_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,7 @@ func (t *transferActiveTaskExecutor) processCancelExecution(
task.TargetWorkflowID,
task.TargetRunID,
t.shard.GetTimeSource().Now(),
types.CancelExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution,
)
return err
}
Expand All @@ -650,10 +651,15 @@ func (t *transferActiveTaskExecutor) processCancelExecution(
tag.TargetWorkflowRunID(task.TargetRunID),
tag.Error(err))

// Check to see if the error is non-transient, in which case add RequestCancelFailed
// event and complete transfer task by setting the err = nil
if common.IsServiceTransientError(err) || common.IsContextTimeoutError(err) {
// for retryable error just return
var notExistsErr *types.EntityNotExistsError
var alreadyCompletedErr *types.WorkflowExecutionAlreadyCompletedError
var cause *types.CancelExternalWorkflowExecutionFailedCause
if errors.As(err, &notExistsErr) {
cause = types.CancelExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution.Ptr()
} else if errors.As(err, &alreadyCompletedErr) {
cause = types.CancelExternalWorkflowExecutionFailedCauseWorkflowAlreadyCompleted.Ptr()
}
if cause == nil {
return err
}
return requestCancelExternalExecutionFailed(
Expand All @@ -664,6 +670,7 @@ func (t *transferActiveTaskExecutor) processCancelExecution(
task.TargetWorkflowID,
task.TargetRunID,
t.shard.GetTimeSource().Now(),
*cause,
)
}

Expand Down Expand Up @@ -750,6 +757,7 @@ func (t *transferActiveTaskExecutor) processSignalExecution(
task.TargetRunID,
signalInfo.Control,
t.shard.GetTimeSource().Now(),
types.SignalExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution,
)
}

Expand All @@ -769,10 +777,15 @@ func (t *transferActiveTaskExecutor) processSignalExecution(
tag.TargetWorkflowRunID(task.TargetRunID),
tag.Error(err))

// Check to see if the error is non-transient, in which case add SignalFailed
// event and complete transfer task by setting the err = nil
if common.IsServiceTransientError(err) || common.IsContextTimeoutError(err) {
// for retryable error just return
var notExistsErr *types.EntityNotExistsError
var alreadyCompletedErr *types.WorkflowExecutionAlreadyCompletedError
var cause *types.SignalExternalWorkflowExecutionFailedCause
if errors.As(err, &notExistsErr) {
cause = types.SignalExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution.Ptr()
} else if errors.As(err, &alreadyCompletedErr) {
cause = types.SignalExternalWorkflowExecutionFailedCauseWorkflowAlreadyCompleted.Ptr()
}
if cause == nil {
return err
}
return signalExternalExecutionFailed(
Expand All @@ -784,6 +797,7 @@ func (t *transferActiveTaskExecutor) processSignalExecution(
task.TargetRunID,
signalInfo.Control,
t.shard.GetTimeSource().Now(),
*cause,
)
}

Expand Down Expand Up @@ -1419,6 +1433,7 @@ func requestCancelExternalExecutionFailed(
targetWorkflowID string,
targetRunID string,
now time.Time,
cause types.CancelExternalWorkflowExecutionFailedCause,
) error {

err := updateWorkflowExecution(ctx, wfContext, true,
Expand All @@ -1439,7 +1454,7 @@ func requestCancelExternalExecutionFailed(
targetDomain,
targetWorkflowID,
targetRunID,
types.CancelExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution,
cause,
)
return err
},
Expand All @@ -1464,6 +1479,7 @@ func signalExternalExecutionFailed(
targetRunID string,
control []byte,
now time.Time,
cause types.SignalExternalWorkflowExecutionFailedCause,
) error {

err := updateWorkflowExecution(ctx, wfContext, true,
Expand All @@ -1485,7 +1501,7 @@ func signalExternalExecutionFailed(
targetWorkflowID,
targetRunID,
control,
types.SignalExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution,
cause,
)
return err
},
Expand Down

0 comments on commit 611dfda

Please sign in to comment.