Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DRAFT] Add subsystem metadata on log messages #1346

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 15 additions & 11 deletions packages/test/src/mock-native-worker.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { randomUUID } from 'crypto';
/* eslint-disable @typescript-eslint/no-non-null-assertion */
import { lastValueFrom } from 'rxjs';
import { defaultPayloadConverter, fromPayloadsAtIndex } from '@temporalio/common';
Expand All @@ -6,12 +7,8 @@ import { coresdk } from '@temporalio/proto';
import { DefaultLogger, Runtime, ShutdownError } from '@temporalio/worker';
import { byteArrayToBuffer } from '@temporalio/worker/lib/utils';
import { NativeReplayHandle, NativeWorkerLike, Worker as RealWorker } from '@temporalio/worker/lib/worker';
import {
addDefaultWorkerOptions,
CompiledWorkerOptions,
compileWorkerOptions,
WorkerOptions,
} from '@temporalio/worker/lib/worker-options';
import { withMetadata } from '@temporalio/worker/lib/logger';
import { CompiledWorkerOptions, compileWorkerOptions, WorkerOptions } from '@temporalio/worker/lib/worker-options';
import type { WorkflowCreator } from '@temporalio/worker/lib/workflow/interface';
import * as activities from './activities';

Expand Down Expand Up @@ -163,11 +160,13 @@ export class Worker extends RealWorker {
}

public constructor(workflowCreator: WorkflowCreator, opts: CompiledWorkerOptions) {
// Worker.create() accesses Runtime.instance(), which has some side effects that would not happen (or that would
// happen too late) when creating a MockWorker. Force the singleton to be created now, if it doesn't already exist.
Runtime.instance();
const logger = withMetadata(Runtime.instance().logger, {
subsystem: 'worker',
// A random workerId to make it easier to correlate logs
workerId: randomUUID(),
});
const nativeWorker = new MockNativeWorker();
super(nativeWorker, workflowCreator, opts);
super(nativeWorker, workflowCreator, opts, logger);
}

public runWorkflows(...args: Parameters<Worker['workflow$']>): Promise<void> {
Expand All @@ -183,6 +182,11 @@ export const defaultOptions: WorkerOptions = {
};

export function isolateFreeWorker(options: WorkerOptions = defaultOptions): Worker {
const logger = withMetadata(Runtime.instance().logger, {
subsystem: 'worker',
// A random workerId to make it easier to correlate logs
workerId: randomUUID(),
});
return new Worker(
{
async createWorkflow() {
Expand All @@ -192,6 +196,6 @@ export function isolateFreeWorker(options: WorkerOptions = defaultOptions): Work
/* Nothing to destroy */
},
},
compileWorkerOptions(addDefaultWorkerOptions(options))
compileWorkerOptions(options, logger)
);
}
8 changes: 7 additions & 1 deletion packages/worker/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export { IllegalStateError } from '@temporalio/common';
export { ShutdownError, TransportError, UnexpectedError } from '@temporalio/core-bridge';
export { GracefulShutdownPeriodExpiredError, errors } from './errors'; // eslint-disable-line deprecation/deprecation
export * from './interceptors';
export * from './logger';
export { DefaultLogger, LogEntry, LogLevel, LogMetadata, LogTimestamp, Logger, withMetadata } from './logger';
export { History, Runtime, RuntimeOptions, makeTelemetryFilterString } from './runtime';
export * from './sinks';
export {
Expand Down Expand Up @@ -107,6 +107,12 @@ export {
*/
workflowLogAttributes,
} from '@temporalio/workflow/lib/logs';
export {
/**
* @deprecated This function is meant for internal usage. Don't use it.
*/
timeOfDayToBigint,
} from './logger';
/**
* @deprecated Including `defaultWorkflowInterceptorModules` in BundlerOptions.workflowInterceptorModules is no longer required.
*/
Expand Down
35 changes: 35 additions & 0 deletions packages/worker/src/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,38 @@ export class DefaultLogger implements Logger {
this.log('ERROR', message, meta);
}
}

export function withMetadata(logger: Logger, meta: LogMetadata): Logger {
return new LoggerWithMetadata(logger, meta);
}

class LoggerWithMetadata implements Logger {
constructor(
public readonly logger: Logger,
public readonly meta: LogMetadata
) {}

log(level: LogLevel, message: string, meta?: LogMetadata): void {
this.logger.log(level, message, { ...this.meta, ...meta });
}

trace(message: string, meta?: LogMetadata): void {
this.logger.trace(message, { ...this.meta, ...meta });
}

debug(message: string, meta?: LogMetadata): void {
this.logger.debug(message, { ...this.meta, ...meta });
}

info(message: string, meta?: LogMetadata): void {
this.logger.info(message, { ...this.meta, ...meta });
}

warn(message: string, meta?: LogMetadata): void {
this.logger.warn(message, { ...this.meta, ...meta });
}

error(message: string, meta?: LogMetadata): void {
this.logger.error(message, { ...this.meta, ...meta });
}
}
28 changes: 17 additions & 11 deletions packages/worker/src/worker-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { Context } from '@temporalio/activity';
import { ActivityInboundLogInterceptor } from './activity-log-interceptor';
import { NativeConnection } from './connection';
import { CompiledWorkerInterceptors, WorkerInterceptors } from './interceptors';
import { Logger } from './logger';
import { Logger, withMetadata } from './logger';
import { initLoggerSink } from './workflow/logger';
import { Runtime } from './runtime';
import { InjectedSinks } from './sinks';
Expand Down Expand Up @@ -615,7 +615,7 @@ export interface ReplayWorkerOptions
* Build the sink used internally by the SDK to forwards log messages from the Workflow sandbox to an actual logger.
*
* @param logger a {@link Logger} - defaults to the {@link Runtime} singleton logger.
*

* @deprecated Calling `defaultSink()` is no longer required. To configure a custom logger, set the
* {@link Runtime.logger} property instead.
*/
Expand All @@ -625,6 +625,13 @@ export function defaultSinks(logger?: Logger): InjectedSinks<LoggerSinks> {
// code that is still calling defaultSinks() expects return type to match the deprecated LoggerSinks API. Silently
// cast just to mask type checking issues, even though we know this is wrong. Users shouldn't call functions directly
// on the returned object anyway.

// If no logger was provided, the legacy behavior was to _lazyly_ set the sink's logger to the Runtime's logger.
// This was required because may call defaultSinks() before the Runtime is initialized. We preserve that behavior
// here by silently not initializing the sink if no logger is provided.
// eslint-disable-next-line deprecation/deprecation
if (!logger) return {} as InjectedSinks<LoggerSinks>;

// eslint-disable-next-line deprecation/deprecation
return initLoggerSink(logger) as unknown as InjectedSinks<LoggerSinks>;
}
Expand All @@ -646,7 +653,7 @@ export function appendDefaultInterceptors(
return {
activityInbound: [
// eslint-disable-next-line deprecation/deprecation
(ctx) => new ActivityInboundLogInterceptor(ctx, logger),
(ctx) => new ActivityInboundLogInterceptor(ctx, withMetadata(logger, { subsystem: 'activity' })),
// eslint-disable-next-line deprecation/deprecation
...(interceptors.activityInbound ?? []),
],
Expand All @@ -666,7 +673,7 @@ export function compileWorkerInterceptors({
};
}

export function addDefaultWorkerOptions(options: WorkerOptions): WorkerOptionsWithDefaults {
function addDefaultWorkerOptions(options: WorkerOptions, logger: Logger): WorkerOptionsWithDefaults {
const {
buildId,
useVersioning,
Expand Down Expand Up @@ -720,7 +727,7 @@ export function addDefaultWorkerOptions(options: WorkerOptions): WorkerOptionsWi
},
nonStickyToStickyPollRatio: nonStickyToStickyPollRatio ?? 0.2,
sinks: {
...initLoggerSink(Runtime.instance().logger),
...initLoggerSink(logger),
// Fix deprecated registration of the 'defaultWorkerLogger' sink
...(sinks?.defaultWorkerLogger ? { __temporal_logger: sinks.defaultWorkerLogger } : {}),
...sinks,
Expand All @@ -738,21 +745,20 @@ function isSet(env: string | undefined): boolean {
return env === '1' || env === 't' || env === 'true';
}

export function compileWorkerOptions(opts: WorkerOptionsWithDefaults): CompiledWorkerOptions {
export function compileWorkerOptions(rawOpts: WorkerOptions, logger: Logger): CompiledWorkerOptions {
const opts = addDefaultWorkerOptions(rawOpts, logger);
if (opts.maxCachedWorkflows !== 0 && opts.maxCachedWorkflows < 2) {
Runtime.instance().logger.warn(
'maxCachedWorkflows must be either 0 (ie. cache is disabled) or greater than 1. Defaulting to 2.'
);
logger.warn('maxCachedWorkflows must be either 0 (ie. cache is disabled) or greater than 1. Defaulting to 2.');
opts.maxCachedWorkflows = 2;
}
if (opts.maxCachedWorkflows > 0 && opts.maxConcurrentWorkflowTaskExecutions > opts.maxCachedWorkflows) {
Runtime.instance().logger.warn(
logger.warn(
"maxConcurrentWorkflowTaskExecutions can't exceed maxCachedWorkflows (unless cache is disabled). Defaulting to maxCachedWorkflows."
);
opts.maxConcurrentWorkflowTaskExecutions = opts.maxCachedWorkflows;
}
if (opts.maxCachedWorkflows > 0 && opts.maxConcurrentWorkflowTaskExecutions < 2) {
Runtime.instance().logger.warn(
logger.warn(
"maxConcurrentWorkflowTaskExecutions can't be lower than 2 if maxCachedWorkflows is non-zero. Defaulting to 2."
);
opts.maxConcurrentWorkflowTaskExecutions = 2;
Expand Down
46 changes: 25 additions & 21 deletions packages/worker/src/worker.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import crypto from 'node:crypto';
import crypto, { randomUUID } from 'node:crypto';
import fs from 'node:fs/promises';
import * as path from 'node:path';
import * as vm from 'node:vm';
Expand Down Expand Up @@ -50,7 +50,7 @@ import { type SinkCall, type WorkflowInfo } from '@temporalio/workflow';
import { Activity, CancelReason, activityLogAttributes } from './activity';
import { extractNativeClient, extractReferenceHolders, InternalNativeConnection, NativeConnection } from './connection';
import { ActivityExecuteInput } from './interceptors';
import { Logger } from './logger';
import { Logger, withMetadata } from './logger';
import pkg from './pkg';
import {
EvictionReason,
Expand All @@ -63,7 +63,6 @@ import { History, Runtime } from './runtime';
import { CloseableGroupedObservable, closeableGroupBy, mapWithState, mergeMapWithState } from './rxutils';
import { byteArrayToBuffer, convertToParentWorkflowType } from './utils';
import {
addDefaultWorkerOptions,
CompiledWorkerOptions,
compileWorkerOptions,
isCodeBundleOption,
Expand Down Expand Up @@ -165,7 +164,6 @@ export interface NativeWorkerLike {
completeWorkflowActivation: Promisify<OmitFirstParam<typeof native.workerCompleteWorkflowActivation>>;
completeActivityTask: Promisify<OmitFirstParam<typeof native.workerCompleteActivityTask>>;
recordActivityHeartbeat: OmitFirstParam<typeof native.workerRecordActivityHeartbeat>;
logger: Logger;
}

export interface NativeReplayHandle {
Expand Down Expand Up @@ -241,10 +239,6 @@ export class NativeWorker implements NativeWorkerLike {
public async finalizeShutdown(): Promise<void> {
await this.runtime.deregisterWorker(this.nativeWorker);
}

public get logger(): Logger {
return this.runtime.logger;
}
}

function formatTaskToken(taskToken: Uint8Array) {
Expand Down Expand Up @@ -425,9 +419,14 @@ export class Worker {
* This method initiates a connection to the server and will throw (asynchronously) on connection failure.
*/
public static async create(options: WorkerOptions): Promise<Worker> {
const logger = withMetadata(Runtime.instance().logger, {
subsystem: 'worker',
// A random workerId to make it easier to correlate logs
workerId: randomUUID(),
});
const nativeWorkerCtor: WorkerConstructor = this.nativeWorkerCtor;
const compiledOptions = compileWorkerOptions(addDefaultWorkerOptions(options));
Runtime.instance().logger.info('Creating worker', {
const compiledOptions = compileWorkerOptions(options, logger);
logger.info('Creating worker', {
options: {
...compiledOptions,
...(compiledOptions.workflowBundle && isCodeBundleOption(compiledOptions.workflowBundle)
Expand All @@ -440,7 +439,7 @@ export class Worker {
: {}),
},
});
const bundle = await this.getOrCreateBundle(compiledOptions, Runtime.instance().logger);
const bundle = await this.getOrCreateBundle(compiledOptions, logger);
let workflowCreator: WorkflowCreator | undefined = undefined;
if (bundle) {
workflowCreator = await this.createWorkflowCreator(bundle, compiledOptions);
Expand All @@ -460,7 +459,7 @@ export class Worker {
throw err;
}
extractReferenceHolders(connection).add(nativeWorker);
return new this(nativeWorker, workflowCreator, compiledOptionsWithBuildId, connection);
return new this(nativeWorker, workflowCreator, compiledOptionsWithBuildId, logger, connection);
}

protected static async createWorkflowCreator(
Expand Down Expand Up @@ -599,22 +598,27 @@ export class Worker {
}

private static async constructReplayWorker(options: ReplayWorkerOptions): Promise<[Worker, native.HistoryPusher]> {
const logger = withMetadata(Runtime.instance().logger, {
subsystem: 'worker',
// A random workerId to make it easier to correlate logs
workerId: randomUUID(),
});
const nativeWorkerCtor: WorkerConstructor = this.nativeWorkerCtor;
const fixedUpOptions: WorkerOptions = {
taskQueue: (options.replayName ?? 'fake_replay_queue') + '-' + this.replayWorkerCount,
debugMode: true,
...options,
};
this.replayWorkerCount++;
const compiledOptions = compileWorkerOptions(addDefaultWorkerOptions(fixedUpOptions));
const bundle = await this.getOrCreateBundle(compiledOptions, Runtime.instance().logger);
const compiledOptions = compileWorkerOptions(fixedUpOptions, logger);
const bundle = await this.getOrCreateBundle(compiledOptions, logger);
if (!bundle) {
throw new TypeError('ReplayWorkerOptions must contain workflowsPath or workflowBundle');
}
const workflowCreator = await this.createWorkflowCreator(bundle, compiledOptions);
const replayHandle = await nativeWorkerCtor.createReplay(addBuildIdIfMissing(compiledOptions, bundle.code));
return [
new this(replayHandle.worker, workflowCreator, compiledOptions, undefined, true),
new this(replayHandle.worker, workflowCreator, compiledOptions, logger, undefined, true),
replayHandle.historyPusher,
];
}
Expand Down Expand Up @@ -653,7 +657,7 @@ export class Worker {
}
} else if (compiledOptions.workflowsPath) {
const bundler = new WorkflowCodeBundler({
logger,
logger: withMetadata(logger, { subsystem: 'worker/bundler' }),
workflowsPath: compiledOptions.workflowsPath,
workflowInterceptorModules: compiledOptions.interceptors.workflowModules,
failureConverterPath: compiledOptions.dataConverter?.failureConverterPath,
Expand All @@ -678,6 +682,7 @@ export class Worker {
*/
protected readonly workflowCreator: WorkflowCreator | undefined,
public readonly options: CompiledWorkerOptions,
protected readonly log: Logger,
protected readonly connection?: NativeConnection,
protected readonly isReplayWorker: boolean = false
) {
Expand Down Expand Up @@ -705,10 +710,6 @@ export class Worker {
return this.numCachedWorkflowsSubject;
}

protected get log(): Logger {
return this.nativeWorker.logger;
}

/**
* Get the poll state of this worker
*/
Expand Down Expand Up @@ -771,7 +772,10 @@ export class Worker {
this.state = 'DRAINING';
}
})
.catch((error) => this.unexpectedErrorSubject.error(error));
.catch((error) => {
this.log.warn('Failed to initiate shutdown', { error });
this.unexpectedErrorSubject.error(error);
});
}

/**
Expand Down
Loading
Loading