From 1bd1313b71e4dd7402e764a45dbdcf0f97f2a86e Mon Sep 17 00:00:00 2001
From: Satyajit Das
Date: Wed, 13 Mar 2024 08:07:39 +0530
Subject: [PATCH] blobTx: mining + broadcasting (#2253)

---
 core/types/tx_blob.go                        |   9 ++
 eth/downloader/downloader.go                 |   6 +-
 eth/downloader/fetchers_concurrent_bodies.go |   4 +-
 eth/downloader/queue.go                      |   4 +-
 eth/downloader/queue_test.go                 |   2 +-
 eth/fetcher/block_fetcher.go                 |   3 +-
 eth/handler.go                               |   1 +
 eth/handler_eth.go                           |  11 +-
 eth/protocols/eth/broadcast.go               |  11 +-
 eth/protocols/eth/handler_test.go            | 162 ++++++++++++++++++-
 eth/protocols/eth/handlers.go                |  23 ++-
 eth/protocols/eth/peer.go                    |  13 +-
 eth/protocols/eth/protocol.go                |  32 +++-
 miner/worker.go                              |   3 +-
 14 files changed, 255 insertions(+), 29 deletions(-)

diff --git a/core/types/tx_blob.go b/core/types/tx_blob.go
index 1931d18f66..eaf3fbc5ea 100644
--- a/core/types/tx_blob.go
+++ b/core/types/tx_blob.go
@@ -54,6 +54,15 @@ type BlobTx struct {
 
 type BlobTxSidecars []*BlobTxSidecar
 
+// Len returns the length of s.
+func (s BlobTxSidecars) Len() int { return len(s) }
+
+// EncodeIndex encodes the i'th BlobTxSidecar to w. Note that this does not check for errors
+// because we assume that BlobTxSidecars will only ever contain valid sidecars.
+func (s BlobTxSidecars) EncodeIndex(i int, w *bytes.Buffer) {
+	rlp.Encode(w, s[i])
+}
+
 // BlobTxSidecar contains the blobs of a blob transaction.
 type BlobTxSidecar struct {
 	Blobs []kzg4844.Blob // Blobs needed by the blob pool
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index a382a929a0..19d7ba7434 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -1388,7 +1388,7 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error {
 	)
 	blocks := make([]*types.Block, len(results))
 	for i, result := range results {
-		blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles).WithWithdrawals(result.Withdrawals)
+		blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles).WithWithdrawals(result.Withdrawals).WithBlobs(result.Sidecars)
 	}
 	// Downloaded blocks are always regarded as trusted after the
 	// transition.
Because the downloaded chain is guided by the @@ -1599,7 +1599,7 @@ func (d *Downloader) commitSnapSyncData(results []*fetchResult, stateSync *state blocks := make([]*types.Block, len(results)) receipts := make([]types.Receipts, len(results)) for i, result := range results { - blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles).WithWithdrawals(result.Withdrawals) + blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles).WithWithdrawals(result.Withdrawals).WithBlobs(result.Sidecars) receipts[i] = result.Receipts } if index, err := d.blockchain.InsertReceiptChain(blocks, receipts, d.ancientLimit); err != nil { @@ -1610,7 +1610,7 @@ func (d *Downloader) commitSnapSyncData(results []*fetchResult, stateSync *state } func (d *Downloader) commitPivotBlock(result *fetchResult) error { - block := types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles).WithWithdrawals(result.Withdrawals) + block := types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles).WithWithdrawals(result.Withdrawals).WithBlobs(result.Sidecars) log.Debug("Committing snap sync pivot as new head", "number", block.Number(), "hash", block.Hash()) // Commit the pivot block as the new head, will require full sync from here on diff --git a/eth/downloader/fetchers_concurrent_bodies.go b/eth/downloader/fetchers_concurrent_bodies.go index 5105fda66b..49f3644fd9 100644 --- a/eth/downloader/fetchers_concurrent_bodies.go +++ b/eth/downloader/fetchers_concurrent_bodies.go @@ -89,10 +89,10 @@ func (q *bodyQueue) request(peer *peerConnection, req *fetchRequest, resCh chan // deliver is responsible for taking a generic response packet from the concurrent // fetcher, unpacking the body data and delivering it to the downloader's queue. func (q *bodyQueue) deliver(peer *peerConnection, packet *eth.Response) (int, error) { - txs, uncles, withdrawals := packet.Res.(*eth.BlockBodiesResponse).Unpack() + txs, uncles, withdrawals, sidecars := packet.Res.(*eth.BlockBodiesResponse).Unpack() hashsets := packet.Meta.([][]common.Hash) // {txs hashes, uncle hashes, withdrawal hashes} - accepted, err := q.queue.DeliverBodies(peer.id, txs, hashsets[0], uncles, hashsets[1], withdrawals, hashsets[2]) + accepted, err := q.queue.DeliverBodies(peer.id, txs, hashsets[0], uncles, hashsets[1], withdrawals, hashsets[2], sidecars) switch { case err == nil && len(txs) == 0: peer.log.Trace("Requested bodies delivered") diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 07c0e79d3a..6510115171 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -71,6 +71,7 @@ type fetchResult struct { Transactions types.Transactions Receipts types.Receipts Withdrawals types.Withdrawals + Sidecars types.BlobTxSidecars } func newFetchResult(header *types.Header, fastSync bool, pid string) *fetchResult { @@ -776,7 +777,7 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, hashes []comm // also wakes any threads waiting for data delivery. 
func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, txListHashes []common.Hash, uncleLists [][]*types.Header, uncleListHashes []common.Hash, - withdrawalLists [][]*types.Withdrawal, withdrawalListHashes []common.Hash) (int, error) { + withdrawalLists [][]*types.Withdrawal, withdrawalListHashes []common.Hash, sidecars [][]*types.BlobTxSidecar) (int, error) { q.lock.Lock() defer q.lock.Unlock() @@ -838,6 +839,7 @@ func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, txListH result.Transactions = txLists[index] result.Uncles = uncleLists[index] result.Withdrawals = withdrawalLists[index] + result.Sidecars = sidecars[index] result.SetBodyDone() } return q.deliver(id, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, diff --git a/eth/downloader/queue_test.go b/eth/downloader/queue_test.go index 50b9031a27..c10483b5b2 100644 --- a/eth/downloader/queue_test.go +++ b/eth/downloader/queue_test.go @@ -341,7 +341,7 @@ func XTestDelivery(t *testing.T) { uncleHashes[i] = types.CalcUncleHash(uncles) } time.Sleep(100 * time.Millisecond) - _, err := q.DeliverBodies(peer.id, txset, txsHashes, uncleset, uncleHashes, nil, nil) + _, err := q.DeliverBodies(peer.id, txset, txsHashes, uncleset, uncleHashes, nil, nil, nil) if err != nil { fmt.Printf("delivered %d bodies %v\n", len(txset), err) } diff --git a/eth/fetcher/block_fetcher.go b/eth/fetcher/block_fetcher.go index c574b80498..1bf8bae0a7 100644 --- a/eth/fetcher/block_fetcher.go +++ b/eth/fetcher/block_fetcher.go @@ -576,7 +576,8 @@ func (f *BlockFetcher) loop() { case res := <-resCh: res.Done <- nil // Ignoring withdrawals here, since the block fetcher is not used post-merge. - txs, uncles, _ := res.Res.(*eth.BlockBodiesResponse).Unpack() + // todo 4844 is it ok to ignore sidecars here too? + txs, uncles, _, _ := res.Res.(*eth.BlockBodiesResponse).Unpack() f.FilterBodies(peer, txs, uncles, time.Now()) case <-timeout.C: diff --git a/eth/handler.go b/eth/handler.go index bb72a94812..9f3f7147b1 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -794,6 +794,7 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) { } else { transfer = peers[:int(math.Sqrt(float64(len(peers))))] } + for _, peer := range transfer { peer.AsyncSendNewBlock(block, td) } diff --git a/eth/handler_eth.go b/eth/handler_eth.go index 0b3da37928..c647c6196e 100644 --- a/eth/handler_eth.go +++ b/eth/handler_eth.go @@ -65,7 +65,7 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error { return h.handleBlockAnnounces(peer, hashes, numbers) case *eth.NewBlockPacket: - return h.handleBlockBroadcast(peer, packet.Block, packet.TD) + return h.handleBlockBroadcast(peer, packet) case *eth.NewPooledTransactionHashesPacket67: return h.txFetcher.Notify(peer.ID(), nil, nil, *packet) @@ -118,13 +118,20 @@ func (h *ethHandler) handleBlockAnnounces(peer *eth.Peer, hashes []common.Hash, // handleBlockBroadcast is invoked from a peer's message handler when it transmits a // block broadcast for the local node to process. -func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, block *types.Block, td *big.Int) error { +func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, packet *eth.NewBlockPacket) error { // Drop all incoming block announces from the p2p network if // the chain already entered the pos stage and disconnect the // remote peer. 
if h.merger.PoSFinalized() { return errors.New("disallowed block broadcast") } + block := packet.Block + td := packet.TD + sidecars := packet.Sidecars + if sidecars != nil { + block = block.WithBlobs(sidecars) + } + // Schedule the block for import h.blockFetcher.Enqueue(peer.ID(), block) diff --git a/eth/protocols/eth/broadcast.go b/eth/protocols/eth/broadcast.go index c02e2fa60e..27312fd066 100644 --- a/eth/protocols/eth/broadcast.go +++ b/eth/protocols/eth/broadcast.go @@ -33,8 +33,9 @@ const ( // blockPropagation is a block propagation event, waiting for its turn in the // broadcast queue. type blockPropagation struct { - block *types.Block - td *big.Int + block *types.Block + td *big.Int + sidecars types.BlobTxSidecars `rlp:"optional"` } // broadcastBlocks is a write loop that multiplexes blocks and block announcements @@ -47,7 +48,11 @@ func (p *Peer) broadcastBlocks() { if err := p.SendNewBlock(prop.block, prop.td); err != nil { return } - p.Log().Trace("Propagated block", "number", prop.block.Number(), "hash", prop.block.Hash(), "td", prop.td) + if len(prop.sidecars) > 0 { + p.Log().Trace("Propagated block", "number", prop.block.Number(), "hash", prop.block.Hash(), "td", prop.td, "sidecars", prop.sidecars.Len()) + } else { + p.Log().Trace("Propagated block", "number", prop.block.Number(), "hash", prop.block.Hash(), "td", prop.td) + } case block := <-p.queuedBlockAnns: if err := p.SendNewBlockHashes([]common.Hash{block.Hash()}, []uint64{block.NumberU64()}); err != nil { diff --git a/eth/protocols/eth/handler_test.go b/eth/protocols/eth/handler_test.go index 310e75400b..137d53946e 100644 --- a/eth/protocols/eth/handler_test.go +++ b/eth/protocols/eth/handler_test.go @@ -17,6 +17,8 @@ package eth import ( + rand2 "crypto/rand" + "io" "math" "math/big" "math/rand" @@ -33,10 +35,14 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/crypto/kzg4844" + "github.com/ethereum/go-ethereum/eth/protocols/bsc" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/rlp" + "github.com/holiman/uint256" ) var ( @@ -146,7 +152,7 @@ func (b *testBackend) AcceptTxs() bool { panic("data processing tests should be done in the handler package") } func (b *testBackend) Handle(*Peer, Packet) error { - panic("data processing tests should be done in the handler package") + return nil } // Tests that block headers can be retrieved from a remote chain based on user queries. 
@@ -502,3 +508,157 @@ func testGetBlockReceipts(t *testing.T, protocol uint) { t.Errorf("receipts mismatch: %v", err) } } + +func TestHandleNewBlock(t *testing.T) { + t.Parallel() + + gen := func(n int, g *core.BlockGen) { + if n%2 == 0 { + w := &types.Withdrawal{ + Address: common.Address{0xaa}, + Amount: 42, + } + g.AddWithdrawal(w) + } + } + + backend := newTestBackendWithGenerator(maxBodiesServe+15, true, gen) + defer backend.close() + + peer, _ := newTestPeer("peer", ETH68, backend) + defer peer.close() + + v := new(uint32) + *v = 1 + genBlobs := makeBlkBlobs(1, 2) + tx1 := types.NewTx(&types.BlobTx{ + ChainID: new(uint256.Int).SetUint64(1), + GasTipCap: new(uint256.Int), + GasFeeCap: new(uint256.Int), + Gas: 0, + Value: new(uint256.Int), + Data: nil, + BlobFeeCap: new(uint256.Int), + BlobHashes: []common.Hash{common.HexToHash("0x34ec6e64f9cda8fe0451a391e4798085a3ef51a65ed1bfb016e34fc1a2028f8f"), common.HexToHash("0xb9a412e875f29fac436acde234f954e91173c4cf79814f6dcf630d8a6345747f")}, + Sidecar: genBlobs[0], + V: new(uint256.Int), + R: new(uint256.Int), + S: new(uint256.Int), + }) + sidecars := types.BlobTxSidecars{tx1.BlobTxSidecar()} + block := types.NewBlockWithHeader(&types.Header{ + Number: big.NewInt(0), + Extra: []byte("test block"), + UncleHash: types.EmptyUncleHash, + TxHash: types.EmptyTxsHash, + ReceiptHash: types.EmptyReceiptsHash, + }) + dataNil := NewBlockPacket{ + Block: block, + TD: big.NewInt(1), + Sidecars: nil, + } + dataNonNil := NewBlockPacket{ + Block: block, + TD: big.NewInt(1), + Sidecars: sidecars, + } + sizeNonNil, rNonNil, _ := rlp.EncodeToReader(dataNonNil) + sizeNil, rNil, _ := rlp.EncodeToReader(dataNil) + + // Define the test cases + testCases := []struct { + name string + msg p2p.Msg + err error + }{ + { + name: "Valid block", + msg: p2p.Msg{ + Code: 1, + Size: uint32(sizeNonNil), + Payload: rNonNil, + }, + err: nil, + }, + { + name: "Nil sidecars", + msg: p2p.Msg{ + Code: 2, + Size: uint32(sizeNil), + Payload: rNil, + }, + err: nil, + }, + } + + protos := []p2p.Protocol{ + { + Name: "eth", + Version: ETH67, + }, + { + Name: "eth", + Version: ETH68, + }, + { + Name: "bsc", + Version: bsc.Bsc1, + }, + } + caps := []p2p.Cap{ + { + Name: "eth", + Version: ETH67, + }, + { + Name: "eth", + Version: ETH68, + }, + { + Name: "bsc", + Version: bsc.Bsc1, + }, + } + // Create a source handler to send messages through and a sink peer to receive them + p2pEthSrc, p2pEthSink := p2p.MsgPipe() + defer p2pEthSrc.Close() + defer p2pEthSink.Close() + + localEth := NewPeer(ETH68, p2p.NewPeerWithProtocols(enode.ID{1}, protos, "", caps), p2pEthSrc, nil) + + // Run the tests + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + err := handleNewBlock(backend, tc.msg, localEth) + if err != tc.err { + t.Errorf("expected error %v, got %v", tc.err, err) + } + }) + } + +} + +func makeBlkBlobs(n, nPerTx int) types.BlobTxSidecars { + if n <= 0 { + return nil + } + ret := make(types.BlobTxSidecars, n) + for i := 0; i < n; i++ { + blobs := make([]kzg4844.Blob, nPerTx) + commitments := make([]kzg4844.Commitment, nPerTx) + proofs := make([]kzg4844.Proof, nPerTx) + for i := 0; i < nPerTx; i++ { + io.ReadFull(rand2.Reader, blobs[i][:]) + commitments[i], _ = kzg4844.BlobToCommitment(blobs[i]) + proofs[i], _ = kzg4844.ComputeBlobProof(blobs[i], commitments[i]) + } + ret[i] = &types.BlobTxSidecar{ + Blobs: blobs, + Commitments: commitments, + Proofs: proofs, + } + } + return ret +} diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go index 
069e92dadf..7ab2bce707 100644
--- a/eth/protocols/eth/handlers.go
+++ b/eth/protocols/eth/handlers.go
@@ -224,10 +224,24 @@ func ServiceGetBlockBodiesQuery(chain *core.BlockChain, query GetBlockBodiesRequ
 			lookups >= 2*maxBodiesServe {
 			break
 		}
-		if data := chain.GetBodyRLP(hash); len(data) != 0 {
-			bodies = append(bodies, data)
-			bytes += len(data)
+		body := chain.GetBody(hash)
+		if body == nil {
+			continue
+		}
+		blobs := chain.GetBlobsByHash(hash)
+		bodyWithBlobs := &BlockBody{
+			Transactions: body.Transactions,
+			Uncles:       body.Uncles,
+			Withdrawals:  body.Withdrawals,
+			Sidecars:     blobs,
+		}
+		enc, err := rlp.EncodeToBytes(bodyWithBlobs)
+		if err != nil {
+			log.Error("Failed to encode block body", "hash", hash, "err", err)
+			continue
 		}
+		bodies = append(bodies, enc)
+		bytes += len(enc)
 	}
 	return bodies
 }
@@ -293,9 +307,12 @@ func handleNewBlock(backend Backend, msg Decoder, peer *Peer) error {
 	if err := msg.Decode(ann); err != nil {
 		return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
 	}
+
+	// Run the basic sanity checks on the announced block (and any blob sidecars) before processing it further
 	if err := ann.sanityCheck(); err != nil {
 		return err
 	}
+
 	if hash := types.CalcUncleHash(ann.Block.Uncles()); hash != ann.Block.UncleHash() {
 		log.Warn("Propagated block has invalid uncles", "have", hash, "exp", ann.Block.UncleHash())
 		return nil // TODO(karalabe): return error eventually, but wait a few releases
diff --git a/eth/protocols/eth/peer.go b/eth/protocols/eth/peer.go
index c58d133d1a..df23b4e727 100644
--- a/eth/protocols/eth/peer.go
+++ b/eth/protocols/eth/peer.go
@@ -318,16 +318,23 @@ func (p *Peer) SendNewBlock(block *types.Block, td *big.Int) error {
 	// Mark all the block hash as known, but ensure we don't overflow our limits
 	p.knownBlocks.Add(block.Hash())
 	return p2p.Send(p.rw, NewBlockMsg, &NewBlockPacket{
-		Block: block,
-		TD:    td,
+		Block:    block,
+		TD:       td,
+		Sidecars: block.Blobs(),
 	})
 }
 
 // AsyncSendNewBlock queues an entire block for propagation to a remote peer. If
 // the peer's broadcast queue is full, the event is silently dropped.
 func (p *Peer) AsyncSendNewBlock(block *types.Block, td *big.Int) {
+	bp := &blockPropagation{
+		block:    block,
+		td:       td,
+		sidecars: block.Blobs(),
+	}
+
 	select {
-	case p.queuedBlocks <- &blockPropagation{block: block, td: td}:
+	case p.queuedBlocks <- bp:
 		// Mark all the block hash as known, but ensure we don't overflow our limits
 		p.knownBlocks.Add(block.Hash())
 	default:
diff --git a/eth/protocols/eth/protocol.go b/eth/protocols/eth/protocol.go
index 18d33cdb72..2caa65c0ed 100644
--- a/eth/protocols/eth/protocol.go
+++ b/eth/protocols/eth/protocol.go
@@ -216,8 +216,9 @@ type BlockHeadersRLPPacket struct {
 
 // NewBlockPacket is the network packet for the block propagation message.
 type NewBlockPacket struct {
-	Block *types.Block
-	TD    *big.Int
+	Block    *types.Block
+	TD       *big.Int
+	Sidecars []*types.BlobTxSidecar `rlp:"optional"`
 }
 
 // sanityCheck verifies that the values are reasonable, as a DoS protection
@@ -230,6 +231,19 @@ func (request *NewBlockPacket) sanityCheck() error {
 	if tdlen := request.TD.BitLen(); tdlen > 100 {
 		return fmt.Errorf("too large block TD: bitlen %d", tdlen)
 	}
+
+	if len(request.Sidecars) > 0 {
+		// TODO(4844): do a full sanity check of the blob sidecars
+		for _, sidecar := range request.Sidecars {
+			lProofs := len(sidecar.Proofs)
+			lBlobs := len(sidecar.Blobs)
+			lCommitments := len(sidecar.Commitments)
+			if lProofs != lBlobs || lProofs != lCommitments {
+				return fmt.Errorf("sidecar length mismatch: %d proofs, %d blobs, %d commitments", lProofs, lBlobs, lCommitments)
+			}
+		}
+	}
+
 	return nil
 }
 
@@ -265,24 +279,26 @@ type BlockBodiesRLPPacket struct {
 
 // BlockBody represents the data content of a single block.
 type BlockBody struct {
-	Transactions []*types.Transaction // Transactions contained within a block
-	Uncles       []*types.Header      // Uncles contained within a block
-	Withdrawals  []*types.Withdrawal `rlp:"optional"` // Withdrawals contained within a block
+	Transactions []*types.Transaction   // Transactions contained within a block
+	Uncles       []*types.Header        // Uncles contained within a block
+	Withdrawals  []*types.Withdrawal    `rlp:"optional"` // Withdrawals contained within a block
+	Sidecars     []*types.BlobTxSidecar `rlp:"optional"` // Sidecars contained within a block
 }
 
 // Unpack retrieves the transactions and uncles from the range packet and returns
 // them in a split flat format that's more consistent with the internal data structures.
-func (p *BlockBodiesResponse) Unpack() ([][]*types.Transaction, [][]*types.Header, [][]*types.Withdrawal) {
+func (p *BlockBodiesResponse) Unpack() ([][]*types.Transaction, [][]*types.Header, [][]*types.Withdrawal, [][]*types.BlobTxSidecar) {
 	// TODO(matt): add support for withdrawals to fetchers
 	var (
 		txset         = make([][]*types.Transaction, len(*p))
 		uncleset      = make([][]*types.Header, len(*p))
 		withdrawalset = make([][]*types.Withdrawal, len(*p))
+		sidecarset    = make([][]*types.BlobTxSidecar, len(*p))
 	)
 	for i, body := range *p {
-		txset[i], uncleset[i], withdrawalset[i] = body.Transactions, body.Uncles, body.Withdrawals
+		txset[i], uncleset[i], withdrawalset[i], sidecarset[i] = body.Transactions, body.Uncles, body.Withdrawals, body.Sidecars
 	}
-	return txset, uncleset, withdrawalset
+	return txset, uncleset, withdrawalset, sidecarset
 }
 
 // GetReceiptsRequest represents a block receipts query.
diff --git a/miner/worker.go b/miner/worker.go
index b960f8b565..addbe83179 100644
--- a/miner/worker.go
+++ b/miner/worker.go
@@ -641,7 +641,6 @@ func (w *worker) resultLoop() {
 			writeBlockTimer.UpdateSince(start)
 			log.Info("Successfully sealed new block", "number", block.Number(), "sealhash", sealhash, "hash", hash, "elapsed", common.PrettyDuration(time.Since(task.createdAt)))
 
-			// Broadcast the block and announce chain insertion event
 			w.mux.Post(core.NewMinedBlockEvent{Block: block})
 
 		case <-w.exitCh:
@@ -1265,6 +1264,8 @@ func (w *worker) commit(env *environment, interval func(), update bool, start ti
 		// https://github.com/ethereum/go-ethereum/issues/24299
 		env := env.copy()
 
+		block = block.WithBlobs(env.sidecars)
+
 		// If we're post merge, just ignore
 		if !w.isTTDReached(block.Header()) {
 			select {
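For context on the wire-format change above: the backward compatibility of the new `Sidecars` fields in `BlockBody` and `NewBlockPacket` relies on the `rlp:"optional"` tag, which allows a trailing list element to be absent on the wire, so bodies and block announcements from peers that predate this change still decode. Below is a minimal, self-contained sketch of that behaviour; `legacyBody` and `blobBody` are simplified stand-ins using plain `uint64` slices, not the real protocol structs.

```go
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/rlp"
)

// legacyBody mirrors the wire shape of a body before this patch: no sidecar element.
type legacyBody struct {
	Txs    []uint64
	Uncles []uint64
}

// blobBody adds a trailing optional element, the same pattern used for the new
// Sidecars fields in BlockBody and NewBlockPacket (types simplified for illustration).
type blobBody struct {
	Txs      []uint64
	Uncles   []uint64
	Sidecars []uint64 `rlp:"optional"`
}

func main() {
	// A payload produced by a peer that knows nothing about sidecars.
	old, err := rlp.EncodeToBytes(&legacyBody{Txs: []uint64{1, 2}})
	if err != nil {
		panic(err)
	}

	// It still decodes into the extended struct; the optional tail is simply left nil.
	var body blobBody
	if err := rlp.DecodeBytes(old, &body); err != nil {
		panic(err)
	}
	fmt.Printf("txs=%v sidecars=%v\n", body.Txs, body.Sidecars)

	// When sidecars are present they ride as an extra trailing list element.
	extended, err := rlp.EncodeToBytes(&blobBody{Txs: []uint64{1, 2}, Sidecars: []uint64{7}})
	if err != nil {
		panic(err)
	}
	fmt.Printf("legacy=%d bytes, extended=%d bytes\n", len(old), len(extended))
}
```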