Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into preload-blocks-on-c…
Browse files Browse the repository at this point in the history
…lient-start
  • Loading branch information
acolytec3 committed Feb 20, 2023
2 parents 5396cdb + 4aa9753 commit 73095a8
Show file tree
Hide file tree
Showing 14 changed files with 201 additions and 144 deletions.
115 changes: 79 additions & 36 deletions packages/client/lib/miner/pendingBlock.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
import { BlockHeader } from '@ethereumjs/block'
import { BlobEIP4844Transaction } from '@ethereumjs/tx'
import { randomBytes } from 'crypto'
import {
TypeOutput,
bigIntToUnpaddedBuffer,
bufferToHex,
toBuffer,
toType,
zeros,
} from '@ethereumjs/util'
import { BuildStatus } from '@ethereumjs/vm/dist/buildBlock'
import { keccak256 } from 'ethereum-cryptography/keccak'

import type { Config } from '../config'
import type { TxPool } from '../service/txpool'
Expand Down Expand Up @@ -33,20 +42,42 @@ interface BlobBundle {
* For now this simple implementation just adds txs from the pool when
* started and called.
*/

// Max two payload to be cached
const MAX_PAYLOAD_CACHE = 2

export class PendingBlock {
config: Config
txPool: TxPool
pendingPayloads: [payloadId: Buffer, builder: BlockBuilder][] = []
blobBundles: Map<string, BlobBundle>

pendingPayloads: Map<string, BlockBuilder> = new Map()
blobBundles: Map<string, BlobBundle> = new Map()

private skipHardForkValidation?: boolean

constructor(opts: PendingBlockOpts) {
this.config = opts.config
this.txPool = opts.txPool
this.blobBundles = new Map()
this.skipHardForkValidation = opts.skipHardForkValidation
}

pruneSetToMax(maxItems: number): number {
let itemsToDelete = this.pendingPayloads.size - maxItems
const deletedItems = Math.max(0, itemsToDelete)

if (itemsToDelete > 0) {
// keys are in fifo order
for (const payloadId of this.pendingPayloads.keys()) {
this.stop(payloadId)
itemsToDelete--
if (itemsToDelete <= 0) {
break
}
}
}
return deletedItems
}

/**
* Starts building a pending block with the given payload
* @returns an 8-byte payload identifier to call {@link BlockBuilder.build} with
Expand All @@ -58,9 +89,30 @@ export class PendingBlock {
withdrawals?: WithdrawalData[]
) {
const number = parentBlock.header.number + BigInt(1)
const { timestamp } = headerData
const { timestamp, mixHash } = headerData
const { gasLimit } = parentBlock.header

// payload is uniquely defined by timestamp, parent and mixHash, gasLimit can also be
// potentially included in the fcU in future and can be safely added in uniqueness calc
const timestampBuf = bigIntToUnpaddedBuffer(toType(timestamp ?? 0, TypeOutput.BigInt))
const gasLimitBuf = bigIntToUnpaddedBuffer(gasLimit)
const mixHashBuf = toType(mixHash!, TypeOutput.Buffer) ?? zeros(32)
const payloadIdBuffer = toBuffer(
keccak256(Buffer.concat([parentBlock.hash(), mixHashBuf, timestampBuf, gasLimitBuf])).slice(
0,
8
)
)
const payloadId = bufferToHex(payloadIdBuffer)

// If payload has already been triggered, then return the payloadid
if (this.pendingPayloads.get(payloadId)) {
return payloadIdBuffer
}

// Prune the builders and blobbundles
this.pruneSetToMax(MAX_PAYLOAD_CACHE)

if (typeof vm.blockchain.getTotalDifficulty !== 'function') {
throw new Error('cannot get iterator head: blockchain has no getTotalDifficulty function')
}
Expand Down Expand Up @@ -92,8 +144,7 @@ export class PendingBlock {
},
})

const payloadId = randomBytes(8)
this.pendingPayloads.push([payloadId, builder])
this.pendingPayloads.set(payloadId, builder)

// Add current txs in pool
const txs = await this.txPool.txsByPriceAndNonce(vm, baseFeePerGas)
Expand Down Expand Up @@ -122,18 +173,6 @@ export class PendingBlock {
`Pending: Assembled block full (gasLeft: ${gasLimit - builder.gasUsed})`
)
}
} else if ((error as Error).message.includes('tx has a different hardfork than the vm')) {
// We can here decide to keep a tx in pool if it belongs to future hf
// but for simplicity just remove the tx as the sender can always retransmit
// the tx
this.txPool.removeByHash(txs[index].hash().toString('hex'))
this.config.logger.error(
`Pending: Removed from txPool tx 0x${txs[index]
.hash()
.toString('hex')} having different hf=${txs[
index
].common.hardfork()} than block vm hf=${vm._common.hardfork()}`
)
} else {
// If there is an error adding a tx, it will be skipped
this.config.logger.debug(
Expand Down Expand Up @@ -163,33 +202,40 @@ export class PendingBlock {
)
this.constructBlobsBundle(payloadId, blobTxs, header.hash())
}
return payloadId
return payloadIdBuffer
}

/**
* Stops a pending payload
*/
stop(payloadId: Buffer) {
const payload = this.pendingPayloads.find((p) => p[0].equals(payloadId))
if (!payload) return
stop(payloadIdBuffer: Buffer | string) {
const payloadId =
typeof payloadIdBuffer !== 'string' ? bufferToHex(payloadIdBuffer) : payloadIdBuffer
const builder = this.pendingPayloads.get(payloadId)
if (builder === undefined) return
// Revert blockBuilder
void payload[1].revert()
void builder.revert()
// Remove from pendingPayloads
this.pendingPayloads = this.pendingPayloads.filter((p) => !p[0].equals(payloadId))
this.blobBundles.delete('0x' + payloadId.toString())
this.pendingPayloads.delete(payloadId)
this.blobBundles.delete(payloadId)
}

/**
* Returns the completed block
*/
async build(
payloadId: Buffer
payloadIdBuffer: Buffer | string
): Promise<void | [block: Block, receipts: TxReceipt[], value: bigint]> {
const payload = this.pendingPayloads.find((p) => p[0].equals(payloadId))
if (!payload) {
const payloadId =
typeof payloadIdBuffer !== 'string' ? bufferToHex(payloadIdBuffer) : payloadIdBuffer
const builder = this.pendingPayloads.get(payloadId)
if (!builder) {
return
}
const builder = payload[1]
const blockStatus = builder.getStatus()
if (blockStatus.status === BuildStatus.Build) {
return [blockStatus.block, builder.transactionReceipts, builder.minerValue]
}
const { vm, headerData } = builder as any

// Add new txs that the pool received
Expand Down Expand Up @@ -259,9 +305,6 @@ export class PendingBlock {
this.constructBlobsBundle(payloadId, blobTxs, block.header.hash())
}

// Remove from pendingPayloads
this.pendingPayloads = this.pendingPayloads.filter((p) => !p[0].equals(payloadId))

return [block, builder.transactionReceipts, builder.minerValue]
}

Expand All @@ -272,13 +315,13 @@ export class PendingBlock {
* @param blockHash the blockhash of the pending block (computed from the header data provided)
*/
private constructBlobsBundle = (
payloadId: Buffer,
payloadId: string,
txs: BlobEIP4844Transaction[],
blockHash: Buffer
) => {
let blobs: Buffer[] = []
let kzgCommitments: Buffer[] = []
const bundle = this.blobBundles.get('0x' + payloadId.toString('hex'))
const bundle = this.blobBundles.get(payloadId)
if (bundle !== undefined) {
blobs = bundle.blobs
kzgCommitments = bundle.kzgCommitments
Expand All @@ -291,7 +334,7 @@ export class PendingBlock {
kzgCommitments = kzgCommitments.concat(tx.kzgCommitments!)
}
}
this.blobBundles.set('0x' + payloadId.toString('hex'), {
this.blobBundles.set(payloadId, {
blockHash: '0x' + blockHash.toString('hex'),
blobs,
kzgCommitments,
Expand Down
2 changes: 2 additions & 0 deletions packages/client/lib/net/server/rlpxserver.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { DPT as Devp2pDPT, RLPx as Devp2pRLPx } from '@ethereumjs/devp2p'

import { Event } from '../../types'
import { getClientVersion } from '../../util'
import { RlpxPeer } from '../peer/rlpxpeer'

import { Server } from './server'
Expand Down Expand Up @@ -238,6 +239,7 @@ export class RlpxServer extends Server {
private async initRlpx() {
return new Promise<void>((resolve) => {
this.rlpx = new Devp2pRLPx(this.key, {
clientId: Buffer.from(getClientVersion()),
dpt: this.dpt!,
maxPeers: this.config.maxPeers,
capabilities: RlpxPeer.capabilities(Array.from(this.protocols)),
Expand Down
14 changes: 7 additions & 7 deletions packages/client/lib/rpc/modules/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ const payloadAttributesFieldValidatorsV1 = {
}
const payloadAttributesFieldValidatorsV2 = {
...payloadAttributesFieldValidatorsV1,
withdrawals: validators.array(validators.withdrawal()),
withdrawals: validators.optional(validators.array(validators.withdrawal())),
}
/**
* Formats a block to {@link ExecutionPayloadV1}.
Expand Down Expand Up @@ -624,15 +624,17 @@ export class Engine {

async newPayloadV2(params: [ExecutionPayloadV2 | ExecutionPayloadV1]): Promise<PayloadStatusV1> {
const shanghaiTimestamp = this.chain.config.chainCommon.hardforkTimestamp(Hardfork.Shanghai)
const withdrawals = (params[0] as ExecutionPayloadV2).withdrawals

if (shanghaiTimestamp === null || parseInt(params[0].timestamp) < shanghaiTimestamp) {
if ('withdrawals' in params[0]) {
if (withdrawals !== undefined && withdrawals !== null) {
throw {
code: INVALID_PARAMS,
message: 'ExecutionPayloadV1 MUST be used before Shanghai is activated',
}
}
} else if (parseInt(params[0].timestamp) >= shanghaiTimestamp) {
if (!('withdrawals' in params[0]) || params[0].withdrawals === null) {
if (withdrawals === undefined || withdrawals === null) {
throw {
code: INVALID_PARAMS,
message: 'ExecutionPayloadV2 MUST be used after Shanghai is activated',
Expand Down Expand Up @@ -895,7 +897,8 @@ export class Engine {
if (payloadAttributes !== undefined && payloadAttributes !== null) {
const shanghaiTimestamp = this.chain.config.chainCommon.hardforkTimestamp(Hardfork.Shanghai)
const ts = BigInt(payloadAttributes.timestamp)
if ('withdrawals' in payloadAttributes) {
const withdrawals = (payloadAttributes as PayloadAttributesV2).withdrawals
if (withdrawals !== undefined && withdrawals !== null) {
if (ts < shanghaiTimestamp!) {
throw {
code: INVALID_PARAMS,
Expand Down Expand Up @@ -996,13 +999,10 @@ export class Engine {
const payloadId = params[0]

const bundle = this.pendingBlock.blobBundles.get(payloadId)

if (bundle === undefined) {
throw EngineError.UnknownPayload
}

// Remove built blocks once retrieved by CL layer
this.pendingBlock.blobBundles.delete(payloadId)
return {
blockHash: bundle.blockHash,
kzgs: bundle.kzgCommitments.map((commitment) => '0x' + commitment.toString('hex')),
Expand Down
10 changes: 7 additions & 3 deletions packages/client/test/miner/miner.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -213,10 +213,14 @@ tape('[Miner]', async (t) => {

// disable consensus to skip PoA block signer validation
;(vm.blockchain.consensus as CliqueConsensus).cliqueActiveSigners = () => [A.address] // stub
// tx as at Harfork.Berlin so lets change the vm's hardfork

chain.putBlocks = (blocks: Block[]) => {
t.equal(blocks[0].transactions.length, 0, 'new block should not include tx')
t.equal(txPool.txsInPool, 0, 'transaction should also have been removed from pool')
t.equal(
blocks[0].transactions.length,
0,
'new block should not include tx due to hardfork mismatch'
)
t.equal(txPool.txsInPool, 1, 'transaction should remain in pool')
miner.stop()
txPool.stop()
}
Expand Down
Loading

0 comments on commit 73095a8

Please sign in to comment.