Skip to content

Commit

Permalink
eth: remove NewBlockWithBlobPacket
Browse files Browse the repository at this point in the history
  • Loading branch information
emailtovamos committed Mar 12, 2024
1 parent eca640b commit 851ae4a
Show file tree
Hide file tree
Showing 7 changed files with 212 additions and 110 deletions.
8 changes: 7 additions & 1 deletion core/types/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,8 @@ type Block struct {
ReceivedFrom interface{}

// blobs provides DA check
blobs BlobTxSidecars
blobs BlobTxSidecars
blobVersion uint32
}

// "external" block encoding. used for eth protocol, etc.
Expand Down Expand Up @@ -530,6 +531,11 @@ func (b *Block) WithBlobs(blobs BlobTxSidecars) *Block {
return block
}

// WithBlobs returns a block containing the given blobs.
func (b *Block) WithSidecarVersion(version uint32) {
b.blobVersion = version
}

// Hash returns the keccak256 hash of b's header.
// The hash is computed on the first call and cached thereafter.
func (b *Block) Hash() common.Hash {
Expand Down
2 changes: 1 addition & 1 deletion eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -800,7 +800,7 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) {
isCancun := h.chain.Config().IsCancun(block.Number(), block.Time())

for _, peer := range transfer {
if isCancun {
if !isCancun {
peer.AsyncSendNewBlock(block, td)
} else {
peer.AsyncSendNewBlockAndBlob(block, td, params.BlobVersion, block.Blobs())
Expand Down
44 changes: 12 additions & 32 deletions eth/handler_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,7 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
return h.handleBlockAnnounces(peer, hashes, numbers)

case *eth.NewBlockPacket:
return h.handleBlockBroadcast(peer, packet.Block, packet.TD)

case *eth.NewBlockWithBlobPacket:
return h.handleBlockWithBlobBroadcast(peer, packet.Block, packet.TD, packet.Version, packet.Sidecars)
return h.handleBlockBroadcast(peer, packet)

case *eth.NewPooledTransactionHashesPacket67:
return h.txFetcher.Notify(peer.ID(), nil, nil, *packet)
Expand Down Expand Up @@ -121,43 +118,26 @@ func (h *ethHandler) handleBlockAnnounces(peer *eth.Peer, hashes []common.Hash,

// handleBlockBroadcast is invoked from a peer's message handler when it transmits a
// block broadcast for the local node to process.
func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, block *types.Block, td *big.Int) error {
func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, packet *eth.NewBlockPacket) error {
// Drop all incoming block announces from the p2p network if
// the chain already entered the pos stage and disconnect the
// remote peer.
if h.merger.PoSFinalized() {
return errors.New("disallowed block broadcast")
}
// Schedule the block for import
h.blockFetcher.Enqueue(peer.ID(), block)

// Assuming the block is importable by the peer, but possibly not yet done so,
// calculate the head hash and TD that the peer truly must have.
var (
trueHead = block.ParentHash()
trueTD = new(big.Int).Sub(td, block.Difficulty())
)
// Update the peer's total difficulty if better than the previous
if _, td := peer.Head(); trueTD.Cmp(td) > 0 {
peer.SetHead(trueHead, trueTD)
h.chainSync.handlePeerEvent()
block := packet.Block
td := packet.TD
sidecars := packet.Sidecars
version := packet.Version
if sidecars != nil {
block = block.WithBlobs(sidecars)
}
return nil
}

// handleBlockBroadcast is invoked from a peer's message handler when it transmits a
// block broadcast for the local node to process.
func (h *ethHandler) handleBlockWithBlobBroadcast(peer *eth.Peer, block *types.Block, td *big.Int, version uint32, sidecars types.BlobTxSidecars) error {
// Drop all incoming block announces from the p2p network if
// the chain already entered the pos stage and disconnect the
// remote peer.
if h.merger.PoSFinalized() {
return errors.New("disallowed block broadcast")
if version != nil {
block.WithSidecarVersion(*version)
}
// todo 4844 here enqueue in a fetcher that fetcher both block and blob
// todo OR have a separate fetcher for blobs that takes sidecars and saves

// Schedule the block for import
h.blockFetcher.Enqueue(peer.ID(), block.WithBlobs(sidecars)) // todo 4844 add version either in block or in sidecars
h.blockFetcher.Enqueue(peer.ID(), block)

// Assuming the block is importable by the peer, but possibly not yet done so,
// calculate the head hash and TD that the peer truly must have.
Expand Down
164 changes: 163 additions & 1 deletion eth/protocols/eth/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package eth

import (
rand2 "crypto/rand"
"io"
"math"
"math/big"
"math/rand"
Expand All @@ -33,10 +35,14 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum/go-ethereum/eth/protocols/bsc"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/holiman/uint256"
)

var (
Expand Down Expand Up @@ -146,7 +152,7 @@ func (b *testBackend) AcceptTxs() bool {
panic("data processing tests should be done in the handler package")
}
func (b *testBackend) Handle(*Peer, Packet) error {
panic("data processing tests should be done in the handler package")
return nil
}

// Tests that block headers can be retrieved from a remote chain based on user queries.
Expand Down Expand Up @@ -502,3 +508,159 @@ func testGetBlockReceipts(t *testing.T, protocol uint) {
t.Errorf("receipts mismatch: %v", err)
}
}

func TestHandleNewBlock(t *testing.T) {
t.Parallel()

gen := func(n int, g *core.BlockGen) {
if n%2 == 0 {
w := &types.Withdrawal{
Address: common.Address{0xaa},
Amount: 42,
}
g.AddWithdrawal(w)
}
}

backend := newTestBackendWithGenerator(maxBodiesServe+15, true, gen)
defer backend.close()

peer, _ := newTestPeer("peer", ETH68, backend)
defer peer.close()

v := new(uint32)
*v = 1
genBlobs := makeBlkBlobs(1, 2)
tx1 := types.NewTx(&types.BlobTx{
ChainID: new(uint256.Int).SetUint64(1),
GasTipCap: new(uint256.Int),
GasFeeCap: new(uint256.Int),
Gas: 0,
Value: new(uint256.Int),
Data: nil,
BlobFeeCap: new(uint256.Int),
BlobHashes: []common.Hash{common.HexToHash("0x34ec6e64f9cda8fe0451a391e4798085a3ef51a65ed1bfb016e34fc1a2028f8f"), common.HexToHash("0xb9a412e875f29fac436acde234f954e91173c4cf79814f6dcf630d8a6345747f")},
Sidecar: genBlobs[0],
V: new(uint256.Int),
R: new(uint256.Int),
S: new(uint256.Int),
})
sidecars := types.BlobTxSidecars{tx1.BlobTxSidecar()}
block := types.NewBlockWithHeader(&types.Header{
Number: big.NewInt(0),
Extra: []byte("test block"),
UncleHash: types.EmptyUncleHash,
TxHash: types.EmptyTxsHash,
ReceiptHash: types.EmptyReceiptsHash,
})
dataNil := NewBlockPacket{
Block: block,
TD: big.NewInt(1),
Version: nil,
Sidecars: nil,
}
dataNonNil := NewBlockPacket{
Block: block,
TD: big.NewInt(1),
Version: v,
Sidecars: sidecars,
}
sizeNonNil, rNonNil, _ := rlp.EncodeToReader(dataNonNil)
sizeNil, rNil, _ := rlp.EncodeToReader(dataNil)

// Define the test cases
testCases := []struct {
name string
msg p2p.Msg
err error
}{
{
name: "Valid block",
msg: p2p.Msg{
Code: 1,
Size: uint32(sizeNonNil),
Payload: rNonNil,
},
err: nil,
},
{
name: "Nil sidecars",
msg: p2p.Msg{
Code: 2,
Size: uint32(sizeNil),
Payload: rNil,
},
err: nil,
},
}

protos := []p2p.Protocol{
{
Name: "eth",
Version: ETH67,
},
{
Name: "eth",
Version: ETH68,
},
{
Name: "bsc",
Version: bsc.Bsc1,
},
}
caps := []p2p.Cap{
{
Name: "eth",
Version: ETH67,
},
{
Name: "eth",
Version: ETH68,
},
{
Name: "bsc",
Version: bsc.Bsc1,
},
}
// Create a source handler to send messages through and a sink peer to receive them
p2pEthSrc, p2pEthSink := p2p.MsgPipe()
defer p2pEthSrc.Close()
defer p2pEthSink.Close()

localEth := NewPeer(ETH68, p2p.NewPeerWithProtocols(enode.ID{1}, protos, "", caps), p2pEthSrc, nil)

// Run the tests
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
err := handleNewBlock(backend, tc.msg, localEth)
if err != tc.err {
t.Errorf("expected error %v, got %v", tc.err, err)
}
})
}

Check failure on line 642 in eth/protocols/eth/handler_test.go

View workflow job for this annotation

GitHub Actions / golang-lint (1.21.x, ubuntu-latest)

unnecessary trailing newline (whitespace)
}

func makeBlkBlobs(n, nPerTx int) types.BlobTxSidecars {
if n <= 0 {
return nil
}
ret := make(types.BlobTxSidecars, n)
for i := 0; i < n; i++ {
blobs := make([]kzg4844.Blob, nPerTx)
commitments := make([]kzg4844.Commitment, nPerTx)
proofs := make([]kzg4844.Proof, nPerTx)
for i := 0; i < nPerTx; i++ {
io.ReadFull(rand2.Reader, blobs[i][:])
commitments[i], _ = kzg4844.BlobToCommitment(blobs[i])
proofs[i], _ = kzg4844.ComputeBlobProof(blobs[i], commitments[i])
}
ret[i] = &types.BlobTxSidecar{
Blobs: blobs,
Commitments: commitments,
Proofs: proofs,
}
}
return ret
}
52 changes: 13 additions & 39 deletions eth/protocols/eth/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,59 +290,33 @@ func handleNewBlockhashes(backend Backend, msg Decoder, peer *Peer) error {
return backend.Handle(peer, ann)
}

type BlockPacket interface {
GetBlock() *types.Block
SanityCheck() error
Name() string
Kind() byte
}

// Implement the GetBlock method for both packet types
func (p *NewBlockPacket) GetBlock() *types.Block {
return p.Block
}

func (p *NewBlockWithBlobPacket) GetBlock() *types.Block {
return p.Block
}

func handleNewBlock(backend Backend, msg Decoder, peer *Peer) error {
var packet BlockPacket

// First, try to decode as NewBlockPacket
nbp := new(NewBlockPacket)
if err := msg.Decode(nbp); err == nil {
packet = nbp
} else {
// If that fails, try to decode as NewBlockWithBlobPacket
nbbp := new(NewBlockWithBlobPacket)
if err := msg.Decode(nbbp); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
packet = nbbp
// Retrieve and decode the propagated block
ann := new(NewBlockPacket)
if err := msg.Decode(ann); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}

// Now that we have our packet, perform operations using the interface methods
if err := packet.SanityCheck(); err != nil {
if err := ann.sanityCheck(); err != nil {
return err
}

block := packet.GetBlock()
if hash := types.CalcUncleHash(block.Uncles()); hash != block.UncleHash() {
log.Warn("Propagated block has invalid uncles", "have", hash, "exp", block.UncleHash())
if hash := types.CalcUncleHash(ann.Block.Uncles()); hash != ann.Block.UncleHash() {
log.Warn("Propagated block has invalid uncles", "have", hash, "exp", ann.Block.UncleHash())
return nil // TODO(karalabe): return error eventually, but wait a few releases
}
if hash := types.DeriveSha(block.Transactions(), trie.NewStackTrie(nil)); hash != block.TxHash() {
log.Warn("Propagated block has invalid body", "have", hash, "exp", block.TxHash())
if hash := types.DeriveSha(ann.Block.Transactions(), trie.NewStackTrie(nil)); hash != ann.Block.TxHash() {
log.Warn("Propagated block has invalid body", "have", hash, "exp", ann.Block.TxHash())
return nil // TODO(karalabe): return error eventually, but wait a few releases
}
block.ReceivedAt = msg.Time()
block.ReceivedFrom = peer
ann.Block.ReceivedAt = msg.Time()
ann.Block.ReceivedFrom = peer

// Mark the peer as owning the block
peer.markBlock(block.Hash())
peer.markBlock(ann.Block.Hash())

return backend.Handle(peer, packet)
return backend.Handle(peer, ann)
}

func handleBlockHeaders(backend Backend, msg Decoder, peer *Peer) error {
Expand Down
4 changes: 2 additions & 2 deletions eth/protocols/eth/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,10 +333,10 @@ func (p *Peer) SendNewBlock(block *types.Block, td *big.Int) error {
func (p *Peer) SendNewBlockAndBlob(block *types.Block, td *big.Int, version uint32, sidecars types.BlobTxSidecars) error {
// Mark all the block hash as known, but ensure we don't overflow our limits
p.knownBlockAndBlobs.Add(block.Hash()) // todo 4844 check if adding only the block hash is okay
return p2p.Send(p.rw, NewBlockMsg, &NewBlockWithBlobPacket{
return p2p.Send(p.rw, NewBlockMsg, &NewBlockPacket{
Block: block,
TD: td,
Version: version,
Version: &version,
Sidecars: sidecars,
})
}
Expand Down
Loading

0 comments on commit 851ae4a

Please sign in to comment.