diff --git a/.gitignore b/.gitignore index c980f0cad0..ac89c15d14 100644 --- a/.gitignore +++ b/.gitignore @@ -53,4 +53,7 @@ cmd/geth/__debug_bin cmd/bootnode/bootnode graphql/__debug_bin -tempdatadir \ No newline at end of file +tempdatadir +tempdatadir2 +config.toml +config2.toml diff --git a/cmd/devp2p/internal/ethtest/types.go b/cmd/devp2p/internal/ethtest/types.go index 09bb218d51..7df258da2e 100644 --- a/cmd/devp2p/internal/ethtest/types.go +++ b/cmd/devp2p/internal/ethtest/types.go @@ -112,7 +112,7 @@ type NewBlock eth.NewBlockPacket func (nb NewBlock) Code() int { return 23 } // NewPooledTransactionHashes is the network packet for the tx hash propagation message. -type NewPooledTransactionHashes eth.NewPooledTransactionHashesPacket +type NewPooledTransactionHashes eth.NewPooledTransactionHashesPacket66 func (nb NewPooledTransactionHashes) Code() int { return 24 } diff --git a/consensus/parlia/parlia.go b/consensus/parlia/parlia.go index 27c5ce5eb9..1b09226f51 100644 --- a/consensus/parlia/parlia.go +++ b/consensus/parlia/parlia.go @@ -210,10 +210,11 @@ func ParliaRLP(header *types.Header, chainId *big.Int) []byte { // Parlia is the consensus engine of BSC type Parlia struct { - chainConfig *params.ChainConfig // Chain config - config *params.ParliaConfig // Consensus engine configuration parameters for parlia consensus - genesisHash common.Hash - db ethdb.Database // Database to store and retrieve snapshot checkpoints + chainConfig *params.ChainConfig // Chain config + config *params.ParliaConfig // Consensus engine configuration parameters for parlia consensus + genesisHash common.Hash + db ethdb.Database // Database to store and retrieve snapshot checkpoints + blobDatabase *bloblevel.Storage recentSnaps *lru.ARCCache // Snapshots for recent block to speed up signatures *lru.ARCCache // Signatures of recent blocks to speed up mining @@ -240,6 +241,7 @@ type Parlia struct { func New( chainConfig *params.ChainConfig, db ethdb.Database, + blobDB *bloblevel.Storage, ethAPI *ethapi.PublicBlockChainAPI, genesisHash common.Hash, ) *Parlia { @@ -277,6 +279,7 @@ func New( config: parliaConfig, genesisHash: genesisHash, db: db, + blobDatabase: blobDB, ethAPI: ethAPI, recentSnaps: recentSnaps, signatures: signatures, @@ -1427,7 +1430,7 @@ func (p *Parlia) Seal(chain consensus.ChainHeaderReader, block *types.BlockAndSi } // todo sidecar needs to be signed somewhere - // todo save sidecars here? Since below there is code which will save the block anyway. + // todo 4844 save sidecars here? Since below there is code which will save the block anyway. if len(sidecars) > 0 { cfg := params.ChainConfig{ DataBlobs: ¶ms.DataBlobsConfig{ @@ -1435,21 +1438,12 @@ func (p *Parlia) Seal(chain consensus.ChainHeaderReader, block *types.BlockAndSi MinEpochsForBlobsSidecarsRequest: 1, }, } - err = blobStorage.SaveBlobSidecar(context.Background(), &cfg, sidecars) + err = p.blobDatabase.SaveBlobSidecar(context.Background(), &cfg, sidecars) if err != nil { log.Error("Sidecars could not be saved!", "err", err) } - //// Original byte array - //originalBytes := []byte{23, 195, 163, 130, 130, 113, 153, 124, 5, 232, 93, 202, 189, 136, 171, 137, 161, 47, 86, 125, 255, 243, 39, 238, 158, 161, 43, 251, 252, 66, 208, 117} - // - //// Create a new byte array of size 32 - //var byteArray32 [32]byte - // - //// Copy the original byte array into the new byte array - //copy(byteArray32[:], originalBytes) - - got, err1 := rawdb.GetBlobSidecarsByRoot(context.Background(), blobStorage, bytesutil.ToBytes32(sidecars[0].BlockRoot)) + got, err1 := rawdb.GetBlobSidecarsByRoot(context.Background(), p.blobDatabase, bytesutil.ToBytes32(sidecars[0].BlockRoot)) if err1 != nil { fmt.Println("Error while fetching sidecar", err1) } else { diff --git a/core/blockchain.go b/core/blockchain.go index 72f48cacaf..95c01f56d4 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -20,6 +20,7 @@ package core import ( "errors" "fmt" + "github.com/ethereum/go-ethereum/core/rawdb/bloblevel" "io" "math/big" "sort" @@ -189,11 +190,12 @@ type BlockChain struct { chainConfig *params.ChainConfig // Chain & network configuration cacheConfig *CacheConfig // Cache configuration for pruning - db ethdb.Database // Low level persistent database to store final content in - snaps *snapshot.Tree // Snapshot tree for fast trie leaf access - triegc *prque.Prque // Priority queue mapping block numbers to tries to gc - gcproc time.Duration // Accumulates canonical block processing for trie dumping - commitLock sync.Mutex // CommitLock is used to protect above field from being modified concurrently + db ethdb.Database // Low level persistent database to store final content in + blobDatabase bloblevel.Storage + snaps *snapshot.Tree // Snapshot tree for fast trie leaf access + triegc *prque.Prque // Priority queue mapping block numbers to tries to gc + gcproc time.Duration // Accumulates canonical block processing for trie dumping + commitLock sync.Mutex // CommitLock is used to protect above field from being modified concurrently // txLookupLimit is the maximum number of blocks from head whose tx indices // are reserved: diff --git a/core/events.go b/core/events.go index ce8bcca744..ab3556279d 100644 --- a/core/events.go +++ b/core/events.go @@ -30,6 +30,9 @@ type ReannoTxsEvent struct{ Txs []*types.Transaction } // NewMinedBlockEvent is posted when a block has been imported. type NewMinedBlockEvent struct{ Block *types.Block } +// NewMinedSidecarEvent is posted when a sidecar has been imported. +type NewMinedSidecarEvent struct{ Sidecar *types.Sidecar } + // RemovedLogsEvent is posted when a reorg happens type RemovedLogsEvent struct{ Logs []*types.Log } diff --git a/core/genesis.go b/core/genesis.go index e955ec9c70..ce7ec93a14 100644 --- a/core/genesis.go +++ b/core/genesis.go @@ -368,6 +368,9 @@ func (g *Genesis) ToBlock(db ethdb.Database) *types.Block { } if g.Config.IsCancun(g.Timestamp) { head.SetExcessDataGas(g.ExcessDataGas) + if head.ExcessDataGas == nil { + head.ExcessDataGas = big.NewInt(0) + } } } diff --git a/core/rawdb/freezer.go b/core/rawdb/freezer.go index 941a58a862..e8cf041785 100644 --- a/core/rawdb/freezer.go +++ b/core/rawdb/freezer.go @@ -427,7 +427,7 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) { continue case *number < threshold: - log.Debug("Current full block not old enough", "number", *number, "hash", hash, "delay", threshold) + log.Debug("fCurrent full block not old enough", "number", *number, "hash", hash, "delay", threshold) backoff = true continue diff --git a/core/remote_state_verifier.go b/core/remote_state_verifier.go index bf43f6de81..79994e6d2a 100644 --- a/core/remote_state_verifier.go +++ b/core/remote_state_verifier.go @@ -106,6 +106,7 @@ func (vm *remoteVerifyManager) mainLoop() { pruneTicker := time.NewTicker(pruneInterval) defer pruneTicker.Stop() for { + fmt.Println("for loop of remoteVerifyManager mainLoop") select { case h := <-vm.chainBlockCh: vm.NewBlockVerifyTask(h.Block.Header()) @@ -143,6 +144,7 @@ func (vm *remoteVerifyManager) mainLoop() { vm.taskLock.RUnlock() return case <-vm.chainHeadSub.Err(): + fmt.Println("Chain head subscription error!!!!") return } } diff --git a/core/types/block.go b/core/types/block.go index fc3778d86c..bdb64884eb 100644 --- a/core/types/block.go +++ b/core/types/block.go @@ -397,9 +397,9 @@ func (b *Block) DecodeRLP(s *rlp.Stream) error { if err := s.Decode(&eb); err != nil { return err } - for i, tx := range *eb.Txs { + for _, tx := range *eb.Txs { if tx.wrapData != nil { // todo 4844 we may need to get rid of this, actually NOT - return fmt.Errorf("transactions in blocks must not contain wrap-data, tx %d is bad", i) + //return fmt.Errorf("transactions in blocks must not contain wrap-data, tx %d is bad", i) } } b.header, b.uncles, b.transactions = eb.Header, eb.Uncles, []*Transaction(*eb.Txs) @@ -409,12 +409,12 @@ func (b *Block) DecodeRLP(s *rlp.Stream) error { // EncodeRLP serializes b into the Ethereum RLP block format. func (b *Block) EncodeRLP(w io.Writer) error { - if b.header.ExcessDataGas != nil { - // This situation should not arise, but if it does (due to a bug) you'd silently produce an - // encoding that would fail to decode. ref: - // https://github.com/ethereum/go-ethereum/pull/26077 - return errors.New("nil WithdrawalsHash in header with non-nil ExcessDataGas") - } + //if b.header.ExcessDataGas != nil { + // // This situation should not arise, but if it does (due to a bug) you'd silently produce an + // // encoding that would fail to decode. ref: + // // https://github.com/ethereum/go-ethereum/pull/26077 + // return errors.New("nil WithdrawalsHash in header with non-nil ExcessDataGas") + //} return rlp.Encode(w, extblock{ Header: b.header, Txs: (*extBlockTxs)(&b.transactions), diff --git a/core/types/data_blob.go b/core/types/data_blob.go index 435e827a2d..5fb9b47c07 100644 --- a/core/types/data_blob.go +++ b/core/types/data_blob.go @@ -1,12 +1,14 @@ package types import ( + "encoding/binary" "encoding/hex" "errors" "fmt" "github.com/ethereum/go-ethereum/rlp" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" "io" + "time" gokzg4844 "github.com/crate-crypto/go-kzg-4844" "github.com/ethereum/go-ethereum/common" @@ -427,7 +429,8 @@ func decodeTyped(b []byte) (BlobTxWrapper, error) { return blobTxWrapper, nil } -// todo this Sidecar needs to be saved separately from block so that it can be pruned time to time to take advantage of 4844 +// todo 4844 this Sidecar needs to be saved separately from block so that it can be pruned time to time to take advantage of 4844 +// todo 4844 Sidecar needs to implement EncodeRLP so that it can be broadcasted easily type Sidecar struct { BlockRoot []byte `json:"block_root"` //[]byte Index uint64 `json:"index"` @@ -437,6 +440,49 @@ type Sidecar struct { Blob Blob `json:"blob"` KZGCommitment KZGCommitment `json:"kzg_commitment"` KZGProof KZGProof `json:"kzg_proof"` + + // These fields are used by package eth to track + // inter-peer block relay. + ReceivedAt time.Time + ReceivedFrom interface{} +} + +func (s *Sidecar) SidecarToHash() common.Hash { + //hash := common.Hash{} + //copy(hash[:], s.BlockRoot[:]) + //binary.BigEndian.PutUint64(hash[len(s.BlockRoot):], s.Index) + //return hash + + // Convert the index to a byte slice. + indexBytes := make([]byte, 8) + binary.BigEndian.PutUint64(indexBytes, s.Index) + + // Concatenate BlockRoot and indexBytes. + combinedBytes := append(s.BlockRoot[:], indexBytes...) + + // Calculate the hash of the combined bytes. + hash := common.BytesToHash(combinedBytes) + + return hash +} + +// todo 4844 write test for it to know it actually works +func (s *Sidecar) HashToSidecarIdentifier(hash common.Hash) SidecarIdentifier { + var blockRoot []byte = make([]byte, len(hash)) + copy(blockRoot, hash[:]) + + indexBytes := hash[len(blockRoot):] + index := binary.BigEndian.Uint64(indexBytes) + + return SidecarIdentifier{ + Index: index, + BlockRoot: blockRoot, + } +} + +type SidecarIdentifier struct { + BlockRoot []byte `json:"block_root"` + Index uint64 `json:"index"` } type SignedSidecar struct { @@ -448,3 +494,44 @@ type BlockAndSidecars struct { Block *Block Sidecar []*Sidecar } + +func (s *Sidecar) EncodeRLP(w io.Writer) error { + return rlp.Encode(w, []interface{}{ + s.BlockRoot, + s.Index, + s.Slot, + s.BlockParentRoot, + s.ProposerIndex, + s.Blob, + s.KZGCommitment, + s.KZGProof, + s.ReceivedAt, + s.ReceivedFrom, + }) +} + +func (s *Sidecar) DecodeRLP(stream *rlp.Stream) error { + return stream.Decode(&struct { + BlockRoot *[]byte + Index *uint64 + Slot *primitives.Slot + BlockParentRoot *[]byte + ProposerIndex *uint64 + Blob *Blob + KZGCommitment *KZGCommitment + KZGProof *KZGProof + ReceivedAt *time.Time + ReceivedFrom *interface{} + }{ + &s.BlockRoot, + &s.Index, + &s.Slot, + &s.BlockParentRoot, + &s.ProposerIndex, + &s.Blob, + &s.KZGCommitment, + &s.KZGProof, + &s.ReceivedAt, + &s.ReceivedFrom, + }) +} diff --git a/core/types/data_blob_test.go b/core/types/data_blob_test.go new file mode 100644 index 0000000000..ad74728364 --- /dev/null +++ b/core/types/data_blob_test.go @@ -0,0 +1,71 @@ +package types + +import ( + "bytes" + "testing" + "time" + + "github.com/ethereum/go-ethereum/rlp" +) + +func TestSidecarEncodeDecodeRLP(t *testing.T) { + + testCases := []*Sidecar{ + { + BlockRoot: []byte{0x01}, + Index: 1, + Slot: 2, + BlockParentRoot: []byte{0x03}, + ProposerIndex: 3, + Blob: Blob{0x04}, + KZGCommitment: KZGCommitment{0x05}, + KZGProof: KZGProof{0x06}, + ReceivedAt: time.Now(), + ReceivedFrom: "test1", + }, + { + BlockRoot: []byte{0x10, 0x11}, + Index: 123, + Slot: 456, + BlockParentRoot: []byte{0x12, 0x13}, + ProposerIndex: 789, + Blob: Blob{0x14, 0x15}, + KZGCommitment: KZGCommitment{0x16, 0x17}, + KZGProof: KZGProof{0x18, 0x19}, + ReceivedAt: time.Now().Add(-1 * time.Hour), + ReceivedFrom: "test2", + }, + } + + for _, tc := range testCases { + // Encode to RLP + var buf bytes.Buffer + if err := tc.EncodeRLP(&buf); err != nil { + t.Fatal(err) + } + + // Decode back to new struct + sc := &Sidecar{} + if err := sc.DecodeRLP(rlp.NewStream(&buf, 0)); err != nil { + t.Fatal(err) + } + + // Check all fields match + if !bytes.Equal(tc.BlockRoot, sc.BlockRoot) { + t.Errorf("BlockRoot mismatch: %v %v", tc.BlockRoot, sc.BlockRoot) + } + if tc.Index != sc.Index { + t.Errorf("Index mismatch: %v %v", tc.Index, sc.Index) + } + if !bytes.Equal(tc.BlockParentRoot, sc.BlockParentRoot) { + t.Errorf("BlockParentRoot mismatch: %v %v", tc.BlockParentRoot, sc.BlockParentRoot) + } + //...check other fields + + //// Roundtrip test + //if !bytes.Equal(tc.Encode(), sc.Encode()) { + // t.Errorf("Encoding roundtrip failed") + //} + } + +} diff --git a/eth/backend.go b/eth/backend.go index e4b0e0ae98..b025f4d9b0 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -20,6 +20,7 @@ package eth import ( "errors" "fmt" + "github.com/ethereum/go-ethereum/core/rawdb/bloblevel" "math/big" "runtime" "sync" @@ -143,6 +144,10 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { ethashConfig := config.Ethash ethashConfig.NotifyFull = config.Miner.NotifyFull + // Initialise blob database + // todo 4844 do it using a function that is proper and not recreating for each restart + blobDatabase := bloblevel.NewStorage(rawdb.NewMemoryDatabase()) + // Assemble the Ethereum object chainDb, err := stack.OpenAndMergeDatabase("chaindata", config.DatabaseCache, config.DatabaseHandles, config.DatabaseFreezer, config.DatabaseDiff, "eth/db/chaindata/", false, config.PersistDiff, config.PruneAncientData) @@ -180,7 +185,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { log.Info("Unprotected transactions allowed") } ethAPI := ethapi.NewPublicBlockChainAPI(eth.APIBackend) - eth.engine = ethconfig.CreateConsensusEngine(stack, chainConfig, ðashConfig, config.Miner.Notify, config.Miner.Noverify, chainDb, ethAPI, genesisHash) + eth.engine = ethconfig.CreateConsensusEngine(stack, chainConfig, ðashConfig, config.Miner.Notify, config.Miner.Noverify, chainDb, blobDatabase, ethAPI, genesisHash) bcVersion := rawdb.ReadDatabaseVersion(chainDb) var dbVer = "" @@ -258,6 +263,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { if eth.handler, err = newHandler(&handlerConfig{ Database: chainDb, + BlobDatabase: blobDatabase, Chain: eth.blockchain, TxPool: eth.txPool, Merger: merger, @@ -630,6 +636,7 @@ func (s *Ethereum) Protocols() []p2p.Protocol { if !s.config.DisableBscProtocol { protos = append(protos, bsc.MakeProtocols((*bscHandler)(s.handler), s.bscDialCandidates)...) } + fmt.Println("protocols total: ", protos) return protos } diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 70dc7e15b9..cc6f2320fd 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -97,7 +97,7 @@ type headerTask struct { hashes []common.Hash } -type Downloader struct { +type Downloader struct { // todo 4844 possible a new Downloader for sidecars! mode uint32 // Synchronisation mode defining the strategy used (per sync cycle), use d.getMode() to get the SyncMode mux *event.TypeMux // Event multiplexer to announce sync operation events @@ -1216,7 +1216,7 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ( // fetchBodies iteratively downloads the scheduled block bodies, taking any // available peers, reserving a chunk of blocks for each, waiting for delivery // and also periodically checking for timeouts. -func (d *Downloader) fetchBodies(from uint64) error { +func (d *Downloader) fetchBodies(from uint64) error { // todo 4844 maybe same way fetchSidecars() ? log.Debug("Downloading block bodies", "origin", from) err := d.concurrentFetch((*bodyQueue)(d)) diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index 29bb597410..c11610b5f9 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -18,6 +18,7 @@ package ethconfig import ( + "github.com/ethereum/go-ethereum/core/rawdb/bloblevel" "math/big" "os" "os/user" @@ -247,9 +248,9 @@ type Config struct { } // CreateConsensusEngine creates a consensus engine for the given chain configuration. -func CreateConsensusEngine(stack *node.Node, chainConfig *params.ChainConfig, config *ethash.Config, notify []string, noverify bool, db ethdb.Database, ee *ethapi.PublicBlockChainAPI, genesisHash common.Hash) consensus.Engine { +func CreateConsensusEngine(stack *node.Node, chainConfig *params.ChainConfig, config *ethash.Config, notify []string, noverify bool, db ethdb.Database, blobDB *bloblevel.Storage, ee *ethapi.PublicBlockChainAPI, genesisHash common.Hash) consensus.Engine { if chainConfig.Parlia != nil { - return parlia.New(chainConfig, db, ee, genesisHash) + return parlia.New(chainConfig, db, blobDB, ee, genesisHash) } // If proof-of-authority is requested, set it up var engine consensus.Engine diff --git a/eth/fetcher/block_fetcher.go b/eth/fetcher/block_fetcher.go index 7953e5e695..649cb01005 100644 --- a/eth/fetcher/block_fetcher.go +++ b/eth/fetcher/block_fetcher.go @@ -19,6 +19,7 @@ package fetcher import ( "errors" + "fmt" "math/rand" "time" @@ -353,6 +354,7 @@ func (f *BlockFetcher) loop() { defer completeTimer.Stop() for { + fmt.Println("for loop in func (f *BlockFetcher) loop()") // Clean up any expired block fetches for hash, announce := range f.fetching { if time.Since(announce.time) > fetchTimeout { @@ -363,6 +365,7 @@ func (f *BlockFetcher) loop() { height := f.chainHeight() for !f.queue.Empty() { op := f.queue.PopItem().(*blockOrHeaderInject) + fmt.Println("func (f *BlockFetcher) loop(): ", op.block.Number().String()) hash := op.hash() if f.queueChangeHook != nil { f.queueChangeHook(hash, false) @@ -394,6 +397,7 @@ func (f *BlockFetcher) loop() { return case notification := <-f.notify: + fmt.Println("notification := <-f.notify in fetcher loop") // A block was announced, make sure the peer isn't DOSing us blockAnnounceInMeter.Mark(1) @@ -681,6 +685,7 @@ func (f *BlockFetcher) loop() { case filter := <-f.bodyFilter: // Block bodies arrived, extract any explicitly requested blocks, return the rest + fmt.Println("Block body arrived...") var task *bodyFilterTask select { case task = <-filter: diff --git a/eth/fetcher/sidecar_fetcher.go b/eth/fetcher/sidecar_fetcher.go new file mode 100644 index 0000000000..b306ea4989 --- /dev/null +++ b/eth/fetcher/sidecar_fetcher.go @@ -0,0 +1 @@ +package fetcher diff --git a/eth/handler.go b/eth/handler.go index 6cdd4fbbb6..fafc7eb465 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -18,6 +18,8 @@ package eth import ( "errors" + "fmt" + "github.com/ethereum/go-ethereum/core/rawdb/bloblevel" "math" "math/big" "strings" @@ -105,7 +107,8 @@ type votePool interface { // handlerConfig is the collection of initialization parameters to create a full // node network handler. type handlerConfig struct { - Database ethdb.Database // Database for direct sync insertions + Database ethdb.Database // Database for direct sync insertions + BlobDatabase *bloblevel.Storage Chain *core.BlockChain // Blockchain to serve data from TxPool txPool // Transaction pool to propagate from VotePool votePool @@ -136,6 +139,7 @@ type handler struct { checkpointHash common.Hash // Block hash for the sync progress validator to cross reference database ethdb.Database + blobDatabase *bloblevel.Storage txpool txPool votepool votePool maliciousVoteMonitor *monitor.MaliciousVoteMonitor @@ -145,21 +149,24 @@ type handler struct { peersPerIP map[string]int peerPerIPLock sync.Mutex - downloader *downloader.Downloader - blockFetcher *fetcher.BlockFetcher - txFetcher *fetcher.TxFetcher - peers *peerSet - merger *consensus.Merger - - eventMux *event.TypeMux - txsCh chan core.NewTxsEvent - txsSub event.Subscription - reannoTxsCh chan core.ReannoTxsEvent - reannoTxsSub event.Subscription - minedBlockSub *event.TypeMuxSubscription - voteCh chan core.NewVoteEvent - votesSub event.Subscription - voteMonitorSub event.Subscription + downloader *downloader.Downloader // todo 4844 same for sidecars + blockFetcher *fetcher.BlockFetcher // todo 4844 do the same for blobs + blobFetcher *fetcher.BlockFetcher + + txFetcher *fetcher.TxFetcher + peers *peerSet + merger *consensus.Merger + + eventMux *event.TypeMux + txsCh chan core.NewTxsEvent + txsSub event.Subscription + reannoTxsCh chan core.ReannoTxsEvent + reannoTxsSub event.Subscription + minedBlockSub *event.TypeMuxSubscription + minedSidecarSub *event.TypeMuxSubscription + voteCh chan core.NewVoteEvent + votesSub event.Subscription + voteMonitorSub event.Subscription whitelist map[uint64]common.Hash @@ -186,6 +193,7 @@ func newHandler(config *handlerConfig) (*handler, error) { disablePeerTxBroadcast: config.DisablePeerTxBroadcast, eventMux: config.EventMux, database: config.Database, + blobDatabase: config.BlobDatabase, txpool: config.TxPool, votepool: config.VotePool, chain: config.Chain, @@ -341,28 +349,34 @@ func newHandler(config *handlerConfig) (*handler, error) { func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { // If the peer has a `snap` extension, wait for it to connect so we can have // a uniform initialization/teardown mechanism + //fmt.Println("IInside runEthPeer ...") snap, err := h.peers.waitSnapExtension(peer) if err != nil { + fmt.Println("runEthPeer error!!!!") peer.Log().Error("Snapshot extension barrier failed", "err", err) return err } diff, err := h.peers.waitDiffExtension(peer) if err != nil { + fmt.Println("runEthPeer error!!!!") peer.Log().Error("Diff extension barrier failed", "err", err) return err } trust, err := h.peers.waitTrustExtension(peer) if err != nil { + fmt.Println("runEthPeer error!!!!") peer.Log().Error("Trust extension barrier failed", "err", err) return err } bsc, err := h.peers.waitBscExtension(peer) if err != nil { + fmt.Println("runEthPeer error!!!!") peer.Log().Error("Bsc extension barrier failed", "err", err) return err } // TODO(karalabe): Not sure why this is needed if !h.chainSync.handlePeerEvent(peer) { + fmt.Println("runEthPeer error!!!!") return p2p.DiscQuitting } h.peerWG.Add(1) @@ -376,8 +390,11 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { number = head.Number.Uint64() td = h.chain.GetTd(hash, number) ) + // todo 4844 check that this forkID logic is fine for Cancun or CancunTime!!! As we moved from block based fork to time based fork! forkID := forkid.NewID(h.chain.Config(), h.chain.Genesis().Hash(), h.chain.CurrentHeader().Number.Uint64()) if err := peer.Handshake(h.networkID, td, hash, genesis.Hash(), forkID, h.forkFilter, ð.UpgradeStatusExtension{DisablePeerTxBroadcast: h.disablePeerTxBroadcast}); err != nil { + fmt.Println("Peer Handshake failed!!!") + fmt.Println("runEthPeer error!!!!") peer.Log().Debug("Ethereum handshake failed", "err", err) return err } @@ -396,6 +413,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { peerInfo := peer.Peer.Info() if !peerInfo.Network.Trusted { if reject || h.peers.len() >= h.maxPeers { + fmt.Println("runEthPeer error!!!!") return p2p.DiscTooManyPeers } } @@ -410,6 +428,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { h.peerPerIPLock.Lock() if num, ok := h.peersPerIP[remoteIP]; ok && num >= h.maxPeersPerIP { h.peerPerIPLock.Unlock() + fmt.Println("runEthPeer error!!!!") peer.Log().Info("The IP has too many peers", "ip", remoteIP, "maxPeersPerIP", h.maxPeersPerIP, "name", peerInfo.Name, "Enode", peerInfo.Enode) return p2p.DiscTooManyPeers @@ -421,6 +440,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { // Register the peer locally if err := h.peers.registerPeer(peer, snap, diff, trust, bsc); err != nil { + //fmt.Println("runEthPeer error!!!!") peer.Log().Error("Ethereum peer registration failed", "err", err) return err } @@ -428,15 +448,18 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { p := h.peers.peer(peer.ID()) if p == nil { + //fmt.Println("runEthPeer error!!!!") return errors.New("peer dropped during handling") } // Register the peer in the downloader. If the downloader considers it banned, we disconnect if err := h.downloader.RegisterPeer(peer.ID(), peer.Version(), peer); err != nil { peer.Log().Error("Failed to register peer in eth syncer", "err", err) + //fmt.Println("runEthPeer error!!!!") return err } if snap != nil { if err := h.downloader.SnapSyncer.Register(snap); err != nil { + //fmt.Println("runEthPeer error!!!!") peer.Log().Error("Failed to register peer in snap syncer", "err", err) return err } @@ -461,6 +484,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { req, err := peer.RequestHeadersByNumber(h.checkpointNumber, 1, 0, false, resCh) if err != nil { + fmt.Println("runEthPeer error!!!!") return err } // Start a timer to disconnect if the peer doesn't reply in time @@ -512,6 +536,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { req, err := peer.RequestHeadersByNumber(number, 1, 0, false, resCh) if err != nil { + fmt.Println("runEthPeer error!!!!") return err } go func(number uint64, hash common.Hash, req *eth.Request) { @@ -707,6 +732,11 @@ func (h *handler) Start(maxPeers int, maxPeersPerIP int) { h.minedBlockSub = h.eventMux.Subscribe(core.NewMinedBlockEvent{}) go h.minedBroadcastLoop() + // broadcast mined sidecars + h.wg.Add(1) + h.minedSidecarSub = h.eventMux.Subscribe(core.NewMinedSidecarEvent{}) + go h.minedSidecarBroadcastLoop() + // start sync handlers h.wg.Add(1) go h.chainSync.loop() @@ -731,6 +761,7 @@ func (h *handler) Stop() { h.txsSub.Unsubscribe() // quits txBroadcastLoop h.reannoTxsSub.Unsubscribe() // quits txReannounceLoop h.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop + h.minedSidecarSub.Unsubscribe() if h.votepool != nil { h.votesSub.Unsubscribe() // quits voteBroadcastLoop if h.maliciousVoteMonitor != nil { @@ -756,14 +787,17 @@ func (h *handler) Stop() { // BroadcastBlock will either propagate a block to a subset of its peers, or // will only announce its availability (depending what's requested). func (h *handler) BroadcastBlock(block *types.Block, propagate bool) { + //fmt.Println("Inside BroadcastBlock") // Disable the block propagation if the chain has already entered the PoS // stage. The block propagation is delegated to the consensus layer. if h.merger.PoSFinalized() { + //fmt.Println("POS finalized...") return } // Disable the block propagation if it's the post-merge block. if beacon, ok := h.chain.Engine().(*beacon.Beacon); ok { if beacon.IsPoSHeader(block.Header()) { + //fmt.Println("Disable the block propagation if it's the post-merge block") return } } @@ -772,6 +806,7 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) { // If propagation is requested, send to a subset of the peer if propagate { + //fmt.Println("Propagate") // Calculate the TD of the block (it's not imported yet, so block.Td is not valid) var td *big.Int if parent := h.chain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil { @@ -802,6 +837,7 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) { // Otherwise if the block is indeed in our own chain, announce it if h.chain.HasBlock(hash, block.NumberU64()) { for _, peer := range peers { + fmt.Println("AsyncSendNewBlockHash") peer.AsyncSendNewBlockHash(block) } log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt))) @@ -809,6 +845,22 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) { } // TODO 4844 BroadcastBlob? BroadcastSidecar? +func (h *handler) BroadcastSidecar(sidecar *types.Sidecar, propagate bool) { + hash := sidecar.SidecarToHash() + peers := h.peers.peersWithoutSidecar(hash) + + fmt.Println("peers length in BroadcastSidecar: ", len(peers)) + + if propagate { + // todo 4844 add propagate logic + for _, peer := range peers { + peer.AsyncSendNewSidecar(sidecar, big.NewInt(1)) + } + } + fmt.Println(peers) + fmt.Println("Broadcasted sidecar") + +} // BroadcastTransactions will propagate a batch of transactions // - To a square root of all peers @@ -826,6 +878,7 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) { ) // Broadcast transactions to a batch of peers not knowing about it + // todo 4844 blob tx isn't broadcasted, decision yet to be made for _, tx := range txs { peers := h.peers.peersWithoutTransaction(tx.Hash()) // Send the tx unconditionally to a subset of our peers @@ -903,17 +956,34 @@ func (h *handler) BroadcastVote(vote *types.VoteEnvelope) { } // minedBroadcastLoop sends mined blocks to connected peers. +// todo 4844 do similar for sidecar func (h *handler) minedBroadcastLoop() { defer h.wg.Done() for obj := range h.minedBlockSub.Chan() { if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok { + //fmt.Println("h.BroadcastBlock: ", ev.Block.Number().String()) h.BroadcastBlock(ev.Block, true) // First propagate block to peers h.BroadcastBlock(ev.Block, false) // Only then announce to the rest } } } +// minedSidecarBroadcastLoop sends mined blocks to connected peers. +func (h *handler) minedSidecarBroadcastLoop() { + defer h.wg.Done() + + for obj := range h.minedSidecarSub.Chan() { + if ev, ok := obj.Data.(core.NewMinedSidecarEvent); ok { + fmt.Println("h.BroadcastSidecar: ", ev.Sidecar.SidecarToHash()) + h.BroadcastSidecar(ev.Sidecar, true) // First propagate block to peers + //h.BroadcastSidecar(ev.Sidecar, false) // Only then announce to the rest + } else { + fmt.Println("No sidecar in minedSidecarBroadcastLoop") + } + } +} + // txBroadcastLoop announces new transactions to connected peers. func (h *handler) txBroadcastLoop() { defer h.wg.Done() diff --git a/eth/handler_eth.go b/eth/handler_eth.go index f7e3fba880..ec1b78c530 100644 --- a/eth/handler_eth.go +++ b/eth/handler_eth.go @@ -68,14 +68,22 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error { case *eth.NewBlockPacket: return h.handleBlockBroadcast(peer, packet.Block, packet.TD) - case *eth.NewPooledTransactionHashesPacket: + case *eth.NewSidecarPacket: + return h.handleSidecarBroadcast(peer, packet.Sidecar, packet.TD) + + case *eth.NewPooledTransactionHashesPacket66: return h.txFetcher.Notify(peer.ID(), *packet) + case *eth.NewPooledTransactionHashesPacket68: + return h.txFetcher.Notify(peer.ID(), packet.Hashes) + case *eth.TransactionsPacket: - return h.txFetcher.Enqueue(peer.ID(), *packet, false) + txs := packet.Unwrap() + return h.txFetcher.Enqueue(peer.ID(), txs, false) case *eth.PooledTransactionsPacket: - return h.txFetcher.Enqueue(peer.ID(), *packet, true) + txs := packet.Unwrap() + return h.txFetcher.Enqueue(peer.ID(), txs, true) default: return fmt.Errorf("unexpected eth packet type: %T", packet) @@ -88,6 +96,7 @@ func (h *ethHandler) handleBlockAnnounces(peer *eth.Peer, hashes []common.Hash, // Drop all incoming block announces from the p2p network if // the chain already entered the pos stage and disconnect the // remote peer. + //fmt.Println("Handling block announcement in handleBlockAnnounces") if h.merger.PoSFinalized() { // TODO (MariusVanDerWijden) drop non-updated peers after the merge return nil @@ -146,3 +155,13 @@ func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, block *types.Block, td } return nil } + +// handleSidecarBroadcast is invoked from a peer's message handler when it transmits a +// block broadcast for the local node to process. +func (h *ethHandler) handleSidecarBroadcast(peer *eth.Peer, block *types.Sidecar, td *big.Int) error { + panic("Implement me handleSidecarBroadcast") + + //h.blobFetcher. + + return nil +} diff --git a/eth/peerset.go b/eth/peerset.go index c40a12bfa5..4aed6a6c14 100644 --- a/eth/peerset.go +++ b/eth/peerset.go @@ -540,6 +540,21 @@ func (ps *peerSet) peersWithoutBlock(hash common.Hash) []*ethPeer { return list } +// peersWithoutSidecar retrieves a list of peers that do not have a given sidecar in +// their set of known hashes, so it might be propagated to them. +func (ps *peerSet) peersWithoutSidecar(hash common.Hash) []*ethPeer { + ps.lock.RLock() + defer ps.lock.RUnlock() + + list := make([]*ethPeer, 0, len(ps.peers)) + for _, p := range ps.peers { + if !p.KnownSidecar(hash) { + list = append(list, p) + } + } + return list +} + // peersWithoutTransaction retrieves a list of peers that do not have a given // transaction in their set of known hashes. func (ps *peerSet) peersWithoutTransaction(hash common.Hash) []*ethPeer { diff --git a/eth/protocols/bsc/handler.go b/eth/protocols/bsc/handler.go index e993f255f3..233e6f8f04 100644 --- a/eth/protocols/bsc/handler.go +++ b/eth/protocols/bsc/handler.go @@ -68,6 +68,7 @@ func MakeProtocols(backend Backend, dnsdisc enode.Iterator) []p2p.Protocol { DialCandidates: dnsdisc, } } + fmt.Println("protocols: ", protocols) return protocols } @@ -100,6 +101,7 @@ func handleMessage(backend Backend, peer *Peer) error { if err != nil { return err } + fmt.Println("msg after peer.rw.ReadMsg() in handleMessage ", msg) if msg.Size > maxMessageSize { return fmt.Errorf("%w: %v > %v", errMsgTooLarge, msg.Size, maxMessageSize) } diff --git a/eth/protocols/eth/broadcast.go b/eth/protocols/eth/broadcast.go index 5d7af05260..e018df7747 100644 --- a/eth/protocols/eth/broadcast.go +++ b/eth/protocols/eth/broadcast.go @@ -17,6 +17,7 @@ package eth import ( + "fmt" "math/big" "github.com/ethereum/go-ethereum/common" @@ -37,19 +38,31 @@ type blockPropagation struct { td *big.Int } +// sidecarPropagation is a sidecar propagation event, waiting for its turn in the +// broadcast queue. +type sidecarPropagation struct { + sidecar *types.Sidecar + td *big.Int +} + // broadcastBlocks is a write loop that multiplexes blocks and block accouncements // to the remote peer. The goal is to have an async writer that does not lock up // node internals and at the same time rate limits queued data. func (p *Peer) broadcastBlocks() { + fmt.Println("func (p *Peer) broadcastBlocks()") for { select { case prop := <-p.queuedBlocks: + fmt.Println("about to send new block ", prop.block.Number().String()) + // todo 4844 is this really happening for every new block? if err := p.SendNewBlock(prop.block, prop.td); err != nil { + fmt.Println("Error sending new block! ", err) return } p.Log().Trace("Propagated block", "number", prop.block.Number(), "hash", prop.block.Hash(), "td", prop.td) case block := <-p.queuedBlockAnns: + fmt.Println("about to send block hash ", block.Number().String()) if err := p.SendNewBlockHashes([]common.Hash{block.Hash()}, []uint64{block.NumberU64()}); err != nil { return } @@ -61,6 +74,20 @@ func (p *Peer) broadcastBlocks() { } } +func (p *Peer) broadcastSidecars() { + for { + select { + case prop := <-p.queuedSidecars: + fmt.Println("broadcastSidecars(), prop := <-p.queuedSidecars") + if err := p.SendNewSidecar(prop.sidecar, prop.td); err != nil { + fmt.Println("Error sending new sidecar! ", err) + return + } + fmt.Println("sent sidecar...") + } + } +} + // broadcastTransactions is a write loop that schedules transaction broadcasts // to the remote peer. The goal is to have an async writer that does not lock up // node internals and at the same time rate limits queued data. @@ -77,12 +104,12 @@ func (p *Peer) broadcastTransactions() { // Pile transaction until we reach our allowed network limit var ( hashesCount uint64 - txs []*types.Transaction + txs []*types.NetworkTransaction size common.StorageSize ) for i := 0; i < len(queue) && size < maxTxPacketSize; i++ { if tx := p.txpool.Get(queue[i]); tx != nil { - txs = append(txs, tx) + txs = append(txs, types.NewNetworkTransaction(tx)) size += common.StorageSize(tx.Size()) } hashesCount++ @@ -146,13 +173,17 @@ func (p *Peer) announceTransactions() { if done == nil && len(queue) > 0 { // Pile transaction hashes until we reach our allowed network limit var ( - count int - pending []common.Hash - size common.StorageSize + count int + pending []common.Hash + pendingTypes []byte + pendingSizes []uint32 + size common.StorageSize ) for count = 0; count < len(queue) && size < maxTxPacketSize; count++ { - if p.txpool.Get(queue[count]) != nil { + if tx := p.txpool.Get(queue[count]); tx != nil { pending = append(pending, queue[count]) + pendingTypes = append(pendingTypes, tx.Type()) + pendingSizes = append(pendingSizes, uint32(tx.Size())) size += common.HashLength } } @@ -163,9 +194,16 @@ func (p *Peer) announceTransactions() { if len(pending) > 0 { done = make(chan struct{}) gopool.Submit(func() { - if err := p.sendPooledTransactionHashes(pending); err != nil { - fail <- err - return + if p.version >= ETH68 { + if err := p.sendPooledTransactionHashes68(pending, pendingTypes, pendingSizes); err != nil { + fail <- err + return + } + } else { + if err := p.sendPooledTransactionHashes66(pending); err != nil { + fail <- err + return + } } close(done) //p.Log().Trace("Sent transaction announcements", "count", len(pending)) diff --git a/eth/protocols/eth/handler.go b/eth/protocols/eth/handler.go index 9eb949f139..1d66575259 100644 --- a/eth/protocols/eth/handler.go +++ b/eth/protocols/eth/handler.go @@ -104,11 +104,12 @@ func MakeProtocols(backend Backend, network uint64, dnsdisc enode.Iterator) []p2 Version: version, Length: protocolLengths[version], Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { + fmt.Println("Inside Run() of protocols[i] in MakeProtocols()...", version) peer := NewPeer(version, p, rw, backend.TxPool()) defer peer.Close() return backend.RunPeer(peer, func(peer *Peer) error { - return Handle(backend, peer) + return Handle(backend, peer) // todo 4844 this probably isn't getting executed }) }, NodeInfo: func() interface{} { @@ -151,6 +152,8 @@ func nodeInfo(chain *core.BlockChain, network uint64) *NodeInfo { // connection is torn down. func Handle(backend Backend, peer *Peer) error { for { + // todo 4844 this isn't getting executed regularly which it should! This is happening during Cancun + //fmt.Println("func Handle(backend Backend, peer *Peer) error....") if err := handleMessage(backend, peer); err != nil { peer.Log().Debug("Message handling failed in `eth`", "err", err) return err @@ -168,7 +171,7 @@ var eth66 = map[uint64]msgHandler{ NewBlockHashesMsg: handleNewBlockhashes, NewBlockMsg: handleNewBlock, TransactionsMsg: handleTransactions, - NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes, + NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes66, GetBlockHeadersMsg: handleGetBlockHeaders66, BlockHeadersMsg: handleBlockHeaders66, GetBlockBodiesMsg: handleGetBlockBodies66, @@ -181,20 +184,61 @@ var eth66 = map[uint64]msgHandler{ PooledTransactionsMsg: handlePooledTransactions66, } +var eth67 = map[uint64]msgHandler{ + NewBlockHashesMsg: handleNewBlockhashes, + NewBlockMsg: handleNewBlock, + TransactionsMsg: handleTransactions, + NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes66, + GetBlockHeadersMsg: handleGetBlockHeaders66, + BlockHeadersMsg: handleBlockHeaders66, + GetBlockBodiesMsg: handleGetBlockBodies66, + BlockBodiesMsg: handleBlockBodies66, + GetReceiptsMsg: handleGetReceipts66, + ReceiptsMsg: handleReceipts66, + GetPooledTransactionsMsg: handleGetPooledTransactions66, + PooledTransactionsMsg: handlePooledTransactions66, +} + +var eth68 = map[uint64]msgHandler{ + NewBlockHashesMsg: handleNewBlockhashes, + NewBlockMsg: handleNewBlock, + NewSidecarMsg: handleNewSidecar, + TransactionsMsg: handleTransactions, + NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes68, + GetBlockHeadersMsg: handleGetBlockHeaders66, + BlockHeadersMsg: handleBlockHeaders66, + GetBlockBodiesMsg: handleGetBlockBodies66, + BlockBodiesMsg: handleBlockBodies66, + GetReceiptsMsg: handleGetReceipts66, + ReceiptsMsg: handleReceipts66, + GetPooledTransactionsMsg: handleGetPooledTransactions66, + PooledTransactionsMsg: handlePooledTransactions66, +} + // handleMessage is invoked whenever an inbound message is received from a remote // peer. The remote connection is torn down upon returning any error. func handleMessage(backend Backend, peer *Peer) error { + //fmt.Println("Handling message from remote peer!") // Read the next message from the remote peer, and ensure it's fully consumed msg, err := peer.rw.ReadMsg() if err != nil { return err } + //fmt.Println("msg after peer.rw.ReadMsg() in eth/handler.go: ", msg) if msg.Size > maxMessageSize { return fmt.Errorf("%w: %v > %v", errMsgTooLarge, msg.Size, maxMessageSize) } defer msg.Discard() var handlers = eth66 + if peer.Version() == ETH67 { + fmt.Println("verion 67 :( ") + handlers = eth67 + } + if peer.Version() >= ETH68 { + //fmt.Println("eth68!!") + handlers = eth68 + } // Track the amount of time it takes to serve the request and run the handler if metrics.Enabled { @@ -209,6 +253,10 @@ func handleMessage(backend Backend, peer *Peer) error { }(time.Now()) } if handler := handlers[msg.Code]; handler != nil { + if msg.Code == NewSidecarMsg { + fmt.Println("New Sidecar Message got received!!!.....") + } + //fmt.Println("msg.Code, handler: ", msg.Code, handler) return handler(backend, msg, peer) } return fmt.Errorf("%w: %v", errInvalidMsgCode, msg.Code) diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go index 8fc966e7ae..a7bf9b7bf4 100644 --- a/eth/protocols/eth/handlers.go +++ b/eth/protocols/eth/handlers.go @@ -351,6 +351,26 @@ func handleNewBlock(backend Backend, msg Decoder, peer *Peer) error { return backend.Handle(peer, ann) } +func handleNewSidecar(backend Backend, msg Decoder, peer *Peer) error { + fmt.Println("handleNewSidecar!!!!!!!!!!") + // Retrieve and decode the propagated block + ann := new(NewSidecarPacket) + if err := msg.Decode(ann); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + if err := ann.sanityCheck(); err != nil { + return err + } + + ann.Sidecar.ReceivedAt = msg.Time() + ann.Sidecar.ReceivedFrom = peer + + // Mark the peer as owning the block + peer.markSidecar(ann.Sidecar.SidecarToHash()) + + return backend.Handle(peer, ann) +} + func handleBlockHeaders66(backend Backend, msg Decoder, peer *Peer) error { // A batch of headers arrived to one of our previous requests res := new(BlockHeadersPacket66) @@ -430,13 +450,13 @@ func handleReceipts66(backend Backend, msg Decoder, peer *Peer) error { }, metadata) } -func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer) error { +func handleNewPooledTransactionHashes66(backend Backend, msg Decoder, peer *Peer) error { // New transaction announcement arrived, make sure we have // a valid and fresh chain to handle them if !backend.AcceptTxs() { return nil } - ann := new(NewPooledTransactionHashesPacket) + ann := new(NewPooledTransactionHashesPacket66) if err := msg.Decode(ann); err != nil { return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) } @@ -447,6 +467,26 @@ func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer) return backend.Handle(peer, ann) } +func handleNewPooledTransactionHashes68(backend Backend, msg Decoder, peer *Peer) error { + // New transaction announcement arrived, make sure we have + // a valid and fresh chain to handle them + if !backend.AcceptTxs() { + return nil + } + ann := new(NewPooledTransactionHashesPacket68) + if err := msg.Decode(ann); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + if len(ann.Hashes) != len(ann.Types) || len(ann.Hashes) != len(ann.Sizes) { + return fmt.Errorf("%w: message %v: invalid len of fields: %v %v %v", errDecode, msg, len(ann.Hashes), len(ann.Types), len(ann.Sizes)) + } + // Schedule all the unknown hashes for retrieval + for _, hash := range ann.Hashes { + peer.markTransaction(hash) + } + return backend.Handle(peer, ann) +} + func handleGetPooledTransactions66(backend Backend, msg Decoder, peer *Peer) error { // Decode the pooled transactions retrieval message var query GetPooledTransactionsPacket66 @@ -474,7 +514,7 @@ func answerGetPooledTransactions(backend Backend, query GetPooledTransactionsPac continue } // If known, encode and queue for response packet - if encoded, err := rlp.EncodeToBytes(tx); err != nil { + if encoded, err := rlp.EncodeToBytes(types.NewNetworkTransaction(tx)); err != nil { log.Error("Failed to encode transaction", "err", err) } else { hashes = append(hashes, hash) diff --git a/eth/protocols/eth/handshake.go b/eth/protocols/eth/handshake.go index d604f045f4..ff123b3fdc 100644 --- a/eth/protocols/eth/handshake.go +++ b/eth/protocols/eth/handshake.go @@ -147,6 +147,7 @@ func (p *Peer) readStatus(network uint64, status *StatusPacket, genesis common.H if err := forkFilter(status.ForkID); err != nil { return fmt.Errorf("%w: %v", errForkIDRejected, err) } + fmt.Println("Handshake successful! Probably") return nil } diff --git a/eth/protocols/eth/peer.go b/eth/protocols/eth/peer.go index b39aa56eee..e973a93634 100644 --- a/eth/protocols/eth/peer.go +++ b/eth/protocols/eth/peer.go @@ -17,6 +17,7 @@ package eth import ( + "fmt" "math/big" "math/rand" "sync" @@ -82,6 +83,10 @@ type Peer struct { queuedBlocks chan *blockPropagation // Queue of blocks to broadcast to the peer queuedBlockAnns chan *types.Block // Queue of blocks to announce to the peer + knownSidecars *knownCache // Set of Sidecars known by this peer + queuedSidecars chan *sidecarPropagation // Queue of sidecars to broadcast to the peer + queuedSidecarAnns chan *types.Sidecar // Queue of sidecars to announce to the peer + txpool TxPool // Transaction pool used by the broadcasters for liveness checks knownTxs *knownCache // Set of transaction hashes known to be known by this peer txBroadcast chan []common.Hash // Channel used to queue transaction propagation requests @@ -106,7 +111,9 @@ func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool) *Pe version: version, knownTxs: newKnownCache(maxKnownTxs), knownBlocks: newKnownCache(maxKnownBlocks), + knownSidecars: newKnownCache(maxKnownBlocks), queuedBlocks: make(chan *blockPropagation, maxQueuedBlocks), + queuedSidecars: make(chan *sidecarPropagation, maxQueuedBlocks), queuedBlockAnns: make(chan *types.Block, maxQueuedBlockAnns), txBroadcast: make(chan []common.Hash), txAnnounce: make(chan []common.Hash), @@ -119,6 +126,7 @@ func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool) *Pe } // Start up all the broadcasters go peer.broadcastBlocks() + go peer.broadcastSidecars() go peer.broadcastTransactions() go peer.announceTransactions() go peer.dispatcher() @@ -184,6 +192,11 @@ func (p *Peer) KnownBlock(hash common.Hash) bool { return p.knownBlocks.Contains(hash) } +// KnownSidecar returns whether peer is known to already have a sidecar. +func (p *Peer) KnownSidecar(hash common.Hash) bool { + return p.knownSidecars.Contains(hash) +} + // KnownTransaction returns whether peer is known to already have a transaction. func (p *Peer) KnownTransaction(hash common.Hash) bool { return p.knownTxs.Contains(hash) @@ -196,6 +209,13 @@ func (p *Peer) markBlock(hash common.Hash) { p.knownBlocks.Add(hash) } +// markSidecar marks a sidecar as known for the peer, ensuring that the sidecar will +// never be propagated to this particular peer. +func (p *Peer) markSidecar(hash common.Hash) { + // If we reached the memory allowance, drop a previously known sidecar hash + p.knownSidecars.Add(hash) +} + // markTransaction marks a transaction as known for the peer, ensuring that it // will never be propagated to this particular peer. func (p *Peer) markTransaction(hash common.Hash) { @@ -212,7 +232,7 @@ func (p *Peer) markTransaction(hash common.Hash) { // // The reasons this is public is to allow packages using this protocol to write // tests that directly send messages without having to do the asyn queueing. -func (p *Peer) SendTransactions(txs types.Transactions) error { +func (p *Peer) SendTransactions(txs types.NetworkTransactions) error { // Mark all the transactions as known, but ensure we don't overflow our limits for _, tx := range txs { p.knownTxs.Add(tx.Hash()) @@ -235,16 +255,29 @@ func (p *Peer) AsyncSendTransactions(hashes []common.Hash) { } } -// sendPooledTransactionHashes sends transaction hashes to the peer and includes +// sendPooledTransactionHashes66 sends transaction hashes to the peer and includes // them in its transaction hash set for future reference. // // This method is a helper used by the async transaction announcer. Don't call it // directly as the queueing (memory) and transmission (bandwidth) costs should // not be managed directly. -func (p *Peer) sendPooledTransactionHashes(hashes []common.Hash) error { +func (p *Peer) sendPooledTransactionHashes66(hashes []common.Hash) error { + // Mark all the transactions as known, but ensure we don't overflow our limits + p.knownTxs.Add(hashes...) + return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket66(hashes)) +} + +// sendPooledTransactionHashes68 sends transaction hashes (tagged with their type +// and size) to the peer and includes them in its transaction hash set for future +// reference. +// +// This method is a helper used by the async transaction announcer. Don't call it +// directly as the queueing (memory) and transmission (bandwidth) costs should +// not be managed directly. +func (p *Peer) sendPooledTransactionHashes68(hashes []common.Hash, types []byte, sizes []uint32) error { // Mark all the transactions as known, but ensure we don't overflow our limits p.knownTxs.Add(hashes...) - return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket(hashes)) + return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket68{Types: types, Sizes: sizes, Hashes: hashes}) } // AsyncSendPooledTransactionHashes queues a list of transactions hashes to eventually @@ -301,16 +334,43 @@ func (p *Peer) AsyncSendNewBlockHash(block *types.Block) { } } +// AsyncSendNewSidecarHash queues the availability of a sidecar for propagation to a +// remote peer. If the peer's broadcast queue is full, the event is silently +// dropped. +func (p *Peer) AsyncSendNewSidecarHash(sidecar *types.Sidecar) { + select { + case p.queuedSidecarAnns <- sidecar: + // Mark all the sidecar hash as known, but ensure we don't overflow our limits + p.knownSidecars.Add(sidecar.SidecarToHash()) + default: + p.Log().Debug("Dropping sidecar announcement", "blockroot", sidecar.BlockRoot, "hash", sidecar.SidecarToHash()) + } +} + // SendNewBlock propagates an entire block to a remote peer. func (p *Peer) SendNewBlock(block *types.Block, td *big.Int) error { // Mark all the block hash as known, but ensure we don't overflow our limits p.knownBlocks.Add(block.Hash()) + fmt.Println("Sending NewBlockMsg: ", block.Number().String()) return p2p.Send(p.rw, NewBlockMsg, &NewBlockPacket{ Block: block, TD: td, }) } +// SendNewSidecar propagates an entire sidecar to a remote peer. +func (p *Peer) SendNewSidecar(sidecar *types.Sidecar, td *big.Int) error { + // Mark all the sidecar hash as known, but ensure we don't overflow our limits + p.knownSidecars.Add(sidecar.SidecarToHash()) + fmt.Println("Sending NewSidecarMsg!!!!!!!: ", sidecar.SidecarToHash()) + //return p2p.Send(p.rw, TransactionsMsg, types.NetworkTransactions{ + // {Tx: types.NewTx(&types.LegacyTx{Nonce: 12})}}) + return p2p.Send(p.rw, NewSidecarMsg, &NewSidecarPacket{ + Sidecar: sidecar, + TD: td, + }) +} + // AsyncSendNewBlock queues an entire block for propagation to a remote peer. If // the peer's broadcast queue is full, the event is silently dropped. func (p *Peer) AsyncSendNewBlock(block *types.Block, td *big.Int) { @@ -323,6 +383,19 @@ func (p *Peer) AsyncSendNewBlock(block *types.Block, td *big.Int) { } } +// AsyncSendNewSidecar queues an entire sidecar for propagation to a remote peer. If +// the peer's broadcast queue is full, the event is silently dropped. +func (p *Peer) AsyncSendNewSidecar(sidecar *types.Sidecar, td *big.Int) { + fmt.Println("Inside AsyncSendNewSidecar") + select { + case p.queuedSidecars <- &sidecarPropagation{sidecar: sidecar, td: td}: + // Mark all the block hash as known, but ensure we don't overflow our limits + p.knownSidecars.Add(sidecar.SidecarToHash()) + default: + p.Log().Debug("Dropping sidecar propagation", "root", sidecar.BlockRoot, "hash", sidecar.SidecarToHash()) + } +} + // SendBlockHeaders sends a batch of block headers to the remote peer. func (p *Peer) SendBlockHeaders(headers []*types.Header) error { return p2p.Send(p.rw, BlockHeadersMsg, BlockHeadersPacket(headers)) diff --git a/eth/protocols/eth/protocol.go b/eth/protocols/eth/protocol.go index 3e171d6f3e..9033543155 100644 --- a/eth/protocols/eth/protocol.go +++ b/eth/protocols/eth/protocol.go @@ -32,6 +32,7 @@ import ( const ( ETH66 = 66 ETH67 = 67 + ETH68 = 68 ) // ProtocolName is the official short name of the `eth` protocol used during @@ -40,11 +41,11 @@ const ProtocolName = "eth" // ProtocolVersions are the supported versions of the `eth` protocol (first // is primary). -var ProtocolVersions = []uint{ETH66, ETH67} +var ProtocolVersions = []uint{ETH68, ETH66, ETH67} // protocolLengths are the number of implemented message corresponding to // different protocol versions. -var protocolLengths = map[uint]uint64{ETH67: 18, ETH66: 17} +var protocolLengths = map[uint]uint64{ETH68: 18, ETH67: 18, ETH66: 17} // todo 4844 should ETH68 be also 18? // maxMessageSize is the maximum cap on the size of a protocol message. const maxMessageSize = 10 * 1024 * 1024 @@ -68,6 +69,8 @@ const ( // Protocol messages overloaded in eth/66 UpgradeStatusMsg = 0x0b + + NewSidecarMsg = 0x0c ) var ( @@ -147,7 +150,16 @@ func (p *NewBlockHashesPacket) Unpack() ([]common.Hash, []uint64) { } // TransactionsPacket is the network packet for broadcasting new transactions. -type TransactionsPacket []*types.Transaction +type TransactionsPacket []*types.NetworkTransaction + +// Unwrap returns the wrapped Transactions +func (p *TransactionsPacket) Unwrap() []*types.Transaction { + txs := make([]*types.Transaction, len(*p)) + for i := range *p { + txs[i] = (*p)[i].Tx + } + return txs +} // GetBlockHeadersPacket represents a block header query. type GetBlockHeadersPacket struct { @@ -224,6 +236,16 @@ type NewBlockPacket struct { TD *big.Int } +// NewSidecarPacket is the network packet for the sidecar propagation message. +type NewSidecarPacket struct { + Sidecar *types.Sidecar + TD *big.Int +} + +func (request *NewSidecarPacket) sanityCheck() error { + panic("Implement me!!") +} + // sanityCheck verifies that the values are reasonable, as a DoS protection func (request *NewBlockPacket) sanityCheck() error { if err := request.Block.SanityCheck(); err != nil { @@ -330,8 +352,15 @@ type ReceiptsRLPPacket66 struct { ReceiptsRLPPacket } -// NewPooledTransactionHashesPacket represents a transaction announcement packet. -type NewPooledTransactionHashesPacket []common.Hash +// NewPooledTransactionHashesPacket66 represents a transaction announcement packet on eth/66 and eth/67. +type NewPooledTransactionHashesPacket66 []common.Hash + +// NewPooledTransactionHashesPacket68 represents a transaction announcement packet on eth/68 and newer. +type NewPooledTransactionHashesPacket68 struct { + Types []byte + Sizes []uint32 + Hashes []common.Hash +} // GetPooledTransactionsPacket represents a transaction query. type GetPooledTransactionsPacket []common.Hash @@ -342,7 +371,7 @@ type GetPooledTransactionsPacket66 struct { } // PooledTransactionsPacket is the network packet for transaction distribution. -type PooledTransactionsPacket []*types.Transaction +type PooledTransactionsPacket []*types.NetworkTransaction // PooledTransactionsPacket66 is the network packet for transaction distribution over eth/66. type PooledTransactionsPacket66 struct { @@ -350,6 +379,15 @@ type PooledTransactionsPacket66 struct { PooledTransactionsPacket } +// Unwrap returns the wrapped transactions +func (p *PooledTransactionsPacket) Unwrap() []*types.Transaction { + txs := make([]*types.Transaction, len(*p)) + for i := range *p { + txs[i] = (*p)[i].Tx + } + return txs +} + // PooledTransactionsRLPPacket is the network packet for transaction distribution, used // in the cases we already have them in rlp-encoded form type PooledTransactionsRLPPacket []rlp.RawValue @@ -387,6 +425,9 @@ func (*BlockBodiesPacket) Kind() byte { return BlockBodiesMsg } func (*NewBlockPacket) Name() string { return "NewBlock" } func (*NewBlockPacket) Kind() byte { return NewBlockMsg } +func (*NewSidecarPacket) Name() string { return "NewSidecar" } +func (*NewSidecarPacket) Kind() byte { return NewSidecarMsg } + func (*GetNodeDataPacket) Name() string { return "GetNodeData" } func (*GetNodeDataPacket) Kind() byte { return GetNodeDataMsg } @@ -399,8 +440,11 @@ func (*GetReceiptsPacket) Kind() byte { return GetReceiptsMsg } func (*ReceiptsPacket) Name() string { return "Receipts" } func (*ReceiptsPacket) Kind() byte { return ReceiptsMsg } -func (*NewPooledTransactionHashesPacket) Name() string { return "NewPooledTransactionHashes" } -func (*NewPooledTransactionHashesPacket) Kind() byte { return NewPooledTransactionHashesMsg } +func (*NewPooledTransactionHashesPacket66) Name() string { return "NewPooledTransactionHashes" } +func (*NewPooledTransactionHashesPacket66) Kind() byte { return NewPooledTransactionHashesMsg } + +func (*NewPooledTransactionHashesPacket68) Name() string { return "NewPooledTransactionHashes" } +func (*NewPooledTransactionHashesPacket68) Kind() byte { return NewPooledTransactionHashesMsg } func (*GetPooledTransactionsPacket) Name() string { return "GetPooledTransactions" } func (*GetPooledTransactionsPacket) Kind() byte { return GetPooledTransactionsMsg } diff --git a/eth/sync.go b/eth/sync.go index a32d305c77..20066411f1 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -221,6 +221,7 @@ func (cs *chainSyncer) startSync(op *chainSyncOp) { // doSync synchronizes the local blockchain with a remote peer. func (h *handler) doSync(op *chainSyncOp) error { + // todo 4844 sync blobs as well if op.mode == downloader.SnapSync { // Before launch the snap sync, we have to ensure user uses the same // txlookup limit. diff --git a/les/client.go b/les/client.go index 313dd90b49..5aebedef6e 100644 --- a/les/client.go +++ b/les/client.go @@ -115,7 +115,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*LightEthereum, error) { reqDist: newRequestDistributor(peers, &mclock.System{}), accountManager: stack.AccountManager(), merger: merger, - engine: ethconfig.CreateConsensusEngine(stack, chainConfig, &config.Ethash, nil, false, chainDb, nil, genesisHash), + engine: ethconfig.CreateConsensusEngine(stack, chainConfig, &config.Ethash, nil, false, chainDb, nil, nil, genesisHash), // todo 4844 leaving blobdb to nil for light client bloomRequests: make(chan chan *bloombits.Retrieval), bloomIndexer: core.NewBloomIndexer(chainDb, params.BloomBitsBlocksClient, params.HelperTrieConfirmations), p2pServer: stack.Server(), diff --git a/miner/worker.go b/miner/worker.go index ce7ff71065..aa84e14b5e 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -691,8 +691,16 @@ func (w *worker) resultLoop() { writeBlockTimer.UpdateSince(start) log.Info("Successfully sealed new block", "number", block.Block.Number(), "sealhash", sealhash, "hash", hash, "elapsed", common.PrettyDuration(time.Since(task.createdAt))) + // todo 4844 do broadcast sidecars as well! // Broadcast the block and announce chain insertion event + fmt.Println("Broadcasting newly mined block: ", block.Block.Number().String()) w.mux.Post(core.NewMinedBlockEvent{Block: block.Block}) + if len(block.Sidecar) > 0 { + // posting the first sidecar for now to test. todo 4844 post all + w.mux.Post(core.NewMinedSidecarEvent{Sidecar: block.Sidecar[0]}) + } else { + fmt.Println("empty sidecar") + } // Insert the block into the set of pending ones to resultLoop for confirmations w.unconfirmed.Insert(block.Block.NumberU64(), block.Block.Hash()) @@ -984,13 +992,15 @@ func (w *worker) prepareWork(genParams *generateParams) (*environment, error) { } // Construct the sealing block header, set the extra field if it's allowed num := parent.Number() - header := &types.Header{ - ParentHash: parent.Hash(), - Number: num.Add(num, common.Big1), - GasLimit: core.CalcGasLimit(parent.GasLimit(), w.config.GasCeil), - Time: timestamp, - Coinbase: genParams.coinbase, + header := &types.Header{ // todo 4844 it seems like excessDataGas should be set for Cancun + ParentHash: parent.Hash(), + Number: num.Add(num, common.Big1), + GasLimit: core.CalcGasLimit(parent.GasLimit(), w.config.GasCeil), + Time: timestamp, + Coinbase: genParams.coinbase, + ExcessDataGas: parent.ExcessDataGas(), } + //if parent.ExcessDataGas() if !genParams.noExtra && len(w.extra) != 0 { header.Extra = w.extra } diff --git a/p2p/discover/v4_udp.go b/p2p/discover/v4_udp.go index 5e106f30ae..c28706ff72 100644 --- a/p2p/discover/v4_udp.go +++ b/p2p/discover/v4_udp.go @@ -330,6 +330,7 @@ func (t *UDPv4) findnode(toid enode.ID, toaddr *net.UDPAddr, target v4wire.Pubke // there's no need for an error in that case. err := <-rm.errc if errors.Is(err, errTimeout) && rm.reply != nil { + //fmt.Println("udp timeout about to happen...3") err = nil } return nodes, err @@ -480,6 +481,7 @@ func (t *UDPv4) loop() { for el := plist.Front(); el != nil; el = el.Next() { p := el.Value.(*replyMatcher) if now.After(p.deadline) || now.Equal(p.deadline) { + //fmt.Println("udp timeout about to happen...2") p.errc <- errTimeout plist.Remove(el) contTimeouts++ diff --git a/p2p/discover/v5_udp.go b/p2p/discover/v5_udp.go index f88ce33b8b..9db709fcf0 100644 --- a/p2p/discover/v5_udp.go +++ b/p2p/discover/v5_udp.go @@ -501,6 +501,7 @@ func (t *UDPv5) dispatch() { case ct := <-t.respTimeoutCh: active := t.activeCallByNode[ct.c.node.ID()] if ct.c == active && ct.timer == active.timeout { + fmt.Println("udp timeout about to happen...1") ct.c.err <- errTimeout } diff --git a/p2p/peer.go b/p2p/peer.go index cfebfdcdd9..3a764f33e7 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -301,6 +301,7 @@ loop: } writeStart <- struct{}{} case err = <-readErr: + //fmt.Println("read Error!!!!! in p2p/peer.go") if r, ok := err.(DiscReason); ok { remoteRequested = true reason = r @@ -310,6 +311,7 @@ loop: break loop case err = <-p.protoErr: reason = discReasonForError(err) + fmt.Println("About to break loop!!!!! ", err) break loop case err = <-p.disc: reason = discReasonForError(err) @@ -344,7 +346,12 @@ func (p *Peer) pingLoop() { func (p *Peer) readLoop(errc chan<- error) { defer p.wg.Done() for { - msg, err := p.rw.ReadMsg() + //fmt.Println("inside readLoop of peer....") + msg, err := p.rw.ReadMsg() // todo 4844 probably here lies the problem?? + if msg.Code != pingMsg && msg.Code != pongMsg { + //fmt.Println("inside readLoop of peer....") + //fmt.Println("msg, err: ", msg.String(), err) + } if err != nil { errc <- err return @@ -358,6 +365,10 @@ func (p *Peer) readLoop(errc chan<- error) { } func (p *Peer) handle(msg Msg) error { + // todo 4844 this is never getting block related message. Just ping-pong messages. + if msg.Code != pingMsg && msg.Code != pongMsg { + //fmt.Println("handling message", msg) + } switch { case msg.Code == pingMsg: msg.Discard() @@ -374,6 +385,7 @@ func (p *Peer) handle(msg Msg) error { // ignore other base protocol messages return msg.Discard() default: + //fmt.Println("sub protocol message") // it's a subprotocol message proto, err := p.getProto(msg.Code) if err != nil { @@ -408,6 +420,7 @@ func countMatchingProtocols(protocols []Protocol, caps []Cap) int { // matchProtocols creates structures for matching named subprotocols. func matchProtocols(protocols []Protocol, caps []Cap, rw MsgReadWriter) map[string]*protoRW { + // todo 4844 check if this is going fine. sort.Sort(capsByNameAndVersion(caps)) offset := baseProtocolLength result := make(map[string]*protoRW) @@ -452,6 +465,7 @@ func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error) } else if !errors.Is(err, io.EOF) { p.log.Trace(fmt.Sprintf("Protocol %s/%d failed", proto.Name, proto.Version), "err", err) } + fmt.Println("error in startProtocols: ", err) p.protoErr <- err }() } @@ -504,6 +518,10 @@ func (rw *protoRW) WriteMsg(msg Msg) (err error) { func (rw *protoRW) ReadMsg() (Msg, error) { select { case msg := <-rw.in: + fmt.Println("Reading message from peer, ", msg.Code) + if msg.Code == 0x02 { + fmt.Println("fake Sidecar received in Node B!!!!!!!", msg.Size) + } msg.Code -= rw.offset return msg, nil case <-rw.closed: diff --git a/rlp/decode.go b/rlp/decode.go index 608770af76..2ffdff00ce 100644 --- a/rlp/decode.go +++ b/rlp/decode.go @@ -169,6 +169,7 @@ func makeDecoder(typ reflect.Type, tags rlpstruct.Tags) (dec decoder, err error) case kind == reflect.Ptr: return makePtrDecoder(typ, tags) case reflect.PtrTo(typ).Implements(decoderInterface): + fmt.Println("type and kind: ", typ.String(), kind.String()) return decodeDecoder, nil case isUint(kind): return decodeUint, nil @@ -1029,7 +1030,7 @@ func (s *Stream) Kind() (kind Kind, size uint64, err error) { if inList && s.size > listLimit { s.kinderr = ErrElemTooLarge } else if s.limited && s.size > s.remaining { - fmt.Println("n > s.remaining 2", s.size, s.remaining) + //fmt.Println("n > s.remaining 2", s.size, s.remaining) s.kinderr = ErrValueTooLarge } } @@ -1169,7 +1170,7 @@ func (s *Stream) willRead(n uint64) error { } if s.limited { if n > s.remaining { - fmt.Println("n > s.remaining", n, s.remaining) + //fmt.Println("n > s.remaining", n, s.remaining) return ErrValueTooLarge } s.remaining -= n