Skip to content

Commit

Permalink
feat(archiver): read blobs from cl (#11273)
Browse files Browse the repository at this point in the history
  • Loading branch information
Maddiaa0 authored Jan 27, 2025
1 parent 5ed155a commit 8cf4982
Show file tree
Hide file tree
Showing 55 changed files with 1,236 additions and 298 deletions.
46 changes: 44 additions & 2 deletions scripts/run_native_testnet.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@ PROVER_SCRIPT="\"./prover-node.sh 8078 false\""
NUM_VALIDATORS=3
INTERLEAVED=false
METRICS=false
DISABLE_BLOB_SINK=false
LOG_LEVEL="info"
ETHEREUM_HOST=
L1_CONSENSUS_HOST_URL=

OTEL_COLLECTOR_ENDPOINT=${OTEL_COLLECTOR_ENDPOINT:-"http://localhost:4318"}

# Function to display help message
Expand All @@ -49,6 +53,9 @@ display_help() {
echo " -i Run interleaved (default: $INTERLEAVED)"
echo " -m Run with metrics (default: $METRICS) will use $OTEL_COLLECTOR_ENDPOINT as default otel endpoint"
echo " -c Specify the otel collector endpoint (default: $OTEL_COLLECTOR_ENDPOINT)"
echo " -b Disable the blob sink (default: false)"
echo " -e Specify the ethereum host url (default: $ETHEREUM_HOST)"
echo " -cl Specify the l1 consensus host url (default: $L1_CONSENSUS_HOST_URL)"
echo
echo "Example:"
echo " $0 -t ./test-4epochs.sh -val 5 -v"
Expand Down Expand Up @@ -97,6 +104,18 @@ while [[ $# -gt 0 ]]; do
OTEL_COLLECTOR_ENDPOINT="$2"
shift 2
;;
-e)
ETHEREUM_HOST="$2"
shift 2
;;
-cl)
L1_CONSENSUS_HOST_URL="$2"
shift 2
;;
-b)
DISABLE_BLOB_SINK=true
shift
;;
*)
echo "Invalid option: $1" >&2
display_help
Expand All @@ -115,6 +134,28 @@ if $METRICS; then
export LOG_JSON=1
fi

# If an ethereum rpc url is provided, use it
if [ -n "$ETHEREUM_HOST" ]; then
export ETHEREUM_HOST
fi
if [ -n "$L1_CONSENSUS_HOST_URL" ]; then
export L1_CONSENSUS_HOST_URL
fi

# If an ethereum url has been provided, do not run the ethereum.sh script
if [ -n "$ETHEREUM_HOST" ]; then
ETHEREUM_SCRIPT=""
else
ETHEREUM_SCRIPT="./ethereum.sh"
fi

# If the blob sink is disabled, do not run the blob-sink.sh script
if $DISABLE_BLOB_SINK; then
BLOB_SINK_SCRIPT=""
else
BLOB_SINK_SCRIPT="./blob-sink.sh"
fi

# Go to repo root
cd $(git rev-parse --show-toplevel)

Expand All @@ -124,11 +165,12 @@ BASE_CMD="INTERLEAVED=$INTERLEAVED ./yarn-project/end-to-end/scripts/native_netw
\"./deploy-l1-contracts.sh $NUM_VALIDATORS\" \
./deploy-l2-contracts.sh \
./boot-node.sh \
./ethereum.sh \
$ETHEREUM_SCRIPT \
\"./validators.sh $NUM_VALIDATORS\" \
$PROVER_SCRIPT \
./pxe.sh \
./transaction-bot.sh"
./transaction-bot.sh \
$BLOB_SINK_SCRIPT"

# Execute the command
eval $BASE_CMD
12 changes: 11 additions & 1 deletion spartan/aztec-network/files/config/setup-service-addresses.sh
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ get_service_address() {
echo "http://${NODE_IP}:${PORT}"
}

# Configure Ethereum address
# Configure Ethereum execution client address
if [ "${EXTERNAL_ETHEREUM_HOST}" != "" ]; then
ETHEREUM_ADDR="${EXTERNAL_ETHEREUM_HOST}"
elif [ "${NETWORK_PUBLIC}" = "true" ]; then
Expand All @@ -61,6 +61,15 @@ else
ETHEREUM_ADDR="http://${SERVICE_NAME}-eth-execution.${NAMESPACE}:${ETHEREUM_PORT}"
fi

# Configure Ethereum Consensus address
if [ "${EXTERNAL_ETHEREUM_CONSENSUS_HOST}" != "" ]; then
ETHEREUM_CONSENSUS_ADDR="${EXTERNAL_ETHEREUM_CONSENSUS_HOST}"
elif [ "${NETWORK_PUBLIC}" = "true" ]; then
ETHEREUM_CONSENSUS_ADDR=$(get_service_address "eth-beacon" "${ETHEREUM_CONSENSUS_PORT}")
else
ETHEREUM_CONSENSUS_ADDR="http://${SERVICE_NAME}-eth-beacon.${NAMESPACE}:${ETHEREUM_CONSENSUS_PORT}"
fi

# Configure Boot Node address
if [ "${BOOT_NODE_EXTERNAL_HOST}" != "" ]; then
BOOT_NODE_ADDR="${BOOT_NODE_EXTERNAL_HOST}"
Expand Down Expand Up @@ -93,6 +102,7 @@ fi

# Write addresses to file for sourcing
echo "export ETHEREUM_HOST=${ETHEREUM_ADDR}" >> /shared/config/service-addresses
echo "export L1_CONSENSUS_HOST_URL=${ETHEREUM_CONSENSUS_ADDR}" >> /shared/config/service-addresses
echo "export BOOT_NODE_HOST=${BOOT_NODE_ADDR}" >> /shared/config/service-addresses
echo "export PROVER_NODE_HOST=${PROVER_NODE_ADDR}" >> /shared/config/service-addresses
echo "export PROVER_BROKER_HOST=${PROVER_BROKER_ADDR}" >> /shared/config/service-addresses
Expand Down
4 changes: 4 additions & 0 deletions spartan/aztec-network/templates/_helpers.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ Service Address Setup Container
value: "{{ .Values.ethereum.externalHost }}"
- name: ETHEREUM_PORT
value: "{{ .Values.ethereum.execution.service.port }}"
- name: EXTERNAL_ETHEREUM_CONSENSUS_HOST
value: "{{ .Values.ethereum.beacon.externalHost }}"
- name: ETHEREUM_CONSENSUS_PORT
value: "{{ .Values.ethereum.beacon.service.port }}"
- name: EXTERNAL_BOOT_NODE_HOST
value: "{{ .Values.bootNode.externalHost }}"
- name: BOOT_NODE_PORT
Expand Down
18 changes: 18 additions & 0 deletions yarn-project/archiver/src/archiver/archiver.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ describe('Archiver', () => {

mockInbox.read.totalMessagesInserted.mockResolvedValueOnce(2n).mockResolvedValueOnce(6n);

blocks.forEach(b => blobSinkClient.getBlobSidecar.mockResolvedValueOnce([makeBlobFromBlock(b)]));

makeMessageSentEvent(98n, 1n, 0n);
makeMessageSentEvent(99n, 1n, 1n);
makeL2BlockProposedEvent(101n, 1n, blocks[0].archive.root.toString());
Expand Down Expand Up @@ -292,6 +294,7 @@ describe('Archiver', () => {
makeL2BlockProposedEvent(90n, 3n, badArchive);

rollupTxs.forEach(tx => publicClient.getTransaction.mockResolvedValueOnce(tx));
blocks.forEach(b => blobSinkClient.getBlobSidecar.mockResolvedValueOnce([makeBlobFromBlock(b)]));

await archiver.start(false);

Expand Down Expand Up @@ -330,6 +333,7 @@ describe('Archiver', () => {
makeL2BlockProposedEvent(80n, 2n, blocks[1].archive.root.toString());

rollupTxs.forEach(tx => publicClient.getTransaction.mockResolvedValueOnce(tx));
blocks.forEach(b => blobSinkClient.getBlobSidecar.mockResolvedValueOnce([makeBlobFromBlock(b)]));

await archiver.start(false);

Expand Down Expand Up @@ -378,6 +382,7 @@ describe('Archiver', () => {
makeL2BlockProposedEvent(80n, 2n, blocks[1].archive.root.toString());

rollupTxs.forEach(tx => publicClient.getTransaction.mockResolvedValueOnce(tx));
blocks.forEach(b => blobSinkClient.getBlobSidecar.mockResolvedValueOnce([makeBlobFromBlock(b)]));

await archiver.start(false);

Expand Down Expand Up @@ -424,6 +429,7 @@ describe('Archiver', () => {
mockRollup.read.status.mockResolvedValueOnce([0n, GENESIS_ROOT, 1n, l2Block.archive.root.toString(), GENESIS_ROOT]);
makeL2BlockProposedEvent(l1BlockForL2Block, 1n, l2Block.archive.root.toString());
rollupTxs.forEach(tx => publicClient.getTransaction.mockResolvedValueOnce(tx));
blocks.forEach(b => blobSinkClient.getBlobSidecar.mockResolvedValueOnce([makeBlobFromBlock(b)]));

await archiver.start(false);

Expand Down Expand Up @@ -454,7 +460,9 @@ describe('Archiver', () => {
publicClient.getBlockNumber.mockResolvedValueOnce(l1BlockForL2Block);
mockRollup.read.status.mockResolvedValueOnce([0n, GENESIS_ROOT, 1n, l2Block.archive.root.toString(), GENESIS_ROOT]);
makeL2BlockProposedEvent(l1BlockForL2Block, 1n, l2Block.archive.root.toString());

rollupTxs.forEach(tx => publicClient.getTransaction.mockResolvedValueOnce(tx));
blocks.forEach(b => blobSinkClient.getBlobSidecar.mockResolvedValueOnce([makeBlobFromBlock(b)]));

await archiver.start(false);

Expand Down Expand Up @@ -557,3 +565,13 @@ function makeRollupTx(l2Block: L2Block) {
});
return { input } as Transaction<bigint, number>;
}

/**
 * Builds the blob sidecar the mocked blob sink client should return for a given L2 block.
 * @param block - The L2 block whose body is encoded into blob fields.
 * @returns A Blob reconstructed from the block body's blob fields.
 */
function makeBlobFromBlock(block: L2Block) {
  return Blob.fromFields(block.body.toBlobFields());
}
13 changes: 11 additions & 2 deletions yarn-project/archiver/src/archiver/archiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ import {
import { type ArchiverDataStore, type ArchiverL1SynchPoint } from './archiver_store.js';
import { type ArchiverConfig } from './config.js';
import { retrieveBlocksFromRollup, retrieveL1ToL2Messages } from './data_retrieval.js';
import { NoBlobBodiesFoundError } from './errors.js';
import { ArchiverInstrumentation } from './instrumentation.js';
import { type DataRetrieval } from './structs/data_retrieval.js';
import { type L1Published } from './structs/published.js';
Expand Down Expand Up @@ -117,7 +118,7 @@ export class Archiver implements ArchiveSource, Traceable {
private readonly l1Addresses: { rollupAddress: EthAddress; inboxAddress: EthAddress; registryAddress: EthAddress },
readonly dataStore: ArchiverDataStore,
private readonly config: { pollingIntervalMs: number; batchSize: number },
private readonly _blobSinkClient: BlobSinkClientInterface,
private readonly blobSinkClient: BlobSinkClientInterface,
private readonly instrumentation: ArchiverInstrumentation,
private readonly l1constants: L1RollupConstants,
private readonly log: Logger = createLogger('archiver'),
Expand Down Expand Up @@ -200,7 +201,12 @@ export class Archiver implements ArchiveSource, Traceable {
await this.sync(blockUntilSynced);
}

this.runningPromise = new RunningPromise(() => this.sync(false), this.log, this.config.pollingIntervalMs);
this.runningPromise = new RunningPromise(() => this.sync(false), this.log, this.config.pollingIntervalMs, [
// Ignored errors will not log to the console
// We ignore NoBlobBodiesFound as the message may not have been passed to the blob sink yet
NoBlobBodiesFoundError,
]);

this.runningPromise.start();
}

Expand Down Expand Up @@ -470,9 +476,12 @@ export class Archiver implements ArchiveSource, Traceable {
[searchStartBlock, searchEndBlock] = this.nextRange(searchEndBlock, currentL1BlockNumber);

this.log.trace(`Retrieving L2 blocks from L1 block ${searchStartBlock} to ${searchEndBlock}`);

      // TODO(md): Retrieve from blob sink then from consensus client, then from peers
const retrievedBlocks = await retrieveBlocksFromRollup(
this.rollup,
this.publicClient,
this.blobSinkClient,
searchStartBlock, // TODO(palla/reorg): If the L2 reorg was due to an L1 reorg, we need to start search earlier
searchEndBlock,
this.log,
Expand Down
8 changes: 4 additions & 4 deletions yarn-project/archiver/src/archiver/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export type ArchiverConfig = {
archiverUrl?: string;

/** URL for an L1 consensus client */
l1ConsensusClientUrl: string;
l1ConsensusHostUrl?: string;

/** The polling interval in ms for retrieving new L2 blocks and encrypted logs. */
archiverPollingIntervalMS?: number;
Expand All @@ -47,10 +47,10 @@ export const archiverConfigMappings: ConfigMappingsType<ArchiverConfig> = {
description:
'URL for an archiver service. If set, will return an archiver client as opposed to starting a new one.',
},
l1ConsensusClientUrl: {
env: 'L1_CONSENSUS_CLIENT_URL',
l1ConsensusHostUrl: {
env: 'L1_CONSENSUS_HOST_URL',
description: 'URL for an L1 consensus client.',
parseEnv: (val: string) => (val ? val : 'http://localhost:5052'),
parseEnv: (val: string) => val,
},
archiverPollingIntervalMS: {
env: 'ARCHIVER_POLLING_INTERVAL_MS',
Expand Down
26 changes: 22 additions & 4 deletions yarn-project/archiver/src/archiver/data_retrieval.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { type BlobSinkClientInterface } from '@aztec/blob-sink/client';
import { Body, InboxLeaf, L2Block } from '@aztec/circuit-types';
import { AppendOnlyTreeSnapshot, BlockHeader, Fr, Proof } from '@aztec/circuits.js';
import { asyncPool } from '@aztec/foundation/async-pool';
Expand All @@ -20,6 +21,7 @@ import {
hexToBytes,
} from 'viem';

import { NoBlobBodiesFoundError } from './errors.js';
import { type DataRetrieval } from './structs/data_retrieval.js';
import { type L1Published, type L1PublishedData } from './structs/published.js';

Expand All @@ -35,6 +37,7 @@ import { type L1Published, type L1PublishedData } from './structs/published.js';
export async function retrieveBlocksFromRollup(
rollup: GetContractReturnType<typeof RollupAbi, PublicClient<HttpTransport, Chain>>,
publicClient: PublicClient,
blobSinkClient: BlobSinkClientInterface,
searchStartBlock: bigint,
searchEndBlock: bigint,
logger: Logger = createLogger('archiver'),
Expand Down Expand Up @@ -63,7 +66,13 @@ export async function retrieveBlocksFromRollup(
`Got ${l2BlockProposedLogs.length} L2 block processed logs for L2 blocks ${l2BlockProposedLogs[0].args.blockNumber}-${lastLog.args.blockNumber} between L1 blocks ${searchStartBlock}-${searchEndBlock}`,
);

const newBlocks = await processL2BlockProposedLogs(rollup, publicClient, l2BlockProposedLogs, logger);
const newBlocks = await processL2BlockProposedLogs(
rollup,
publicClient,
blobSinkClient,
l2BlockProposedLogs,
logger,
);
retrievedBlocks.push(...newBlocks);
searchStartBlock = lastLog.blockNumber! + 1n;
} while (searchStartBlock <= searchEndBlock);
Expand All @@ -80,6 +89,7 @@ export async function retrieveBlocksFromRollup(
export async function processL2BlockProposedLogs(
rollup: GetContractReturnType<typeof RollupAbi, PublicClient<HttpTransport, Chain>>,
publicClient: PublicClient,
blobSinkClient: BlobSinkClientInterface,
logs: GetContractEventsReturnType<typeof RollupAbi, 'L2BlockProposed'>,
logger: Logger,
): Promise<L1Published<L2Block>[]> {
Expand All @@ -91,7 +101,7 @@ export async function processL2BlockProposedLogs(

// The value from the event and contract will match only if the block is in the chain.
if (archive === archiveFromChain) {
const block = await getBlockFromRollupTx(publicClient, log.transactionHash!, l2BlockNumber);
const block = await getBlockFromRollupTx(publicClient, blobSinkClient, log.transactionHash!, l2BlockNumber);

const l1: L1PublishedData = {
blockNumber: log.blockNumber,
Expand Down Expand Up @@ -127,10 +137,12 @@ export async function getL1BlockTime(publicClient: PublicClient, blockNumber: bi
*/
async function getBlockFromRollupTx(
publicClient: PublicClient,
blobSinkClient: BlobSinkClientInterface,
txHash: `0x${string}`,
l2BlockNum: bigint,
): Promise<L2Block> {
const { input: data } = await publicClient.getTransaction({ hash: txHash });
const { input: data, blockHash } = await publicClient.getTransaction({ hash: txHash });

const { functionName, args } = decodeFunctionData({ abi: RollupAbi, data });

const allowedMethods = ['propose', 'proposeAndClaim'];
Expand All @@ -156,12 +168,18 @@ async function getBlockFromRollupTx(
];

const header = BlockHeader.fromBuffer(Buffer.from(hexToBytes(decodedArgs.header)));

const blobBodies = await blobSinkClient.getBlobSidecar(blockHash);
if (blobBodies.length === 0) {
throw new NoBlobBodiesFoundError(Number(l2BlockNum));
}

const blockFields = blobBodies.flatMap(b => b.toEncodedFields());
// TODO(#9101): Retrieving the block body from calldata is a temporary solution before we have
// either a beacon chain client or link to some blob store. Web2 is ok because we will
// verify the block body vs the blob as below.
const blockBody = Body.fromBuffer(Buffer.from(hexToBytes(bodyHex)));

const blockFields = blockBody.toBlobFields();
// TODO(#9101): The below reconstruction is currently redundant, but once we extract blobs will be the way to construct blocks.
// The blob source will give us blockFields, and we must construct the body from them:
// TODO(#8954): When logs are refactored into fields, we won't need to inject them here.
Expand Down
5 changes: 5 additions & 0 deletions yarn-project/archiver/src/archiver/errors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
/**
 * Thrown when the blob sink returns no blob bodies for a proposed L2 block.
 * This can happen transiently when the blobs have not yet propagated to the
 * blob sink, so callers (e.g. the archiver's polling loop) may choose to
 * ignore it and retry on the next sync.
 */
export class NoBlobBodiesFoundError extends Error {
  constructor(l2BlockNum: number) {
    super(`No blob bodies found for block ${l2BlockNum}`);
    // Set an explicit name so logs and serialized errors identify the class
    // instead of the generic "Error".
    this.name = 'NoBlobBodiesFoundError';
  }
}
2 changes: 2 additions & 0 deletions yarn-project/aztec-node/src/aztec-node/config.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { type ArchiverConfig, archiverConfigMappings } from '@aztec/archiver/config';
import { type BlobSinkConfig } from '@aztec/blob-sink/client';
import { type ConfigMappingsType, booleanConfigHelper, getConfigFromMappings } from '@aztec/foundation/config';
import { type DataStoreConfig, dataConfigMappings } from '@aztec/kv-store/config';
import { type P2PConfig, p2pConfigMappings } from '@aztec/p2p/config';
Expand All @@ -17,6 +18,7 @@ export { sequencerClientConfigMappings, SequencerClientConfig };
* The configuration the aztec node.
*/
export type AztecNodeConfig = ArchiverConfig &
BlobSinkConfig &
SequencerClientConfig &
ValidatorClientConfig &
ProverClientConfig &
Expand Down
2 changes: 1 addition & 1 deletion yarn-project/aztec-node/src/aztec-node/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ export class AztecNodeService implements AztecNode, Traceable {
const telemetry = deps.telemetry ?? getTelemetryClient();
const log = deps.logger ?? createLogger('node');
const dateProvider = deps.dateProvider ?? new DateProvider();
const blobSinkClient = deps.blobSinkClient ?? createBlobSinkClient(config.blobSinkUrl);
const blobSinkClient = deps.blobSinkClient ?? createBlobSinkClient(config);
const ethereumChain = createEthereumChain(config.l1RpcUrl, config.l1ChainId);
//validate that the actual chain id matches that specified in configuration
if (config.l1ChainId !== ethereumChain.chainInfo.id) {
Expand Down
Loading

0 comments on commit 8cf4982

Please sign in to comment.