Skip to content

Commit

Permalink
Merge pull request #64 from irisnet/event-bus
Browse files Browse the repository at this point in the history
R4R: Start eventBus and indexerService before first block
  • Loading branch information
Haifeng Xi authored May 17, 2019
2 parents 5a1cb27 + 9143e3d commit b9ea8b8
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 37 deletions.
9 changes: 9 additions & 0 deletions consensus/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ type Handshaker struct {
stateDB dbm.DB
initialState sm.State
store sm.BlockStore
eventBus types.BlockEventPublisher
genDoc *types.GenesisDoc
logger log.Logger

Expand All @@ -211,6 +212,7 @@ func NewHandshaker(stateDB dbm.DB, state sm.State,
stateDB: stateDB,
initialState: state,
store: store,
eventBus: types.NopEventBus{},
genDoc: genDoc,
logger: log.NewNopLogger(),
nBlocks: 0,
Expand All @@ -221,6 +223,12 @@ func (h *Handshaker) SetLogger(l log.Logger) {
h.logger = l
}

// SetEventBus - sets the event bus for publishing block related events.
// If not called, it defaults to types.NopEventBus.
func (h *Handshaker) SetEventBus(eventBus types.BlockEventPublisher) {
h.eventBus = eventBus
}

func (h *Handshaker) NBlocks() int {
return h.nBlocks
}
Expand Down Expand Up @@ -453,6 +461,7 @@ func (h *Handshaker) replayBlock(state sm.State, height int64, proxyApp proxy.Ap
meta := h.store.LoadBlockMeta(height)

blockExec := sm.NewBlockExecutor(h.stateDB, h.logger, proxyApp, sm.MockMempool{}, sm.MockEvidencePool{})
blockExec.SetEventBus(h.eventBus)

var err error
state, err = blockExec.ApplyBlock(state, meta.BlockID, block)
Expand Down
12 changes: 7 additions & 5 deletions consensus/replay_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,17 +326,19 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo
cmn.Exit(fmt.Sprintf("Error starting proxy app conns: %v", err))
}

eventBus := types.NewEventBus()
if err := eventBus.Start(); err != nil {
cmn.Exit(fmt.Sprintf("Failed to start event bus: %v", err))
}

handshaker := NewHandshaker(stateDB, state, blockStore, gdoc)
handshaker.SetEventBus(eventBus)

err = handshaker.Handshake(proxyApp, &config)
if err != nil {
cmn.Exit(fmt.Sprintf("Error on handshake: %v", err))
}

eventBus := types.NewEventBus()
if err := eventBus.Start(); err != nil {
cmn.Exit(fmt.Sprintf("Failed to start event bus: %v", err))
}

mempool, evpool := sm.MockMempool{}, sm.MockEvidencePool{}
blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool)

Expand Down
74 changes: 42 additions & 32 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,52 @@ func NewNode(config *cfg.Config,
return nil, fmt.Errorf("Error starting proxy app connections: %v", err)
}

// EventBus and IndexerService must be started before the handshake because
// we might need to index the txs of the replayed block as this might not have happened
// when the node stopped last time (i.e. the node stopped after it saved the block
// but before it indexed the txs, or, endblocker panicked)
eventBus := types.NewEventBus()
eventBus.SetLogger(logger.With("module", "events"))

err = eventBus.Start()
if err != nil {
return nil, err
}

// Transaction indexing
var txIndexer txindex.TxIndexer
switch config.TxIndex.Indexer {
case "kv":
store, err := dbProvider(&DBContext{"tx_index", config})
if err != nil {
return nil, err
}
if config.TxIndex.IndexTags != "" {
txIndexer = kv.NewTxIndex(store, kv.IndexTags(splitAndTrimEmpty(config.TxIndex.IndexTags, ",", " ")))
} else if config.TxIndex.IndexAllTags {
txIndexer = kv.NewTxIndex(store, kv.IndexAllTags())
} else {
txIndexer = kv.NewTxIndex(store)
}
default:
txIndexer = &null.TxIndex{}
}

indexerService := txindex.NewIndexerService(txIndexer, eventBus)
indexerService.SetLogger(logger.With("module", "txindex"))

err = indexerService.Start()
if err != nil {
return nil, err
}


// Create the handshaker, which calls RequestInfo, sets the AppVersion on the state,
// and replays any blocks as necessary to sync tendermint with the app.
consensusLogger := logger.With("module", "consensus")
handshaker := cs.NewHandshaker(stateDB, state, blockStore, genDoc)
handshaker.SetLogger(consensusLogger)
handshaker.SetEventBus(eventBus)
if err := handshaker.Handshake(proxyApp, &config.BaseConfig); err != nil {
return nil, fmt.Errorf("Error during handshake: %v", err)
}
Expand Down Expand Up @@ -322,35 +363,10 @@ func NewNode(config *cfg.Config,
consensusReactor := cs.NewConsensusReactor(consensusState, fastSync, cs.ReactorMetrics(csMetrics))
consensusReactor.SetLogger(consensusLogger)

eventBus := types.NewEventBus()
eventBus.SetLogger(logger.With("module", "events"))

// services which will be publishing and/or subscribing for messages (events)
// consensusReactor will set it on consensusState and blockExecutor
consensusReactor.SetEventBus(eventBus)

// Transaction indexing
var txIndexer txindex.TxIndexer
switch config.TxIndex.Indexer {
case "kv":
store, err := dbProvider(&DBContext{"tx_index", config})
if err != nil {
return nil, err
}
if config.TxIndex.IndexTags != "" {
txIndexer = kv.NewTxIndex(store, kv.IndexTags(splitAndTrimEmpty(config.TxIndex.IndexTags, ",", " ")))
} else if config.TxIndex.IndexAllTags {
txIndexer = kv.NewTxIndex(store, kv.IndexAllTags())
} else {
txIndexer = kv.NewTxIndex(store)
}
default:
txIndexer = &null.TxIndex{}
}

indexerService := txindex.NewIndexerService(txIndexer, eventBus)
indexerService.SetLogger(logger.With("module", "txindex"))

p2pLogger := logger.With("module", "p2p")
nodeInfo, err := makeNodeInfo(
config,
Expand Down Expand Up @@ -513,11 +529,6 @@ func (n *Node) OnStart() error {
time.Sleep(genTime.Sub(now))
}

err := n.eventBus.Start()
if err != nil {
return err
}

// Add private IDs to addrbook to block those peers being added
n.addrBook.AddPrivateIDs(splitAndTrimEmpty(n.config.P2P.PrivatePeerIDs, ",", " "))

Expand Down Expand Up @@ -566,8 +577,7 @@ func (n *Node) OnStart() error {
}
}

// start tx indexer
return n.indexerService.Start()
return nil
}

// OnStop stops the Node. It implements cmn.Service.
Expand Down

0 comments on commit b9ea8b8

Please sign in to comment.