Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add default update handler #1640

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 173 additions & 32 deletions packages/test/src/test-workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,40 @@ function makeSetPatchMarker(myPatchId: string, deprecated: boolean): coresdk.wor
};
}

function makeUpdateActivationJob(
id: string,
protocolInstanceId: string,
name: string,
input: unknown[]
): coresdk.workflow_activation.IWorkflowActivationJob {
return {
doUpdate: {
id,
protocolInstanceId,
name,
input: toPayloads(defaultPayloadConverter, ...input),
},
};
}

function makeUpdateAcceptedResponse(id: string): coresdk.workflow_commands.IWorkflowCommand {
return {
updateResponse: {
protocolInstanceId: id,
accepted: {},
},
};
}

function makeUpdateCompleteResponse(id: string, result: unknown): coresdk.workflow_commands.IWorkflowCommand {
return {
updateResponse: {
protocolInstanceId: id,
completed: defaultPayloadConverter.toPayload(result),
},
};
}

test('random', async (t) => {
const { logs, workflowType } = t.context;
{
Expand Down Expand Up @@ -2402,23 +2436,9 @@ test('Signals/Updates/Activities/Timers - Trace promises completion order - pre-
...makeActivation(
undefined,
makeSignalWorkflowJob('aaSignal', ['signal1']),
{
doUpdate: {
id: 'first',
name: 'aaUpdate',
protocolInstanceId: '1',
input: toPayloads(defaultPayloadConverter, ['update1']),
},
},
makeUpdateActivationJob('first', '1', 'aaUpdate', ['update1']),
makeSignalWorkflowJob('aaSignal', ['signal2']),
{
doUpdate: {
id: 'second',
name: 'aaUpdate',
protocolInstanceId: '2',
input: toPayloads(defaultPayloadConverter, ['update2']),
},
},
makeUpdateActivationJob('second', '2', 'aaUpdate', ['update2']),
makeFireTimerJob(1),
makeResolveActivityJob(1, { completed: {} })
),
Expand Down Expand Up @@ -2481,23 +2501,9 @@ test('Signals/Updates/Activities/Timers - Trace promises completion order - 1.11
...makeActivation(
undefined,
makeSignalWorkflowJob('aaSignal', ['signal1']),
{
doUpdate: {
id: 'first',
name: 'aaUpdate',
protocolInstanceId: '1',
input: toPayloads(defaultPayloadConverter, ['update1']),
},
},
makeUpdateActivationJob('first', '1', 'aaUpdate', ['update1']),
makeSignalWorkflowJob('aaSignal', ['signal2']),
{
doUpdate: {
id: 'second',
name: 'aaUpdate',
protocolInstanceId: '2',
input: toPayloads(defaultPayloadConverter, ['update2']),
},
},
makeUpdateActivationJob('second', '2', 'aaUpdate', ['update2']),
makeFireTimerJob(1),
makeResolveActivityJob(1, { completed: {} })
),
Expand Down Expand Up @@ -2528,3 +2534,138 @@ test('Signals/Updates/Activities/Timers - Trace promises completion order - 1.11
);
}
});

test('Buffered updates are dispatched in the correct order - updatesOrdering', async (t) => {
const { workflowType } = t.context;
{
const completion = await activate(
t,
makeActivation(
undefined,
makeInitializeWorkflowJob(workflowType),
makeUpdateActivationJob('1', '1', 'non-existant', [1]),
makeUpdateActivationJob('2', '2', 'updateA', [2]),
makeUpdateActivationJob('3', '3', 'updateA', [3]),
makeUpdateActivationJob('4', '4', 'updateC', [4]),
makeUpdateActivationJob('5', '5', 'updateB', [5]),
makeUpdateActivationJob('6', '6', 'non-existant', [6]),
makeUpdateActivationJob('7', '7', 'updateB', [7])
)
);

// The activation above:
// - initializes the workflow
// - buffers all its updates (we attempt update jobs first, but since there are no handlers, they get buffered)
// - enters the workflow code
// - workflow code sets handler for updateA
// - handler is registered for updateA
// - we attempt to dispatch buffered updates
// - buffered updates for handler A are *accepted* but not executed
// (executing an update is a promise/async, so it instead goes on the node event queue)
// - we continue/re-enter the workflow code
// - ...and do the same pattern for updateB, the default update handler, the updateC
// - once updates have been accepted, node processes the waiting events in its queue (the waiting updates)
// - these are processesed in FIFO order, so we get execution for updateA, then updateB, the default handler, then updateC

// As such, the expected order of these updates is the order that the handlers were registered.
// Note that because the default handler was registered *before* updateC, all remaining buffered updates were dispatched
// to it, including the update for updateC.

compareCompletion(
t,
completion,
makeSuccess(
[
// FIFO accepted order
makeUpdateAcceptedResponse('2'),
makeUpdateAcceptedResponse('3'),
makeUpdateAcceptedResponse('5'),
makeUpdateAcceptedResponse('7'),
makeUpdateAcceptedResponse('1'),
makeUpdateAcceptedResponse('4'),
makeUpdateAcceptedResponse('6'),
// FIFO executed order
makeUpdateCompleteResponse('2', { handler: 'updateA', args: [2] }),
makeUpdateCompleteResponse('3', { handler: 'updateA', args: [3] }),
makeUpdateCompleteResponse('5', { handler: 'updateB', args: [5] }),
makeUpdateCompleteResponse('7', { handler: 'updateB', args: [7] }),
makeUpdateCompleteResponse('1', { handler: 'default', updateName: 'non-existant', args: [1] }),
// updateC handled by default handler.
makeUpdateCompleteResponse('4', { handler: 'default', updateName: 'updateC', args: [4] }),
makeUpdateCompleteResponse('6', { handler: 'default', updateName: 'non-existant', args: [6] }),
// No expected update response from updateC handler
makeCompleteWorkflowExecution(),
]
// [SdkFlags.ProcessWorkflowActivationJobsAsSingleBatch]
)
);
}
});

test('Buffered updates are reentrant - updatesAreReentrant', async (t) => {
const { workflowType } = t.context;
{
const completion = await activate(
t,
makeActivation(
undefined,
makeInitializeWorkflowJob(workflowType),
makeUpdateActivationJob('1', '1', 'non-existant', [1]),
makeUpdateActivationJob('2', '2', 'updateA', [2]),
makeUpdateActivationJob('3', '3', 'updateA', [3]),
makeUpdateActivationJob('4', '4', 'updateC', [4]),
makeUpdateActivationJob('5', '5', 'updateB', [5]),
makeUpdateActivationJob('6', '6', 'non-existant', [6]),
makeUpdateActivationJob('7', '7', 'updateB', [7]),
makeUpdateActivationJob('8', '8', 'updateC', [8])
)
);

// The activation above:
// - initializes the workflow
// - buffers all its updates (we attempt update jobs first, but since there are no handlers, they get buffered)
// - enters the workflow code
// - workflow code sets handler for updateA
// - handler is registered for updateA
// - we attempt to dispatch buffered updates
// - buffered updates for handler A are *accepted* but not executed
// (executing an update is a promise/async, so it instead goes on the node event queue)
// - however, there is no more workflow code, node dequues event queue and we immediately run the update handler
// (we begin executing the update which...)
// - deletes the current handler and registers the next one (updateB)
// - this pattern repeats (updateA -> updateB -> updateC -> default) until there are no more updates to handle
// - at this point, all updates have been accepted and are executing
// - due to the call order in the workflow, the completion order of the updates follows the call stack, LIFO

// This workflow is interesting in that updates are accepted FIFO, but executed LIFO

compareCompletion(
t,
completion,
makeSuccess(
[
// FIFO accepted order
makeUpdateAcceptedResponse('2'),
makeUpdateAcceptedResponse('5'),
makeUpdateAcceptedResponse('4'),
makeUpdateAcceptedResponse('1'),
makeUpdateAcceptedResponse('3'),
makeUpdateAcceptedResponse('7'),
makeUpdateAcceptedResponse('8'),
makeUpdateAcceptedResponse('6'),
// LIFO executed order
makeUpdateCompleteResponse('6', { handler: 'default', updateName: 'non-existant', args: [6] }),
makeUpdateCompleteResponse('8', { handler: 'updateC', args: [8] }),
makeUpdateCompleteResponse('7', { handler: 'updateB', args: [7] }),
makeUpdateCompleteResponse('3', { handler: 'updateA', args: [3] }),
makeUpdateCompleteResponse('1', { handler: 'default', updateName: 'non-existant', args: [1] }),
makeUpdateCompleteResponse('4', { handler: 'updateC', args: [4] }),
makeUpdateCompleteResponse('5', { handler: 'updateB', args: [5] }),
makeUpdateCompleteResponse('2', { handler: 'updateA', args: [2] }),
makeCompleteWorkflowExecution(),
]
// [SdkFlags.ProcessWorkflowActivationJobsAsSingleBatch]
)
);
}
});
1 change: 1 addition & 0 deletions packages/test/src/workflows/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,4 @@ export * from './upsert-and-read-search-attributes';
export * from './wait-on-user';
export * from './workflow-cancellation-scenarios';
export * from './upsert-and-read-memo';
export * from './updates-ordering';
77 changes: 77 additions & 0 deletions packages/test/src/workflows/updates-ordering.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import { defineUpdate, setDefaultUpdateHandler, setHandler } from '@temporalio/workflow';

const updateA = defineUpdate<ProcessedUpdate, [number]>('updateA');
const updateB = defineUpdate<ProcessedUpdate, [number]>('updateB');
const updateC = defineUpdate<ProcessedUpdate, [number]>('updateC');

interface ProcessedUpdate {
handler: string;
updateName?: string;
args: unknown[];
}

/*
There's a surprising amount going on with the workflow below. Let's simplify it to just updateA and updateB
(no updateC or the default) and walk through it.
1. setHandler for updateA
- this is all synchronous code until we run `UpdateScope.updateWithInfo(updateId, name, doUpdateImpl)`,
which calls `doUpdateImpl` which is promise/async, so...
Copy link
Contributor

@mjameswh mjameswh Mar 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be exact, it is sync until the point where we actually call the update handler (i.e. the execute(input) call in doUpdateImpl(). By specification, the update validation must be executed sync, and the workflow handler function must be called immediately after. Otherwise, the workflow's internal state could be modified between the validation and the execute functions, and the validation result could be no longer valid by the time the execute function is called.

And in the specific case of this Workflow, the update handler is purely sync (i.e. there is no promise awaited on that code path), so it will also execute synchronously.

Copy link
Contributor Author

@THardy98 THardy98 Mar 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused.

By specification, the update validation must be executed sync, and the workflow handler function must be called immediately after. Otherwise, the workflow's internal state could be modified between the validation and the execute functions, and the validation result could be no longer valid by the time the execute function is called.

This makes sense to me.

But how can the code be sync up until we call the execute call?

  /** Alias to `new UpdateScope({ id, name }).run(fn)` */
  static updateWithInfo<T>(id: string, name: string, fn: () => Promise<T>): Promise<T> {
    return new this({ id, name }).run(fn);
  }

UpdateScope.updateWithInfo(updateId, name, doUpdateImpl)

This is the async call at the end of doUpdate, it returns a promise.
doUpdateImpl itself is an async function as well.
How could these async calls guarantee sync execution till we execute the handler?

This is irrespective of the update handler code (async or not).

2. queue doUpdateImpl for A on node event queue: [doUpdateImplA]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAICS, in the case of this specific workflow, doUpdateImplA will not get queued on node's event queue because there's no promise that is awaited from that update handler. Same for updateB.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doUpdateImplA is async though, so I thought differently. Maybe we should coalesce this conversation into the other comment.

3. continue running the workflow code (currently running code, we aren't awaiting the promise)
4. setHandler for updateB
- same deal as A
5. queue doUpdateImpl for B on node event queue: [doUpdateImplA, doUpdateImplB]
6. finished workflow code, go through the event queue
7. doUpdateImplA
- synchronous until we get to `execute`, which means we've accepted the update, command ordering [acceptA]
8. `execute` returns a promise, add it to the node event queue: [doUpdateImplB, executeA]
9. doUpdateImplB
- same deal as A, command ordering [acceptA, acceptB]
- `execute` returns promise, node event queue [executeA, executeB]
10. execute update A, node event queue [executeB], command ordering [acceptA, acceptB, executeA]
11. execute update B, node event queue [] (empty), command ordering [acceptA, acceptB, executeA, executeB]
The only additional complexity with the workflow below is that once the default handler is registered, buffered updates for C will be
dispatched to the default handler. So in this scenario: C1, C2 -> default registered -> C registered, both C1 and C2 will be dispatched
to the default handler.
*/
export async function updatesOrdering(): Promise<void> {
setHandler(updateA, (...args: any[]) => {
return { handler: 'updateA', args };
});
setHandler(updateB, (...args: any[]) => {
return { handler: 'updateB', args };
});
setDefaultUpdateHandler((updateName, ...args: any[]) => {
return { handler: 'default', updateName, args };
});
setHandler(updateC, (...args: any[]) => {
return { handler: 'updateC', args };
});
}

export async function updatesAreReentrant(): Promise<void> {
function handlerA(...args: any[]) {
setHandler(updateA, undefined);
setHandler(updateB, handlerB);
return { handler: 'updateA', args };
}
function handlerB(...args: any[]) {
setHandler(updateB, undefined);
setHandler(updateC, handlerC);
return { handler: 'updateB', args };
}
function handlerC(...args: any[]) {
setHandler(updateC, undefined);
setDefaultUpdateHandler(defaultHandler);
return { handler: 'updateC', args };
}
function defaultHandler(updateName: string, ...args: any[]) {
setDefaultUpdateHandler(undefined);
setHandler(updateA, handlerA);
return { handler: 'default', updateName, args };
}

setHandler(updateA, handlerA);
}
5 changes: 5 additions & 0 deletions packages/workflow/src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,11 @@ export type Handler<
*/
export type DefaultSignalHandler = (signalName: string, ...args: unknown[]) => void | Promise<void>;

/**
* A handler function accepting update calls for non-registered update names.
*/
export type DefaultUpdateHandler = (updateName: string, ...args: unknown[]) => Promise<unknown> | unknown;

/**
* A handler function accepting query calls for non-registered query names.
*/
Expand Down
42 changes: 34 additions & 8 deletions packages/workflow/src/internals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import {
WorkflowInfo,
WorkflowCreateOptionsInternal,
ActivationCompletion,
DefaultUpdateHandler,
DefaultQueryHandler,
} from './interfaces';
import { type SinkCall } from './sinks';
Expand Down Expand Up @@ -190,6 +191,11 @@ export class Activator implements ActivationHandler {
*/
defaultSignalHandler?: DefaultSignalHandler;

/**
* A update handler that catches calls for non-registered update names.
*/
defaultUpdateHandler?: DefaultUpdateHandler;

/**
* A query handler that catches calls for non-registered query names.
*/
Expand Down Expand Up @@ -676,8 +682,20 @@ export class Activator implements ActivationHandler {
if (!protocolInstanceId) {
throw new TypeError('Missing activation update protocolInstanceId');
}
const entry = this.updateHandlers.get(name);
if (!entry) {

const entry =
this.updateHandlers.get(name) ??
(this.defaultUpdateHandler
? {
handler: this.defaultUpdateHandler.bind(this, name),
validator: undefined,
// Default to a warning policy.
unfinishedPolicy: HandlerUnfinishedPolicy.WARN_AND_ABANDON,
}
: null);

// If we don't have an entry from either source, buffer and return
if (entry === null) {
this.bufferedUpdates.push(activation);
return;
}
Expand Down Expand Up @@ -769,13 +787,21 @@ export class Activator implements ActivationHandler {
public dispatchBufferedUpdates(): void {
const bufferedUpdates = this.bufferedUpdates;
while (bufferedUpdates.length) {
const foundIndex = bufferedUpdates.findIndex((update) => this.updateHandlers.has(update.name as string));
if (foundIndex === -1) {
// No buffered Updates have a handler yet.
break;
// We have a default update handler, so all updates are dispatchable.
if (this.defaultUpdateHandler) {
const update = bufferedUpdates.shift();
// Logically, this must be defined as we're in the loop.
// But Typescript doesn't know that so we use a non-null assertion (!).
this.doUpdate(update!);
} else {
const foundIndex = bufferedUpdates.findIndex((update) => this.updateHandlers.has(update.name as string));
if (foundIndex === -1) {
// No buffered Updates have a handler yet.
break;
}
const [update] = bufferedUpdates.splice(foundIndex, 1);
this.doUpdate(update);
}
const [update] = bufferedUpdates.splice(foundIndex, 1);
this.doUpdate(update);
}
}

Expand Down
Loading
Loading