Skip to content

Commit

Permalink
More user-friendly NDE error message (#1837)
Browse files Browse the repository at this point in the history
  • Loading branch information
dandavison authored Feb 21, 2025
1 parent c99ec47 commit 1720257
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 3 deletions.
5 changes: 3 additions & 2 deletions internal/internal_command_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1074,8 +1074,9 @@ func (h *commandsHelper) incrementNextCommandEventIDIfVersionMarker() {
func (h *commandsHelper) getCommand(id commandID) commandStateMachine {
command, ok := h.commands[id]
if !ok {
panicMsg := fmt.Sprintf("[TMPRL1100] unknown command %v, possible causes are nondeterministic workflow definition code"+
" or incompatible change in the workflow definition", id)
panicMsg := fmt.Sprintf(
"[TMPRL1100] During replay, a matching %v command was expected in history event position %s. However, the replayed code did not produce that. "+
"Possible causes are nondeterministic workflow definition code, or an incompatible change in the workflow definition.", id.commandType, id.id)
panicIllegalState(panicMsg)
}
return command.Value.(commandStateMachine)
Expand Down
2 changes: 1 addition & 1 deletion internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1686,7 +1686,7 @@ func (weh *workflowExecutionEventHandlerImpl) handleLocalActivityMarker(details
if la, ok := weh.pendingLaTasks[lamd.ActivityID]; ok {
if len(lamd.ActivityType) > 0 && lamd.ActivityType != la.params.ActivityType {
// history marker mismatch to the current code.
panicMsg := fmt.Sprintf("[TMPRL1100] code execute local activity %v, but history event found %v, markerData: %v", la.params.ActivityType, lamd.ActivityType, markerData)
panicMsg := fmt.Sprintf("[TMPRL1100] code executed local activity %v, but history event found %v, markerData: %v", la.params.ActivityType, lamd.ActivityType, markerData)
panicIllegalState(panicMsg)
}
weh.commandsHelper.recordLocalActivityMarker(lamd.ActivityID, details, failure)
Expand Down
49 changes: 49 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4724,6 +4724,55 @@ func (ts *IntegrationTestSuite) testNonDeterminismFailureCause(historyMismatch b
ts.True(taskFailedMetric >= 1)
}

func (ts *IntegrationTestSuite) TestNonDeterminismFailureCauseCommandNotFound() {
// Create a situation in which, on replay, an event (MARKER_RECORDED) is encountered and yet the
// code emits no corresponding command.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

wfID := "test-non-determinism-failure-cause-command-not-found-" + uuid.New()
// Start workflow via UpdateWithStart and wait for update response
startWfOptions := ts.startWorkflowOptions(wfID)
startWfOptions.WorkflowIDConflictPolicy = enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL
startWfOp := ts.client.NewWithStartWorkflowOperation(startWfOptions, ts.workflows.NonDeterminismCommandNotFoundWorkflow)
updHandle, err := ts.client.UpdateWithStartWorkflow(ctx, client.UpdateWithStartWorkflowOptions{
StartWorkflowOperation: startWfOp,
UpdateOptions: client.UpdateWorkflowOptions{
WorkflowID: wfID,
UpdateName: "wait-for-wft-completion",
WaitForStage: client.WorkflowUpdateStageCompleted,
},
})
ts.NoError(err)

// WFT 1: workflow shouldEmitCommand is true, workflow accepts and completes update and emits a
// RecordMarker command.
ts.NoError(updHandle.Get(ctx, nil))
// Stop worker and start a new one in order to force full history replay.
ts.worker.Stop()
nextWorker := worker.New(ts.client, ts.taskQueueName, worker.Options{WorkflowPanicPolicy: internal.FailWorkflow})
ts.registerWorkflowsAndActivities(nextWorker)
ts.NoError(nextWorker.Start())
defer nextWorker.Stop()
// Set shouldEmitCommand=false and send second update in order to trigger a WFT.
shouldEmitCommand = false
_, err = ts.client.UpdateWorkflow(ctx, client.UpdateWorkflowOptions{
WorkflowID: wfID,
UpdateName: "wait-for-wft-completion",
WaitForStage: client.WorkflowUpdateStageCompleted,
})
ts.Error(err)
run, err := startWfOp.Get(ctx)
ts.NoError(err)
// WFT 2: full replay, NDE due to missing RecordMarker command.
err = run.Get(ctx, nil)
ts.Error(err)
var workflowErr *temporal.WorkflowExecutionError
ts.True(errors.As(err, &workflowErr))
ts.Contains(workflowErr.Error(),
"[TMPRL1100] During replay, a matching Timer command was expected in history event position 8. However, the replayed code did not produce that.")
}

func (ts *IntegrationTestSuite) TestNonDeterminismFailureCauseReplay() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
Expand Down
16 changes: 16 additions & 0 deletions test/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2831,6 +2831,21 @@ func (w *Workflows) ForcedNonDeterminism(ctx workflow.Context, sameCommandButDif
return
}

var shouldEmitCommand = true

func (w *Workflows) NonDeterminismCommandNotFoundWorkflow(ctx workflow.Context) error {
workflow.SetUpdateHandler(ctx, "wait-for-wft-completion", func(ctx workflow.Context) error {
return nil
})
if shouldEmitCommand {
_ = workflow.SideEffect(ctx, func(ctx workflow.Context) any {
return nil
}).Get(nil)
}
workflow.Sleep(ctx, 999*time.Hour)
return nil
}

func (w *Workflows) NonDeterminismReplay(ctx workflow.Context) error {
ctx = workflow.WithActivityOptions(ctx, w.defaultActivityOptions())
var a Activities
Expand Down Expand Up @@ -3495,6 +3510,7 @@ func (w *Workflows) register(worker worker.Worker) {
worker.RegisterWorkflow(w.SignalCounter)
worker.RegisterWorkflow(w.PanicOnSignal)
worker.RegisterWorkflow(w.ForcedNonDeterminism)
worker.RegisterWorkflow(w.NonDeterminismCommandNotFoundWorkflow)
worker.RegisterWorkflow(w.NonDeterminismReplay)
worker.RegisterWorkflow(w.MutableSideEffect)
worker.RegisterWorkflow(w.HistoryLengths)
Expand Down

0 comments on commit 1720257

Please sign in to comment.