Skip to content

Commit

Permalink
chore: reenable reqresp offline peers test (#11384)
Browse files Browse the repository at this point in the history
  • Loading branch information
Maddiaa0 authored Jan 21, 2025
1 parent 13fed74 commit 931dfa6
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 30 deletions.
6 changes: 1 addition & 5 deletions yarn-project/p2p/src/mocks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,7 @@ export const startNodes = async (
};

export const stopNodes = async (nodes: ReqRespNode[]): Promise<void> => {
const stopPromises = [];
for (const node of nodes) {
stopPromises.push(node.req.stop());
stopPromises.push(node.p2p.stop());
}
const stopPromises = nodes.flatMap(node => [node.req.stop(), node.p2p.stop()]);
await Promise.all(stopPromises);
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ export class BatchConnectionSampler {
return;
}

const excluding = new Map([[peerId, true]]);
const excluding = new Map([[peerId.toString(), true]]);
const newPeer = this.connectionSampler.getPeer(excluding);

if (newPeer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ describe('ConnectionSampler', () => {
let sampler: ConnectionSampler;
let mockLibp2p: any;
let peers: PeerId[];
let excluding: Map<PeerId, boolean>;
let excluding: Map<string, boolean>;
let mockRandomSampler: MockProxy<RandomSampler>;

beforeEach(async () => {
Expand All @@ -20,7 +20,7 @@ describe('ConnectionSampler', () => {

// Mock libp2p
mockLibp2p = {
getPeers: jest.fn().mockReturnValue(peers),
getPeers: jest.fn().mockReturnValue([...peers]),
dialProtocol: jest.fn(),
};

Expand Down Expand Up @@ -73,7 +73,7 @@ describe('ConnectionSampler', () => {
.mockReturnValueOnce(0)
.mockReturnValueOnce(1);

excluding.set(peers[0], true);
excluding.set(peers[0].toString(), true);
const selectedPeer = sampler.getPeer(excluding);
expect(selectedPeer).toBe(peers[1]);
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { createLogger } from '@aztec/foundation/log';
import { SerialQueue } from '@aztec/foundation/queue';
import { RunningPromise } from '@aztec/foundation/running-promise';

import { type Libp2p, type PeerId, type Stream } from '@libp2p/interface';

Expand All @@ -26,7 +25,8 @@ export class RandomSampler {
*/
export class ConnectionSampler {
private readonly logger = createLogger('p2p:reqresp:connection-sampler');
private cleanupJob: RunningPromise;
private cleanupInterval: NodeJS.Timeout;
private abortController: AbortController = new AbortController();

private readonly activeConnectionsCount: Map<PeerId, number> = new Map();
private readonly streams: Map<string, StreamAndPeerId> = new Map();
Expand All @@ -39,8 +39,7 @@ export class ConnectionSampler {
private readonly cleanupIntervalMs: number = 60000, // Default to 1 minute
private readonly sampler: RandomSampler = new RandomSampler(), // Allow randomness to be mocked for testing
) {
this.cleanupJob = new RunningPromise(() => this.cleanupStaleConnections(), this.logger, this.cleanupIntervalMs);
this.cleanupJob.start();
this.cleanupInterval = setInterval(() => void this.cleanupStaleConnections(), this.cleanupIntervalMs);

this.dialQueue.start();
}
Expand All @@ -50,7 +49,9 @@ export class ConnectionSampler {
*/
async stop() {
this.logger.info('Stopping connection sampler');
await this.cleanupJob.stop();
clearInterval(this.cleanupInterval);

this.abortController.abort();
await this.dialQueue.end();

// Close all active streams
Expand All @@ -65,7 +66,8 @@ export class ConnectionSampler {
* This is to prevent sampling with replacement
* @returns
*/
getPeer(excluding?: Map<PeerId, boolean>): PeerId | undefined {
getPeer(excluding?: Map<string, boolean>): PeerId | undefined {
// In libp2p getPeers performs a shallow copy, so this array can be sliced from safetly
const peers = this.libp2p.getPeers();

if (peers.length === 0) {
Expand All @@ -80,8 +82,10 @@ export class ConnectionSampler {
// - either the peer has active connections OR is in the exclusion list
while (
attempts < MAX_SAMPLE_ATTEMPTS &&
((this.activeConnectionsCount.get(peers[randomIndex]) ?? 0) > 0 || (excluding?.get(peers[randomIndex]) ?? false))
((this.activeConnectionsCount.get(peers[randomIndex]) ?? 0) > 0 ||
(excluding?.get(peers[randomIndex]?.toString()) ?? false))
) {
peers.splice(randomIndex, 1);
randomIndex = this.sampler.random(peers.length);
attempts++;
}
Expand Down Expand Up @@ -143,7 +147,9 @@ export class ConnectionSampler {
async dialProtocol(peerId: PeerId, protocol: string): Promise<Stream> {
// Dialling at the same time can cause race conditions where two different streams
// end up with the same id, hence a serial queue
const stream = await this.dialQueue.put(() => this.libp2p.dialProtocol(peerId, protocol));
const stream = await this.dialQueue.put(() =>
this.libp2p.dialProtocol(peerId, protocol, { signal: this.abortController.signal }),
);

this.streams.set(stream.id, { stream, peerId });
const updatedActiveConnectionsCount = (this.activeConnectionsCount.get(peerId) ?? 0) + 1;
Expand Down
16 changes: 10 additions & 6 deletions yarn-project/p2p/src/services/reqresp/reqresp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ describe('ReqResp', () => {

afterEach(async () => {
if (nodes) {
await stopNodes(nodes as ReqRespNode[]);
await stopNodes(nodes);
}
});

Expand Down Expand Up @@ -74,15 +74,17 @@ describe('ReqResp', () => {
await connectToPeers(nodes);
await sleep(500);

void ponger.stop();
const stopPonger = ponger.stop();

// It should return undefined if it cannot dial the peer
const res = await pinger.sendRequest(ReqRespSubProtocol.PING, PING_REQUEST);

expect(res).toBeUndefined();

await stopPonger;
});

it.skip('should request from a later peer if other peers are offline', async () => {
it('should request from a later peer if other peers are offline', async () => {
nodes = await createNodes(peerScoring, 4);

await startNodes(nodes);
Expand All @@ -91,22 +93,24 @@ describe('ReqResp', () => {
await sleep(500);

// Stop the second middle two nodes
void nodes[1].req.stop();
void nodes[2].req.stop();
const stopNode1 = nodes[1].req.stop();
const stopNode2 = nodes[2].req.stop();

// send from the first node
let res = await nodes[0].req.sendRequest(ReqRespSubProtocol.PING, PING_REQUEST);

if (!res) {
// The peer chosen is randomly selected, and the node above wont respond, so if
// we wait and try again, there will only be one node to chose from
logger.debug('No response from node, retrying');
logger.debug('\n\n\n\n\nNo response from node, retrying\n\n\n\n\n');
await sleep(500);
res = await nodes[0].req.sendRequest(ReqRespSubProtocol.PING, PING_REQUEST);
}

// It will randomly try to connect, then hit the correct node
expect(res?.toBuffer().toString('utf-8')).toEqual('pong');

await Promise.all([stopNode1, stopNode2]);
});

it('should hit a rate limit if too many requests are made in quick succession', async () => {
Expand Down
15 changes: 8 additions & 7 deletions yarn-project/p2p/src/services/reqresp/reqresp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,15 @@ export class ReqResp {
* Stop the reqresp service
*/
async stop() {
// Unregister all handlers
for (const protocol of Object.keys(this.subProtocolHandlers)) {
await this.libp2p.unhandle(protocol);
}
// Unregister handlers in parallel
const unregisterPromises = Object.keys(this.subProtocolHandlers).map(protocol => this.libp2p.unhandle(protocol));
await Promise.all(unregisterPromises);

// Close all active connections
// Close connection sampler
await this.connectionSampler.stop();
this.logger.debug('ReqResp: Connection sampler stopped');

// Close streams in parallel
const closeStreamPromises = this.libp2p.getConnections().map(connection => connection.close());
await Promise.all(closeStreamPromises);
this.logger.debug('ReqResp: All active streams closed');
Expand Down Expand Up @@ -169,16 +169,17 @@ export class ReqResp {
return undefined;
}

const attemptedPeers: Map<PeerId, boolean> = new Map();
const attemptedPeers: Map<string, boolean> = new Map();
for (let i = 0; i < numberOfPeers; i++) {
// Sample a peer to make a request to
const peer = this.connectionSampler.getPeer(attemptedPeers);
this.logger.trace(`Attempting to send request to peer: ${peer?.toString()}`);
if (!peer) {
this.logger.debug('No peers available to send requests to');
return undefined;
}

attemptedPeers.set(peer, true);
attemptedPeers.set(peer.toString(), true);

this.logger.trace(`Sending request to peer: ${peer.toString()}`);
const response = await this.sendRequestToPeer(peer, subProtocol, requestBuffer);
Expand Down

0 comments on commit 931dfa6

Please sign in to comment.