Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add tracer #380

Merged
merged 5 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ node_modules/
**/tsconfig.tsbuildinfo
coverage/*
.vscode/*
**/object_grpc_pb.js
**/object_grpc_pb.js

.DS_Store
47 changes: 47 additions & 0 deletions packages/tracer/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
{
"name": "@ts-drp/tracer",
"version": "0.6.0",
"license": "MIT",
"repository": {
"type": "git",
"url": "git+https://github.com/topology-foundation/ts-drp.git"
},
"type": "module",
"types": "./dist/src/index.d.ts",
"files": ["src", "dist", "!dist/test", "!**/*.tsbuildinfo"],
"exports": {
".": {
"types": "./dist/src/index.d.ts",
"import": "./dist/src/index.js"
}
},
"scripts": {
"build": "tsc -b",
"clean": "rm -rf dist/ node_modules/",
"prepack": "tsc -b",
"watch": "tsc -b -w"
},
"dependencies": {
"@libp2p/opentelemetry-metrics": "^1.0.0",
"@opentelemetry/api": "^1.9.0",
"@opentelemetry/context-async-hooks": "^1.30.0",
"@opentelemetry/context-zone": "^1.30.1",
"@opentelemetry/context-zone-peer-dep": "^1.30.0",
"@opentelemetry/exporter-trace-otlp-http": "^0.57.1",
"@opentelemetry/resources": "^1.30.0",
"@opentelemetry/sdk-trace-web": "^1.30.1",
"@opentelemetry/semantic-conventions": "1.28.0",
"loglevel": "^1.9.2",
"loglevel-plugin-prefix": "^0.8.4",
"object-inspect": "^1.13.3"
},
"devDependencies": {
"@eslint/js": "^9.18.0",
"@types/object-inspect": "^1.13.0",
"@typescript-eslint/eslint-plugin": "^8.20.0",
"@typescript-eslint/parser": "^8.20.0",
"eslint": "^9.18.0",
"typescript": "^5.7.3",
"typescript-eslint": "^8.20.0"
}
}
275 changes: 275 additions & 0 deletions packages/tracer/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,275 @@
import { type Span, SpanStatusCode, context, trace } from "@opentelemetry/api";
import { AsyncHooksContextManager } from "@opentelemetry/context-async-hooks";
import { ZoneContextManager } from "@opentelemetry/context-zone";
import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-http";
import { Resource } from "@opentelemetry/resources";
import {
BatchSpanProcessor,
type Tracer as OtTracer,
WebTracerProvider,
} from "@opentelemetry/sdk-trace-web";
import { ATTR_SERVICE_NAME } from "@opentelemetry/semantic-conventions";

let enabled = false;
let tracer: OtTracer | undefined;
let provider: WebTracerProvider | undefined;
let exporter: OTLPTraceExporter | undefined;

const DEFAULT_EXPORTER_URL = "http://127.0.0.1:4318/v1/traces";
const DEFAULT_EXPORTER_HEADERS = {
"Content-Type": "application/json",
"Access-Control-Allow-Headers": "*",
"Access-Control-Allow-Origin": "*",
};
const isWeb = typeof window !== "undefined";

export type EnableTracingOptions = {
provider?: {
serviceName?: string;
exporterUrl?: string;
exporterHeaders?: Record<string, string>;
};
};

export const enableTracing = (
tracerName: string,
opts: EnableTracingOptions = {},
): void => {
enabled = true;
initContextManager();
initProvider(opts.provider);

if (provider) {
tracer = provider.getTracer(tracerName) as OtTracer;
}
};

// disableTracing should reset the tracer, provider, and exporter
// there for testing purposes
export const disableTracing = (): void => {
enabled = false;
tracer = undefined;
provider = undefined;
exporter = undefined;
};

const initContextManager = (): void => {
if (!isWeb) {
const contextManager = new AsyncHooksContextManager();
contextManager.enable();
context.setGlobalContextManager(contextManager);
return;
}

const contextManager = new ZoneContextManager();
contextManager.enable();
context.setGlobalContextManager(contextManager);
};

export function isPromise<T>(obj: unknown): obj is Promise<T> {
return typeof (obj as { then?: unknown })?.then === "function";
}

async function wrapPromise<T>(promise: Promise<T>, span: Span): Promise<T> {
return promise
.then((res) => {
span.setStatus({ code: SpanStatusCode.OK });
return res;
})
.catch((err: Error) => {
span.recordException(err);
span.setStatus({ code: SpanStatusCode.ERROR, message: err.toString() });
throw err;
})
.finally(() => {
span.end();
});
}

export function isGenerator(obj: unknown): obj is Generator {
if (!obj) return false;
const iterator = (obj as { [Symbol.iterator]?: unknown })?.[Symbol.iterator];
if (typeof iterator !== "function") return false;

const instance = obj as { next?: unknown };
return typeof instance.next === "function";
}

function wrapGenerator<T>(gen: Generator<T>, span: Span): Generator<T> {
const iter = gen[Symbol.iterator]();

const wrapped: Generator<T> = {
next: () => {
try {
const res = iter.next();

if (res.done === true) {
span.setStatus({ code: SpanStatusCode.OK });
span.end();
}
return res;
} catch (err) {
const error = err instanceof Error ? err : new Error(String(err));
span.recordException(error);
span.setStatus({
code: SpanStatusCode.ERROR,
message: error.toString(),
});
span.end();

throw error;
}
},
return: (value) => {
return iter.return(value);
},
throw: (err) => {
return iter.throw(err);
},
[Symbol.iterator]: () => {
return wrapped;
},
};

return wrapped;
}

export function isAsyncGenerator(obj: unknown): obj is AsyncGenerator {
if (!obj) return false;
const asyncIterator = (obj as { [Symbol.asyncIterator]?: unknown })?.[
Symbol.asyncIterator
];
if (typeof asyncIterator !== "function") return false;

const instance = obj as { next?: unknown };
return typeof instance.next === "function";
}

function wrapAsyncGenerator<T>(
gen: AsyncGenerator<T>,
span: Span,
): AsyncGenerator<T> {
const iter = gen[Symbol.asyncIterator]();

const wrapped: AsyncGenerator<T> = {
next: async () => {
try {
const res = await iter.next();

if (res.done === true) {
span.setStatus({ code: SpanStatusCode.OK });
span.end();
}
return res;
} catch (err) {
const error = err instanceof Error ? err : new Error(String(err));
span.recordException(error);
span.setStatus({
code: SpanStatusCode.ERROR,
message: error.toString(),
});
span.end();

throw error;
}
},
return: async (value) => {
return iter.return(value);
},
throw: async (err) => {
return iter.throw(err);
},
[Symbol.asyncIterator]: () => {
return wrapped;
},
};

return wrapped;
}

export function traceFunc<Args extends unknown[], Return>(
name: string,
fn: (...args: Args) => Return,
setAttributes?: (span: Span, ...args: Args) => void,
): (...args: Args) => Return {
return (...args: Args): Return => {
if (!tracer || !enabled) return fn(...args);
const parentContext = context.active();
const span = tracer.startSpan(name, {}, parentContext);

if (setAttributes) {
setAttributes(span, ...args);
}

let result: Return;
const childContext = trace.setSpan(parentContext, span);
try {
result = context.with(childContext, fn, undefined, ...args);
} catch (err) {
const error = err instanceof Error ? err : new Error(String(err));
span.recordException(error);
span.setStatus({
code: SpanStatusCode.ERROR,
message: error.toString(),
});
span.end();
throw error;
}

if (isPromise<unknown>(result)) {
return wrapPromise(result, span) as Return;
}
if (isGenerator(result)) {
return wrapGenerator(result, span) as Return;
}
if (isAsyncGenerator(result)) {
return wrapAsyncGenerator(result, span) as Return;
}

span.setStatus({ code: SpanStatusCode.OK });
span.end();
return result;
};
}

const initExporter = (
opts: EnableTracingOptions["provider"],
): OTLPTraceExporter => {
if (exporter) return exporter;

exporter = new OTLPTraceExporter({
url: opts?.exporterUrl ?? DEFAULT_EXPORTER_URL,
headers: opts?.exporterHeaders
? opts.exporterHeaders
: DEFAULT_EXPORTER_HEADERS,
});

return exporter;
};

const initProvider = (opts: EnableTracingOptions["provider"]): void => {
if (provider) return;

const resource = new Resource({
[ATTR_SERVICE_NAME]: opts?.serviceName ?? "unknown_service",
});
const exporter = initExporter(opts);
const spanProcessor = new BatchSpanProcessor(exporter, {
// Configuration options for batching
maxQueueSize: 2048, // Maximum number of spans kept in the queue before dropping
scheduledDelayMillis: 5000, // Interval for sending queued spans in milliseconds
exportTimeoutMillis: 30000, // Timeout for exporting a batch
maxExportBatchSize: 512, // Maximum number of spans per batch
});

provider = new WebTracerProvider({
resource,
spanProcessors: [spanProcessor],
});

provider.register();
};

export const flush = async (): Promise<void> => {
await provider?.forceFlush();
};
Loading
Loading