Skip to content

Commit

Permalink
feat: batch request response requests across multiple peers
Browse files Browse the repository at this point in the history
  • Loading branch information
Maddiaa0 committed Jan 19, 2025
1 parent 0f6dffd commit bf90bf1
Show file tree
Hide file tree
Showing 6 changed files with 386 additions and 66 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import { describe, expect, it, jest } from '@jest/globals';
import { createSecp256k1PeerId } from '@libp2p/peer-id-factory';
import { Libp2p } from 'libp2p';
import { type Libp2p } from 'libp2p';

import { BatchConnectionSampler } from './batch_connection_sampler.js';
import { ConnectionSampler } from './connection_sampler.js';
import { RandomSampler } from './connection_sampler.js';
import { ConnectionSampler, type RandomSampler } from './connection_sampler.js';

describe('BatchConnectionSampler', () => {
const mockRandomSampler = {
Expand Down Expand Up @@ -34,18 +33,17 @@ describe('BatchConnectionSampler', () => {
await connectionSampler.stop();
});

it('initializes with correct number of peers and request distribution', async () => {
it('initializes with correct number of peers and request distribution', () => {
// Mock random to return sequential indices
mockRandomSampler.random.mockImplementation(_ => 0);

const sampler = new BatchConnectionSampler(connectionSampler, /* batchSize */ 10, /* maxPeers */ 3);

expect(sampler.activePeerCount).toBe(3);
expect(sampler.requestsPerBucket).toBe(3); // floor(10/3) = 3
expect(mockRandomSampler.random).toHaveBeenCalledTimes(3);
});

it('assigns requests to peers deterministically with wraparound', async () => {
it('assigns requests to peers deterministically with wraparound', () => {
// Mock to return first two peers
let callCount = 0;
mockRandomSampler.random.mockImplementation(() => callCount++ % 2);
Expand All @@ -67,10 +65,8 @@ describe('BatchConnectionSampler', () => {
expect(assignments[3]).toBe(peers[1]);
});

it('handles peer removal and replacement', async () => {
let callCount = 0;
mockRandomSampler.random.mockImplementation(max => {
if (callCount < 2) return callCount++; // Return 0, then 1 for initial peers
it('handles peer removal and replacement', () => {
mockRandomSampler.random.mockImplementation(_ => {
return 2; // Return index 2 for replacement peer
});

Expand All @@ -97,10 +93,12 @@ describe('BatchConnectionSampler', () => {
expect(sampler.getPeerForRequest(3)).toBe(peers[1]);
});

it('distributes requests according to documentation example', async () => {
it('distributes requests according to documentation example', () => {
let callCount = 0;
mockRandomSampler.random.mockImplementation(() => {
if (callCount < 3) return callCount++;
if (callCount < 3) {
return callCount++;
}
return 0;
});

Expand Down Expand Up @@ -129,7 +127,19 @@ describe('BatchConnectionSampler', () => {
expect(sampler.getPeerForRequest(8)).toBe(peers[2]);
});

it('handles edge cases', async () => {
it('same number of requests per peers', () => {
let callCount = 0;
mockRandomSampler.random.mockImplementation(() => callCount++ % 2);

const sampler = new BatchConnectionSampler(connectionSampler, /* batchSize */ 2, /* maxPeers */ 2);
expect(sampler.requestsPerBucket).toBe(1);
expect(sampler.activePeerCount).toBe(2);

expect(sampler.getPeerForRequest(0)).toBe(peers[0]);
expect(sampler.getPeerForRequest(1)).toBe(peers[1]);
});

it('handles edge cases, 0 peers, smaller batch than max peers', () => {
mockRandomSampler.random.mockImplementation(() => 0);
libp2p.getPeers.mockReturnValue([]);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { createLogger } from '@aztec/foundation/log';

import { type PeerId } from '@libp2p/interface';

import { ConnectionSampler } from './connection_sampler.js';
import { type ConnectionSampler } from './connection_sampler.js';

/**
* Manages batches of peers for parallel request processing.
Expand All @@ -22,11 +22,14 @@ export class BatchConnectionSampler {
private readonly batch: PeerId[] = [];
private readonly requestsPerPeer: number;

constructor(
private readonly connectionSampler: ConnectionSampler,
private readonly batchSize: number,
private readonly maxPeers: number,
) {
constructor(private readonly connectionSampler: ConnectionSampler, batchSize: number, maxPeers: number) {
if (maxPeers <= 0) {
throw new Error('Max peers cannot be 0');
}
if (batchSize <= 0) {
throw new Error('Batch size cannot be 0');
}

// Calculate how many requests each peer should handle, cannot be 0
this.requestsPerPeer = Math.max(1, Math.floor(batchSize / maxPeers));

Expand All @@ -41,7 +44,9 @@ export class BatchConnectionSampler {
* @returns The peer assigned to handle this request
*/
getPeerForRequest(index: number): PeerId | undefined {
if (this.batch.length === 0) return undefined;
if (this.batch.length === 0) {
return undefined;
}

// Calculate which peer bucket this index belongs to
const peerIndex = Math.floor(index / this.requestsPerPeer) % this.batch.length;
Expand All @@ -56,26 +61,17 @@ export class BatchConnectionSampler {
*/
removePeerAndReplace(peerId: PeerId): void {
const index = this.batch.findIndex(p => p === peerId);
if (index !== -1) {
const newPeer = this.addReplacement();
if (newPeer) {
this.batch[index] = newPeer;
this.logger.trace(`Replaced peer ${peerId} with ${newPeer}`, { peerId, newPeer });
} else {
// If we couldn't get a replacement, remove the peer and compact the array
this.batch.splice(index, 1);
this.logger.trace(`Removed peer ${peerId}`, { peerId });
}
}
}
if (index === -1) return;

/**
* Adds a new peer
*
* @returns The new peer if successful, undefined otherwise
*/
private addReplacement(): PeerId | undefined {
return this.connectionSampler.getPeer();
const newPeer = this.connectionSampler.getPeer();
if (newPeer) {
this.batch[index] = newPeer;
this.logger.trace(`Replaced peer ${peerId} with ${newPeer}`, { peerId, newPeer });
} else {
// If we couldn't get a replacement, remove the peer and compact the array
this.batch.splice(index, 1);
this.logger.trace(`Removed peer ${peerId}`, { peerId });
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,4 +167,95 @@ describe('ConnectionSampler', () => {
expect((sampler as any).streams.size).toBe(0);
});
});

describe('samplePeersBatch', () => {
beforeEach(async () => {
// Create test peers
peers = await Promise.all(new Array(5).fill(0).map(() => createSecp256k1PeerId()));

// Mock libp2p
mockLibp2p = {
getPeers: jest.fn().mockReturnValue(peers),
dialProtocol: jest.fn(),
};

mockRandomSampler = mock<RandomSampler>();
sampler = new ConnectionSampler(mockLibp2p, 1000, mockRandomSampler);
});

it('prioritizes peers without active connections', () => {
// Set up some peers with active connections
sampler['activeConnectionsCount'].set(peers[3], 1);
sampler['activeConnectionsCount'].set(peers[4], 2);

// Sample 3 peers
const sampledPeers = sampler.samplePeersBatch(3);

// Should get peers[0,1,2] first as they have no connections
expect(sampledPeers).toHaveLength(3);
expect(sampledPeers).toContain(peers[0]);
expect(sampledPeers).toContain(peers[1]);
expect(sampledPeers).toContain(peers[2]);
// Should not include peers with active connections when enough peers without connections exist
expect(sampledPeers).not.toContain(peers[3]);
expect(sampledPeers).not.toContain(peers[4]);
});

it('falls back to peers with connections when needed', () => {
// Set up most peers with active connections
sampler['activeConnectionsCount'].set(peers[1], 1);
sampler['activeConnectionsCount'].set(peers[2], 1);
sampler['activeConnectionsCount'].set(peers[3], 1);
sampler['activeConnectionsCount'].set(peers[4], 1);

mockRandomSampler.random.mockReturnValue(0); // Always pick first available peer

const sampledPeers = sampler.samplePeersBatch(3);

// Should get peers[0] first (no connections), then some with connections
expect(sampledPeers).toHaveLength(3);
expect(sampledPeers[0]).toBe(peers[0]); // The only peer without connections
expect(sampledPeers.slice(1)).toEqual(expect.arrayContaining([peers[1]])); // Should include some peers with connections
});

it('handles case when all peers have active connections', () => {
// Set up all peers with active connections
peers.forEach(peer => sampler['activeConnectionsCount'].set(peer, 1));

mockRandomSampler.random.mockReturnValue(0); // Always pick first available peer

const sampledPeers = sampler.samplePeersBatch(3);

expect(sampledPeers).toHaveLength(3);
expect(sampledPeers).toEqual(expect.arrayContaining([peers[0], peers[1], peers[2]]));
});

it('handles case when fewer peers available than requested', () => {
// Mock libp2p to return fewer peers
const fewPeers = peers.slice(0, 2);
mockLibp2p.getPeers.mockReturnValue(fewPeers);

const sampledPeers = sampler.samplePeersBatch(5);

expect(sampledPeers).toHaveLength(2); // Should only return available peers
expect(sampledPeers).toEqual(expect.arrayContaining(fewPeers));
});

it('handles case when no peers available', () => {
mockLibp2p.getPeers.mockReturnValue([]);

const sampledPeers = sampler.samplePeersBatch(3);

expect(sampledPeers).toHaveLength(0);
});

it('returns exactly the number of peers requested when available', () => {
const sampledPeers = sampler.samplePeersBatch(3);

expect(sampledPeers).toHaveLength(3);
// Verify all peers are unique
const uniquePeers = new Set(sampledPeers);
expect(uniquePeers.size).toBe(3);
});
});
});
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { createLogger } from '@aztec/foundation/log';
import { SerialQueue } from '@aztec/foundation/queue';

import { type Libp2p, type PeerId, type Stream } from '@libp2p/interface';

Expand All @@ -24,20 +25,20 @@ export class RandomSampler {
*/
export class ConnectionSampler {
public readonly activeConnectionsCount: Map<PeerId, number> = new Map();

private readonly streams: Map<string, StreamAndPeerId> = new Map();

private readonly logger = createLogger('p2p:reqresp:connection-sampler');
private cleanupInterval?: NodeJS.Timeout;

// Serial queue to ensure that we only dial one peer at a time
private dialQueue: SerialQueue = new SerialQueue();

constructor(
private readonly libp2p: Libp2p,
private readonly cleanupIntervalMs: number = 60000, // Default to 1 minute

// Random sampler provided so that it can be mocked
private readonly sampler: RandomSampler = new RandomSampler(),
private readonly sampler: RandomSampler = new RandomSampler(), // Allow randomness to be mocked for testing
) {
this.startCleanupJob();
this.dialQueue.start();
}

private startCleanupJob() {
Expand All @@ -53,18 +54,18 @@ export class ConnectionSampler {
if (this.cleanupInterval) {
clearInterval(this.cleanupInterval);
}
await this.dialQueue.end();

// Close all active streams
const closePromises = Array.from(this.streams.keys()).map(streamId => this.close(streamId));

await Promise.all(closePromises);
}

getPeer(): PeerId {
const peers = this.libp2p.getPeers();

let randomIndex = this.sampler.random(peers.length);
let attempts = 0;

// If the active connections count is greater than 0, then we already have a connection open
// So we try to sample a different peer, but only MAX_SAMPLE_ATTEMPTS times
while ((this.activeConnectionsCount.get(peers[randomIndex]) ?? 0) > 0 && attempts < MAX_SAMPLE_ATTEMPTS) {
Expand All @@ -80,22 +81,41 @@ export class ConnectionSampler {
}

/**
* Samples a batch of peers from the libp2p node
* Samples a batch of unique peers from the libp2p node, prioritizing peers without active connections
*
* @param maxPeers - The maximum number of peers to sample
* @returns The sampled peers
* @param numberToSample - The number of peers to sample
* @returns Array of unique sampled peers, prioritizing those without active connections
*/
samplePeersBatch(maxPeers: number): PeerId[] {
const peers = [];
for (let i = 0; i < maxPeers; i++) {
const peer = this.getPeer();
// Can be undefined if we have no peers
if (peer) {
peers.push(peer);
samplePeersBatch(numberToSample: number): PeerId[] {
const peers = this.libp2p.getPeers();
const sampledPeers: PeerId[] = [];
const peersWithConnections: PeerId[] = []; // Hold onto peers with active connections incase we need to sample more

for (const peer of peers) {
const activeConnections = this.activeConnectionsCount.get(peer) ?? 0;
if (activeConnections === 0) {
if (sampledPeers.push(peer) === numberToSample) {
return sampledPeers;
}
} else {
peersWithConnections.push(peer);
}
}
this.logger.trace(`Batch sampled ${peers.length} peers`, { peers });
return peers;

// If we still need more peers, sample from those with connections
while (sampledPeers.length < numberToSample && peersWithConnections.length > 0) {
const randomIndex = this.sampler.random(peersWithConnections.length);
const [peer] = peersWithConnections.splice(randomIndex, 1);
sampledPeers.push(peer);
}

this.logger.trace(`Batch sampled ${sampledPeers.length} unique peers`, {
peers: sampledPeers,
withoutConnections: sampledPeers.length - peersWithConnections.length,
withConnections: peersWithConnections.length,
});

return sampledPeers;
}

// Set of passthrough functions to keep track of active connections
Expand All @@ -108,9 +128,11 @@ export class ConnectionSampler {
* @returns The stream
*/
async dialProtocol(peerId: PeerId, protocol: string): Promise<Stream> {
const stream = await this.libp2p.dialProtocol(peerId, protocol);
this.streams.set(stream.id, { stream, peerId });
// Dialling at the same time can cause race conditions where two different streams
// end up with the same id, hence a serial queue
const stream = await this.dialQueue.put(() => this.libp2p.dialProtocol(peerId, protocol));

this.streams.set(stream.id, { stream, peerId });
const updatedActiveConnectionsCount = (this.activeConnectionsCount.get(peerId) ?? 0) + 1;
this.activeConnectionsCount.set(peerId, updatedActiveConnectionsCount);

Expand Down
Loading

0 comments on commit bf90bf1

Please sign in to comment.