diff --git a/package-lock.json b/package-lock.json index 32620b216..e0b296223 100644 --- a/package-lock.json +++ b/package-lock.json @@ -19625,13 +19625,24 @@ "@grpc/grpc-js": "^1.6.7", "@temporalio/common": "file:../common", "@temporalio/proto": "file:../proto", - "ms": "^2.1.3", + "long": "^5.2.0", "uuid": "^8.3.2" }, "devDependencies": { + "@types/long": "^5.0.0", "protobufjs": "^7.0.0" } }, + "packages/client/node_modules/@types/long": { + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/@types/long/-/long-5.0.0.tgz", + "integrity": "sha512-eQs9RsucA/LNjnMoJvWG/nXa7Pot/RbBzilF/QRIU/xRl+0ApxrSUFsV5lmf01SvSlqMzJ7Zwxe440wmz2SJGA==", + "deprecated": "This is a stub types definition. long provides its own type definitions, so you do not need this installed.", + "dev": true, + "dependencies": { + "long": "*" + } + }, "packages/common": { "name": "@temporalio/common", "version": "1.4.3", @@ -23828,9 +23839,21 @@ "@grpc/grpc-js": "^1.6.7", "@temporalio/common": "file:../common", "@temporalio/proto": "file:../proto", - "ms": "^2.1.3", + "@types/long": "^5.0.0", + "long": "^5.2.0", "protobufjs": "^7.0.0", "uuid": "^8.3.2" + }, + "dependencies": { + "@types/long": { + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/@types/long/-/long-5.0.0.tgz", + "integrity": "sha512-eQs9RsucA/LNjnMoJvWG/nXa7Pot/RbBzilF/QRIU/xRl+0ApxrSUFsV5lmf01SvSlqMzJ7Zwxe440wmz2SJGA==", + "dev": true, + "requires": { + "long": "*" + } + } } }, "@temporalio/common": { diff --git a/packages/client/package.json b/packages/client/package.json index 1e8b17a1e..c9e649c79 100644 --- a/packages/client/package.json +++ b/packages/client/package.json @@ -16,10 +16,11 @@ "@grpc/grpc-js": "^1.6.7", "@temporalio/common": "file:../common", "@temporalio/proto": "file:../proto", - "ms": "^2.1.3", + "long": "^5.2.0", "uuid": "^8.3.2" }, "devDependencies": { + "@types/long": "^5.0.0", "protobufjs": "^7.0.0" }, "bugs": { diff --git a/packages/client/src/client.ts b/packages/client/src/client.ts index fa1b0795f..64ebc88c5 100644 --- a/packages/client/src/client.ts +++ b/packages/client/src/client.ts @@ -6,6 +6,7 @@ import os from 'os'; import { AsyncCompletionClient } from './async-completion-client'; import { Connection } from './connection'; import { ClientInterceptors } from './interceptors'; +import { ScheduleClient } from './schedule-client'; import { ConnectionLike, Metadata, WorkflowService } from './types'; import { WorkflowClient } from './workflow-client'; @@ -95,6 +96,12 @@ export class Client { * (Async) Activity completion sub-client - use to manually manage Activities */ public readonly activity: AsyncCompletionClient; + /** + * Schedule sub-client - use to start and interact with Schedules + * + * @experimental + */ + public readonly schedule: ScheduleClient; constructor(options?: ClientOptions) { this.connection = options?.connection ?? Connection.lazy(); @@ -119,6 +126,13 @@ export class Client { connection: this.connection, dataConverter: loadedDataConverter, }); + + this.schedule = new ScheduleClient({ + ...base, + connection: this.connection, + dataConverter: loadedDataConverter, + interceptors: interceptors.schedule, + }); } /** diff --git a/packages/client/src/index.ts b/packages/client/src/index.ts index 7dcc6c8a3..4c7cb03be 100644 --- a/packages/client/src/index.ts +++ b/packages/client/src/index.ts @@ -36,3 +36,5 @@ export * from './interceptors'; export * from './types'; export * from './workflow-client'; export * from './workflow-options'; +export * from './schedule-types'; +export * from './schedule-client'; diff --git a/packages/client/src/interceptors.ts b/packages/client/src/interceptors.ts index cdf289570..2898785bf 100644 --- a/packages/client/src/interceptors.ts +++ b/packages/client/src/interceptors.ts @@ -6,6 +6,7 @@ import { Headers, Next } from '@temporalio/common'; import { temporal } from '@temporalio/proto'; +import { CompiledScheduleOptions } from './schedule-types'; import { DescribeWorkflowExecutionResponse, RequestCancelWorkflowExecutionResponse, @@ -131,11 +132,42 @@ export interface WorkflowClientInterceptors { calls?: WorkflowClientCallsInterceptorFactory[]; } +/** + * Implement any of these methods to intercept ScheduleClient outbound calls + * + * @experimental + */ +export interface ScheduleClientInterceptor { + /** + * Intercept a service call to CreateSchedule + */ + create?: (input: CreateScheduleInput, next: Next) => Promise; +} + +/** + * Input for {@link ScheduleClientInterceptor.create} + * + * @experimental + */ +export interface CreateScheduleInput { + readonly headers: Headers; + readonly options: CompiledScheduleOptions; +} + +export type CreateScheduleOutput = { + readonly conflictToken: Uint8Array; +}; + /** * Interceptors for any high-level SDK client. * - * NOTE: Currently only for {@link WorkflowClient}. More will be added later as needed. + * NOTE: Currently only for {@link WorkflowClient} and {@link ScheduleClient}. More will be added later as needed. */ export interface ClientInterceptors { workflow?: WorkflowClientInterceptors; + + /** + * @experimental + */ + schedule?: ScheduleClientInterceptor[]; } diff --git a/packages/client/src/schedule-client.ts b/packages/client/src/schedule-client.ts new file mode 100644 index 000000000..06d8a4ba9 --- /dev/null +++ b/packages/client/src/schedule-client.ts @@ -0,0 +1,605 @@ +import { status as grpcStatus } from '@grpc/grpc-js'; +import { DataConverter, LoadedDataConverter, mapToPayloads, searchAttributePayloadConverter } from '@temporalio/common'; +import { composeInterceptors, Headers } from '@temporalio/common/lib/interceptors'; +import { + encodeMapToPayloads, + loadDataConverter, + isLoadedDataConverter, + filterNullAndUndefined, + decodeMapFromPayloads, +} from '@temporalio/common/lib/internal-non-workflow'; +import os from 'os'; +import { Connection } from './connection'; +import { CreateScheduleInput, CreateScheduleOutput, ScheduleClientInterceptor } from './interceptors'; +import { ConnectionLike, Metadata, WorkflowService } from './types'; +import { v4 as uuid4 } from 'uuid'; +import { isServerErrorResponse, ServiceError } from './errors'; +import { + Backfill, + CompiledScheduleUpdateOptions, + ScheduleSummary, + ScheduleDescription, + ScheduleOptions, + ScheduleOverlapPolicy, + ScheduleUpdateOptions, +} from './schedule-types'; +import { Replace } from '@temporalio/common/lib/type-helpers'; +import { temporal } from '@temporalio/proto'; +import { optionalDateToTs, optionalTsToDate, optionalTsToMs, tsToDate } from '@temporalio/common/lib/time'; +import { + compileScheduleOptions, + compileUpdatedScheduleOptions, + decodeOverlapPolicy, + decodeScheduleAction, + decodeScheduleRecentActions, + decodeScheduleRunningActions, + decodeScheduleSpec, + decodeSearchAttributes, + encodeOverlapPolicy, + encodeScheduleAction, + encodeSchedulePolicies, + encodeScheduleSpec, + encodeScheduleState, +} from './schedule-helpers'; + +/** + * Handle to a single Schedule + * + * @experimental + */ +export interface ScheduleHandle { + /** + * This Schedule's identifier + */ + readonly scheduleId: string; + + /** + * Fetch the Schedule's description from the Server + */ + describe(): Promise; + + /** + * Update the Schedule + * + * This function calls `.describe()`, provides the `Schedule` to the provided `updateFn`, and + * sends the returned `UpdatedSchedule` to the Server to update the Schedule definition. Note that, + * in the future, `updateFn` might be invoked multiple time, with identical or different input. + */ + update(updateFn: (previous: ScheduleDescription) => ScheduleUpdateOptions): Promise; + + /** + * Delete the Schedule + */ + delete(): Promise; + + /** + * Trigger an Action to be taken immediately + * + * @param overlap Override the Overlap Policy for this one trigger. Defaults to {@link ScheduleOverlapPolicy.ALLOW_ALL}. + */ + trigger(overlap?: ScheduleOverlapPolicy): Promise; + + /** + * Run though the specified time period(s) and take Actions as if that time passed by right now, all at once. + * The Overlap Policy can be overridden for the scope of the Backfill. + */ + backfill(options: Backfill | Backfill[]): Promise; + + /** + * Pause the Schedule + * + * @param note A new {@link ScheduleDescription.note}. Defaults to `"Paused via TypeScript SDK"` + */ + pause(note?: string): Promise; + + /** + * Unpause the Schedule + * + * @param note A new {@link ScheduleDescription.note}. Defaults to `"Unpaused via TypeScript SDK" + */ + unpause(note?: string): Promise; + + /** + * Readonly accessor to the underlying ScheduleClient + */ + readonly client: ScheduleClient; +} + +/** + * @experimental + */ +export interface ScheduleClientOptions { + /** + * {@link DataConverter} to use for serializing and deserializing payloads + */ + dataConverter?: DataConverter | LoadedDataConverter; + + /** + * Used to override and extend default Connection functionality + * + * Useful for injecting auth headers and tracing Workflow executions + */ + interceptors?: ScheduleClientInterceptor[]; + + /** + * Identity to report to the server + * + * @default `${process.pid}@${os.hostname()}` + */ + identity?: string; + + /** + * Connection to use to communicate with the server. + * + * By default `ScheduleClient` connects to localhost. + * + * Connections are expensive to construct and should be reused. + */ + connection?: ConnectionLike; + + /** + * Server namespace + * + * @default default + */ + namespace?: string; +} + +/** @experimental */ +export type ScheduleClientOptionsWithDefaults = Replace< + Required, + { + connection?: ConnectionLike; + } +>; + +/** @experimental */ +export type LoadedScheduleClientOptions = ScheduleClientOptionsWithDefaults & { + loadedDataConverter: LoadedDataConverter; +}; + +function defaultScheduleClientOptions(): ScheduleClientOptionsWithDefaults { + return { + dataConverter: {}, + // The equivalent in Java is ManagementFactory.getRuntimeMXBean().getName() + identity: `${process.pid}@${os.hostname()}`, + interceptors: [], + namespace: 'default', + }; +} + +function assertRequiredScheduleOptions(opts: ScheduleOptions, action: 'CREATE'): void; +function assertRequiredScheduleOptions(opts: ScheduleUpdateOptions, action: 'UPDATE'): void; +function assertRequiredScheduleOptions( + opts: ScheduleOptions | ScheduleUpdateOptions, + action: 'CREATE' | 'UPDATE' +): void { + const structureName = action === 'CREATE' ? 'ScheduleOptions' : 'ScheduleUpdateOptions'; + if (action === 'CREATE' && !(opts as ScheduleOptions).scheduleId) { + throw new TypeError(`Missing ${structureName}.scheduleId`); + } + if (!(opts.spec.calendars?.length || opts.spec.intervals?.length || opts.spec.cronExpressions?.length)) { + throw new TypeError(`At least one ${structureName}.spec.calendars, .intervals or .cronExpressions is required`); + } + switch (opts.action.type) { + case 'startWorkflow': + if (!opts.action.taskQueue) { + throw new TypeError(`Missing ${structureName}.action.taskQueue for 'startWorkflow' action`); + } + if (!opts.action.workflowId && action === 'UPDATE') { + throw new TypeError(`Missing ${structureName}.action.workflowId for 'startWorkflow' action`); + } + if (!opts.action.workflowType) { + throw new TypeError(`Missing ${structureName}.action.workflowType for 'startWorkflow' action`); + } + } +} + +/** @experimental */ +export interface ListScheduleOptions { + /** + * How many results to fetch from the Server at a time. + * @default 1000 + */ + pageSize?: number; +} + +/** + * Client for starting Workflow executions and creating Workflow handles + * + * @experimental + */ +export class ScheduleClient { + public readonly options: LoadedScheduleClientOptions; + public readonly connection: ConnectionLike; + + constructor(options?: ScheduleClientOptions) { + this.connection = options?.connection ?? Connection.lazy(); + const dataConverter = options?.dataConverter; + const loadedDataConverter = isLoadedDataConverter(dataConverter) ? dataConverter : loadDataConverter(dataConverter); + this.options = { + ...defaultScheduleClientOptions(), + ...filterNullAndUndefined(options ?? {}), + loadedDataConverter, + }; + } + + /** + * Raw gRPC access to the Temporal service. Schedule-related methods are included in {@link WorkflowService}. + * + * **NOTE**: The namespace provided in {@link options} is **not** automatically set on requests made to the service. + */ + get workflowService(): WorkflowService { + return this.connection.workflowService; + } + + protected get dataConverter(): LoadedDataConverter { + return this.options.loadedDataConverter; + } + + /** + * Set the deadline for any service requests executed in `fn`'s scope. + */ + async withDeadline(deadline: number | Date, fn: () => Promise): Promise { + return await this.connection.withDeadline(deadline, fn); + } + + /** + * Set metadata for any service requests executed in `fn`'s scope. + * + * @returns returned value of `fn` + * + * @see {@link Connection.withMetadata} + */ + async withMetadata(metadata: Metadata, fn: () => Promise): Promise { + return await this.connection.withMetadata(metadata, fn); + } + + /** + * Create a new Schedule. + * + * @throws {@link ScheduleAlreadyRunning} if there's a running (not deleted) Schedule with the given `id` + * @returns a ScheduleHandle to the created Schedule + */ + public async create(options: ScheduleOptions): Promise { + await this._createSchedule(options); + return this.getHandle(options.scheduleId); + } + + /** + * Create a new Schedule. + */ + protected async _createSchedule(options: ScheduleOptions): Promise { + assertRequiredScheduleOptions(options, 'CREATE'); + const compiledOptions = compileScheduleOptions(options); + + const create = composeInterceptors(this.options.interceptors, 'create', this._createScheduleHandler.bind(this)); + await create({ + options: compiledOptions, + headers: {}, + }); + } + + /** + * Create a new Schedule. + */ + protected async _createScheduleHandler(input: CreateScheduleInput): Promise { + const { options: opts, headers } = input; + const { identity } = this.options; + const req: temporal.api.workflowservice.v1.ICreateScheduleRequest = { + namespace: this.options.namespace, + identity, + requestId: uuid4(), + scheduleId: opts.scheduleId, + schedule: { + spec: encodeScheduleSpec(opts.spec), + action: await encodeScheduleAction(this.dataConverter, opts.action, headers), + policies: encodeSchedulePolicies(opts.policies), + state: encodeScheduleState(opts.state), + }, + memo: opts.memo ? { fields: await encodeMapToPayloads(this.dataConverter, opts.memo) } : undefined, + searchAttributes: opts.searchAttributes + ? { + indexedFields: mapToPayloads(searchAttributePayloadConverter, opts.searchAttributes), + } + : undefined, + initialPatch: { + triggerImmediately: opts.state?.triggerImmediately + ? { overlapPolicy: temporal.api.enums.v1.ScheduleOverlapPolicy.SCHEDULE_OVERLAP_POLICY_ALLOW_ALL } + : undefined, + backfillRequest: opts.state?.backfill + ? opts.state.backfill.map((x) => ({ + startTime: optionalDateToTs(x.start), + endTime: optionalDateToTs(x.end), + overlapPolicy: x.overlap ? encodeOverlapPolicy(x.overlap) : undefined, + })) + : undefined, + }, + }; + try { + const res = await this.workflowService.createSchedule(req); + return { conflictToken: res.conflictToken }; + } catch (err: any) { + if (err.code === grpcStatus.ALREADY_EXISTS) { + throw new ScheduleAlreadyRunning('Schedule already exists and is running', opts.scheduleId); + } + this.rethrowGrpcError(err, opts.scheduleId, 'Failed to create schedule'); + } + } + + /** + * Describe a Schedule. + */ + protected async _describeSchedule( + scheduleId: string + ): Promise { + try { + return await this.workflowService.describeSchedule({ + namespace: this.options.namespace, + scheduleId, + }); + } catch (err: any) { + this.rethrowGrpcError(err, scheduleId, 'Failed to describe schedule'); + } + } + + /** + * Update a Schedule. + */ + protected async _updateSchedule( + scheduleId: string, + opts: CompiledScheduleUpdateOptions, + header: Headers + ): Promise { + try { + return await this.workflowService.updateSchedule({ + namespace: this.options.namespace, + scheduleId, + schedule: { + spec: encodeScheduleSpec(opts.spec), + action: await encodeScheduleAction(this.dataConverter, opts.action, header), + policies: encodeSchedulePolicies(opts.policies), + state: encodeScheduleState(opts.state), + }, + identity: this.options.identity, + requestId: uuid4(), + }); + } catch (err: any) { + this.rethrowGrpcError(err, scheduleId, 'Failed to update schedule'); + } + } + + /** + * Patch a Schedule. + */ + protected async _patchSchedule( + scheduleId: string, + patch: temporal.api.schedule.v1.ISchedulePatch + ): Promise { + try { + return await this.workflowService.patchSchedule({ + namespace: this.options.namespace, + scheduleId, + identity: this.options.identity, + requestId: uuid4(), + patch, + }); + } catch (err: any) { + this.rethrowGrpcError(err, scheduleId, 'Failed to patch schedule'); + } + } + + /** + * Delete a Schedule. + */ + protected async _deleteSchedule( + scheduleId: string + ): Promise { + try { + return await this.workflowService.deleteSchedule({ + namespace: this.options.namespace, + identity: this.options.identity, + scheduleId, + }); + } catch (err: any) { + this.rethrowGrpcError(err, scheduleId, 'Failed to delete schedule'); + } + } + + /** + * List Schedules with an `AsyncIterator`: + * + * ```ts + * for await (const schedule: Schedule of client.list()) { + * const { id, memo, searchAttributes } = schedule + * // ... + * } + * ``` + * + * To list one page at a time, instead use the raw gRPC method {@link WorkflowService.listSchedules}: + * + * ```ts + * await { schedules, nextPageToken } = client.scheduleService.listSchedules() + * ``` + */ + public async *list(options?: ListScheduleOptions): AsyncIterable { + let nextPageToken: Uint8Array | undefined = undefined; + for (;;) { + const response: temporal.api.workflowservice.v1.IListSchedulesResponse = await this.workflowService.listSchedules( + { + nextPageToken, + namespace: this.options.namespace, + maximumPageSize: options?.pageSize, + } + ); + + for (const raw of response.schedules ?? []) { + if (!raw.info?.spec) continue; + + yield { + scheduleId: raw.scheduleId, + + spec: decodeScheduleSpec(raw.info.spec), + action: { + type: 'startWorkflow', + workflowType: raw.info.workflowType?.name, + }, + memo: await decodeMapFromPayloads(this.dataConverter, raw.memo?.fields), + searchAttributes: decodeSearchAttributes(raw.searchAttributes), + state: { + paused: raw.info.paused === true, + note: raw.info.notes ?? undefined, + }, + info: { + recentActions: decodeScheduleRecentActions(raw.info.recentActions), + nextActionTimes: raw.info?.futureActionTimes?.map(tsToDate) ?? [], + }, + }; + } + + if (response.nextPageToken == null || response.nextPageToken.length === 0) break; + nextPageToken = response.nextPageToken; + } + } + + /** + * Get a handle to a Schedule + * + * This method does not validate `scheduleId`. If there is no Schedule with the given `scheduleId`, handle + * methods like `handle.describe()` will throw a {@link ScheduleNotFoundError} error. + */ + public getHandle(scheduleId: string): ScheduleHandle { + return { + client: this, + scheduleId, + + async describe(): Promise { + const raw = await this.client._describeSchedule(this.scheduleId); + if (!raw.schedule?.spec || !raw.schedule.action) + throw new Error('Received invalid Schedule description from server'); + return { + scheduleId, + spec: decodeScheduleSpec(raw.schedule.spec), + action: await decodeScheduleAction(this.client.dataConverter, raw.schedule.action), + memo: await decodeMapFromPayloads(this.client.dataConverter, raw.memo?.fields), + searchAttributes: decodeSearchAttributes(raw.searchAttributes), + policies: { + overlap: decodeOverlapPolicy(raw.schedule.policies?.overlapPolicy), + catchupWindow: optionalTsToMs(raw.schedule.policies?.catchupWindow) ?? 60_000, + pauseOnFailure: raw.schedule.policies?.pauseOnFailure === true, + }, + state: { + paused: raw.schedule.state?.paused === true, + note: raw.schedule.state?.notes ?? undefined, + remainingActions: raw.schedule.state?.limitedActions + ? raw.schedule.state?.remainingActions?.toNumber() || 0 + : undefined, + }, + info: { + recentActions: decodeScheduleRecentActions(raw.info?.recentActions), + nextActionTimes: raw.info?.futureActionTimes?.map(tsToDate) ?? [], + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + createdAt: tsToDate(raw.info!.createTime!), + lastUpdatedAt: optionalTsToDate(raw.info?.updateTime), + runningActions: decodeScheduleRunningActions(raw.info?.runningWorkflows), + numActionsMissedCatchupWindow: raw.info?.missedCatchupWindow?.toNumber() ?? 0, + numActionsSkippedOverlap: raw.info?.overlapSkipped?.toNumber() ?? 0, + numActionsTaken: raw.info?.actionCount?.toNumber() ?? 0, + }, + raw, + }; + }, + + async update(updateFn): Promise { + const current = await this.describe(); + // Keep existing headers + const currentHeader: Headers = current.raw.schedule?.action?.startWorkflow?.header?.fields ?? {}; + const updated = updateFn(current); + assertRequiredScheduleOptions(updated, 'UPDATE'); + await this.client._updateSchedule(scheduleId, compileUpdatedScheduleOptions(updated), currentHeader); + }, + + async delete(): Promise { + await this.client._deleteSchedule(this.scheduleId); + }, + + async pause(note?: string): Promise { + await this.client._patchSchedule(this.scheduleId, { + pause: note ?? 'Paused via TypeScript SDK"', + }); + }, + + async unpause(note?): Promise { + await this.client._patchSchedule(this.scheduleId, { + unpause: note ?? 'Unpaused via TypeScript SDK"', + }); + }, + + async trigger(overlap?): Promise { + await this.client._patchSchedule(this.scheduleId, { + triggerImmediately: { + overlapPolicy: overlap + ? encodeOverlapPolicy(overlap) + : temporal.api.enums.v1.ScheduleOverlapPolicy.SCHEDULE_OVERLAP_POLICY_ALLOW_ALL, + }, + }); + }, + + async backfill(options): Promise { + const backfills = Array.isArray(options) ? options : [options]; + await this.client._patchSchedule(this.scheduleId, { + backfillRequest: backfills.map((x) => ({ + startTime: optionalDateToTs(x.start), + endTime: optionalDateToTs(x.end), + overlapPolicy: x.overlap ? encodeOverlapPolicy(x.overlap) : undefined, + })), + }); + }, + }; + } + + protected rethrowGrpcError(err: unknown, scheduleId: string, fallbackMessage: string): never { + if (isServerErrorResponse(err)) { + if (err.code === grpcStatus.NOT_FOUND) { + throw new ScheduleNotFoundError(err.details ?? 'Schedule not found', scheduleId); + } + if ( + err.code === grpcStatus.INVALID_ARGUMENT && + err.message.match(/^3 INVALID_ARGUMENT: Invalid schedule spec: /) + ) { + throw new TypeError(err.message.replace(/^3 INVALID_ARGUMENT: Invalid schedule spec: /, '')); + } + throw new ServiceError(fallbackMessage, { cause: err }); + } + throw new ServiceError('Unexpected error while making gRPC request'); + } +} + +/** + * Thrown from {@link ScheduleClient.create} if there's a running (not deleted) Schedule with the given `id`. + * + * @experimental + */ +export class ScheduleAlreadyRunning extends Error { + public readonly name: string = 'ScheduleAlreadyRunning'; + + constructor(message: string, public readonly scheduleId: string) { + super(message); + } +} + +/** + * Thrown when a Schedule with the given Id is not known to Temporal Server. + * It could be because: + * - Id passed is incorrect + * - Schedule was deleted + * + * @experimental + */ +export class ScheduleNotFoundError extends Error { + public readonly name: string = 'ScheduleNotFoundError'; + + constructor(message: string, public readonly scheduleId: string) { + super(message); + } +} diff --git a/packages/client/src/schedule-helpers.ts b/packages/client/src/schedule-helpers.ts new file mode 100644 index 000000000..ec9527ac7 --- /dev/null +++ b/packages/client/src/schedule-helpers.ts @@ -0,0 +1,414 @@ +import { + compileRetryPolicy, + decompileRetryPolicy, + LoadedDataConverter, + mapFromPayloads, + mapToPayloads, + searchAttributePayloadConverter, + SearchAttributes, +} from '@temporalio/common'; +import { Headers } from '@temporalio/common/lib/interceptors'; +import { + decodeArrayFromPayloads, + decodeMapFromPayloads, + encodeMapToPayloads, + encodeToPayloads, +} from '@temporalio/common/lib/internal-non-workflow'; +import { + CalendarSpec, + CalendarSpecDescription, + CompiledScheduleOptions, + CompiledScheduleUpdateOptions, + Range, + ScheduleOptions, + ScheduleOverlapPolicy, + ScheduleUpdateOptions, + DayOfWeek, + DAYS_OF_WEEK, + Month, + MONTHS, + LooseRange, + ScheduleSpec, + CompiledScheduleAction, + ScheduleSpecDescription, + IntervalSpecDescription, + ScheduleDescriptionAction, + ScheduleExecutionActionResult, + ScheduleExecutionResult, + ScheduleExecutionStartWorkflowActionResult, +} from './schedule-types'; +import { temporal } from '@temporalio/proto'; +import { + msOptionalToTs, + msToTs, + optionalDateToTs, + optionalTsToDate, + optionalTsToMs, + tsToDate, +} from '@temporalio/common/lib/time'; +import { RequireAtLeastOne } from '@temporalio/common/src/type-helpers'; +// eslint-disable-next-line import/no-named-as-default +import Long from 'long'; + +const [encodeSecond, decodeSecond] = makeCalendarSpecFieldCoders( + 'second', + (x: number) => (typeof x === 'number' && x >= 0 && x <= 59 ? x : undefined), + (x: number) => x, + [{ start: 0, end: 0, step: 0 }], // default to 0 + [{ start: 0, end: 59, step: 1 }] +); + +const [encodeMinute, decodeMinue] = makeCalendarSpecFieldCoders( + 'minute', + (x: number) => (typeof x === 'number' && x >= 0 && x <= 59 ? x : undefined), + (x: number) => x, + [{ start: 0, end: 0, step: 0 }], // default to 0 + [{ start: 0, end: 59, step: 1 }] +); + +const [encodeHour, decodeHour] = makeCalendarSpecFieldCoders( + 'hour', + (x: number) => (typeof x === 'number' && x >= 0 && x <= 59 ? x : undefined), + (x: number) => x, + [{ start: 0, end: 0, step: 0 }], // default to 0 + [{ start: 0, end: 23, step: 1 }] +); + +const [encodeDayOfMonth, decodeDayOfMonth] = makeCalendarSpecFieldCoders( + 'dayOfMonth', + (x: number) => (typeof x === 'number' && x >= 0 && x <= 6 ? x : undefined), + (x: number) => x, + [{ start: 1, end: 31, step: 1 }], // default to * + [{ start: 1, end: 31, step: 1 }] +); + +const [encodeMonth, decodeMonth] = makeCalendarSpecFieldCoders( + 'month', + function monthNameToNumber(month: Month): number | undefined { + const index = MONTHS.indexOf(month); + return index >= 0 ? index + 1 : undefined; + }, + (month: number) => MONTHS[month - 1], + [{ start: 1, end: 12, step: 1 }], // default to * + [{ start: 1, end: 12, step: 1 }] +); + +const [encodeYear, decodeYear] = makeCalendarSpecFieldCoders( + 'year', + (x: number) => (typeof x === 'number' ? x : undefined), + (x: number) => x, + [], // default to * + [] // special case: * for years is encoded as no range at all +); + +const [encodeDayOfWeek, decodeDayOfWeek] = makeCalendarSpecFieldCoders( + 'dayOfWeek', + function dayOfWeekNameToNumber(day: DayOfWeek): number | undefined { + const index = DAYS_OF_WEEK.indexOf(day); + return index >= 0 ? index : undefined; + }, + (day: number) => DAYS_OF_WEEK[day], + [{ start: 0, end: 6, step: 1 }], // default to * + [{ start: 0, end: 6, step: 1 }] +); + +function makeCalendarSpecFieldCoders( + fieldName: string, + encodeValueFn: (x: Unit) => number | undefined, + decodeValueFn: (x: number) => Unit, + defaultValue: temporal.api.schedule.v1.IRange[], + matchAllValue: temporal.api.schedule.v1.IRange[] +) { + function encoder( + input: LooseRange | LooseRange[] | '*' | undefined + ): temporal.api.schedule.v1.IRange[] | undefined { + if (input === undefined) return defaultValue; + if (input === '*') return matchAllValue; + + return (Array.isArray(input) ? input : [input]).map((item) => { + if (typeof item === 'object' && (item as Range).start !== undefined) { + const range = item as Range; + const start = encodeValueFn(range.start); + if (start !== undefined) { + return { + start, + end: range.end !== undefined ? encodeValueFn(range.end) ?? start : 1, + step: typeof range.step === 'number' && range.step > 0 ? range.step : 1, + }; + } + } + if (item !== undefined) { + const value = encodeValueFn(item as Unit); + if (value !== undefined) return { start: value, end: value, step: 1 }; + } + throw new Error(`Invalid CalendarSpec component for field ${fieldName}: '${item}' of type '${typeof item}'`); + }); + } + + function decoder(input: temporal.api.schedule.v1.IRange[] | undefined | null): Range[] { + if (!input) return []; + return (input as temporal.api.schedule.v1.Range[]).map((pb): Range => { + const start = decodeValueFn(pb.start); + const end = pb.end > pb.start ? decodeValueFn(pb.end) ?? start : start; + const step = pb.step > 0 ? pb.step : 1; + return { start, end, step }; + }); + } + + return [encoder, decoder] as const; +} + +export function encodeOptionalStructuredCalendarSpecs( + input: CalendarSpec[] | null | undefined +): temporal.api.schedule.v1.IStructuredCalendarSpec[] | undefined { + if (!input) return undefined; + return input.map((spec) => ({ + second: encodeSecond(spec.second), + minute: encodeMinute(spec.minute), + hour: encodeHour(spec.hour), + dayOfMonth: encodeDayOfMonth(spec.dayOfMonth), + month: encodeMonth(spec.month), + year: encodeYear(spec.year), + dayOfWeek: encodeDayOfWeek(spec.dayOfWeek), + comment: spec.comment, + })); +} + +export function decodeOptionalStructuredCalendarSpecs( + input: temporal.api.schedule.v1.IStructuredCalendarSpec[] | null | undefined +): CalendarSpecDescription[] { + if (!input) return []; + + return (input as temporal.api.schedule.v1.StructuredCalendarSpec[]).map( + (pb): CalendarSpecDescription => ({ + second: decodeSecond(pb.second), + minute: decodeMinue(pb.minute), + hour: decodeHour(pb.hour), + dayOfMonth: decodeDayOfMonth(pb.dayOfMonth), + month: decodeMonth(pb.month), + year: decodeYear(pb.year), + dayOfWeek: decodeDayOfWeek(pb.dayOfWeek), + comment: pb.comment, + }) + ); +} + +export function encodeOverlapPolicy(input: ScheduleOverlapPolicy): temporal.api.enums.v1.ScheduleOverlapPolicy { + return temporal.api.enums.v1.ScheduleOverlapPolicy[ + `SCHEDULE_OVERLAP_POLICY_${ScheduleOverlapPolicy[input] as keyof typeof ScheduleOverlapPolicy}` + ]; +} + +export function decodeOverlapPolicy(input?: temporal.api.enums.v1.ScheduleOverlapPolicy | null): ScheduleOverlapPolicy { + if (!input) return ScheduleOverlapPolicy.UNSPECIFIED; + const encodedPolicyName = temporal.api.enums.v1.ScheduleOverlapPolicy[input]; + const decodedPolicyName = encodedPolicyName.substring( + 'SCHEDULE_OVERLAP_POLICY_'.length + ) as keyof typeof ScheduleOverlapPolicy; + return ScheduleOverlapPolicy[decodedPolicyName]; +} + +export function compileScheduleOptions(options: ScheduleOptions): CompiledScheduleOptions { + const workflowTypeOrFunc = options.action.workflowType; + const workflowType = typeof workflowTypeOrFunc === 'string' ? workflowTypeOrFunc : workflowTypeOrFunc.name; + return { + ...options, + action: { + ...options.action, + workflowId: options.action.workflowId ?? `${options.scheduleId}-workflow`, + workflowType, + args: options.action.args ?? [], + }, + }; +} + +export function compileUpdatedScheduleOptions(options: ScheduleUpdateOptions): CompiledScheduleUpdateOptions { + const workflowTypeOrFunc = options.action.workflowType; + const workflowType = typeof workflowTypeOrFunc === 'string' ? workflowTypeOrFunc : workflowTypeOrFunc.name; + return { + ...options, + action: { + ...options.action, + workflowType, + args: options.action.args ?? [], + }, + }; +} + +export function encodeScheduleSpec(spec: ScheduleSpec): temporal.api.schedule.v1.IScheduleSpec { + return { + structuredCalendar: encodeOptionalStructuredCalendarSpecs(spec.calendars), + interval: spec.intervals?.map((interval) => ({ + interval: msToTs(interval.every), + phase: msOptionalToTs(interval.offset), + })), + cronString: spec.cronExpressions, + excludeStructuredCalendar: encodeOptionalStructuredCalendarSpecs(spec.skip), + startTime: optionalDateToTs(spec.startAt), + endTime: optionalDateToTs(spec.endAt), + jitter: msOptionalToTs(spec.jitter), + timezoneName: spec.timezone, + }; +} + +export async function encodeScheduleAction( + dataConverter: LoadedDataConverter, + action: CompiledScheduleAction, + headers: Headers +): Promise { + return { + startWorkflow: { + workflowId: action.workflowId, + workflowType: { + name: action.workflowType, + }, + input: { payloads: await encodeToPayloads(dataConverter, ...action.args) }, + taskQueue: { + kind: temporal.api.enums.v1.TaskQueueKind.TASK_QUEUE_KIND_UNSPECIFIED, + name: action.taskQueue, + }, + workflowExecutionTimeout: msOptionalToTs(action.workflowExecutionTimeout), + workflowRunTimeout: msOptionalToTs(action.workflowRunTimeout), + workflowTaskTimeout: msOptionalToTs(action.workflowTaskTimeout), + retryPolicy: action.retry ? compileRetryPolicy(action.retry) : undefined, + memo: action.memo ? { fields: await encodeMapToPayloads(dataConverter, action.memo) } : undefined, + searchAttributes: action.searchAttributes + ? { + indexedFields: mapToPayloads(searchAttributePayloadConverter, action.searchAttributes), + } + : undefined, + header: { fields: headers }, + }, + }; +} + +export function encodeSchedulePolicies( + policies?: ScheduleOptions['policies'] +): temporal.api.schedule.v1.ISchedulePolicies { + return { + catchupWindow: msOptionalToTs(policies?.catchupWindow), + overlapPolicy: policies?.overlap ? encodeOverlapPolicy(policies.overlap) : undefined, + pauseOnFailure: policies?.pauseOnFailure, + }; +} + +export function encodeScheduleState(state?: ScheduleOptions['state']): temporal.api.schedule.v1.IScheduleState { + return { + paused: state?.paused, + notes: state?.note, + limitedActions: state?.remainingActions !== undefined, + remainingActions: state?.remainingActions ? Long.fromNumber(state?.remainingActions) : undefined, + }; +} + +export function decodeScheduleSpec( + pb: temporal.api.schedule.v1.IScheduleSpec +): RequireAtLeastOne { + // Note: the server will have compiled calendar and cron_string fields into + // structured_calendar (and maybe interval and timezone_name), so at this + // point, we'll see only structured_calendar, interval, etc. + return { + calendars: decodeOptionalStructuredCalendarSpecs(pb.structuredCalendar), + intervals: (pb.interval ?? []).map( + (x) => + { + every: optionalTsToMs(x.interval), + offset: optionalTsToMs(x.phase), + } + ), + skip: decodeOptionalStructuredCalendarSpecs(pb.excludeStructuredCalendar), + startAt: optionalTsToDate(pb.startTime), + endAt: optionalTsToDate(pb.endTime), + jitter: optionalTsToMs(pb.jitter), + }; +} + +export async function decodeScheduleAction( + dataConverter: LoadedDataConverter, + pb: temporal.api.schedule.v1.IScheduleAction +): Promise { + if (pb.startWorkflow) { + return { + type: 'startWorkflow', + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + workflowId: pb.startWorkflow.workflowId!, + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + workflowType: pb.startWorkflow.workflowType!.name!, + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + taskQueue: pb.startWorkflow.taskQueue!.name!, + args: await decodeArrayFromPayloads(dataConverter, pb.startWorkflow.input?.payloads), + memo: await decodeMapFromPayloads(dataConverter, pb.startWorkflow.memo?.fields), + retry: decompileRetryPolicy(pb.startWorkflow.retryPolicy), + searchAttributes: Object.fromEntries( + Object.entries( + mapFromPayloads( + searchAttributePayloadConverter, + pb.startWorkflow.searchAttributes?.indexedFields ?? {} + ) as SearchAttributes + ) + ), + workflowExecutionTimeout: optionalTsToMs(pb.startWorkflow.workflowExecutionTimeout), + workflowRunTimeout: optionalTsToMs(pb.startWorkflow.workflowRunTimeout), + workflowTaskTimeout: optionalTsToMs(pb.startWorkflow.workflowTaskTimeout), + }; + } + throw new Error('Unsupported schedule action'); +} + +export function decodeSearchAttributes( + pb: temporal.api.common.v1.ISearchAttributes | undefined | null +): SearchAttributes { + if (!pb?.indexedFields) return {}; + return Object.fromEntries( + Object.entries(mapFromPayloads(searchAttributePayloadConverter, pb.indexedFields) as SearchAttributes).filter( + ([_, v]) => v && v.length > 0 + ) // Filter out empty arrays returned by pre 1.18 servers + ); +} + +export function decodeScheduleRunningActions( + pb?: temporal.api.common.v1.IWorkflowExecution[] | null +): ScheduleExecutionStartWorkflowActionResult[] { + if (!pb) return []; + return pb.map( + (x): ScheduleExecutionStartWorkflowActionResult => ({ + type: 'startWorkflow', + workflow: { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + workflowId: x.workflowId!, + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + firstExecutionRunId: x.runId!, + }, + }) + ); +} + +export function decodeScheduleRecentActions( + pb?: temporal.api.schedule.v1.IScheduleActionResult[] | null +): ScheduleExecutionResult[] { + if (!pb) return []; + return (pb as Required[]).map( + (executionResult): ScheduleExecutionResult => { + let action: ScheduleExecutionActionResult | undefined; + if (executionResult.startWorkflowResult) { + action = { + type: 'startWorkflow', + workflow: { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + workflowId: executionResult.startWorkflowResult!.workflowId!, + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + firstExecutionRunId: executionResult.startWorkflowResult!.runId!, + }, + }; + } else throw new Error('Unsupported schedule action'); + + return { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + scheduledAt: tsToDate(executionResult.scheduleTime!), + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + takenAt: tsToDate(executionResult.actualTime!), + action, + }; + } + ); +} diff --git a/packages/client/src/schedule-types.ts b/packages/client/src/schedule-types.ts new file mode 100644 index 000000000..259a5715a --- /dev/null +++ b/packages/client/src/schedule-types.ts @@ -0,0 +1,866 @@ +import { checkExtends, Replace, RequireAtLeastOne } from '@temporalio/common/lib/type-helpers'; +import { SearchAttributes, Workflow } from '@temporalio/common'; +import type { temporal } from '@temporalio/proto'; +import { WorkflowStartOptions } from './workflow-options'; + +/** + * The specification of a Schedule to be created, as expected by {@link ScheduleClient.create}. + * + * @experimental + */ +export interface ScheduleOptions { + /** + * Schedule Id + * + * We recommend using a meaningful business identifier. + */ + scheduleId: string; + + /** + * When Actions should be taken + */ + spec: RequireAtLeastOne; + + /** + * Which Action to take + */ + action: ScheduleOptionsAction; + + policies?: { + /** + * Controls what happens when an Action would be started by a Schedule at the same time that an older Action is still + * running. This can be changed after a Schedule has taken some Actions, and some changes might produce + * unintuitive results. In general, the later policy overrides the earlier policy. + * + * @default {@link ScheduleOverlapPolicy.SKIP} + */ + overlap?: ScheduleOverlapPolicy; + + /** + * The Temporal Server might be down or unavailable at the time when a Schedule should take an Action. When the Server + * comes back up, `catchupWindow` controls which missed Actions should be taken at that point. The default is one + * minute, which means that the Schedule attempts to take any Actions that wouldn't be more than one minute late. It + * takes those Actions according to the {@link ScheduleOverlapPolicy}. An outage that lasts longer than the Catchup + * Window could lead to missed Actions. (But you can always {@link ScheduleHandle.backfill}.) + * + * @default 1 minute + * @format number of milliseconds or {@link https://www.npmjs.com/package/ms | ms-formatted string} + */ + catchupWindow?: number | string; + + /** + * When an Action times out or reaches the end of its Retry Policy, {@link pause}. + * + * With {@link ScheduleOverlapPolicy.ALLOW_ALL}, this pause might not apply to the next Action, because the next Action + * might have already started previous to the failed one finishing. Pausing applies only to Actions that are scheduled + * to start after the failed one finishes. + * + * @default false + */ + pauseOnFailure?: boolean; + }; + + /** + * Additional non-indexed information attached to the Schedule. The values can be anything that is + * serializable by the {@link DataConverter}. + */ + memo?: Record; + + /** + * Additional indexed information attached to the Schedule. More info: + * https://docs.temporal.io/docs/typescript/search-attributes + * + * Values are always converted using {@link JsonPayloadConverter}, even when a custom Data Converter is provided. + */ + searchAttributes?: SearchAttributes; + + /** + * The initial state of the schedule, right after creation or update. + */ + state?: { + /** + * Start in paused state. + * + * @default false + */ + paused?: boolean; + + /** + * Informative human-readable message with contextual notes, e.g. the reason + * a Schedule is paused. The system may overwrite this message on certain + * conditions, e.g. when pause-on-failure happens. + */ + note?: string; + + /** + * Limit the number of Actions to take. + * + * This number is decremented after each Action is taken, and Actions are not + * taken when the number is `0` (unless {@link ScheduleHandle.trigger} is called). + * + * If `undefined`, then no such limit applies. + * + * @default undefined, which allows for unlimited exections + */ + remainingActions?: number; + + /** + * Trigger one Action immediately on create. + * + * @default false + */ + triggerImmediately?: boolean; + + /** + * Runs though the specified time periods and takes Actions as if that time passed by right now, all at once. The + * overlap policy can be overridden for the scope of the backfill. + */ + backfill?: Backfill[]; + }; +} + +/** @experimental */ +export type CompiledScheduleOptions = Replace< + ScheduleOptions, + { + action: CompiledScheduleAction; + } +>; + +/** + * The specification of an updated Schedule, as expected by {@link ScheduleHandle.update}. + * + * @experimental + */ +export type ScheduleUpdateOptions = Replace< + Omit, + { + action: Replace< + ScheduleOptions['action'], + { + // No default value on update + workflowId: string; + } + >; + state: Omit; + } +>; + +/** @experimental */ +export type CompiledScheduleUpdateOptions = Replace< + ScheduleUpdateOptions, + { + action: CompiledScheduleAction; + } +>; + +/** + * A summary description of an existing Schedule, as returned by {@link ScheduleClient.list}. + * + * @experimental + */ +export interface ScheduleSummary { + /** + * The Schedule Id. We recommend using a meaningful business identifier. + */ + scheduleId: string; + + /** + * When will Actions be taken. + */ + spec: RequireAtLeastOne; + + /** + * The Action that will be taken. + */ + action: ScheduleSummaryAction; + + /** + * Additional non-indexed information attached to the Schedule. + * The values can be anything that is serializable by the {@link DataConverter}. + */ + memo?: Record; + + /** + * Additional indexed information attached to the Schedule. + * More info: https://docs.temporal.io/docs/typescript/search-attributes + * + * Values are always converted using {@link JsonPayloadConverter}, even when a custom Data Converter is provided. + */ + searchAttributes: SearchAttributes; + + state: { + /** + * Whether Schedule is currently paused. + */ + paused: boolean; + + /** + * Informative human-readable message with contextual notes, e.g. the reason a Schedule is paused. + * The system may overwrite this message on certain conditions, e.g. when pause-on-failure happens. + */ + note?: string; + }; + + info: { + /** + * Most recent 10 Actions started (including manual triggers), sorted from older start time to newer. + */ + recentActions: ScheduleExecutionResult[]; + + /** + * Scheduled time of the next 10 executions of this Schedule + */ + nextActionTimes: Date[]; + }; +} + +/** @experimental */ +export interface ScheduleExecutionResult { + /** Time that the Action was scheduled for, including jitter */ + scheduledAt: Date; + + /** Time that the Action was actually taken */ + takenAt: Date; + + /** The Action that was taken */ + action: ScheduleExecutionActionResult; +} + +/** @experimental */ +export type ScheduleExecutionActionResult = ScheduleExecutionStartWorkflowActionResult; + +/** @experimental */ +export interface ScheduleExecutionStartWorkflowActionResult { + type: 'startWorkflow'; + workflow: { + workflowId: string; + + /** + * The Run Id of the original execution that was started by the Schedule. If the Workflow retried, did + * Continue-As-New, or was Reset, the following runs would have different Run Ids. + */ + firstExecutionRunId: string; + }; +} + +/** + * A detailed description of an exisiting Schedule, as returned by {@link ScheduleHandle.describe}. + * + * @experimental + */ +export type ScheduleDescription = ScheduleSummary & { + /** + * The Action that will be taken. + */ + action: ScheduleDescriptionAction; + + policies: { + /** + * Controls what happens when an Action would be started by a Schedule at the same time that an older Action is still + * running. + */ + overlap: ScheduleOverlapPolicy; + + /** + * The Temporal Server might be down or unavailable at the time when a Schedule should take an Action. + * When the Server comes back up, `catchupWindow` controls which missed Actions should be taken at that point. + * It takes those Actions according to the {@link ScheduleOverlapPolicy}. An outage that lasts longer than the + * Catchup Window could lead to missed Actions. (But you can always {@link ScheduleHandle.backfill}.) + * + * Unit is miliseconds. + */ + catchupWindow: number; + + /** + * When an Action times out or reaches the end of its Retry Policy, {@link pause}. + * + * With {@link ScheduleOverlapPolicy.ALLOW_ALL}, this pause might not apply to the next Action, because the next Action + * might have already started previous to the failed one finishing. Pausing applies only to Actions that are scheduled + * to start after the failed one finishes. + */ + pauseOnFailure: boolean; + }; + + state: ScheduleSummary['state'] & { + /** + * The Actions remaining in this Schedule. + * Once this number hits `0`, no further Actions are taken (unless {@link ScheduleHandle.trigger} is called). + * + * If `undefined`, then no such limit applies. + */ + remainingActions?: number; + }; + + info: ScheduleSummary['info'] & { + /** + * Number of Actions taken so far. + */ + numActionsTaken: number; + + /** + * Number of times a scheduled Action was skipped due to missing the catchup window. + */ + numActionsMissedCatchupWindow: number; + + /** + * Number of Actions skipped due to overlap. + */ + numActionsSkippedOverlap: number; + + createdAt: Date; + lastUpdatedAt: Date | undefined; + + /** + * Currently-running workflows started by this schedule. (There might be + * more than one if the overlap policy allows overlaps.) + */ + runningActions: ScheduleExecutionActionResult[]; + }; + + /** @internal */ + raw: temporal.api.workflowservice.v1.IDescribeScheduleResponse; +}; + +// Invariant: An existing ScheduleDescription can be used as template to create a new Schedule +checkExtends(); + +// Invariant: An existing ScheduleDescription can be used as template to update that Schedule +checkExtends(); + +/** + * A complete description of a set of absolute times (possibly infinite) that an Action should occur at. + * The times are the union of `calendars`, `intervals`, and `cronExpressions`, minus the `skip` times. These times + * never change, except that the definition of a time zone can change over time (most commonly, when daylight saving + * time policy changes for an area). To create a totally self-contained `ScheduleSpec`, use UTC. + * + * @experimental + */ +export interface ScheduleSpec { + /** Calendar-based specifications of times. */ + calendars?: CalendarSpec[]; + + /** Interval-based specifications of times. */ + intervals?: IntervalSpec[]; + + /** + * [Cron expressions](https://crontab.guru/). This is provided for easy migration from legacy Cron Workflows. For new + * use cases, we recommend using {@link calendars} or {@link intervals} for readability and maintainability. + * + * For example, `0 12 * * MON-WED,FRI` is every M/Tu/W/F at noon, and is equivalent to this {@link CalendarSpec}: + * + * ```ts + * { + * hour: 12, + * dayOfWeek: [{ + * start: 'MONDAY' + * end: 'WEDNESDAY' + * }, 'FRIDAY'] + * } + * ``` + * + * The string can have 5, 6, or 7 fields, separated by spaces, and they are interpreted in the + * same way as a {@link CalendarSpec}. + * + * - 5 fields: minute, hour, day_of_month, month, day_of_week + * - 6 fields: minute, hour, day_of_month, month, day_of_week, year + * - 7 fields: second, minute, hour, day_of_month, month, day_of_week, year + * + * Notes: + * + * - If year is not given, it defaults to *. + * - If second is not given, it defaults to 0. + * - Shorthands `@yearly`, `@monthly`, `@weekly`, `@daily`, and `@hourly` are also + * accepted instead of the 5-7 time fields. + * - `@every interval[/]` is accepted and gets compiled into an + * IntervalSpec instead. `` and `` should be a decimal integer + * with a unit suffix s, m, h, or d. + * - Optionally, the string can be preceded by `CRON_TZ=` or + * `TZ=`, which will get copied to {@link timezone}. + * (In which case the {@link timezone} field should be left empty.) + * - Optionally, "#" followed by a comment can appear at the end of the string. + * - Note that the special case that some cron implementations have for + * treating day_of_month and day_of_week as "or" instead of "and" when both + * are set is not implemented. + */ + cronExpressions?: string[]; + + /** + * Any matching times will be skipped. + * + * All aspects of the CalendarSpec—including seconds—must match a time for the time to be skipped. + */ + skip?: CalendarSpec[]; + // TODO see if users want to be able to skip an IntervalSpec + // https://github.com/temporalio/api/pull/230/files#r956434347 + + /** + * Any times before `startAt` will be skipped. Together, `startAt` and `endAt` make an inclusive interval. + * + * @default The beginning of time + */ + startAt?: Date; + + /** + * Any times after `endAt` will be skipped. + * + * @default The end of time + */ + endAt?: Date; + + /** + * All times will be incremented by a random value from 0 to this amount of jitter. + * + * @default 0 + * @format number of milliseconds or {@link https://www.npmjs.com/package/ms | ms-formatted string} + */ + jitter?: number | string; + + /** + * IANA timezone name, for example `US/Pacific`. + * + * https://en.wikipedia.org/wiki/List_of_tz_database_time_zones + * + * The definition will be loaded by Temporal Server from the environment it runs in. + * + * Calendar spec matching is based on literal matching of the clock time + * with no special handling of DST: if you write a calendar spec that fires + * at 2:30am and specify a time zone that follows DST, that action will not + * be triggered on the day that has no 2:30am. Similarly, an action that + * fires at 1:30am will be triggered twice on the day that has two 1:30s. + * + * Also note that no actions are taken on leap-seconds (e.g. 23:59:60 UTC). + * + * @default UTC + */ + timezone?: string; +} + +/** + * The version of {@link ScheduleSpec} that you get back from {@link ScheduleHandle.describe} and {@link ScheduleClient.list} + * + * @experimental + */ +export type ScheduleSpecDescription = Omit< + ScheduleSpec, + 'calendars' | 'intervals' | 'cronExpressions' | 'skip' | 'jitter' +> & { + /** Calendar-based specifications of times. */ + calendars?: CalendarSpecDescription[]; + + /** Interval-based specifications of times. */ + intervals?: IntervalSpecDescription[]; + + /** Any matching times will be skipped. */ + skip?: CalendarSpecDescription[]; + + /** + * All times will be incremented by a random value from 0 to this amount of jitter. + * + * @default 1 second + * @format number of milliseconds + */ + jitter?: number; +}; + +// Invariant: An existing ScheduleSpec can be used as is to create or update a Schedule +checkExtends(); + +/** + * An event specification relative to the calendar, similar to a traditional cron specification. + * + * A second in time matches if all fields match. This includes `dayOfMonth` and `dayOfWeek`. + * + * @experimental + */ +export interface CalendarSpec { + /** + * Valid values: 0–59 + * + * @default 0 + */ + second?: LooseRange | LooseRange[] | '*'; + + /** + * Valid values: 0–59 + * + * @default 0 + */ + minute?: LooseRange | LooseRange[] | '*'; + + /** + * Valid values: 0–59 + * + * @default 0 + */ + hour?: LooseRange | LooseRange[] | '*'; + + /** + * Valid values: 1–31 + * + * @default '*' + */ + dayOfMonth?: LooseRange | LooseRange[] | '*'; + + /** + * @default '*' + */ + month?: LooseRange | LooseRange[] | '*'; + + /** + * Use full years, like `2030` + * + * @default '*' + */ + year?: LooseRange | LooseRange[] | '*'; + + /** + * @default '*' + */ + dayOfWeek?: LooseRange | LooseRange[] | '*'; + + /** + * Description of the intention of this spec. + */ + comment?: string; +} + +/** + * An event specification relative to the calendar, similar to a traditional cron specification. + * + * A second in time matches if all fields match. This includes `dayOfMonth` and `dayOfWeek`. + * + * @experimental + */ +export interface CalendarSpecDescription { + /** + * Valid values: 0–59 + * + * @default Match only when second is 0 (ie. `[{ start: 0, end: 0, step: 0 }]`) + */ + second: Range[]; + + /** + * Valid values: 0–59 + * + * @default Match only when minute is 0 (ie. `[{ start: 0, end: 0, step: 0 }]`) + */ + minute: Range[]; + + /** + * Valid values: 0–23 + * + * @default Match only when hour is 0 (ie. `[{ start: 0, end: 0, step: 0 }]`) + */ + hour: Range[]; + + /** + * Valid values: 1–31 + * + * @default Match on any day (ie. `[{ start: 1, end: 31, step: 1 }]`) + */ + dayOfMonth: Range[]; + + /** + * Valid values are 'JANUARY' to 'DECEMBER'. + * + * @default Match on any month (ie. `[{ start: 'JANUARY', end: 'DECEMBER', step: 1 }]`) + */ + month: Range[]; + + /** + * Use full years, like `2030` + * + * @default Match on any year + */ + year: Range[]; + + /** + * Valid values are 'SUNDAY' to 'SATURDAY'. + * + * @default Match on any day of the week (ie. `[{ start: 'SUNDAY', end: 'SATURDAY', step: 1 }]`) + */ + dayOfWeek: Range[]; + + /** + * Description of the intention of this spec. + */ + comment?: string; +} + +/** + * IntervalSpec matches times that can be expressed as: + * + * `Epoch + (n * every) + offset` + * + * where `n` is all integers ≥ 0. + * + * For example, an `every` of 1 hour with `offset` of zero would match every hour, on the hour. The same `every` but an `offset` + * of 19 minutes would match every `xx:19:00`. An `every` of 28 days with `offset` zero would match `2022-02-17T00:00:00Z` + * (among other times). The same `every` with `offset` of 3 days, 5 hours, and 23 minutes would match `2022-02-20T05:23:00Z` + * instead. + * + * @experimental + */ +export interface IntervalSpec { + /** + * Value is rounded to the nearest second. + * + * @format number of milliseconds or {@link https://www.npmjs.com/package/ms | ms-formatted string} + */ + every: number | string; + + /** + * Value is rounded to the nearest second. + * + * @default 0 + * @format number of milliseconds or {@link https://www.npmjs.com/package/ms | ms-formatted string} + */ + offset?: number | string; +} + +/** + * IntervalSpec matches times that can be expressed as: + * + * `Epoch + (n * every) + offset` + * + * where `n` is all integers ≥ 0. + * + * For example, an `every` of 1 hour with `offset` of zero would match every hour, on the hour. The same `every` but an `offset` + * of 19 minutes would match every `xx:19:00`. An `every` of 28 days with `offset` zero would match `2022-02-17T00:00:00Z` + * (among other times). The same `every` with `offset` of 3 days, 5 hours, and 23 minutes would match `2022-02-20T05:23:00Z` + * instead. + * + * This is the version of {@link IntervalSpec} that you get back from {@link ScheduleHandle.describe} and {@link ScheduleClient.list} + * + * @experimental + */ +export interface IntervalSpecDescription { + /** + * Value is rounded to the nearest second. + * + * @format number of milliseconds + */ + every: number; + + /** + * Value is rounded to the nearest second. + * + * @format number of milliseconds + */ + offset: number; +} + +/** + * Range represents a set of values, used to match fields of a calendar. If end < start, then end is + * interpreted as equal to start. Similarly, if step is less than 1, then step is interpreted as 1. + * + * @experimental + */ +export interface Range { + /** + * Start of range (inclusive) + */ + start: Unit; + + /** + * End of range (inclusive) + * + * @default `start` + */ + end: Unit; + + /** + * The step to take between each value. + * + * @default 1 + */ + step: number; +} + +/** + * A {@link Range} definition, with support for loose syntax. + * + * For example: + * ``` + * 3 ➡️ 3 + * { start: 2 } ➡️ 2 + * { start: 2, end: 4 } ➡️ 2, 3, 4 + * { start: 2, end: 10, step: 3 } ➡️ 2, 5, 8 + * ``` + * + * @experimental + */ +export type LooseRange = + | Range + | { start: Range['start']; end?: Range['end']; step?: never } + | Unit; + +/** @experimental */ +export const MONTHS = [ + 'JANUARY', + 'FEBRUARY', + 'MARCH', + 'APRIL', + 'MAY', + 'JUNE', + 'JULY', + 'AUGUST', + 'SEPTEMBER', + 'OCTOBER', + 'NOVEMBER', + 'DECEMBER', +] as const; + +/** @experimental */ +export type Month = typeof MONTHS[number]; + +/** @experimental */ +export const DAYS_OF_WEEK = ['SUNDAY', 'MONDAY', 'TUESDAY', 'WEDNESDAY', 'THURSDAY', 'FRIDAY', 'SATURDAY'] as const; + +/** @experimental */ +export type DayOfWeek = typeof DAYS_OF_WEEK[number]; + +/** @experimental */ +export type ScheduleOptionsAction = ScheduleOptionsStartWorkflowAction; + +/** @experimental */ +export type ScheduleOptionsStartWorkflowAction = { + type: 'startWorkflow'; + workflowType: string | W; +} & Pick< + WorkflowStartOptions, + | 'taskQueue' + | 'args' + | 'memo' + | 'searchAttributes' + | 'retry' + | 'workflowExecutionTimeout' + | 'workflowRunTimeout' + | 'workflowTaskTimeout' +> & { + /** + * Workflow id to use when starting. Assign a meaningful business id. + * This ID can be used to ensure starting Workflows is idempotent. + * + * @default `${scheduleId}-workflow` + */ + workflowId?: string; + }; + +/** @experimental */ +export type ScheduleSummaryAction = ScheduleSummaryStartWorkflowAction; + +/** @experimental */ +export interface ScheduleSummaryStartWorkflowAction { + type: 'startWorkflow'; + workflowType: string; +} +/** @experimental */ +export type ScheduleDescriptionAction = ScheduleDescriptionStartWorkflowAction; + +/** @experimental */ +export type ScheduleDescriptionStartWorkflowAction = ScheduleSummaryStartWorkflowAction & + Pick< + WorkflowStartOptions, + | 'taskQueue' + | 'workflowId' + | 'args' + | 'memo' + | 'searchAttributes' + | 'retry' + | 'workflowExecutionTimeout' + | 'workflowRunTimeout' + | 'workflowTaskTimeout' + >; + +// Invariant: an existing ScheduleDescriptionAction can be used as is to create or update a schedule +checkExtends(); + +/** @experimental */ +export type CompiledScheduleAction = Replace< + ScheduleDescriptionAction, + { + workflowType: string; + args: unknown[]; + } +>; + +/** + * Policy for overlapping Actions. + * + * @experimental + */ +export enum ScheduleOverlapPolicy { + /** + * Use server default (currently SKIP). + * + * FIXME: remove this field if this issue is implemented: https://github.com/temporalio/temporal/issues/3240 + */ + UNSPECIFIED = 0, + + /** + * Don't start a new Action. + */ + SKIP, + + /** + * Start another Action as soon as the current Action completes, but only buffer one Action in this way. If another + * Action is supposed to start, but one Action is running and one is already buffered, then only the buffered one will + * be started after the running Action finishes. + */ + BUFFER_ONE, + + /** + * Allows an unlimited number of Actions to buffer. They are started sequentially. + */ + BUFFER_ALL, + + /** + * Cancels the running Action, and then starts the new Action once the cancelled one completes. + */ + CANCEL_OTHER, + + /** + * Terminate the running Action and start the new Action immediately. + */ + TERMINATE_OTHER, + + /** + * Allow any number of Actions to start immediately. + * + * This is the only policy under which multiple Actions can run concurrently. + */ + ALLOW_ALL, +} + +export type ScheduleOverlapPolicy2 = keyof typeof temporal.api.enums.v1.ScheduleOverlapPolicy; + +checkExtends< + keyof typeof temporal.api.enums.v1.ScheduleOverlapPolicy, + `SCHEDULE_OVERLAP_POLICY_${keyof typeof ScheduleOverlapPolicy}` +>(); +checkExtends< + `SCHEDULE_OVERLAP_POLICY_${keyof typeof ScheduleOverlapPolicy}`, + keyof typeof temporal.api.enums.v1.ScheduleOverlapPolicy +>(); + +/** @experimental */ +export interface Backfill { + /** + * Start of the time range to evaluate Schedule in. + */ + start: Date; + + /** + * End of the time range to evaluate Schedule in. + */ + end: Date; + + /** + * Override the Overlap Policy for this request. + * + * @default SKIP + */ + overlap?: ScheduleOverlapPolicy; +} diff --git a/packages/client/src/workflow-client.ts b/packages/client/src/workflow-client.ts index 04cf0b9ae..2274830b9 100644 --- a/packages/client/src/workflow-client.ts +++ b/packages/client/src/workflow-client.ts @@ -60,7 +60,12 @@ import { WorkflowExecutionInfo, WorkflowService, } from './types'; -import { compileWorkflowOptions, WorkflowOptions, WorkflowSignalWithStartOptions } from './workflow-options'; +import { + compileWorkflowOptions, + WorkflowOptions, + WorkflowSignalWithStartOptions, + WorkflowStartOptions, +} from './workflow-options'; import { executionInfoFromRaw } from './helpers'; /** @@ -280,11 +285,6 @@ interface WorkflowHandleOptions extends GetWorkflowHandleOptions { runIdForResult?: string; } -/** - * Options for starting a Workflow - */ -export type WorkflowStartOptions = WithWorkflowArgs; - /** * Options for {@link WorkflowClient.list} */ diff --git a/packages/client/src/workflow-options.ts b/packages/client/src/workflow-options.ts index 550f9d2fe..86f888d60 100644 --- a/packages/client/src/workflow-options.ts +++ b/packages/client/src/workflow-options.ts @@ -1,4 +1,10 @@ -import { CommonWorkflowOptions, SignalDefinition, WithCompiledWorkflowOptions } from '@temporalio/common'; +import { + CommonWorkflowOptions, + SignalDefinition, + WithCompiledWorkflowOptions, + WithWorkflowArgs, + Workflow, +} from '@temporalio/common'; export * from '@temporalio/common/lib/workflow-options'; @@ -60,3 +66,8 @@ export interface WorkflowSignalWithStartOptionsWithArgs = WithWorkflowArgs; diff --git a/packages/common/src/time.ts b/packages/common/src/time.ts index a9bed559d..1143c3076 100644 --- a/packages/common/src/time.ts +++ b/packages/common/src/time.ts @@ -78,3 +78,11 @@ export function optionalTsToDate(ts: Timestamp | null | undefined): Date | undef } return new Date(tsToMs(ts)); } + +// ts-prune-ignore-next (imported via schedule-helpers.ts) +export function optionalDateToTs(date: Date | null | undefined): Timestamp | undefined { + if (date === undefined || date === null) { + return undefined; + } + return msToTs(date.getTime()); +} diff --git a/packages/common/src/type-helpers.ts b/packages/common/src/type-helpers.ts index ea037a250..2b9d92727 100644 --- a/packages/common/src/type-helpers.ts +++ b/packages/common/src/type-helpers.ts @@ -12,6 +12,11 @@ export function checkExtends<_Orig, _Copy extends _Orig>(): void { export type Replace = Omit & New; +export type RequireAtLeastOne = Omit & + { + [K in Keys]-?: Required> & Partial>>; + }[Keys]; + export function isRecord(value: unknown): value is Record { return typeof value === 'object' && value !== null; } diff --git a/packages/test/src/test-schedules.ts b/packages/test/src/test-schedules.ts new file mode 100644 index 000000000..bad81d058 --- /dev/null +++ b/packages/test/src/test-schedules.ts @@ -0,0 +1,561 @@ +import { RUN_INTEGRATION_TESTS } from './helpers'; +import anyTest, { TestInterface } from 'ava'; +import { randomUUID } from 'crypto'; +import { Client, defaultPayloadConverter } from '@temporalio/client'; +import asyncRetry from 'async-retry'; +import { msToNumber } from '@temporalio/common/lib/time'; +import { CalendarSpec, CalendarSpecDescription, ScheduleSummary } from '@temporalio/client/lib/schedule-types'; +import { ScheduleHandle } from '@temporalio/client/lib/schedule-client'; + +export interface Context { + client: Client; +} + +const taskQueue = 'async-activity-completion'; +const test = anyTest as TestInterface; + +const dummyWorkflow = async () => undefined; +const dummyWorkflow2 = async (_x?: string) => undefined; + +const calendarSpecDescriptionDefaults: CalendarSpecDescription = { + second: [{ start: 0, end: 0, step: 1 }], + minute: [{ start: 0, end: 0, step: 1 }], + hour: [{ start: 0, end: 0, step: 1 }], + dayOfMonth: [{ start: 1, end: 31, step: 1 }], + month: [{ start: 'JANUARY', end: 'DECEMBER', step: 1 }], + dayOfWeek: [{ start: 'SUNDAY', end: 'SATURDAY', step: 1 }], + year: [], + comment: '', +}; + +if (RUN_INTEGRATION_TESTS) { + test.before(async (t) => { + t.context = { + client: new Client(), + }; + }); + + test('Can create schedule with calendar', async (t) => { + const { client } = t.context; + const scheduleId = `can-create-schedule-with-calendar-${randomUUID()}`; + const handle = await client.schedule.create({ + scheduleId, + spec: { + calendars: [{ hour: { start: 2, end: 7, step: 1 } }], + }, + action: { + type: 'startWorkflow', + workflowType: dummyWorkflow, + taskQueue, + }, + }); + + try { + const describedSchedule = await handle.describe(); + t.deepEqual(describedSchedule.spec.calendars, [ + { ...calendarSpecDescriptionDefaults, hour: [{ start: 2, end: 7, step: 1 }] }, + ]); + } finally { + await handle.delete(); + } + }); + + test('Can create schedule with intervals', async (t) => { + const { client } = t.context; + const scheduleId = `can-create-schedule-with-inteval-${randomUUID()}`; + const handle = await client.schedule.create({ + scheduleId, + spec: { + intervals: [{ every: '1h', offset: '5m' }], + }, + action: { + type: 'startWorkflow', + workflowType: dummyWorkflow, + taskQueue, + }, + }); + + try { + const describedSchedule = await handle.describe(); + t.deepEqual(describedSchedule.spec.intervals, [{ every: msToNumber('1h'), offset: msToNumber('5m') }]); + } finally { + await handle.delete(); + } + }); + + test('Can create schedule with cron syntax', async (t) => { + const { client } = t.context; + const scheduleId = `can-create-schedule-with-cron-syntax-${randomUUID()}`; + const handle = await client.schedule.create({ + scheduleId, + spec: { + cronExpressions: ['0 12 * * MON-WED,FRI'], + }, + action: { + type: 'startWorkflow', + workflowType: dummyWorkflow, + taskQueue, + }, + }); + + try { + const describedSchedule = await handle.describe(); + t.deepEqual(describedSchedule.spec.calendars, [ + { + ...calendarSpecDescriptionDefaults, + hour: [{ start: 12, end: 12, step: 1 }], + dayOfWeek: [ + { start: 'MONDAY', end: 'WEDNESDAY', step: 1 }, + { start: 'FRIDAY', end: 'FRIDAY', step: 1 }, + ], + }, + ]); + } finally { + await handle.delete(); + } + }); + + test('Can create schedule with startWorkflow action', async (t) => { + const { client } = t.context; + const scheduleId = `can-create-schedule-with-startWorkflow-action-${randomUUID()}`; + const handle = await client.schedule.create({ + scheduleId, + spec: { + calendars: [{ hour: { start: 2, end: 7, step: 1 } }], + }, + action: { + type: 'startWorkflow', + workflowType: dummyWorkflow, + taskQueue, + memo: { + 'my-memo': 'foo', + }, + searchAttributes: { + CustomKeywordField: ['test-value2'], + }, + }, + }); + + try { + const describedSchedule = await handle.describe(); + + t.is(describedSchedule.action.type, 'startWorkflow'); + t.is(describedSchedule.action.workflowType, 'dummyWorkflow'); + t.deepEqual(describedSchedule.action.memo, { 'my-memo': 'foo' }); + t.deepEqual(describedSchedule.action.searchAttributes?.CustomKeywordField, ['test-value2']); + } finally { + await handle.delete(); + } + }); + + test('Interceptor is called on create schedule', async (t) => { + const clientWithInterceptor = new Client({ + connection: t.context.client.connection, + interceptors: { + schedule: [ + { + async create(input, next) { + return next({ + ...input, + headers: { + intercepted: defaultPayloadConverter.toPayload('intercepted'), + }, + }); + }, + }, + ], + }, + }); + + const scheduleId = `interceptor-called-on-create-schedule-${randomUUID()}`; + const handle = await clientWithInterceptor.schedule.create({ + scheduleId, + spec: { + intervals: [{ every: '1h', offset: '5m' }], + }, + action: { + type: 'startWorkflow', + workflowType: dummyWorkflow, + taskQueue, + }, + }); + + try { + const describedSchedule = await handle.describe(); + const outHeaders = describedSchedule.raw.schedule?.action?.startWorkflow?.header; + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + t.is(defaultPayloadConverter.fromPayload(outHeaders!.fields!.intercepted!), 'intercepted'); + } finally { + await handle.delete(); + } + }); + + test('startWorkflow headers are kept on update', async (t) => { + const clientWithInterceptor = new Client({ + connection: t.context.client.connection, + interceptors: { + schedule: [ + { + async create(input, next) { + return next({ + ...input, + headers: { + intercepted: defaultPayloadConverter.toPayload('intercepted'), + }, + }); + }, + }, + ], + }, + }); + + const scheduleId = `startWorkflow-headerskept-on-update-${randomUUID()}`; + const handle = await clientWithInterceptor.schedule.create({ + scheduleId, + spec: { + intervals: [{ every: '1h', offset: '5m' }], + }, + action: { + type: 'startWorkflow', + workflowType: dummyWorkflow, + taskQueue, + }, + }); + + try { + // Actually perform no change to the schedule + await handle.update((x) => x); + + const describedSchedule = await handle.describe(); + const outHeaders = describedSchedule.raw.schedule?.action?.startWorkflow?.header; + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + t.is(defaultPayloadConverter.fromPayload(outHeaders!.fields!.intercepted!), 'intercepted'); + } finally { + await handle.delete(); + } + }); + + test('Can pause and unpause schedule', async (t) => { + const { client } = t.context; + const scheduleId = `can-pause-and-unpause-schedule-${randomUUID()}`; + const handle = await client.schedule.create({ + scheduleId, + spec: { + calendars: [{ hour: { start: 2, end: 7, step: 1 } }], + }, + action: { + type: 'startWorkflow', + workflowType: dummyWorkflow, + taskQueue, + memo: { + 'my-memo': 'foo', + }, + searchAttributes: { + CustomKeywordField: ['test-value2'], + }, + }, + }); + + try { + let describedSchedule = await handle.describe(); + t.false(describedSchedule.state.paused); + + await handle.pause(); + describedSchedule = await handle.describe(); + t.true(describedSchedule.state.paused); + + await handle.unpause(); + describedSchedule = await handle.describe(); + t.false(describedSchedule.state.paused); + } finally { + await handle.delete(); + } + }); + + test('Can update schedule', async (t) => { + const { client } = t.context; + const scheduleId = `can-update-schedule-${randomUUID()}`; + const handle = await client.schedule.create({ + scheduleId, + spec: { + calendars: [{ hour: { start: 2, end: 7, step: 1 } }], + }, + action: { + type: 'startWorkflow', + workflowType: dummyWorkflow, + taskQueue, + }, + }); + + try { + await handle.update((x) => ({ + ...x, + spec: { + calendars: [{ hour: { start: 6, end: 9, step: 1 } }], + }, + })); + + const describedSchedule = await handle.describe(); + t.deepEqual(describedSchedule.spec.calendars, [ + { ...calendarSpecDescriptionDefaults, hour: [{ start: 6, end: 9, step: 1 }] }, + ]); + } finally { + await handle.delete(); + } + }); + + test('Can update schedule action', async (t) => { + const { client } = t.context; + const scheduleId = `can-update-schedule-action-${randomUUID()}`; + const handle = await client.schedule.create({ + scheduleId, + spec: { + calendars: [{ hour: { start: 2, end: 7, step: 1 } }], + }, + action: { + type: 'startWorkflow', + workflowId: `${scheduleId}-workflow`, + workflowType: dummyWorkflow, + taskQueue, + }, + }); + + try { + await handle.update((x) => ({ + ...x, + action: { + type: 'startWorkflow', + workflowId: `${scheduleId}-workflow-2`, + workflowType: dummyWorkflow2, + args: ['updated'], + taskQueue, + }, + })); + + const describedSchedule = await handle.describe(); + t.is(describedSchedule.action.type, 'startWorkflow'); + t.is(describedSchedule.action.workflowType, 'dummyWorkflow2'); + t.deepEqual(describedSchedule.action.args, ['updated']); + } finally { + await handle.delete(); + } + }); + + test('Schedule updates throws without retry on validation error', async (t) => { + const { client } = t.context; + const scheduleId = `schedule-update-throws-without-retry-on-validation-error-${randomUUID()}`; + const handle = await client.schedule.create({ + scheduleId, + spec: { + calendars: [{ hour: { start: 2, end: 7, step: 1 } }], + }, + action: { + type: 'startWorkflow', + workflowType: dummyWorkflow, + taskQueue, + }, + }); + + try { + let retryCount = 0; + + await t.throwsAsync( + async (): Promise => { + retryCount++; + return handle.update((previous) => ({ + ...previous, + spec: { + calendars: [{ hour: 42 }], + }, + })); + }, + { + instanceOf: TypeError, + } + ); + + t.is(retryCount, 1); + } finally { + await handle.delete(); + } + }); + + test('Can list Schedules', async (t) => { + const { client } = t.context; + + const groupId = randomUUID(); + + const createdScheduleHandlesPromises = []; + for (let i = 10; i < 30; i++) { + const scheduleId = `can-list-schedule-${groupId}-${i}`; + createdScheduleHandlesPromises.push( + client.schedule.create({ + scheduleId, + spec: { + calendars: [{ hour: { start: 2, end: 7, step: 1 } }], + }, + action: { + type: 'startWorkflow', + workflowType: dummyWorkflow, + taskQueue, + }, + }) + ); + } + const createdScheduleHandles: { [k: string]: ScheduleHandle } = Object.fromEntries( + (await Promise.all(createdScheduleHandlesPromises)).map((x) => [x.scheduleId, x]) + ); + + try { + // Wait for visibility to stabilize + await asyncRetry( + async () => { + const listedScheduleHandles: ScheduleSummary[] = []; + // Page size is intentionnally low to guarantee multiple pages + for await (const schedule of client.schedule.list({ pageSize: 6 })) { + listedScheduleHandles.push(schedule); + } + + const listedScheduleIds = listedScheduleHandles + .map((x) => x.scheduleId) + .filter((x) => x.startsWith(`can-list-schedule-${groupId}-`)) + .sort(); + + const createdSchedulesIds = Object.values(createdScheduleHandles).map((x) => x.scheduleId); + if (createdSchedulesIds.length != listedScheduleIds.length) throw new Error('Missing list entries'); + + t.deepEqual(listedScheduleIds, createdSchedulesIds); + }, + { + retries: 60, + maxTimeout: 1000, + } + ); + + t.pass(); + } finally { + for (const handle of Object.values(createdScheduleHandles)) { + await handle.delete(); + } + } + }); + + test('Structured calendar specs are encoded and decoded properly', async (t) => { + const checks: { input: CalendarSpec; expected: CalendarSpecDescription; comment?: string }[] = [ + { + comment: 'a single value X encode to a range in the form { X, X, 1 }', + input: { + hour: 4, + dayOfWeek: 'MONDAY', + month: 'APRIL', + }, + expected: { + ...calendarSpecDescriptionDefaults, + hour: [{ start: 4, end: 4, step: 1 }], + dayOfWeek: [{ start: 'MONDAY', end: 'MONDAY', step: 1 }], + month: [{ start: 'APRIL', end: 'APRIL', step: 1 }], + }, + }, + { + comment: 'match all ranges are exact', + input: { + second: '*', + minute: '*', + hour: '*', + dayOfMonth: '*', + month: '*', + year: '*', + dayOfWeek: '*', + }, + expected: { + ...calendarSpecDescriptionDefaults, + second: [{ start: 0, end: 59, step: 1 }], + minute: [{ start: 0, end: 59, step: 1 }], + hour: [{ start: 0, end: 23, step: 1 }], + dayOfMonth: [{ start: 1, end: 31, step: 1 }], + month: [{ start: 'JANUARY', end: 'DECEMBER', step: 1 }], + year: [], + dayOfWeek: [{ start: 'SUNDAY', end: 'SATURDAY', step: 1 }], + }, + }, + { + comment: 'a mixed array of values and ranges encode properly', + input: { + hour: [4, 7, 9, { start: 15, end: 20, step: 2 }], + dayOfWeek: ['FRIDAY', 'SATURDAY', { start: 'TUESDAY', end: 'FRIDAY', step: 1 }], + month: ['DECEMBER', 'JANUARY', { start: 'APRIL', end: 'JULY', step: 3 }], + }, + expected: { + ...calendarSpecDescriptionDefaults, + hour: [ + { start: 4, end: 4, step: 1 }, + { start: 7, end: 7, step: 1 }, + { start: 9, end: 9, step: 1 }, + { start: 15, end: 20, step: 2 }, + ], + dayOfWeek: [ + { start: 'FRIDAY', end: 'FRIDAY', step: 1 }, + { start: 'SATURDAY', end: 'SATURDAY', step: 1 }, + { start: 'TUESDAY', end: 'FRIDAY', step: 1 }, + ], + month: [ + { start: 'DECEMBER', end: 'DECEMBER', step: 1 }, + { start: 'JANUARY', end: 'JANUARY', step: 1 }, + { start: 'APRIL', end: 'JULY', step: 3 }, + ], + }, + }, + { + input: { + hour: [ + { start: 2, end: 7 }, + { start: 2, end: 7, step: 1 }, + { start: 2, end: 7, step: 1 }, + { start: 2, end: 7, step: 2 }, + { start: 4, end: 0, step: 2 }, + ], + }, + expected: { + ...calendarSpecDescriptionDefaults, + hour: [ + { start: 2, end: 7, step: 1 }, + { start: 2, end: 7, step: 1 }, + { start: 2, end: 7, step: 1 }, + { start: 2, end: 7, step: 2 }, + { start: 4, end: 4, step: 2 }, + ], + }, + }, + { + input: { hour: 4 }, + expected: { ...calendarSpecDescriptionDefaults, hour: [{ start: 4, end: 4, step: 1 }] }, + }, + ]; + + const { client } = t.context; + const scheduleId = `structured-schedule-specs-encoding-${randomUUID()}`; + const handle = await client.schedule.create({ + scheduleId, + spec: { + calendars: checks.map(({ input }) => input), + }, + action: { + type: 'startWorkflow', + workflowType: dummyWorkflow, + taskQueue, + }, + }); + + try { + const describedSchedule = await handle.describe(); + const describedCalendars = describedSchedule.spec.calendars ?? []; + + t.is(describedCalendars.length, checks.length); + for (let i = 0; i < checks.length; i++) { + t.deepEqual(describedCalendars[i], checks[i].expected, checks[i].comment); + } + } finally { + await handle.delete(); + } + }); +}