Skip to content

Commit

Permalink
fix: beaconBlocksMaybeBlobsByRoot() to handle single block root
Browse files Browse the repository at this point in the history
  • Loading branch information
twoeths committed Feb 14, 2025
1 parent 3e0a08f commit a73d81d
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 78 deletions.
197 changes: 121 additions & 76 deletions packages/beacon-node/src/network/reqresp/beaconBlocksMaybeBlobsByRoot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import {fromHexString, toHexString} from "@chainsafe/ssz";
import {ChainForkConfig} from "@lodestar/config";
import {ForkName, ForkSeq} from "@lodestar/params";
import {signedBlockToSignedHeader} from "@lodestar/state-transition";
import {RootHex, SignedBeaconBlock, deneb, fulu, phase0, ssz} from "@lodestar/types";
import {Root, RootHex, SignedBeaconBlock, deneb, fulu, phase0, ssz} from "@lodestar/types";
import {BlobAndProof} from "@lodestar/types/deneb";
import {fromHex} from "@lodestar/utils";
import {Logger} from "@lodestar/utils";
Expand All @@ -14,14 +14,15 @@ import {
BlockInputType,
BlockSource,
CachedBlobs,
CachedData,
CachedDataColumns,
DataColumnsSource,
NullBlockInput,
getBlockInput,
getBlockInputBlobs,
getBlockInputDataColumns,
} from "../../chain/blocks/types.js";
import {BlockInputAvailabilitySource} from "../../chain/seenCache/seenGossipBlockInput.js";
import {BlockInputAvailabilitySource, getEmptyBlockInputCacheEntry} from "../../chain/seenCache/seenGossipBlockInput.js";
import {IExecutionEngine} from "../../execution/index.js";
import {Metrics} from "../../metrics/index.js";
import {computeInclusionProof, kzgCommitmentToVersionedHash} from "../../util/blobs.js";
Expand All @@ -33,21 +34,29 @@ import {PartialDownload, matchBlockWithBlobs, matchBlockWithDataColumns} from ".
const MAX_ENGINE_GETBLOBS_CACHE = 32 * 16;
const MAX_UNAVAILABLE_RETRY_CACHE = 32;

/**
* Given a block root, fetch all block along with its data (blobs or data columns) from a peer
* - for deneb and electra, fetch blobs in 1 round
* - for fulu, fetch data columns in multiple rounds
* - round 0: only have the root, partialDownload = null
* - round 1 onwards: partialDownload contains the block and pending data columns
*/
export async function beaconBlocksMaybeBlobsByRoot(
config: ChainForkConfig,
network: INetwork,
peerId: PeerIdStr,
request: phase0.BeaconBlocksByRootRequest,
root: Root,
partialDownload: null | PartialDownload,
peerClient: string,
logger?: Logger
): Promise<{blocks: BlockInput[]; pendingDataColumns: null | number[]}> {
// console.log("beaconBlocksMaybeBlobsByRoot", request);
const allBlocks = partialDownload
): Promise<{block: BlockInput; pendingDataColumns: null | number[]}> {
const [block] = partialDownload
? partialDownload.blocks.map((blockInput) => ({data: blockInput.block}))
: await network.sendBeaconBlocksByRoot(peerId, request);
: await network.sendBeaconBlocksByRoot(peerId, [root]);

logger?.debug("beaconBlocksMaybeBlobsByRoot response", {allBlocks: allBlocks.length, peerClient});
if (partialDownload !== null) {
logger?.debug("beaconBlocksMaybeBlobsByRoot response", {slot: block.data.message.slot, peerClient});
}

const preDataBlocks = [];
const blobsDataBlocks = [];
Expand All @@ -69,41 +78,35 @@ export async function beaconBlocksMaybeBlobsByRoot(
const blobIdentifiers: deneb.BlobIdentifier[] = [];
const dataColumnIdentifiers: fulu.DataColumnIdentifier[] = [];

let prevFork = null;
for (const block of allBlocks) {
const slot = block.data.message.slot;
const blockRoot = config.getForkTypes(slot).BeaconBlock.hashTreeRoot(block.data.message);
const fork = config.getForkName(slot);
if (fork !== (prevFork ?? fork)) {
throw Error("beaconBlocksMaybeBlobsByRoot only accepts requests of same fork");
const slot = block.data.message.slot;
const blockRoot = config.getForkTypes(slot).BeaconBlock.hashTreeRoot(block.data.message);
const fork = config.getForkName(slot);

if (ForkSeq[fork] < ForkSeq.deneb) {
preDataBlocks.push(block);
} else if (fork === ForkName.deneb || fork === ForkName.electra) {
blobsDataBlocks.push(block);
const blobKzgCommitmentsLen = (block.data.message.body as deneb.BeaconBlockBody).blobKzgCommitments.length;
logger?.debug("beaconBlocksMaybeBlobsByRoot", {blobKzgCommitmentsLen, peerClient});
for (let index = 0; index < blobKzgCommitmentsLen; index++) {
// try to see if the blob is available locally
blobIdentifiers.push({blockRoot, index});
}
prevFork = fork;

if (ForkSeq[fork] < ForkSeq.deneb) {
preDataBlocks.push(block);
} else if (fork === ForkName.deneb || fork === ForkName.electra) {
blobsDataBlocks.push(block);
const blobKzgCommitmentsLen = (block.data.message.body as deneb.BeaconBlockBody).blobKzgCommitments.length;
logger?.debug("beaconBlocksMaybeBlobsByRoot", {blobKzgCommitmentsLen, peerClient});
for (let index = 0; index < blobKzgCommitmentsLen; index++) {
// try to see if the blob is available locally
blobIdentifiers.push({blockRoot, index});
}
} else if (fork === ForkName.fulu) {
dataColumnsDataBlocks.push(block);
const blobKzgCommitmentsLen = (block.data.message.body as deneb.BeaconBlockBody).blobKzgCommitments.length;
const custodyColumnIndexes = blobKzgCommitmentsLen > 0 ? columns : [];
for (const columnIndex of custodyColumnIndexes) {
dataColumnIdentifiers.push({blockRoot, index: columnIndex});
}
} else {
throw Error(`Invalid fork=${fork} in beaconBlocksMaybeBlobsByRoot`);
} else if (fork === ForkName.fulu) {
dataColumnsDataBlocks.push(block);
const blobKzgCommitmentsLen = (block.data.message.body as deneb.BeaconBlockBody).blobKzgCommitments.length;
const custodyColumnIndexes = blobKzgCommitmentsLen > 0 ? columns : [];
for (const columnIndex of custodyColumnIndexes) {
dataColumnIdentifiers.push({blockRoot, index: columnIndex});
}
} else {
throw Error(`Invalid fork=${fork} in beaconBlocksMaybeBlobsByRoot`);
}

let blockInputs = preDataBlocks.map((block) => getBlockInput.preData(config, block.data, BlockSource.byRoot));
let blockInput = getBlockInput.preData(config, block.data, BlockSource.byRoot);

if (blobsDataBlocks.length > 0) {
// deneb and electra
let allBlobSidecars: deneb.BlobSidecar[];
if (blobIdentifiers.length > 0) {
allBlobSidecars = await network.sendBlobSidecarsByRoot(peerId, blobIdentifiers);
Expand All @@ -115,65 +118,111 @@ export async function beaconBlocksMaybeBlobsByRoot(
// and here it should be infinity since all blobs should match
const blockInputWithBlobs = matchBlockWithBlobs(
config,
allBlocks,
[block],
allBlobSidecars,
Infinity,
BlockSource.byRoot,
BlobsSource.byRoot
);
blockInputs = [...blockInputs, ...blockInputWithBlobs];
if (blockInputWithBlobs.length !== 1) {
throw Error(`Expected exactly one blockInputWithBlobs slot=${slot}`);
}
blockInput = blockInputWithBlobs[0];
}

if (dataColumnsDataBlocks.length > 0) {
pendingDataColumns = neededColumns.reduce((acc, elem) => {
if (!columns.includes(elem)) {
acc.push(elem);
}
return acc;
}, [] as number[]);

let allDataColumnsSidecars: fulu.DataColumnSidecar[];
logger?.debug("allDataColumnsSidecars partialDownload", {
// fulu
let dataColumnSidecars: fulu.DataColumnSidecar[];
logger?.debug("beaconBlocksMaybeBlobsByRoot: dataColumnsSidecars partialDownload", {
...(partialDownload
? {blocks: partialDownload.blocks.length, pendingDataColumns: partialDownload.pendingDataColumns.join(" ")}
: {blocks: null, pendingDataColumns: null}),
dataColumnIdentifiers: dataColumnIdentifiers.map((did) => did.index).join(" "),
peerClient,
});
if (dataColumnIdentifiers.length > 0) {
allDataColumnsSidecars = await network.sendDataColumnSidecarsByRoot(peerId, dataColumnIdentifiers);
dataColumnSidecars = await network.sendDataColumnSidecarsByRoot(peerId, dataColumnIdentifiers);
} else {
// peer doesn't have columns we need, return. Consumer should try another peer
if (partialDownload !== null) {
return partialDownload;
logger?.verbose("beaconBlocksMaybeBlobsByRoot: peer doesn't have columns we need", {slot, peerClient});
return {block: partialDownload.blocks[0], pendingDataColumns};
}
allDataColumnsSidecars = [];
dataColumnSidecars = [];
}

// The last arg is to provide slot to which all blobs should be exhausted in matching
// and here it should be infinity since all blobs should match
// TODO: should not call matchBlockWithDataColumns() because it's supposed for range sync
// in that function, peers should return all requested data columns, this function runs at gossip time
// and it should not expect that
const blockInputWithBlobs = matchBlockWithDataColumns(
network,
peerId,
config,
custodyConfig,
columns,
allBlocks,
allDataColumnsSidecars,
Infinity,
BlockSource.byRoot,
DataColumnsSource.byRoot,
partialDownload,
// the same to matchBlockWithDataColumns() without expecting requested data columns = responded data columns
// because at gossip time peer may not have enough column to return
let cachedData: CachedData;
if (partialDownload !== null) {
const prevBlockInput = partialDownload.blocks[0];
if (prevBlockInput == null) {
throw Error("beaconBlocksMaybeBlobsByRoot: prevBlockInput=null in partialDownload");
}

if (prevBlockInput.type !== BlockInputType.dataPromise) {
throw Error(`beaconBlocksMaybeBlobsByRoot: prevBlockInput.type=${prevBlockInput.type} in prevPartialDownload`);
}
cachedData = prevBlockInput.cachedData;
} else {
// biome-ignore lint/style/noNonNullAssertion: checked below for validity
cachedData = getEmptyBlockInputCacheEntry(config.getForkName(block.data.message.slot), -1).cachedData!;
if (cachedData === undefined) {
throw Error("beaconBlocksMaybeBlobsByRoot: Invalid cachedData=undefined from getEmptyBlockInputCacheEntry");
}
}

if (cachedData.fork !== ForkName.fulu) {
throw Error("Invalid fork for cachedData on dataColumns");
}

const dataColumnsCache = (cachedData as CachedDataColumns).dataColumnsCache;
for (const dataColumnSidecar of dataColumnSidecars) {
dataColumnsCache.set(dataColumnSidecar.index, {
dataColumn: dataColumnSidecar,
// TODO: req/resp should return bytes here
dataColumnBytes: null,
});
}

pendingDataColumns = custodyConfig.sampledColumns.reduce((acc, elem) => {
if (dataColumnsCache.get(elem) === undefined) {
acc.push(elem);
}
return acc;
}, [] as number[]);

const logCtx = {
slot: slot,
requestedColumns: columns.join(","),
respondedColumns: dataColumnSidecars.map((dcs) => dcs.index).join(","),
peerClient,
logger
);
blockInputs = [...blockInputs, ...blockInputWithBlobs];
};

if (pendingDataColumns.length === 0) {
const {dataColumns, dataColumnsBytes} = getBlockInputDataColumns(
dataColumnsCache,
custodyConfig.sampledColumns
);

const blockData = {
fork: config.getForkName(slot),
dataColumns,
dataColumnsBytes,
dataColumnsSource: DataColumnsSource.byRoot,
} as BlockInputDataColumns;

logger?.verbose("beaconBlocksMaybeBlobsByRoot: fetched all data columns", logCtx);
blockInput = getBlockInput.availableData(config, block.data, BlockSource.byRoot, blockData);
} else {
// Consumer need to try with another peer
logger?.verbose("beaconBlocksMaybeBlobsByRoot: still missing data columns for block", logCtx);
blockInput = getBlockInput.dataPromise(config, block.data, BlockSource.byRoot, cachedData);
}
}

return {
blocks: blockInputs,
block: blockInput,
pendingDataColumns: pendingDataColumns && pendingDataColumns.length > 0 ? pendingDataColumns : null,
};
}
Expand Down Expand Up @@ -203,10 +252,6 @@ export async function unavailableBeaconBlobsByRoot(
block = allBlocks[0].data;
cachedData = unavailableBlockInput.cachedData;
unavailableBlockInput = getBlockInput.dataPromise(config, block, BlockSource.byRoot, cachedData);
// console.log(
// "downloaded sendBeaconBlocksByRoot",
// ssz.fulu.SignedBeaconBlock.toJson(block as fulu.SignedBeaconBlock)
// );
} else {
({block, cachedData} = unavailableBlockInput);
}
Expand Down
4 changes: 2 additions & 2 deletions packages/beacon-node/src/sync/unknownBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -583,13 +583,13 @@ export class UnknownBlockSync {
try {
const peerClient = this.network.getConnectedPeerClientAgent(peer);
const {
blocks: [blockInput],
block: blockInput,
pendingDataColumns,
} = await beaconBlocksMaybeBlobsByRoot(
this.config,
this.network,
peer,
[blockRoot],
blockRoot,
partialDownload,
peerClient,
this.logger
Expand Down

0 comments on commit a73d81d

Please sign in to comment.