-
Notifications
You must be signed in to change notification settings - Fork 838
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
refactor: use helper functions for span building
- Loading branch information
Mark Wolff
committed
Oct 15, 2019
1 parent
c14b33f
commit dd1a7d3
Showing
6 changed files
with
200 additions
and
135 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,15 +15,27 @@ | |
*/ | ||
|
||
import { BasePlugin } from '@opentelemetry/core'; | ||
import { SpanKind } from '@opentelemetry/types'; | ||
import { SpanKind, Span, CanonicalCode } from '@opentelemetry/types'; | ||
import { AttributeNames } from './enums'; | ||
import { PostgresCallback, PostgresPluginOptions } from './types'; | ||
import { PostgresPluginOptions, PgClientConnectionParams, PgPluginQueryConfig } from './types'; | ||
import * as path from 'path'; | ||
import * as pgTypes from 'pg'; | ||
import * as shimmer from 'shimmer'; | ||
|
||
// Helper function to get a low cardinality command name from the full text query | ||
function getCommandFromText(text?: string): string { | ||
if (text) { | ||
const words = text.split(' '); | ||
if (words && words.length > 0) { | ||
return words[0]; | ||
} | ||
} | ||
return 'unknown'; | ||
} | ||
|
||
export class PostgresPlugin extends BasePlugin<typeof pgTypes> { | ||
static readonly component = 'pg'; | ||
static readonly COMPONENT = 'pg'; | ||
static readonly BASE_SPAN_NAME = PostgresPlugin.COMPONENT + '.query'; | ||
readonly supportedVersions = ['^7.12.1']; | ||
protected _config: PostgresPluginOptions; | ||
|
||
|
@@ -48,113 +60,131 @@ export class PostgresPlugin extends BasePlugin<typeof pgTypes> { | |
} | ||
} | ||
|
||
// Private helper function to start a span | ||
private _pgStartSpan(client: pgTypes.Client & PgClientConnectionParams) { | ||
return this._tracer.startSpan( | ||
PostgresPlugin.BASE_SPAN_NAME, | ||
{ | ||
kind: SpanKind.CLIENT, | ||
parent: this._tracer.getCurrentSpan() || undefined, | ||
attributes: { | ||
[AttributeNames.COMPONENT]: PostgresPlugin.COMPONENT, | ||
[AttributeNames.PEER_HOST]: client.connectionParameters.host, | ||
[AttributeNames.PEER_PORT]: client.connectionParameters.port, | ||
}, | ||
} | ||
); | ||
} | ||
|
||
private _getClientQueryPatch() { | ||
const plugin = this; | ||
return (original: typeof pgTypes.Client.prototype.query) => { | ||
plugin._logger.debug( | ||
`Patching ${PostgresPlugin.component}.Client.prototype.query` | ||
`Patching ${PostgresPlugin.COMPONENT}.Client.prototype.query` | ||
); | ||
return function query(this: pgTypes.Client, ...args: unknown[]) { | ||
// setup span | ||
let callbackProvided: boolean = | ||
args.length > 1 && typeof args[args.length - 1] === 'function'; | ||
const span = plugin._tracer.startSpan( | ||
`${PostgresPlugin.component}.query`, | ||
{ | ||
kind: SpanKind.CLIENT, | ||
parent: plugin._tracer.getCurrentSpan() || undefined, | ||
attributes: { | ||
[AttributeNames.COMPONENT]: PostgresPlugin.component, | ||
[AttributeNames.PG_HOST]: (this as any).connectionParameters.host, | ||
[AttributeNames.PG_PORT]: (this as any).connectionParameters.port, | ||
}, | ||
} | ||
); | ||
|
||
try { | ||
if (typeof args[0] === 'string') { | ||
span.setAttribute(AttributeNames.PG_TEXT, args[0]); | ||
if (args[1] instanceof Array) { | ||
span.setAttribute(AttributeNames.PG_VALUES, args[1]); | ||
if (callbackProvided) { | ||
args[2] = plugin._tracer.bind(args[2]); | ||
} | ||
} else { | ||
if (callbackProvided) { | ||
args[1] = plugin._tracer.bind(args[1]); | ||
} | ||
} | ||
return function query(this: pgTypes.Client & PgClientConnectionParams, ...args: unknown[]) { | ||
let callbackProvided = false; | ||
const span = plugin._pgStartSpan(this); | ||
|
||
// Handle different client.query(...) signatures | ||
if (typeof args[0] === 'string') { | ||
if (args.length > 1 && args[1] instanceof Array) { | ||
_handleParameterizedQuery.call(this, span, ...args); | ||
} else { | ||
const config = args[0] as pgTypes.QueryConfig & { | ||
callback?: PostgresCallback; | ||
}; | ||
if (typeof config.name === 'string') { | ||
span.setAttribute(AttributeNames.PG_PLAN, config.name); | ||
} else { | ||
if (typeof config.text === 'string') { | ||
span.setAttribute(AttributeNames.PG_TEXT, config.text); | ||
} | ||
if (config.values instanceof Array) { | ||
span.setAttribute(AttributeNames.PG_VALUES, config.values); | ||
} | ||
} | ||
|
||
if (callbackProvided) { | ||
if (typeof args[1] === 'function') { | ||
args[1] = plugin._tracer.bind(args[1]); | ||
} else if (typeof args[2] === 'function') { | ||
args[2] = plugin._tracer.bind(args[2]); | ||
} | ||
} else if ( | ||
config.callback && | ||
typeof config.callback === 'function' | ||
) { | ||
callbackProvided = true; | ||
config.callback = plugin._tracer.bind(config.callback); | ||
} | ||
_handleTextQuery.call(this, span, ...args); | ||
} | ||
} else if (typeof args[0] === 'object') { | ||
_handleConfigQuery.call(this, span, ...args); | ||
} | ||
|
||
// Bind callback to parent span | ||
// TODO: end the span | ||
if (args.length > 0) { | ||
if (typeof args[args.length - 1] === 'function') { | ||
args[args.length - 1] = plugin._tracer.bind(args[args.length - 1]); | ||
} else if (typeof (args[0] as PgPluginQueryConfig).callback === 'function') { | ||
(args[0] as PgPluginQueryConfig).callback = plugin._tracer.bind((args[0] as PgPluginQueryConfig).callback); | ||
} | ||
} catch (e) { | ||
plugin._logger.warn( | ||
`pg Plugin failed to trace query: error: ${e.message}` | ||
); | ||
const result = original.apply(this, arguments as any); | ||
span.end(); | ||
return result; | ||
} | ||
|
||
const queryResult = original.apply(this, args as any); | ||
// Perform the original query | ||
const result: unknown = original.apply(this, args as never); | ||
|
||
// No callback was provided, return a promise instead (new as of [email protected]) | ||
// Bind promise to parent span and end the span | ||
if (result instanceof Promise) { | ||
return plugin._tracer.bind(result | ||
.then((result: unknown) => { | ||
// Return a pass-along promise which ends the span and then goes to user's orig resolvers | ||
return new Promise((resolve, _) => { | ||
span.setStatus({ code: CanonicalCode.OK }); | ||
span.end(); | ||
resolve(result); | ||
}); | ||
}) | ||
.catch((error: Error) => { | ||
return new Promise((_, reject) => { | ||
span.setStatus({ code: CanonicalCode.UNKNOWN }) | ||
span.end(); | ||
reject(error); | ||
}); | ||
})); | ||
} | ||
// else returns void | ||
if (!callbackProvided) { | ||
const queryResultPromise = (queryResult as unknown) as Promise< | ||
unknown | ||
>; | ||
return plugin._tracer.bind( | ||
queryResultPromise | ||
.then((result: any) => { | ||
// Return a pass-along promise which ends the span and then goes to user's orig resolvers | ||
return new Promise((resolve, _) => { | ||
span.end(); | ||
resolve(result); | ||
}); | ||
}) | ||
.catch((error: Error) => { | ||
return new Promise((_, reject) => { | ||
span.end(); | ||
reject(error); | ||
}); | ||
}) | ||
); | ||
span.setStatus({ | ||
code: CanonicalCode.INVALID_ARGUMENT, | ||
message: 'Invalid query provided to the driver' | ||
}); | ||
span.end(); | ||
} | ||
|
||
// Else a callback was provided, so just return the result | ||
span.end(); | ||
return queryResult; | ||
return result; // void | ||
}; | ||
|
||
}; | ||
} | ||
} | ||
|
||
|
||
// Queries where args[0] is a text query and 'values' was not specified | ||
function _handleTextQuery(this: pgTypes.Client & PgClientConnectionParams, span: Span, ...args: unknown[]) { | ||
// Set child span name | ||
const queryCommand = getCommandFromText(args[0] as string); | ||
span.updateName(PostgresPlugin.BASE_SPAN_NAME + ':' + queryCommand); | ||
|
||
// Set attributes | ||
span.setAttribute(AttributeNames.DB_STATEMENT, args[0]); | ||
|
||
} | ||
|
||
// Queries where args[1] is a 'values' array | ||
function _handleParameterizedQuery(this: pgTypes.Client & PgClientConnectionParams, span: Span, ...args: unknown[]) { | ||
// Set child span name | ||
const queryCommand = getCommandFromText(args[0] as string); | ||
span.updateName(PostgresPlugin.BASE_SPAN_NAME + ':' + queryCommand); | ||
|
||
// Set attributes | ||
span.setAttribute(AttributeNames.DB_STATEMENT, args[0]); | ||
span.setAttribute(AttributeNames.PG_VALUES, args[1]); | ||
} | ||
|
||
// Queries where args[0] is a QueryConfig | ||
function _handleConfigQuery(this: pgTypes.Client & PgClientConnectionParams, span: Span, ...args: unknown[]) { | ||
const config = args[0] as PgPluginQueryConfig; | ||
|
||
// Set attributes | ||
span.setAttribute(AttributeNames.DB_STATEMENT, config.text); | ||
if (config.values) { | ||
span.setAttribute(AttributeNames.PG_VALUES, config.values); | ||
} | ||
if (config.name) { | ||
span.setAttribute(AttributeNames.PG_PLAN, config.name); | ||
} | ||
|
||
// Update span name with query command; prefer plan name, if available | ||
const queryCommand = getCommandFromText(config.name || config.text); | ||
span.updateName(PostgresPlugin.BASE_SPAN_NAME + ':' + queryCommand); | ||
} | ||
|
||
const basedir = path.dirname(require.resolve('pg')); | ||
const version = require(path.join(basedir, '../', 'package.json')).version; | ||
export const plugin = new PostgresPlugin(PostgresPlugin.component, version); | ||
export const plugin = new PostgresPlugin(PostgresPlugin.COMPONENT, version); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.