Skip to content

Commit

Permalink
go/roothash: Add support for observing gossiped executor commitments
Browse files Browse the repository at this point in the history
  • Loading branch information
kostko committed May 19, 2023
1 parent 53404a8 commit e121a5d
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 3 deletions.
9 changes: 7 additions & 2 deletions go/consensus/tendermint/apps/roothash/roothash.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/oasisprotocol/oasis-core/go/common/crypto/signature"
"github.com/oasisprotocol/oasis-core/go/common/logging"
"github.com/oasisprotocol/oasis-core/go/common/node"
"github.com/oasisprotocol/oasis-core/go/common/pubsub"
"github.com/oasisprotocol/oasis-core/go/consensus/api/transaction"
tmapi "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/api"
governanceApi "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/apps/governance/api"
Expand All @@ -38,6 +39,8 @@ var _ tmapi.Application = (*rootHashApplication)(nil)
type rootHashApplication struct {
state tmapi.ApplicationState
md tmapi.MessageDispatcher

ecNotifier *pubsub.Broker
}

func (app *rootHashApplication) Name() string {
Expand Down Expand Up @@ -818,6 +821,8 @@ func (app *rootHashApplication) tryFinalizeBlock(
}

// New constructs a new roothash application instance.
func New() tmapi.Application {
return &rootHashApplication{}
func New(ecNotifier *pubsub.Broker) tmapi.Application {
return &rootHashApplication{
ecNotifier: ecNotifier,
}
}
7 changes: 7 additions & 0 deletions go/consensus/tendermint/apps/roothash/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,13 @@ func (app *rootHashApplication) executorCommit(
cc *roothash.ExecutorCommit,
) (err error) {
if ctx.IsCheckOnly() {
// In case an executor commit notifier has been set up, push all commits into channel.
if app.ecNotifier != nil {
for _, ec := range cc.Commits {
ec := ec
app.ecNotifier.Broadcast(&ec)
}
}
return nil
}

Expand Down
18 changes: 17 additions & 1 deletion go/consensus/tendermint/roothash/roothash.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
app "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/apps/roothash"
"github.com/oasisprotocol/oasis-core/go/roothash/api"
"github.com/oasisprotocol/oasis-core/go/roothash/api/block"
"github.com/oasisprotocol/oasis-core/go/roothash/api/commitment"
"github.com/oasisprotocol/oasis-core/go/roothash/api/message"
runtimeRegistry "github.com/oasisprotocol/oasis-core/go/runtime/registry"
)
Expand Down Expand Up @@ -78,6 +79,8 @@ type serviceClient struct {
trackedRuntime map[common.Namespace]*trackedRuntime

pruneHandler *pruneHandler

ecNotifier *pubsub.Broker
}

// Implements api.Backend.
Expand Down Expand Up @@ -236,6 +239,15 @@ func (sc *serviceClient) WatchEvents(ctx context.Context, id common.Namespace) (
return ch, sub, nil
}

// Implements api.Backend.
func (sc *serviceClient) WatchExecutorCommitments(ctx context.Context) (<-chan *commitment.ExecutorCommitment, pubsub.ClosableSubscription, error) {
sub := sc.ecNotifier.Subscribe()
ch := make(chan *commitment.ExecutorCommitment)
sub.Unwrap(ch)

return ch, sub, nil
}

// Implements api.Backend.
func (sc *serviceClient) TrackRuntime(ctx context.Context, history api.BlockHistory) error {
sc.pruneHandler.trackRuntime(history)
Expand Down Expand Up @@ -851,8 +863,11 @@ func New(
dataDir string,
backend tmapi.Backend,
) (ServiceClient, error) {
// Create the general executor commitment notifier.
ecNotifier := pubsub.NewBroker(false)

// Initialize and register the tendermint service component.
a := app.New()
a := app.New(ecNotifier)
if err := backend.RegisterApplication(a); err != nil {
return nil, err
}
Expand All @@ -876,6 +891,7 @@ func New(
cmdCh: make(chan interface{}, runtimeRegistry.MaxRuntimeCount),
trackedRuntime: make(map[common.Namespace]*trackedRuntime),
pruneHandler: ph,
ecNotifier: ecNotifier,
}, nil
}

Expand Down
7 changes: 7 additions & 0 deletions go/roothash/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,13 @@ type Backend interface {
// WatchEvents returns a stream of protocol events.
WatchEvents(ctx context.Context, runtimeID common.Namespace) (<-chan *Event, pubsub.ClosableSubscription, error)

// WatchExecutorCommitments returns a channel that produces a stream of executor commitments
// observed in the consensus layer P2P network.
//
// Note that these commitments may not have been processed by consensus, commitments may be
// received in any order and duplicates are possible.
WatchExecutorCommitments(ctx context.Context) (<-chan *commitment.ExecutorCommitment, pubsub.ClosableSubscription, error)

// TrackRuntime adds a runtime the history of which should be tracked.
TrackRuntime(ctx context.Context, history BlockHistory) error

Expand Down
71 changes: 71 additions & 0 deletions go/roothash/api/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
cmnGrpc "github.com/oasisprotocol/oasis-core/go/common/grpc"
"github.com/oasisprotocol/oasis-core/go/common/pubsub"
"github.com/oasisprotocol/oasis-core/go/roothash/api/block"
"github.com/oasisprotocol/oasis-core/go/roothash/api/commitment"
"github.com/oasisprotocol/oasis-core/go/roothash/api/message"
)

Expand Down Expand Up @@ -39,6 +40,8 @@ var (
methodWatchBlocks = serviceName.NewMethod("WatchBlocks", common.Namespace{})
// methodWatchEvents is the WatchEvents method.
methodWatchEvents = serviceName.NewMethod("WatchEvents", common.Namespace{})
// methodWatchExecutorCommitments is the WatchExecutorCommitments method.
methodWatchExecutorCommitments = serviceName.NewMethod("WatchExecutorCommitments", nil)

// serviceDesc is the gRPC service descriptor.
serviceDesc = grpc.ServiceDesc{
Expand Down Expand Up @@ -93,6 +96,11 @@ var (
Handler: handlerWatchEvents,
ServerStreams: true,
},
{
StreamName: methodWatchExecutorCommitments.ShortName(),
Handler: handlerWatchExecutorCommitments,
ServerStreams: true,
},
},
}
)
Expand Down Expand Up @@ -362,6 +370,34 @@ func handlerWatchEvents(srv interface{}, stream grpc.ServerStream) error {
}
}

func handlerWatchExecutorCommitments(srv interface{}, stream grpc.ServerStream) error {
if err := stream.RecvMsg(nil); err != nil {
return err
}

ctx := stream.Context()
ch, sub, err := srv.(Backend).WatchExecutorCommitments(ctx)
if err != nil {
return err
}
defer sub.Close()

for {
select {
case ec, ok := <-ch:
if !ok {
return nil
}

if err := stream.SendMsg(ec); err != nil {
return err
}
case <-ctx.Done():
return ctx.Err()
}
}
}

// RegisterService registers a new roothash service with the given gRPC server.
func RegisterService(server *grpc.Server, service Backend) {
server.RegisterService(&serviceDesc, service)
Expand Down Expand Up @@ -520,6 +556,41 @@ func (c *roothashClient) WatchEvents(ctx context.Context, runtimeID common.Names
return ch, sub, nil
}

func (c *roothashClient) WatchExecutorCommitments(ctx context.Context) (<-chan *commitment.ExecutorCommitment, pubsub.ClosableSubscription, error) {
ctx, sub := pubsub.NewContextSubscription(ctx)

stream, err := c.conn.NewStream(ctx, &serviceDesc.Streams[1], methodWatchExecutorCommitments.FullName())
if err != nil {
return nil, nil, err
}
if err = stream.SendMsg(nil); err != nil {
return nil, nil, err
}
if err = stream.CloseSend(); err != nil {
return nil, nil, err
}

ch := make(chan *commitment.ExecutorCommitment)
go func() {
defer close(ch)

for {
var ec commitment.ExecutorCommitment
if serr := stream.RecvMsg(&ec); serr != nil {
return
}

select {
case ch <- &ec:
case <-ctx.Done():
return
}
}
}()

return ch, sub, nil
}

// NewRootHashClient creates a new gRPC roothash client service.
func NewRootHashClient(c *grpc.ClientConn) Backend {
return &roothashClient{
Expand Down

0 comments on commit e121a5d

Please sign in to comment.