Skip to content

Commit

Permalink
feat(reqresp): send status messages along with reqresp responses (#11727
Browse files Browse the repository at this point in the history
)
  • Loading branch information
Maddiaa0 authored Feb 6, 2025
1 parent f2f2634 commit b212490
Show file tree
Hide file tree
Showing 7 changed files with 209 additions and 30 deletions.
11 changes: 11 additions & 0 deletions yarn-project/p2p/src/services/reqresp/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import { Fr } from '@aztec/foundation/fields';

import { type PeerId } from '@libp2p/interface';

import { type ReqRespStatus } from './status.js';

/*
* Request Response Sub Protocols
*/
Expand Down Expand Up @@ -31,6 +33,15 @@ export type ReqRespSubProtocolHandler = (peerId: PeerId, msg: Buffer) => Promise
*/
export type ReqRespSubProtocolRateLimits = Record<ReqRespSubProtocol, ProtocolRateLimitQuota>;

/**
* The response from the ReqResp protocol
* Consists of a status (Error code) and data
*/
export interface ReqRespResponse {
  /** Status control byte sent as the first chunk of the response stream (see `ReqRespStatus`). */
  status: ReqRespStatus;
  /** Response payload after inbound (snappy) transform; an empty buffer when reading the message failed. */
  data: Buffer;
}

/**
* A rate limit quota
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ describe('rate limiter', () => {
expect(rateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(false);

// Spy on the peer manager and check that penalizePeer is called
expect(peerScoring.penalizePeer).toHaveBeenCalledWith(peerId, PeerErrorSeverity.MidToleranceError);
expect(peerScoring.penalizePeer).toHaveBeenCalledWith(peerId, PeerErrorSeverity.HighToleranceError);
});

it('Should allow requests within the global limit', () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,11 @@ export class RequestResponseRateLimiter {

switch (rateLimitStatus) {
case RateLimitStatus.DeniedPeer:
this.peerScoring.penalizePeer(peerId, PeerErrorSeverity.MidToleranceError);
// Hitting a peer specific limit, we should lightly penalise the peer
this.peerScoring.penalizePeer(peerId, PeerErrorSeverity.HighToleranceError);
return false;
case RateLimitStatus.DeniedGlobal:
// Hitting a global limit, we should not penalise the peer
return false;
default:
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ export const DEFAULT_RATE_LIMITS: ReqRespSubProtocolRateLimits = {
[ReqRespSubProtocol.TX]: {
peerLimit: {
quotaTimeMs: 1000,
quotaCount: 5,
quotaCount: 10,
},
globalLimit: {
quotaTimeMs: 1000,
quotaCount: 10,
quotaCount: 20,
},
},
[ReqRespSubProtocol.BLOCK]: {
Expand Down
24 changes: 22 additions & 2 deletions yarn-project/p2p/src/services/reqresp/reqresp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import { type PeerScoring } from '../peer-manager/peer_scoring.js';
import { ReqRespSubProtocol, RequestableBuffer } from './interface.js';
import { reqRespBlockHandler } from './protocols/block.js';
import { GoodByeReason, reqGoodbyeHandler } from './protocols/goodbye.js';
import { ReqRespStatus, prettyPrintReqRespStatus } from './status.js';

const PING_REQUEST = RequestableBuffer.fromBuffer(Buffer.from('ping'));

Expand Down Expand Up @@ -126,10 +127,21 @@ describe('ReqResp', () => {
await sleep(500);

// Default rate is set at 1 every 200 ms; so this should fire a few times
const responses = [];
for (let i = 0; i < 10; i++) {
await nodes[0].req.sendRequestToPeer(nodes[1].p2p.peerId, ReqRespSubProtocol.PING, Buffer.from('ping'));
// Response object contains the status (error flags) and data
const response = await nodes[0].req.sendRequestToPeer(
nodes[1].p2p.peerId,
ReqRespSubProtocol.PING,
Buffer.from('ping'),
);
responses.push(response);
}

// Check that one of the responses gets a rate limit response
const rateLimitResponse = responses.find(response => response?.status === ReqRespStatus.RATE_LIMIT_EXCEEDED);
expect(rateLimitResponse).toBeDefined();

// Make sure the error message is logged
const errorMessage = `Rate limit exceeded for ${ReqRespSubProtocol.PING} from ${nodes[0].p2p.peerId.toString()}`;
expect(loggerSpy).toHaveBeenCalledWith(expect.stringContaining(errorMessage));
Expand Down Expand Up @@ -343,7 +355,8 @@ describe('ReqResp', () => {
);

// Expect the response to be a buffer of length 1
expect(response).toEqual(Buffer.from([0x0]));
expect(response?.status).toEqual(ReqRespStatus.SUCCESS);
expect(response?.data).toEqual(Buffer.from([0x0]));
});
});

Expand Down Expand Up @@ -413,6 +426,8 @@ describe('ReqResp', () => {
const batchSize = 12;
nodes = await createNodes(peerScoring, 3);

const requesterLoggerSpy = jest.spyOn((nodes[0].req as any).logger, 'debug');

await startNodes(nodes);
await sleep(500);
await connectToPeers(nodes);
Expand All @@ -426,6 +441,11 @@ describe('ReqResp', () => {

const res = await nodes[0].req.sendBatchRequest(ReqRespSubProtocol.PING, requests);
expect(res).toEqual(expectResponses);

// Check that we did detect hitting a rate limit
expect(requesterLoggerSpy).toHaveBeenCalledWith(
expect.stringContaining(`${prettyPrintReqRespStatus(ReqRespStatus.RATE_LIMIT_EXCEEDED)}`),
);
});
});
});
135 changes: 111 additions & 24 deletions yarn-project/p2p/src/services/reqresp/reqresp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@ import { ConnectionSampler } from './connection-sampler/connection_sampler.js';
import {
DEFAULT_SUB_PROTOCOL_HANDLERS,
DEFAULT_SUB_PROTOCOL_VALIDATORS,
type ReqRespSubProtocol,
type ReqRespResponse,
ReqRespSubProtocol,
type ReqRespSubProtocolHandlers,
type ReqRespSubProtocolValidators,
type SubProtocolMap,
subProtocolMap,
} from './interface.js';
import { ReqRespMetrics } from './metrics.js';
import { RequestResponseRateLimiter } from './rate-limiter/rate_limiter.js';
import { ReqRespStatus, ReqRespStatusError, parseStatusChunk, prettyPrintReqRespStatus } from './status.js';

/**
* The Request Response Service
Expand Down Expand Up @@ -190,10 +192,17 @@ export class ReqResp {
this.logger.trace(`Sending request to peer: ${peer.toString()}`);
const response = await this.sendRequestToPeer(peer, subProtocol, requestBuffer);

if (response && response.status !== ReqRespStatus.SUCCESS) {
this.logger.debug(
`Request to peer ${peer.toString()} failed with status ${prettyPrintReqRespStatus(response.status)}`,
);
continue;
}

// If we get a response, return it, otherwise we iterate onto the next peer
// We do not consider it a success if we have an empty buffer
if (response && response.length > 0) {
const object = subProtocolMap[subProtocol].response.fromBuffer(response);
if (response && response.data.length > 0) {
const object = subProtocolMap[subProtocol].response.fromBuffer(response.data);
// The response validator handles peer punishment within
const isValid = await responseValidator(request, object, peer);
if (!isValid) {
Expand Down Expand Up @@ -311,8 +320,22 @@ export class ReqResp {
for (const index of indices) {
const response = await this.sendRequestToPeer(peer, subProtocol, requestBuffers[index]);

if (response && response.length > 0) {
const object = subProtocolMap[subProtocol].response.fromBuffer(response);
// Check the status of the response buffer
if (response && response.status !== ReqRespStatus.SUCCESS) {
this.logger.debug(
`Request to peer ${peer.toString()} failed with status ${prettyPrintReqRespStatus(
response.status,
)}`,
);

// If we hit a rate limit or some failure, we remove the peer and return the results,
// they will be split among remaining peers and the new sampled peer
batchSampler.removePeerAndReplace(peer);
return { peer, results: peerResults };
}

if (response && response.data.length > 0) {
const object = subProtocolMap[subProtocol].response.fromBuffer(response.data);
const isValid = await responseValidator(requests[index], object, peer);

if (isValid) {
Expand Down Expand Up @@ -394,16 +417,16 @@ export class ReqResp {
peerId: PeerId,
subProtocol: ReqRespSubProtocol,
payload: Buffer,
): Promise<Buffer | undefined> {
): Promise<ReqRespResponse | undefined> {
let stream: Stream | undefined;
try {
this.metrics.recordRequestSent(subProtocol);

stream = await this.connectionSampler.dialProtocol(peerId, subProtocol);

// Open the stream with a timeout
const result = await executeTimeout<Buffer>(
(): Promise<Buffer> => pipe([payload], stream!, this.readMessage.bind(this)),
const result = await executeTimeout<ReqRespResponse>(
(): Promise<ReqRespResponse> => pipe([payload], stream!, this.readMessage.bind(this)),
this.individualRequestTimeoutMs,
() => new IndividualReqRespTimeoutError(),
);
Expand Down Expand Up @@ -447,7 +470,15 @@ export class ReqResp {
* Categorize the error and log it.
*/
private categorizeError(e: any, peerId: PeerId, subProtocol: ReqRespSubProtocol): PeerErrorSeverity | undefined {
// Non pubishable errors
// Non punishable errors - we do not expect a response for goodbye messages
if (subProtocol === ReqRespSubProtocol.GOODBYE) {
this.logger.debug('Error encountered on goodbye sub protocol, no penalty', {
peerId: peerId.toString(),
subProtocol,
});
return undefined;
}

// We do not punish a collective timeout, as the node triggers this interrupt, independent of the peer's behaviour
const logTags = {
peerId: peerId.toString(),
Expand Down Expand Up @@ -492,14 +523,45 @@ export class ReqResp {

/**
* Read a message returned from a stream into a single buffer
*
* The message is split into two components
* - The first chunk should contain a control byte, indicating the status of the response see `ReqRespStatus`
* - The second chunk should contain the response data
*/
private async readMessage(source: AsyncIterable<Uint8ArrayList>): Promise<Buffer> {
private async readMessage(source: AsyncIterable<Uint8ArrayList>): Promise<ReqRespResponse> {
let statusBuffer: ReqRespStatus | undefined;
const chunks: Uint8Array[] = [];
for await (const chunk of source) {
chunks.push(chunk.subarray());

try {
for await (const chunk of source) {
if (statusBuffer === undefined) {
const firstChunkBuffer = chunk.subarray();
statusBuffer = parseStatusChunk(firstChunkBuffer);
} else {
chunks.push(chunk.subarray());
}
}

const messageData = Buffer.concat(chunks);
const message: Buffer = this.snappyTransform.inboundTransformNoTopic(messageData);

return {
status: statusBuffer ?? ReqRespStatus.UNKNOWN,
data: message,
};
} catch (e: any) {
this.logger.debug(`Reading message failed: ${e.message}`);

let status = ReqRespStatus.UNKNOWN;
if (e instanceof ReqRespStatusError) {
status = e.status;
}

return {
status,
data: Buffer.from([]),
};
}
const messageData = Buffer.concat(chunks);
return this.snappyTransform.inboundTransformNoTopic(messageData);
}

/**
Expand All @@ -525,25 +587,28 @@ export class ReqResp {
private async streamHandler(protocol: ReqRespSubProtocol, { stream, connection }: IncomingStreamData) {
this.metrics.recordRequestReceived(protocol);

// Store a reference to from this for the async generator
if (!this.rateLimiter.allow(protocol, connection.remotePeer)) {
this.logger.warn(`Rate limit exceeded for ${protocol} from ${connection.remotePeer}`);
try {
// Capture the handler and transform locally for use inside the async generator below
if (!this.rateLimiter.allow(protocol, connection.remotePeer)) {
this.logger.warn(`Rate limit exceeded for ${protocol} from ${connection.remotePeer}`);

// TODO(#8483): handle changing peer scoring for failed rate limit, maybe differentiate between global and peer limits here when punishing
await stream.close();
return;
}
throw new ReqRespStatusError(ReqRespStatus.RATE_LIMIT_EXCEEDED);
}

const handler = this.subProtocolHandlers[protocol];
const transform = this.snappyTransform;
const handler = this.subProtocolHandlers[protocol];
const transform = this.snappyTransform;

try {
await pipe(
stream,
async function* (source: any) {
for await (const chunkList of source) {
const msg = Buffer.from(chunkList.subarray());
const response = await handler(connection.remotePeer, msg);

// Send success code first, then the response
const successChunk = Buffer.from([ReqRespStatus.SUCCESS]);
yield new Uint8Array(successChunk);

yield new Uint8Array(transform.outboundTransformNoTopic(response));
}
},
Expand All @@ -552,8 +617,30 @@ export class ReqResp {
} catch (e: any) {
this.logger.warn(e);
this.metrics.recordResponseError(protocol);

// If we receive a known error, we use the error status in the response chunk, otherwise we categorize as unknown
let errorStatus = ReqRespStatus.UNKNOWN;
if (e instanceof ReqRespStatusError) {
errorStatus = e.status;
}

const sendErrorChunk = this.sendErrorChunk(errorStatus);

// Return and yield the response chunk
await pipe(
stream,
async function* (_source: any) {
yield* sendErrorChunk;
},
stream,
);
} finally {
await stream.close();
}
}

/**
 * Yields a single one-byte chunk encoding the given status code.
 *
 * Used by the stream handler's error path to send an error status back to the
 * requester; no payload chunk follows the control byte.
 *
 * @param error - The `ReqRespStatus` code to encode as the wire control byte.
 */
private async *sendErrorChunk(error: ReqRespStatus): AsyncIterable<Uint8Array> {
  // The status enum value itself is the single-byte control chunk on the wire
  const errorChunk = Buffer.from([error]);
  yield new Uint8Array(errorChunk);
}
}
Loading

0 comments on commit b212490

Please sign in to comment.