Skip to content

Commit

Permalink
Block using conflict policy UseExisting for Nexus WorkflowRunOperation (
Browse files Browse the repository at this point in the history
#1845)

* Block using conflict policy UseExisting for Nexus WorkflowRunOperation

* address comments
  • Loading branch information
rodrigozhou authored Feb 25, 2025
1 parent 61c10ce commit ab1c356
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 20 deletions.
9 changes: 9 additions & 0 deletions temporalnexus/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,15 @@ func ExecuteUntypedWorkflow[R any](
internal.SetLinksOnStartWorkflowOptions(&startWorkflowOptions, links)
internal.SetOnConflictOptionsOnStartWorkflowOptions(&startWorkflowOptions)

// TODO(rodrigozhou): temporarily blocking conflict policy UseExisting.
if startWorkflowOptions.WorkflowIDConflictPolicy == enums.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING {
return nil, &nexus.HandlerError{
Type: nexus.HandlerErrorTypeInternal,
RetryBehavior: nexus.HandlerErrorRetryBehaviorNonRetryable,
Cause: errors.New("workflow ID conflict policy UseExisting is not supported for Nexus WorkflowRunOperation"),
}
}

// This makes sure that ExecuteWorkflow will respect the WorkflowIDConflictPolicy, ie., if the
// conflict policy is to fail (default value), then ExecuteWorkflow will return an error if the
// workflow already running. For Nexus, this ensures that operation has only started successfully
Expand Down
60 changes: 40 additions & 20 deletions test/nexus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -994,7 +994,6 @@ func TestAsyncOperationFromWorkflow_MultipleCallers(t *testing.T) {
fut := client.ExecuteOperation(ctx, op, input, workflow.NexusOperationOptions{})
var exec workflow.NexusOperationExecution
err := fut.GetNexusOperationExecution().Get(ctx, &exec)
execOpCh.Send(ctx, nil)
if err != nil {
output.CntErr++
var handlerErr *nexus.HandlerError
Expand All @@ -1006,9 +1005,13 @@ func TestAsyncOperationFromWorkflow_MultipleCallers(t *testing.T) {
} else if appErr.Type() != "WorkflowExecutionAlreadyStarted" {
retError = err
}
} else {
output.CntOk++
}
execOpCh.Send(ctx, nil)
if err != nil {
return
}
output.CntOk++
var res string
err = fut.Get(ctx, &res)
if err != nil {
Expand All @@ -1023,8 +1026,10 @@ func TestAsyncOperationFromWorkflow_MultipleCallers(t *testing.T) {
execOpCh.Receive(ctx, nil)
}

// signal handler workflow so it will complete
workflow.SignalExternalWorkflow(ctx, handlerWorkflowID, "", "terminate", nil).Get(ctx, nil)
if output.CntOk > 0 {
// signal handler workflow so it will complete
workflow.SignalExternalWorkflow(ctx, handlerWorkflowID, "", "terminate", nil).Get(ctx, nil)
}
wg.Wait(ctx)
return output, retError
}
Expand All @@ -1040,19 +1045,24 @@ func TestAsyncOperationFromWorkflow_MultipleCallers(t *testing.T) {

testCases := []struct {
input string
checkOutput func(t *testing.T, numCalls int, res CallerWfOutput)
checkOutput func(t *testing.T, numCalls int, res CallerWfOutput, err error)
}{
{
input: "conflict-policy-fail",
checkOutput: func(t *testing.T, numCalls int, res CallerWfOutput) {
checkOutput: func(t *testing.T, numCalls int, res CallerWfOutput, err error) {
require.NoError(t, err)
require.EqualValues(t, 1, res.CntOk)
require.EqualValues(t, numCalls-1, res.CntErr)
},
},
{
input: "conflict-policy-use-existing",
checkOutput: func(t *testing.T, numCalls int, res CallerWfOutput) {
require.EqualValues(t, numCalls, res.CntOk)
checkOutput: func(t *testing.T, numCalls int, res CallerWfOutput, err error) {
// TODO(rodrigozhou): assert NotError and remove the comments below after UseExisting is
// unblocked.
require.ErrorContains(t, err, "workflow ID conflict policy UseExisting is not supported for Nexus WorkflowRunOperation")
// require.EqualValues(t, numCalls, res.CntOk)
// require.EqualValues(t, 0, res.CntErr)
},
},
}
Expand All @@ -1075,8 +1085,8 @@ func TestAsyncOperationFromWorkflow_MultipleCallers(t *testing.T) {
)
require.NoError(t, err)
var res CallerWfOutput
require.NoError(t, run.Get(ctx, &res))
tc.checkOutput(t, numCalls, res)
err = run.Get(ctx, &res)
tc.checkOutput(t, numCalls, res, err)
})
}
}
Expand Down Expand Up @@ -1687,7 +1697,6 @@ func TestWorkflowTestSuite_WorkflowRunOperation_MultipleCallers(t *testing.T) {
fut := client.ExecuteOperation(ctx, op, input, workflow.NexusOperationOptions{})
var exec workflow.NexusOperationExecution
err := fut.GetNexusOperationExecution().Get(ctx, &exec)
execOpCh.Send(ctx, nil)
if err != nil {
output.CntErr++
var handlerErr *nexus.HandlerError
Expand All @@ -1699,9 +1708,13 @@ func TestWorkflowTestSuite_WorkflowRunOperation_MultipleCallers(t *testing.T) {
} else if appErr.Type() != "WorkflowExecutionAlreadyStarted" {
retError = err
}
} else {
output.CntOk++
}
execOpCh.Send(ctx, nil)
if err != nil {
return
}
output.CntOk++
var res string
err = fut.Get(ctx, &res)
if err != nil {
Expand All @@ -1716,7 +1729,10 @@ func TestWorkflowTestSuite_WorkflowRunOperation_MultipleCallers(t *testing.T) {
execOpCh.Receive(ctx, nil)
}

workflow.SignalExternalWorkflow(ctx, handlerWorkflowID, "", "terminate", nil).Get(ctx, nil)
if output.CntOk > 0 {
// signal handler workflow so it will complete
workflow.SignalExternalWorkflow(ctx, handlerWorkflowID, "", "terminate", nil).Get(ctx, nil)
}
wg.Wait(ctx)
return output, retError
}
Expand All @@ -1726,19 +1742,24 @@ func TestWorkflowTestSuite_WorkflowRunOperation_MultipleCallers(t *testing.T) {

testCases := []struct {
input string
checkOutput func(t *testing.T, numCalls int, res CallerWfOutput)
checkOutput func(t *testing.T, numCalls int, res CallerWfOutput, err error)
}{
{
input: "conflict-policy-fail",
checkOutput: func(t *testing.T, numCalls int, res CallerWfOutput) {
checkOutput: func(t *testing.T, numCalls int, res CallerWfOutput, err error) {
require.NoError(t, err)
require.EqualValues(t, 1, res.CntOk)
require.EqualValues(t, numCalls-1, res.CntErr)
},
},
{
input: "conflict-policy-use-existing",
checkOutput: func(t *testing.T, numCalls int, res CallerWfOutput) {
require.EqualValues(t, numCalls, res.CntOk)
checkOutput: func(t *testing.T, numCalls int, res CallerWfOutput, err error) {
// TODO(rodrigozhou): assert NotError and remove the comments below after UseExisting is
// unblocked.
require.ErrorContains(t, err, "workflow ID conflict policy UseExisting is not supported for Nexus WorkflowRunOperation")
// require.EqualValues(t, numCalls, res.CntOk)
// require.EqualValues(t, 0, res.CntErr)
},
},
}
Expand All @@ -1754,10 +1775,9 @@ func TestWorkflowTestSuite_WorkflowRunOperation_MultipleCallers(t *testing.T) {

env.ExecuteWorkflow(callerWf, tc.input, numCalls)
require.True(t, env.IsWorkflowCompleted())
require.NoError(t, env.GetWorkflowError())
var res CallerWfOutput
require.NoError(t, env.GetWorkflowResult(&res))
tc.checkOutput(t, numCalls, res)
err := env.GetWorkflowResult(&res)
tc.checkOutput(t, numCalls, res, err)
})
}
}
Expand Down

0 comments on commit ab1c356

Please sign in to comment.