Skip to content

Commit

Permalink
go/roothash: Add incoming runtime messages as per ADR 0011
Browse files Browse the repository at this point in the history
  • Loading branch information
kostko committed Jan 28, 2022
1 parent 7d8cb15 commit 6a038f4
Show file tree
Hide file tree
Showing 40 changed files with 1,296 additions and 160 deletions.
3 changes: 3 additions & 0 deletions .changelog/4415.breaking.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
go/roothash: Add incoming runtime messages as per [ADR 0011]

[ADR 0011]: docs/adr/0011-incoming-runtime-messages.md
45 changes: 31 additions & 14 deletions docs/adr/0011-incoming-runtime-messages.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## Changelog

- 2022-01-07: Update based on insights from implementation
- 2021-12-09: Introduce an explicit fee field, clarify token transfers
- 2021-10-26: Initial draft

Expand Down Expand Up @@ -58,6 +59,10 @@ type IncomingMessage struct {
// Caller is the address of the caller authenticated by the consensus layer.
Caller staking.Address `json:"caller"`

// Tag is an optional tag provided by the caller which is ignored and can be used to match
// processed incoming message events later.
Tag uint64 `json:"tag,omitempty"`

// Fee is the fee sent into the runtime as part of the message being sent.
// The fee is transferred before the message is processed by the runtime.
Fee quantity.Quantity `json:"fee,omitempty"`
Expand All @@ -81,8 +86,10 @@ specify the number and hash of incoming messages included in a batch as follows:
type ComputeResultsHeader struct {
// ... existing fields omitted ...

InMessagesCount uint64 `json:"in_msgs_count,omitempty"`
// InMessagesHash is the hash of processed incoming messages.
InMessagesHash *hash.Hash `json:"in_msgs_hash,omitempty"`
// InMessagesCount is the number of processed incoming messages.
InMessagesCount uint32 `json:"in_msgs_count,omitempty"`
}
```

Expand Down Expand Up @@ -150,12 +157,12 @@ type RuntimeStakingParameters struct {
This proposal introduces/updates the following consensus state in the roothash
module:

- **Incoming message queue metadata (`0x27`)**
- **Incoming message queue metadata (`0x28`)**

Metadata for the incoming message queue.

```
0x27 <H(runtime-id) (hash.Hash)>
0x28 <H(runtime-id) (hash.Hash)>
```

The value is the following CBOR-serialized structure:
Expand All @@ -171,13 +178,13 @@ module:
}
```

- **Incoming message queue item (`0x28`)**
- **Incoming message queue item (`0x29`)**

A queue of incoming messages pending to be delivered to the runtime in the
next round.

```
0x28 <H(runtime-id) (hash.Hash)> <sequence-no (uint64)>
0x29 <H(runtime-id) (hash.Hash)> <sequence-no (uint64)>
```

The value is a CBOR-serialized `IncomingMessage` structure.
Expand Down Expand Up @@ -259,28 +266,38 @@ submit message method the following actions are performed:
This proposal adds the following new query methods in the roothash module by
updating the `roothash.Backend` interface as follows:

<!-- markdownlint-disable line-length -->
```golang
type Backend interface {
// ... existing methods omitted ...

// HasIncomingMessages checks whether the given runtime has any incoming
// messages queued.
HasIncomingMessages(ctx context.Context, query *RuntimeQuery) (bool, error)
// GetIncomingMessageQueueMeta returns the given runtime's incoming message queue metadata.
GetIncomingMessageQueueMeta(ctx context.Context, request *RuntimeRequest) (*message.IncomingMessageQueueMeta, error)

// IncomingMessages returns a list of queued incoming messages for the given
// runtime.
IncomingMessages(ctx context.Context, query *IncomingMessagesQuery) ([]*IncomingMessage, error)
// GetIncomingMessageQueue returns the given runtime's queued incoming messages.
GetIncomingMessageQueue(ctx context.Context, request *InMessageQueueRequest) ([]*message.IncomingMessage, error)
}

// IncomingMessageQueueMeta is the incoming message queue metadata.
type IncomingMessageQueueMeta struct {
// Size contains the current size of the queue.
Size uint32 `json:"size,omitempty"`

// NextSequenceNumber contains the sequence number that should be used for the next queued
// message.
NextSequenceNumber uint64 `json:"next_sequence_number,omitempty"`
}

type IncomingMessagesQuery struct {
// InMessageQueueRequest is a request for queued incoming messages.
type InMessageQueueRequest struct {
RuntimeID common.Namespace `json:"runtime_id"`
Height int64 `json:"height"`

// Offset specifies the lowest message sequence number that should be
// considered in the returned list of messages.
Offset uint64 `json:"offset,omitempty"`
Limit uint32 `json:"limit,omitempty"`
}
```
<!-- markdownlint-enable line-length -->

### Runtime Host Protocol

Expand Down
15 changes: 10 additions & 5 deletions go/common/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,7 @@ func (n *Namespace) UnmarshalText(text []byte) error {
// For backwards compatibility (e.g. to be able to load the
// Cobalt Upgrade genesis file), fallback to accepting
// Base64-encoded namespace identifiers.
b, err := base64.StdEncoding.DecodeString(string(text))
if err != nil {
return err
}
return n.UnmarshalBinary(b)
return n.UnmarshalBase64(text)
}
return nil
}
Expand All @@ -98,6 +94,15 @@ func (n *Namespace) UnmarshalHex(text string) error {
return n.UnmarshalBinary(b)
}

// UnmarshalBase64 deserializes a Base64 text string into the given type.
func (n *Namespace) UnmarshalBase64(text []byte) error {
b, err := base64.StdEncoding.DecodeString(string(text))
if err != nil {
return err
}
return n.UnmarshalBinary(b)
}

// Equal compares vs another namespace for equality.
func (n *Namespace) Equal(cmp *Namespace) bool {
if cmp == nil {
Expand Down
11 changes: 11 additions & 0 deletions go/consensus/tendermint/apps/roothash/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
roothashState "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/apps/roothash/state"
roothash "github.com/oasisprotocol/oasis-core/go/roothash/api"
"github.com/oasisprotocol/oasis-core/go/roothash/api/block"
"github.com/oasisprotocol/oasis-core/go/roothash/api/message"
)

// Query is the roothash query interface.
Expand All @@ -16,6 +17,8 @@ type Query interface {
GenesisBlock(context.Context, common.Namespace) (*block.Block, error)
RuntimeState(context.Context, common.Namespace) (*roothash.RuntimeState, error)
LastRoundResults(context.Context, common.Namespace) (*roothash.RoundResults, error)
IncomingMessageQueueMeta(context.Context, common.Namespace) (*message.IncomingMessageQueueMeta, error)
IncomingMessageQueue(ctx context.Context, id common.Namespace, offset uint64, limit uint32) ([]*message.IncomingMessage, error)
Genesis(context.Context) (*roothash.Genesis, error)
ConsensusParameters(context.Context) (*roothash.ConsensusParameters, error)
}
Expand Down Expand Up @@ -62,6 +65,14 @@ func (rq *rootHashQuerier) LastRoundResults(ctx context.Context, id common.Names
return rq.state.LastRoundResults(ctx, id)
}

func (rq *rootHashQuerier) IncomingMessageQueueMeta(ctx context.Context, id common.Namespace) (*message.IncomingMessageQueueMeta, error) {
return rq.state.IncomingMessageQueueMeta(ctx, id)
}

func (rq *rootHashQuerier) IncomingMessageQueue(ctx context.Context, id common.Namespace, offset uint64, limit uint32) ([]*message.IncomingMessage, error) {
return rq.state.IncomingMessageQueue(ctx, id, offset, limit)
}

func (rq *rootHashQuerier) ConsensusParameters(ctx context.Context) (*roothash.ConsensusParameters, error) {
return rq.state.ConsensusParameters(ctx)
}
Expand Down
109 changes: 86 additions & 23 deletions go/consensus/tendermint/apps/roothash/roothash.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
roothash "github.com/oasisprotocol/oasis-core/go/roothash/api"
"github.com/oasisprotocol/oasis-core/go/roothash/api/block"
"github.com/oasisprotocol/oasis-core/go/roothash/api/commitment"
"github.com/oasisprotocol/oasis-core/go/roothash/api/message"
scheduler "github.com/oasisprotocol/oasis-core/go/scheduler/api"
staking "github.com/oasisprotocol/oasis-core/go/staking/api"
)
Expand Down Expand Up @@ -372,6 +373,13 @@ func (app *rootHashApplication) ExecuteTx(ctx *tmapi.Context, tx *transaction.Tr
}

return app.submitEvidence(ctx, state, &ev)
case roothash.MethodSubmitMsg:
var msg roothash.SubmitMsg
if err := cbor.Unmarshal(tx.Body, &msg); err != nil {
return err
}

return app.submitMsg(ctx, state, &msg)
default:
return roothash.ErrInvalidArgument
}
Expand Down Expand Up @@ -508,9 +516,6 @@ func (app *rootHashApplication) processRoundTimeout(ctx *tmapi.Context, state *r
return fmt.Errorf("failed to finalize block: %w", err)
}

if err = state.SetRuntimeState(ctx, rtState); err != nil {
return fmt.Errorf("failed to set runtime state: %w", err)
}
return nil
}

Expand Down Expand Up @@ -560,6 +565,59 @@ func (app *rootHashApplication) tryFinalizeExecutorCommits(

ec := commit.ToDDResult().(*commitment.ExecutorCommitment)

// Update the incoming message queue by removing processed messages. Do one final check to
// make sure that the processed messages actually correspond to the provided hash.
state := roothashState.NewMutableState(ctx.State())
if ec.Header.InMessagesCount > 0 {
var meta *message.IncomingMessageQueueMeta
meta, err = state.IncomingMessageQueueMeta(ctx, rtState.Runtime.ID)
if err != nil {
return fmt.Errorf("failed to fetch incoming message queue metadata: %w", err)
}
var msgs []*message.IncomingMessage
msgs, err = state.IncomingMessageQueue(ctx, rtState.Runtime.ID, 0, ec.Header.InMessagesCount)
if err != nil {
return fmt.Errorf("failed to fetch incoming message queue: %w", err)
}
if inMsgsHash := message.InMessagesHash(msgs); !ec.Header.InMessagesHash.Equal(&inMsgsHash) {
ctx.Logger().Debug("finalized round contained invalid incoming message hash, failing instead",
"in_msgs_hash", inMsgsHash,
"ec_in_msgs_hash", *ec.Header.InMessagesHash,
)
// Make the round fail.
err = fmt.Errorf("finalized round contained invalid incoming message hash")
// TODO: All nodes contributing to this round should be penalized.
break
}
for _, msg := range msgs {
err = state.RemoveIncomingMessageFromQueue(ctx, rtState.Runtime.ID, msg.ID)
if err != nil {
return fmt.Errorf("failed to remove processed incoming message from queue: %w", err)
}

if meta.Size == 0 {
// This should NEVER happen.
return tmapi.UnavailableStateError(fmt.Errorf("inconsistent queue size (state corruption?)"))
}
meta.Size--

ctx.EmitEvent(
tmapi.NewEventBuilder(app.Name()).
TypedAttribute(&roothash.InMsgProcessedEvent{
ID: msg.ID,
Round: round,
Caller: msg.Caller,
Tag: msg.Tag,
}).
Attribute(KeyRuntimeID, ValueRuntimeID(rtState.Runtime.ID)),
)
}
err = state.SetIncomingMessageQueueMeta(ctx, rtState.Runtime.ID, meta)
if err != nil {
return fmt.Errorf("failed to set incoming message queue metadata: %w", err)
}
}

// Process any runtime messages.
var messageResults []*roothash.MessageEvent
if messageResults, err = app.processRuntimeMessages(ctx, rtState, ec.Messages); err != nil {
Expand Down Expand Up @@ -634,6 +692,7 @@ func (app *rootHashApplication) tryFinalizeExecutorCommits(
blk.Header.IORoot = *ec.Header.IORoot
blk.Header.StateRoot = *ec.Header.StateRoot
blk.Header.MessagesHash = *ec.Header.MessagesHash
blk.Header.InMessagesHash = *ec.Header.InMessagesHash

// Timeout will be cleared by caller.
pool.ResetCommitments(blk.Header.Round)
Expand All @@ -645,7 +704,6 @@ func (app *rootHashApplication) tryFinalizeExecutorCommits(
rtState.LastNormalHeight = ctx.BlockHeight() + 1

// Set last normal round results.
state := roothashState.NewMutableState(ctx.State())
err = state.SetLastRoundResults(ctx, rtState.Runtime.ID, &roothash.RoundResults{
Messages: messageResults,
GoodComputeEntities: goodComputeEntities,
Expand Down Expand Up @@ -699,23 +757,22 @@ func (app *rootHashApplication) tryFinalizeBlock(
ctx *tmapi.Context,
rtState *roothash.RuntimeState,
forced bool,
) (err error) {
defer func(previousTimeout int64) {
if err != nil {
return
}
) error {
ctx = ctx.NewTransaction()
defer ctx.Close()

// Do not re-arm the round timeout if the timeout has not changed.
nextTimeout := rtState.ExecutorPool.NextTimeout
if previousTimeout == nextTimeout {
return
}
state := roothashState.NewMutableState(ctx.State())
previousTimeout := rtState.ExecutorPool.NextTimeout

state := roothashState.NewMutableState(ctx.State())
if err := app.tryFinalizeExecutorCommits(ctx, rtState, forced); err != nil {
return err
}

// Do not re-arm the round timeout if the timeout has not changed.
if nextTimeout := rtState.ExecutorPool.NextTimeout; previousTimeout != nextTimeout {
if previousTimeout != commitment.TimeoutNever {
if err = state.ClearRoundTimeout(ctx, rtState.Runtime.ID, previousTimeout); err != nil {
err = fmt.Errorf("failed to clear round timeout: %w", err)
return
if err := state.ClearRoundTimeout(ctx, rtState.Runtime.ID, previousTimeout); err != nil {
return fmt.Errorf("failed to clear round timeout: %w", err)
}
}

Expand All @@ -729,14 +786,20 @@ func (app *rootHashApplication) tryFinalizeBlock(
"height", ctx.BlockHeight(),
"next_timeout", nextTimeout,
)
if err = state.ScheduleRoundTimeout(ctx, rtState.Runtime.ID, nextTimeout); err != nil {
err = fmt.Errorf("failed to schedule round timeout: %w", err)
return
if err := state.ScheduleRoundTimeout(ctx, rtState.Runtime.ID, nextTimeout); err != nil {
return fmt.Errorf("failed to schedule round timeout: %w", err)
}
}
}(rtState.ExecutorPool.NextTimeout)
}

return app.tryFinalizeExecutorCommits(ctx, rtState, forced)
// Update runtime state.
if err := state.SetRuntimeState(ctx, rtState); err != nil {
return fmt.Errorf("failed to set runtime state: %w", err)
}

ctx.Commit()

return nil
}

// New constructs a new roothash application instance.
Expand Down
Loading

0 comments on commit 6a038f4

Please sign in to comment.