Skip to content
This repository has been archived by the owner on Oct 15, 2024. It is now read-only.

[R4R]support Index service recovery and indexHeight in Status api #106

Merged
merged 6 commits into from
Aug 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@
### FEATURES:

### IMPROVEMENTS:
- [index] [\#106](https://github.com/binance-chain/bnc-tendermint/pull/106) index service recover from data lost

### BUG FIXES:
75 changes: 44 additions & 31 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ import (
ctypes "github.com/tendermint/tendermint/rpc/core/types"
grpccore "github.com/tendermint/tendermint/rpc/grpc"
rpcserver "github.com/tendermint/tendermint/rpc/lib/server"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/snapshot"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/state/blockindex"
bkv "github.com/tendermint/tendermint/state/blockindex/kv"
nullblk "github.com/tendermint/tendermint/state/blockindex/null"
Expand Down Expand Up @@ -160,21 +160,22 @@ type Node struct {
isListening bool

// services
eventBus *types.EventBus // pub/sub for services
stateDB dbm.DB
blockStore *bc.BlockStore // store the blockchain to disk
bcReactor *bc.BlockchainReactor // for fast-syncing
mempoolReactor *mempl.MempoolReactor // for gossipping transactions
consensusState *cs.ConsensusState // latest consensus state
consensusReactor *cs.ConsensusReactor // for participating in the consensus
evidencePool *evidence.EvidencePool // tracking evidence
proxyApp proxy.AppConns // connection to the application
rpcListeners []net.Listener // rpc servers
txIndexer txindex.TxIndexer
blockIndexer blockindex.BlockIndexer
indexerService *txindex.IndexerService
eventBus *types.EventBus // pub/sub for services
stateDB dbm.DB
blockStore *bc.BlockStore // store the blockchain to disk
bcReactor *bc.BlockchainReactor // for fast-syncing
mempoolReactor *mempl.MempoolReactor // for gossipping transactions
consensusState *cs.ConsensusState // latest consensus state
consensusReactor *cs.ConsensusReactor // for participating in the consensus
evidencePool *evidence.EvidencePool // tracking evidence
proxyApp proxy.AppConns // connection to the application
rpcListeners []net.Listener // rpc servers
txIndexer txindex.TxIndexer
blockIndexer blockindex.BlockIndexer
indexerService *txindex.IndexerService
blockIndexService *blockindex.IndexerService
prometheusSrv *http.Server
indexHub *sm.IndexHub
prometheusSrv *http.Server
}

// NewNode returns a new, ready to go, Tendermint Node.
Expand Down Expand Up @@ -239,7 +240,7 @@ func NewNode(config *cfg.Config,

// Transaction indexing
var txIndexer txindex.TxIndexer
var txDB dbm.DB // TODO: remove by refactor defaultdbprovider to cache the created db instaces
var txDB dbm.DB // TODO: remove by refactor defaultdbprovider to cache the created db instaces
switch config.TxIndex.Indexer {
case "kv":
store, err := dbProvider(&DBContext{"tx_index", config})
Expand Down Expand Up @@ -286,6 +287,17 @@ func NewNode(config *cfg.Config,
return nil, err
}

csMetrics, p2pMetrics, memplMetrics, smMetrics := metricsProvider(genDoc.ChainID)

indexHub := sm.NewIndexHub(state.LastBlockHeight, stateDB, blockStore, eventBus, sm.IndexHubWithMetrics(smMetrics))
indexHub.RegisterIndexSvc(blockIndexerService)
indexHub.RegisterIndexSvc(txIndexerService)
indexHub.SetLogger(logger.With("module", "indexer_hub"))
err = indexHub.Start()
if err != nil {
return nil, err
}
rickyyangz marked this conversation as resolved.
Show resolved Hide resolved

// Create the handshaker, which calls RequestInfo, sets the AppVersion on the state,
// and replays any blocks as necessary to sync tendermint with the app.
consensusLogger := logger.With("module", "consensus")
Expand Down Expand Up @@ -346,8 +358,6 @@ func NewNode(config *cfg.Config,
consensusLogger.Info("This node is not a validator", "addr", addr, "pubKey", pubKey)
}

csMetrics, p2pMetrics, memplMetrics, smMetrics := metricsProvider(genDoc.ChainID)

// Make MempoolReactor
mempool := mempl.NewMempool(
config.Mempool,
Expand Down Expand Up @@ -583,19 +593,20 @@ func NewNode(config *cfg.Config,
nodeInfo: nodeInfo,
nodeKey: nodeKey,

stateDB: stateDB,
blockStore: blockStore,
bcReactor: bcReactor,
mempoolReactor: mempoolReactor,
consensusState: consensusState,
consensusReactor: consensusReactor,
evidencePool: evidencePool,
proxyApp: proxyApp,
txIndexer: txIndexer,
blockIndexer: blockIndexer,
indexerService: txIndexerService,
stateDB: stateDB,
blockStore: blockStore,
bcReactor: bcReactor,
mempoolReactor: mempoolReactor,
consensusState: consensusState,
consensusReactor: consensusReactor,
evidencePool: evidencePool,
proxyApp: proxyApp,
txIndexer: txIndexer,
blockIndexer: blockIndexer,
indexerService: txIndexerService,
blockIndexService: blockIndexerService,
eventBus: eventBus,
indexHub: indexHub,
eventBus: eventBus,
}
node.BaseService = *cmn.NewBaseService(logger, "Node", node)
return node, nil
Expand Down Expand Up @@ -666,6 +677,7 @@ func (n *Node) OnStop() {
n.eventBus.Stop()
n.indexerService.Stop()
n.blockIndexService.Stop()
n.indexHub.Stop()

// now stop the reactors
// TODO: gracefully disconnect from peers.
Expand Down Expand Up @@ -719,6 +731,7 @@ func (n *Node) ConfigureRPC() {
rpccore.SetProxyAppQuery(n.proxyApp.Query())
rpccore.SetTxIndexer(n.txIndexer)
rpccore.SetBlockIndexer(n.blockIndexer)
rpccore.SetIndexHub(n.indexHub)
rpccore.SetConsensusReactor(n.consensusReactor)
rpccore.SetEventBus(n.eventBus)
rpccore.SetLogger(n.Logger.With("module", "rpc"))
Expand All @@ -738,7 +751,7 @@ func (n *Node) startRPC() ([]net.Listener, error) {
// we may expose the rpc over both a unix and tcp socket
listeners := make([]net.Listener, len(listenAddrs))
var wsWorkerPool *gopool.Pool
if n.config.RPC.WebsocketPoolMaxSize > 1{
if n.config.RPC.WebsocketPoolMaxSize > 1 {
wsWorkerPool = gopool.NewPool(n.config.RPC.WebsocketPoolMaxSize, n.config.RPC.WebsocketPoolQueueSize, n.config.RPC.WebsocketPoolSpawnSize)
wsWorkerPool.SetLogger(n.Logger.With("module", "routine-pool"))
}
Expand Down
5 changes: 5 additions & 0 deletions rpc/core/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ var (
consensusReactor *consensus.ConsensusReactor
eventBus *types.EventBus // thread safe
mempool *mempl.Mempool
indexerHub *sm.IndexHub

logger log.Logger

Expand Down Expand Up @@ -133,6 +134,10 @@ func SetBlockIndexer(indexer blockindex.BlockIndexer) {
blockIndexer = indexer
}

func SetIndexHub(hub *sm.IndexHub) {
indexerHub = hub
}

func SetConsensusReactor(conR *consensus.ConsensusReactor) {
consensusReactor = conR
}
Expand Down
4 changes: 3 additions & 1 deletion rpc/core/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ import (
// "latest_app_hash": "0000000000000000",
// "latest_block_height": "18",
// "latest_block_time": "2018-09-17T11:42:19.149920551Z",
// "catching_up": false
// "catching_up": false,
// "index_height": "18"
// },
// "validator_info": {
// "address": "D9F56456D7C5793815D0E9AF07C3A355D0FC64FD",
Expand Down Expand Up @@ -106,6 +107,7 @@ func Status(ctx *rpctypes.Context) (*ctypes.ResultStatus, error) {
LatestBlockHeight: latestHeight,
LatestBlockTime: latestBlockTime,
CatchingUp: consensusReactor.FastSync(),
IndexHeight: indexerHub.GetHeight(),
},
ValidatorInfo: ctypes.ValidatorInfo{
Address: pubKey.Address(),
Expand Down
1 change: 1 addition & 0 deletions rpc/core/types/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type SyncInfo struct {
LatestBlockHeight int64 `json:"latest_block_height"`
LatestBlockTime time.Time `json:"latest_block_time"`
CatchingUp bool `json:"catching_up"`
IndexHeight int64 `json:"index_height"`
}

// Info about the node's validator
Expand Down
9 changes: 9 additions & 0 deletions state/blockindex/indexer_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ type IndexerService struct {

idr BlockIndexer
eventBus *types.EventBus

onIndex func(int64)
}

// NewIndexerService returns a new service instance.
Expand All @@ -28,6 +30,10 @@ func NewIndexerService(idr BlockIndexer, eventBus *types.EventBus) *IndexerServi
return is
}

func (is *IndexerService) SetOnIndex(callback func(int64)) {
is.onIndex = callback
}

// OnStart implements cmn.Service by subscribing for blocks and indexing them by hash.
func (is *IndexerService) OnStart() error {
blockHeadersSub, err := is.eventBus.SubscribeUnbuffered(context.Background(), subscriber, types.EventQueryNewBlockHeader)
Expand All @@ -45,6 +51,9 @@ func (is *IndexerService) OnStart() error {
} else {
is.Logger.Info("Indexed block", "height", header.Height, "hash", header.LastBlockID.Hash)
}
if is.onIndex != nil {
is.onIndex(header.Height)
}
}
}()
return nil
Expand Down
14 changes: 7 additions & 7 deletions state/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@ func BlockExecutorWithMetrics(metrics *Metrics) BlockExecutorOption {
func NewBlockExecutor(db dbm.DB, logger log.Logger, proxyApp proxy.AppConnConsensus,
mempool Mempool, evpool EvidencePool, withAppState bool, options ...BlockExecutorOption) *BlockExecutor {
res := &BlockExecutor{
db: db,
proxyApp: proxyApp,
eventBus: types.NopEventBus{},
mempool: mempool,
evpool: evpool,
logger: logger,
metrics: NopMetrics(),
db: db,
proxyApp: proxyApp,
eventBus: types.NopEventBus{},
mempool: mempool,
evpool: evpool,
logger: logger,
metrics: NopMetrics(),
withAppState: withAppState,
}

Expand Down
Loading