Skip to content

Commit

Permalink
[Nexus] Set OnConflictOptions for WorkflowRunOperation (#1797)
Browse files Browse the repository at this point in the history
* [Nexus] Set OnConflictOptions for WorkflowRunOperation

* address comments

* set WorkflowExecutionErrorWhenAlreadyStarted to true
  • Loading branch information
rodrigozhou authored Feb 24, 2025
1 parent f5882aa commit b2b75c9
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 2 deletions.
26 changes: 24 additions & 2 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,8 +647,11 @@ type (
// When WorkflowExecutionErrorWhenAlreadyStarted is true, Client.ExecuteWorkflow will return an error if the
// workflow id has already been used and WorkflowIDReusePolicy or WorkflowIDConflictPolicy would
// disallow a re-run. If it is set to false, rather than erroring a WorkflowRun instance representing
// the current or last run will be returned. However, when WithStartOperation is set, this field is ignored and
// the WorkflowIDConflictPolicy UseExisting must be used instead to prevent erroring.
// the current or last run will be returned. However, this field is ignored in the following cases:
// - when WithStartOperation is set;
// - in the Nexus WorkflowRunOperation.
// When this field is ignored, you must set WorkflowIDConflictPolicy to UseExisting to prevent
// erroring.
//
// Optional: defaults to false
WorkflowExecutionErrorWhenAlreadyStarted bool
Expand Down Expand Up @@ -742,6 +745,14 @@ type (
callbacks []*commonpb.Callback
// links. Only settable by the SDK - e.g. [temporalnexus.workflowRunOperation].
links []*commonpb.Link

// OnConflictOptions - Optional workflow ID conflict options used in conjunction with conflict policy
// WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING. If onConflictOptions is set and a workflow is already
// running, the options specifies the actions to be taken on the running workflow. If not set or use
// together with any other WorkflowIDConflictPolicy, this parameter is ignored.
//
// NOTE: Only settable by the SDK -- e.g. [temporalnexus.workflowRunOperation].
onConflictOptions *OnConflictOptions
}

// WithStartWorkflowOperation defines how to start a workflow when using UpdateWithStartWorkflow.
Expand Down Expand Up @@ -1195,3 +1206,14 @@ func SetCallbacksOnStartWorkflowOptions(opts *StartWorkflowOptions, callbacks []
func SetLinksOnStartWorkflowOptions(opts *StartWorkflowOptions, links []*commonpb.Link) {
opts.links = links
}

// SetOnConflictOptionsOnStartWorkflowOptions is an internal only method for setting conflict
// options on StartWorkflowOptions.
// OnConflictOptions are purposefully not exposed to users for the time being.
func SetOnConflictOptionsOnStartWorkflowOptions(opts *StartWorkflowOptions) {
opts.onConflictOptions = &OnConflictOptions{
AttachRequestID: true,
AttachCompletionCallbacks: true,
AttachLinks: true,
}
}
1 change: 1 addition & 0 deletions internal/internal_workflow_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1678,6 +1678,7 @@ func (w *workflowClientInterceptor) createStartWorkflowRequest(
CompletionCallbacks: in.Options.callbacks,
Links: in.Options.links,
VersioningOverride: versioningOverrideToProto(in.Options.VersioningOverride),
OnConflictOptions: in.Options.onConflictOptions.ToProto(),
}

startRequest.UserMetadata, err = buildUserMetadata(in.Options.StaticSummary, in.Options.StaticDetails, dataConverter)
Expand Down
21 changes: 21 additions & 0 deletions internal/internal_workflow_execution_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,16 @@ type (
// Required if behavior is [VersioningBehaviorPinned]. Must be absent if behavior is not [VersioningBehaviorPinned].
PinnedVersion string
}

// OnConflictOptions specifies the actions to be taken when using the workflow ID conflict policy
// USE_EXISTING.
//
// NOTE: Experimental
OnConflictOptions struct {
AttachRequestID bool
AttachCompletionCallbacks bool
AttachLinks bool
}
)

// Mapping WorkflowExecutionOptions field names to proto ones.
Expand Down Expand Up @@ -209,3 +219,14 @@ func (r *UpdateWorkflowExecutionOptionsRequest) validateAndConvertToProto(namesp

return requestMsg, nil
}

func (o *OnConflictOptions) ToProto() *workflowpb.OnConflictOptions {
if o == nil {
return nil
}
return &workflowpb.OnConflictOptions{
AttachRequestId: o.AttachRequestID,
AttachCompletionCallbacks: o.AttachCompletionCallbacks,
AttachLinks: o.AttachLinks,
}
}
12 changes: 12 additions & 0 deletions temporalnexus/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ type WorkflowRunOperationOptions[I, O any] struct {
// The options returned must include a workflow ID that is deterministically generated from the input in order
// for the operation to be idempotent as the request to start the operation may be retried.
// TaskQueue is optional and defaults to the current worker's task queue.
// WorkflowExecutionErrorWhenAlreadyStarted is ignored and always set to true.
// WorkflowIDConflictPolicy is by default set to fail if a workflow is already running. That is,
// if a caller executes another operation that starts the same workflow, it will fail. You can set
// it to WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING to attach the caller's callback to the existing
// running workflow. This way, all attached callers will be notified when the workflow completes.
GetOptions func(context.Context, I, nexus.StartOperationOptions) (client.StartWorkflowOptions, error)
// Handler for starting a workflow with a different input than the operation. Mutually exclusive with Workflow
// and GetOptions.
Expand Down Expand Up @@ -382,6 +387,13 @@ func ExecuteUntypedWorkflow[R any](
}
}
internal.SetLinksOnStartWorkflowOptions(&startWorkflowOptions, links)
internal.SetOnConflictOptionsOnStartWorkflowOptions(&startWorkflowOptions)

// This makes sure that ExecuteWorkflow will respect the WorkflowIDConflictPolicy, ie., if the
// conflict policy is to fail (default value), then ExecuteWorkflow will return an error if the
// workflow already running. For Nexus, this ensures that operation has only started successfully
// when the callback has been attached to the workflow (new or existing running workflow).
startWorkflowOptions.WorkflowExecutionErrorWhenAlreadyStarted = true

run, err := GetClient(ctx).ExecuteWorkflow(ctx, startWorkflowOptions, workflowType, args...)
if err != nil {
Expand Down

0 comments on commit b2b75c9

Please sign in to comment.