From 2a70e3e43ff87f9548d0cf3ea7d2c7cfeaa86b57 Mon Sep 17 00:00:00 2001 From: Cong Zhao Date: Thu, 13 Dec 2018 15:24:08 +0800 Subject: [PATCH] #121 state sync mode --- Gopkg.lock | 159 -------- abci/client/client.go | 6 + abci/client/grpc_client.go | 16 + abci/client/local_client.go | 22 ++ abci/client/socket_client.go | 18 + abci/example/kvstore/persistent_kvstore.go | 20 + abci/types/application.go | 47 +++ blockchain/reactor.go | 15 + blockchain/state_pool.go | 407 +++++++++++++++++++++ blockchain/state_reactor.go | 364 ++++++++++++++++++ blockchain/wire.go | 1 + config/config.go | 3 + config/toml.go | 2 + node/node.go | 28 +- proxy/app_conn.go | 52 +++ proxy/multi_app_conn.go | 18 + state/store.go | 23 ++ types/params.go | 3 + 18 files changed, 1040 insertions(+), 164 deletions(-) create mode 100644 blockchain/state_pool.go create mode 100644 blockchain/state_reactor.go diff --git a/Gopkg.lock b/Gopkg.lock index 51d5c7d2112..45f3511fb37 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -384,90 +384,6 @@ revision = "faa6e731944e2b7b6a46ad202902851e8ce85bee" version = "v0.12.0" -[[projects]] - digest = "1:099b0dcac7e6e5eedd1a0abf809fc1be64e5e9e46706783142768cba1eb5548f" - name = "github.com/tendermint/tendermint" - packages = [ - "abci/client", - "abci/example/code", - "abci/example/counter", - "abci/example/kvstore", - "abci/server", - "abci/tests/server", - "abci/types", - "abci/version", - "benchmarks/proto", - "blockchain", - "cmd/tendermint/commands", - "config", - "consensus", - "consensus/types", - "crypto", - "crypto/ed25519", - "crypto/encoding/amino", - "crypto/internal/benchmarking", - "crypto/merkle", - "crypto/multisig", - "crypto/multisig/bitarray", - "crypto/secp256k1", - "crypto/tmhash", - "evidence", - "libs/autofile", - "libs/bech32", - "libs/cli", - "libs/cli/flags", - "libs/clist", - "libs/common", - "libs/db", - "libs/db/remotedb", - "libs/db/remotedb/grpcdb", - "libs/db/remotedb/proto", - "libs/errors", - "libs/events", - "libs/flowrate", - "libs/log", - "libs/pubsub", - "libs/pubsub/query", - "libs/test", - "lite", - "lite/client", - "lite/errors", - "lite/proxy", - "mempool", - "node", - "p2p", - "p2p/conn", - "p2p/dummy", - "p2p/pex", - "p2p/upnp", - "privval", - "proxy", - "rpc/client", - "rpc/client/mock", - "rpc/core", - "rpc/core/types", - "rpc/grpc", - "rpc/lib", - "rpc/lib/client", - "rpc/lib/server", - "rpc/lib/types", - "rpc/test", - "state", - "state/txindex", - "state/txindex/kv", - "state/txindex/null", - "tools/tm-monitor/eventmeter", - "tools/tm-monitor/mock", - "tools/tm-monitor/monitor", - "types", - "types/proto3", - "types/time", - "version", - ] - pruneopts = "UT" - revision = "0c9c3292c918617624f6f3fbcd95eceade18bcd5" - version = "v0.25.0" - [[projects]] branch = "master" digest = "1:bcc56b3f6583305a362d58adfadd4448ef7d6b112c32fb6b6fc5960c26c4f7c7" @@ -630,81 +546,6 @@ "github.com/tendermint/ed25519", "github.com/tendermint/ed25519/extra25519", "github.com/tendermint/go-amino", - "github.com/tendermint/tendermint/abci/client", - "github.com/tendermint/tendermint/abci/example/code", - "github.com/tendermint/tendermint/abci/example/counter", - "github.com/tendermint/tendermint/abci/example/kvstore", - "github.com/tendermint/tendermint/abci/server", - "github.com/tendermint/tendermint/abci/tests/server", - "github.com/tendermint/tendermint/abci/types", - "github.com/tendermint/tendermint/abci/version", - "github.com/tendermint/tendermint/benchmarks/proto", - "github.com/tendermint/tendermint/blockchain", - "github.com/tendermint/tendermint/cmd/tendermint/commands", - "github.com/tendermint/tendermint/config", - "github.com/tendermint/tendermint/consensus", - "github.com/tendermint/tendermint/consensus/types", - "github.com/tendermint/tendermint/crypto", - "github.com/tendermint/tendermint/crypto/ed25519", - "github.com/tendermint/tendermint/crypto/encoding/amino", - "github.com/tendermint/tendermint/crypto/internal/benchmarking", - "github.com/tendermint/tendermint/crypto/merkle", - "github.com/tendermint/tendermint/crypto/multisig", - "github.com/tendermint/tendermint/crypto/multisig/bitarray", - "github.com/tendermint/tendermint/crypto/secp256k1", - "github.com/tendermint/tendermint/crypto/tmhash", - "github.com/tendermint/tendermint/evidence", - "github.com/tendermint/tendermint/libs/autofile", - "github.com/tendermint/tendermint/libs/bech32", - "github.com/tendermint/tendermint/libs/cli", - "github.com/tendermint/tendermint/libs/cli/flags", - "github.com/tendermint/tendermint/libs/clist", - "github.com/tendermint/tendermint/libs/common", - "github.com/tendermint/tendermint/libs/db", - "github.com/tendermint/tendermint/libs/db/remotedb", - "github.com/tendermint/tendermint/libs/db/remotedb/grpcdb", - "github.com/tendermint/tendermint/libs/db/remotedb/proto", - "github.com/tendermint/tendermint/libs/errors", - "github.com/tendermint/tendermint/libs/events", - "github.com/tendermint/tendermint/libs/flowrate", - "github.com/tendermint/tendermint/libs/log", - "github.com/tendermint/tendermint/libs/pubsub", - "github.com/tendermint/tendermint/libs/pubsub/query", - "github.com/tendermint/tendermint/libs/test", - "github.com/tendermint/tendermint/lite", - "github.com/tendermint/tendermint/lite/client", - "github.com/tendermint/tendermint/lite/errors", - "github.com/tendermint/tendermint/lite/proxy", - "github.com/tendermint/tendermint/mempool", - "github.com/tendermint/tendermint/node", - "github.com/tendermint/tendermint/p2p", - "github.com/tendermint/tendermint/p2p/conn", - "github.com/tendermint/tendermint/p2p/dummy", - "github.com/tendermint/tendermint/p2p/pex", - "github.com/tendermint/tendermint/p2p/upnp", - "github.com/tendermint/tendermint/privval", - "github.com/tendermint/tendermint/proxy", - "github.com/tendermint/tendermint/rpc/client", - "github.com/tendermint/tendermint/rpc/client/mock", - "github.com/tendermint/tendermint/rpc/core", - "github.com/tendermint/tendermint/rpc/core/types", - "github.com/tendermint/tendermint/rpc/grpc", - "github.com/tendermint/tendermint/rpc/lib", - "github.com/tendermint/tendermint/rpc/lib/client", - "github.com/tendermint/tendermint/rpc/lib/server", - "github.com/tendermint/tendermint/rpc/lib/types", - "github.com/tendermint/tendermint/rpc/test", - "github.com/tendermint/tendermint/state", - "github.com/tendermint/tendermint/state/txindex", - "github.com/tendermint/tendermint/state/txindex/kv", - "github.com/tendermint/tendermint/state/txindex/null", - "github.com/tendermint/tendermint/tools/tm-monitor/eventmeter", - "github.com/tendermint/tendermint/tools/tm-monitor/mock", - "github.com/tendermint/tendermint/tools/tm-monitor/monitor", - "github.com/tendermint/tendermint/types", - "github.com/tendermint/tendermint/types/proto3", - "github.com/tendermint/tendermint/types/time", - "github.com/tendermint/tendermint/version", "golang.org/x/crypto/bcrypt", "golang.org/x/crypto/chacha20poly1305", "golang.org/x/crypto/curve25519", diff --git a/abci/client/client.go b/abci/client/client.go index 5b85c41adcd..0e749eb5dfb 100644 --- a/abci/client/client.go +++ b/abci/client/client.go @@ -48,6 +48,12 @@ type Client interface { InitChainSync(types.RequestInitChain) (*types.ResponseInitChain, error) BeginBlockSync(types.RequestBeginBlock) (*types.ResponseBeginBlock, error) EndBlockSync(types.RequestEndBlock) (*types.ResponseEndBlock, error) + + LatestSnapshot() (height int64, numKeys map[string]int64, err error) // query application state height and numOfKeys + ReadSnapshotChunk(height int64, startIndex, endIndex int64) (chunk map[string][][]byte, err error) + StartRecovery(height int64, numKeys map[string]int64) error + WriteRecoveryChunk(storeName string, chunk [][]byte) error + EndRecovery() error } //---------------------------------------- diff --git a/abci/client/grpc_client.go b/abci/client/grpc_client.go index 1781929208a..361066cc4f1 100644 --- a/abci/client/grpc_client.go +++ b/abci/client/grpc_client.go @@ -308,3 +308,19 @@ func (cli *grpcClient) EndBlockSync(params types.RequestEndBlock) (*types.Respon reqres := cli.EndBlockAsync(params) return reqres.Response.GetEndBlock(), cli.Error() } + +func (cli *grpcClient) LatestSnapshot() (height int64, numKeys map[string]int64, err error) { + return 0, make(map[string]int64), nil +} +func (cli *grpcClient) ReadSnapshotChunk(height int64, startIndex, endIndex int64) (chunk map[string][][]byte, err error) { + return make(map[string][][]byte), nil +} +func (cli *grpcClient) StartRecovery(height int64, numKeys map[string]int64) error { + return nil +} +func (cli *grpcClient) WriteRecoveryChunk(storeName string, chunk [][]byte) error { + return nil +} +func (cli *grpcClient) EndRecovery() error { + return nil +} diff --git a/abci/client/local_client.go b/abci/client/local_client.go index 22f08eb1b6a..acb5dfe1d4e 100644 --- a/abci/client/local_client.go +++ b/abci/client/local_client.go @@ -152,6 +152,28 @@ func (app *localClient) EndBlockAsync(req types.RequestEndBlock) *ReqRes { ) } +func (app *localClient) LatestSnapshot() (height int64, numKeys map[string]int64, err error) { + app.mtx.Lock() + defer app.mtx.Unlock() + return app.Application.LatestSnapshot() +} + +func (app *localClient) ReadSnapshotChunk(height int64, startIndex, endIndex int64) (chunk map[string][][]byte, err error) { + return app.Application.ReadSnapshotChunk(height, startIndex, endIndex) +} + +func (app *localClient) StartRecovery(height int64, numKeys map[string]int64) error { + return app.Application.StartRecovery(height, numKeys) +} + +func (app *localClient) WriteRecoveryChunk(storeName string, chunk [][]byte) error { + return app.Application.WriteRecoveryChunk(storeName, chunk) +} + +func (app *localClient) EndRecovery() error { + return app.Application.EndRecovery() +} + //------------------------------------------------------- func (app *localClient) FlushSync() error { diff --git a/abci/client/socket_client.go b/abci/client/socket_client.go index 8c961544992..1273c464b1a 100644 --- a/abci/client/socket_client.go +++ b/abci/client/socket_client.go @@ -343,6 +343,24 @@ func (cli *socketClient) EndBlockSync(req types.RequestEndBlock) (*types.Respons //---------------------------------------- +func (cli *socketClient) LatestSnapshot() (height int64, numKeys map[string]int64, err error) { + return 0, make(map[string]int64), nil +} +func (cli *socketClient) ReadSnapshotChunk(height int64, startIndex, endIndex int64) (chunk map[string][][]byte, err error) { + return make(map[string][][]byte), nil +} +func (cli *socketClient) StartRecovery(height int64, numKeys map[string]int64) error { + return nil +} +func (cli *socketClient) WriteRecoveryChunk(storeName string, chunk [][]byte) error { + return nil +} +func (cli *socketClient) EndRecovery() error { + return nil +} + +//---------------------------------------- + func (cli *socketClient) queueRequest(req *types.Request) *ReqRes { reqres := NewReqRes(req) diff --git a/abci/example/kvstore/persistent_kvstore.go b/abci/example/kvstore/persistent_kvstore.go index f67920222ac..57bed319483 100644 --- a/abci/example/kvstore/persistent_kvstore.go +++ b/abci/example/kvstore/persistent_kvstore.go @@ -114,6 +114,26 @@ func (app *PersistentKVStoreApplication) EndBlock(req types.RequestEndBlock) typ return types.ResponseEndBlock{ValidatorUpdates: app.ValUpdates} } +func (app *PersistentKVStoreApplication) LatestSnapshot() (height int64, numKeys map[string]int64, err error) { + return 0, make(map[string]int64), nil +} + +func (app *PersistentKVStoreApplication) ReadSnapshotChunk(height int64, startIndex, endIndex int64) (chunk map[string][][]byte, err error) { + return make(map[string][][]byte, 0), nil +} + +func (app *PersistentKVStoreApplication) StartRecovery(height int64, numKeys map[string]int64) error { + return nil +} + +func (app *PersistentKVStoreApplication) WriteRecoveryChunk(storeName string, chunk [][]byte) error { + return nil +} + +func (app *PersistentKVStoreApplication) EndRecovery() error { + return nil +} + //--------------------------------------------- // update validators diff --git a/abci/types/application.go b/abci/types/application.go index bf5b6c059ca..f055cdbf667 100644 --- a/abci/types/application.go +++ b/abci/types/application.go @@ -24,6 +24,13 @@ type Application interface { DeliverTx(tx []byte) ResponseDeliverTx // Deliver a tx for full processing EndBlock(RequestEndBlock) ResponseEndBlock // Signals the end of a block, returns changes to the validator set Commit() ResponseCommit // Commit the state and return the application Merkle root hash + + // State Connection + LatestSnapshot() (height int64, numKeys map[string]int64, err error) // query application state height and numOfKeys + ReadSnapshotChunk(height int64, startIndex, endIndex int64) (chunk map[string][][]byte, err error) + StartRecovery(height int64, numKeys map[string]int64) error + WriteRecoveryChunk(storeName string, chunk [][]byte) error + EndRecovery() error } //------------------------------------------------------- @@ -78,6 +85,26 @@ func (BaseApplication) EndBlock(req RequestEndBlock) ResponseEndBlock { return ResponseEndBlock{} } +func (BaseApplication) LatestSnapshot() (height int64, numKeys map[string]int64, err error) { + return 0, make(map[string]int64), nil +} + +func (BaseApplication) ReadSnapshotChunk(height int64, startIndex, endIndex int64) (chunk map[string][][]byte, err error) { + return make(map[string][][]byte, 0), nil +} + +func (BaseApplication) StartRecovery(height int64, numKeys map[string]int64) error { + return nil +} + +func (BaseApplication) WriteRecoveryChunk(storeName string, chunk [][]byte) error { + return nil +} + +func (BaseApplication) EndRecovery() error { + return nil +} + //------------------------------------------------------- // GRPCApplication is a GRPC wrapper for Application @@ -141,3 +168,23 @@ func (app *GRPCApplication) EndBlock(ctx context.Context, req *RequestEndBlock) res := app.app.EndBlock(*req) return &res, nil } + +func (app *GRPCApplication) LatestSnapshot() (height int64, numKeys map[string]int64, err error) { + return 0, make(map[string]int64), nil +} + +func (app *GRPCApplication) ReadSnapshotChunk(height int64, startIndex, endIndex int64) (chunk map[string][][]byte, err error) { + return make(map[string][][]byte, 0), nil +} + +func (app *GRPCApplication) StartRecovery(height int64, numKeys map[string]int64) error { + return nil +} + +func (app *GRPCApplication) WriteRecoveryChunk(storeName string, chunk [][]byte) error { + return nil +} + +func (app *GRPCApplication) EndRecovery() error { + return nil +} diff --git a/blockchain/reactor.go b/blockchain/reactor.go index fc1b1f4d349..4a51c1c38aa 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -124,6 +124,21 @@ func (bcR *BlockchainReactor) OnStop() { bcR.pool.Stop() } +// SwitchToBlockchain switches from fastest_sync mode to blockchain mode. +// It resets the state, turns off fastest_sync, and starts the blockchain state-machine +func (bcR *BlockchainReactor) SwitchToBlockchain(state sm.State) { + bcR.Logger.Info("SwitchToBlockchain") + bcR.initialState = state + + bcR.fastSync = true + + err := bcR.pool.Start() + if err != nil { + bcR.Logger.Error("Error starting blockchainReactor", "err", err) + return + } +} + // GetChannels implements Reactor func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor { return []*p2p.ChannelDescriptor{ diff --git a/blockchain/state_pool.go b/blockchain/state_pool.go new file mode 100644 index 00000000000..fee175cddaf --- /dev/null +++ b/blockchain/state_pool.go @@ -0,0 +1,407 @@ +package blockchain + +import ( + "errors" + "fmt" + "math" + "sync" + "time" + + cmn "github.com/tendermint/tendermint/libs/common" + flow "github.com/tendermint/tendermint/libs/flowrate" + "github.com/tendermint/tendermint/libs/log" + + "github.com/tendermint/tendermint/p2p" + sm "github.com/tendermint/tendermint/state" +) + +/* + XXX: This file is copied from blockchain/pool.go +*/ + +/* + Peers self report their heights when we join the block pool. + Starting from our latest pool.height, we request blocks + in sequence from peers that reported higher heights than ours. + Every so often we ask peers what height they're on so we can keep going. + + Requests are continuously made for blocks of higher heights until + the limit is reached. If most of the requests have no available peers, and we + are not at peer limits, we can probably switch to consensus reactor +*/ + +type StatePool struct { + cmn.BaseService + startTime time.Time + + mtx sync.Mutex + // block requests + requester *spRequester + height int64 // the lowest key in requesters. + // peers + peers map[p2p.ID]*spPeer + maxPeerHeight int64 + + requestsCh chan<- StateRequest + errorsCh chan<- peerError +} + +func NewStatePool(start int64, requestsCh chan<- StateRequest, errorsCh chan<- peerError) *StatePool { + sp := &StatePool{ + peers: make(map[p2p.ID]*spPeer), + + height: start, + + requestsCh: requestsCh, + errorsCh: errorsCh, + } + sp.BaseService = *cmn.NewBaseService(nil, "StatePool", sp) + return sp +} + +func (pool *StatePool) OnStart() error { + pool.startTime = time.Now() + return nil +} + +func (pool *StatePool) OnStop() {} + +func (pool *StatePool) removeTimedoutPeers() { + pool.mtx.Lock() + defer pool.mtx.Unlock() + + for _, peer := range pool.peers { + if !peer.didTimeout && peer.numPending > 0 { + curRate := peer.recvMonitor.Status().CurRate + // curRate can be 0 on start + if curRate != 0 && curRate < minRecvRate { + err := errors.New("peer is not sending us data fast enough") + pool.sendError(err, peer.id) + pool.Logger.Error("SendTimeout", "peer", peer.id, + "reason", err, + "curRate", fmt.Sprintf("%d KB/s", curRate/1024), + "minRate", fmt.Sprintf("%d KB/s", minRecvRate/1024)) + peer.didTimeout = true + } + } + if peer.didTimeout { + pool.removePeer(peer.id) + } + } +} + +// TODO: relax conditions, prevent abuse. +func (pool *StatePool) IsCaughtUp() bool { + pool.mtx.Lock() + defer pool.mtx.Unlock() + + // Need at least 1 peer to be considered caught up. + if len(pool.peers) == 0 { + pool.Logger.Debug("Statepool has no peers") + return false + } + + // some conditions to determine if we're caught up + receivedStateOrTimedOut := (pool.height > 0 || time.Since(pool.startTime) > 5*time.Second) + ourChainIsLongestAmongPeers := pool.maxPeerHeight == 0 || pool.height >= pool.maxPeerHeight + isCaughtUp := receivedStateOrTimedOut && ourChainIsLongestAmongPeers + return isCaughtUp +} + +// MaxPeerHeight returns the highest height reported by a peer. +func (pool *StatePool) MaxPeerHeight() int64 { + pool.mtx.Lock() + defer pool.mtx.Unlock() + return pool.maxPeerHeight +} + +// Sets the peer's alleged blockchain height. +func (pool *StatePool) SetPeerHeight(peerID p2p.ID, height int64) { + pool.mtx.Lock() + defer pool.mtx.Unlock() + + peer := pool.peers[peerID] + if peer != nil { + peer.height = height + } else { + peer = newSPPeer(pool, peerID, height) + peer.setLogger(pool.Logger.With("peer", peerID)) + pool.peers[peerID] = peer + } + + if height > pool.maxPeerHeight { + pool.maxPeerHeight = height + } +} + +func (pool *StatePool) RemovePeer(peerID p2p.ID) { + pool.mtx.Lock() + defer pool.mtx.Unlock() + + pool.removePeer(peerID) +} + +func (pool *StatePool) removePeer(peerID p2p.ID) { + if pool.requester.getPeerID() == peerID { + pool.requester.redo() + } + delete(pool.peers, peerID) +} + +// Pick an available peer with at least the given minHeight. +// If no peers are available, returns nil. +func (pool *StatePool) pickIncrAvailablePeer(minHeight int64) *spPeer { + pool.mtx.Lock() + defer pool.mtx.Unlock() + + for _, peer := range pool.peers { + if peer.didTimeout { + pool.removePeer(peer.id) + continue + } + if peer.numPending >= maxPendingRequestsPerPeer { + continue + } + if peer.height < minHeight { + continue + } + peer.incrPending() + return peer + } + return nil +} + +func (pool *StatePool) makeRequester(height int64) { + pool.mtx.Lock() + defer pool.mtx.Unlock() + + request := newSPRequester(pool, height) + // request.SetLogger(pool.Logger.With("height", nextHeight)) + + pool.requester = request + + err := request.Start() + if err != nil { + request.Logger.Error("Error starting request", "err", err) + } +} + +func (pool *StatePool) sendRequest(height int64, peerID p2p.ID) { + if !pool.IsRunning() { + return + } + pool.requestsCh <- StateRequest{height, peerID} +} + +func (pool *StatePool) sendError(err error, peerID p2p.ID) { + if !pool.IsRunning() { + return + } + pool.errorsCh <- peerError{err, peerID} +} + +//------------------------------------- + +type spPeer struct { + pool *StatePool + id p2p.ID + recvMonitor *flow.Monitor + + height int64 + numPending int32 + timeout *time.Timer + didTimeout bool + + logger log.Logger +} + +func newSPPeer(pool *StatePool, peerID p2p.ID, height int64) *spPeer { + peer := &spPeer{ + pool: pool, + id: peerID, + height: height, + numPending: 0, + logger: log.NewNopLogger(), + } + return peer +} + +func (peer *spPeer) setLogger(l log.Logger) { + peer.logger = l +} + +func (peer *spPeer) resetMonitor() { + peer.recvMonitor = flow.New(time.Second, time.Second*40) + initialValue := float64(minRecvRate) * math.E + peer.recvMonitor.SetREMA(initialValue) +} + +func (peer *spPeer) resetTimeout() { + if peer.timeout == nil { + peer.timeout = time.AfterFunc(peerTimeout, peer.onTimeout) + } else { + peer.timeout.Reset(peerTimeout) + } +} + +func (peer *spPeer) incrPending() { + if peer.numPending == 0 { + peer.resetMonitor() + peer.resetTimeout() + } + peer.numPending++ +} + +func (peer *spPeer) decrPending(recvSize int) { + peer.numPending-- + if peer.numPending == 0 { + peer.timeout.Stop() + } else { + peer.recvMonitor.Update(recvSize) + peer.resetTimeout() + } +} + +func (peer *spPeer) onTimeout() { + peer.pool.mtx.Lock() + defer peer.pool.mtx.Unlock() + + err := errors.New("peer did not send us anything") + peer.pool.sendError(err, peer.id) + peer.logger.Error("SendTimeout", "reason", err, "timeout", peerTimeout) + peer.didTimeout = true +} + +//------------------------------------- + +type spRequester struct { + cmn.BaseService + pool *StatePool + height int64 + gotStateCh chan struct{} + redoCh chan struct{} + + mtx sync.Mutex + peerID p2p.ID + state *sm.State +} + +func newSPRequester(pool *StatePool, height int64) *spRequester { + spr := &spRequester{ + pool: pool, + height: height, + gotStateCh: make(chan struct{}, 1), + redoCh: make(chan struct{}, 1), + + peerID: "", + state: nil, + } + spr.BaseService = *cmn.NewBaseService(nil, "spRequester", spr) + return spr +} + +func (spr *spRequester) OnStart() error { + go spr.requestRoutine() + return nil +} + +// Returns true if the peer matches and state doesn't already exist. +func (spr *spRequester) setState(state *sm.State, peerID p2p.ID) bool { + spr.mtx.Lock() + if spr.state != nil || spr.peerID != peerID { + spr.mtx.Unlock() + return false + } + spr.state = state + spr.mtx.Unlock() + + select { + case spr.gotStateCh <- struct{}{}: + default: + } + return true +} + +func (spr *spRequester) getState() *sm.State { + spr.mtx.Lock() + defer spr.mtx.Unlock() + return spr.state +} + +func (spr *spRequester) getPeerID() p2p.ID { + spr.mtx.Lock() + defer spr.mtx.Unlock() + return spr.peerID +} + +// This is called from the requestRoutine, upon redo(). +func (spr *spRequester) reset() { + spr.mtx.Lock() + defer spr.mtx.Unlock() + + spr.peerID = "" + spr.state = nil +} + +// Tells spRequester to pick another peer and try again. +// NOTE: Nonblocking, and does nothing if another redo +// was already requested. +func (spr *spRequester) redo() { + select { + case spr.redoCh <- struct{}{}: + default: + } +} + +// Responsible for making more requests as necessary +// Returns only when a state is found (e.g. AddState() is called) +func (spr *spRequester) requestRoutine() { +OUTER_LOOP: + for { + // Pick a peer to send request to. + var peer *spPeer + PICK_PEER_LOOP: + for { + if !spr.IsRunning() || !spr.pool.IsRunning() { + return + } + peer = spr.pool.pickIncrAvailablePeer(spr.height) + if peer == nil { + //log.Info("No peers available", "height", height) + time.Sleep(requestIntervalMS * time.Millisecond) + continue PICK_PEER_LOOP + } + break PICK_PEER_LOOP + } + spr.mtx.Lock() + spr.peerID = peer.id + spr.mtx.Unlock() + + // Send request and wait. + spr.pool.sendRequest(spr.height, peer.id) + WAIT_LOOP: + for { + select { + case <-spr.pool.Quit(): + spr.Stop() + return + case <-spr.Quit(): + return + case <-spr.redoCh: + spr.reset() + continue OUTER_LOOP + case <-spr.gotStateCh: + // We got a block! + // Continue the for-loop and wait til Quit. + continue WAIT_LOOP + } + } + } +} + +//------------------------------------- + +type StateRequest struct { + Height int64 + PeerID p2p.ID +} diff --git a/blockchain/state_reactor.go b/blockchain/state_reactor.go new file mode 100644 index 00000000000..dbb54423152 --- /dev/null +++ b/blockchain/state_reactor.go @@ -0,0 +1,364 @@ +package blockchain + +import ( + "fmt" + "github.com/tendermint/tendermint/proxy" + "reflect" + "time" + + amino "github.com/tendermint/go-amino" + + dbm "github.com/tendermint/tendermint/libs/db" + "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/p2p" + sm "github.com/tendermint/tendermint/state" + "github.com/tendermint/tendermint/types" +) + +/* + XXX: This file is copied from blockchain/reactor.go +*/ + +const ( + // BlockchainStateChannel is a channel for state and status updates (`StateStore` height) + BlockchainStateChannel = byte(0x35) + + tryStateSyncIntervalMS = 10 + + // stop syncing when last block's time is + // within this much of the system time. + // stopSyncingDurationMinutes = 10 + + // ask for best height every 10s + stateStatusUpdateIntervalSeconds = 10 + // check if we should switch to blockchain reactor + switchToFastSyncIntervalSeconds = 1 + + // NOTE: keep up to date with bcBlockResponseMessage + bcStateResponseMessagePrefixSize = 4 + bcStateResponseMessageFieldKeySize = 1 + maxStateMsgSize = types.MaxStateSizeBytes + + bcStateResponseMessagePrefixSize + + bcStateResponseMessageFieldKeySize +) + +// BlockchainReactor handles long-term catchup syncing. +type StateReactor struct { + p2p.BaseReactor + + // immutable + initialState sm.State + + stateDB dbm.DB + app proxy.AppConnState + pool *StatePool + fastestSyncHeight int64 // positive for enable this reactor + + requestsCh <-chan StateRequest + errorsCh <-chan peerError +} + +// NewBlockchainReactor returns new reactor instance. +func NewStateReactor(state sm.State, stateDB dbm.DB, app proxy.AppConnState, fastestSyncHeight int64) *StateReactor { + + // TODO: revisit doesn't need + //if state.LastBlockHeight != store.Height() { + // panic(fmt.Sprintf("state (%v) and store (%v) height mismatch", state.LastBlockHeight, + // store.Height())) + //} + + requestsCh := make(chan StateRequest, maxTotalRequesters) + + const capacity = 1000 // must be bigger than peers count + errorsCh := make(chan peerError, capacity) // so we don't block in #Receive#pool.AddBlock + + pool := NewStatePool( + fastestSyncHeight, + requestsCh, + errorsCh, + ) + + bcSR := &StateReactor{ + initialState: state, + stateDB: stateDB, + app: app, + pool: pool, + fastestSyncHeight: fastestSyncHeight, + requestsCh: requestsCh, + errorsCh: errorsCh, + } + bcSR.BaseReactor = *p2p.NewBaseReactor("BlockchainStateReactor", bcSR) + return bcSR +} + +// SetLogger implements cmn.Service by setting the logger on reactor and pool. +func (bcSR *StateReactor) SetLogger(l log.Logger) { + bcSR.BaseService.Logger = l + bcSR.pool.Logger = l +} + +// OnStart implements cmn.Service. +func (bcSR *StateReactor) OnStart() error { + if bcSR.fastestSyncHeight > 0 { + err := bcSR.pool.Start() + if err != nil { + return err + } + go bcSR.poolRoutine() + } + return nil +} + +// OnStop implements cmn.Service. +func (bcSR *StateReactor) OnStop() { + bcSR.pool.Stop() +} + +// GetChannels implements Reactor +func (_ *StateReactor) GetChannels() []*p2p.ChannelDescriptor { + return []*p2p.ChannelDescriptor{ + { + ID: BlockchainStateChannel, + Priority: 10, + SendQueueCapacity: 1000, + RecvBufferCapacity: 50 * 4096, + RecvMessageCapacity: maxStateMsgSize, + }, + } +} + +// AddPeer implements Reactor by sending our state to peer. +func (bcSR *StateReactor) AddPeer(peer p2p.Peer) { + _, numKeys, _ := bcSR.app.LatestSnapshot() + msgBytes := cdc.MustMarshalBinaryBare(&bcStateStatusResponseMessage{sm.LoadState(bcSR.stateDB).LastBlockHeight, numKeys}) + if !peer.Send(BlockchainStateChannel, msgBytes) { + // doing nothing, will try later in `poolRoutine` + } + // peer is added to the pool once we receive the first + // bcStateStatusResponseMessage from the peer and call pool.SetPeerHeight +} + +// RemovePeer implements Reactor by removing peer from the pool. +func (bcSR *StateReactor) RemovePeer(peer p2p.Peer, reason interface{}) { + bcSR.pool.RemovePeer(peer.ID()) +} + +// respondToPeer loads a state and sends it to the requesting peer, +// if we have it. Otherwise, we'll respond saying we don't have it. +// According to the Tendermint spec, if all nodes are honest, +// no node should be requesting for a state that's non-existent. +func (bcSR *StateReactor) respondToPeer(msg *bcStateRequestMessage, + src p2p.Peer) (queued bool) { + + state := sm.LoadStateForHeight(bcSR.stateDB, msg.Height) + if state == nil { + bcSR.Logger.Info("Peer asking for a state we don't have", "src", src, "height", msg.Height) + + msgBytes := cdc.MustMarshalBinaryBare(&bcNoStateResponseMessage{Height: msg.Height}) + return src.TrySend(BlockchainStateChannel, msgBytes) + } + + appState, err := bcSR.app.ReadSnapshotChunk(msg.Height, 0, 0) + if err != nil { + bcSR.Logger.Info("Peer asking for an application state we don't have", "src", src, "height", msg.Height, "err", err) + + msgBytes := cdc.MustMarshalBinaryBare(&bcNoStateResponseMessage{Height: msg.Height}) + return src.TrySend(BlockchainStateChannel, msgBytes) + } + + msgBytes := cdc.MustMarshalBinaryBare(&bcStateResponseMessage{State: state, applicationState: appState}) + return src.TrySend(BlockchainStateChannel, msgBytes) + +} + +// Receive implements Reactor by handling 4 types of messages (look below). +func (bcSR *StateReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { + msg, err := decodeStateMsg(msgBytes) + if err != nil { + bcSR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes) + bcSR.Switch.StopPeerForError(src, err) + return + } + + bcSR.Logger.Debug("Receive", "src", src, "chID", chID, "msg", msg) + + switch msg := msg.(type) { + case *bcStateRequestMessage: + if queued := bcSR.respondToPeer(msg, src); !queued { + // Unfortunately not queued since the queue is full. + } + case *bcStateResponseMessage: + // Got a block. + //bcSR.pool.AddState(src.ID(), msg.State, len(msgBytes)) + //bcSR.pool.PopRequest() + + sm.SaveState(bcSR.stateDB, *msg.State) + for store, kv := range msg.applicationState { + err := bcSR.app.WriteRecoveryChunk(store, kv) + if err != nil { + bcSR.Logger.Error("Failed to recover application state", "store", store, "numOfKeys", len(kv)/2) + } + } + bcSR.app.EndRecovery() + + bcSR.Logger.Info("Time to switch to blockchain reactor!", "height", msg.State.LastBlockHeight) + bcSR.pool.Stop() + + bcR := bcSR.Switch.Reactor("BLOCKCHAIN").(*BlockchainReactor) + bcR.SwitchToBlockchain(*msg.State) + case *bcStateStatusRequestMessage: + // Send peer our state. + height, numKeys, err := bcSR.app.LatestSnapshot() + if err != nil { + bcSR.Logger.Error("failed to load application state", "err", err) + } + state := sm.LoadState(bcSR.stateDB) + if state.LastBlockHeight != height { + bcSR.Logger.Error("application and state height is inconsistent") + } + msgBytes := cdc.MustMarshalBinaryBare(&bcStateStatusResponseMessage{state.LastBlockHeight, numKeys}) + queued := src.TrySend(BlockchainStateChannel, msgBytes) + if !queued { + // sorry + } + case *bcStateStatusResponseMessage: + // Got a peer status. Unverified. + bcSR.pool.SetPeerHeight(src.ID(), msg.Height) + bcSR.pool.makeRequester(msg.Height) + default: + bcSR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) + } +} + +// Handle messages from the poolReactor telling the reactor what to do. +// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down! +func (bcSR *StateReactor) poolRoutine() { + + statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second) + +FOR_LOOP: + for { + select { + case request := <-bcSR.requestsCh: + peer := bcSR.Switch.Peers().Get(request.PeerID) + if peer == nil { + continue FOR_LOOP // Peer has since been disconnected. + } + msgBytes := cdc.MustMarshalBinaryBare(&bcStateRequestMessage{request.Height}) + queued := peer.TrySend(BlockchainStateChannel, msgBytes) + if !queued { + // We couldn't make the request, send-queue full. + // The pool handles timeouts, just let it go. + continue FOR_LOOP + } + + case err := <-bcSR.errorsCh: + peer := bcSR.Switch.Peers().Get(err.peerID) + if peer != nil { + bcSR.Switch.StopPeerForError(peer, err) + } + + case <-statusUpdateTicker.C: + // ask for status updates + go bcSR.BroadcastStateStatusRequest() // nolint: errcheck + + //case <-switchToBlockTicker.C: + // height, numPending, lenRequesters := bcSR.pool.GetStatus() + // outbound, inbound, _ := bcSR.Switch.NumPeers() + // bcSR.Logger.Debug("Block ticker", "numPending", numPending, "total", lenRequesters, + // "outbound", outbound, "inbound", inbound) + // if bcSR.pool.IsCaughtUp() { + // bcSR.Logger.Info("Time to switch to consensus reactor!", "height", height) + // bcSR.pool.Stop() + // + // bcR := bcSR.Switch.Reactor("BLOCKCHAIN").(*BlockchainReactor) + // bcR.SwitchToBlockchain(state, blocksSynced) + // + // break FOR_LOOP + // } + + case <-bcSR.Quit(): + break FOR_LOOP + } + } +} + +// BroadcastStatusRequest broadcasts `StateStore` height. +func (bcSR *StateReactor) BroadcastStateStatusRequest() error { + msgBytes := cdc.MustMarshalBinaryBare(&bcStateStatusRequestMessage{sm.LoadState(bcSR.stateDB).LastBlockHeight}) + bcSR.Switch.Broadcast(BlockchainStateChannel, msgBytes) + return nil +} + +//----------------------------------------------------------------------------- +// Messages + +// BlockchainMessage is a generic message for this reactor. +type BlockchainStateMessage interface{} + +func RegisterBlockchainStateMessages(cdc *amino.Codec) { + cdc.RegisterInterface((*BlockchainStateMessage)(nil), nil) + cdc.RegisterConcrete(&bcStateRequestMessage{}, "tendermint/blockchain/StateRequest", nil) + cdc.RegisterConcrete(&bcStateResponseMessage{}, "tendermint/blockchain/StateResponse", nil) + cdc.RegisterConcrete(&bcNoStateResponseMessage{}, "tendermint/blockchain/NoStateResponse", nil) + cdc.RegisterConcrete(&bcStateStatusResponseMessage{}, "tendermint/blockchain/StateStatusResponse", nil) + cdc.RegisterConcrete(&bcStateStatusRequestMessage{}, "tendermint/blockchain/StateStatusRequest", nil) +} + +func decodeStateMsg(bz []byte) (msg BlockchainStateMessage, err error) { + if len(bz) > maxMsgSize { + return msg, fmt.Errorf("Staet msg exceeds max size (%d > %d)", len(bz), maxMsgSize) + } + err = cdc.UnmarshalBinaryBare(bz, &msg) + return +} + +//------------------------------------- + +type bcStateRequestMessage struct { + Height int64 +} + +func (m *bcStateRequestMessage) String() string { + return fmt.Sprintf("[bcStateRequestMessage %v]", m.Height) +} + +type bcNoStateResponseMessage struct { + Height int64 +} + +func (brm *bcNoStateResponseMessage) String() string { + return fmt.Sprintf("[bcNoStateResponseMessage %d]", brm.Height) +} + +//------------------------------------- + +type bcStateResponseMessage struct { + State *sm.State + applicationState map[string][][]byte // one key followed by one value +} + +func (m *bcStateResponseMessage) String() string { + return fmt.Sprintf("[bcStateResponseMessage %v]", m.State.LastBlockHeight) +} + +//------------------------------------- + +type bcStateStatusRequestMessage struct { + Height int64 +} + +func (m *bcStateStatusRequestMessage) String() string { + return fmt.Sprintf("[bcStateStatusRequestMessage %v]", m.Height) +} + +//------------------------------------- + +type bcStateStatusResponseMessage struct { + Height int64 + numKeys map[string]int64 +} + +func (m *bcStateStatusResponseMessage) String() string { + return fmt.Sprintf("[bcStateStatusResponseMessage %v]", m.Height) +} diff --git a/blockchain/wire.go b/blockchain/wire.go index 91156fa8f2c..38bea8433cd 100644 --- a/blockchain/wire.go +++ b/blockchain/wire.go @@ -9,5 +9,6 @@ var cdc = amino.NewCodec() func init() { RegisterBlockchainMessages(cdc) + RegisterBlockchainStateMessages(cdc) types.RegisterBlockAmino(cdc) } diff --git a/config/config.go b/config/config.go index 39a02775a38..837d5ee402f 100644 --- a/config/config.go +++ b/config/config.go @@ -113,6 +113,8 @@ type BaseConfig struct { // and verifying their commits FastSync bool `mapstructure:"fast_sync"` + FastestSyncHeight int64 `mapstructure:"fastest_sync_height"` + // Database backend: leveldb | memdb | cleveldb DBBackend string `mapstructure:"db_backend"` @@ -161,6 +163,7 @@ func DefaultBaseConfig() BaseConfig { LogLevel: DefaultPackageLogLevels(), ProfListenAddress: "", FastSync: true, + FastestSyncHeight: -1, FilterPeers: false, DBBackend: "leveldb", DBPath: "data", diff --git a/config/toml.go b/config/toml.go index 35bb2ab5961..873a7f83530 100644 --- a/config/toml.go +++ b/config/toml.go @@ -77,6 +77,8 @@ moniker = "{{ .BaseConfig.Moniker }}" # and verifying their commits fast_sync = {{ .BaseConfig.FastSync }} +fastest_sync_height = {{ .BaseConfig.FastestSyncHeight }} + # Database backend: leveldb | memdb | cleveldb db_backend = "{{ .BaseConfig.DBBackend }}" diff --git a/node/node.go b/node/node.go index ebab55b1995..cc6c7e9a4ba 100644 --- a/node/node.go +++ b/node/node.go @@ -13,7 +13,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" - amino "github.com/tendermint/go-amino" + "github.com/tendermint/go-amino" abci "github.com/tendermint/tendermint/abci/types" bc "github.com/tendermint/tendermint/blockchain" @@ -32,8 +32,8 @@ import ( rpccore "github.com/tendermint/tendermint/rpc/core" ctypes "github.com/tendermint/tendermint/rpc/core/types" grpccore "github.com/tendermint/tendermint/rpc/grpc" - rpc "github.com/tendermint/tendermint/rpc/lib" - rpcserver "github.com/tendermint/tendermint/rpc/lib/server" + "github.com/tendermint/tendermint/rpc/lib" + "github.com/tendermint/tendermint/rpc/lib/server" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/state/txindex" "github.com/tendermint/tendermint/state/txindex/kv" @@ -287,12 +287,28 @@ func NewNode(config *cfg.Config, evidenceReactor := evidence.NewEvidenceReactor(evidencePool) evidenceReactor.SetLogger(evidenceLogger) + fastestSyncHeight := config.FastestSyncHeight + if state.Validators.Size() == 1 { + addr, _ := state.Validators.GetByIndex(0) + if bytes.Equal(privValidator.GetAddress(), addr) { + fastestSyncHeight = -1 + } + } + if state.LastBlockHeight > fastestSyncHeight { + // if we are already more advance than requested, we don't need fastest sync + // this will prevent fastest sync after restart node after first launch + + fastestSyncHeight = -1 + } + // TODO: revisit - seems doesn't need Copy state + stateReactor := bc.NewStateReactor(state, stateDB, proxyApp.State(), fastestSyncHeight) + blockExecLogger := logger.With("module", "state") // make block executor for consensus and blockchain reactors to execute blocks blockExec := sm.NewBlockExecutor(stateDB, blockExecLogger, proxyApp.Consensus(), mempool, evidencePool, config.WithAppStat) // Make BlockchainReactor - bcReactor := bc.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) + bcReactor := bc.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync && fastestSyncHeight == -1) bcReactor.SetLogger(logger.With("module", "blockchain")) // Make ConsensusReactor @@ -309,7 +325,7 @@ func NewNode(config *cfg.Config, if privValidator != nil { consensusState.SetPrivValidator(privValidator) } - consensusReactor := cs.NewConsensusReactor(consensusState, fastSync) + consensusReactor := cs.NewConsensusReactor(consensusState, fastSync && fastestSyncHeight != -1) consensusReactor.SetLogger(consensusLogger) eventBus := types.NewEventBus() @@ -409,6 +425,7 @@ func NewNode(config *cfg.Config, ) sw.SetLogger(p2pLogger) sw.AddReactor("MEMPOOL", mempoolReactor) + sw.AddReactor("STATE", stateReactor) sw.AddReactor("BLOCKCHAIN", bcReactor) sw.AddReactor("CONSENSUS", consensusReactor) sw.AddReactor("EVIDENCE", evidenceReactor) @@ -767,6 +784,7 @@ func makeNodeInfo( Network: chainID, Version: version.Version, Channels: []byte{ + bc.BlockchainStateChannel, bc.BlockchainChannel, cs.StateChannel, cs.DataChannel, cs.VoteChannel, cs.VoteSetBitsChannel, mempl.MempoolChannel, diff --git a/proxy/app_conn.go b/proxy/app_conn.go index 1870adc615a..6a0afe56cbc 100644 --- a/proxy/app_conn.go +++ b/proxy/app_conn.go @@ -31,6 +31,17 @@ type AppConnMempool interface { FlushSync() error } +type AppConnState interface { + SetResponseCallback(abcicli.Callback) + Error() error + + LatestSnapshot() (height int64, numKeys map[string]int64, err error) // query application state height and numOfKeys + ReadSnapshotChunk(height int64, startIndex, endIndex int64) (chunk map[string][][]byte, err error) + StartRecovery(height int64, numKeys map[string]int64) error + WriteRecoveryChunk(storeName string, chunk [][]byte) error + EndRecovery() error +} + type AppConnQuery interface { Error() error @@ -44,6 +55,47 @@ type AppConnQuery interface { //----------------------------------------------------------------------------------------- // Implements AppConnConsensus (subset of abcicli.Client) +type appConnState struct { + appConn abcicli.Client +} + +func NewAppConnState(appConn abcicli.Client) *appConnState { + return &appConnState{ + appConn: appConn, + } +} + +func (app *appConnState) SetResponseCallback(cb abcicli.Callback) { + app.appConn.SetResponseCallback(cb) +} + +func (app *appConnState) Error() error { + return app.appConn.Error() +} + +func (app *appConnState) LatestSnapshot() (height int64, numKeys map[string]int64, err error) { + return app.appConn.LatestSnapshot() +} + +func (app *appConnState) ReadSnapshotChunk(height int64, startIndex, endIndex int64) (chunk map[string][][]byte, err error) { + return app.appConn.ReadSnapshotChunk(height, startIndex, endIndex) +} + +func (app *appConnState) StartRecovery(height int64, numKeys map[string]int64) error { + return app.appConn.StartRecovery(height, numKeys) +} + +func (app *appConnState) WriteRecoveryChunk(storeName string, chunk [][]byte) error { + return app.appConn.WriteRecoveryChunk(storeName, chunk) +} + +func (app *appConnState) EndRecovery() error { + return app.appConn.EndRecovery() +} + +//----------------------------------------------------------------------------------------- +// Implements AppConnConsensus (subset of abcicli.Client) + type appConnConsensus struct { appConn abcicli.Client } diff --git a/proxy/multi_app_conn.go b/proxy/multi_app_conn.go index b5897d8a5c0..25b61dcfcbd 100644 --- a/proxy/multi_app_conn.go +++ b/proxy/multi_app_conn.go @@ -12,6 +12,7 @@ import ( type AppConns interface { cmn.Service + State() AppConnState Mempool() AppConnMempool Consensus() AppConnConsensus Query() AppConnQuery @@ -30,6 +31,7 @@ func NewAppConns(clientCreator ClientCreator) AppConns { type multiAppConn struct { cmn.BaseService + stateConn *appConnState mempoolConn *appConnMempool consensusConn *appConnConsensus queryConn *appConnQuery @@ -46,6 +48,11 @@ func NewMultiAppConn(clientCreator ClientCreator) *multiAppConn { return multiAppConn } +// Returns the state connection +func (app *multiAppConn) State() AppConnState { + return app.stateConn +} + // Returns the mempool connection func (app *multiAppConn) Mempool() AppConnMempool { return app.mempoolConn @@ -73,6 +80,17 @@ func (app *multiAppConn) OnStart() error { } app.queryConn = NewAppConnQuery(querycli) + // state connection + statecli, err := app.clientCreator.NewABCIClient() + if err != nil { + return errors.Wrap(err, "Error creating ABCI client (state connection)") + } + statecli.SetLogger(app.Logger.With("module", "abci-client", "connection", "state")) + if err := statecli.Start(); err != nil { + return errors.Wrap(err, "Error starting ABCI client (state connection)") + } + app.stateConn = NewAppConnState(statecli) + // mempool connection memcli, err := app.clientCreator.NewABCIClient() if err != nil { diff --git a/state/store.go b/state/store.go index 2f90c747ea3..d61b5a24afe 100644 --- a/state/store.go +++ b/state/store.go @@ -11,6 +11,10 @@ import ( //------------------------------------------------------------------------ +func calcStateKey(height int64) []byte { + return []byte(fmt.Sprintf("stateKey:%v", height)) +} + func calcValidatorsKey(height int64) []byte { return []byte(fmt.Sprintf("validatorsKey:%v", height)) } @@ -62,6 +66,24 @@ func LoadState(db dbm.DB) State { return loadState(db, stateKey) } +func LoadStateForHeight(db dbm.DB, height int64) *State { + var state State + buf := db.Get(calcStateKey(height)) + if len(buf) == 0 { + return nil + } + + err := cdc.UnmarshalBinaryBare(buf, &state) + if err != nil { + // DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED + cmn.Exit(fmt.Sprintf(`LoadState: Data has been corrupted or its spec has changed: + %v\n`, err)) + } + // TODO: ensure that buf is completely read. + + return &state +} + func loadState(db dbm.DB, key []byte) (state State) { buf := db.Get(key) if len(buf) == 0 { @@ -96,6 +118,7 @@ func saveState(db dbm.DB, state State, key []byte) { saveValidatorsInfo(db, nextHeight+1, state.LastHeightValidatorsChanged, state.NextValidators) // Save next consensus params. saveConsensusParamsInfo(db, nextHeight, state.LastHeightConsensusParamsChanged, state.ConsensusParams) + db.SetSync(calcStateKey(nextHeight), state.Bytes()) //TODO: encapsulate with ival tree db.SetSync(stateKey, state.Bytes()) } diff --git a/types/params.go b/types/params.go index 46207c17c30..75c20662935 100644 --- a/types/params.go +++ b/types/params.go @@ -12,6 +12,9 @@ const ( // BlockPartSizeBytes is the size of one block part. BlockPartSizeBytes = 1024 * 1024 // 1MB + + // MaxStateSizeBytes is the maximum permitted size of the blocks. + MaxStateSizeBytes = 10485760 // 10MB ) // ConsensusParams contains consensus critical parameters that determine the