Skip to content

Commit

Permalink
Merge pull request ethereum#303 from maticnetwork/merge-subs2
Browse files Browse the repository at this point in the history
Merge subs2
  • Loading branch information
0xsharma authored Jan 25, 2022
2 parents 87a095e + 58e4311 commit 55c34d6
Show file tree
Hide file tree
Showing 17 changed files with 977 additions and 280 deletions.
29 changes: 29 additions & 0 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ type BlockChain struct {
borReceiptsCache *lru.Cache // Cache for the most recent bor receipt receipts per block
stateSyncData []*types.StateSyncData // State sync data
stateSyncFeed event.Feed // State sync feed
chain2HeadFeed event.Feed // Reorg/NewHead/Fork data feed
}

// NewBlockChain returns a fully initialised block chain using information
Expand Down Expand Up @@ -1640,10 +1641,21 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
for _, data := range bc.stateSyncData {
bc.stateSyncFeed.Send(StateSyncEvent{Data: data})
}

bc.chain2HeadFeed.Send(Chain2HeadEvent{
Type: Chain2HeadCanonicalEvent,
NewChain: []*types.Block{block},
})

// BOR
}
} else {
bc.chainSideFeed.Send(ChainSideEvent{Block: block})

bc.chain2HeadFeed.Send(Chain2HeadEvent{
Type: Chain2HeadForkEvent,
NewChain: []*types.Block{block},
})
}
return status, nil
}
Expand Down Expand Up @@ -1737,6 +1749,11 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
defer func() {
if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() {
bc.chainHeadFeed.Send(ChainHeadEvent{lastCanon})

bc.chain2HeadFeed.Send(Chain2HeadEvent{
Type: Chain2HeadCanonicalEvent,
NewChain: []*types.Block{lastCanon},
})
}
}()
// Start the parallel header verifier
Expand Down Expand Up @@ -2262,6 +2279,13 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
}
// Ensure the user sees large reorgs
if len(oldChain) > 0 && len(newChain) > 0 {

bc.chain2HeadFeed.Send(Chain2HeadEvent{
Type: Chain2HeadReorgEvent,
NewChain: newChain,
OldChain: oldChain,
})

logFn := log.Info
msg := "Chain reorg detected"
if len(oldChain) > 63 {
Expand Down Expand Up @@ -2570,6 +2594,11 @@ func (bc *BlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Su
return bc.scope.Track(bc.chainHeadFeed.Subscribe(ch))
}

// SubscribeChain2HeadEvent registers a subscription of ChainHeadEvent. ()
func (bc *BlockChain) SubscribeChain2HeadEvent(ch chan<- Chain2HeadEvent) event.Subscription {
return bc.scope.Track(bc.chain2HeadFeed.Subscribe(ch))
}

// SubscribeChainSideEvent registers a subscription of ChainSideEvent.
func (bc *BlockChain) SubscribeChainSideEvent(ch chan<- ChainSideEvent) event.Subscription {
return bc.scope.Track(bc.chainSideFeed.Subscribe(ch))
Expand Down
134 changes: 134 additions & 0 deletions core/blockchain_bor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package core

import (
"math/big"
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/params"
)

func TestChain2HeadEvent(t *testing.T) {
var (
db = rawdb.NewMemoryDatabase()
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
addr1 = crypto.PubkeyToAddress(key1.PublicKey)
gspec = &Genesis{
Config: params.TestChainConfig,
Alloc: GenesisAlloc{addr1: {Balance: big.NewInt(10000000000000000)}},
}
genesis = gspec.MustCommit(db)
signer = types.LatestSigner(gspec.Config)
)

blockchain, _ := NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil, nil)
defer blockchain.Stop()

chain2HeadCh := make(chan Chain2HeadEvent, 64)
blockchain.SubscribeChain2HeadEvent(chain2HeadCh)

chain, _ := GenerateChain(gspec.Config, genesis, ethash.NewFaker(), db, 3, func(i int, gen *BlockGen) {})
if _, err := blockchain.InsertChain(chain); err != nil {
t.Fatalf("failed to insert chain: %v", err)
}

replacementBlocks, _ := GenerateChain(gspec.Config, genesis, ethash.NewFaker(), db, 4, func(i int, gen *BlockGen) {
tx, err := types.SignTx(types.NewContractCreation(gen.TxNonce(addr1), new(big.Int), 1000000, gen.header.BaseFee, nil), signer, key1)
if i == 2 {
gen.OffsetTime(-9)
}
if err != nil {
t.Fatalf("failed to create tx: %v", err)
}
gen.AddTx(tx)
})

if _, err := blockchain.InsertChain(replacementBlocks); err != nil {
t.Fatalf("failed to insert chain: %v", err)
}

type eventTest struct {
Type string
Added []common.Hash
Removed []common.Hash
}

readEvent := func(expect *eventTest) {
select {
case ev := <-chain2HeadCh:
if ev.Type != expect.Type {
t.Fatal("Type mismatch")
}

if len(ev.NewChain) != len(expect.Added) {
t.Fatal("Newchain and Added Array Size don't match")
}
if len(ev.OldChain) != len(expect.Removed) {
t.Fatal("Oldchain and Removed Array Size don't match")
}

for j := 0; j < len(ev.OldChain); j++ {
if ev.OldChain[j].Hash() != expect.Removed[j] {
t.Fatal("Oldchain hashes Do Not Match")
}
}
for j := 0; j < len(ev.NewChain); j++ {
if ev.NewChain[j].Hash() != expect.Added[j] {
t.Fatal("Newchain hashes Do Not Match")
}
}
case <-time.After(2 * time.Second):
t.Fatal("timeout")
}
}

// head event
readEvent(&eventTest{
Type: Chain2HeadCanonicalEvent,
Added: []common.Hash{
chain[2].Hash(),
}})

// fork event
readEvent(&eventTest{
Type: Chain2HeadForkEvent,
Added: []common.Hash{
replacementBlocks[0].Hash(),
}})

// fork event
readEvent(&eventTest{
Type: Chain2HeadForkEvent,
Added: []common.Hash{
replacementBlocks[1].Hash(),
}})

// reorg event
//In this event the channel recieves an array of Blocks in NewChain and OldChain
readEvent(&eventTest{
Type: Chain2HeadReorgEvent,
Added: []common.Hash{
replacementBlocks[2].Hash(),
replacementBlocks[1].Hash(),
replacementBlocks[0].Hash(),
},
Removed: []common.Hash{
chain[2].Hash(),
chain[1].Hash(),
chain[0].Hash(),
},
})

// head event
readEvent(&eventTest{
Type: Chain2HeadCanonicalEvent,
Added: []common.Hash{
replacementBlocks[3].Hash(),
}})
}
13 changes: 13 additions & 0 deletions core/bor_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,16 @@ import (
type StateSyncEvent struct {
Data *types.StateSyncData
}

var (
Chain2HeadReorgEvent = "reorg"
Chain2HeadCanonicalEvent = "head"
Chain2HeadForkEvent = "fork"
)

// For tracking reorgs related information
type Chain2HeadEvent struct {
NewChain []*types.Block
OldChain []*types.Block
Type string
}
2 changes: 2 additions & 0 deletions docs/cli/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,6 @@

- [```status```](./status.md)

- [```chain watch```](./chain_watch.md)

- [```version```](./version.md)
3 changes: 3 additions & 0 deletions docs/cli/chain_watch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Chain watch

The ```chain watch``` command is used to view the chainHead, reorg and fork events in real-time.
5 changes: 5 additions & 0 deletions eth/bor_api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,8 @@ func (b *EthAPIBackend) GetBorBlockTransactionWithBlockHash(ctx context.Context,
func (b *EthAPIBackend) SubscribeStateSyncEvent(ch chan<- core.StateSyncEvent) event.Subscription {
return b.eth.BlockChain().SubscribeStateSyncEvent(ch)
}

// SubscribeChain2HeadEvent subscribes to reorg/head/fork event
func (b *EthAPIBackend) SubscribeChain2HeadEvent(ch chan<- core.Chain2HeadEvent) event.Subscription {
return b.eth.BlockChain().SubscribeChain2HeadEvent(ch)
}
78 changes: 73 additions & 5 deletions ethstats/ethstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ const (
txChanSize = 4096
// chainHeadChanSize is the size of channel listening to ChainHeadEvent.
chainHeadChanSize = 10

// chain2HeadChanSize is the size of channel listening to Chain2HeadEvent.
chain2HeadChanSize = 10
)

// backend encompasses the bare-minimum functionality needed for ethstats reporting
Expand All @@ -68,6 +71,9 @@ type backend interface {
GetTd(ctx context.Context, hash common.Hash) *big.Int
Stats() (pending int, queued int)
SyncProgress() ethereum.SyncProgress

// Bor
SubscribeChain2HeadEvent(ch chan<- core.Chain2HeadEvent) event.Subscription
}

// fullNodeBackend encompasses the functionality necessary for a full node
Expand Down Expand Up @@ -96,6 +102,9 @@ type Service struct {

headSub event.Subscription
txSub event.Subscription

//bor related sub
chain2headSub event.Subscription
}

// connWrapper is a wrapper to prevent concurrent-write or concurrent-read on the
Expand Down Expand Up @@ -195,7 +204,9 @@ func (s *Service) Start() error {
s.headSub = s.backend.SubscribeChainHeadEvent(chainHeadCh)
txEventCh := make(chan core.NewTxsEvent, txChanSize)
s.txSub = s.backend.SubscribeNewTxsEvent(txEventCh)
go s.loop(chainHeadCh, txEventCh)
chain2HeadCh := make(chan core.Chain2HeadEvent, chain2HeadChanSize)
s.chain2headSub = s.backend.SubscribeChain2HeadEvent(chain2HeadCh)
go s.loop(chainHeadCh, chain2HeadCh, txEventCh)

log.Info("Stats daemon started")
return nil
Expand All @@ -211,12 +222,13 @@ func (s *Service) Stop() error {

// loop keeps trying to connect to the netstats server, reporting chain events
// until termination.
func (s *Service) loop(chainHeadCh chan core.ChainHeadEvent, txEventCh chan core.NewTxsEvent) {
func (s *Service) loop(chainHeadCh chan core.ChainHeadEvent, chain2HeadCh chan core.Chain2HeadEvent, txEventCh chan core.NewTxsEvent) {
// Start a goroutine that exhausts the subscriptions to avoid events piling up
var (
quitCh = make(chan struct{})
headCh = make(chan *types.Block, 1)
txCh = make(chan struct{}, 1)
quitCh = make(chan struct{})
headCh = make(chan *types.Block, 1)
txCh = make(chan struct{}, 1)
head2Ch = make(chan core.Chain2HeadEvent, 100)
)
go func() {
var lastTx mclock.AbsTime
Expand All @@ -231,6 +243,13 @@ func (s *Service) loop(chainHeadCh chan core.ChainHeadEvent, txEventCh chan core
default:
}

// Notify of chain2head events, but drop if too frequent
case chain2head := <-chain2HeadCh:
select {
case head2Ch <- chain2head:
default:
}

// Notify of new transaction events, but drop if too frequent
case <-txEventCh:
if time.Duration(mclock.Now()-lastTx) < time.Second {
Expand Down Expand Up @@ -333,6 +352,12 @@ func (s *Service) loop(chainHeadCh chan core.ChainHeadEvent, txEventCh chan core
if err = s.reportPending(conn); err != nil {
log.Warn("Post-block transaction stats report failed", "err", err)
}

case chain2head := <-head2Ch:
if err = s.reportChain2Head(conn, &chain2head); err != nil {
log.Warn("Reorg stats report failed", "err", err)
}

case <-txCh:
if err = s.reportPending(conn); err != nil {
log.Warn("Transaction stats report failed", "err", err)
Expand Down Expand Up @@ -750,6 +775,49 @@ func (s *Service) reportPending(conn *connWrapper) error {
return conn.WriteJSON(report)
}

type blockStub struct {
Hash string `json:"hash"`
Number uint64 `json:"number"`
ParentHash string `json:"parent_hash"`
}

func createStub(b *types.Block) *blockStub {
s := &blockStub{
Hash: b.Hash().String(),
ParentHash: b.ParentHash().String(),
Number: b.NumberU64(),
}
return s
}

type ChainHeadEvent struct {
NewChain []*blockStub `json:"added"`
OldChain []*blockStub `json:"removed"`
Type string `json:"type"`
}

// reportChain2Head checks for reorg and sends current head to stats server.
func (s *Service) reportChain2Head(conn *connWrapper, chain2HeadData *core.Chain2HeadEvent) error {
chainHeadEvent := ChainHeadEvent{
Type: chain2HeadData.Type,
}
for _, block := range chain2HeadData.NewChain {
chainHeadEvent.NewChain = append(chainHeadEvent.NewChain, createStub(block))
}
for _, block := range chain2HeadData.OldChain {
chainHeadEvent.OldChain = append(chainHeadEvent.OldChain, createStub(block))
}

stats := map[string]interface{}{
"id": s.node,
"event": chainHeadEvent,
}
report := map[string][]interface{}{
"emit": {"headEvent", stats},
}
return conn.WriteJSON(report)
}

// nodeStats is the information to report about the local node.
type nodeStats struct {
Active bool `json:"active"`
Expand Down
1 change: 0 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ github.com/Azure/go-autorest/autorest/date v0.2.0 h1:yW+Zlqf26583pE43KhfnhFcdmSW
github.com/Azure/go-autorest/autorest/date v0.2.0/go.mod h1:vcORJHLJEh643/Ioh9+vPmf1Ij9AEBM5FuBIXLmIy0g=
github.com/Azure/go-autorest/autorest/mocks v0.1.0/go.mod h1:OTyCOPRA2IgIlWxVYxBee2F5Gr4kF2zd2J5cFRaIDN0=
github.com/Azure/go-autorest/autorest/mocks v0.2.0/go.mod h1:OTyCOPRA2IgIlWxVYxBee2F5Gr4kF2zd2J5cFRaIDN0=
github.com/Azure/go-autorest/autorest/mocks v0.3.0 h1:qJumjCaCudz+OcqE9/XtEPfvtOjOmKaui4EOpFI6zZc=
github.com/Azure/go-autorest/autorest/mocks v0.3.0/go.mod h1:a8FDP3DYzQ4RYfVAxAN3SVSiiO77gL2j2ronKKP0syM=
github.com/Azure/go-autorest/logger v0.1.0 h1:ruG4BSDXONFRrZZJ2GUXDiUyVpayPmb1GnWeHDdaNKY=
github.com/Azure/go-autorest/logger v0.1.0/go.mod h1:oExouG+K6PryycPJfVSxi/koC6LSNgds39diKLz7Vrc=
Expand Down
Loading

0 comments on commit 55c34d6

Please sign in to comment.