Skip to content

Commit

Permalink
tendermint#121 state sync mode
Browse files Browse the repository at this point in the history
  • Loading branch information
ackratos committed Dec 31, 2018
1 parent 53e8710 commit 2a70e3e
Show file tree
Hide file tree
Showing 18 changed files with 1,040 additions and 164 deletions.
159 changes: 0 additions & 159 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions abci/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ type Client interface {
InitChainSync(types.RequestInitChain) (*types.ResponseInitChain, error)
BeginBlockSync(types.RequestBeginBlock) (*types.ResponseBeginBlock, error)
EndBlockSync(types.RequestEndBlock) (*types.ResponseEndBlock, error)

LatestSnapshot() (height int64, numKeys map[string]int64, err error) // query application state height and numOfKeys
ReadSnapshotChunk(height int64, startIndex, endIndex int64) (chunk map[string][][]byte, err error)
StartRecovery(height int64, numKeys map[string]int64) error
WriteRecoveryChunk(storeName string, chunk [][]byte) error
EndRecovery() error
}

//----------------------------------------
Expand Down
16 changes: 16 additions & 0 deletions abci/client/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,3 +308,19 @@ func (cli *grpcClient) EndBlockSync(params types.RequestEndBlock) (*types.Respon
reqres := cli.EndBlockAsync(params)
return reqres.Response.GetEndBlock(), cli.Error()
}

func (cli *grpcClient) LatestSnapshot() (height int64, numKeys map[string]int64, err error) {
return 0, make(map[string]int64), nil
}
func (cli *grpcClient) ReadSnapshotChunk(height int64, startIndex, endIndex int64) (chunk map[string][][]byte, err error) {
return make(map[string][][]byte), nil
}
func (cli *grpcClient) StartRecovery(height int64, numKeys map[string]int64) error {
return nil
}
func (cli *grpcClient) WriteRecoveryChunk(storeName string, chunk [][]byte) error {
return nil
}
func (cli *grpcClient) EndRecovery() error {
return nil
}
22 changes: 22 additions & 0 deletions abci/client/local_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,28 @@ func (app *localClient) EndBlockAsync(req types.RequestEndBlock) *ReqRes {
)
}

func (app *localClient) LatestSnapshot() (height int64, numKeys map[string]int64, err error) {
app.mtx.Lock()
defer app.mtx.Unlock()
return app.Application.LatestSnapshot()
}

func (app *localClient) ReadSnapshotChunk(height int64, startIndex, endIndex int64) (chunk map[string][][]byte, err error) {
return app.Application.ReadSnapshotChunk(height, startIndex, endIndex)
}

func (app *localClient) StartRecovery(height int64, numKeys map[string]int64) error {
return app.Application.StartRecovery(height, numKeys)
}

func (app *localClient) WriteRecoveryChunk(storeName string, chunk [][]byte) error {
return app.Application.WriteRecoveryChunk(storeName, chunk)
}

func (app *localClient) EndRecovery() error {
return app.Application.EndRecovery()
}

//-------------------------------------------------------

func (app *localClient) FlushSync() error {
Expand Down
18 changes: 18 additions & 0 deletions abci/client/socket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,24 @@ func (cli *socketClient) EndBlockSync(req types.RequestEndBlock) (*types.Respons

//----------------------------------------

func (cli *socketClient) LatestSnapshot() (height int64, numKeys map[string]int64, err error) {
return 0, make(map[string]int64), nil
}
func (cli *socketClient) ReadSnapshotChunk(height int64, startIndex, endIndex int64) (chunk map[string][][]byte, err error) {
return make(map[string][][]byte), nil
}
func (cli *socketClient) StartRecovery(height int64, numKeys map[string]int64) error {
return nil
}
func (cli *socketClient) WriteRecoveryChunk(storeName string, chunk [][]byte) error {
return nil
}
func (cli *socketClient) EndRecovery() error {
return nil
}

//----------------------------------------

func (cli *socketClient) queueRequest(req *types.Request) *ReqRes {
reqres := NewReqRes(req)

Expand Down
20 changes: 20 additions & 0 deletions abci/example/kvstore/persistent_kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,26 @@ func (app *PersistentKVStoreApplication) EndBlock(req types.RequestEndBlock) typ
return types.ResponseEndBlock{ValidatorUpdates: app.ValUpdates}
}

func (app *PersistentKVStoreApplication) LatestSnapshot() (height int64, numKeys map[string]int64, err error) {
return 0, make(map[string]int64), nil
}

func (app *PersistentKVStoreApplication) ReadSnapshotChunk(height int64, startIndex, endIndex int64) (chunk map[string][][]byte, err error) {
return make(map[string][][]byte, 0), nil
}

func (app *PersistentKVStoreApplication) StartRecovery(height int64, numKeys map[string]int64) error {
return nil
}

func (app *PersistentKVStoreApplication) WriteRecoveryChunk(storeName string, chunk [][]byte) error {
return nil
}

func (app *PersistentKVStoreApplication) EndRecovery() error {
return nil
}

//---------------------------------------------
// update validators

Expand Down
47 changes: 47 additions & 0 deletions abci/types/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ type Application interface {
DeliverTx(tx []byte) ResponseDeliverTx // Deliver a tx for full processing
EndBlock(RequestEndBlock) ResponseEndBlock // Signals the end of a block, returns changes to the validator set
Commit() ResponseCommit // Commit the state and return the application Merkle root hash

// State Connection
LatestSnapshot() (height int64, numKeys map[string]int64, err error) // query application state height and numOfKeys
ReadSnapshotChunk(height int64, startIndex, endIndex int64) (chunk map[string][][]byte, err error)
StartRecovery(height int64, numKeys map[string]int64) error
WriteRecoveryChunk(storeName string, chunk [][]byte) error
EndRecovery() error
}

//-------------------------------------------------------
Expand Down Expand Up @@ -78,6 +85,26 @@ func (BaseApplication) EndBlock(req RequestEndBlock) ResponseEndBlock {
return ResponseEndBlock{}
}

func (BaseApplication) LatestSnapshot() (height int64, numKeys map[string]int64, err error) {
return 0, make(map[string]int64), nil
}

func (BaseApplication) ReadSnapshotChunk(height int64, startIndex, endIndex int64) (chunk map[string][][]byte, err error) {
return make(map[string][][]byte, 0), nil
}

func (BaseApplication) StartRecovery(height int64, numKeys map[string]int64) error {
return nil
}

func (BaseApplication) WriteRecoveryChunk(storeName string, chunk [][]byte) error {
return nil
}

func (BaseApplication) EndRecovery() error {
return nil
}

//-------------------------------------------------------

// GRPCApplication is a GRPC wrapper for Application
Expand Down Expand Up @@ -141,3 +168,23 @@ func (app *GRPCApplication) EndBlock(ctx context.Context, req *RequestEndBlock)
res := app.app.EndBlock(*req)
return &res, nil
}

func (app *GRPCApplication) LatestSnapshot() (height int64, numKeys map[string]int64, err error) {
return 0, make(map[string]int64), nil
}

func (app *GRPCApplication) ReadSnapshotChunk(height int64, startIndex, endIndex int64) (chunk map[string][][]byte, err error) {
return make(map[string][][]byte, 0), nil
}

func (app *GRPCApplication) StartRecovery(height int64, numKeys map[string]int64) error {
return nil
}

func (app *GRPCApplication) WriteRecoveryChunk(storeName string, chunk [][]byte) error {
return nil
}

func (app *GRPCApplication) EndRecovery() error {
return nil
}
15 changes: 15 additions & 0 deletions blockchain/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,21 @@ func (bcR *BlockchainReactor) OnStop() {
bcR.pool.Stop()
}

// SwitchToBlockchain switches from fastest_sync mode to blockchain mode.
// It resets the state, turns off fastest_sync, and starts the blockchain state-machine
func (bcR *BlockchainReactor) SwitchToBlockchain(state sm.State) {
bcR.Logger.Info("SwitchToBlockchain")
bcR.initialState = state

bcR.fastSync = true

err := bcR.pool.Start()
if err != nil {
bcR.Logger.Error("Error starting blockchainReactor", "err", err)
return
}
}

// GetChannels implements Reactor
func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
return []*p2p.ChannelDescriptor{
Expand Down
Loading

0 comments on commit 2a70e3e

Please sign in to comment.