From 931dfa67bdf074d3b276712b44c3783cf19e3324 Mon Sep 17 00:00:00 2001 From: Maddiaa <47148561+Maddiaa0@users.noreply.github.com> Date: Wed, 22 Jan 2025 04:04:40 +0800 Subject: [PATCH] chore: reenable reqresp offline peers test (#11384) --- yarn-project/p2p/src/mocks/index.ts | 6 +---- .../batch_connection_sampler.ts | 2 +- .../connection_sampler.test.ts | 6 ++--- .../connection-sampler/connection_sampler.ts | 22 ++++++++++++------- .../p2p/src/services/reqresp/reqresp.test.ts | 16 +++++++++----- .../p2p/src/services/reqresp/reqresp.ts | 15 +++++++------ 6 files changed, 37 insertions(+), 30 deletions(-) diff --git a/yarn-project/p2p/src/mocks/index.ts b/yarn-project/p2p/src/mocks/index.ts index 8bebe47f3be..d55e44c9be0 100644 --- a/yarn-project/p2p/src/mocks/index.ts +++ b/yarn-project/p2p/src/mocks/index.ts @@ -186,11 +186,7 @@ export const startNodes = async ( }; export const stopNodes = async (nodes: ReqRespNode[]): Promise => { - const stopPromises = []; - for (const node of nodes) { - stopPromises.push(node.req.stop()); - stopPromises.push(node.p2p.stop()); - } + const stopPromises = nodes.flatMap(node => [node.req.stop(), node.p2p.stop()]); await Promise.all(stopPromises); }; diff --git a/yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.ts b/yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.ts index 665d706a01f..572f75b3c61 100644 --- a/yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.ts +++ b/yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.ts @@ -65,7 +65,7 @@ export class BatchConnectionSampler { return; } - const excluding = new Map([[peerId, true]]); + const excluding = new Map([[peerId.toString(), true]]); const newPeer = this.connectionSampler.getPeer(excluding); if (newPeer) { diff --git a/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.test.ts b/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.test.ts index b718c835390..8ecb57c6ab2 100644 --- a/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.test.ts +++ b/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.test.ts @@ -11,7 +11,7 @@ describe('ConnectionSampler', () => { let sampler: ConnectionSampler; let mockLibp2p: any; let peers: PeerId[]; - let excluding: Map; + let excluding: Map; let mockRandomSampler: MockProxy; beforeEach(async () => { @@ -20,7 +20,7 @@ describe('ConnectionSampler', () => { // Mock libp2p mockLibp2p = { - getPeers: jest.fn().mockReturnValue(peers), + getPeers: jest.fn().mockReturnValue([...peers]), dialProtocol: jest.fn(), }; @@ -73,7 +73,7 @@ describe('ConnectionSampler', () => { .mockReturnValueOnce(0) .mockReturnValueOnce(1); - excluding.set(peers[0], true); + excluding.set(peers[0].toString(), true); const selectedPeer = sampler.getPeer(excluding); expect(selectedPeer).toBe(peers[1]); }); diff --git a/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts b/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts index 4c18816330a..bc91b23d5dd 100644 --- a/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts +++ b/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts @@ -1,6 +1,5 @@ import { createLogger } from '@aztec/foundation/log'; import { SerialQueue } from '@aztec/foundation/queue'; -import { RunningPromise } from '@aztec/foundation/running-promise'; import { type Libp2p, type PeerId, type Stream } from '@libp2p/interface'; @@ -26,7 +25,8 @@ export class RandomSampler { */ export class ConnectionSampler { private readonly logger = createLogger('p2p:reqresp:connection-sampler'); - private cleanupJob: RunningPromise; + private cleanupInterval: NodeJS.Timeout; + private abortController: AbortController = new AbortController(); private readonly activeConnectionsCount: Map = new Map(); private readonly streams: Map = new Map(); @@ -39,8 +39,7 @@ export class ConnectionSampler { private readonly cleanupIntervalMs: number = 60000, // Default to 1 minute private readonly sampler: RandomSampler = new RandomSampler(), // Allow randomness to be mocked for testing ) { - this.cleanupJob = new RunningPromise(() => this.cleanupStaleConnections(), this.logger, this.cleanupIntervalMs); - this.cleanupJob.start(); + this.cleanupInterval = setInterval(() => void this.cleanupStaleConnections(), this.cleanupIntervalMs); this.dialQueue.start(); } @@ -50,7 +49,9 @@ export class ConnectionSampler { */ async stop() { this.logger.info('Stopping connection sampler'); - await this.cleanupJob.stop(); + clearInterval(this.cleanupInterval); + + this.abortController.abort(); await this.dialQueue.end(); // Close all active streams @@ -65,7 +66,8 @@ export class ConnectionSampler { * This is to prevent sampling with replacement * @returns */ - getPeer(excluding?: Map): PeerId | undefined { + getPeer(excluding?: Map): PeerId | undefined { + // In libp2p getPeers performs a shallow copy, so this array can be sliced from safetly const peers = this.libp2p.getPeers(); if (peers.length === 0) { @@ -80,8 +82,10 @@ export class ConnectionSampler { // - either the peer has active connections OR is in the exclusion list while ( attempts < MAX_SAMPLE_ATTEMPTS && - ((this.activeConnectionsCount.get(peers[randomIndex]) ?? 0) > 0 || (excluding?.get(peers[randomIndex]) ?? false)) + ((this.activeConnectionsCount.get(peers[randomIndex]) ?? 0) > 0 || + (excluding?.get(peers[randomIndex]?.toString()) ?? false)) ) { + peers.splice(randomIndex, 1); randomIndex = this.sampler.random(peers.length); attempts++; } @@ -143,7 +147,9 @@ export class ConnectionSampler { async dialProtocol(peerId: PeerId, protocol: string): Promise { // Dialling at the same time can cause race conditions where two different streams // end up with the same id, hence a serial queue - const stream = await this.dialQueue.put(() => this.libp2p.dialProtocol(peerId, protocol)); + const stream = await this.dialQueue.put(() => + this.libp2p.dialProtocol(peerId, protocol, { signal: this.abortController.signal }), + ); this.streams.set(stream.id, { stream, peerId }); const updatedActiveConnectionsCount = (this.activeConnectionsCount.get(peerId) ?? 0) + 1; diff --git a/yarn-project/p2p/src/services/reqresp/reqresp.test.ts b/yarn-project/p2p/src/services/reqresp/reqresp.test.ts index 1090d2169c5..a6cd9554704 100644 --- a/yarn-project/p2p/src/services/reqresp/reqresp.test.ts +++ b/yarn-project/p2p/src/services/reqresp/reqresp.test.ts @@ -40,7 +40,7 @@ describe('ReqResp', () => { afterEach(async () => { if (nodes) { - await stopNodes(nodes as ReqRespNode[]); + await stopNodes(nodes); } }); @@ -74,15 +74,17 @@ describe('ReqResp', () => { await connectToPeers(nodes); await sleep(500); - void ponger.stop(); + const stopPonger = ponger.stop(); // It should return undefined if it cannot dial the peer const res = await pinger.sendRequest(ReqRespSubProtocol.PING, PING_REQUEST); expect(res).toBeUndefined(); + + await stopPonger; }); - it.skip('should request from a later peer if other peers are offline', async () => { + it('should request from a later peer if other peers are offline', async () => { nodes = await createNodes(peerScoring, 4); await startNodes(nodes); @@ -91,8 +93,8 @@ describe('ReqResp', () => { await sleep(500); // Stop the second middle two nodes - void nodes[1].req.stop(); - void nodes[2].req.stop(); + const stopNode1 = nodes[1].req.stop(); + const stopNode2 = nodes[2].req.stop(); // send from the first node let res = await nodes[0].req.sendRequest(ReqRespSubProtocol.PING, PING_REQUEST); @@ -100,13 +102,15 @@ describe('ReqResp', () => { if (!res) { // The peer chosen is randomly selected, and the node above wont respond, so if // we wait and try again, there will only be one node to chose from - logger.debug('No response from node, retrying'); + logger.debug('\n\n\n\n\nNo response from node, retrying\n\n\n\n\n'); await sleep(500); res = await nodes[0].req.sendRequest(ReqRespSubProtocol.PING, PING_REQUEST); } // It will randomly try to connect, then hit the correct node expect(res?.toBuffer().toString('utf-8')).toEqual('pong'); + + await Promise.all([stopNode1, stopNode2]); }); it('should hit a rate limit if too many requests are made in quick succession', async () => { diff --git a/yarn-project/p2p/src/services/reqresp/reqresp.ts b/yarn-project/p2p/src/services/reqresp/reqresp.ts index ff2f01195cd..0f428493c4e 100644 --- a/yarn-project/p2p/src/services/reqresp/reqresp.ts +++ b/yarn-project/p2p/src/services/reqresp/reqresp.ts @@ -104,15 +104,15 @@ export class ReqResp { * Stop the reqresp service */ async stop() { - // Unregister all handlers - for (const protocol of Object.keys(this.subProtocolHandlers)) { - await this.libp2p.unhandle(protocol); - } + // Unregister handlers in parallel + const unregisterPromises = Object.keys(this.subProtocolHandlers).map(protocol => this.libp2p.unhandle(protocol)); + await Promise.all(unregisterPromises); - // Close all active connections + // Close connection sampler await this.connectionSampler.stop(); this.logger.debug('ReqResp: Connection sampler stopped'); + // Close streams in parallel const closeStreamPromises = this.libp2p.getConnections().map(connection => connection.close()); await Promise.all(closeStreamPromises); this.logger.debug('ReqResp: All active streams closed'); @@ -169,16 +169,17 @@ export class ReqResp { return undefined; } - const attemptedPeers: Map = new Map(); + const attemptedPeers: Map = new Map(); for (let i = 0; i < numberOfPeers; i++) { // Sample a peer to make a request to const peer = this.connectionSampler.getPeer(attemptedPeers); + this.logger.trace(`Attempting to send request to peer: ${peer?.toString()}`); if (!peer) { this.logger.debug('No peers available to send requests to'); return undefined; } - attemptedPeers.set(peer, true); + attemptedPeers.set(peer.toString(), true); this.logger.trace(`Sending request to peer: ${peer.toString()}`); const response = await this.sendRequestToPeer(peer, subProtocol, requestBuffer);