From f045a35dc2e3713be5b5020511cd20930dbc6fdf Mon Sep 17 00:00:00 2001 From: Jernej Kos Date: Tue, 23 Jun 2020 12:31:10 +0200 Subject: [PATCH] go/consensus/tendermint: Add support for state sync --- .changelog/2880.feature.md | 1 + go/consensus/tendermint/abci/mux.go | 218 ++++++++++++++++++ go/consensus/tendermint/abci/state.go | 18 ++ go/consensus/tendermint/light.go | 183 +++++++++++++++ go/consensus/tendermint/statesync.go | 164 +++++++++++++ go/consensus/tendermint/tendermint.go | 56 +++++ go/oasis-test-runner/oasis/args.go | 16 ++ go/oasis-test-runner/oasis/oasis.go | 23 ++ .../scenario/e2e/consensus_state_sync.go | 140 +++++++++++ go/oasis-test-runner/scenario/e2e/e2e.go | 2 + 10 files changed, 821 insertions(+) create mode 100644 .changelog/2880.feature.md create mode 100644 go/consensus/tendermint/light.go create mode 100644 go/consensus/tendermint/statesync.go create mode 100644 go/oasis-test-runner/scenario/e2e/consensus_state_sync.go diff --git a/.changelog/2880.feature.md b/.changelog/2880.feature.md new file mode 100644 index 00000000000..ea8ee75a650 --- /dev/null +++ b/.changelog/2880.feature.md @@ -0,0 +1 @@ +go/consensus/tendermint: Add support for state sync diff --git a/go/consensus/tendermint/abci/mux.go b/go/consensus/tendermint/abci/mux.go index fb6d4c4b784..c8659f26708 100644 --- a/go/consensus/tendermint/abci/mux.go +++ b/go/consensus/tendermint/abci/mux.go @@ -31,6 +31,7 @@ import ( "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/api" epochtime "github.com/oasisprotocol/oasis-core/go/epochtime/api" genesis "github.com/oasisprotocol/oasis-core/go/genesis/api" + "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" upgrade "github.com/oasisprotocol/oasis-core/go/upgrade/api" ) @@ -881,6 +882,223 @@ func (mux *abciMux) Commit() types.ResponseCommit { } } +func (mux *abciMux) ListSnapshots(req types.RequestListSnapshots) types.ResponseListSnapshots { + // Get a list of all current checkpoints. 
+ cps, err := mux.state.storage.Checkpointer().GetCheckpoints(mux.state.ctx, &checkpoint.GetCheckpointsRequest{ + Version: 1, + }) + if err != nil { + mux.logger.Error("failed to get checkpoints", + "err", err, + ) + return types.ResponseListSnapshots{} + } + + var rsp types.ResponseListSnapshots + for _, cp := range cps { + cpHash := cp.EncodedHash() + + rsp.Snapshots = append(rsp.Snapshots, &types.Snapshot{ + Height: cp.Root.Version, + Format: uint32(cp.Version), + Chunks: uint32(len(cp.Chunks)), + Hash: cpHash[:], + Metadata: cbor.Marshal(cp), + }) + } + + return rsp +} + +func (mux *abciMux) OfferSnapshot(req types.RequestOfferSnapshot) types.ResponseOfferSnapshot { + if req.Snapshot == nil { + return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_REJECT} + } + if req.Snapshot.Format != 1 { + mux.logger.Warn("received snapshot with unsupported version", + "version", req.Snapshot.Format, + ) + return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_REJECT_FORMAT} + } + + // Decode checkpoint metadata hash and sanity check against the request. + var metadataHash hash.Hash + metadataHash.FromBytes(req.Snapshot.Metadata) + var h hash.Hash + if err := h.UnmarshalBinary(req.Snapshot.Hash); err != nil { + mux.logger.Warn("received snapshot with malformed hash", + "err", err, + ) + return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_REJECT} + } + if !metadataHash.Equal(&h) { + mux.logger.Warn("received snapshot with mismatching hash", + "expected_hash", h, + "hash", metadataHash, + ) + return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_REJECT} + } + + // Decode checkpoint metadata. + var cp checkpoint.Metadata + if err := cbor.Unmarshal(req.Snapshot.Metadata, &cp); err != nil { + mux.logger.Warn("received snapshot with malformed metadata", + "err", err, + ) + return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_REJECT} + } + + // Number of chunks must match. 
+ if int(req.Snapshot.Chunks) != len(cp.Chunks) { + mux.logger.Warn("received snapshot with mismatching number of chunks", + "expected_chunks", len(cp.Chunks), + "chunks", req.Snapshot.Chunks, + ) + return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_REJECT} + } + // Root hash must match. + var appHash hash.Hash + if err := appHash.UnmarshalBinary(req.AppHash); err != nil { + // NOTE: This should never happen as it indicates a problem with Tendermint. + mux.logger.Error("received request with malformed hash", + "err", err, + ) + return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_ABORT} + } + if !cp.Root.Hash.Equal(&appHash) { + mux.logger.Warn("received snapshot with mismatching root hash", + "expected_root", appHash, + "root", cp.Root.Hash, + ) + return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_REJECT} + } + + // Snapshot seems correct (e.g., it is for the correct root), start the restoration process. + if err := mux.state.storage.Checkpointer().StartRestore(mux.state.ctx, &cp); err != nil { + mux.logger.Error("failed to start restore", + "err", err, + ) + return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_ABORT} + } + + mux.logger.Info("started state restore process", + "root", cp.Root, + ) + + return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_ACCEPT} +} + +func (mux *abciMux) LoadSnapshotChunk(req types.RequestLoadSnapshotChunk) types.ResponseLoadSnapshotChunk { + // Fetch the metadata for the specified checkpoint. 
+ cps, err := mux.state.storage.Checkpointer().GetCheckpoints(mux.state.ctx, &checkpoint.GetCheckpointsRequest{ + Version: uint16(req.Format), + RootVersion: &req.Height, + }) + if err != nil { + mux.logger.Error("failed to get checkpoints", + "err", err, + ) + return types.ResponseLoadSnapshotChunk{} + } + if len(cps) != 1 { + mux.logger.Error("failed to get checkpoints", + "cps", len(cps), + ) + return types.ResponseLoadSnapshotChunk{} + } + + // Fetch the chunk itself. + chunk, err := cps[0].GetChunkMetadata(uint64(req.Chunk)) + if err != nil { + mux.logger.Error("failed to get chunk metadata", + "err", err, + ) + return types.ResponseLoadSnapshotChunk{} + } + var buf bytes.Buffer + if err := mux.state.storage.Checkpointer().GetCheckpointChunk(mux.state.ctx, chunk, &buf); err != nil { + mux.logger.Error("failed to get chunk", + "err", err, + ) + return types.ResponseLoadSnapshotChunk{} + } + + return types.ResponseLoadSnapshotChunk{Chunk: buf.Bytes()} +} + +func (mux *abciMux) ApplySnapshotChunk(req types.RequestApplySnapshotChunk) types.ResponseApplySnapshotChunk { + cp := mux.state.storage.Checkpointer().GetCurrentCheckpoint() + + mux.logger.Debug("attempting to restore a chunk", + "root", cp.Root, + "index", req.Index, + ) + + buf := bytes.NewBuffer(req.Chunk) + done, err := mux.state.storage.Checkpointer().RestoreChunk(mux.state.ctx, uint64(req.Index), buf) + switch { + case err == nil: + case errors.Is(err, checkpoint.ErrNoRestoreInProgress): + // This should never happen. + mux.logger.Error("ApplySnapshotChunk called without OfferSnapshot, aborting state sync") + return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ABORT} + case errors.Is(err, checkpoint.ErrChunkAlreadyRestored): + return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ACCEPT} + case errors.Is(err, checkpoint.ErrChunkCorrupted): + // Corrupted chunk, refetch. 
+ mux.logger.Warn("received corrupted chunk", + "sender", req.Sender, + "index", req.Index, + "err", err, + ) + + return types.ResponseApplySnapshotChunk{ + RefetchChunks: []uint32{req.Index}, + // TODO: Consider banning the sender. + Result: types.ResponseApplySnapshotChunk_RETRY, + } + case errors.Is(err, checkpoint.ErrChunkProofVerificationFailed): + // Chunk was as specified in the manifest but did not match the reported root. In this case + // we need to abort processing the given snapshot. + mux.logger.Warn("chunk contains invalid proof, snapshot is bad", + "err", err, + ) + + return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_REJECT_SNAPSHOT} + default: + // Unspecified error during restoration. + mux.logger.Error("error during chunk restoration, aborting state sync", + "err", err, + ) + + return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ABORT} + } + + // Check if we are done with the restoration. In this case, finalize the root. 
+ if done { + err = mux.state.storage.NodeDB().Finalize(mux.state.ctx, cp.Root.Version, []hash.Hash{cp.Root.Hash}) + if err != nil { + mux.logger.Error("failed to finalize restored root", + "err", err, + ) + return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ABORT} + } + + if err = mux.state.doApplyStateSync(cp.Root); err != nil { + mux.logger.Error("failed to apply state sync root", + "err", err, + ) + return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ABORT} + } + + mux.logger.Info("successfully synced state", + "root", cp.Root, + ) + } + + return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ACCEPT} +} + func (mux *abciMux) doCleanup() { mux.state.doCleanup() diff --git a/go/consensus/tendermint/abci/state.go b/go/consensus/tendermint/abci/state.go index e5cf7d3f683..3e290a3a2e8 100644 --- a/go/consensus/tendermint/abci/state.go +++ b/go/consensus/tendermint/abci/state.go @@ -248,6 +248,24 @@ func (s *applicationState) doInitChain(now time.Time) error { return s.doCommitOrInitChainLocked(now) } +func (s *applicationState) doApplyStateSync(root storage.Root) error { + s.blockLock.Lock() + defer s.blockLock.Unlock() + + s.stateRoot = root + + s.deliverTxTree.Close() + s.deliverTxTree = mkvs.NewWithRoot(nil, s.storage.NodeDB(), root, mkvs.WithoutWriteLog()) + s.checkTxTree.Close() + s.checkTxTree = mkvs.NewWithRoot(nil, s.storage.NodeDB(), root, mkvs.WithoutWriteLog()) + + if err := s.doCommitOrInitChainLocked(time.Time{}); err != nil { + return err + } + + return nil +} + func (s *applicationState) doCommit(now time.Time) (uint64, error) { s.blockLock.Lock() defer s.blockLock.Unlock() diff --git a/go/consensus/tendermint/light.go b/go/consensus/tendermint/light.go new file mode 100644 index 00000000000..6b04d70ea07 --- /dev/null +++ b/go/consensus/tendermint/light.go @@ -0,0 +1,183 @@ +package tendermint + +import ( + "bytes" + "context" + "errors" + "fmt" + "time" + + 
"google.golang.org/grpc" + + tmlight "github.com/tendermint/tendermint/light" + tmlightprovider "github.com/tendermint/tendermint/light/provider" + tmproto "github.com/tendermint/tendermint/proto/tendermint/types" + tmtypes "github.com/tendermint/tendermint/types" + + "github.com/oasisprotocol/oasis-core/go/common/crypto/signature" + cmnGrpc "github.com/oasisprotocol/oasis-core/go/common/grpc" + "github.com/oasisprotocol/oasis-core/go/common/identity" + "github.com/oasisprotocol/oasis-core/go/common/node" + consensusAPI "github.com/oasisprotocol/oasis-core/go/consensus/api" +) + +// lightClientProvider implements Tendermint's light client provider interface using the Oasis Core +// light client API. +type lightClientProvider struct { + ctx context.Context + + chainID string + client consensusAPI.LightClientBackend +} + +// Implements tmlightprovider.Provider. +func (lp *lightClientProvider) ChainID() string { + return lp.chainID +} + +// Implements tmlightprovider.Provider. +func (lp *lightClientProvider) SignedHeader(height int64) (*tmtypes.SignedHeader, error) { + shdr, err := lp.client.GetSignedHeader(lp.ctx, height) + switch { + case err == nil: + case errors.Is(err, consensusAPI.ErrVersionNotFound): + return nil, tmlightprovider.ErrSignedHeaderNotFound + default: + return nil, fmt.Errorf("failed to fetch signed header: %w", err) + } + + // Decode Tendermint-specific signed header. + var protoSigHdr tmproto.SignedHeader + if err = protoSigHdr.Unmarshal(shdr.Meta); err != nil { + return nil, fmt.Errorf("received malformed header: %w", err) + } + sh, err := tmtypes.SignedHeaderFromProto(&protoSigHdr) + if err != nil { + return nil, fmt.Errorf("received malformed header: %w", err) + } + + if lp.chainID != sh.ChainID { + return nil, fmt.Errorf("incorrect chain ID (expected: %s got: %s)", + lp.chainID, + sh.ChainID, + ) + } + + return sh, nil +} + +// Implements tmlightprovider.Provider. 
+func (lp *lightClientProvider) ValidatorSet(height int64) (*tmtypes.ValidatorSet, error) { + vs, err := lp.client.GetValidatorSet(lp.ctx, height) + switch { + case err == nil: + case errors.Is(err, consensusAPI.ErrVersionNotFound): + return nil, tmlightprovider.ErrValidatorSetNotFound + default: + return nil, fmt.Errorf("failed to fetch validator set: %w", err) + } + + // Decode Tendermint-specific validator set. + var protoVals tmproto.ValidatorSet + if err = protoVals.Unmarshal(vs.Meta); err != nil { + return nil, fmt.Errorf("received malformed validator set: %w", err) + } + vals, err := tmtypes.ValidatorSetFromProto(&protoVals) + if err != nil { + return nil, fmt.Errorf("received malformed validator set: %w", err) + } + + return vals, nil +} + +// Implements tmlightprovider.Provider. +func (lp *lightClientProvider) ReportEvidence(ev tmtypes.Evidence) error { + // TODO: Implement SubmitEvidence. + return fmt.Errorf("not yet implemented") +} + +// newLightClientProvider creates a new provider for the Tendermint's light client. +// +// The provided chain ID must be the Tendermint chain ID. +func newLightClientProvider( + ctx context.Context, + chainID string, + address node.TLSAddress, +) (tmlightprovider.Provider, error) { + // Create TLS credentials. 
+ opts := cmnGrpc.ClientOptions{ + CommonName: identity.CommonName, + ServerPubKeys: map[signature.PublicKey]bool{ + address.PubKey: true, + }, + } + creds, err := cmnGrpc.NewClientCreds(&opts) + if err != nil { + return nil, fmt.Errorf("failed to create TLS client credentials: %w", err) + } + + conn, err := cmnGrpc.Dial(address.Address.String(), grpc.WithTransportCredentials(creds)) + if err != nil { + return nil, fmt.Errorf("failed to dial public consensus service endpoint %s: %w", address, err) + } + + return &lightClientProvider{ + ctx: ctx, + chainID: chainID, + client: consensusAPI.NewConsensusLightClient(conn), + }, nil +} + +// lightService is a Tendermint consensus service that uses the light client API to talk with a +// remote Tendermint node and verify responses. +// +// This should eventually become a replacement for the full node tendermintService. +type lightService struct { + // lc is the Tendermint light client used for verifying headers. + lc *tmlight.Client + // client is the consensus light client backend connected to a remote node. + client consensusAPI.LightClientBackend +} + +func (ls *lightService) getParameters(ctx context.Context, height int64) (*tmproto.ConsensusParams, error) { + p, err := ls.client.GetParameters(ctx, height) + if err != nil { + return nil, err + } + if p.Height <= 0 { + return nil, fmt.Errorf("malformed height in response: %d", p.Height) + } + + // Decode Tendermint-specific parameters. + var params tmproto.ConsensusParams + if err = params.Unmarshal(p.Meta); err != nil { + return nil, fmt.Errorf("malformed parameters: %w", err) + } + if err = tmtypes.ValidateConsensusParams(params); err != nil { + return nil, fmt.Errorf("malformed parameters: %w", err) + } + + // Fetch the header from the light client. + h, err := ls.lc.VerifyHeaderAtHeight(p.Height, time.Now()) + if err != nil { + return nil, fmt.Errorf("failed to fetch header %d from light client: %w", p.Height, err) + } + + // Verify hash. 
+ if localHash := tmtypes.HashConsensusParams(params); !bytes.Equal(localHash, h.ConsensusHash) { + return nil, fmt.Errorf("mismatched parameters hash (expected: %X got: %X)", + h.ConsensusHash, + localHash, + ) + } + + return &params, nil +} + +// newLightService creates a light Tendermint consensus service. +func newLightService(client consensusAPI.LightClientBackend, lc *tmlight.Client) (*lightService, error) { + return &lightService{ + lc: lc, + client: client, + }, nil +} diff --git a/go/consensus/tendermint/statesync.go b/go/consensus/tendermint/statesync.go new file mode 100644 index 00000000000..c01590707a0 --- /dev/null +++ b/go/consensus/tendermint/statesync.go @@ -0,0 +1,164 @@ +package tendermint + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/spf13/viper" + tmlight "github.com/tendermint/tendermint/light" + tmlightprovider "github.com/tendermint/tendermint/light/provider" + tmlightdb "github.com/tendermint/tendermint/light/store/db" + tmstate "github.com/tendermint/tendermint/state" + tmstatesync "github.com/tendermint/tendermint/statesync" + tmtypes "github.com/tendermint/tendermint/types" + tmdb "github.com/tendermint/tm-db" + + "github.com/oasisprotocol/oasis-core/go/common/logging" + "github.com/oasisprotocol/oasis-core/go/common/node" +) + +// stateProviderConfig is the configuration for the state provider. +type stateProviderConfig struct { + // ChainID is the Tendermint chain ID. + ChainID string + + // ConsensusNodes is a list of nodes exposing the Oasis Core public consensus services that are + // used to fetch data required for syncing light clients. The first node is considered the + // primary and at least two nodes must be specified. + ConsensusNodes []node.TLSAddress + + // TrustOptions are Tendermint light client trust options. 
+ TrustOptions tmlight.TrustOptions +} + +type stateProvider struct { + sync.Mutex + + ctx context.Context + lc *tmlight.Client + + logger *logging.Logger +} + +// Implements tmstatesync.StateProvider. +func (sp *stateProvider) AppHash(height uint64) ([]byte, error) { + sp.Lock() + defer sp.Unlock() + + // We have to fetch the next height, which contains the app hash for the previous height. + header, err := sp.lc.VerifyHeaderAtHeight(int64(height+1), time.Now()) + if err != nil { + return nil, err + } + return header.AppHash, nil +} + +// Implements tmstatesync.StateProvider. +func (sp *stateProvider) Commit(height uint64) (*tmtypes.Commit, error) { + sp.Lock() + defer sp.Unlock() + + header, err := sp.lc.VerifyHeaderAtHeight(int64(height), time.Now()) + if err != nil { + return nil, err + } + return header.Commit, nil +} + +// Implements tmstatesync.StateProvider. +func (sp *stateProvider) State(height uint64) (tmstate.State, error) { + sp.Lock() + defer sp.Unlock() + + state := tmstate.State{ + ChainID: sp.lc.ChainID(), + Version: tmstate.InitStateVersion, + } + + // We need to verify up until h+2, to get the validator set. This also prefetches the headers + // for h and h+1 in the typical case where the trusted header is after the snapshot height. 
+ _, err := sp.lc.VerifyHeaderAtHeight(int64(height+2), time.Now()) + if err != nil { + return tmstate.State{}, err + } + header, err := sp.lc.VerifyHeaderAtHeight(int64(height), time.Now()) + if err != nil { + return tmstate.State{}, err + } + nextHeader, err := sp.lc.VerifyHeaderAtHeight(int64(height+1), time.Now()) + if err != nil { + return tmstate.State{}, err + } + state.LastBlockHeight = header.Height + state.LastBlockTime = header.Time + state.LastBlockID = header.Commit.BlockID + state.AppHash = nextHeader.AppHash + state.LastResultsHash = nextHeader.LastResultsHash + + state.LastValidators, _, err = sp.lc.TrustedValidatorSet(int64(height)) + if err != nil { + return tmstate.State{}, err + } + state.Validators, _, err = sp.lc.TrustedValidatorSet(int64(height + 1)) + if err != nil { + return tmstate.State{}, err + } + state.NextValidators, _, err = sp.lc.TrustedValidatorSet(int64(height + 2)) + if err != nil { + return tmstate.State{}, err + } + state.LastHeightValidatorsChanged = int64(height) + + // Fetch consensus parameters with light client verification. 
+ primaryClient := sp.lc.Primary().(*lightClientProvider).client + ls, err := newLightService(primaryClient, sp.lc) + if err != nil { + return tmstate.State{}, fmt.Errorf("failed to create a new light service: %w", err) + } + params, err := ls.getParameters(sp.ctx, nextHeader.Height) + if err != nil { + return tmstate.State{}, fmt.Errorf("failed to fetch consensus parameters for height %d: %w", + nextHeader.Height, + err, + ) + } + state.ConsensusParams = *params + + return state, nil +} + +func newStateProvider(ctx context.Context, cfg stateProviderConfig) (tmstatesync.StateProvider, error) { + if numNodes := len(cfg.ConsensusNodes); numNodes < 2 { + return nil, fmt.Errorf("at least two consensus nodes must be provided (got %d)", numNodes) + } + + var providers []tmlightprovider.Provider + for _, address := range cfg.ConsensusNodes { + p, err := newLightClientProvider(ctx, cfg.ChainID, address) + if err != nil { + return nil, fmt.Errorf("failed to create light client provider: %w", err) + } + providers = append(providers, p) + } + + lc, err := tmlight.NewClient( + cfg.ChainID, + cfg.TrustOptions, + providers[0], // Primary provider. + providers[1:], // Witnesses. 
+ tmlightdb.New(tmdb.NewMemDB(), ""), + tmlight.MaxRetryAttempts(5), + tmlight.Logger(newLogAdapter(!viper.GetBool(cfgLogDebug))), + ) + if err != nil { + return nil, fmt.Errorf("failed to create light client: %w", err) + } + + return &stateProvider{ + ctx: ctx, + lc: lc, + logger: logging.GetLogger("consensus/tendermint/stateprovider"), + }, nil +} diff --git a/go/consensus/tendermint/tendermint.go b/go/consensus/tendermint/tendermint.go index 553161ea879..941842f109d 100644 --- a/go/consensus/tendermint/tendermint.go +++ b/go/consensus/tendermint/tendermint.go @@ -20,6 +20,7 @@ import ( tmconfig "github.com/tendermint/tendermint/config" tmlog "github.com/tendermint/tendermint/libs/log" tmpubsub "github.com/tendermint/tendermint/libs/pubsub" + tmlight "github.com/tendermint/tendermint/light" tmmempool "github.com/tendermint/tendermint/mempool" tmnode "github.com/tendermint/tendermint/node" tmp2p "github.com/tendermint/tendermint/p2p" @@ -28,6 +29,7 @@ import ( tmcli "github.com/tendermint/tendermint/rpc/client/local" tmrpctypes "github.com/tendermint/tendermint/rpc/core/types" tmstate "github.com/tendermint/tendermint/state" + tmstatesync "github.com/tendermint/tendermint/statesync" tmtypes "github.com/tendermint/tendermint/types" tmdb "github.com/tendermint/tm-db" @@ -131,6 +133,18 @@ const ( // CfgConsensusDebugDisableCheckTx disables CheckTx. CfgConsensusDebugDisableCheckTx = "consensus.tendermint.debug.disable_check_tx" + // CfgConsensusStateSyncEnabled enables consensus state sync. + CfgConsensusStateSyncEnabled = "consensus.tendermint.state_sync.enabled" + // CfgConsensusStateSyncConsensusNode specifies nodes exposing public consensus services which + // are used to sync a light client. + CfgConsensusStateSyncConsensusNode = "consensus.tendermint.state_sync.consensus_node" + // CfgConsensusStateSyncTrustPeriod is the light client trust period. 
+ CfgConsensusStateSyncTrustPeriod = "consensus.tendermint.state_sync.trust_period" + // CfgConsensusStateSyncTrustHeight is the known trusted height for the light client. + CfgConsensusStateSyncTrustHeight = "consensus.tendermint.state_sync.trust_height" + // CfgConsensusStateSyncTrustHash is the known trusted block header hash for the light client. + CfgConsensusStateSyncTrustHash = "consensus.tendermint.state_sync.trust_hash" + // StateDir is the name of the directory located inside the node's data // directory which contains the tendermint state. StateDir = "tendermint" @@ -1145,6 +1159,40 @@ func (t *tendermintService) lazyInit() error { return db, nil } + // Configure state sync if enabled. + var stateProvider tmstatesync.StateProvider + if viper.GetBool(CfgConsensusStateSyncEnabled) { + t.Logger.Info("state sync enabled") + + // Enable state sync in the configuration. + tenderConfig.StateSync.Enable = true + tenderConfig.StateSync.TrustHash = viper.GetString(CfgConsensusStateSyncTrustHash) + + // Create new state sync state provider. 
+ cfg := stateProviderConfig{ + ChainID: tmGenDoc.ChainID, + TrustOptions: tmlight.TrustOptions{ + Period: viper.GetDuration(CfgConsensusStateSyncTrustPeriod), + Height: int64(viper.GetUint64(CfgConsensusStateSyncTrustHeight)), + Hash: tenderConfig.StateSync.TrustHashBytes(), + }, + } + for _, rawAddr := range viper.GetStringSlice(CfgConsensusStateSyncConsensusNode) { + var addr node.TLSAddress + if err = addr.UnmarshalText([]byte(rawAddr)); err != nil { + return fmt.Errorf("failed to parse state sync consensus node address (%s): %w", rawAddr, err) + } + + cfg.ConsensusNodes = append(cfg.ConsensusNodes, addr) + } + if stateProvider, err = newStateProvider(t.ctx, cfg); err != nil { + t.Logger.Error("failed to create state sync state provider", + "err", err, + ) + return fmt.Errorf("failed to create state sync state provider: %w", err) + } + } + // HACK: tmnode.NewNode() triggers block replay and or ABCI chain // initialization, instead of t.node.Start(). This is a problem // because at the time that lazyInit() is called, none of the ABCI @@ -1174,6 +1222,7 @@ func (t *tendermintService) lazyInit() error { wrapDbProvider, tmnode.DefaultMetricsProvider(tenderConfig.Instrumentation), newLogAdapter(!viper.GetBool(cfgLogDebug)), + tmnode.StateProvider(stateProvider), ) if err != nil { return fmt.Errorf("tendermint: failed to create node: %w", err) @@ -1626,6 +1675,13 @@ func init() { Flags.Bool(CfgConsensusDebugDisableCheckTx, false, "do not perform CheckTx on incoming transactions (UNSAFE)") Flags.Bool(CfgDebugUnsafeReplayRecoverCorruptedWAL, false, "Enable automatic recovery from corrupted WAL during replay (UNSAFE).") + // State sync. 
+ Flags.Bool(CfgConsensusStateSyncEnabled, false, "enable state sync") + Flags.StringSlice(CfgConsensusStateSyncConsensusNode, []string{}, "consensus node to use for syncing the light client") + Flags.Duration(CfgConsensusStateSyncTrustPeriod, 24*time.Hour, "light client trust period") + Flags.Uint64(CfgConsensusStateSyncTrustHeight, 0, "light client trusted height") + Flags.String(CfgConsensusStateSyncTrustHash, "", "light client trusted consensus header hash") + _ = Flags.MarkHidden(cfgLogDebug) _ = Flags.MarkHidden(CfgDebugP2PAddrBookLenient) _ = Flags.MarkHidden(CfgDebugP2PAllowDuplicateIP) diff --git a/go/oasis-test-runner/oasis/args.go b/go/oasis-test-runner/oasis/args.go index 3161c217913..d76affb1b54 100644 --- a/go/oasis-test-runner/oasis/args.go +++ b/go/oasis-test-runner/oasis/args.go @@ -175,6 +175,22 @@ func (args *argBuilder) tendermintDebugAllowDuplicateIP() *argBuilder { return args } +func (args *argBuilder) tendermintStateSync( + consensusNodes []string, + trustHeight uint64, + trustHash string, +) *argBuilder { + args.vec = append(args.vec, + "--"+tendermint.CfgConsensusStateSyncEnabled, + "--"+tendermint.CfgConsensusStateSyncTrustHeight, strconv.FormatUint(trustHeight, 10), + "--"+tendermint.CfgConsensusStateSyncTrustHash, trustHash, + ) + for _, address := range consensusNodes { + args.vec = append(args.vec, "--"+tendermint.CfgConsensusStateSyncConsensusNode, address) + } + return args +} + func (args *argBuilder) storageBackend(backend string) *argBuilder { args.vec = append(args.vec, []string{ "--" + storage.CfgBackend, backend, diff --git a/go/oasis-test-runner/oasis/oasis.go b/go/oasis-test-runner/oasis/oasis.go index fcbb23435ad..363c90bbb7c 100644 --- a/go/oasis-test-runner/oasis/oasis.go +++ b/go/oasis-test-runner/oasis/oasis.go @@ -53,6 +53,13 @@ const ( maxNodes = 32 // Arbitrary ) +// ConsensusStateSyncCfg is a node's consensus state sync configuration. 
+type ConsensusStateSyncCfg struct { + ConsensusNodes []string + TrustHeight uint64 + TrustHash string +} + // Node defines the common fields for all node types. type Node struct { // nolint: maligned sync.Mutex @@ -76,6 +83,7 @@ type Node struct { // nolint: maligned logWatcherHandlerFactories []log.WatcherHandlerFactory consensus ConsensusFixture + consensusStateSync *ConsensusStateSyncCfg customGrpcSocketPath string } @@ -178,6 +186,14 @@ func (n *Node) handleExit(cmdErr error) error { } } +// SetConsensusStateSync configures whether a node should perform consensus state sync. +func (n *Node) SetConsensusStateSync(cfg *ConsensusStateSyncCfg) { + n.Lock() + defer n.Unlock() + + n.consensusStateSync = cfg +} + // NodeCfg defines the common node configuration options. type NodeCfg struct { // nolint: maligned AllowEarlyTermination bool @@ -727,6 +743,13 @@ func (net *Network) startOasisNode( extraArgs = extraArgs.debugDontBlameOasis() extraArgs = extraArgs.grpcDebugGrpcInternalSocketPath(node.customGrpcSocketPath) } + if node.consensusStateSync != nil { + extraArgs = extraArgs.tendermintStateSync( + node.consensusStateSync.ConsensusNodes, + node.consensusStateSync.TrustHeight, + node.consensusStateSync.TrustHash, + ) + } if viper.IsSet(metrics.CfgMetricsAddr) { extraArgs = extraArgs.appendNodeMetrics(node) } diff --git a/go/oasis-test-runner/scenario/e2e/consensus_state_sync.go b/go/oasis-test-runner/scenario/e2e/consensus_state_sync.go new file mode 100644 index 00000000000..898c34f8545 --- /dev/null +++ b/go/oasis-test-runner/scenario/e2e/consensus_state_sync.go @@ -0,0 +1,140 @@ +package e2e + +import ( + "context" + "encoding/hex" + "fmt" + "time" + + consensus "github.com/oasisprotocol/oasis-core/go/consensus/api" + "github.com/oasisprotocol/oasis-core/go/oasis-test-runner/env" + "github.com/oasisprotocol/oasis-core/go/oasis-test-runner/oasis" + "github.com/oasisprotocol/oasis-core/go/oasis-test-runner/scenario" +) + +var ( + // ConsensusStateSync is the consensus state sync 
scenario. + ConsensusStateSync scenario.Scenario = &consensusStateSyncImpl{ + E2E: *NewE2E("consensus-state-sync"), + } +) + +type consensusStateSyncImpl struct { + E2E +} + +func (sc *consensusStateSyncImpl) Clone() scenario.Scenario { + return &consensusStateSyncImpl{ + E2E: sc.E2E.Clone(), + } +} + +func (sc *consensusStateSyncImpl) Fixture() (*oasis.NetworkFixture, error) { + f, err := sc.E2E.Fixture() + if err != nil { + return nil, err + } + + // Add an extra validator. + f.Validators = append(f.Validators, + oasis.ValidatorFixture{Entity: 1, Consensus: oasis.ConsensusFixture{EnableConsensusRPCWorker: true}}, + ) + + return f, nil +} + +func (sc *consensusStateSyncImpl) Run(childEnv *env.Env) error { + if err := sc.Net.Start(); err != nil { + return err + } + + sc.Logger.Info("waiting for network to come up") + ctx := context.Background() + if err := sc.Net.Controller().WaitNodesRegistered(ctx, len(sc.Net.Validators())); err != nil { + return err + } + + // Stop one of the validators. + val := sc.Net.Validators()[2] + if err := val.Stop(); err != nil { + return fmt.Errorf("failed to stop validator: %w", err) + } + + // Let the network run for 50 blocks. This should generate some checkpoints. + blockCh, blockSub, err := sc.Net.Controller().Consensus.WatchBlocks(ctx) + if err != nil { + return err + } + defer blockSub.Close() + + sc.Logger.Info("waiting for some blocks") + var blk *consensus.Block + for { + select { + case blk = <-blockCh: + if blk.Height < 50 { + continue + } + case <-time.After(30 * time.Second): + return fmt.Errorf("timed out waiting for blocks") + } + + break + } + + sc.Logger.Info("got some blocks, starting the validator back", + "trust_height", blk.Height, + "trust_hash", hex.EncodeToString(blk.Hash), + ) + + // Get the TLS public key from the validators. 
+ var consensusNodes []string + for _, v := range sc.Net.Validators()[:2] { + ctrl, err := oasis.NewController(v.SocketPath()) + if err != nil { + return fmt.Errorf("failed to create controller for validator %s: %w", v.Name, err) + } + + status, err := ctrl.GetStatus(ctx) + if err != nil { + return fmt.Errorf("failed to get status for validator %s: %w", v.Name, err) + } + + if status.Registration.Descriptor == nil { + return fmt.Errorf("validator %s has not registered", v.Name) + } + if len(status.Registration.Descriptor.TLS.Addresses) == 0 { + return fmt.Errorf("validator %s has no TLS addresses", v.Name) + } + + tlsAddress := status.Registration.Descriptor.TLS.Addresses[0] + rawAddress, err := tlsAddress.MarshalText() + if err != nil { + return fmt.Errorf("failed to marshal TLS address: %w", err) + } + consensusNodes = append(consensusNodes, string(rawAddress)) + } + + // Configure state sync for the consensus validator. + val.SetConsensusStateSync(&oasis.ConsensusStateSyncCfg{ + ConsensusNodes: consensusNodes, + TrustHeight: uint64(blk.Height), + TrustHash: hex.EncodeToString(blk.Hash), + }) + + if err := val.Start(); err != nil { + return fmt.Errorf("failed to start validator back: %w", err) + } + + // Wait for the validator to finish syncing. + sc.Logger.Info("waiting for the validator to sync") + valCtrl, err := oasis.NewController(val.SocketPath()) + if err != nil { + return err + } + if err = valCtrl.WaitSync(ctx); err != nil { + return err + } + + return nil +} diff --git a/go/oasis-test-runner/scenario/e2e/e2e.go b/go/oasis-test-runner/scenario/e2e/e2e.go index 40d73d65997..5b24db07f83 100644 --- a/go/oasis-test-runner/scenario/e2e/e2e.go +++ b/go/oasis-test-runner/scenario/e2e/e2e.go @@ -322,6 +322,8 @@ func RegisterScenarios() error { Debond, // Early query test. EarlyQuery, + // Consensus state sync. + ConsensusStateSync, } { if err := cmd.Register(s); err != nil { return err