Skip to content

Commit

Permalink
Add subsystem metadata on log messages
Browse files Browse the repository at this point in the history
  • Loading branch information
mjameswh committed Jan 24, 2024
1 parent 6b35787 commit bc3d753
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 45 deletions.
26 changes: 15 additions & 11 deletions packages/test/src/mock-native-worker.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { randomUUID } from 'crypto';
/* eslint-disable @typescript-eslint/no-non-null-assertion */
import { lastValueFrom } from 'rxjs';
import { defaultPayloadConverter, fromPayloadsAtIndex } from '@temporalio/common';
Expand All @@ -6,12 +7,8 @@ import { coresdk } from '@temporalio/proto';
import { DefaultLogger, Runtime, ShutdownError } from '@temporalio/worker';
import { byteArrayToBuffer } from '@temporalio/worker/lib/utils';
import { NativeReplayHandle, NativeWorkerLike, Worker as RealWorker } from '@temporalio/worker/lib/worker';
import {
addDefaultWorkerOptions,
CompiledWorkerOptions,
compileWorkerOptions,
WorkerOptions,
} from '@temporalio/worker/lib/worker-options';
import { withMetadata } from '@temporalio/worker/lib/logger';
import { CompiledWorkerOptions, compileWorkerOptions, WorkerOptions } from '@temporalio/worker/lib/worker-options';
import type { WorkflowCreator } from '@temporalio/worker/lib/workflow/interface';
import * as activities from './activities';

Expand Down Expand Up @@ -163,11 +160,13 @@ export class Worker extends RealWorker {
}

public constructor(workflowCreator: WorkflowCreator, opts: CompiledWorkerOptions) {
// Worker.create() accesses Runtime.instance(), which has some side effects that would not happen (or that would
// happen too late) when creating a MockWorker. Force the singleton to be created now, if it doesn't already exist.
Runtime.instance();
const logger = withMetadata(Runtime.instance().logger, {
subsystem: 'worker',
// A random workerId to make it easier to correlate logs
workerId: randomUUID(),
});
const nativeWorker = new MockNativeWorker();
super(nativeWorker, workflowCreator, opts);
super(nativeWorker, workflowCreator, opts, logger);
}

public runWorkflows(...args: Parameters<Worker['workflow$']>): Promise<void> {
Expand All @@ -183,6 +182,11 @@ export const defaultOptions: WorkerOptions = {
};

export function isolateFreeWorker(options: WorkerOptions = defaultOptions): Worker {
const logger = withMetadata(Runtime.instance().logger, {
subsystem: 'worker',
// A random workerId to make it easier to correlate logs
workerId: randomUUID(),
});
return new Worker(
{
async createWorkflow() {
Expand All @@ -192,6 +196,6 @@ export function isolateFreeWorker(options: WorkerOptions = defaultOptions): Work
/* Nothing to destroy */
},
},
compileWorkerOptions(addDefaultWorkerOptions(options))
compileWorkerOptions(options, logger)
);
}
8 changes: 7 additions & 1 deletion packages/worker/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export { IllegalStateError } from '@temporalio/common';
export { ShutdownError, TransportError, UnexpectedError } from '@temporalio/core-bridge';
export { GracefulShutdownPeriodExpiredError, errors } from './errors'; // eslint-disable-line deprecation/deprecation
export * from './interceptors';
export * from './logger';
export { DefaultLogger, LogEntry, LogLevel, LogMetadata, LogTimestamp, Logger, withMetadata } from './logger';
export { History, Runtime, RuntimeOptions, makeTelemetryFilterString } from './runtime';
export * from './sinks';
export {
Expand Down Expand Up @@ -107,6 +107,12 @@ export {
*/
workflowLogAttributes,
} from '@temporalio/workflow/lib/logs';
export {
/**
* @deprecated This function is meant for internal usage. Don't use it.
*/
timeOfDayToBigint,
} from './logger';
/**
* @deprecated Including `defaultWorkflowInterceptorModules` in BundlerOptions.workflowInterceptorModules is no longer required.
*/
Expand Down
35 changes: 35 additions & 0 deletions packages/worker/src/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,38 @@ export class DefaultLogger implements Logger {
this.log('ERROR', message, meta);
}
}

export function withMetadata(logger: Logger, meta: LogMetadata): Logger {
return new LoggerWithMetadata(logger, meta);
}

class LoggerWithMetadata implements Logger {
constructor(
public readonly logger: Logger,
public readonly meta: LogMetadata
) {}

log(level: LogLevel, message: string, meta?: LogMetadata): void {
this.logger.log(level, message, { ...this.meta, ...meta });
}

trace(message: string, meta?: LogMetadata): void {
this.logger.trace(message, { ...this.meta, ...meta });
}

debug(message: string, meta?: LogMetadata): void {
this.logger.debug(message, { ...this.meta, ...meta });
}

info(message: string, meta?: LogMetadata): void {
this.logger.info(message, { ...this.meta, ...meta });
}

warn(message: string, meta?: LogMetadata): void {
this.logger.warn(message, { ...this.meta, ...meta });
}

error(message: string, meta?: LogMetadata): void {
this.logger.error(message, { ...this.meta, ...meta });
}
}
19 changes: 9 additions & 10 deletions packages/worker/src/worker-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { Context } from '@temporalio/activity';
import { ActivityInboundLogInterceptor } from './activity-log-interceptor';
import { NativeConnection } from './connection';
import { CompiledWorkerInterceptors, WorkerInterceptors } from './interceptors';
import { Logger } from './logger';
import { Logger, withMetadata } from './logger';
import { initLoggerSink } from './workflow/logger';
import { Runtime } from './runtime';
import { InjectedSinks } from './sinks';
Expand Down Expand Up @@ -646,7 +646,7 @@ export function appendDefaultInterceptors(
return {
activityInbound: [
// eslint-disable-next-line deprecation/deprecation
(ctx) => new ActivityInboundLogInterceptor(ctx, logger),
(ctx) => new ActivityInboundLogInterceptor(ctx, withMetadata(logger, { subsystem: 'activity' })),
// eslint-disable-next-line deprecation/deprecation
...(interceptors.activityInbound ?? []),
],
Expand All @@ -666,7 +666,7 @@ export function compileWorkerInterceptors({
};
}

export function addDefaultWorkerOptions(options: WorkerOptions): WorkerOptionsWithDefaults {
function addDefaultWorkerOptions(options: WorkerOptions, logger: Logger): WorkerOptionsWithDefaults {
const {
buildId,
useVersioning,
Expand Down Expand Up @@ -720,7 +720,7 @@ export function addDefaultWorkerOptions(options: WorkerOptions): WorkerOptionsWi
},
nonStickyToStickyPollRatio: nonStickyToStickyPollRatio ?? 0.2,
sinks: {
...initLoggerSink(Runtime.instance().logger),
...initLoggerSink(logger),
// Fix deprecated registration of the 'defaultWorkerLogger' sink
...(sinks?.defaultWorkerLogger ? { __temporal_logger: sinks.defaultWorkerLogger } : {}),
...sinks,
Expand All @@ -738,21 +738,20 @@ function isSet(env: string | undefined): boolean {
return env === '1' || env === 't' || env === 'true';
}

export function compileWorkerOptions(opts: WorkerOptionsWithDefaults): CompiledWorkerOptions {
export function compileWorkerOptions(rawOpts: WorkerOptions, logger: Logger): CompiledWorkerOptions {
const opts = addDefaultWorkerOptions(rawOpts, logger);
if (opts.maxCachedWorkflows !== 0 && opts.maxCachedWorkflows < 2) {
Runtime.instance().logger.warn(
'maxCachedWorkflows must be either 0 (ie. cache is disabled) or greater than 1. Defaulting to 2.'
);
logger.warn('maxCachedWorkflows must be either 0 (ie. cache is disabled) or greater than 1. Defaulting to 2.');
opts.maxCachedWorkflows = 2;
}
if (opts.maxCachedWorkflows > 0 && opts.maxConcurrentWorkflowTaskExecutions > opts.maxCachedWorkflows) {
Runtime.instance().logger.warn(
logger.warn(
"maxConcurrentWorkflowTaskExecutions can't exceed maxCachedWorkflows (unless cache is disabled). Defaulting to maxCachedWorkflows."
);
opts.maxConcurrentWorkflowTaskExecutions = opts.maxCachedWorkflows;
}
if (opts.maxCachedWorkflows > 0 && opts.maxConcurrentWorkflowTaskExecutions < 2) {
Runtime.instance().logger.warn(
logger.warn(
"maxConcurrentWorkflowTaskExecutions can't be lower than 2 if maxCachedWorkflows is non-zero. Defaulting to 2."
);
opts.maxConcurrentWorkflowTaskExecutions = 2;
Expand Down
46 changes: 25 additions & 21 deletions packages/worker/src/worker.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import crypto from 'node:crypto';
import crypto, { randomUUID } from 'node:crypto';
import fs from 'node:fs/promises';
import * as path from 'node:path';
import * as vm from 'node:vm';
Expand Down Expand Up @@ -50,7 +50,7 @@ import { type SinkCall, type WorkflowInfo } from '@temporalio/workflow';
import { Activity, CancelReason, activityLogAttributes } from './activity';
import { extractNativeClient, extractReferenceHolders, InternalNativeConnection, NativeConnection } from './connection';
import { ActivityExecuteInput } from './interceptors';
import { Logger } from './logger';
import { Logger, withMetadata } from './logger';
import pkg from './pkg';
import {
EvictionReason,
Expand All @@ -63,7 +63,6 @@ import { History, Runtime } from './runtime';
import { CloseableGroupedObservable, closeableGroupBy, mapWithState, mergeMapWithState } from './rxutils';
import { byteArrayToBuffer, convertToParentWorkflowType } from './utils';
import {
addDefaultWorkerOptions,
CompiledWorkerOptions,
compileWorkerOptions,
isCodeBundleOption,
Expand Down Expand Up @@ -165,7 +164,6 @@ export interface NativeWorkerLike {
completeWorkflowActivation: Promisify<OmitFirstParam<typeof native.workerCompleteWorkflowActivation>>;
completeActivityTask: Promisify<OmitFirstParam<typeof native.workerCompleteActivityTask>>;
recordActivityHeartbeat: OmitFirstParam<typeof native.workerRecordActivityHeartbeat>;
logger: Logger;
}

export interface NativeReplayHandle {
Expand Down Expand Up @@ -241,10 +239,6 @@ export class NativeWorker implements NativeWorkerLike {
public async finalizeShutdown(): Promise<void> {
await this.runtime.deregisterWorker(this.nativeWorker);
}

public get logger(): Logger {
return this.runtime.logger;
}
}

function formatTaskToken(taskToken: Uint8Array) {
Expand Down Expand Up @@ -425,9 +419,14 @@ export class Worker {
* This method initiates a connection to the server and will throw (asynchronously) on connection failure.
*/
public static async create(options: WorkerOptions): Promise<Worker> {
const logger = withMetadata(Runtime.instance().logger, {
subsystem: 'worker',
// A random workerId to make it easier to correlate logs
workerId: randomUUID(),
});
const nativeWorkerCtor: WorkerConstructor = this.nativeWorkerCtor;
const compiledOptions = compileWorkerOptions(addDefaultWorkerOptions(options));
Runtime.instance().logger.info('Creating worker', {
const compiledOptions = compileWorkerOptions(options, logger);
logger.info('Creating worker', {
options: {
...compiledOptions,
...(compiledOptions.workflowBundle && isCodeBundleOption(compiledOptions.workflowBundle)
Expand All @@ -440,7 +439,7 @@ export class Worker {
: {}),
},
});
const bundle = await this.getOrCreateBundle(compiledOptions, Runtime.instance().logger);
const bundle = await this.getOrCreateBundle(compiledOptions, logger);
let workflowCreator: WorkflowCreator | undefined = undefined;
if (bundle) {
workflowCreator = await this.createWorkflowCreator(bundle, compiledOptions);
Expand All @@ -460,7 +459,7 @@ export class Worker {
throw err;
}
extractReferenceHolders(connection).add(nativeWorker);
return new this(nativeWorker, workflowCreator, compiledOptionsWithBuildId, connection);
return new this(nativeWorker, workflowCreator, compiledOptionsWithBuildId, logger, connection);
}

protected static async createWorkflowCreator(
Expand Down Expand Up @@ -599,22 +598,27 @@ export class Worker {
}

private static async constructReplayWorker(options: ReplayWorkerOptions): Promise<[Worker, native.HistoryPusher]> {
const logger = withMetadata(Runtime.instance().logger, {
subsystem: 'worker',
// A random workerId to make it easier to correlate logs
workerId: randomUUID(),
});
const nativeWorkerCtor: WorkerConstructor = this.nativeWorkerCtor;
const fixedUpOptions: WorkerOptions = {
taskQueue: (options.replayName ?? 'fake_replay_queue') + '-' + this.replayWorkerCount,
debugMode: true,
...options,
};
this.replayWorkerCount++;
const compiledOptions = compileWorkerOptions(addDefaultWorkerOptions(fixedUpOptions));
const bundle = await this.getOrCreateBundle(compiledOptions, Runtime.instance().logger);
const compiledOptions = compileWorkerOptions(fixedUpOptions, logger);
const bundle = await this.getOrCreateBundle(compiledOptions, logger);
if (!bundle) {
throw new TypeError('ReplayWorkerOptions must contain workflowsPath or workflowBundle');
}
const workflowCreator = await this.createWorkflowCreator(bundle, compiledOptions);
const replayHandle = await nativeWorkerCtor.createReplay(addBuildIdIfMissing(compiledOptions, bundle.code));
return [
new this(replayHandle.worker, workflowCreator, compiledOptions, undefined, true),
new this(replayHandle.worker, workflowCreator, compiledOptions, logger, undefined, true),
replayHandle.historyPusher,
];
}
Expand Down Expand Up @@ -649,7 +653,7 @@ export class Worker {
}
} else if (compiledOptions.workflowsPath) {
const bundler = new WorkflowCodeBundler({
logger,
logger: withMetadata(logger, { subsystem: 'worker/bundler' }),
workflowsPath: compiledOptions.workflowsPath,
workflowInterceptorModules: compiledOptions.interceptors.workflowModules,
failureConverterPath: compiledOptions.dataConverter?.failureConverterPath,
Expand All @@ -674,6 +678,7 @@ export class Worker {
*/
protected readonly workflowCreator: WorkflowCreator | undefined,
public readonly options: CompiledWorkerOptions,
protected readonly log: Logger,
protected readonly connection?: NativeConnection,
protected readonly isReplayWorker: boolean = false
) {
Expand Down Expand Up @@ -701,10 +706,6 @@ export class Worker {
return this.numCachedWorkflowsSubject;
}

protected get log(): Logger {
return this.nativeWorker.logger;
}

/**
* Get the poll state of this worker
*/
Expand Down Expand Up @@ -767,7 +768,10 @@ export class Worker {
this.state = 'DRAINING';
}
})
.catch((error) => this.unexpectedErrorSubject.error(error));
.catch((error) => {
this.log.warn('Failed to initiate shutdown', { error });
this.unexpectedErrorSubject.error(error);
});
}

/**
Expand Down
4 changes: 2 additions & 2 deletions packages/worker/src/workflow/logger.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { type LoggerSinksInternal } from '@temporalio/workflow/lib/logs';
import { type InjectedSinks } from '../sinks';
import { Runtime } from '../runtime';
import { type Logger } from '../logger';
import { withMetadata, type Logger } from '../logger';

/**
* Injects a logger sink that forwards to the worker's logger
Expand All @@ -11,7 +11,7 @@ export function initLoggerSink(logger?: Logger): InjectedSinks<LoggerSinksIntern
__temporal_logger: {
trace: {
fn(_, message, attrs) {
logger ??= Runtime.instance().logger;
logger ??= withMetadata(Runtime.instance().logger);
logger.trace(message, attrs);
},
},
Expand Down

0 comments on commit bc3d753

Please sign in to comment.