client: Add support for storage and bytecode fetching (#2345)
* Add account fetcher base

Add accountfetcher import

Add AccountFetcher as possible type for Synchronizer.fetcher

Place call to getAccountRange inside of fetcher

Place call to getAccountRange() in accountfetcher and comment it out

Add account fetcher base

Add accountfetcher import

add account fetcher getter setter in snapsync

Change order of importing accountfetcher in index file

Change bytes parameter to be per task

Remove root and bytes from task inputs and make them fetcher variables

Correct log message

Add debug console log statement

Fix linting issues

Add account to mpt and check validity with root and proof

Set root of trie

Add checks to fetcher.request()

client/snap: fix getAccountRange return type

client/snap: pass first proof

client/snap: add utility to convert slim account to a normal RLPd account

client/snap: implement account range db dump

Update to use verifyRangeProof

Correct some messages

Update verifyRangeProof input for first account hash to be fetcher origin

Fix linting issues

Store accounts in store phase

Add logic for dividing hash ranges and adding them as tasks

Increment count by 1 before next iteration

client/snap: remove unnecessary account fetcher logic

client/snap: correctly feed the right values to verifyRangeProof

lint fixes

small cleanup

fix account fetcher with previous fixes

overhaul and simplify the fetcher and add partial results handling

cleanup comments

fix fetch spec tests

Experiment with putting accounts into DefaultStateManager and CheckpointTrie

Use return value of verifyRangeProof for checking if there are more accounts left to fetch in the range

Remove unused function

Export storage data

Create storage fetcher

Remove comment

Modify debug message

Update comments to reflect specs

Modify comments and change storage fetcher to fetch only single account

WIP: Queue storage fetches when accounts are received

WIP: Continue work on storage fetcher

Add storage fetcher tests

Comment out storage fetcher integration until multi-fetcher sync support is added

WIP: Initialize and run storage fetcher in account fetcher

Add account field to JobTask type for storage fetcher and add enqueueByAccountList

Add accounts for storage fetching in account fetcher

Enable single account fetches in storage fetcher

Save changes so far

Index account body for storageRoot for use with storageFetcher

Update comments and print statements

Add custom debuggers to new fetchers

Add limit check for continuing a task after partial results

Add limit check for continuing a task after partial results and clean up comments

Optimize by removing invalidated tasks; Terminate using new conditions

Update comments

WIP: Implement multi-account storage requests

WIP: Continue development of multi-account fetches and optimizing storage fetcher

WIP: Implement multi-account fetching

WIP: Debug task null error

Add some checks for peer storage response

Switch structure of post-fetch validation

Debug storage fetcher: Set starting origin to 0 and troubleshoot request logic

Aggregate partial results in embedded array

Use larger task ranges for storage fetcher

Set first and count in each task request

Debug range logic

Cleanup code and fix task generation loop

Fix off-by-one error

Clean up comments and logging in accountfetcher

Improve logging

Refactor and clean up storagefetcher

Add commented code snippet for demo

Use config value for maxRangeBytes

Return results in the case of a single, no-proof slot payload

Only enqueue storageRequests if more than 0 exist

Run account fetcher in syncWithPeer

Fix linting issues

* Update tests

* Bufferize storage root if it is not a buffer already

* Update storage fetcher tests

* Move storage request processing in account fetcher into store phase

* Update comments

* Fix linting issues

* Update comments

* Update comments

* Use config value for maxAccountRange

* Update comment

* Add tests for requests and proof verification

* Initialize chain using helper

* Setup to dev/test snapsync with sim architecture

* modify single-run to setup a lodestar<>geth node to snapsync from

* setup an ethereumjs inline client and get it to peer with geth

* cleanup setup a bit

* snapsync run spec

* get the snap testdev sim working

* finalize the test infra and update usage doc

* enhance coverage

* Fix lint error

* Use geth RPC to connect to ethJS

* refac wait for snap sync completion

* Emit snap sync completion event in accountfetcher

* Modify fetcher termination condition

* Cluster snap config items together

* Index account range starting from 0

* Sync fetchers using helper

* Put storage slots into tries

* Use destroyWhenDone to terminate storage fetcher

* End fetcher if finished tasks is greater than or equal to total

* setup writer just once if fetcher not destroyed

* cleanup and codeflow simplification

* cleanup

* fix accountspec

* increase coverage

* add some more coverage

* lint

* increase storagefetcher coverage

* further enhance storagefetcher coverage

* improve cov

---------

Co-authored-by: harkamal <[email protected]>
Co-authored-by: acolytec3 <[email protected]>
3 people authored Mar 26, 2023
1 parent f0166f5 commit e2ec03c
Showing 10 changed files with 1,272 additions and 50 deletions.
9 changes: 8 additions & 1 deletion packages/client/lib/config.ts
@@ -271,6 +271,8 @@ export interface ConfigOptions {
* The time after which synced state is downgraded to unsynced
*/
syncedStateRemovalPeriod?: number

maxStorageRange?: bigint
}

export class Config {
@@ -299,7 +301,10 @@ export class Config {
public static readonly SKELETON_SUBCHAIN_MERGE_MINIMUM = 1000
public static readonly MAX_RANGE_BYTES = 50000
// This range should yield roughly 100 accounts
public static readonly MAX_ACCOUNT_RANGE = BigInt(2) ** BigInt(256) / BigInt(1_000_000)
public static readonly MAX_ACCOUNT_RANGE =
(BigInt(2) ** BigInt(256) - BigInt(1)) / BigInt(1_000_000)
// Larger ranges are used for storage slots since the assumption is that slots are much sparser than accounts
public static readonly MAX_STORAGE_RANGE = (BigInt(2) ** BigInt(256) - BigInt(1)) / BigInt(10)
public static readonly SYNCED_STATE_REMOVAL_PERIOD = 60000

public readonly logger: Logger
@@ -335,6 +340,7 @@ export class Config {
public readonly skeletonSubchainMergeMinimum: number
public readonly maxRangeBytes: number
public readonly maxAccountRange: bigint
public readonly maxStorageRange: bigint
public readonly syncedStateRemovalPeriod: number

public readonly disableBeaconSync: boolean
@@ -388,6 +394,7 @@ export class Config {
options.skeletonSubchainMergeMinimum ?? Config.SKELETON_SUBCHAIN_MERGE_MINIMUM
this.maxRangeBytes = options.maxRangeBytes ?? Config.MAX_RANGE_BYTES
this.maxAccountRange = options.maxAccountRange ?? Config.MAX_ACCOUNT_RANGE
this.maxStorageRange = options.maxStorageRange ?? Config.MAX_STORAGE_RANGE
this.syncedStateRemovalPeriod =
options.syncedStateRemovalPeriod ?? Config.SYNCED_STATE_REMOVAL_PERIOD

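The two range constants added above partition the 256-bit hash space into fixed-size sweep windows. The sketch below (not the client's code, just a re-creation of the arithmetic) shows how many tasks each window size implies; `taskCount` is an illustrative helper.

```typescript
// Mirrors the constants introduced in config.ts above.
const HASH_SPACE = BigInt(2) ** BigInt(256) - BigInt(1)
const MAX_ACCOUNT_RANGE = HASH_SPACE / BigInt(1_000_000)
const MAX_STORAGE_RANGE = HASH_SPACE / BigInt(10)

// Number of fixed-size tasks needed to sweep the whole hash space
// for a given per-task range (ceiling division).
function taskCount(range: bigint): bigint {
  return (HASH_SPACE + range - BigInt(1)) / range
}

// Account sweeps use ~1,000,000 small windows; storage sweeps use far
// fewer, much larger windows, since slots are sparser than accounts.
const accountTasks = taskCount(MAX_ACCOUNT_RANGE)
const storageTasks = taskCount(MAX_STORAGE_RANGE)
```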
14 changes: 13 additions & 1 deletion packages/client/lib/net/protocol/snapprotocol.ts
@@ -39,13 +39,25 @@ type GetAccountRangeOpts = {
type GetStorageRangesOpts = {
reqId?: bigint
root: Buffer

// If multiple accounts' storage is requested, serving nodes
// should reply with the entire storage ranges (thus no Merkle
// proofs needed), up to the first contract which exceeds the
// packet limit. If the last included storage range does not
// fit entirely, a Merkle proof must be attached to that and
// only that.
// If a single account's storage is requested, serving nodes
// should only return slots starting with the requested
// starting hash, up to the last one or until the packet fills
// up. If the entire storage range is not being returned, a
// Merkle proof must be attached.
accounts: Buffer[]
origin: Buffer
limit: Buffer
bytes: bigint
}

type StorageData = {
export type StorageData = {
hash: Buffer
body: Buffer
}
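The doc-comment added to `GetStorageRangesOpts` distinguishes two request modes with different proof obligations. A minimal sketch of that distinction, assuming the field shapes from the type above (the `isSingleAccountMode` helper is illustrative, not part of the client):

```typescript
// Shape of the request, following GetStorageRangesOpts above.
interface GetStorageRangesOpts {
  reqId?: bigint
  root: Buffer
  accounts: Buffer[]
  origin: Buffer
  limit: Buffer
  bytes: bigint
}

// Single-account requests honor [origin, limit] and must carry a Merkle
// proof whenever the returned slots do not cover the whole range; with
// multiple accounts, only a truncated trailing account needs a proof.
function isSingleAccountMode(req: GetStorageRangesOpts): boolean {
  return req.accounts.length === 1
}
```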
214 changes: 172 additions & 42 deletions packages/client/lib/sync/fetcher/accountfetcher.ts
@@ -1,15 +1,29 @@
import { Trie } from '@ethereumjs/trie'
import { accountBodyToRLP, bigIntToBuffer, bufferToBigInt, setLengthLeft } from '@ethereumjs/util'
import {
KECCAK256_RLP,
accountBodyToRLP,
bigIntToBuffer,
bufArrToArr,
bufferToBigInt,
bufferToHex,
setLengthLeft,
} from '@ethereumjs/util'
import { debug as createDebugLogger } from 'debug'

import { LevelDB } from '../../execution/level'
import { Event } from '../../types'
import { short } from '../../util'

import { Fetcher } from './fetcher'
import { StorageFetcher } from './storagefetcher'

import type { Peer } from '../../net/peer'
import type { AccountData } from '../../net/protocol/snapprotocol'
import type { EventBusType } from '../../types'
import type { FetcherOptions } from './fetcher'
import type { StorageRequest } from './storagefetcher'
import type { Job } from './types'
import type { Debugger } from 'debug'

type AccountDataResponse = AccountData[] & { completed?: boolean }

@@ -39,7 +53,41 @@ export type JobTask = {
count: bigint
}

export type FetcherDoneFlags = {
storageFetcherDone: boolean
accountFetcherDone: boolean
eventBus?: EventBusType | undefined
stateRoot?: Buffer | undefined
}

export function snapFetchersCompleted(
fetcherDoneFlags: FetcherDoneFlags,
fetcherType: Object,
root?: Buffer,
eventBus?: EventBusType
) {
switch (fetcherType) {
// eslint-disable-next-line @typescript-eslint/no-use-before-define
case AccountFetcher:
fetcherDoneFlags.accountFetcherDone = true
fetcherDoneFlags.stateRoot = root
fetcherDoneFlags.eventBus = eventBus
break
case StorageFetcher:
fetcherDoneFlags.storageFetcherDone = true
break
}
if (fetcherDoneFlags.accountFetcherDone && fetcherDoneFlags.storageFetcherDone) {
fetcherDoneFlags.eventBus!.emit(
Event.SYNC_SNAPSYNC_COMPLETE,
bufArrToArr(fetcherDoneFlags.stateRoot as Buffer)
)
}
}

export class AccountFetcher extends Fetcher<JobTask, AccountData[], AccountData> {
protected debug: Debugger

/**
* The stateRoot for the fetcher, which sort of pins it to a snapshot.
* This might eventually be removed as the snapshots are moving and not static
@@ -52,23 +100,51 @@ export class AccountFetcher extends Fetcher<JobTask, AccountData[], AccountData>
/** The range to eventually fetch; by default should be set to BigInt(2) ** BigInt(256) - first */
count: bigint

storageFetcher: StorageFetcher

accountTrie: Trie

accountToStorageTrie: Map<String, Trie>

fetcherDoneFlags: FetcherDoneFlags = {
storageFetcherDone: false,
accountFetcherDone: false,
}

/**
* Create new account fetcher
*/
constructor(options: AccountFetcherOptions) {
super(options)

this.root = options.root
this.first = options.first
this.count = options.count ?? BigInt(2) ** BigInt(256) - this.first
this.accountTrie = new Trie({ useKeyHashing: false })
this.accountToStorageTrie = new Map()
this.debug = createDebugLogger('client:AccountFetcher')
this.storageFetcher = new StorageFetcher({
config: this.config,
pool: this.pool,
root: this.root,
storageRequests: [],
first: BigInt(1),
destroyWhenDone: false,
accountToStorageTrie: this.accountToStorageTrie,
})
this.storageFetcher.fetch().then(
() => snapFetchersCompleted(this.fetcherDoneFlags, StorageFetcher),
() => {
throw Error('Snap fetcher failed to exit')
}
)

const fullJob = { task: { first: this.first, count: this.count } } as Job<
const syncRange = { task: { first: this.first, count: this.count } } as Job<
JobTask,
AccountData[],
AccountData
>
const origin = this.getOrigin(fullJob)
const limit = this.getLimit(fullJob)
const origin = this.getOrigin(syncRange)
const limit = this.getLimit(syncRange)

this.debug(
`Account fetcher instantiated root=${short(this.root)} origin=${short(origin)} limit=${short(
@@ -83,9 +159,9 @@ export class AccountFetcher extends Fetcher<JobTask, AccountData[], AccountData>
{ accounts, proof }: { accounts: AccountData[]; proof: Buffer[] }
): Promise<boolean> {
this.debug(
`verifyRangeProof accounts:${accounts.length} first=${short(accounts[0].hash)} last=${short(
accounts[accounts.length - 1].hash
)}`
`verifyRangeProof accounts:${accounts.length} first=${bufferToHex(
accounts[0].hash
)} last=${short(accounts[accounts.length - 1].hash)}`
)

for (let i = 0; i < accounts.length - 1; i++) {
@@ -123,6 +199,18 @@ export class AccountFetcher extends Fetcher<JobTask, AccountData[], AccountData>
return setLengthLeft(limit, 32)
}

private isMissingRightRange(
limit: Buffer,
{ accounts, proof: _proof }: { accounts: AccountData[]; proof: Buffer[] }
): boolean {
if (accounts.length > 0 && accounts[accounts.length - 1]?.hash.compare(limit) >= 0) {
return false
} else {
// TODO: Check if there is a proof of missing limit in state
return true
}
}
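The `isMissingRightRange` check above hinges on a single `Buffer.compare` between the last returned account hash and the requested limit. A sketch of that comparison in isolation (the `coversLimit` helper is illustrative; hash values in the test are arbitrary 32-byte buffers):

```typescript
// If the last account hash returned by the peer sits at or beyond the
// requested limit, nothing to the right of the range is missing.
function coversLimit(lastAccountHash: Buffer, limit: Buffer): boolean {
  // Buffer.compare returns -1 if this < argument, 0 if equal, 1 if greater
  return lastAccountHash.compare(limit) >= 0
}
```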

/**
* Request results from peer for the given job.
* Resolves with the raw result
@@ -143,41 +231,45 @@ export class AccountFetcher extends Fetcher<JobTask, AccountData[], AccountData>
limit,
bytes: BigInt(this.config.maxRangeBytes),
})

const peerInfo = `id=${peer?.id.slice(0, 8)} address=${peer?.address}`

// eslint-disable-next-line eqeqeq
if (rangeResult === undefined) {
return undefined
} else {
// validate the proof
try {
// verifyRangeProof will also validate that there are no missed states between origin and
// response data and returns a boolean indicating if there are more accounts remaining to fetch
// in the specified range
const isMissingRightRange: boolean = await this.verifyRangeProof(
this.root,
origin,
rangeResult
)
}

// Check if there is any pending data to be synced to the right
let completed: boolean
if (isMissingRightRange) {
this.debug(
`Peer ${peerInfo} returned missing right range account=${rangeResult.accounts[
rangeResult.accounts.length - 1
].hash.toString('hex')} limit=${limit.toString('hex')}`
)
completed = false
} else {
completed = true
}
return Object.assign([], rangeResult.accounts, { completed })
} catch (err) {
throw Error(`InvalidAccountRange: ${err}`)
if (
rangeResult.accounts.length === 0 ||
limit.compare(bigIntToBuffer(BigInt(2) ** BigInt(256))) === 0
) {
// TODO have to check proof of nonexistence -- as a shortcut for now, we can mark as completed if a proof is present
if (rangeResult.proof.length > 0) {
this.debug(`Data for last range has been received`)
// response contains empty object so that task can be terminated in store phase and not reenqueued
return Object.assign([], [Object.create(null) as any], { completed: true })
}
}

const peerInfo = `id=${peer?.id.slice(0, 8)} address=${peer?.address}`
// validate the proof
try {
// verifyRangeProof will also validate that there are no missed states between origin and
// response data
const isMissingRightRange = await this.verifyRangeProof(this.root, origin, rangeResult)

// Check if there is any pending data to be synced to the right
let completed: boolean
if (isMissingRightRange && this.isMissingRightRange(limit, rangeResult)) {
this.debug(
`Peer ${peerInfo} returned missing right range account=${rangeResult.accounts[
rangeResult.accounts.length - 1
].hash.toString('hex')} limit=${limit.toString('hex')}`
)
completed = false
} else {
completed = true
}
return Object.assign([], rangeResult.accounts, { completed })
} catch (err) {
throw Error(`InvalidAccountRange: ${err}`)
}
}

/**
@@ -207,6 +299,39 @@ export class AccountFetcher extends Fetcher<JobTask, AccountData[], AccountData>
*/
async store(result: AccountData[]): Promise<void> {
this.debug(`Stored ${result.length} accounts in account trie`)

// TODO fails to handle case where there is a proof of non existence and returned accounts for last requested range
if (JSON.stringify(result[0]) === JSON.stringify(Object.create(null))) {
this.debug('Final range received with no elements remaining to the right')

// TODO include stateRoot in emission once moved over to using MPT's
await this.accountTrie.persistRoot()
snapFetchersCompleted(
this.fetcherDoneFlags,
AccountFetcher,
this.accountTrie.root(),
this.config.events
)
return
}
const storageFetchRequests: StorageRequest[] = []
for (const account of result) {
await this.accountTrie.put(account.hash, accountBodyToRLP(account.body))

// build record of accounts that need storage slots to be fetched
const storageRoot: Buffer =
account.body[2] instanceof Buffer ? account.body[2] : Buffer.from(account.body[2])
if (storageRoot.compare(KECCAK256_RLP) !== 0) {
storageFetchRequests.push({
accountHash: account.hash,
storageRoot,
first: BigInt(0),
count: BigInt(2) ** BigInt(256) - BigInt(1),
})
}
}
if (storageFetchRequests.length > 0)
this.storageFetcher.enqueueByStorageRequestList(storageFetchRequests)
}
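The `store` phase above skips storage fetching for any account whose `storageRoot` equals `KECCAK256_RLP`, the root of an empty trie. A small sketch of that filter, inlining the well-known constant that `@ethereumjs/util` exports as `KECCAK256_RLP` (the `needsStorageFetch` helper is illustrative):

```typescript
// keccak256(rlp('')) — the root hash of an empty Merkle-Patricia trie.
// An account whose storageRoot equals this has no slots to fetch.
const KECCAK256_RLP = Buffer.from(
  '56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421',
  'hex'
)

function needsStorageFetch(storageRoot: Buffer): boolean {
  return storageRoot.compare(KECCAK256_RLP) !== 0
}
```

Filtering these out keeps empty-storage accounts (the overwhelming majority, e.g. all EOAs) from being enqueued as storage requests at all.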

/**
@@ -252,14 +377,19 @@ export class AccountFetcher extends Fetcher<JobTask, AccountData[], AccountData>
}

nextTasks(): void {
if (this.in.length === 0 && this.count > BigInt(0)) {
const fullJob = { task: { first: this.first, count: this.count } } as Job<
if (
this.in.length === 0 &&
this.count > BigInt(0) &&
this.processed - this.finished < this.config.maxFetcherRequests
) {
// pendingRange is the range for which new tasks need to be generated
const pendingRange = { task: { first: this.first, count: this.count } } as Job<
JobTask,
AccountData[],
AccountData
>
const origin = this.getOrigin(fullJob)
const limit = this.getLimit(fullJob)
const origin = this.getOrigin(pendingRange)
const limit = this.getLimit(pendingRange)

this.debug(`Fetcher pending with origin=${short(origin)} limit=${short(limit)}`)
const tasks = this.tasks()
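`nextTasks()` carves the pending range `[first, first + count)` into chunks of at most the configured maximum per-task range. A minimal sketch of that splitting, under the assumption that `tasks()` walks the range linearly (the `splitRange` helper is illustrative, not the client's implementation):

```typescript
// Split [first, first + count) into tasks of at most maxRange hashes each.
function splitRange(
  first: bigint,
  count: bigint,
  maxRange: bigint
): { first: bigint; count: bigint }[] {
  const tasks: { first: bigint; count: bigint }[] = []
  let start = first
  let remaining = count
  while (remaining > BigInt(0)) {
    const chunk = remaining < maxRange ? remaining : maxRange
    tasks.push({ first: start, count: chunk })
    start += chunk
    remaining -= chunk
  }
  return tasks
}
```

The final task absorbs the remainder, so the tasks tile the range exactly with no gaps or overlap.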
