Skip to content

Commit

Permalink
chore: Trace propagation from json rpc client to server (#11325)
Browse files Browse the repository at this point in the history
Uses otel's `propagate` API to push the current trace from the json rpc
client to server using http headers. Client is instrumented by wrapping
the `fetch` function so it creates a new span and creates the headers,
and the server is instrumented by injecting new middleware that captures
the headers and starts a new span for the scope of the request.

Fixes #11185 
Fixes #11223 

Cherry-picks work by @alexghr from #11222

### Example run

Note that `traceId` is the same in both processes.

Bot with pxe logs:
```
[20:08:54.763] INFO: pxe:service Sent transaction 0x125eb560f33f44a28b98f0f065197eed8a8557bcb1ab2659eb34e5be0b33a9b4 {"trace_id":"d1143080037e46c037dea7cbdf8e29d6","span_id":"236692ffb270ecb4","trace_flags":"01"}
[20:08:54.763] INFO: bot Sent tx with hash 0x125eb560f33f44a28b98f0f065197eed8a8557bcb1ab2659eb34e5be0b33a9b4 {"trace_id":"d1143080037e46c037dea7cbdf8e29d6","span_id":"236692ffb270ecb4","trace_flags":"01"}
```

Node logs:
```
[20:08:54.762] INFO: node Received tx 0x125eb560f33f44a28b98f0f065197eed8a8557bcb1ab2659eb34e5be0b33a9b4 {"txHash":"0x125eb560f33f44a28b98f0f065197eed8a8557bcb1ab2659eb34e5be0b33a9b4","trace_id":"d1143080037e46c037dea7cbdf8e29d6","span_id":"5d1b907122754832","trace_flags":"01"}
```

---------

Co-authored-by: Alex Gherghisan <[email protected]>
  • Loading branch information
spalladino and alexghr authored Jan 20, 2025
1 parent 046968f commit 85ccc15
Show file tree
Hide file tree
Showing 24 changed files with 224 additions and 51 deletions.
8 changes: 4 additions & 4 deletions yarn-project/archiver/src/rpc/index.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { type ArchiverApi, ArchiverApiSchema } from '@aztec/circuit-types';
import { createSafeJsonRpcClient, makeFetch } from '@aztec/foundation/json-rpc/client';
import { createSafeJsonRpcServer } from '@aztec/foundation/json-rpc/server';
import { createSafeJsonRpcClient } from '@aztec/foundation/json-rpc/client';
import { createTracedJsonRpcServer, makeTracedFetch } from '@aztec/telemetry-client';

export function createArchiverClient(url: string, fetch = makeFetch([1, 2, 3], true)): ArchiverApi {
export function createArchiverClient(url: string, fetch = makeTracedFetch([1, 2, 3], true)): ArchiverApi {
return createSafeJsonRpcClient<ArchiverApi>(url, ArchiverApiSchema, false, 'archiver', fetch);
}

export function createArchiverRpcServer(handler: ArchiverApi) {
return createSafeJsonRpcServer(handler, ArchiverApiSchema);
return createTracedJsonRpcServer(handler, ArchiverApiSchema);
}
4 changes: 2 additions & 2 deletions yarn-project/aztec-node/src/aztec-node/http_rpc_server.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { type AztecNode, AztecNodeApiSchema } from '@aztec/circuit-types';
import { createSafeJsonRpcServer } from '@aztec/foundation/json-rpc/server';
import { createTracedJsonRpcServer } from '@aztec/telemetry-client';

/**
* Wrap an AztecNode instance with a JSON RPC HTTP server.
* @param node - The AztecNode
* @returns An JSON-RPC HTTP server
*/
export function createAztecNodeRpcServer(node: AztecNode) {
return createSafeJsonRpcServer(node, AztecNodeApiSchema);
return createTracedJsonRpcServer(node, AztecNodeApiSchema);
}
7 changes: 6 additions & 1 deletion yarn-project/aztec/src/cli/aztec_start_action.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
} from '@aztec/foundation/json-rpc/server';
import { type LogFn, type Logger } from '@aztec/foundation/log';
import { fileURLToPath } from '@aztec/foundation/url';
import { getOtelJsonRpcPropagationMiddleware } from '@aztec/telemetry-client';

import { readFileSync } from 'fs';
import { dirname, resolve } from 'path';
Expand Down Expand Up @@ -102,7 +103,11 @@ export async function aztecStart(options: any, userLog: LogFn, debugLogger: Logg
installSignalHandlers(debugLogger.info, signalHandlers);

if (Object.entries(services).length > 0) {
const rpcServer = createNamespacedSafeJsonRpcServer(services, { http200OnError: false, log: debugLogger });
const rpcServer = createNamespacedSafeJsonRpcServer(services, {
http200OnError: false,
log: debugLogger,
middlewares: [getOtelJsonRpcPropagationMiddleware()],
});
const { port } = await startHttpRpcServer(rpcServer, { port: options.port });
debugLogger.info(`Aztec Server listening on port ${port}`);
}
Expand Down
3 changes: 2 additions & 1 deletion yarn-project/aztec/src/cli/cmds/start_pxe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
allPxeConfigMappings,
createPXEService,
} from '@aztec/pxe';
import { makeTracedFetch } from '@aztec/telemetry-client';
import { L2BasicContractsMap, Network } from '@aztec/types/network';

import { extractRelevantOptions } from '../util.js';
Expand Down Expand Up @@ -76,7 +77,7 @@ export async function addPXE(
process.exit(1);
}

const node = deps.node ?? createAztecNodeClient(nodeUrl!);
const node = deps.node ?? createAztecNodeClient(nodeUrl!, makeTracedFetch([1, 2, 3], true));
const pxe = await createPXEService(node, pxeConfig as PXEServiceConfig);

// register basic contracts
Expand Down
3 changes: 2 additions & 1 deletion yarn-project/bot/src/factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { type AztecNode, type FunctionCall, type PXE } from '@aztec/circuit-type
import { Fr, deriveSigningKey } from '@aztec/circuits.js';
import { EasyPrivateTokenContract } from '@aztec/noir-contracts.js/EasyPrivateToken';
import { TokenContract } from '@aztec/noir-contracts.js/Token';
import { makeTracedFetch } from '@aztec/telemetry-client';

import { type BotConfig, SupportedTokenContracts } from './config.js';
import { getBalances, getPrivateBalance, isStandardTokenContract } from './utils.js';
Expand Down Expand Up @@ -39,7 +40,7 @@ export class BotFactory {
return;
}
this.log.info(`Using remote PXE at ${config.pxeUrl!}`);
this.pxe = createPXEClient(config.pxeUrl!);
this.pxe = createPXEClient(config.pxeUrl!, makeTracedFetch([1, 2, 3], false));
}

/**
Expand Down
5 changes: 3 additions & 2 deletions yarn-project/bot/src/rpc.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { type ApiHandler, createSafeJsonRpcServer } from '@aztec/foundation/json-rpc/server';
import { type ApiHandler } from '@aztec/foundation/json-rpc/server';
import { createTracedJsonRpcServer } from '@aztec/telemetry-client';

import { BotRunnerApiSchema } from './interface.js';
import { type BotRunner } from './runner.js';
Expand All @@ -9,7 +10,7 @@ import { type BotRunner } from './runner.js';
* @returns An JSON-RPC HTTP server
*/
export function createBotRunnerRpcServer(botRunner: BotRunner) {
createSafeJsonRpcServer(botRunner, BotRunnerApiSchema, {
createTracedJsonRpcServer(botRunner, BotRunnerApiSchema, {
http200OnError: false,
healthCheck: botRunner.isHealthy.bind(botRunner),
});
Expand Down
15 changes: 10 additions & 5 deletions yarn-project/bot/src/runner.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { type AztecNode, type PXE, createAztecNodeClient, createLogger } from '@aztec/aztec.js';
import { RunningPromise } from '@aztec/foundation/running-promise';
import { type TelemetryClient, type Traceable, type Tracer, trackSpan } from '@aztec/telemetry-client';
import { type TelemetryClient, type Traceable, type Tracer, makeTracedFetch, trackSpan } from '@aztec/telemetry-client';

import { Bot } from './bot.js';
import { type BotConfig } from './config.js';
Expand All @@ -26,19 +26,24 @@ export class BotRunner implements BotRunnerApi, Traceable {
if (!dependencies.node && !config.nodeUrl) {
throw new Error(`Missing node URL in config or dependencies`);
}
this.node = dependencies.node ?? createAztecNodeClient(config.nodeUrl!);
this.node = dependencies.node ?? createAztecNodeClient(config.nodeUrl!, makeTracedFetch([1, 2, 3], true));
this.runningPromise = new RunningPromise(() => this.#work(), this.log, config.txIntervalSeconds * 1000);
}

/** Initializes the bot if needed. Blocks until the bot setup is finished. */
public async setup() {
if (!this.bot) {
this.log.verbose(`Setting up bot`);
await this.#createBot();
this.log.info(`Bot set up completed`);
await this.doSetup();
}
}

@trackSpan('Bot.setup')
private async doSetup() {
this.log.verbose(`Setting up bot`);
await this.#createBot();
this.log.info(`Bot set up completed`);
}

/**
* Initializes the bot if needed and starts sending txs at regular intervals.
* Blocks until the bot setup is finished.
Expand Down
20 changes: 14 additions & 6 deletions yarn-project/foundation/src/json-rpc/client/fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ export async function defaultFetch(
rpcMethod: string,
body: any,
useApiEndpoints: boolean,
extraHeaders: Record<string, string> = {},
noRetry = false,
) {
log.debug(format(`JsonRpcClient.fetch`, host, rpcMethod, '->', body));
Expand All @@ -30,13 +31,13 @@ export async function defaultFetch(
resp = await fetch(`${host}/${rpcMethod}`, {
method: 'POST',
body: jsonStringify(body),
headers: { 'content-type': 'application/json' },
headers: { 'content-type': 'application/json', ...extraHeaders },
});
} else {
resp = await fetch(host, {
method: 'POST',
body: jsonStringify({ ...body, method: rpcMethod }),
headers: { 'content-type': 'application/json' },
headers: { 'content-type': 'application/json', ...extraHeaders },
});
}
} catch (err) {
Expand All @@ -55,7 +56,7 @@ export async function defaultFetch(
}

if (!resp.ok) {
const errorMessage = `(JSON-RPC PROPAGATED) (host ${host}) (method ${rpcMethod}) (code ${resp.status}) ${responseJson.error.message}`;
const errorMessage = `Error ${resp.status} from server ${host} on ${rpcMethod}: ${responseJson.error.message}`;
if (noRetry || (resp.status >= 400 && resp.status < 500)) {
throw new NoRetryError(errorMessage);
} else {
Expand All @@ -73,10 +74,17 @@ export async function defaultFetch(
* @param log - Optional logger for logging attempts.
* @returns A fetch function.
*/
export function makeFetch(retries: number[], defaultNoRetry: boolean, log?: Logger) {
return async (host: string, rpcMethod: string, body: any, useApiEndpoints: boolean, noRetry?: boolean) => {
export function makeFetch(retries: number[], defaultNoRetry: boolean, log?: Logger): typeof defaultFetch {
return async (
host: string,
rpcMethod: string,
body: any,
useApiEndpoints: boolean,
extraHeaders: Record<string, string> = {},
noRetry?: boolean,
) => {
return await retry(
() => defaultFetch(host, rpcMethod, body, useApiEndpoints, noRetry ?? defaultNoRetry),
() => defaultFetch(host, rpcMethod, body, useApiEndpoints, extraHeaders, noRetry ?? defaultNoRetry),
`JsonRpcClient request ${rpcMethod} to ${host}`,
makeBackoff(retries),
log,
Expand Down
38 changes: 27 additions & 11 deletions yarn-project/foundation/src/json-rpc/server/safe_json_rpc_server.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import cors from '@koa/cors';
import http from 'http';
import Koa from 'koa';
import { type default as Application, default as Koa } from 'koa';
import bodyParser from 'koa-bodyparser';
import compress from 'koa-compress';
import Router from 'koa-router';
Expand All @@ -14,6 +14,15 @@ import { type ApiSchema, type ApiSchemaFor, parseWithOptionals, schemaHasMethod
import { jsonStringify } from '../convert.js';
import { assert } from '../js_utils.js';

export type DiagnosticsData = {
id: number | string | null;
method: string;
params: any[];
headers: http.IncomingHttpHeaders;
};

export type DiagnosticsMiddleware = (ctx: DiagnosticsData, next: () => Promise<void>) => Promise<void>;

export class SafeJsonRpcServer {
/**
* The HTTP server accepting remote requests.
Expand All @@ -31,6 +40,8 @@ export class SafeJsonRpcServer {
private http200OnError = false,
/** Health check function */
private readonly healthCheck: StatusCheckFn = () => true,
/** Additional middlewares */
private extraMiddlewares: Application.Middleware[] = [],
/** Logger */
private log = createLogger('json-rpc:server'),
) {}
Expand Down Expand Up @@ -90,8 +101,11 @@ export class SafeJsonRpcServer {
this.log.error(`Error on API handler: ${error}`);
});

app.use(compress({ br: false } as any));
app.use(compress({ br: false }));
app.use(jsonResponse);
for (const middleware of this.extraMiddlewares) {
app.use(middleware);
}
app.use(exceptionHandler);
app.use(bodyParser({ jsonLimit: '50mb', enableTypes: ['json'], detectJSON: () => true }));
app.use(cors());
Expand All @@ -114,7 +128,9 @@ export class SafeJsonRpcServer {
// Fail if not a registered function in the proxy
if (typeof method !== 'string' || method === 'constructor' || !this.proxy.hasMethod(method)) {
ctx.status = 400;
ctx.body = { jsonrpc, id, error: { code: -32601, message: `Method not found: ${method}` } };
const code = -32601;
const message = `Method not found: ${method}`;
ctx.body = { jsonrpc, id, error: { code, message } };
} else {
ctx.status = 200;
const result = await this.proxy.call(method, params);
Expand Down Expand Up @@ -263,10 +279,11 @@ function makeAggregateHealthcheck(namedHandlers: NamespacedApiHandlers, log?: Lo
};
}

type SafeJsonRpcServerOptions = {
export type SafeJsonRpcServerOptions = {
http200OnError: boolean;
healthCheck?: StatusCheckFn;
log?: Logger;
middlewares?: Application.Middleware[];
};

/**
Expand All @@ -276,25 +293,24 @@ type SafeJsonRpcServerOptions = {
*/
export function createNamespacedSafeJsonRpcServer(
handlers: NamespacedApiHandlers,
options: Omit<SafeJsonRpcServerOptions, 'healthcheck'> = {
http200OnError: false,
options: Partial<Omit<SafeJsonRpcServerOptions, 'healthcheck'>> = {
log: createLogger('json-rpc:server'),
},
): SafeJsonRpcServer {
const { http200OnError, log } = options;
const { middlewares, http200OnError, log } = options;
const proxy = new NamespacedSafeJsonProxy(handlers);
const healthCheck = makeAggregateHealthcheck(handlers, log);
return new SafeJsonRpcServer(proxy, http200OnError, healthCheck, log);
return new SafeJsonRpcServer(proxy, http200OnError, healthCheck, middlewares, log);
}

export function createSafeJsonRpcServer<T extends object = any>(
handler: T,
schema: ApiSchemaFor<T>,
options: SafeJsonRpcServerOptions = { http200OnError: false },
options: Partial<SafeJsonRpcServerOptions> = {},
) {
const { http200OnError, log, healthCheck } = options;
const { http200OnError, log, healthCheck, middlewares: extraMiddlewares } = options;
const proxy = new SafeJsonProxy(handler, schema);
return new SafeJsonRpcServer(proxy, http200OnError, healthCheck, log);
return new SafeJsonRpcServer(proxy, http200OnError, healthCheck, extraMiddlewares, log);
}

/**
Expand Down
Empty file.
10 changes: 5 additions & 5 deletions yarn-project/prover-client/src/prover-agent/rpc.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import { ProverAgentApiSchema, type ProvingJobSource, ProvingJobSourceSchema } from '@aztec/circuit-types';
import { createSafeJsonRpcClient, makeFetch } from '@aztec/foundation/json-rpc/client';
import { createSafeJsonRpcServer } from '@aztec/foundation/json-rpc/server';
import { createSafeJsonRpcClient } from '@aztec/foundation/json-rpc/client';
import { createTracedJsonRpcServer, makeTracedFetch } from '@aztec/telemetry-client';

import { type ProverAgent } from './prover-agent.js';

export function createProvingJobSourceServer(queue: ProvingJobSource) {
return createSafeJsonRpcServer(queue, ProvingJobSourceSchema);
return createTracedJsonRpcServer(queue, ProvingJobSourceSchema);
}

export function createProvingJobSourceClient(url: string, fetch = makeFetch([1, 2, 3], false)): ProvingJobSource {
export function createProvingJobSourceClient(url: string, fetch = makeTracedFetch([1, 2, 3], false)): ProvingJobSource {
return createSafeJsonRpcClient(url, ProvingJobSourceSchema, false, 'provingJobSource', fetch);
}

Expand All @@ -18,5 +18,5 @@ export function createProvingJobSourceClient(url: string, fetch = makeFetch([1,
* @returns An JSON-RPC HTTP server
*/
export function createProverAgentRpcServer(agent: ProverAgent) {
return createSafeJsonRpcServer(agent, ProverAgentApiSchema);
return createTracedJsonRpcServer(agent, ProverAgentApiSchema);
}
19 changes: 13 additions & 6 deletions yarn-project/prover-client/src/proving_broker/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ import {
ProvingJobStatus,
ProvingRequestType,
} from '@aztec/circuit-types';
import { createSafeJsonRpcClient, makeFetch } from '@aztec/foundation/json-rpc/client';
import { type SafeJsonRpcServer, createSafeJsonRpcServer } from '@aztec/foundation/json-rpc/server';
import { createSafeJsonRpcClient } from '@aztec/foundation/json-rpc/client';
import { type SafeJsonRpcServer } from '@aztec/foundation/json-rpc/server';
import { type ApiSchemaFor, optional } from '@aztec/foundation/schemas';
import { createTracedJsonRpcServer, makeTracedFetch } from '@aztec/telemetry-client';

import { z } from 'zod';

Expand Down Expand Up @@ -47,17 +48,23 @@ export const ProvingJobBrokerSchema: ApiSchemaFor<ProvingJobBroker> = {
};

export function createProvingBrokerServer(broker: ProvingJobBroker): SafeJsonRpcServer {
return createSafeJsonRpcServer(broker, ProvingJobBrokerSchema);
return createTracedJsonRpcServer(broker, ProvingJobBrokerSchema);
}

export function createProvingJobBrokerClient(url: string, fetch = makeFetch([1, 2, 3], false)): ProvingJobBroker {
export function createProvingJobBrokerClient(url: string, fetch = makeTracedFetch([1, 2, 3], false)): ProvingJobBroker {
return createSafeJsonRpcClient(url, ProvingJobBrokerSchema, false, 'proverBroker', fetch);
}

export function createProvingJobProducerClient(url: string, fetch = makeFetch([1, 2, 3], false)): ProvingJobProducer {
export function createProvingJobProducerClient(
url: string,
fetch = makeTracedFetch([1, 2, 3], false),
): ProvingJobProducer {
return createSafeJsonRpcClient(url, ProvingJobProducerSchema, false, 'provingJobProducer', fetch);
}

export function createProvingJobConsumerClient(url: string, fetch = makeFetch([1, 2, 3], false)): ProvingJobConsumer {
export function createProvingJobConsumerClient(
url: string,
fetch = makeTracedFetch([1, 2, 3], false),
): ProvingJobConsumer {
return createSafeJsonRpcClient(url, ProvingJobConsumerSchema, false, 'provingJobConsumer', fetch);
}
4 changes: 2 additions & 2 deletions yarn-project/prover-node/src/http.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { ProverNodeApiSchema } from '@aztec/circuit-types';
import { createSafeJsonRpcServer } from '@aztec/foundation/json-rpc/server';
import { createTracedJsonRpcServer } from '@aztec/telemetry-client';

import { type ProverNode } from './prover-node.js';

Expand All @@ -9,5 +9,5 @@ import { type ProverNode } from './prover-node.js';
* @returns An JSON-RPC HTTP server
*/
export function createProverNodeRpcServer(node: ProverNode) {
return createSafeJsonRpcServer(node, ProverNodeApiSchema);
return createTracedJsonRpcServer(node, ProverNodeApiSchema);
}
4 changes: 2 additions & 2 deletions yarn-project/prover-node/src/prover-coordination/factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { type EpochCache } from '@aztec/epoch-cache';
import { createLogger } from '@aztec/foundation/log';
import { type DataStoreConfig } from '@aztec/kv-store/config';
import { createP2PClient } from '@aztec/p2p';
import { type TelemetryClient } from '@aztec/telemetry-client';
import { type TelemetryClient, makeTracedFetch } from '@aztec/telemetry-client';

import { type ProverNodeConfig } from '../config.js';

Expand Down Expand Up @@ -64,7 +64,7 @@ export async function createProverCoordination(

if (config.proverCoordinationNodeUrl) {
log.info('Using prover coordination via node url');
return createAztecNodeClient(config.proverCoordinationNodeUrl);
return createAztecNodeClient(config.proverCoordinationNodeUrl, makeTracedFetch([1, 2, 3], false));
} else {
throw new Error(`Aztec Node URL for Tx Provider is not set.`);
}
Expand Down
Loading

0 comments on commit 85ccc15

Please sign in to comment.