Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for globally enabling (and disabling) otel tracing #8

Merged
merged 1 commit into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/publisher/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ export class Publisher {
);

// If the span's context is valid we should inject the propagation trace context.
if (isSpanContextValid(span.spanContext())) {
if (span && isSpanContextValid(span.spanContext())) {
tracing.injectSpan(span, message, enabled);
}

Expand Down
12 changes: 12 additions & 0 deletions src/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import {CallOptions} from 'google-gax';
import {Transform} from 'stream';
import {google} from '../protos/protos';
import {SchemaServiceClient} from './v1';
import * as tracing from './telemetry-tracing';

/**
* Project ID placeholder.
Expand Down Expand Up @@ -88,6 +89,12 @@ export interface ClientConfig extends gax.GrpcClientOptions {
servicePath?: string;
port?: string | number;
sslCreds?: gax.grpc.ChannelCredentials;

/**
* Enables OpenTelemetry tracing (newer, more full implementation). This
* defaults to false/undefined
*/
enableOpenTelemetryTracing?: boolean;
}

export interface PageOptions {
Expand Down Expand Up @@ -316,6 +323,11 @@ export class PubSub {
},
options
);

if (this.options.enableOpenTelemetryTracing) {
tracing.setGloballyEnabled(true);
}

/**
* @name PubSub#isEmulator
* @type {boolean}
Expand Down
83 changes: 67 additions & 16 deletions src/telemetry-tracing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ function getTracer(): Tracer {
*/
export enum OpenTelemetryLevel {
/**
* None: OTel support is not enabled because we found no trace provider.
* None: OTel support is not enabled because we found no trace provider, or
* the user has not enabled it.
*/
None = 0,

Expand All @@ -75,6 +76,20 @@ export enum OpenTelemetryLevel {
Modern = 2,
}

// True if user code elsewhere wants to enable OpenTelemetry support.
let globallyEnabled = false;

/**
* Manually set the OpenTelemetry enabledness.
*
* @param enabled The enabled flag to use, to override any automated methods.
* @private
* @internal
*/
export function setGloballyEnabled(enabled: boolean) {
globallyEnabled = enabled;
}

/**
* Tries to divine what sort of OpenTelemetry we're supporting. See the enum
* for the meaning of the values, and other notes.
Expand All @@ -87,16 +102,16 @@ export enum OpenTelemetryLevel {
export function isEnabled(
publishSettings?: PublishOptions
): OpenTelemetryLevel {
// If there's no trace provider attached, do nothing in any case.
const traceProvider = trace.getTracerProvider();
if (!traceProvider) {
// If we're not enabled, skip everything.
if (!globallyEnabled) {
return OpenTelemetryLevel.None;
}

if (publishSettings?.enableOpenTelemetryTracing) {
return OpenTelemetryLevel.Legacy;
}

// Enable modern support.
return OpenTelemetryLevel.Modern;
}

Expand Down Expand Up @@ -327,7 +342,14 @@ export class PubsubSpans {
return spanAttributes;
}

static createPublisherSpan(message: PubsubMessage, topicName: string): Span {
static createPublisherSpan(
message: PubsubMessage,
topicName: string
): Span | undefined {
if (!globallyEnabled) {
return undefined;
}

const topicInfo = getTopicInfo(topicName);
const span: Span = getTracer().startSpan(`${topicName} create`, {
kind: SpanKind.PRODUCER,
Expand Down Expand Up @@ -358,7 +380,11 @@ export class PubsubSpans {
message: PubsubMessage,
subName: string,
parent: Context | undefined
): Span {
): Span | undefined {
if (!globallyEnabled) {
return undefined;
}

const subInfo = getSubscriptionInfo(subName);
const name = `${subInfo.subId ?? subName} subscribe`;
const attributes = this.createAttributes(subInfo, message);
Expand Down Expand Up @@ -389,6 +415,10 @@ export class PubsubSpans {
parentSpan?: Span,
attributes?: SpanAttributes
): Span | undefined {
if (!globallyEnabled) {
return undefined;
}

const parent = message?.parentSpan ?? parentSpan;
if (parent) {
return getTracer().startSpan(
Expand All @@ -415,7 +445,11 @@ export class PubsubSpans {
static createPublishRpcSpan(
messages: MessageWithAttributes[],
topicName: string
): Span {
): Span | undefined {
if (!globallyEnabled) {
return undefined;
}

const spanAttributes = PubsubSpans.createAttributes(
getTopicInfo(topicName)
);
Expand Down Expand Up @@ -452,7 +486,11 @@ export class PubsubSpans {
static createReceiveResponseRpcSpan(
messageSpans: (Span | undefined)[],
subName: string
): Span {
): Span | undefined {
if (!globallyEnabled) {
return undefined;
}

const spanAttributes = PubsubSpans.createAttributes(
getSubscriptionInfo(subName)
);
Expand Down Expand Up @@ -647,6 +685,10 @@ export function injectSpan(
message: MessageWithAttributes,
enabled: OpenTelemetryLevel
): void {
if (!globallyEnabled) {
return;
}

if (!message.attributes) {
message.attributes = {};
}
Expand Down Expand Up @@ -714,6 +756,10 @@ export function extractSpan(
subName: string,
enabled: OpenTelemetryLevel
): Span | undefined {
if (!globallyEnabled) {
return undefined;
}

if (message.parentSpan) {
return message.parentSpan;
}
Expand Down Expand Up @@ -763,13 +809,18 @@ export const legacyExports = {
attributes?: SpanAttributes,
parent?: SpanContext
): Span {
return getTracer().startSpan(
spanName,
{
kind,
attributes,
},
parent ? trace.setSpanContext(context.active(), parent) : undefined
);
if (!globallyEnabled) {
// This isn't great, but it's the fact of the situation.
return undefined as unknown as Span;
} else {
return getTracer().startSpan(
spanName,
{
kind,
attributes,
},
parent ? trace.setSpanContext(context.active(), parent) : undefined
);
}
},
};
3 changes: 3 additions & 0 deletions test/publisher/flow-publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,12 @@ describe('Flow control publisher', () => {

afterEach(() => {
sandbox.restore();
tracing.setGloballyEnabled(false);
});

it('should create a flow span if a parent exists', async () => {
tracing.setGloballyEnabled(true);

const fcp = new fp.FlowControlledPublisher(publisher);
const message = {
data: Buffer.from('foo'),
Expand Down
5 changes: 5 additions & 0 deletions test/publisher/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ describe('Publisher', () => {

afterEach(() => {
sandbox.restore();
tracing.setGloballyEnabled(false);
});

describe('initialization', () => {
Expand Down Expand Up @@ -193,6 +194,8 @@ describe('Publisher', () => {
});

it('export created spans', async () => {
tracing.setGloballyEnabled(true);

// Setup trace exporting
tracingPublisher = new Publisher(topic);
const msg = {data: buffer} as p.PubsubMessage;
Expand Down Expand Up @@ -374,6 +377,8 @@ describe('Publisher', () => {
});

it('should issue a warning if OpenTelemetry span context key is set', () => {
tracing.setGloballyEnabled(true);

const warnSpy = sinon.spy(console, 'warn');
const attributes = {
[tracing.legacyAttributeName]: 'foobar',
Expand Down
12 changes: 12 additions & 0 deletions test/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import * as pubsubTypes from '../src/pubsub';
import {Snapshot} from '../src/snapshot';
import * as subby from '../src/subscription';
import * as tracing from '../src/telemetry-tracing';
import {Topic} from '../src/topic';
import * as util from '../src/util';
import {Schema, SchemaTypes, ISchema, SchemaViews} from '../src/schema';
Expand Down Expand Up @@ -303,6 +304,17 @@
assert.strictEqual(pubsub.isOpen, true);
});

it('should enable OpenTelemetry if requested', () => {
const options: pubsubTypes.ClientConfig = {
enableOpenTelemetryTracing: true,
};
const pubsub = new PubSub(options);

Check warning on line 311 in test/pubsub.ts

View workflow job for this annotation

GitHub Actions / lint

'pubsub' is assigned a value but never used
assert.strictEqual(
tracing.isEnabled(),
tracing.OpenTelemetryLevel.Modern
);
});

it('should not be in the opened state after close()', async () => {
await pubsub.close?.();
assert.strictEqual(pubsub.isOpen, false);
Expand Down
5 changes: 5 additions & 0 deletions test/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ describe('Subscriber', () => {
afterEach(() => {
sandbox.restore();
subscriber.close();
tracing.setGloballyEnabled(false);
});

describe('initialization', () => {
Expand Down Expand Up @@ -901,6 +902,8 @@ describe('Subscriber', () => {
});

it('exports a span once it is created', () => {
tracing.setGloballyEnabled(true);

subscription = new FakeSubscription() as {} as Subscription;
subscriber = new Subscriber(subscription, enableTracing);
message = new Message(subscriber, RECEIVED_MESSAGE);
Expand Down Expand Up @@ -960,6 +963,8 @@ describe('Subscriber', () => {
});

it('exports a span even when a span context is not present on message', () => {
tracing.setGloballyEnabled(true);

subscriber = new Subscriber(subscription, enableTracing);
subscriber.open();

Expand Down
8 changes: 8 additions & 0 deletions test/telemetry-tracing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ import {PubsubMessage} from '../src/publisher';
describe('OpenTelemetryTracer', () => {
beforeEach(() => {
exporter.reset();
otel.setGloballyEnabled(true);
});
afterEach(() => {
exporter.reset();
otel.setGloballyEnabled(false);
});

describe('project parser', () => {
Expand Down Expand Up @@ -120,6 +122,7 @@ describe('OpenTelemetryTracer', () => {
message,
'projects/test/topics/topicfoo'
);
assert.ok(span);

otel.injectSpan(span, message, otel.OpenTelemetryLevel.Legacy);

Expand Down Expand Up @@ -148,6 +151,7 @@ describe('OpenTelemetryTracer', () => {
message,
'projects/test/topics/topicfoo'
);
assert.ok(span);

const warnSpy = sinon.spy(console, 'warn');
try {
Expand Down Expand Up @@ -268,6 +272,7 @@ describe('OpenTelemetryTracer', () => {
tests.message,
tests.topicInfo.topicName!
);
assert.ok(span);
span.end();

const spans = exporter.getFinishedSpans();
Expand All @@ -291,6 +296,7 @@ describe('OpenTelemetryTracer', () => {
tests.message,
tests.topicInfo.topicName!
);
assert.ok(span);
otel.PubsubSpans.updatePublisherTopicName(
span,
'projects/foo/topics/other'
Expand All @@ -315,11 +321,13 @@ describe('OpenTelemetryTracer', () => {
tests.message,
tests.topicInfo.topicName!
);
assert.ok(parentSpan);
const span = otel.PubsubSpans.createReceiveSpan(
tests.message,
tests.subInfo.subName!,
otel.spanContextToContext(parentSpan.spanContext())
);
assert.ok(span);
span.end();
parentSpan.end();

Expand Down
Loading