Skip to content

Commit

Permalink
add default query handler (#1639)
Browse files Browse the repository at this point in the history
  • Loading branch information
THardy98 authored Mar 10, 2025
1 parent eb67d04 commit 4155dad
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 3 deletions.
56 changes: 55 additions & 1 deletion packages/test/src/test-integration-split-two.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {
import { msToNumber, tsToMs } from '@temporalio/common/lib/time';
import { decode as payloadDecode, decodeFromPayloadsAtIndex } from '@temporalio/common/lib/internal-non-workflow';

import { condition, defineQuery, setHandler, sleep } from '@temporalio/workflow';
import { condition, defineQuery, defineSignal, setDefaultQueryHandler, setHandler, sleep } from '@temporalio/workflow';
import { configurableHelpers, createTestWorkflowBundle } from './helpers-integration';
import * as activities from './activities';
import * as workflows from './workflows';
Expand Down Expand Up @@ -697,3 +697,57 @@ test('Query does not cause condition to be triggered', configMacro, async (t, co
// Worker did not crash
t.pass();
});

const completeSignal = defineSignal('complete');
const definedQuery = defineQuery<QueryNameAndArgs>('query-handler-type');

interface QueryNameAndArgs {
name: string;
queryName?: string;
args: any[];
}

export async function workflowWithMaybeDefinedQuery(useDefinedQuery: boolean): Promise<void> {
let complete = false;
setHandler(completeSignal, () => {
complete = true;
});
setDefaultQueryHandler((queryName: string, ...args: any[]) => {
return { name: 'default', queryName, args };
});
if (useDefinedQuery) {
setHandler(definedQuery, (...args: any[]) => {
return { name: definedQuery.name, args };
});
}

await condition(() => complete);
}

test('default query handler is used if requested query does not exist', configMacro, async (t, config) => {
const { env, createWorkerWithDefaults } = config;
const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env);
const worker = await createWorkerWithDefaults(t, { activities });
const handle = await startWorkflow(workflowWithMaybeDefinedQuery, {
args: [false],
});
await worker.runUntil(async () => {
const args = ['test', 'args'];
const result = await handle.query(definedQuery, ...args);
t.deepEqual(result, { name: 'default', queryName: definedQuery.name, args });
});
});

test('default query handler is not used if requested query exists', configMacro, async (t, config) => {
const { env, createWorkerWithDefaults } = config;
const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env);
const worker = await createWorkerWithDefaults(t, { activities });
const handle = await startWorkflow(workflowWithMaybeDefinedQuery, {
args: [true],
});
await worker.runUntil(async () => {
const args = ['test', 'args'];
const result = await handle.query('query-handler-type', ...args);
t.deepEqual(result, { name: definedQuery.name, args });
});
});
5 changes: 5 additions & 0 deletions packages/workflow/src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,11 @@ export type Handler<
*/
export type DefaultSignalHandler = (signalName: string, ...args: unknown[]) => void | Promise<void>;

/**
* A handler function accepting query calls for non-registered query names.
*/
export type DefaultQueryHandler = (queryName: string, ...args: unknown[]) => unknown;

/**
* A validation function capable of accepting the arguments for a given UpdateDefinition.
*/
Expand Down
13 changes: 12 additions & 1 deletion packages/workflow/src/internals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import {
WorkflowInfo,
WorkflowCreateOptionsInternal,
ActivationCompletion,
DefaultQueryHandler,
} from './interfaces';
import { type SinkCall } from './sinks';
import { untrackPromise } from './stack-helpers';
Expand Down Expand Up @@ -189,6 +190,11 @@ export class Activator implements ActivationHandler {
*/
defaultSignalHandler?: DefaultSignalHandler;

/**
* A query handler that catches calls for non-registered query names.
*/
defaultQueryHandler?: DefaultQueryHandler;

/**
* Source map file for looking up the source files in response to __enhanced_stack_trace
*/
Expand Down Expand Up @@ -611,7 +617,11 @@ export class Activator implements ActivationHandler {

// Intentionally non-async function so this handler doesn't show up in the stack trace
protected queryWorkflowNextHandler({ queryName, args }: QueryInput): Promise<unknown> {
const fn = this.queryHandlers.get(queryName)?.handler;
let fn = this.queryHandlers.get(queryName)?.handler;
if (fn === undefined && this.defaultQueryHandler !== undefined) {
fn = this.defaultQueryHandler.bind(this, queryName);
}
// No handler or default registered, fail.
if (fn === undefined) {
const knownQueryTypes = [...this.queryHandlers.keys()].join(' ');
// Fail the query
Expand All @@ -621,6 +631,7 @@ export class Activator implements ActivationHandler {
)
);
}
// Execute handler.
try {
const ret = fn(...args);
if (ret instanceof Promise) {
Expand Down
23 changes: 22 additions & 1 deletion packages/workflow/src/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import {
UpdateInfo,
encodeChildWorkflowCancellationType,
encodeParentClosePolicy,
DefaultQueryHandler,
} from './interfaces';
import { LocalActivityDoBackoff } from './errors';
import { assertInWorkflowContext, getActivator, maybeGetActivator } from './global-attributes';
Expand Down Expand Up @@ -1300,7 +1301,7 @@ export function setHandler<
*
* Signals are dispatched to the default signal handler in the order that they were accepted by the server.
*
* If this function is called multiple times for a given signal or query name the last handler will overwrite any previous calls.
* If this function is called multiple times for a given signal name the last handler will overwrite any previous calls.
*
* @param handler a function that will handle signals for non-registered signal names, or `undefined` to unset the handler.
*/
Expand All @@ -1318,6 +1319,26 @@ export function setDefaultSignalHandler(handler: DefaultSignalHandler | undefine
}
}

/**
* Set a query handler function that will handle query calls for non-registered query names.
*
* Queries are dispatched to the default query handler in the order that they were accepted by the server.
*
* If this function is called multiple times for a given query name the last handler will overwrite any previous calls.
*
* @param handler a function that will handle queries for non-registered query names, or `undefined` to unset the handler.
*/
export function setDefaultQueryHandler(handler: DefaultQueryHandler | undefined): void {
const activator = assertInWorkflowContext(
'Workflow.setDefaultQueryHandler(...) may only be used from a Workflow Execution.'
);
if (typeof handler === 'function' || handler === undefined) {
activator.defaultQueryHandler = handler;
} else {
throw new TypeError(`Expected handler to be either a function or 'undefined'. Got: '${typeof handler}'`);
}
}

/**
* Updates this Workflow's Search Attributes by merging the provided `searchAttributes` with the existing Search
* Attributes, `workflowInfo().searchAttributes`.
Expand Down

0 comments on commit 4155dad

Please sign in to comment.