Skip to content

Commit

Permalink
Catchup tasks for Nexus features in the test env (#1824)
Browse files Browse the repository at this point in the history
- Propagate operation timeout to the handler via header.
- Handle operation complete-before-start.
  • Loading branch information
bergundy authored Feb 14, 2025
1 parent 4fb50dc commit b19fc66
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 7 deletions.
12 changes: 8 additions & 4 deletions internal/internal_workflow_testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -2443,6 +2443,9 @@ func (env *testWorkflowEnvironmentImpl) ExecuteNexusOperation(

var token string
if params.options.ScheduleToCloseTimeout > 0 {
// Propagate operation timeout to the handler via header.
params.nexusHeader[strings.ToLower(nexus.HeaderOperationTimeout)] = strconv.FormatInt(params.options.ScheduleToCloseTimeout.Milliseconds(), 10) + "ms"

// Timer to fail the nexus operation due to schedule to close timeout.
env.NewTimer(
params.options.ScheduleToCloseTimeout,
Expand Down Expand Up @@ -2676,7 +2679,7 @@ func (env *testWorkflowEnvironmentImpl) scheduleNexusAsyncOperationCompletion(
}, completionHandle.delay)
}

func (env *testWorkflowEnvironmentImpl) resolveNexusOperation(seq int64, result *commonpb.Payload, err error) {
func (env *testWorkflowEnvironmentImpl) resolveNexusOperation(seq int64, token string, result *commonpb.Payload, err error) {
env.postCallback(func() {
handle, ok := env.getNexusOperationHandle(seq)
if !ok {
Expand All @@ -2685,10 +2688,11 @@ func (env *testWorkflowEnvironmentImpl) resolveNexusOperation(seq int64, result
if err != nil {
failure := env.failureConverter.ErrorToFailure(err)
err = env.failureConverter.FailureToError(nexusOperationFailure(handle.params, handle.operationToken, failure.GetCause()))
handle.completedCallback(nil, err)
} else {
handle.completedCallback(result, nil)
}
// Populate the token in case the operation completes before it marked as started.
// startedCallback is idempotent and will be a noop in case the operation has already been marked as started.
handle.startedCallback(token, err)
handle.completedCallback(result, err)
}, true)
}

Expand Down
16 changes: 13 additions & 3 deletions internal/nexus_operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func apiHandlerErrorToNexusHandlerError(apiErr *nexuspb.HandlerError, failureCon
}

nexusErr := &nexus.HandlerError{
Type: nexus.HandlerErrorType(apiErr.GetErrorType()),
Type: nexus.HandlerErrorType(apiErr.GetErrorType()),
RetryBehavior: retryBehavior,
}

Expand Down Expand Up @@ -356,14 +356,24 @@ func (t *testSuiteClientForNexusOperations) ExecuteWorkflow(ctx context.Context,
panic(fmt.Errorf("unexpected operation sequence in callback header: %s: %w", seqStr, err))
}

// Send the operation token to account for a race when the completion comes in before the response to the
// StartOperation call is recorded.
// The token is extracted from the callback header which is attached in ExecuteUntypedWorkflow.
var operationToken string
if len(options.callbacks) == 1 {
if cbHeader := options.callbacks[0].GetNexus().GetHeader(); cbHeader != nil {
operationToken = cbHeader[nexus.HeaderOperationToken]
}
}

if wfErr != nil {
t.env.resolveNexusOperation(seq, nil, wfErr)
t.env.resolveNexusOperation(seq, operationToken, nil, wfErr)
} else {
var payload *commonpb.Payload
if len(result.GetPayloads()) > 0 {
payload = result.Payloads[0]
}
t.env.resolveNexusOperation(seq, payload, nil)
t.env.resolveNexusOperation(seq, operationToken, payload, nil)
}
}, func(r WorkflowExecution, err error) {
run.WorkflowExecution = r
Expand Down
3 changes: 3 additions & 0 deletions test/nexus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1122,6 +1122,9 @@ func TestWorkflowTestSuite_WorkflowRunOperation_ScheduleToCloseTimeout(t *testin
"op",
handlerWF,
func(ctx context.Context, _ nexus.NoValue, opts nexus.StartOperationOptions) (client.StartWorkflowOptions, error) {
if opts.Header.Get(nexus.HeaderOperationTimeout) == "" {
return client.StartWorkflowOptions{}, nexus.HandlerErrorf(nexus.HandlerErrorTypeBadRequest, "expected non empty operation timeout header")
}
time.Sleep(opSleepDuration)
return client.StartWorkflowOptions{ID: opts.RequestID}, nil
})
Expand Down

0 comments on commit b19fc66

Please sign in to comment.