diff --git a/.changelog/2440.feature.1.md b/.changelog/2440.feature.1.md new file mode 100644 index 00000000000..31aa0ef8842 --- /dev/null +++ b/.changelog/2440.feature.1.md @@ -0,0 +1 @@ +go/consensus: Add basic API for supporting light consensus clients diff --git a/.changelog/2440.feature.2.md b/.changelog/2440.feature.2.md new file mode 100644 index 00000000000..4b619ad6b64 --- /dev/null +++ b/.changelog/2440.feature.2.md @@ -0,0 +1,9 @@ +go/worker/consensusrpc: Add public consensus RPC services worker + +A public consensus services worker enables any full consensus node to expose +light client services to other nodes that may need them (e.g., they are needed +to support light clients). + +The worker can be enabled using `--worker.consensusrpc.enabled` and is +disabled by default. Enabling the public consensus services worker exposes +the light consensus client interface over publicly accessible gRPC. diff --git a/.changelog/2881.breaking.md b/.changelog/2881.breaking.md new file mode 100644 index 00000000000..85301ddfb8f --- /dev/null +++ b/.changelog/2881.breaking.md @@ -0,0 +1 @@ +go/common/node: Add RoleConsensusRPC role bit diff --git a/go/common/node/node.go b/go/common/node/node.go index 9bd4ca08ded..402fe8093a7 100644 --- a/go/common/node/node.go +++ b/go/common/node/node.go @@ -69,18 +69,20 @@ type Node struct { type RolesMask uint32 const ( - // RoleComputeWorker is Oasis compute worker role. + // RoleComputeWorker is the compute worker role. RoleComputeWorker RolesMask = 1 << 0 - // RoleStorageWorker is Oasis storage worker role. + // RoleStorageWorker is the storage worker role. RoleStorageWorker RolesMask = 1 << 1 - // RoleKeyManager is the Oasis key manager role. + // RoleKeyManager is the key manager role. RoleKeyManager RolesMask = 1 << 2 - // RoleValidator is the Oasis validator role. + // RoleValidator is the validator role. RoleValidator RolesMask = 1 << 3 + // RoleConsensusRPC is the public consensus RPC services worker role. + RoleConsensusRPC RolesMask = 1 << 4 // RoleReserved are all the bits of the Oasis node roles bitmask // that are reserved and must not be used. - RoleReserved RolesMask = ((1 << 32) - 1) & ^((RoleValidator << 1) - 1) + RoleReserved RolesMask = ((1 << 32) - 1) & ^((RoleConsensusRPC << 1) - 1) ) // IsSingleRole returns true if RolesMask encodes a single valid role. @@ -102,11 +104,14 @@ func (m RolesMask) String() string { ret = append(ret, "storage") } if m&RoleKeyManager != 0 { - ret = append(ret, "key_manager") + ret = append(ret, "key-manager") } if m&RoleValidator != 0 { ret = append(ret, "validator") } + if m&RoleConsensusRPC != 0 { + ret = append(ret, "consensus-rpc") + } return strings.Join(ret, ",") } diff --git a/go/consensus/api/api.go b/go/consensus/api/api.go index 9818a7287cb..1060196646f 100644 --- a/go/consensus/api/api.go +++ b/go/consensus/api/api.go @@ -4,6 +4,7 @@ package api import ( "context" + "time" beacon "github.com/oasislabs/oasis-core/go/beacon/api" "github.com/oasislabs/oasis-core/go/common/cbor" @@ -42,9 +43,10 @@ var ( ErrVersionNotFound = errors.New(moduleName, 3, "consensus: version not found") ) -// ClientBackend is a limited consensus interface used by clients that -// connect to the local node. +// ClientBackend is a limited consensus interface used by clients that connect to the local full +// node. This is separate from light clients, which use the LightClientBackend interface. type ClientBackend interface { + LightClientBackend TransactionAuthHandler // SubmitTx submits a signed consensus transaction.
@@ -87,6 +89,10 @@ type ClientBackend interface { type Block struct { // Height contains the block height. Height int64 `json:"height"` + // Hash contains the block header hash. + Hash []byte `json:"hash"` + // Time is the second-granular consensus time. + Time time.Time `json:"time"` // Meta contains the consensus backend specific block metadata. Meta cbor.RawMessage `json:"meta"` } diff --git a/go/consensus/api/grpc.go b/go/consensus/api/grpc.go index 03387d86ca7..61b9d4d0566 100644 --- a/go/consensus/api/grpc.go +++ b/go/consensus/api/grpc.go @@ -15,6 +15,8 @@ import ( var ( // serviceName is the gRPC service name. serviceName = cmnGrpc.NewServiceName("Consensus") + // lightServiceName is the gRPC service name for the light consensus interface. + lightServiceName = cmnGrpc.NewServiceName("ConsensusLight") // methodSubmitTx is the SubmitTx method. methodSubmitTx = serviceName.NewMethod("SubmitTx", transaction.SignedTransaction{}) @@ -36,10 +38,17 @@ var ( // methodWatchBlocks is the WatchBlocks method. methodWatchBlocks = serviceName.NewMethod("WatchBlocks", nil) + // methodGetSignedHeader is the GetSignedHeader method. + methodGetSignedHeader = lightServiceName.NewMethod("GetSignedHeader", int64(0)) + // methodGetValidatorSet is the GetValidatorSet method. + methodGetValidatorSet = lightServiceName.NewMethod("GetValidatorSet", int64(0)) + // methodGetParameters is the GetParameters method. + methodGetParameters = lightServiceName.NewMethod("GetParameters", int64(0)) + // serviceDesc is the gRPC service descriptor. serviceDesc = grpc.ServiceDesc{ ServiceName: string(serviceName), - HandlerType: (*Backend)(nil), + HandlerType: (*ClientBackend)(nil), Methods: []grpc.MethodDesc{ { MethodName: methodSubmitTx.ShortName(), @@ -82,6 +91,26 @@ var ( }, }, } + + // lightServiceDesc is the gRPC service descriptor for the light consensus service. 
+ lightServiceDesc = grpc.ServiceDesc{ + ServiceName: string(lightServiceName), + HandlerType: (*LightClientBackend)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: methodGetSignedHeader.ShortName(), + Handler: handlerGetSignedHeader, + }, + { + MethodName: methodGetValidatorSet.ShortName(), + Handler: handlerGetValidatorSet, + }, + { + MethodName: methodGetParameters.ShortName(), + Handler: handlerGetParameters, + }, + }, + } ) func handlerSubmitTx( // nolint: golint @@ -95,14 +124,14 @@ func handlerSubmitTx( // nolint: golint return nil, err } if interceptor == nil { - return nil, srv.(Backend).SubmitTx(ctx, rq) + return nil, srv.(ClientBackend).SubmitTx(ctx, rq) } info := &grpc.UnaryServerInfo{ Server: srv, FullMethod: methodSubmitTx.FullName(), } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return nil, srv.(Backend).SubmitTx(ctx, req.(*transaction.SignedTransaction)) + return nil, srv.(ClientBackend).SubmitTx(ctx, req.(*transaction.SignedTransaction)) } return interceptor(ctx, rq, info, handler) } @@ -118,14 +147,14 @@ func handlerStateToGenesis( // nolint: golint return nil, err } if interceptor == nil { - return srv.(Backend).StateToGenesis(ctx, height) + return srv.(ClientBackend).StateToGenesis(ctx, height) } info := &grpc.UnaryServerInfo{ Server: srv, FullMethod: methodStateToGenesis.FullName(), } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(Backend).StateToGenesis(ctx, req.(int64)) + return srv.(ClientBackend).StateToGenesis(ctx, req.(int64)) } return interceptor(ctx, height, info, handler) } @@ -141,14 +170,14 @@ func handlerEstimateGas( // nolint: golint return nil, err } if interceptor == nil { - return srv.(Backend).EstimateGas(ctx, rq) + return srv.(ClientBackend).EstimateGas(ctx, rq) } info := &grpc.UnaryServerInfo{ Server: srv, FullMethod: methodEstimateGas.FullName(), } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(Backend).EstimateGas(ctx, req.(*EstimateGasRequest)) + return srv.(ClientBackend).EstimateGas(ctx, req.(*EstimateGasRequest)) } return interceptor(ctx, rq, info, handler) } @@ -164,14 +193,14 @@ func handlerGetSignerNonce( // nolint: golint return nil, err } if interceptor == nil { - return srv.(Backend).GetSignerNonce(ctx, rq) + return srv.(ClientBackend).GetSignerNonce(ctx, rq) } info := &grpc.UnaryServerInfo{ Server: srv, FullMethod: methodGetSignerNonce.FullName(), } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(Backend).GetSignerNonce(ctx, req.(*GetSignerNonceRequest)) + return srv.(ClientBackend).GetSignerNonce(ctx, req.(*GetSignerNonceRequest)) } return interceptor(ctx, rq, info, handler) } @@ -187,14 +216,14 @@ func handlerGetEpoch( // nolint: golint return nil, err } if interceptor == nil { - return srv.(Backend).GetEpoch(ctx, height) + return srv.(ClientBackend).GetEpoch(ctx, height) } info := &grpc.UnaryServerInfo{ Server: srv, FullMethod: methodGetEpoch.FullName(), } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(Backend).GetEpoch(ctx, req.(int64)) + return srv.(ClientBackend).GetEpoch(ctx, req.(int64)) } return interceptor(ctx, height, info, handler) } @@ -210,14 +239,14 @@ func handlerWaitEpoch( // nolint: golint return nil, err } if interceptor == nil { - return nil, srv.(Backend).WaitEpoch(ctx, epoch) + return nil, srv.(ClientBackend).WaitEpoch(ctx, epoch) } info := &grpc.UnaryServerInfo{ Server: srv, FullMethod: 
methodWaitEpoch.FullName(), } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return nil, srv.(Backend).WaitEpoch(ctx, req.(epochtime.EpochTime)) + return nil, srv.(ClientBackend).WaitEpoch(ctx, req.(epochtime.EpochTime)) } return interceptor(ctx, epoch, info, handler) } @@ -233,14 +262,14 @@ func handlerGetBlock( // nolint: golint return nil, err } if interceptor == nil { - return srv.(Backend).GetBlock(ctx, height) + return srv.(ClientBackend).GetBlock(ctx, height) } info := &grpc.UnaryServerInfo{ Server: srv, FullMethod: methodGetBlock.FullName(), } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(Backend).GetBlock(ctx, req.(int64)) + return srv.(ClientBackend).GetBlock(ctx, req.(int64)) } return interceptor(ctx, height, info, handler) } @@ -256,14 +285,14 @@ func handlerGetTransactions( // nolint: golint return nil, err } if interceptor == nil { - return srv.(Backend).GetTransactions(ctx, height) + return srv.(ClientBackend).GetTransactions(ctx, height) } info := &grpc.UnaryServerInfo{ Server: srv, FullMethod: methodGetTransactions.FullName(), } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(Backend).GetTransactions(ctx, req.(int64)) + return srv.(ClientBackend).GetTransactions(ctx, req.(int64)) } return interceptor(ctx, height, info, handler) } @@ -274,7 +303,7 @@ func handlerWatchBlocks(srv interface{}, stream grpc.ServerStream) error { } ctx := stream.Context() - ch, sub, err := srv.(Backend).WatchBlocks(ctx) + ch, sub, err := srv.(ClientBackend).WatchBlocks(ctx) if err != nil { return err } @@ -296,13 +325,120 @@ func handlerWatchBlocks(srv interface{}, stream grpc.ServerStream) error { } } -// RegisterService registers a new consensus backend service with the -// given gRPC server. 
-func RegisterService(server *grpc.Server, service Backend) { +func handlerGetSignedHeader( // nolint: golint + srv interface{}, + ctx context.Context, + dec func(interface{}) error, + interceptor grpc.UnaryServerInterceptor, +) (interface{}, error) { + var height int64 + if err := dec(&height); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(LightClientBackend).GetSignedHeader(ctx, height) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: methodGetSignedHeader.FullName(), + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(LightClientBackend).GetSignedHeader(ctx, req.(int64)) + } + return interceptor(ctx, height, info, handler) +} + +func handlerGetValidatorSet( // nolint: golint + srv interface{}, + ctx context.Context, + dec func(interface{}) error, + interceptor grpc.UnaryServerInterceptor, +) (interface{}, error) { + var height int64 + if err := dec(&height); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(LightClientBackend).GetValidatorSet(ctx, height) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: methodGetValidatorSet.FullName(), + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(LightClientBackend).GetValidatorSet(ctx, req.(int64)) + } + return interceptor(ctx, height, info, handler) +} + +func handlerGetParameters( // nolint: golint + srv interface{}, + ctx context.Context, + dec func(interface{}) error, + interceptor grpc.UnaryServerInterceptor, +) (interface{}, error) { + var height int64 + if err := dec(&height); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(LightClientBackend).GetParameters(ctx, height) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: methodGetParameters.FullName(), + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(LightClientBackend).GetParameters(ctx, req.(int64)) + } + return interceptor(ctx, height, info, handler) +} + +// RegisterService registers a new client backend service with the given gRPC server. +func RegisterService(server *grpc.Server, service ClientBackend) { server.RegisterService(&serviceDesc, service) + RegisterLightService(server, service) +} + +// RegisterLightService registers a new light client backend service with the given gRPC server. +func RegisterLightService(server *grpc.Server, service LightClientBackend) { + server.RegisterService(&lightServiceDesc, service) +} + +type consensusLightClient struct { + conn *grpc.ClientConn +} + +// Implements LightClientBackend. +func (c *consensusLightClient) GetSignedHeader(ctx context.Context, height int64) (*SignedHeader, error) { + var rsp SignedHeader + if err := c.conn.Invoke(ctx, methodGetSignedHeader.FullName(), height, &rsp); err != nil { + return nil, err + } + return &rsp, nil +} + +// Implements LightClientBackend. +func (c *consensusLightClient) GetValidatorSet(ctx context.Context, height int64) (*ValidatorSet, error) { + var rsp ValidatorSet + if err := c.conn.Invoke(ctx, methodGetValidatorSet.FullName(), height, &rsp); err != nil { + return nil, err + } + return &rsp, nil +} + +// Implements LightClientBackend. 
+func (c *consensusLightClient) GetParameters(ctx context.Context, height int64) (*Parameters, error) { + var rsp Parameters + if err := c.conn.Invoke(ctx, methodGetParameters.FullName(), height, &rsp); err != nil { + return nil, err + } + return &rsp, nil } type consensusClient struct { + consensusLightClient + conn *grpc.ClientConn } @@ -399,5 +535,13 @@ func (c *consensusClient) WatchBlocks(ctx context.Context) (<-chan *Block, pubsu // NewConsensusClient creates a new gRPC consensus client service. func NewConsensusClient(c *grpc.ClientConn) ClientBackend { - return &consensusClient{c} + return &consensusClient{ + consensusLightClient: consensusLightClient{c}, + conn: c, + } +} + +// NewConsensusLightClient creates a new gRPC consensus light client service. +func NewConsensusLightClient(c *grpc.ClientConn) LightClientBackend { + return &consensusLightClient{c} } diff --git a/go/consensus/api/light.go b/go/consensus/api/light.go new file mode 100644 index 00000000000..bef57e18177 --- /dev/null +++ b/go/consensus/api/light.go @@ -0,0 +1,43 @@ +package api + +import "context" + +// LightClientBackend is the limited consensus interface used by light clients. +type LightClientBackend interface { + // GetSignedHeader returns the signed header for a specific height. + GetSignedHeader(ctx context.Context, height int64) (*SignedHeader, error) + + // GetValidatorSet returns the validator set for a specific height. + GetValidatorSet(ctx context.Context, height int64) (*ValidatorSet, error) + + // GetParameters returns the consensus parameters for a specific height. + GetParameters(ctx context.Context, height int64) (*Parameters, error) + + // TODO: Move SubmitEvidence etc. from Backend. +} + +// SignedHeader is a signed consensus block header. +type SignedHeader struct { + // Height contains the block height this header is for. + Height int64 `json:"height"` + // Meta contains the consensus backend specific signed header. + Meta []byte `json:"meta"` +} + +// ValidatorSet contains the validator set information. +type ValidatorSet struct { + // Height contains the block height this validator set is for. + Height int64 `json:"height"` + // Meta contains the consensus backend specific validator set. + Meta []byte `json:"meta"` +} + +// Parameters are the consensus backend parameters. +type Parameters struct { + // Height contains the block height these consensus parameters are for. + Height int64 `json:"height"` + // Meta contains the consensus backend specific consensus parameters. + Meta []byte `json:"meta"` + + // TODO: Consider also including consensus/genesis.Parameters which are backend-agnostic. 
+} diff --git a/go/consensus/tendermint/api/api.go b/go/consensus/tendermint/api/api.go index 5d82ea2ff0f..866bc80e8ae 100644 --- a/go/consensus/tendermint/api/api.go +++ b/go/consensus/tendermint/api/api.go @@ -149,6 +149,8 @@ func NewBlock(blk *tmtypes.Block) *consensus.Block { return &consensus.Block{ Height: blk.Header.Height, + Hash: blk.Header.Hash(), + Time: blk.Header.Time, Meta: rawMeta, } } diff --git a/go/consensus/tendermint/full.go b/go/consensus/tendermint/full.go new file mode 100644 index 00000000000..95f201e21d7 --- /dev/null +++ b/go/consensus/tendermint/full.go @@ -0,0 +1,75 @@ +package tendermint + +import ( + "context" + "fmt" + + tmamino "github.com/tendermint/go-amino" + tmrpctypes "github.com/tendermint/tendermint/rpc/core/types" + tmstate "github.com/tendermint/tendermint/state" + + consensusAPI "github.com/oasislabs/oasis-core/go/consensus/api" +) + +// We must use Tendermint's amino codec as some of Tendermint's types are not easily unmarshallable. +var aminoCodec = tmamino.NewCodec() + +func init() { + tmrpctypes.RegisterAmino(aminoCodec) +} + +// Implements LightClientBackend. +func (t *tendermintService) GetSignedHeader(ctx context.Context, height int64) (*consensusAPI.SignedHeader, error) { + if err := t.ensureStarted(ctx); err != nil { + return nil, err + } + + commit, err := t.client.Commit(&height) + if err != nil { + return nil, fmt.Errorf("%w: tendermint: header query failed: %s", consensusAPI.ErrVersionNotFound, err.Error()) + } + + if commit.Header == nil { + return nil, fmt.Errorf("tendermint: header is nil") + } + + return &consensusAPI.SignedHeader{ + Height: commit.Header.Height, + Meta: aminoCodec.MustMarshalBinaryBare(commit.SignedHeader), + }, nil +} + +// Implements LightClientBackend. +func (t *tendermintService) GetValidatorSet(ctx context.Context, height int64) (*consensusAPI.ValidatorSet, error) { + if err := t.ensureStarted(ctx); err != nil { + return nil, err + } + + // Don't use the client as that imposes stupid pagination. Access the state database directly. + vals, err := tmstate.LoadValidators(t.stateDb, height) + if err != nil { + return nil, consensusAPI.ErrVersionNotFound + } + + return &consensusAPI.ValidatorSet{ + Height: height, + Meta: aminoCodec.MustMarshalBinaryBare(vals), + }, nil +} + +// Implements LightClientBackend. 
+func (t *tendermintService) GetParameters(ctx context.Context, height int64) (*consensusAPI.Parameters, error) { + if err := t.ensureStarted(ctx); err != nil { + return nil, err + } + + params, err := t.client.ConsensusParams(&height) + if err != nil { + return nil, fmt.Errorf("%w: tendermint: consensus params query failed: %s", consensusAPI.ErrVersionNotFound, err.Error()) + } + + return &consensusAPI.Parameters{ + Height: params.BlockHeight, + Meta: aminoCodec.MustMarshalBinaryBare(params.ConsensusParams), + }, nil +} diff --git a/go/consensus/tendermint/tendermint.go b/go/consensus/tendermint/tendermint.go index 16653342c4e..98951bfab1a 100644 --- a/go/consensus/tendermint/tendermint.go +++ b/go/consensus/tendermint/tendermint.go @@ -25,6 +25,7 @@ import ( tmcli "github.com/tendermint/tendermint/rpc/client/local" tmrpctypes "github.com/tendermint/tendermint/rpc/core/types" tmtypes "github.com/tendermint/tendermint/types" + tmdb "github.com/tendermint/tm-db" beaconAPI "github.com/oasislabs/oasis-core/go/beacon/api" "github.com/oasislabs/oasis-core/go/common" @@ -199,6 +200,8 @@ type tendermintService struct { blockNotifier *pubsub.Broker failMonitor *failMonitor + stateDb tmdb.DB + beacon beaconAPI.Backend epochtime epochtimeAPI.Backend keymanager keymanagerAPI.Backend @@ -728,6 +731,20 @@ func (t *tendermintService) WatchBlocks(ctx context.Context) (<-chan *consensusA return mapCh, sub, nil } +func (t *tendermintService) ensureStarted(ctx context.Context) error { + // Make sure that the Tendermint service has started so that we + // have the client interface available. + select { + case <-t.startedCh: + case <-t.ctx.Done(): + return t.ctx.Err() + case <-ctx.Done(): + return ctx.Err() + } + + return nil +} + func (t *tendermintService) initialize() error { t.Lock() defer t.Unlock() @@ -802,14 +819,8 @@ func (t *tendermintService) initialize() error { } func (t *tendermintService) GetTendermintBlock(ctx context.Context, height int64) (*tmtypes.Block, error) { - // Make sure that the Tendermint service has started so that we - // have the client interface available. - select { - case <-t.startedCh: - case <-t.ctx.Done(): - return nil, t.ctx.Err() - case <-ctx.Done(): - return nil, ctx.Err() + if err := t.ensureStarted(ctx); err != nil { + return nil, err } var tmHeight int64 @@ -1035,6 +1046,25 @@ func (t *tendermintService) lazyInit() error { return err } + // HACK: Wrap the provider so we can extract the state database handle. This is required because + // Tendermint does not expose a way to access the state database and we need it to bypass some + // stupid things like pagination on the in-process "client". + wrapDbProvider := func(dbCtx *tmnode.DBContext) (tmdb.DB, error) { + db, derr := dbProvider(dbCtx) + if derr != nil { + return nil, derr + } + + switch dbCtx.ID { + case "state": + // Tendermint state database. + t.stateDb = db + default: + } + + return db, nil + } + // HACK: tmnode.NewNode() triggers block replay and or ABCI chain // initialization, instead of t.node.Start(). 
This is a problem // because at the time that lazyInit() is called, none of the ABCI @@ -1048,13 +1078,17 @@ func (t *tendermintService) lazyInit() error { &tmp2p.NodeKey{PrivKey: crypto.SignerToTendermint(t.nodeSigner)}, tmproxy.NewLocalClientCreator(t.mux.Mux()), tendermintGenesisProvider, - dbProvider, + wrapDbProvider, tmnode.DefaultMetricsProvider(tenderConfig.Instrumentation), newLogAdapter(!viper.GetBool(cfgLogDebug)), ) if err != nil { return fmt.Errorf("tendermint: failed to create node: %w", err) } + if t.stateDb == nil { + // Sanity check for the above wrapDbProvider hack in case the DB provider changes. + panic("tendermint: state database not set") + } t.client = tmcli.New(t.node) t.failMonitor = newFailMonitor(t.Logger, t.node.ConsensusState().Wait) diff --git a/go/consensus/tests/tester.go b/go/consensus/tests/tester.go index d87d282dca7..2ad2e005628 100644 --- a/go/consensus/tests/tester.go +++ b/go/consensus/tests/tester.go @@ -68,4 +68,20 @@ func ConsensusImplementationTests(t *testing.T, backend consensus.ClientBackend) }) require.NoError(err, "GetSignerNonce") require.Equal(uint64(0), nonce, "Nonce should be zero") + + // Light client API. + shdr, err := backend.GetSignedHeader(ctx, blk.Height) + require.NoError(err, "GetSignedHeader") + require.Equal(shdr.Height, blk.Height, "returned header height should be correct") + require.NotNil(shdr.Meta, "returned header should contain metadata") + + vals, err := backend.GetValidatorSet(ctx, blk.Height) + require.NoError(err, "GetValidatorSet") + require.Equal(vals.Height, blk.Height, "returned validator set height should be correct") + require.NotNil(vals.Meta, "returned validator set should contain metadata") + + params, err := backend.GetParameters(ctx, blk.Height) + require.NoError(err, "GetParameters") + require.Equal(params.Height, blk.Height, "returned parameters height should be correct") + require.NotNil(params.Meta, "returned parameters should contain metadata") } diff --git a/go/go.mod b/go/go.mod index ed5aa187592..36cc0d19fbf 100644 --- a/go/go.mod +++ b/go/go.mod @@ -55,7 +55,7 @@ require ( github.com/spf13/viper v1.6.3 github.com/steveyen/gtreap v0.0.0-20150807155958-0abe01ef9be2 // indirect github.com/stretchr/testify v1.5.1 - github.com/tendermint/go-amino v0.15.0 // indirect + github.com/tendermint/go-amino v0.15.0 github.com/tendermint/tendermint v0.32.8 github.com/tendermint/tm-db v0.5.1 github.com/thepudds/fzgo v0.2.2 diff --git a/go/oasis-node/cmd/node/node.go b/go/oasis-node/cmd/node/node.go index 7c58ebe854f..586c211ed4b 100644 --- a/go/oasis-node/cmd/node/node.go +++ b/go/oasis-node/cmd/node/node.go @@ -61,6 +61,7 @@ import ( "github.com/oasislabs/oasis-core/go/worker/compute/executor" "github.com/oasislabs/oasis-core/go/worker/compute/merge" "github.com/oasislabs/oasis-core/go/worker/compute/txnscheduler" + workerConsensusRPC "github.com/oasislabs/oasis-core/go/worker/consensusrpc" workerKeymanager "github.com/oasislabs/oasis-core/go/worker/keymanager" "github.com/oasislabs/oasis-core/go/worker/registration" workerSentry "github.com/oasislabs/oasis-core/go/worker/sentry" @@ -126,6 +127,7 @@ type Node struct { P2P *p2p.P2P RegistrationWorker *registration.Worker KeymanagerWorker *workerKeymanager.Worker + ConsensusWorker *workerConsensusRPC.Worker } // Cleanup cleans up after the node has terminated. @@ -340,6 +342,13 @@ func (n *Node) initWorkers(logger *logging.Logger) error { } n.svcMgr.Register(n.TransactionSchedulerWorker) + // Initialize the public consensus services worker. 
+ n.ConsensusWorker, err = workerConsensusRPC.New(n.CommonWorker, n.RegistrationWorker) + if err != nil { + return err + } + n.svcMgr.Register(n.ConsensusWorker) + return nil } @@ -384,8 +393,18 @@ func (n *Node) startWorkers(logger *logging.Logger) error { return err } + // Start the public consensus services worker. + if err := n.ConsensusWorker.Start(); err != nil { + return fmt.Errorf("consensus worker: %w", err) + } + // Only start the external gRPC server if any workers are enabled. - if n.StorageWorker.Enabled() || n.TransactionSchedulerWorker.Enabled() || n.MergeWorker.Enabled() || n.KeymanagerWorker.Enabled() { + if n.StorageWorker.Enabled() || + n.TransactionSchedulerWorker.Enabled() || + n.MergeWorker.Enabled() || + n.KeymanagerWorker.Enabled() || + n.ConsensusWorker.Enabled() { + if err := n.CommonWorker.Grpc.Start(); err != nil { logger.Error("failed to start external gRPC server", "err", err, @@ -793,6 +812,7 @@ func init() { workerCommon.Flags, workerStorage.Flags, workerSentry.Flags, + workerConsensusRPC.Flags, crash.InitFlags(), } { Flags.AddFlagSet(v) diff --git a/go/oasis-test-runner/oasis/args.go b/go/oasis-test-runner/oasis/args.go index 0ee717542df..7b6f45e229a 100644 --- a/go/oasis-test-runner/oasis/args.go +++ b/go/oasis-test-runner/oasis/args.go @@ -30,6 +30,7 @@ import ( "github.com/oasislabs/oasis-core/go/worker/common/p2p" "github.com/oasislabs/oasis-core/go/worker/compute" "github.com/oasislabs/oasis-core/go/worker/compute/txnscheduler" + workerConsensusRPC "github.com/oasislabs/oasis-core/go/worker/consensusrpc" "github.com/oasislabs/oasis-core/go/worker/keymanager" "github.com/oasislabs/oasis-core/go/worker/registration" workerSentry "github.com/oasislabs/oasis-core/go/worker/sentry" @@ -377,6 +378,11 @@ func (args *argBuilder) workerTxnschedulerCheckTxEnabled() *argBuilder { return args } +func (args *argBuilder) workerConsensusRPCEnabled() *argBuilder { + args.vec = append(args.vec, "--"+workerConsensusRPC.CfgWorkerEnabled) + return args +} + func (args *argBuilder) iasUseGenesis() *argBuilder { args.vec = append(args.vec, "--ias.use_genesis") return args diff --git a/go/oasis-test-runner/oasis/fixture.go b/go/oasis-test-runner/oasis/fixture.go index c84851ba998..b7c9a594792 100644 --- a/go/oasis-test-runner/oasis/fixture.go +++ b/go/oasis-test-runner/oasis/fixture.go @@ -133,6 +133,9 @@ type ConsensusFixture struct { // nolint: maligned // TendermintRecoverCorruptedWAL enables automatic recovery of corrupted Tendermint's WAL. TendermintRecoverCorruptedWAL bool `json:"tendermint_recover_corrupted_wal"` + + // EnableConsensusRPCWorker enables the public consensus RPC services worker. + EnableConsensusRPCWorker bool `json:"enable_consensusrpc_worker,omitempty"` } // TEEFixture is a TEE configuration fixture. diff --git a/go/oasis-test-runner/oasis/validator.go b/go/oasis-test-runner/oasis/validator.go index 67b2a2abe0c..d31fe8c9c7b 100644 --- a/go/oasis-test-runner/oasis/validator.go +++ b/go/oasis-test-runner/oasis/validator.go @@ -25,6 +25,7 @@ type Validator struct { tmAddress string consensusPort uint16 + clientPort uint16 } // ValidatorCfg is the Oasis validator provisioning configuration. @@ -66,6 +67,11 @@ func (val *Validator) ExportsPath() string { return nodeExportsPath(val.dir) } +// ExternalGRPCAddress returns the address of the node's external gRPC server. +func (val *Validator) ExternalGRPCAddress() string { + return fmt.Sprintf("127.0.0.1:%d", val.clientPort) +} + // Start starts an Oasis node. 
func (val *Validator) Start() error { return val.startNode() @@ -91,6 +97,10 @@ func (val *Validator) startNode() error { } else { args = args.appendSeedNodes(val.net) } + if val.consensus.EnableConsensusRPCWorker { + args = args.workerClientPort(val.clientPort). + workerConsensusRPCEnabled() + } if len(val.net.validators) >= 1 && val == val.net.validators[0] { args = args.supplementarysanityEnabled() @@ -130,6 +140,7 @@ func (net *Network) NewValidator(cfg *ValidatorCfg) (*Validator, error) { entity: cfg.Entity, sentries: cfg.Sentries, consensusPort: net.nextNodePort, + clientPort: net.nextNodePort + 1, } val.doStartNode = val.startNode @@ -191,7 +202,7 @@ func (net *Network) NewValidator(cfg *ValidatorCfg) (*Validator, error) { } net.validators = append(net.validators, val) - net.nextNodePort++ + net.nextNodePort += 2 if err := net.AddLogWatcher(&val.Node); err != nil { net.logger.Error("failed to add log watcher", diff --git a/go/oasis-test-runner/scenario/e2e/runtime.go b/go/oasis-test-runner/scenario/e2e/runtime.go index 52eba16d2f7..497d3605b19 100644 --- a/go/oasis-test-runner/scenario/e2e/runtime.go +++ b/go/oasis-test-runner/scenario/e2e/runtime.go @@ -193,9 +193,9 @@ func (sc *runtimeImpl) Fixture() (*oasis.NetworkFixture, error) { }, }, Validators: []oasis.ValidatorFixture{ - oasis.ValidatorFixture{Entity: 1}, - oasis.ValidatorFixture{Entity: 1}, - oasis.ValidatorFixture{Entity: 1}, + oasis.ValidatorFixture{Entity: 1, Consensus: oasis.ConsensusFixture{EnableConsensusRPCWorker: true}}, + oasis.ValidatorFixture{Entity: 1, Consensus: oasis.ConsensusFixture{EnableConsensusRPCWorker: true}}, + oasis.ValidatorFixture{Entity: 1, Consensus: oasis.ConsensusFixture{EnableConsensusRPCWorker: true}}, }, KeymanagerPolicies: []oasis.KeymanagerPolicyFixture{ oasis.KeymanagerPolicyFixture{Runtime: 0, Serial: 1}, diff --git a/go/storage/mkvs/checkpoint/checkpoint.go b/go/storage/mkvs/checkpoint/checkpoint.go index b4ae7e998d1..d916813b8ab 100644 --- a/go/storage/mkvs/checkpoint/checkpoint.go +++ b/go/storage/mkvs/checkpoint/checkpoint.go @@ -50,6 +50,10 @@ type ChunkProvider interface { type GetCheckpointsRequest struct { Version uint16 `json:"version"` Namespace common.Namespace `json:"namespace"` + + // RootVersion specifies an optional root version to limit the request to. If specified, only + // checkpoints for roots with the specific version will be considered. + RootVersion *uint64 `json:"root_version,omitempty"` } // Creator is a checkpoint creator. @@ -83,6 +87,10 @@ type Restorer interface { // StartRestore starts a checkpoint restoration process. StartRestore(ctx context.Context, checkpoint *Metadata) error + // GetCurrentCheckpoint returns the checkpoint that is being restored. If no restoration is in + // progress, this method may return nil. + GetCurrentCheckpoint() *Metadata + // RestoreChunk restores the given chunk into the underlying node database. // // This method requires that a restoration is in progress. @@ -125,16 +133,25 @@ type Metadata struct { Chunks []hash.Hash `json:"chunks"` } +// EncodedHash returns the encoded cryptographic hash of the checkpoint metadata. +func (m *Metadata) EncodedHash() hash.Hash { + var hh hash.Hash + + hh.From(m) + + return hh +} + // GetChunkMetadata returns the chunk metadata for the corresponding chunk. 
-func (c Metadata) GetChunkMetadata(idx uint64) (*ChunkMetadata, error) { - if idx >= uint64(len(c.Chunks)) { +func (m Metadata) GetChunkMetadata(idx uint64) (*ChunkMetadata, error) { + if idx >= uint64(len(m.Chunks)) { return nil, ErrChunkNotFound } return &ChunkMetadata{ - Version: c.Version, - Root: c.Root, + Version: m.Version, + Root: m.Root, Index: idx, - Digest: c.Chunks[int(idx)], + Digest: m.Chunks[int(idx)], }, nil } diff --git a/go/storage/mkvs/checkpoint/checkpoint_test.go b/go/storage/mkvs/checkpoint/checkpoint_test.go index 1865eab555e..5ac556a7c22 100644 --- a/go/storage/mkvs/checkpoint/checkpoint_test.go +++ b/go/storage/mkvs/checkpoint/checkpoint_test.go @@ -182,6 +182,9 @@ func TestFileCheckpointCreator(t *testing.T) { err = rs.StartRestore(ctx, cp) require.Error(err, "StartRestore should fail when a restore is already in progress") require.True(errors.Is(err, ErrRestoreAlreadyInProgress)) + rcp := rs.GetCurrentCheckpoint() + require.EqualValues(rcp, cp, "GetCurrentCheckpoint should return the checkpoint being restored") + require.NotSame(rcp, cp, "GetCurrentCheckpoint should return a copy") for i := 0; i < len(cp.Chunks); i++ { var cm *ChunkMetadata cm, err = cp.GetChunkMetadata(uint64(i)) diff --git a/go/storage/mkvs/checkpoint/file.go b/go/storage/mkvs/checkpoint/file.go index 13228a1d1e5..f615dec2f3e 100644 --- a/go/storage/mkvs/checkpoint/file.go +++ b/go/storage/mkvs/checkpoint/file.go @@ -110,7 +110,13 @@ func (fc *fileCreator) GetCheckpoints(ctx context.Context, request *GetCheckpoin return []*Metadata{}, nil } - matches, err := filepath.Glob(filepath.Join(fc.dataDir, "*", "*", checkpointMetadataFile)) + // Apply optional root version filter. + versionGlob := "*" + if request.RootVersion != nil { + versionGlob = strconv.FormatUint(*request.RootVersion, 10) + } + + matches, err := filepath.Glob(filepath.Join(fc.dataDir, versionGlob, "*", checkpointMetadataFile)) if err != nil { return nil, fmt.Errorf("checkpoint: failed to enumerate checkpoints: %w", err) } diff --git a/go/storage/mkvs/checkpoint/restorer.go b/go/storage/mkvs/checkpoint/restorer.go index 105077704a4..5d5d9c9255e 100644 --- a/go/storage/mkvs/checkpoint/restorer.go +++ b/go/storage/mkvs/checkpoint/restorer.go @@ -40,6 +40,18 @@ func (rs *restorer) StartRestore(ctx context.Context, checkpoint *Metadata) erro return nil } +func (rs *restorer) GetCurrentCheckpoint() *Metadata { + rs.Lock() + defer rs.Unlock() + + if rs.currentCheckpoint == nil { + return nil + } + + cp := *rs.currentCheckpoint + return &cp +} + // Implements Restorer. func (rs *restorer) RestoreChunk(ctx context.Context, idx uint64, r io.Reader) (bool, error) { chunk, err := func() (*ChunkMetadata, error) { diff --git a/go/worker/consensusrpc/worker.go b/go/worker/consensusrpc/worker.go new file mode 100644 index 00000000000..4376978c5a7 --- /dev/null +++ b/go/worker/consensusrpc/worker.go @@ -0,0 +1,105 @@ +// Package consensus implements publicly accessible consensus services. +package consensus + +import ( + "fmt" + + flag "github.com/spf13/pflag" + "github.com/spf13/viper" + + "github.com/oasislabs/oasis-core/go/common/logging" + "github.com/oasislabs/oasis-core/go/common/node" + consensus "github.com/oasislabs/oasis-core/go/consensus/api" + workerCommon "github.com/oasislabs/oasis-core/go/worker/common" + "github.com/oasislabs/oasis-core/go/worker/registration" +) + +const ( + // CfgWorkerEnabled enables the consensus RPC services worker. + CfgWorkerEnabled = "worker.consensusrpc.enabled" +) + +// Flags has the configuration flags. 
+var Flags = flag.NewFlagSet("", flag.ContinueOnError) + +// Worker is a worker providing publicly accessible consensus services. +// +// Currently this only exposes the consensus light client service. +type Worker struct { + enabled bool + + commonWorker *workerCommon.Worker + + quitCh chan struct{} + + logger *logging.Logger +} + +// Name returns the service name. +func (w *Worker) Name() string { + return "public consensus RPC services worker" +} + +// Enabled returns if worker is enabled. +func (w *Worker) Enabled() bool { + return w.enabled +} + +// Start starts the worker. +func (w *Worker) Start() error { + if w.enabled { + w.logger.Info("starting public consensus RPC services worker") + } + return nil +} + +// Stop halts the service. +func (w *Worker) Stop() { + close(w.quitCh) +} + +// Quit returns a channel that will be closed when the service terminates. +func (w *Worker) Quit() <-chan struct{} { + return w.quitCh +} + +// Cleanup performs the service specific post-termination cleanup. +func (w *Worker) Cleanup() { +} + +// New creates a new public consensus services worker. +func New(commonWorker *workerCommon.Worker, registration *registration.Worker) (*Worker, error) { + w := &Worker{ + enabled: Enabled(), + commonWorker: commonWorker, + quitCh: make(chan struct{}), + logger: logging.GetLogger("worker/consensusrpc"), + } + + if w.enabled { + // Register the consensus light client service. + consensus.RegisterLightService(commonWorker.Grpc.Server(), commonWorker.Consensus) + + // Publish our role to ease discovery for clients. + rp, err := registration.NewRoleProvider(node.RoleConsensusRPC) + if err != nil { + return nil, fmt.Errorf("failed to create role provider: %w", err) + } + + // The consensus RPC service is available immediately. + rp.SetAvailable(func(*node.Node) error { return nil }) + } + + return w, nil +} + +// Enabled reads our enabled flag from viper. +func Enabled() bool { + return viper.GetBool(CfgWorkerEnabled) +} + +func init() { + Flags.Bool(CfgWorkerEnabled, false, "Enable public consensus RPC services worker") + + _ = viper.BindPFlags(Flags) +}
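
The new `LightClientBackend` surface introduced in this diff can be consumed through `NewConsensusLightClient` from `go/consensus/api`. Below is a minimal, hypothetical sketch of a client talking to a node that has `--worker.consensusrpc.enabled` set; the address, height, and dial options are placeholders (a real client must also supply the TLS credentials and codec configuration the node's gRPC server expects), not part of this change.

```go
package main

import (
	"context"
	"fmt"

	"google.golang.org/grpc"

	consensusAPI "github.com/oasislabs/oasis-core/go/consensus/api"
)

func main() {
	// Placeholder address of a node running the public consensus RPC worker.
	// Dial options are deliberately simplified for illustration.
	conn, err := grpc.Dial("127.0.0.1:42261", grpc.WithInsecure())
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// Wrap the connection in a LightClientBackend.
	light := consensusAPI.NewConsensusLightClient(conn)

	ctx := context.Background()
	height := int64(1) // Illustrative height only.

	// Fetch the signed header, validator set and consensus parameters for the
	// given height; each response carries backend-specific bytes in Meta.
	shdr, err := light.GetSignedHeader(ctx, height)
	if err != nil {
		panic(err)
	}
	vals, err := light.GetValidatorSet(ctx, height)
	if err != nil {
		panic(err)
	}
	params, err := light.GetParameters(ctx, height)
	if err != nil {
		panic(err)
	}

	fmt.Printf("header: %d bytes, validators: %d bytes, params: %d bytes\n",
		len(shdr.Meta), len(vals.Meta), len(params.Meta))
}
```

On the server side no extra wiring beyond this diff should be needed: the worker calls `RegisterLightService` on the external gRPC server, and the new `RoleConsensusRPC` bit lets clients discover nodes that expose these services via the registry.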