Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Nexus APIs to the Temporal Go SDK #89

Merged
merged 4 commits into from
May 7, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
369 changes: 369 additions & 0 deletions nexus/sdk-go.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,369 @@
# Nexus APIs for the Temporal Go SDK

## Background

Nexus RPC is a modern open-source service framework for arbitrary-length operations whose lifetime may extend beyond a
traditional RPC. Nexus was designed with durable execution in mind, as an underpinning to connect durable executions
within and across namespaces, clusters and regions – with a clean API contract to streamline multi-team collaboration.
Any service can be exposed as a set of sync or async Nexus operations – the latter provides an operation identity and a
uniform interface to get the status of an operation or its result, receive a completion callback, or cancel the
operation.

Temporal is leveraing the Nexus RPC protocol to facilitate calling across namespace and cluster and boundaries.

### Prior Art

- [Nexus over HTTP Spec](https://github.com/nexus-rpc/api/blob/main/SPEC.md)
- [Nexus SDK Go](https://github.com/nexus-rpc/sdk-go)

### This Proposal

This proposal discusses exposing Nexus concepts in the Temporal Go SDK, reusing definitions from the Nexus Go SDK.

> NOTE: The proposed APIs will all be marked experimental until we get user feedback and we feel they've matured enough.
>
> Due to limitation in the SDK package structure, some of the types shown in this proposal are aliases to types defined
> in the `internal` package. We define them inline here for brevity.

## Temporal Operations

### NewSyncOperation

`NewSyncOperation` creates a synchronous operation, passing the client the worker was created with as an argument to the
handler. Sync operations are useful for exposing short-lived Temporal client requests, such as signals, queries, sync
update, list workflows, etc...

```go
package temporalnexus

import "github.com/nexus-rpc/sdk-go/nexus"

func NewSyncOperation[I any, O any](
name string,
handler func(context.Context, client.Client, I, nexus.StartOperationOptions) (O, error),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not very future proof if there are Temporal-specific start options we ever may want to add. Maybe change nexus.StartOperationOptions to temporalnexus.StartOperationOptions and have the latter just embed the former as its only field?

Comment on lines +42 to +43
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider a more future proof temporalnexus.SyncOperationOptions that accepts a name and a handler. That way you can add other options (some mutually exclusive). Do not overly concern yourself with type inference, Go does not have good type inference, don't let it change the API.

For example:

temporalnexus.NewSyncOperation(temporalnexus.SyncOperationOptions[string, string]{
    Name: "get-status",
    Handler: func(ctx context.Context, c client.Client, param string, opts nexus.StartOperationOptions) (string, error) {
        // ...
    },
})

Or we could offer a shortcut which assumes ID is the param:

temporalnexus.NewSyncOperation(temporalnexus.SyncOperationOptions[string, string]{
    Name: "get-status",
    WorkflowQuery: "some-query",
})

EDIT: Can you just add a NewSyncOperationWithOptions like you did with NewWorkflowRunOperation? Arguably we could only have the options form, but I know there is an (IMO unhealthy) obsession with type inference.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should over complicate the API with future compat concerns.
I don't even think that WorkflowQuery is an option we want here, I'd make a NewWorkflowQueryOperation instead if we want to go that direction but in the previous proposal, you were the one advocating for a single SyncOperation concept and giving users a client.

I'm open to renaming this to NewClientOperation or NewSyncClientOperation.

Copy link
Member

@cretz cretz Apr 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should over complicate the API with future compat concerns.

I don't think it's overcomplicating. Can ignore most of my comment, but I would expect NewSyncOperationWithOptions to exist just like NewWorkflowRunOperationWithOptions does. (having said that, I wish there was only 1 call that took options always, but seems you're settled on 2)

) nexus.Operation[I, O]
```

> NOTE: this is a simple wrapper around the Nexus Go SDK's
> [NewSyncOperation](https://pkg.go.dev/github.com/nexus-rpc/[email protected]/nexus#NewSyncOperation), which can be used in
> sync operations that don't require a Temporal client.

**Usage**:

```go
import (
"github.com/nexus-rpc/sdk-go/nexus"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/temporalnexus"
)

opGetStatus := temporalnexus.NewSyncOperation("get-status", func(ctx context.Context, c client.Client, id string, opts nexus.StartOperationOptions) (int, error) {
res, err := c.QueryWorkflow(ctx, id, "", "some-query", nil)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you confirm for me that we decided that operations cannot have arbitrary user-defined operations on themselves and that instead the id-as-parameter pattern is how you interact w/ an existing operation (besides the built-ins like cancel)? If this pattern is so common, we can have helpers for it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, operations have a limited pre-defined interface.

if err != nil {
return 0, err
}
var ret int
return ret, res.Get(&ret)
})

// Operations don't have to return values.
opContinue := temporalnexus.NewSyncOperation("continue-processing", func(ctx context.Context, c client.Client, id string, opts nexus.StartOperationOptions) (nexus.NoValue, error) {
return nil, c.SignalWorkflow(ctx, id, "", "continue-processing", nil)
})
```

### NewWorkflowRunOperation

```go
package temporalnexus

import (
"github.com/nexus-rpc/sdk-go/nexus"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/workflow"
)

// NewWorkflowRunOperation maps an operation to a workflow run.
func NewWorkflowRunOperation[I, O any](
name string,
workflow func(internal.Context, I) (O, error),
getOptions func(context.Context, I, nexus.StartOperationOptions) (client.StartWorkflowOptions, error),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this param be nil?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, you need at least a workflow ID to start a workflow.

Copy link
Member

@cretz cretz Apr 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add that this param cannot be nil in the docs? Usually people in Go don't expect to have to provide a workflow ID, so this is a change.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm.. right, they'll need to if they want their operation to be idempotent. It should be a deterministic function from input to ID.

) nexus.Operation[I, O]

type WorkflowRunOperationOptions[I, O any] struct {
// Operation name.
Name string
// Workflow function to map this operation to. The operation input maps directly to workflow input.
// The workflow name is resolved as it would when using this function in client.ExecuteOperation.
// GetOptions must be provided when setting this option. Mutually exclusive with Handler.
Workflow func(workflow.Context, I) (O, error)
// Options for starting the workflow. Must be set if Workflow is set. Mutually exclusive with Handler.
GetOptions func(context.Context, I, nexus.StartOperationOptions) (client.StartWorkflowOptions, error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For others reading, the Handler term is commonly used in Go in places like net/http.Server.Handler and the GetOptions type of name is used in places like crypto/tls.Config.GetCertificate (even though it may feel inconsistent that it's not GetHandler or OptionsGetter)

// Handler for starting a workflow with a different input than the operation. Mutually exclusive with Workflow
// and GetOptions.
Handler func(context.Context, I, nexus.StartOperationOptions) (WorkflowHandle[O], error)
}

// NewWorkflowRunOperation map an operation to a workflow run with the given options.
func NewWorkflowRunOperationWithOptions[I, O any](opts WorkflowRunOperationOptions[I, O]) nexus.Operation[I, O]

type WorkflowHandle[T any] interface {
ID() string
RunID() string
}

// ExecuteWorkflow starts a workflow run for a [WorkflowRunOperationOptions.Handler], linking the execution chain to a
// Nexus operation (subsequent runs started from continue-as-new and retries).
// Automatically propagates the callback and request ID from the nexus options to the workflow.
func ExecuteWorkflow[I, O any, WF func(internal.Context, I) (O, error)](
ctx context.Context,
nexusOptions nexus.StartOperationOptions,
startWorkflowOptions client.StartWorkflowOptions,
workflow WF,
arg I,
) (WorkflowHandle[O], error)

// ExecuteUntypedWorkflow starts a workflow with by function reference or string name, linking the execution chain to a
// Nexus operation.
// Useful for invoking workflows that don't follow the single argument - single return type signature.
// See [ExecuteWorkflow] for more information.
func ExecuteUntypedWorkflow[R any](
ctx context.Context,
nexusOptions nexus.StartOperationOptions,
startWorkflowOptions client.StartWorkflowOptions,
workflow any,
args ...any,
) (WorkflowHandle[R], error)
```

**Usage**:

```go
import (
"github.com/nexus-rpc/sdk-go/nexus"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/temporalnexus"
)

func MyHandlerWorkflow(workflow.Context, MyInput) (MyOutput, error)
func MyHandlerWorkflowWithAlternativeInput(workflow.Context, MyWorkflowInput) (MyOutput, error)

// Alternative 1 - shortest form, for workflows that have input and outputs that map 1:1 with the operation's I/O.
opStartTransactionAlt1 := temporalnexus.NewWorkflowRunOperation(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To clarify, the only reason this exists is for people not wanting to make an options object? But we still make them make a full workflow start options callback? Did they really save much? Maybe instead you should have a NewWorkflowRunOperationOptions that accepts these fields instead of top-level overloads.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's the reason. I believe this will be the preferred way to expose workflows as operations so I made this shorthand form.

The reason I don't like the options struct is that you have to explicitly type the type params but yes a NewWorkflowRunOperationOptions also solves that. But keep in mind that even NewWorkflowRunOperationOptions may require overloads to support handler and (in the future) a result mapper.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason I don't like the options struct is that you have to explicitly type the type params

I don't think this is that bad of a thing in Go and I think we're adding extra methods do to this personal dislike. But I don't have a super strong opinion just so long as we have proper options-struct-based form we can grow on all of these.

"start-transaction",
MyHandlerWorkflow,
func(ctx context.Context, input MyInput, opts nexus.StartOperationOptions) (client.StartWorkflowOptions, error) {
return client.StartWorkflowOptions{
ID: input.ID,
}, nil
})

// Alternative 2 - same as above but using the "WithOptions" method.
opStartTransactionAlt2 := temporalnexus.NewWorkflowRunOperationWithOptions(
temporalnexus.WorkflowRunOperationOptions[MyInput, MyOutput]{
Name: "start-transaction",
Workflow: MyHandlerWorkflow,
GetOptions: func(ctx context.Context, input MyInput, opts nexus.StartOperationOptions) (client.StartWorkflowOptions, error) {
return client.StartWorkflowOptions{
ID: input.ID,
}, nil
},
})

// Alternative 3 - start a workflow with alternative inputs.
opStartTransactionAlt3 := temporalnexus.NewWorkflowRunOperationWithOptions(
temporalnexus.WorkflowRunOperationOptions[MyInput, MyOutput]{
Name: "start-transaction",
Handler: func(ctx context.Context, input MyInput, opts nexus.StartOperationOptions) (temporalnexus.WorkflowHandle[MyOutput], error) {
// Workflows started with this API must take a single input and return single output.
return temporalnexus.ExecuteWorkflow(ctx, opts, client.StartWorkflowOptions{
ID: input.ID,
}, MyHandlerWorkflowWithAlternativeInput, MyWorkflowInput{})
},
})

// Alternative 4 - start a workflow with an arbitrary number of inputs (either using a string or workflow function for
// the name).
opStartTransactionAlt4 := temporalnexus.NewWorkflowRunOperationWithOptions(
temporalnexus.WorkflowRunOperationOptions[MyInput, MyOutput]{
Name: "start-transaction",
Handler: func(ctx context.Context, input MyInput, opts nexus.StartOperationOptions) (temporalnexus.WorkflowHandle[MyOutput], error) {
// Run any arbitrary workflow.
return temporalnexus.ExecuteUntypedWorkflow[MyOutput](ctx, opts, client.StartWorkflowOptions{
ID: input.ID,
}, "SomeOtherWorkflow", input1, input2, input3)
},
})
```

### Register Operations As Part of a Service with a Worker

We define a `NexusServiceRegistry` interface with a single method `RegisterNexusService` and embed it in the `Registry`
interface:

```go
import "github.com/nexus-rpc/sdk-go/nexus"

Registry interface {
WorkflowRegistry
ActivityRegistry
NexusServiceRegistry // <-- New
}

NexusServiceRegistry interface {
// RegisterNexusService registers a service with a worker. Panics if a service with the same name has already
// been registered on this worker or if the worker has already been started. A worker will only poll for
// Nexus tasks if any services are registered on it.
RegisterNexusService(service *nexus.Service)
}
```

**Usage**:

```go
service := nexus.NewService("payments")
err := service.Register(myOperation, myOtherOperation)
if err != nil {
panic(err)
}

myWorker.RegisterNexusService(service)
```

## Starting an Operation from a Workflow

The `ExecuteNexusOperation` API is modelled after `ExecuteChildWorkflow`, leveraging futures that can be used with the
SDK's selectors.

> NOTE: In the future, as the Go SDK adds support for typed futures, we will add a strongly typed variant of this API.

```go
// NexusOperationOptions are options for starting a Nexus Operation from a Workflow.
type NexusOperationOptions struct {
ScheduleToCloseTimeout time.Duration
}

// NexusOperationExecution is the result of [NexusOperationFuture.GetNexusOperationExecution].
type NexusOperationExecution struct {
OperationID string
}

// NexusOperationFuture represents the result of a Nexus Operation.
type NexusOperationFuture interface {
Future
// GetNexusOperationExecution returns a future that is resolved when the operation reaches the STARTED state.
// For synchronous operations, this will be resolved at the same as the containing [NexusOperationFuture]. For
// asynchronous operations, this future is resolved independently.
// If the operation is unsuccessful, this future will contain the same error as the [NexusOperationFuture].
// Use this method to extract the Operation ID of an asynchronous operation. OperationID will be empty for
// synchronous operations.
GetNexusOperationExecution() Future
}

// NexusClient is a client for executing Nexus Operations from a workflow.
type NexusClient interface {
// ExecuteOperation executes a Nexus Operation.
// The operation argument can be a string, a [nexus.Operation] or a [nexus.OperationReference].
ExecuteOperation(ctx Context, operation any, input any, options NexusOperationOptions) NexusOperationFuture
}

// Create a [NexusClient] from a service name and an endpoint name.
func NewNexusClient(service, endpoint string) NexusClient
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider reversing the order to endpoint, service.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also considering adding context and an options struct to future proof this API.

```

**Usage**:

```go
import (
"fmt"
"time"

"go.temporal.io/sdk/workflow"
)

func MyCallerWorkflow(ctx workflow.Context) (MyOutput, error) {
client := workflow.NewNexusClient("payments", "prod-payments")
fut := client.ExecuteOperation(ctx, "start-transaction", MyInput{ID: "tx-deadbeef"}, workflow.NexusOperationOptions{
ScheduleToCloseTimeout: time.Hour,
})
var exec workflow.NexusOperationExecution
_ = fut.GetNexusOperationExecution().Get(ctx, &exec)
fmt.Println(exec.OperationID) // May be empty if the operation completed synchronously.
var result MyOutput
return result, fut.Get(ctx, &result)
}
```

> NOTE: To cancel a Nexus Operation, cancel the context used to execute it.

### Interceptors

For now we'll only intercept outbound APIs from a workflow, extending the `WorkflowOutboundInterceptor` interface.
More interceptors are likely to come later.

```go
type WorkflowOutboundInterceptor interface {
NewNexusClient(service, endpoint string) workflow.NexusClient
RequestCancelNexusOperation(ctx Context, service, operation, id string, options nexus.CancelOperationOptions) error
}
```

## Sequence

### Async Operation Flow

```mermaid
sequenceDiagram
CallerWorkflow->>CallerWorker: ExecuteNexusOperation
CallerWorker->>CallerTemporal: RespondWorkflowTaskCompleted<br>(ScheduleNexusOperation)
CallerTemporal->>CallerTemporal: Record NexusOperationScheduled
loop Until Succeeded
CallerTemporal->>HandlerTemporal: StartNexusOperation(requestID, callback, input, ...)
HandlerWorker->>HandlerTemporal: PollNexusTaskQueue
HandlerTemporal-->>HandlerWorker: Nexus Task
HandlerWorker->>UserHandler: StartNexusOperation
UserHandler->>HandlerTemporal: StartWorkflowExecution
HandlerTemporal-->>UserHandler: OK
UserHandler-->>HandlerWorker: OperationID
HandlerWorker-->>HandlerTemporal: RespondNexusTaskCompleted(OperationID)
HandlerTemporal-->>CallerTemporal: OperationID
end
CallerWorker->>CallerTemporal: PollWorkflowTaskQueue
CallerTemporal-->>CallerWorker: Workflow Task (NexusOperationStarted)
CallerWorker-->>CallerWorkflow: Unblock "started" future

Note right of HandlerTemporal: Some time later...

ArbitraryHandlerWorker->>HandlerTemporal: PollWorkflowTaskQueue
HandlerTemporal-->>ArbitraryHandlerWorker: Workflow Task (start workflow from UserHandler)
ArbitraryHandlerWorker->>HandlerTemporal: RespondWorkflowTaskCompleted<br>(WorkflowExecutionCompleted)
HandlerTemporal-->>ArbitraryHandlerWorker: OK
HandlerTemporal->>CallerTemporal: POST callback with operation (workflow) result
CallerTemporal-->>HandlerTemporal: OK
CallerWorker->>CallerTemporal: PollWorkflowTaskQueue
CallerTemporal-->>CallerWorker: Workflow Task (NexusOperationCompleted)
CallerWorker-->>CallerWorkflow: Unblock "completed" future
```

### Sync Operation Flow

```mermaid
sequenceDiagram
CallerWorkflow->>CallerWorker: ExecuteNexusOperation
CallerWorker->>CallerTemporal: RespondWorkflowTaskCompleted<br>(ScheduleNexusOperation)
CallerTemporal->>CallerTemporal: Record NexusOperationScheduled
loop Until Succeeded
CallerTemporal->>HandlerTemporal: StartNexusOperation(requestID, callback, input, ...)
HandlerWorker->>HandlerTemporal: PollNexusTaskQueue
HandlerTemporal-->>HandlerWorker: Nexus Task
HandlerWorker->>UserHandler: StartNexusOperation
UserHandler->>HandlerTemporal: Signal/Query/List
HandlerTemporal-->>UserHandler: OK
UserHandler-->>HandlerWorker: Result
HandlerWorker-->>HandlerTemporal: RespondNexusTaskCompleted(Result)
HandlerTemporal-->>CallerTemporal: Result
end
CallerWorker->>CallerTemporal: PollWorkflowTaskQueue
CallerTemporal-->>CallerWorker: Workflow Task (NexusOperationCompleted)
CallerWorker-->>CallerWorkflow: Unblock "started" + "completed" futures
```