diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index aaa75891b..3abf4206c 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -17,5 +17,6 @@ ### FEATURES: ### IMPROVEMENTS: +- [index] [\#106](https://github.com/binance-chain/bnc-tendermint/pull/106) index service recovers from data loss ### BUG FIXES: diff --git a/node/node.go b/node/node.go index 60da4071a..b824c55a1 100644 --- a/node/node.go +++ b/node/node.go @@ -37,8 +37,8 @@ import ( ctypes "github.com/tendermint/tendermint/rpc/core/types" grpccore "github.com/tendermint/tendermint/rpc/grpc" rpcserver "github.com/tendermint/tendermint/rpc/lib/server" - sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/snapshot" + sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/state/blockindex" bkv "github.com/tendermint/tendermint/state/blockindex/kv" nullblk "github.com/tendermint/tendermint/state/blockindex/null" @@ -160,21 +160,22 @@ type Node struct { isListening bool // services - eventBus *types.EventBus // pub/sub for services - stateDB dbm.DB - blockStore *bc.BlockStore // store the blockchain to disk - bcReactor *bc.BlockchainReactor // for fast-syncing - mempoolReactor *mempl.MempoolReactor // for gossipping transactions - consensusState *cs.ConsensusState // latest consensus state - consensusReactor *cs.ConsensusReactor // for participating in the consensus - evidencePool *evidence.EvidencePool // tracking evidence - proxyApp proxy.AppConns // connection to the application - rpcListeners []net.Listener // rpc servers - txIndexer txindex.TxIndexer - blockIndexer blockindex.BlockIndexer - indexerService *txindex.IndexerService + eventBus *types.EventBus // pub/sub for services + stateDB dbm.DB + blockStore *bc.BlockStore // store the blockchain to disk + bcReactor *bc.BlockchainReactor // for fast-syncing + mempoolReactor *mempl.MempoolReactor // for gossipping transactions + consensusState *cs.ConsensusState // latest consensus state + 
consensusReactor *cs.ConsensusReactor // for participating in the consensus + evidencePool *evidence.EvidencePool // tracking evidence + proxyApp proxy.AppConns // connection to the application + rpcListeners []net.Listener // rpc servers + txIndexer txindex.TxIndexer + blockIndexer blockindex.BlockIndexer + indexerService *txindex.IndexerService blockIndexService *blockindex.IndexerService - prometheusSrv *http.Server + indexHub *sm.IndexHub + prometheusSrv *http.Server } // NewNode returns a new, ready to go, Tendermint Node. @@ -239,7 +240,7 @@ func NewNode(config *cfg.Config, // Transaction indexing var txIndexer txindex.TxIndexer - var txDB dbm.DB // TODO: remove by refactor defaultdbprovider to cache the created db instaces + var txDB dbm.DB // TODO: remove by refactor defaultdbprovider to cache the created db instaces switch config.TxIndex.Indexer { case "kv": store, err := dbProvider(&DBContext{"tx_index", config}) @@ -286,6 +287,17 @@ func NewNode(config *cfg.Config, return nil, err } + csMetrics, p2pMetrics, memplMetrics, smMetrics := metricsProvider(genDoc.ChainID) + + indexHub := sm.NewIndexHub(state.LastBlockHeight, stateDB, blockStore, eventBus, sm.IndexHubWithMetrics(smMetrics)) + indexHub.RegisterIndexSvc(blockIndexerService) + indexHub.RegisterIndexSvc(txIndexerService) + indexHub.SetLogger(logger.With("module", "indexer_hub")) + err = indexHub.Start() + if err != nil { + return nil, err + } + // Create the handshaker, which calls RequestInfo, sets the AppVersion on the state, // and replays any blocks as necessary to sync tendermint with the app. 
consensusLogger := logger.With("module", "consensus") @@ -346,8 +358,6 @@ func NewNode(config *cfg.Config, consensusLogger.Info("This node is not a validator", "addr", addr, "pubKey", pubKey) } - csMetrics, p2pMetrics, memplMetrics, smMetrics := metricsProvider(genDoc.ChainID) - // Make MempoolReactor mempool := mempl.NewMempool( config.Mempool, @@ -583,19 +593,20 @@ func NewNode(config *cfg.Config, nodeInfo: nodeInfo, nodeKey: nodeKey, - stateDB: stateDB, - blockStore: blockStore, - bcReactor: bcReactor, - mempoolReactor: mempoolReactor, - consensusState: consensusState, - consensusReactor: consensusReactor, - evidencePool: evidencePool, - proxyApp: proxyApp, - txIndexer: txIndexer, - blockIndexer: blockIndexer, - indexerService: txIndexerService, + stateDB: stateDB, + blockStore: blockStore, + bcReactor: bcReactor, + mempoolReactor: mempoolReactor, + consensusState: consensusState, + consensusReactor: consensusReactor, + evidencePool: evidencePool, + proxyApp: proxyApp, + txIndexer: txIndexer, + blockIndexer: blockIndexer, + indexerService: txIndexerService, blockIndexService: blockIndexerService, - eventBus: eventBus, + indexHub: indexHub, + eventBus: eventBus, } node.BaseService = *cmn.NewBaseService(logger, "Node", node) return node, nil @@ -666,6 +677,7 @@ func (n *Node) OnStop() { n.eventBus.Stop() n.indexerService.Stop() n.blockIndexService.Stop() + n.indexHub.Stop() // now stop the reactors // TODO: gracefully disconnect from peers. 
@@ -719,6 +731,7 @@ func (n *Node) ConfigureRPC() { rpccore.SetProxyAppQuery(n.proxyApp.Query()) rpccore.SetTxIndexer(n.txIndexer) rpccore.SetBlockIndexer(n.blockIndexer) + rpccore.SetIndexHub(n.indexHub) rpccore.SetConsensusReactor(n.consensusReactor) rpccore.SetEventBus(n.eventBus) rpccore.SetLogger(n.Logger.With("module", "rpc")) @@ -738,7 +751,7 @@ func (n *Node) startRPC() ([]net.Listener, error) { // we may expose the rpc over both a unix and tcp socket listeners := make([]net.Listener, len(listenAddrs)) var wsWorkerPool *gopool.Pool - if n.config.RPC.WebsocketPoolMaxSize > 1{ + if n.config.RPC.WebsocketPoolMaxSize > 1 { wsWorkerPool = gopool.NewPool(n.config.RPC.WebsocketPoolMaxSize, n.config.RPC.WebsocketPoolQueueSize, n.config.RPC.WebsocketPoolSpawnSize) wsWorkerPool.SetLogger(n.Logger.With("module", "routine-pool")) } diff --git a/rpc/core/pipe.go b/rpc/core/pipe.go index 6f1579b6d..43eabf53b 100644 --- a/rpc/core/pipe.go +++ b/rpc/core/pipe.go @@ -75,6 +75,7 @@ var ( consensusReactor *consensus.ConsensusReactor eventBus *types.EventBus // thread safe mempool *mempl.Mempool + indexerHub *sm.IndexHub logger log.Logger @@ -133,6 +134,10 @@ func SetBlockIndexer(indexer blockindex.BlockIndexer) { blockIndexer = indexer } +func SetIndexHub(hub *sm.IndexHub) { + indexerHub = hub +} + func SetConsensusReactor(conR *consensus.ConsensusReactor) { consensusReactor = conR } diff --git a/rpc/core/status.go b/rpc/core/status.go index f2e0624d4..f2f0ddb82 100644 --- a/rpc/core/status.go +++ b/rpc/core/status.go @@ -58,7 +58,8 @@ import ( // "latest_app_hash": "0000000000000000", // "latest_block_height": "18", // "latest_block_time": "2018-09-17T11:42:19.149920551Z", -// "catching_up": false +// "catching_up": false, +// "index_height": "18" // }, // "validator_info": { // "address": "D9F56456D7C5793815D0E9AF07C3A355D0FC64FD", @@ -106,6 +107,7 @@ func Status(ctx *rpctypes.Context) (*ctypes.ResultStatus, error) { LatestBlockHeight: latestHeight, LatestBlockTime: 
latestBlockTime, CatchingUp: consensusReactor.FastSync(), + IndexHeight: indexerHub.GetHeight(), }, ValidatorInfo: ctypes.ValidatorInfo{ Address: pubKey.Address(), diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index 74457b38a..075fa957a 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -63,6 +63,7 @@ type SyncInfo struct { LatestBlockHeight int64 `json:"latest_block_height"` LatestBlockTime time.Time `json:"latest_block_time"` CatchingUp bool `json:"catching_up"` + IndexHeight int64 `json:"index_height"` } // Info about the node's validator diff --git a/state/blockindex/indexer_service.go b/state/blockindex/indexer_service.go index 2a4b3bf81..f41768e0f 100644 --- a/state/blockindex/indexer_service.go +++ b/state/blockindex/indexer_service.go @@ -19,6 +19,8 @@ type IndexerService struct { idr BlockIndexer eventBus *types.EventBus + + onIndex func(int64) } // NewIndexerService returns a new service instance. @@ -28,6 +30,10 @@ func NewIndexerService(idr BlockIndexer, eventBus *types.EventBus) *IndexerServi return is } +func (is *IndexerService) SetOnIndex(callback func(int64)) { + is.onIndex = callback +} + // OnStart implements cmn.Service by subscribing for blocks and indexing them by hash. 
func (is *IndexerService) OnStart() error { blockHeadersSub, err := is.eventBus.SubscribeUnbuffered(context.Background(), subscriber, types.EventQueryNewBlockHeader) @@ -45,6 +51,9 @@ func (is *IndexerService) OnStart() error { } else { is.Logger.Info("Indexed block", "height", header.Height, "hash", header.LastBlockID.Hash) } + if is.onIndex != nil { + is.onIndex(header.Height) + } } }() return nil diff --git a/state/execution.go b/state/execution.go index 78ce171fb..07e76c9ad 100644 --- a/state/execution.go +++ b/state/execution.go @@ -54,13 +54,13 @@ func BlockExecutorWithMetrics(metrics *Metrics) BlockExecutorOption { func NewBlockExecutor(db dbm.DB, logger log.Logger, proxyApp proxy.AppConnConsensus, mempool Mempool, evpool EvidencePool, withAppState bool, options ...BlockExecutorOption) *BlockExecutor { res := &BlockExecutor{ - db: db, - proxyApp: proxyApp, - eventBus: types.NopEventBus{}, - mempool: mempool, - evpool: evpool, - logger: logger, - metrics: NopMetrics(), + db: db, + proxyApp: proxyApp, + eventBus: types.NopEventBus{}, + mempool: mempool, + evpool: evpool, + logger: logger, + metrics: NopMetrics(), withAppState: withAppState, } diff --git a/state/index.go b/state/index.go new file mode 100644 index 000000000..82b4b701e --- /dev/null +++ b/state/index.go @@ -0,0 +1,181 @@ +package state + +import ( + "sync" + + cmn "github.com/tendermint/tendermint/libs/common" + dbm "github.com/tendermint/tendermint/libs/db" + "github.com/tendermint/tendermint/types" + abci "github.com/tendermint/tendermint/abci/types" + +) + +const ( + MaxIndexLag = 100 +) + +var IndexHeightKey = []byte("IndexHeightKey") + +type IndexService interface { + SetOnIndex(callback func(int64)) +} + +type IndexHub struct { + cmn.BaseService + mtx sync.Mutex + + stateHeight int64 + expectHeight int64 + + // the total registered index service + numIdxSvc int + indexTaskCounter map[int64]int + indexTaskEvents chan int64 + + stateDB dbm.DB + blockStore BlockStore + eventBus 
types.BlockEventPublisher + + metrics *Metrics +} + +func NewIndexHub(initialHeight int64, stateDB dbm.DB, blockStore BlockStore, eventBus types.BlockEventPublisher, options ...IndexHubOption) *IndexHub { + ih := &IndexHub{ + stateHeight: initialHeight, + indexTaskCounter: make(map[int64]int, 0), + indexTaskEvents: make(chan int64, MaxIndexLag), + stateDB: stateDB, + blockStore: blockStore, + eventBus: eventBus, + metrics: NopMetrics(), + } + indexedHeight := ih.GetIndexedHeight() + if indexedHeight < 0 { + // no indexedHeight found, will do no recover + ih.expectHeight = ih.stateHeight + 1 + } else { + ih.expectHeight = indexedHeight + 1 + } + for _, option := range options { + option(ih) + } + ih.BaseService = *cmn.NewBaseService(nil, "indexHub", ih) + return ih +} + +type IndexHubOption func(*IndexHub) + +func IndexHubWithMetrics(metrics *Metrics) IndexHubOption { + return func(ih *IndexHub) { + ih.metrics = metrics + } +} + +func (ih *IndexHub) OnStart() error { + // start listen routine before recovering. 
+ go ih.commitIndexRoutine() + ih.recoverIndex() + return nil +} + +func (ih *IndexHub) recoverIndex() { + for h := ih.expectHeight; h <= ih.stateHeight; h++ { + ih.Logger.Info("try to recover index", "height", h) + block := ih.blockStore.LoadBlock(h) + if block == nil { + ih.Logger.Error("index skip since the the block is missing", "height", h) + } else { + abciResponses, err := LoadABCIResponses(ih.stateDB, h) + if err != nil { + ih.Logger.Error("failed to load ABCIResponse, will use default") + abciResponses = NewABCIResponses(block) + abciResponses.EndBlock = &abci.ResponseEndBlock{} + abciResponses.BeginBlock = &abci.ResponseBeginBlock{} + } + abciValUpdates := abciResponses.EndBlock.ValidatorUpdates + validatorUpdates, err := types.PB2TM.ValidatorUpdates(abciValUpdates) + if err != nil { + ih.Logger.Error("failed to load validatorUpdates, will use nil by default") + } + fireEvents(ih.Logger, ih.eventBus, block, abciResponses, validatorUpdates) + } + } +} + +func (ih *IndexHub) commitIndexRoutine() { + for { + select { + case <-ih.Quit(): + return + case h := <-ih.indexTaskEvents: + ih.Logger.Info("finish index", "height", h) + ih.SetIndexedHeight(h) + ih.metrics.IndexHeight.Set(float64(h)) + } + } +} + +func (ih *IndexHub) RegisterIndexSvc(idx IndexService) { + ih.mtx.Lock() + defer ih.mtx.Unlock() + if ih.IsRunning() { + panic("can't RegisterIndexSvc when IndexHub is running") + } + idx.SetOnIndex(ih.CountDownAt) + ih.numIdxSvc++ +} + +// `CountDownAt` is a callback in index service, keep it simple and fast. +func (ih *IndexHub) CountDownAt(height int64) { + ih.mtx.Lock() + defer ih.mtx.Unlock() + count, exist := ih.indexTaskCounter[height] + if exist { + count = count - 1 + } else { + count = ih.numIdxSvc - 1 + } + // The higher block won't finish index before lower one. 
+ if count == 0 && height == ih.expectHeight { + if exist { + delete(ih.indexTaskCounter, height) + } + ih.expectHeight = ih.expectHeight + 1 + ih.indexTaskEvents <- height + } else { + ih.indexTaskCounter[height] = count + } +} + +// set and get won't happen at the same time, so no lock is taken +func (ih *IndexHub) SetIndexedHeight(h int64) { + rawHeight, err := cdc.MarshalBinaryBare(h) + if err != nil { + ih.Logger.Error("failed to MarshalBinaryBare for indexed height", "error", err, "height", h) + } else { + ih.stateDB.Set(IndexHeightKey, rawHeight) + } +} + +// returns -1 if `IndexHeightKey` has never been stored in the state db. +func (ih *IndexHub) GetIndexedHeight() int64 { + rawHeight := ih.stateDB.Get(IndexHeightKey) + if rawHeight == nil { + return -1 + } else { + var height int64 + err := cdc.UnmarshalBinaryBare(rawHeight, &height) + if err != nil { + // should not happen + panic(err) + } + return height + } +} + +// returns the indexed height from memory, which saves time for RPC +func (ih *IndexHub) GetHeight() int64 { + ih.mtx.Lock() + defer ih.mtx.Unlock() + return ih.expectHeight - 1 +} diff --git a/state/index_test.go b/state/index_test.go new file mode 100644 index 000000000..041f8a98f --- /dev/null +++ b/state/index_test.go @@ -0,0 +1,90 @@ +package state + +import ( + "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + abci "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/libs/db" + "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/state/blockindex" + kv2 "github.com/tendermint/tendermint/state/blockindex/kv" + "github.com/tendermint/tendermint/state/txindex" + "github.com/tendermint/tendermint/state/txindex/kv" + "github.com/tendermint/tendermint/types" +) + +func TestSetHeight(t *testing.T) { + + indexDb := db.NewMemDB() + indexHub := NewIndexHub(0, indexDb, nil, nil) + indexHub.SetLogger(log.TestingLogger()) + + realHeightAtFirst := 
indexHub.GetIndexedHeight() + assert.Equal(t, int64(-1), realHeightAtFirst) + height := int64(1024) + indexHub.SetIndexedHeight(height) + realHeight := indexHub.GetIndexedHeight() + assert.Equal(t, height, realHeight) +} + +func TestCountDown(t *testing.T) { + // event bus + eventBus := types.NewEventBus() + eventBus.SetLogger(log.TestingLogger()) + err := eventBus.Start() + require.NoError(t, err) + defer eventBus.Stop() + + indexDb := db.NewMemDB() + + // start tx index + txIndexer := kv.NewTxIndex(indexDb, kv.IndexAllTags()) + txIndexSvc := txindex.NewIndexerService(txIndexer, eventBus) + txIndexSvc.SetLogger(log.TestingLogger()) + err = txIndexSvc.Start() + require.NoError(t, err) + defer txIndexSvc.Stop() + + // start block index + blockIndexer := kv2.NewBlockIndex(indexDb) + blockIndexSvc := blockindex.NewIndexerService(blockIndexer, eventBus) + blockIndexSvc.SetLogger(log.TestingLogger()) + err = blockIndexSvc.Start() + require.NoError(t, err) + defer blockIndexSvc.Stop() + + // start index hub + indexHub := NewIndexHub(0, indexDb, nil, eventBus) + indexHub.SetLogger(log.TestingLogger()) + indexHub.RegisterIndexSvc(txIndexSvc) + indexHub.RegisterIndexSvc(blockIndexSvc) + err = indexHub.Start() + assert.NoError(t, err) + + // publish block with txs + for h := int64(1); h < 10; h++ { + numTxs := rand.Int63n(5) + eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{ + Header: types.Header{Height: h, NumTxs: numTxs}, + }) + for i := int64(0); i < numTxs; i++ { + txResult := &types.TxResult{ + Height: h, + Index: uint32(i), + Tx: types.Tx("foo"), + Result: abci.ResponseDeliverTx{Code: 0}, + } + eventBus.PublishEventTx(types.EventDataTx{*txResult}) + } + // In test case, 100ms is far enough for index + time.Sleep(100 * time.Millisecond) + assert.Equal(t, int64(h), indexHub.GetIndexedHeight()) + // test no memory leak + assert.Equal(t, len(indexHub.indexTaskCounter), 0) + } +} diff --git a/state/metrics.go b/state/metrics.go index bcd713f5f..23f01225e 
100644 --- a/state/metrics.go +++ b/state/metrics.go @@ -17,6 +17,8 @@ const ( type Metrics struct { // Time between BeginBlock and EndBlock. BlockProcessingTime metrics.Histogram + // The latest block height that has been indexed. + IndexHeight metrics.Gauge } // PrometheusMetrics returns Metrics build using Prometheus client library. @@ -35,6 +37,12 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { Help: "Time between BeginBlock and EndBlock in ms.", Buckets: stdprometheus.LinearBuckets(1, 10, 10), }, labels).With(labelsAndValues...), + IndexHeight: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "index_height", + Help: "The latest height of block that have been indexed", + }, append(labels)).With(labelsAndValues...), } } @@ -42,5 +50,6 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { func NopMetrics() *Metrics { return &Metrics{ BlockProcessingTime: discard.NewHistogram(), + IndexHeight: discard.NewGauge(), } } diff --git a/state/store.go b/state/store.go index b9cf119ab..5938b202e 100644 --- a/state/store.go +++ b/state/store.go @@ -22,7 +22,7 @@ const ( const latestStateToKeep int64 = 1 << 20 func calcStateKey(height int64) []byte { - return []byte(fmt.Sprintf("stateKey:%v", height % latestStateToKeep)) + return []byte(fmt.Sprintf("stateKey:%v", height%latestStateToKeep)) } func calcValidatorsKey(height int64) []byte { diff --git a/state/txindex/indexer_service.go b/state/txindex/indexer_service.go index 03fa9c03e..de9d83688 100644 --- a/state/txindex/indexer_service.go +++ b/state/txindex/indexer_service.go @@ -19,6 +19,8 @@ type IndexerService struct { idr TxIndexer eventBus *types.EventBus + + onIndex func(int64) } // NewIndexerService returns a new service instance. 
@@ -28,6 +30,10 @@ func NewIndexerService(idr TxIndexer, eventBus *types.EventBus) *IndexerService return is } +func (is *IndexerService) SetOnIndex(callback func(int64)) { + is.onIndex = callback +} + // OnStart implements cmn.Service by subscribing for all transactions // and indexing them by tags. func (is *IndexerService) OnStart() error { @@ -65,6 +71,9 @@ func (is *IndexerService) OnStart() error { } else { is.Logger.Info("Indexed txs for block", "height", header.Height) } + if is.onIndex != nil { + is.onIndex(header.Height) + } } }() return nil