diff --git a/packages/beacon-node/src/network/network.ts b/packages/beacon-node/src/network/network.ts index f4b57fe0c658..34f34e0a225a 100644 --- a/packages/beacon-node/src/network/network.ts +++ b/packages/beacon-node/src/network/network.ts @@ -219,6 +219,8 @@ export class Network implements INetwork { this.aggregatorTracker.addAggregator(subscription.subnet, subscription.slot); } } + this.aggregatorTracker.prune(); + return this.core.prepareBeaconCommitteeSubnets(subscriptions); } diff --git a/packages/beacon-node/src/network/processor/aggregatorTracker.ts b/packages/beacon-node/src/network/processor/aggregatorTracker.ts index 0e50f0e7b8ac..634e809ae61f 100644 --- a/packages/beacon-node/src/network/processor/aggregatorTracker.ts +++ b/packages/beacon-node/src/network/processor/aggregatorTracker.ts @@ -14,13 +14,27 @@ const MAX_SLOTS_CACHED = SLOTS_PER_EPOCH * 2; export class AggregatorTracker { private subnetAggregatorsBySlot = new MapDef>(() => new Set()); + get maxSlotsCached(): number { + return MAX_SLOTS_CACHED; + } + addAggregator(subnet: SubnetId, slot: Slot): void { this.subnetAggregatorsBySlot.getOrDefault(slot).add(subnet); - - pruneSetToMax(this.subnetAggregatorsBySlot, MAX_SLOTS_CACHED); } shouldAggregate(subnet: SubnetId, slot: Slot): boolean { return this.subnetAggregatorsBySlot.get(slot)?.has(subnet) === true; } + + prune(): void { + // We could also `pruneBySlot` as items before current slot are no longer + // relevant but due to small cache size (64), the best approach is to + // just prune the cache after a batch of subnet subscriptions is processed. + pruneSetToMax( + this.subnetAggregatorsBySlot, + MAX_SLOTS_CACHED, + // Prune the oldest slots first + (a, b) => a - b + ); + } } diff --git a/packages/beacon-node/test/unit/network/processor/aggregatorTracker.test.ts b/packages/beacon-node/test/unit/network/processor/aggregatorTracker.test.ts new file mode 100644 index 000000000000..da907fd737e9 --- /dev/null +++ b/packages/beacon-node/test/unit/network/processor/aggregatorTracker.test.ts @@ -0,0 +1,54 @@ +import {describe, it, expect, beforeEach} from "vitest"; +import {AggregatorTracker} from "../../../../src/network/processor/aggregatorTracker.js"; + +describe("AggregatorTracker", () => { + let aggregatorTracker: AggregatorTracker; + + beforeEach(() => { + aggregatorTracker = new AggregatorTracker(); + }); + + it("should keep track of aggregator for subnet / slot", () => { + const subnet = 1; + const slot = 1; + + aggregatorTracker.addAggregator(subnet, slot); + + expect(aggregatorTracker.shouldAggregate(subnet, slot)).toBe(true); + }); + + it("should prune the oldest slots first when maximum cache size is reached", () => { + const {maxSlotsCached} = aggregatorTracker; + const firstSlot = 0; + const lastSlot = firstSlot + maxSlotsCached - 1; + const subnet = 1; + const slots = Array.from({length: maxSlotsCached}, (_, i) => firstSlot + i); + + // Slots should be inserted in random order + for (let i = slots.length - 1; i > 0; i--) { + const j = Math.floor(Math.random() * (i + 1)); + [slots[i], slots[j]] = [slots[j], slots[i]]; + } + + // Fill up the cache to its maximum size + for (const slot of slots) { + aggregatorTracker.addAggregator(subnet, slot); + } + + // This should prune the first two slots + aggregatorTracker.addAggregator(subnet, lastSlot + 1); + aggregatorTracker.addAggregator(subnet, lastSlot + 2); + aggregatorTracker.prune(); + + expect(aggregatorTracker.shouldAggregate(subnet, firstSlot)).toBe(false); + expect(aggregatorTracker.shouldAggregate(subnet, firstSlot + 1)).toBe(false); + + // Verify that all other slots are still available + for (let slot = firstSlot + 2; slot <= lastSlot + 2; slot++) { + expect(aggregatorTracker.shouldAggregate(subnet, slot)).toBeWithMessage( + true, + `expected aggregator for slot ${slot}` + ); + } + }); +}); diff --git a/packages/utils/src/map.ts b/packages/utils/src/map.ts index 15ac012e528c..8a936bc6b1b2 100644 --- a/packages/utils/src/map.ts +++ b/packages/utils/src/map.ts @@ -82,13 +82,20 @@ export class Map2dArr { /** * Prune an arbitrary set removing the first keys to have a set.size === maxItems. * Returns the count of deleted items. + * + * Keys can be sorted by `compareFn` to get more control over which items to prune first */ -export function pruneSetToMax(set: Set | Map, maxItems: number): number { +export function pruneSetToMax( + set: Set | Map, + maxItems: number, + compareFn?: (a: T, b: T) => number +): number { let itemsToDelete = set.size - maxItems; const deletedItems = Math.max(0, itemsToDelete); if (itemsToDelete > 0) { - for (const key of set.keys()) { + const keys = compareFn ? Array.from(set.keys()).sort(compareFn) : set.keys(); + for (const key of keys) { set.delete(key); itemsToDelete--; if (itemsToDelete <= 0) {