Skip to content

Commit

Permalink
resolve merge conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
THardy98 committed Mar 10, 2025
2 parents 15b1503 + 4155dad commit 69507da
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 4 deletions.
56 changes: 55 additions & 1 deletion packages/test/src/test-integration-split-two.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {
import { msToNumber, tsToMs } from '@temporalio/common/lib/time';
import { decode as payloadDecode, decodeFromPayloadsAtIndex } from '@temporalio/common/lib/internal-non-workflow';

import { condition, defineQuery, setHandler, sleep } from '@temporalio/workflow';
import { condition, defineQuery, defineSignal, setDefaultQueryHandler, setHandler, sleep } from '@temporalio/workflow';
import { configurableHelpers, createTestWorkflowBundle } from './helpers-integration';
import * as activities from './activities';
import * as workflows from './workflows';
Expand Down Expand Up @@ -697,3 +697,57 @@ test('Query does not cause condition to be triggered', configMacro, async (t, co
// Worker did not crash
t.pass();
});

const completeSignal = defineSignal('complete');
const definedQuery = defineQuery<QueryNameAndArgs>('query-handler-type');

interface QueryNameAndArgs {
name: string;
queryName?: string;
args: any[];
}

export async function workflowWithMaybeDefinedQuery(useDefinedQuery: boolean): Promise<void> {
let complete = false;
setHandler(completeSignal, () => {
complete = true;
});
setDefaultQueryHandler((queryName: string, ...args: any[]) => {
return { name: 'default', queryName, args };
});
if (useDefinedQuery) {
setHandler(definedQuery, (...args: any[]) => {
return { name: definedQuery.name, args };
});
}

await condition(() => complete);
}

test('default query handler is used if requested query does not exist', configMacro, async (t, config) => {
const { env, createWorkerWithDefaults } = config;
const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env);
const worker = await createWorkerWithDefaults(t, { activities });
const handle = await startWorkflow(workflowWithMaybeDefinedQuery, {
args: [false],
});
await worker.runUntil(async () => {
const args = ['test', 'args'];
const result = await handle.query(definedQuery, ...args);
t.deepEqual(result, { name: 'default', queryName: definedQuery.name, args });
});
});

test('default query handler is not used if requested query exists', configMacro, async (t, config) => {
const { env, createWorkerWithDefaults } = config;
const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env);
const worker = await createWorkerWithDefaults(t, { activities });
const handle = await startWorkflow(workflowWithMaybeDefinedQuery, {
args: [true],
});
await worker.runUntil(async () => {
const args = ['test', 'args'];
const result = await handle.query('query-handler-type', ...args);
t.deepEqual(result, { name: definedQuery.name, args });
});
});
5 changes: 5 additions & 0 deletions packages/workflow/src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,11 @@ export type DefaultSignalHandler = (signalName: string, ...args: unknown[]) => v
*/
export type DefaultUpdateHandler = (updateName: string, ...args: unknown[]) => Promise<unknown> | unknown;

/**
* A handler function accepting query calls for non-registered query names.
*/
export type DefaultQueryHandler = (queryName: string, ...args: unknown[]) => unknown;

/**
* A validation function capable of accepting the arguments for a given UpdateDefinition.
*/
Expand Down
15 changes: 13 additions & 2 deletions packages/workflow/src/internals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import {
WorkflowCreateOptionsInternal,
ActivationCompletion,
DefaultUpdateHandler,
DefaultQueryHandler,
} from './interfaces';
import { type SinkCall } from './sinks';
import { untrackPromise } from './stack-helpers';
Expand Down Expand Up @@ -195,6 +196,11 @@ export class Activator implements ActivationHandler {
*/
defaultUpdateHandler?: DefaultUpdateHandler;

/**
* A query handler that catches calls for non-registered query names.
*/
defaultQueryHandler?: DefaultQueryHandler;

/**
* Source map file for looking up the source files in response to __enhanced_stack_trace
*/
Expand Down Expand Up @@ -617,7 +623,11 @@ export class Activator implements ActivationHandler {

// Intentionally non-async function so this handler doesn't show up in the stack trace
protected queryWorkflowNextHandler({ queryName, args }: QueryInput): Promise<unknown> {
const fn = this.queryHandlers.get(queryName)?.handler;
let fn = this.queryHandlers.get(queryName)?.handler;
if (fn === undefined && this.defaultQueryHandler !== undefined) {
fn = this.defaultQueryHandler.bind(this, queryName);
}
// No handler or default registered, fail.
if (fn === undefined) {
const knownQueryTypes = [...this.queryHandlers.keys()].join(' ');
// Fail the query
Expand All @@ -627,6 +637,7 @@ export class Activator implements ActivationHandler {
)
);
}
// Execute handler.
try {
const ret = fn(...args);
if (ret instanceof Promise) {
Expand Down Expand Up @@ -676,7 +687,7 @@ export class Activator implements ActivationHandler {
this.updateHandlers.get(name) ??
(this.defaultUpdateHandler
? {
handler: this.defaultUpdateHandler.bind(name),
handler: this.defaultUpdateHandler.bind(this, name),
validator: undefined,
// Default to a warning policy.
unfinishedPolicy: HandlerUnfinishedPolicy.WARN_AND_ABANDON,
Expand Down
23 changes: 22 additions & 1 deletion packages/workflow/src/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import {
encodeChildWorkflowCancellationType,
encodeParentClosePolicy,
DefaultUpdateHandler,
DefaultQueryHandler,
} from './interfaces';
import { LocalActivityDoBackoff } from './errors';
import { assertInWorkflowContext, getActivator, maybeGetActivator } from './global-attributes';
Expand Down Expand Up @@ -1301,7 +1302,7 @@ export function setHandler<
*
* Signals are dispatched to the default signal handler in the order that they were accepted by the server.
*
* If this function is called multiple times for a given signal or query name the last handler will overwrite any previous calls.
* If this function is called multiple times for a given signal name the last handler will overwrite any previous calls.
*
* @param handler a function that will handle signals for non-registered signal names, or `undefined` to unset the handler.
*/
Expand Down Expand Up @@ -1342,6 +1343,26 @@ export function setDefaultUpdateHandler(handler: DefaultUpdateHandler | undefine
}
}

/**
* Set a query handler function that will handle query calls for non-registered query names.
*
* Queries are dispatched to the default query handler in the order that they were accepted by the server.
*
* If this function is called multiple times for a given query name the last handler will overwrite any previous calls.
*
* @param handler a function that will handle queries for non-registered query names, or `undefined` to unset the handler.
*/
export function setDefaultQueryHandler(handler: DefaultQueryHandler | undefined): void {
const activator = assertInWorkflowContext(
'Workflow.setDefaultQueryHandler(...) may only be used from a Workflow Execution.'
);
if (typeof handler === 'function' || handler === undefined) {
activator.defaultQueryHandler = handler;
} else {
throw new TypeError(`Expected handler to be either a function or 'undefined'. Got: '${typeof handler}'`);
}
}

/**
* Updates this Workflow's Search Attributes by merging the provided `searchAttributes` with the existing Search
* Attributes, `workflowInfo().searchAttributes`.
Expand Down

0 comments on commit 69507da

Please sign in to comment.