Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(reqresp): send status messages along with reqresp responses #11727

Merged
merged 5 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions yarn-project/p2p/src/services/reqresp/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import { Fr } from '@aztec/foundation/fields';

import { type PeerId } from '@libp2p/interface';

import { type ReqRespStatus } from './status.js';

/*
* Request Response Sub Protocols
*/
Expand Down Expand Up @@ -31,6 +33,15 @@ export type ReqRespSubProtocolHandler = (peerId: PeerId, msg: Buffer) => Promise
*/
export type ReqRespSubProtocolRateLimits = Record<ReqRespSubProtocol, ProtocolRateLimitQuota>;

/**
* The response from the ReqResp protocol.
*
* Pairs a status code with the response payload, so that a requester can
* tell a failed request (for example a rate limited one) apart from a
* successful one.
*/
export interface ReqRespResponse {
/** Status (error code) reported for the response; callers treat any value other than SUCCESS as a failed request. */
status: ReqRespStatus;
/** The response payload; callers do not count an empty buffer as a successful response. */
data: Buffer;
}

/**
* A rate limit quota
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ describe('rate limiter', () => {
expect(rateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(false);

// Spy on the peer manager and check that penalizePeer is called
expect(peerScoring.penalizePeer).toHaveBeenCalledWith(peerId, PeerErrorSeverity.MidToleranceError);
expect(peerScoring.penalizePeer).toHaveBeenCalledWith(peerId, PeerErrorSeverity.HighToleranceError);
});

it('Should allow requests within the global limit', () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,11 @@ export class RequestResponseRateLimiter {

switch (rateLimitStatus) {
case RateLimitStatus.DeniedPeer:
this.peerScoring.penalizePeer(peerId, PeerErrorSeverity.MidToleranceError);
// Hitting a peer specific limit, we should lightly penalise the peer
this.peerScoring.penalizePeer(peerId, PeerErrorSeverity.HighToleranceError);
return false;
case RateLimitStatus.DeniedGlobal:
// Hitting a global limit, we should not penalise the peer
return false;
default:
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ export const DEFAULT_RATE_LIMITS: ReqRespSubProtocolRateLimits = {
[ReqRespSubProtocol.TX]: {
peerLimit: {
quotaTimeMs: 1000,
quotaCount: 5,
quotaCount: 10,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bumping rate limits here for the meantime!

},
globalLimit: {
quotaTimeMs: 1000,
quotaCount: 10,
quotaCount: 20,
},
},
[ReqRespSubProtocol.BLOCK]: {
Expand Down
24 changes: 22 additions & 2 deletions yarn-project/p2p/src/services/reqresp/reqresp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import { type PeerScoring } from '../peer-manager/peer_scoring.js';
import { ReqRespSubProtocol, RequestableBuffer } from './interface.js';
import { reqRespBlockHandler } from './protocols/block.js';
import { GoodByeReason, reqGoodbyeHandler } from './protocols/goodbye.js';
import { ReqRespStatus, prettyPrintReqRespStatus } from './status.js';

const PING_REQUEST = RequestableBuffer.fromBuffer(Buffer.from('ping'));

Expand Down Expand Up @@ -126,10 +127,21 @@ describe('ReqResp', () => {
await sleep(500);

// Default rate is set at 1 every 200 ms; so this should fire a few times
const responses = [];
for (let i = 0; i < 10; i++) {
await nodes[0].req.sendRequestToPeer(nodes[1].p2p.peerId, ReqRespSubProtocol.PING, Buffer.from('ping'));
// Response object contains the status (error flags) and data
const response = await nodes[0].req.sendRequestToPeer(
nodes[1].p2p.peerId,
ReqRespSubProtocol.PING,
Buffer.from('ping'),
);
responses.push(response);
}

// Check that one of the responses gets a rate limit response
const rateLimitResponse = responses.find(response => response?.status === ReqRespStatus.RATE_LIMIT_EXCEEDED);
expect(rateLimitResponse).toBeDefined();

// Make sure the error message is logged
const errorMessage = `Rate limit exceeded for ${ReqRespSubProtocol.PING} from ${nodes[0].p2p.peerId.toString()}`;
expect(loggerSpy).toHaveBeenCalledWith(expect.stringContaining(errorMessage));
Expand Down Expand Up @@ -343,7 +355,8 @@ describe('ReqResp', () => {
);

// Expect the response to be a buffer of length 1
expect(response).toEqual(Buffer.from([0x0]));
expect(response?.status).toEqual(ReqRespStatus.SUCCESS);
expect(response?.data).toEqual(Buffer.from([0x0]));
});
});

Expand Down Expand Up @@ -413,6 +426,8 @@ describe('ReqResp', () => {
const batchSize = 12;
nodes = await createNodes(peerScoring, 3);

const requesterLoggerSpy = jest.spyOn((nodes[0].req as any).logger, 'debug');
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not a fan of hooking onto the logger to test for events, but I don't have a better recommendation that doesn't involve extending an EventEmitter and firing events that no one except a test cares about, or having an empty virtual onStuffHappened method that we extend on a test-only version of the subject.


await startNodes(nodes);
await sleep(500);
await connectToPeers(nodes);
Expand All @@ -426,6 +441,11 @@ describe('ReqResp', () => {

const res = await nodes[0].req.sendBatchRequest(ReqRespSubProtocol.PING, requests);
expect(res).toEqual(expectResponses);

// Check that we did detect hitting a rate limit
expect(requesterLoggerSpy).toHaveBeenCalledWith(
expect.stringContaining(`${prettyPrintReqRespStatus(ReqRespStatus.RATE_LIMIT_EXCEEDED)}`),
);
});
});
});
135 changes: 111 additions & 24 deletions yarn-project/p2p/src/services/reqresp/reqresp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@ import { ConnectionSampler } from './connection-sampler/connection_sampler.js';
import {
DEFAULT_SUB_PROTOCOL_HANDLERS,
DEFAULT_SUB_PROTOCOL_VALIDATORS,
type ReqRespSubProtocol,
type ReqRespResponse,
ReqRespSubProtocol,
type ReqRespSubProtocolHandlers,
type ReqRespSubProtocolValidators,
type SubProtocolMap,
subProtocolMap,
} from './interface.js';
import { ReqRespMetrics } from './metrics.js';
import { RequestResponseRateLimiter } from './rate-limiter/rate_limiter.js';
import { ReqRespStatus, ReqRespStatusError, parseStatusChunk, prettyPrintReqRespStatus } from './status.js';

/**
* The Request Response Service
Expand Down Expand Up @@ -190,10 +192,17 @@ export class ReqResp {
this.logger.trace(`Sending request to peer: ${peer.toString()}`);
const response = await this.sendRequestToPeer(peer, subProtocol, requestBuffer);

if (response && response.status !== ReqRespStatus.SUCCESS) {
this.logger.debug(
`Request to peer ${peer.toString()} failed with status ${prettyPrintReqRespStatus(response.status)}`,
);
continue;
}

// If we get a response, return it, otherwise we iterate onto the next peer
// We do not consider it a success if we have an empty buffer
if (response && response.length > 0) {
const object = subProtocolMap[subProtocol].response.fromBuffer(response);
if (response && response.data.length > 0) {
const object = subProtocolMap[subProtocol].response.fromBuffer(response.data);
// The response validator handles peer punishment within
const isValid = await responseValidator(request, object, peer);
if (!isValid) {
Expand Down Expand Up @@ -311,8 +320,22 @@ export class ReqResp {
for (const index of indices) {
const response = await this.sendRequestToPeer(peer, subProtocol, requestBuffers[index]);

if (response && response.length > 0) {
const object = subProtocolMap[subProtocol].response.fromBuffer(response);
// Check the status of the response buffer
if (response && response.status !== ReqRespStatus.SUCCESS) {
this.logger.debug(
`Request to peer ${peer.toString()} failed with status ${prettyPrintReqRespStatus(
response.status,
)}`,
);

// If we hit a rate limit or some failure, we remove the peer and return the results,
// they will be split among remaining peers and the new sampled peer
batchSampler.removePeerAndReplace(peer);
return { peer, results: peerResults };
}

if (response && response.data.length > 0) {
const object = subProtocolMap[subProtocol].response.fromBuffer(response.data);
const isValid = await responseValidator(requests[index], object, peer);

if (isValid) {
Expand Down Expand Up @@ -394,16 +417,16 @@ export class ReqResp {
peerId: PeerId,
subProtocol: ReqRespSubProtocol,
payload: Buffer,
): Promise<Buffer | undefined> {
): Promise<ReqRespResponse | undefined> {
let stream: Stream | undefined;
try {
this.metrics.recordRequestSent(subProtocol);

stream = await this.connectionSampler.dialProtocol(peerId, subProtocol);

// Open the stream with a timeout
const result = await executeTimeout<Buffer>(
(): Promise<Buffer> => pipe([payload], stream!, this.readMessage.bind(this)),
const result = await executeTimeout<ReqRespResponse>(
(): Promise<ReqRespResponse> => pipe([payload], stream!, this.readMessage.bind(this)),
this.individualRequestTimeoutMs,
() => new IndividualReqRespTimeoutError(),
);
Expand Down Expand Up @@ -447,7 +470,15 @@ export class ReqResp {
* Categorize the error and log it.
*/
private categorizeError(e: any, peerId: PeerId, subProtocol: ReqRespSubProtocol): PeerErrorSeverity | undefined {
// Non pubishable errors
// Non punishable errors - we do not expect a response for goodbye messages
if (subProtocol === ReqRespSubProtocol.GOODBYE) {
this.logger.debug('Error encountered on goodbye sub protocol, no penalty', {
peerId: peerId.toString(),
subProtocol,
});
return undefined;
}

// We do not punish a collective timeout, as the node triggers this interrupt, independent of the peer's behaviour
const logTags = {
peerId: peerId.toString(),
Expand Down Expand Up @@ -492,14 +523,45 @@ export class ReqResp {

/**
* Read a message returned from a stream into a single buffer
*
* The message is split into two components
* - The first chunk should contain a control byte, indicating the status of the response see `ReqRespStatus`
* - The second chunk should contain the response data
*/
private async readMessage(source: AsyncIterable<Uint8ArrayList>): Promise<Buffer> {
private async readMessage(source: AsyncIterable<Uint8ArrayList>): Promise<ReqRespResponse> {
let statusBuffer: ReqRespStatus | undefined;
const chunks: Uint8Array[] = [];
for await (const chunk of source) {
chunks.push(chunk.subarray());

try {
for await (const chunk of source) {
if (statusBuffer === undefined) {
const firstChunkBuffer = chunk.subarray();
statusBuffer = parseStatusChunk(firstChunkBuffer);
} else {
chunks.push(chunk.subarray());
}
}

const messageData = Buffer.concat(chunks);
const message: Buffer = this.snappyTransform.inboundTransformNoTopic(messageData);

return {
status: statusBuffer ?? ReqRespStatus.UNKNOWN,
data: message,
};
} catch (e: any) {
this.logger.debug(`Reading message failed: ${e.message}`);

let status = ReqRespStatus.UNKNOWN;
if (e instanceof ReqRespStatusError) {
status = e.status;
}

return {
status,
data: Buffer.from([]),
};
}
const messageData = Buffer.concat(chunks);
return this.snappyTransform.inboundTransformNoTopic(messageData);
}

/**
Expand All @@ -525,25 +587,28 @@ export class ReqResp {
private async streamHandler(protocol: ReqRespSubProtocol, { stream, connection }: IncomingStreamData) {
this.metrics.recordRequestReceived(protocol);

// Store a reference to from this for the async generator
if (!this.rateLimiter.allow(protocol, connection.remotePeer)) {
this.logger.warn(`Rate limit exceeded for ${protocol} from ${connection.remotePeer}`);
try {
// Store a reference to from this for the async generator
if (!this.rateLimiter.allow(protocol, connection.remotePeer)) {
this.logger.warn(`Rate limit exceeded for ${protocol} from ${connection.remotePeer}`);

// TODO(#8483): handle changing peer scoring for failed rate limit, maybe differentiate between global and peer limits here when punishing
await stream.close();
return;
}
throw new ReqRespStatusError(ReqRespStatus.RATE_LIMIT_EXCEEDED);
}

const handler = this.subProtocolHandlers[protocol];
const transform = this.snappyTransform;
const handler = this.subProtocolHandlers[protocol];
const transform = this.snappyTransform;

try {
await pipe(
stream,
async function* (source: any) {
for await (const chunkList of source) {
const msg = Buffer.from(chunkList.subarray());
const response = await handler(connection.remotePeer, msg);

// Send success code first, then the response
const successChunk = Buffer.from([ReqRespStatus.SUCCESS]);
yield new Uint8Array(successChunk);

yield new Uint8Array(transform.outboundTransformNoTopic(response));
}
},
Expand All @@ -552,8 +617,30 @@ export class ReqResp {
} catch (e: any) {
this.logger.warn(e);
this.metrics.recordResponseError(protocol);

// If we receive a known error, we use the error status in the response chunk, otherwise we categorize as unknown
let errorStatus = ReqRespStatus.UNKNOWN;
if (e instanceof ReqRespStatusError) {
errorStatus = e.status;
}

const sendErrorChunk = this.sendErrorChunk(errorStatus);

// Return and yield the response chunk
await pipe(
stream,
async function* (_source: any) {
yield* sendErrorChunk;
},
stream,
);
} finally {
await stream.close();
}
}

/**
* Build the single-chunk stream that reports a failed request to the peer.
*
* @param error - The status code to report; it becomes the only byte of the chunk.
* @returns An async iterable yielding exactly one one-byte chunk.
*/
private async *sendErrorChunk(error: ReqRespStatus): AsyncIterable<Uint8Array> {
yield new Uint8Array(Buffer.from([error]));
}
}
Loading
Loading