diff --git a/packages/test/src/mock-native-worker.ts b/packages/test/src/mock-native-worker.ts index 57037aee8..1a0b1037d 100644 --- a/packages/test/src/mock-native-worker.ts +++ b/packages/test/src/mock-native-worker.ts @@ -1,3 +1,4 @@ +import { randomUUID } from 'crypto'; /* eslint-disable @typescript-eslint/no-non-null-assertion */ import { lastValueFrom } from 'rxjs'; import { defaultPayloadConverter, fromPayloadsAtIndex } from '@temporalio/common'; @@ -6,12 +7,8 @@ import { coresdk } from '@temporalio/proto'; import { DefaultLogger, Runtime, ShutdownError } from '@temporalio/worker'; import { byteArrayToBuffer } from '@temporalio/worker/lib/utils'; import { NativeReplayHandle, NativeWorkerLike, Worker as RealWorker } from '@temporalio/worker/lib/worker'; -import { - addDefaultWorkerOptions, - CompiledWorkerOptions, - compileWorkerOptions, - WorkerOptions, -} from '@temporalio/worker/lib/worker-options'; +import { withMetadata } from '@temporalio/worker/lib/logger'; +import { CompiledWorkerOptions, compileWorkerOptions, WorkerOptions } from '@temporalio/worker/lib/worker-options'; import type { WorkflowCreator } from '@temporalio/worker/lib/workflow/interface'; import * as activities from './activities'; @@ -163,11 +160,13 @@ export class Worker extends RealWorker { } public constructor(workflowCreator: WorkflowCreator, opts: CompiledWorkerOptions) { - // Worker.create() accesses Runtime.instance(), which has some side effects that would not happen (or that would - // happen too late) when creating a MockWorker. Force the singleton to be created now, if it doesn't already exist. - Runtime.instance(); + const logger = withMetadata(Runtime.instance().logger, { + subsystem: 'worker', + // A random workerId to make it easier to correlate logs + workerId: randomUUID(), + }); const nativeWorker = new MockNativeWorker(); - super(nativeWorker, workflowCreator, opts); + super(nativeWorker, workflowCreator, opts, logger); } public runWorkflows(...args: Parameters): Promise { @@ -183,6 +182,11 @@ export const defaultOptions: WorkerOptions = { }; export function isolateFreeWorker(options: WorkerOptions = defaultOptions): Worker { + const logger = withMetadata(Runtime.instance().logger, { + subsystem: 'worker', + // A random workerId to make it easier to correlate logs + workerId: randomUUID(), + }); return new Worker( { async createWorkflow() { @@ -192,6 +196,6 @@ export function isolateFreeWorker(options: WorkerOptions = defaultOptions): Work /* Nothing to destroy */ }, }, - compileWorkerOptions(addDefaultWorkerOptions(options)) + compileWorkerOptions(options, logger) ); } diff --git a/packages/worker/src/index.ts b/packages/worker/src/index.ts index 2c4507680..74e05d1fb 100644 --- a/packages/worker/src/index.ts +++ b/packages/worker/src/index.ts @@ -24,7 +24,7 @@ export { IllegalStateError } from '@temporalio/common'; export { ShutdownError, TransportError, UnexpectedError } from '@temporalio/core-bridge'; export { GracefulShutdownPeriodExpiredError, errors } from './errors'; // eslint-disable-line deprecation/deprecation export * from './interceptors'; -export * from './logger'; +export { DefaultLogger, LogEntry, LogLevel, LogMetadata, LogTimestamp, Logger, withMetadata } from './logger'; export { History, Runtime, RuntimeOptions, makeTelemetryFilterString } from './runtime'; export * from './sinks'; export { @@ -107,6 +107,12 @@ export { */ workflowLogAttributes, } from '@temporalio/workflow/lib/logs'; +export { + /** + * @deprecated This function is meant for internal usage. Don't use it. + */ + timeOfDayToBigint, +} from './logger'; /** * @deprecated Including `defaultWorkflowInterceptorModules` in BundlerOptions.workflowInterceptorModules is no longer required. */ diff --git a/packages/worker/src/logger.ts b/packages/worker/src/logger.ts index db6c50247..d3d6052db 100644 --- a/packages/worker/src/logger.ts +++ b/packages/worker/src/logger.ts @@ -87,3 +87,38 @@ export class DefaultLogger implements Logger { this.log('ERROR', message, meta); } } + +export function withMetadata(logger: Logger, meta: LogMetadata): Logger { + return new LoggerWithMetadata(logger, meta); +} + +class LoggerWithMetadata implements Logger { + constructor( + public readonly logger: Logger, + public readonly meta: LogMetadata + ) {} + + log(level: LogLevel, message: string, meta?: LogMetadata): void { + this.logger.log(level, message, { ...this.meta, ...meta }); + } + + trace(message: string, meta?: LogMetadata): void { + this.logger.trace(message, { ...this.meta, ...meta }); + } + + debug(message: string, meta?: LogMetadata): void { + this.logger.debug(message, { ...this.meta, ...meta }); + } + + info(message: string, meta?: LogMetadata): void { + this.logger.info(message, { ...this.meta, ...meta }); + } + + warn(message: string, meta?: LogMetadata): void { + this.logger.warn(message, { ...this.meta, ...meta }); + } + + error(message: string, meta?: LogMetadata): void { + this.logger.error(message, { ...this.meta, ...meta }); + } +} diff --git a/packages/worker/src/worker-options.ts b/packages/worker/src/worker-options.ts index 574b207c2..ae6d6c799 100644 --- a/packages/worker/src/worker-options.ts +++ b/packages/worker/src/worker-options.ts @@ -9,7 +9,7 @@ import { Context } from '@temporalio/activity'; import { ActivityInboundLogInterceptor } from './activity-log-interceptor'; import { NativeConnection } from './connection'; import { CompiledWorkerInterceptors, WorkerInterceptors } from './interceptors'; -import { Logger } from './logger'; +import { Logger, withMetadata } from './logger'; import { initLoggerSink } from './workflow/logger'; import { Runtime } from './runtime'; import { InjectedSinks } from './sinks'; @@ -615,7 +615,7 @@ export interface ReplayWorkerOptions * Build the sink used internally by the SDK to forwards log messages from the Workflow sandbox to an actual logger. * * @param logger a {@link Logger} - defaults to the {@link Runtime} singleton logger. - * + * @deprecated Calling `defaultSink()` is no longer required. To configure a custom logger, set the * {@link Runtime.logger} property instead. */ @@ -625,6 +625,13 @@ export function defaultSinks(logger?: Logger): InjectedSinks { // code that is still calling defaultSinks() expects return type to match the deprecated LoggerSinks API. Silently // cast just to mask type checking issues, even though we know this is wrong. Users shouldn't call functions directly // on the returned object anyway. + + // If no logger was provided, the legacy behavior was to _lazyly_ set the sink's logger to the Runtime's logger. + // This was required because may call defaultSinks() before the Runtime is initialized. We preserve that behavior + // here by silently not initializing the sink if no logger is provided. + // eslint-disable-next-line deprecation/deprecation + if (!logger) return {} as InjectedSinks; + // eslint-disable-next-line deprecation/deprecation return initLoggerSink(logger) as unknown as InjectedSinks; } @@ -646,7 +653,7 @@ export function appendDefaultInterceptors( return { activityInbound: [ // eslint-disable-next-line deprecation/deprecation - (ctx) => new ActivityInboundLogInterceptor(ctx, logger), + (ctx) => new ActivityInboundLogInterceptor(ctx, withMetadata(logger, { subsystem: 'activity' })), // eslint-disable-next-line deprecation/deprecation ...(interceptors.activityInbound ?? []), ], @@ -666,7 +673,7 @@ export function compileWorkerInterceptors({ }; } -export function addDefaultWorkerOptions(options: WorkerOptions): WorkerOptionsWithDefaults { +function addDefaultWorkerOptions(options: WorkerOptions, logger: Logger): WorkerOptionsWithDefaults { const { buildId, useVersioning, @@ -720,7 +727,7 @@ export function addDefaultWorkerOptions(options: WorkerOptions): WorkerOptionsWi }, nonStickyToStickyPollRatio: nonStickyToStickyPollRatio ?? 0.2, sinks: { - ...initLoggerSink(Runtime.instance().logger), + ...initLoggerSink(logger), // Fix deprecated registration of the 'defaultWorkerLogger' sink ...(sinks?.defaultWorkerLogger ? { __temporal_logger: sinks.defaultWorkerLogger } : {}), ...sinks, @@ -738,21 +745,20 @@ function isSet(env: string | undefined): boolean { return env === '1' || env === 't' || env === 'true'; } -export function compileWorkerOptions(opts: WorkerOptionsWithDefaults): CompiledWorkerOptions { +export function compileWorkerOptions(rawOpts: WorkerOptions, logger: Logger): CompiledWorkerOptions { + const opts = addDefaultWorkerOptions(rawOpts, logger); if (opts.maxCachedWorkflows !== 0 && opts.maxCachedWorkflows < 2) { - Runtime.instance().logger.warn( - 'maxCachedWorkflows must be either 0 (ie. cache is disabled) or greater than 1. Defaulting to 2.' - ); + logger.warn('maxCachedWorkflows must be either 0 (ie. cache is disabled) or greater than 1. Defaulting to 2.'); opts.maxCachedWorkflows = 2; } if (opts.maxCachedWorkflows > 0 && opts.maxConcurrentWorkflowTaskExecutions > opts.maxCachedWorkflows) { - Runtime.instance().logger.warn( + logger.warn( "maxConcurrentWorkflowTaskExecutions can't exceed maxCachedWorkflows (unless cache is disabled). Defaulting to maxCachedWorkflows." ); opts.maxConcurrentWorkflowTaskExecutions = opts.maxCachedWorkflows; } if (opts.maxCachedWorkflows > 0 && opts.maxConcurrentWorkflowTaskExecutions < 2) { - Runtime.instance().logger.warn( + logger.warn( "maxConcurrentWorkflowTaskExecutions can't be lower than 2 if maxCachedWorkflows is non-zero. Defaulting to 2." ); opts.maxConcurrentWorkflowTaskExecutions = 2; diff --git a/packages/worker/src/worker.ts b/packages/worker/src/worker.ts index b4a629a36..7184d6dae 100644 --- a/packages/worker/src/worker.ts +++ b/packages/worker/src/worker.ts @@ -1,4 +1,4 @@ -import crypto from 'node:crypto'; +import crypto, { randomUUID } from 'node:crypto'; import fs from 'node:fs/promises'; import * as path from 'node:path'; import * as vm from 'node:vm'; @@ -50,7 +50,7 @@ import { type SinkCall, type WorkflowInfo } from '@temporalio/workflow'; import { Activity, CancelReason, activityLogAttributes } from './activity'; import { extractNativeClient, extractReferenceHolders, InternalNativeConnection, NativeConnection } from './connection'; import { ActivityExecuteInput } from './interceptors'; -import { Logger } from './logger'; +import { Logger, withMetadata } from './logger'; import pkg from './pkg'; import { EvictionReason, @@ -63,7 +63,6 @@ import { History, Runtime } from './runtime'; import { CloseableGroupedObservable, closeableGroupBy, mapWithState, mergeMapWithState } from './rxutils'; import { byteArrayToBuffer, convertToParentWorkflowType } from './utils'; import { - addDefaultWorkerOptions, CompiledWorkerOptions, compileWorkerOptions, isCodeBundleOption, @@ -165,7 +164,6 @@ export interface NativeWorkerLike { completeWorkflowActivation: Promisify>; completeActivityTask: Promisify>; recordActivityHeartbeat: OmitFirstParam; - logger: Logger; } export interface NativeReplayHandle { @@ -241,10 +239,6 @@ export class NativeWorker implements NativeWorkerLike { public async finalizeShutdown(): Promise { await this.runtime.deregisterWorker(this.nativeWorker); } - - public get logger(): Logger { - return this.runtime.logger; - } } function formatTaskToken(taskToken: Uint8Array) { @@ -425,9 +419,14 @@ export class Worker { * This method initiates a connection to the server and will throw (asynchronously) on connection failure. */ public static async create(options: WorkerOptions): Promise { + const logger = withMetadata(Runtime.instance().logger, { + subsystem: 'worker', + // A random workerId to make it easier to correlate logs + workerId: randomUUID(), + }); const nativeWorkerCtor: WorkerConstructor = this.nativeWorkerCtor; - const compiledOptions = compileWorkerOptions(addDefaultWorkerOptions(options)); - Runtime.instance().logger.info('Creating worker', { + const compiledOptions = compileWorkerOptions(options, logger); + logger.info('Creating worker', { options: { ...compiledOptions, ...(compiledOptions.workflowBundle && isCodeBundleOption(compiledOptions.workflowBundle) @@ -440,7 +439,7 @@ export class Worker { : {}), }, }); - const bundle = await this.getOrCreateBundle(compiledOptions, Runtime.instance().logger); + const bundle = await this.getOrCreateBundle(compiledOptions, logger); let workflowCreator: WorkflowCreator | undefined = undefined; if (bundle) { workflowCreator = await this.createWorkflowCreator(bundle, compiledOptions); @@ -460,7 +459,7 @@ export class Worker { throw err; } extractReferenceHolders(connection).add(nativeWorker); - return new this(nativeWorker, workflowCreator, compiledOptionsWithBuildId, connection); + return new this(nativeWorker, workflowCreator, compiledOptionsWithBuildId, logger, connection); } protected static async createWorkflowCreator( @@ -599,6 +598,11 @@ export class Worker { } private static async constructReplayWorker(options: ReplayWorkerOptions): Promise<[Worker, native.HistoryPusher]> { + const logger = withMetadata(Runtime.instance().logger, { + subsystem: 'worker', + // A random workerId to make it easier to correlate logs + workerId: randomUUID(), + }); const nativeWorkerCtor: WorkerConstructor = this.nativeWorkerCtor; const fixedUpOptions: WorkerOptions = { taskQueue: (options.replayName ?? 'fake_replay_queue') + '-' + this.replayWorkerCount, @@ -606,15 +610,15 @@ export class Worker { ...options, }; this.replayWorkerCount++; - const compiledOptions = compileWorkerOptions(addDefaultWorkerOptions(fixedUpOptions)); - const bundle = await this.getOrCreateBundle(compiledOptions, Runtime.instance().logger); + const compiledOptions = compileWorkerOptions(fixedUpOptions, logger); + const bundle = await this.getOrCreateBundle(compiledOptions, logger); if (!bundle) { throw new TypeError('ReplayWorkerOptions must contain workflowsPath or workflowBundle'); } const workflowCreator = await this.createWorkflowCreator(bundle, compiledOptions); const replayHandle = await nativeWorkerCtor.createReplay(addBuildIdIfMissing(compiledOptions, bundle.code)); return [ - new this(replayHandle.worker, workflowCreator, compiledOptions, undefined, true), + new this(replayHandle.worker, workflowCreator, compiledOptions, logger, undefined, true), replayHandle.historyPusher, ]; } @@ -653,7 +657,7 @@ export class Worker { } } else if (compiledOptions.workflowsPath) { const bundler = new WorkflowCodeBundler({ - logger, + logger: withMetadata(logger, { subsystem: 'worker/bundler' }), workflowsPath: compiledOptions.workflowsPath, workflowInterceptorModules: compiledOptions.interceptors.workflowModules, failureConverterPath: compiledOptions.dataConverter?.failureConverterPath, @@ -678,6 +682,7 @@ export class Worker { */ protected readonly workflowCreator: WorkflowCreator | undefined, public readonly options: CompiledWorkerOptions, + protected readonly log: Logger, protected readonly connection?: NativeConnection, protected readonly isReplayWorker: boolean = false ) { @@ -705,10 +710,6 @@ export class Worker { return this.numCachedWorkflowsSubject; } - protected get log(): Logger { - return this.nativeWorker.logger; - } - /** * Get the poll state of this worker */ @@ -771,7 +772,10 @@ export class Worker { this.state = 'DRAINING'; } }) - .catch((error) => this.unexpectedErrorSubject.error(error)); + .catch((error) => { + this.log.warn('Failed to initiate shutdown', { error }); + this.unexpectedErrorSubject.error(error); + }); } /** diff --git a/packages/worker/src/workflow/logger.ts b/packages/worker/src/workflow/logger.ts index de4cb6c7c..5b7d44228 100644 --- a/packages/worker/src/workflow/logger.ts +++ b/packages/worker/src/workflow/logger.ts @@ -1,42 +1,37 @@ import { type LoggerSinksInternal } from '@temporalio/workflow/lib/logs'; import { type InjectedSinks } from '../sinks'; -import { Runtime } from '../runtime'; -import { type Logger } from '../logger'; +import { withMetadata, type Logger } from '../logger'; /** * Injects a logger sink that forwards to the worker's logger */ -export function initLoggerSink(logger?: Logger): InjectedSinks { +export function initLoggerSink(logger: Logger): InjectedSinks { + const loggerWithMetadata = withMetadata(logger, {}); return { __temporal_logger: { trace: { fn(_, message, attrs) { - logger ??= Runtime.instance().logger; - logger.trace(message, attrs); + loggerWithMetadata.trace(message, attrs); }, }, debug: { fn(_, message, attrs) { - logger ??= Runtime.instance().logger; - logger.debug(message, attrs); + loggerWithMetadata.debug(message, attrs); }, }, info: { fn(_, message, attrs) { - logger ??= Runtime.instance().logger; - logger.info(message, attrs); + loggerWithMetadata.info(message, attrs); }, }, warn: { fn(_, message, attrs) { - logger ??= Runtime.instance().logger; - logger.warn(message, attrs); + loggerWithMetadata.warn(message, attrs); }, }, error: { fn(_, message, attrs) { - logger ??= Runtime.instance().logger; - logger.error(message, attrs); + loggerWithMetadata.error(message, attrs); }, }, },