diff --git a/core/types/block.go b/core/types/block.go index 995cf3c64f..1541125524 100644 --- a/core/types/block.go +++ b/core/types/block.go @@ -234,7 +234,8 @@ type Block struct { ReceivedFrom interface{} // blobs provides DA check - blobs BlobTxSidecars + blobs BlobTxSidecars + blobVersion uint32 } // "external" block encoding. used for eth protocol, etc. @@ -530,6 +531,11 @@ func (b *Block) WithBlobs(blobs BlobTxSidecars) *Block { return block } +// WithBlobs returns a block containing the given blobs. +func (b *Block) WithSidecarVersion(version uint32) { + b.blobVersion = version +} + // Hash returns the keccak256 hash of b's header. // The hash is computed on the first call and cached thereafter. func (b *Block) Hash() common.Hash { diff --git a/eth/handler.go b/eth/handler.go index b1fe53bb8c..3ebce5ec55 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -800,7 +800,7 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) { isCancun := h.chain.Config().IsCancun(block.Number(), block.Time()) for _, peer := range transfer { - if isCancun { + if !isCancun { peer.AsyncSendNewBlock(block, td) } else { peer.AsyncSendNewBlockAndBlob(block, td, params.BlobVersion, block.Blobs()) diff --git a/eth/handler_eth.go b/eth/handler_eth.go index be61d2e531..c192f42272 100644 --- a/eth/handler_eth.go +++ b/eth/handler_eth.go @@ -65,10 +65,7 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error { return h.handleBlockAnnounces(peer, hashes, numbers) case *eth.NewBlockPacket: - return h.handleBlockBroadcast(peer, packet.Block, packet.TD) - - case *eth.NewBlockWithBlobPacket: - return h.handleBlockWithBlobBroadcast(peer, packet.Block, packet.TD, packet.Version, packet.Sidecars) + return h.handleBlockBroadcast(peer, packet) case *eth.NewPooledTransactionHashesPacket67: return h.txFetcher.Notify(peer.ID(), nil, nil, *packet) @@ -121,43 +118,26 @@ func (h *ethHandler) handleBlockAnnounces(peer *eth.Peer, hashes []common.Hash, // handleBlockBroadcast is invoked from a peer's message handler when it transmits a // block broadcast for the local node to process. -func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, block *types.Block, td *big.Int) error { +func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, packet *eth.NewBlockPacket) error { // Drop all incoming block announces from the p2p network if // the chain already entered the pos stage and disconnect the // remote peer. if h.merger.PoSFinalized() { return errors.New("disallowed block broadcast") } - // Schedule the block for import - h.blockFetcher.Enqueue(peer.ID(), block) - - // Assuming the block is importable by the peer, but possibly not yet done so, - // calculate the head hash and TD that the peer truly must have. - var ( - trueHead = block.ParentHash() - trueTD = new(big.Int).Sub(td, block.Difficulty()) - ) - // Update the peer's total difficulty if better than the previous - if _, td := peer.Head(); trueTD.Cmp(td) > 0 { - peer.SetHead(trueHead, trueTD) - h.chainSync.handlePeerEvent() + block := packet.Block + td := packet.TD + sidecars := packet.Sidecars + version := packet.Version + if sidecars != nil { + block = block.WithBlobs(sidecars) } - return nil -} - -// handleBlockBroadcast is invoked from a peer's message handler when it transmits a -// block broadcast for the local node to process. -func (h *ethHandler) handleBlockWithBlobBroadcast(peer *eth.Peer, block *types.Block, td *big.Int, version uint32, sidecars types.BlobTxSidecars) error { - // Drop all incoming block announces from the p2p network if - // the chain already entered the pos stage and disconnect the - // remote peer. - if h.merger.PoSFinalized() { - return errors.New("disallowed block broadcast") + if version != nil { + block.WithSidecarVersion(*version) } - // todo 4844 here enqueue in a fetcher that fetcher both block and blob - // todo OR have a separate fetcher for blobs that takes sidecars and saves + // Schedule the block for import - h.blockFetcher.Enqueue(peer.ID(), block.WithBlobs(sidecars)) // todo 4844 add version either in block or in sidecars + h.blockFetcher.Enqueue(peer.ID(), block) // Assuming the block is importable by the peer, but possibly not yet done so, // calculate the head hash and TD that the peer truly must have. diff --git a/eth/protocols/eth/handler_test.go b/eth/protocols/eth/handler_test.go index 310e75400b..4e48636420 100644 --- a/eth/protocols/eth/handler_test.go +++ b/eth/protocols/eth/handler_test.go @@ -17,6 +17,8 @@ package eth import ( + rand2 "crypto/rand" + "io" "math" "math/big" "math/rand" @@ -33,10 +35,14 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/crypto/kzg4844" + "github.com/ethereum/go-ethereum/eth/protocols/bsc" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/rlp" + "github.com/holiman/uint256" ) var ( @@ -146,7 +152,7 @@ func (b *testBackend) AcceptTxs() bool { panic("data processing tests should be done in the handler package") } func (b *testBackend) Handle(*Peer, Packet) error { - panic("data processing tests should be done in the handler package") + return nil } // Tests that block headers can be retrieved from a remote chain based on user queries. @@ -502,3 +508,159 @@ func testGetBlockReceipts(t *testing.T, protocol uint) { t.Errorf("receipts mismatch: %v", err) } } + +func TestHandleNewBlock(t *testing.T) { + t.Parallel() + + gen := func(n int, g *core.BlockGen) { + if n%2 == 0 { + w := &types.Withdrawal{ + Address: common.Address{0xaa}, + Amount: 42, + } + g.AddWithdrawal(w) + } + } + + backend := newTestBackendWithGenerator(maxBodiesServe+15, true, gen) + defer backend.close() + + peer, _ := newTestPeer("peer", ETH68, backend) + defer peer.close() + + v := new(uint32) + *v = 1 + genBlobs := makeBlkBlobs(1, 2) + tx1 := types.NewTx(&types.BlobTx{ + ChainID: new(uint256.Int).SetUint64(1), + GasTipCap: new(uint256.Int), + GasFeeCap: new(uint256.Int), + Gas: 0, + Value: new(uint256.Int), + Data: nil, + BlobFeeCap: new(uint256.Int), + BlobHashes: []common.Hash{common.HexToHash("0x34ec6e64f9cda8fe0451a391e4798085a3ef51a65ed1bfb016e34fc1a2028f8f"), common.HexToHash("0xb9a412e875f29fac436acde234f954e91173c4cf79814f6dcf630d8a6345747f")}, + Sidecar: genBlobs[0], + V: new(uint256.Int), + R: new(uint256.Int), + S: new(uint256.Int), + }) + sidecars := types.BlobTxSidecars{tx1.BlobTxSidecar()} + block := types.NewBlockWithHeader(&types.Header{ + Number: big.NewInt(0), + Extra: []byte("test block"), + UncleHash: types.EmptyUncleHash, + TxHash: types.EmptyTxsHash, + ReceiptHash: types.EmptyReceiptsHash, + }) + dataNil := NewBlockPacket{ + Block: block, + TD: big.NewInt(1), + Version: nil, + Sidecars: nil, + } + dataNonNil := NewBlockPacket{ + Block: block, + TD: big.NewInt(1), + Version: v, + Sidecars: sidecars, + } + sizeNonNil, rNonNil, _ := rlp.EncodeToReader(dataNonNil) + sizeNil, rNil, _ := rlp.EncodeToReader(dataNil) + + // Define the test cases + testCases := []struct { + name string + msg p2p.Msg + err error + }{ + { + name: "Valid block", + msg: p2p.Msg{ + Code: 1, + Size: uint32(sizeNonNil), + Payload: rNonNil, + }, + err: nil, + }, + { + name: "Nil sidecars", + msg: p2p.Msg{ + Code: 2, + Size: uint32(sizeNil), + Payload: rNil, + }, + err: nil, + }, + } + + protos := []p2p.Protocol{ + { + Name: "eth", + Version: ETH67, + }, + { + Name: "eth", + Version: ETH68, + }, + { + Name: "bsc", + Version: bsc.Bsc1, + }, + } + caps := []p2p.Cap{ + { + Name: "eth", + Version: ETH67, + }, + { + Name: "eth", + Version: ETH68, + }, + { + Name: "bsc", + Version: bsc.Bsc1, + }, + } + // Create a source handler to send messages through and a sink peer to receive them + p2pEthSrc, p2pEthSink := p2p.MsgPipe() + defer p2pEthSrc.Close() + defer p2pEthSink.Close() + + localEth := NewPeer(ETH68, p2p.NewPeerWithProtocols(enode.ID{1}, protos, "", caps), p2pEthSrc, nil) + + // Run the tests + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + err := handleNewBlock(backend, tc.msg, localEth) + if err != tc.err { + t.Errorf("expected error %v, got %v", tc.err, err) + } + }) + } + +} + +func makeBlkBlobs(n, nPerTx int) types.BlobTxSidecars { + if n <= 0 { + return nil + } + ret := make(types.BlobTxSidecars, n) + for i := 0; i < n; i++ { + blobs := make([]kzg4844.Blob, nPerTx) + commitments := make([]kzg4844.Commitment, nPerTx) + proofs := make([]kzg4844.Proof, nPerTx) + for i := 0; i < nPerTx; i++ { + io.ReadFull(rand2.Reader, blobs[i][:]) + commitments[i], _ = kzg4844.BlobToCommitment(blobs[i]) + proofs[i], _ = kzg4844.ComputeBlobProof(blobs[i], commitments[i]) + } + ret[i] = &types.BlobTxSidecar{ + Blobs: blobs, + Commitments: commitments, + Proofs: proofs, + } + } + return ret +} diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go index 73e55fcd79..60b0163244 100644 --- a/eth/protocols/eth/handlers.go +++ b/eth/protocols/eth/handlers.go @@ -290,59 +290,33 @@ func handleNewBlockhashes(backend Backend, msg Decoder, peer *Peer) error { return backend.Handle(peer, ann) } -type BlockPacket interface { - GetBlock() *types.Block - SanityCheck() error - Name() string - Kind() byte -} - -// Implement the GetBlock method for both packet types -func (p *NewBlockPacket) GetBlock() *types.Block { - return p.Block -} - -func (p *NewBlockWithBlobPacket) GetBlock() *types.Block { - return p.Block -} - func handleNewBlock(backend Backend, msg Decoder, peer *Peer) error { - var packet BlockPacket - - // First, try to decode as NewBlockPacket - nbp := new(NewBlockPacket) - if err := msg.Decode(nbp); err == nil { - packet = nbp - } else { - // If that fails, try to decode as NewBlockWithBlobPacket - nbbp := new(NewBlockWithBlobPacket) - if err := msg.Decode(nbbp); err != nil { - return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) - } - packet = nbbp + // Retrieve and decode the propagated block + ann := new(NewBlockPacket) + if err := msg.Decode(ann); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) } // Now that we have our packet, perform operations using the interface methods - if err := packet.SanityCheck(); err != nil { + if err := ann.sanityCheck(); err != nil { return err } - block := packet.GetBlock() - if hash := types.CalcUncleHash(block.Uncles()); hash != block.UncleHash() { - log.Warn("Propagated block has invalid uncles", "have", hash, "exp", block.UncleHash()) + if hash := types.CalcUncleHash(ann.Block.Uncles()); hash != ann.Block.UncleHash() { + log.Warn("Propagated block has invalid uncles", "have", hash, "exp", ann.Block.UncleHash()) return nil // TODO(karalabe): return error eventually, but wait a few releases } - if hash := types.DeriveSha(block.Transactions(), trie.NewStackTrie(nil)); hash != block.TxHash() { - log.Warn("Propagated block has invalid body", "have", hash, "exp", block.TxHash()) + if hash := types.DeriveSha(ann.Block.Transactions(), trie.NewStackTrie(nil)); hash != ann.Block.TxHash() { + log.Warn("Propagated block has invalid body", "have", hash, "exp", ann.Block.TxHash()) return nil // TODO(karalabe): return error eventually, but wait a few releases } - block.ReceivedAt = msg.Time() - block.ReceivedFrom = peer + ann.Block.ReceivedAt = msg.Time() + ann.Block.ReceivedFrom = peer // Mark the peer as owning the block - peer.markBlock(block.Hash()) + peer.markBlock(ann.Block.Hash()) - return backend.Handle(peer, packet) + return backend.Handle(peer, ann) } func handleBlockHeaders(backend Backend, msg Decoder, peer *Peer) error { diff --git a/eth/protocols/eth/peer.go b/eth/protocols/eth/peer.go index c9a2318464..e542473abc 100644 --- a/eth/protocols/eth/peer.go +++ b/eth/protocols/eth/peer.go @@ -333,10 +333,10 @@ func (p *Peer) SendNewBlock(block *types.Block, td *big.Int) error { func (p *Peer) SendNewBlockAndBlob(block *types.Block, td *big.Int, version uint32, sidecars types.BlobTxSidecars) error { // Mark all the block hash as known, but ensure we don't overflow our limits p.knownBlockAndBlobs.Add(block.Hash()) // todo 4844 check if adding only the block hash is okay - return p2p.Send(p.rw, NewBlockMsg, &NewBlockWithBlobPacket{ + return p2p.Send(p.rw, NewBlockMsg, &NewBlockPacket{ Block: block, TD: td, - Version: version, + Version: &version, Sidecars: sidecars, }) } diff --git a/eth/protocols/eth/protocol.go b/eth/protocols/eth/protocol.go index e7f52cdd9f..e4047cd026 100644 --- a/eth/protocols/eth/protocol.go +++ b/eth/protocols/eth/protocol.go @@ -216,20 +216,14 @@ type BlockHeadersRLPPacket struct { // NewBlockPacket is the network packet for the block propagation message. type NewBlockPacket struct { - Block *types.Block - TD *big.Int -} - -type NewBlockWithBlobPacket struct { - Block *types.Block - TD *big.Int - Version uint32 // 0: followed by sidecar array - // others: invalid, undefined right now - Sidecars types.BlobTxSidecars // each BlobTx will have a BlobTxSidecar + Block *types.Block + TD *big.Int + Version *uint32 `rlp:"optional"` + Sidecars types.BlobTxSidecars `rlp:"optional"` } // sanityCheck verifies that the values are reasonable, as a DoS protection -func (request *NewBlockPacket) SanityCheck() error { +func (request *NewBlockPacket) sanityCheck() error { if err := request.Block.SanityCheck(); err != nil { return err } @@ -238,26 +232,15 @@ func (request *NewBlockPacket) SanityCheck() error { if tdlen := request.TD.BitLen(); tdlen > 100 { return fmt.Errorf("too large block TD: bitlen %d", tdlen) } - return nil -} - -// sanityCheck verifies that the values are reasonable, as a DoS protection -func (request *NewBlockWithBlobPacket) SanityCheck() error { - if err := request.Block.SanityCheck(); err != nil { - return err - } - //TD at mainnet block #7753254 is 76 bits. If it becomes 100 million times - // larger, it will still fit within 100 bits - if tdlen := request.TD.BitLen(); tdlen > 100 { - return fmt.Errorf("too large block TD: bitlen %d", tdlen) - } - // todo 4844 do full sanity check for blob - for _, sidecar := range request.Sidecars { - lProofs := len(sidecar.Proofs) - lBlobs := len(sidecar.Blobs) - lCommitments := len(sidecar.Commitments) - if lProofs != lBlobs || lProofs != lCommitments || lCommitments != lBlobs { - return fmt.Errorf("mismatch of lengths of sidecar proofs %d, blobs %d, commitments %d", lProofs, lBlobs, lCommitments) + if len(request.Sidecars) > 0 { + // todo 4844 do full sanity check for blob + for _, sidecar := range request.Sidecars { + lProofs := len(sidecar.Proofs) + lBlobs := len(sidecar.Blobs) + lCommitments := len(sidecar.Commitments) + if lProofs != lBlobs || lProofs != lCommitments || lCommitments != lBlobs { + return fmt.Errorf("mismatch of lengths of sidecar proofs %d, blobs %d, commitments %d", lProofs, lBlobs, lCommitments) + } } } return nil @@ -410,9 +393,6 @@ func (*BlockBodiesResponse) Kind() byte { return BlockBodiesMsg } func (*NewBlockPacket) Name() string { return "NewBlock" } func (*NewBlockPacket) Kind() byte { return NewBlockMsg } -func (*NewBlockWithBlobPacket) Name() string { return "NewBlock" } -func (*NewBlockWithBlobPacket) Kind() byte { return NewBlockMsg } - func (*NewPooledTransactionHashesPacket67) Name() string { return "NewPooledTransactionHashes" } func (*NewPooledTransactionHashesPacket67) Kind() byte { return NewPooledTransactionHashesMsg } func (*NewPooledTransactionHashesPacket68) Name() string { return "NewPooledTransactionHashes" }