diff --git a/.changelog/4438.feature.md b/.changelog/4438.feature.md new file mode 100644 index 00000000000..e93e83459d3 --- /dev/null +++ b/.changelog/4438.feature.md @@ -0,0 +1,4 @@ +Add support for runtime schedule control + +This feature gives runtimes control over scheduling transactions inside a +block, so the runtimes may implement their own more advanced policies. diff --git a/go/oasis-test-runner/scenario/e2e/runtime/keymanager_restart.go b/go/oasis-test-runner/scenario/e2e/runtime/keymanager_restart.go index fc376be847d..e73e78786a8 100644 --- a/go/oasis-test-runner/scenario/e2e/runtime/keymanager_restart.go +++ b/go/oasis-test-runner/scenario/e2e/runtime/keymanager_restart.go @@ -24,6 +24,18 @@ func newKmRestartImpl() scenario.Scenario { } } +func (sc *kmRestartImpl) Fixture() (*oasis.NetworkFixture, error) { + f, err := sc.runtimeImpl.Fixture() + if err != nil { + return nil, err + } + + // The round is allowed to fail until the keymanager becomes available after restart. + f.Network.DefaultLogWatcherHandlerFactories = nil + + return f, nil +} + func (sc *kmRestartImpl) Clone() scenario.Scenario { return &kmRestartImpl{ runtimeImpl: *sc.runtimeImpl.Clone().(*runtimeImpl), diff --git a/go/runtime/host/host.go b/go/runtime/host/host.go index fd686d250b0..ece649027f0 100644 --- a/go/runtime/host/host.go +++ b/go/runtime/host/host.go @@ -45,6 +45,9 @@ type Runtime interface { // ID is the runtime identifier. ID() common.Namespace + // GetInfo retrieves the runtime information. + GetInfo(ctx context.Context) (*protocol.RuntimeInfoResponse, error) + // Call sends a request message to the runtime over the Runtime Host Protocol and waits for the // response (which may be a failure). Call(ctx context.Context, body *protocol.Body) (*protocol.Body, error) diff --git a/go/runtime/host/mock/mock.go b/go/runtime/host/mock/mock.go index 913171ac193..51606ab5199 100644 --- a/go/runtime/host/mock/mock.go +++ b/go/runtime/host/mock/mock.go @@ -11,6 +11,7 @@ import ( "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" "github.com/oasisprotocol/oasis-core/go/common/errors" "github.com/oasisprotocol/oasis-core/go/common/pubsub" + "github.com/oasisprotocol/oasis-core/go/common/version" "github.com/oasisprotocol/oasis-core/go/roothash/api/commitment" "github.com/oasisprotocol/oasis-core/go/runtime/host" "github.com/oasisprotocol/oasis-core/go/runtime/host/protocol" @@ -43,6 +44,15 @@ func (r *runtime) ID() common.Namespace { return r.runtimeID } +// Implements host.Runtime. +func (r *runtime) GetInfo(ctx context.Context) (rsp *protocol.RuntimeInfoResponse, err error) { + return &protocol.RuntimeInfoResponse{ + ProtocolVersion: version.RuntimeHostProtocol, + RuntimeVersion: version.MustFromString("0.0.0"), + Features: nil, + }, nil +} + // Implements host.Runtime. func (r *runtime) Call(ctx context.Context, body *protocol.Body) (*protocol.Body, error) { switch { diff --git a/go/runtime/host/protocol/connection.go b/go/runtime/host/protocol/connection.go index 1eefc8c6316..d683b61c41f 100644 --- a/go/runtime/host/protocol/connection.go +++ b/go/runtime/host/protocol/connection.go @@ -91,6 +91,9 @@ type Connection interface { // Close closes the connection. Close() + // GetInfo retrieves the runtime information. + GetInfo(ctx context.Context) (*RuntimeInfoResponse, error) + // Call sends a request to the other side and returns the response or error. 
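The new GetInfo method on host.Runtime (mirrored on protocol.Connection below) is how node components discover what a hosted runtime advertises before deciding how batches should be formed. A minimal sketch of such a caller, assuming a ready host.Runtime handle; the helper name and the fallback parameter are illustrative, not part of this change:

    import (
        "context"

        "github.com/oasisprotocol/oasis-core/go/runtime/host"
    )

    // initialBatchSize returns the runtime-advertised initial batch size when the
    // schedule control feature is present, or the caller-provided fallback otherwise.
    func initialBatchSize(ctx context.Context, rt host.Runtime, fallback uint32) (uint32, bool, error) {
        info, err := rt.GetInfo(ctx)
        if err != nil {
            return 0, false, err
        }
        if info.Features.HasScheduleControl() {
            return info.Features.ScheduleControl.InitialBatchSize, true, nil
        }
        return fallback, false, nil
    }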
Call(ctx context.Context, body *Body) (*Body, error) @@ -200,6 +203,8 @@ type connection struct { // nolint: maligned pendingRequests map[uint64]chan *Body nextRequestID uint64 + info *RuntimeInfoResponse + outCh chan *Message closeCh chan struct{} quitWg sync.WaitGroup @@ -254,6 +259,17 @@ func (c *connection) Close() { c.quitWg.Wait() } +// Implements Connection. +func (c *connection) GetInfo(ctx context.Context) (*RuntimeInfoResponse, error) { + c.Lock() + defer c.Unlock() + + if c.info == nil { + return nil, ErrNotReady + } + return c.info, nil +} + // Implements Connection. func (c *connection) Call(ctx context.Context, body *Body) (*Body, error) { if c.getState() != stateReady { @@ -551,6 +567,7 @@ func (c *connection) InitHost(ctx context.Context, conn net.Conn, hi *HostInfo) // Transition the protocol state to Ready. c.Lock() c.setStateLocked(stateReady) + c.info = info c.Unlock() return &rtVersion, nil diff --git a/go/runtime/host/protocol/connection_test.go b/go/runtime/host/protocol/connection_test.go index 4e399552a86..d82282ffe6b 100644 --- a/go/runtime/host/protocol/connection_test.go +++ b/go/runtime/host/protocol/connection_test.go @@ -94,6 +94,12 @@ func TestEchoRequestResponse(t *testing.T) { require.Panics(func() { _ = protoA.InitGuest(context.Background(), connA) }, "connection reinit should panic") require.Panics(func() { _, _ = protoB.InitHost(context.Background(), connB, &HostInfo{}) }, "connection reinit should panic") require.Panics(func() { _ = protoB.InitGuest(context.Background(), connB) }, "connection reinit should panic") + + _, err = protoA.GetInfo(context.Background()) + require.Error(err, "GetInfo should fail for guest connections") + info, err := protoB.GetInfo(context.Background()) + require.NoError(err, "GetInfo should succeed for host connections") + require.EqualValues(version.RuntimeHostProtocol, info.ProtocolVersion) } func TestBigMessage(t *testing.T) { diff --git a/go/runtime/host/protocol/types.go b/go/runtime/host/protocol/types.go index 2b175fe2d5d..5f8ed058249 100644 --- a/go/runtime/host/protocol/types.go +++ b/go/runtime/host/protocol/types.go @@ -103,6 +103,8 @@ type Body struct { HostLocalStorageSetResponse *Empty `json:",omitempty"` HostFetchConsensusBlockRequest *HostFetchConsensusBlockRequest `json:",omitempty"` HostFetchConsensusBlockResponse *HostFetchConsensusBlockResponse `json:",omitempty"` + HostFetchTxBatchRequest *HostFetchTxBatchRequest `json:",omitempty"` + HostFetchTxBatchResponse *HostFetchTxBatchResponse `json:",omitempty"` } // Type returns the message type by determining the name of the first non-nil member. @@ -151,13 +153,35 @@ type RuntimeInfoRequest struct { LocalConfig map[string]interface{} `json:"local_config,omitempty"` } -// RuntimeInfoResponse is a worker info response message body. +// Features is a set of supported runtime features. +type Features struct { + // ScheduleControl is the schedule control feature. + ScheduleControl *FeatureScheduleControl `json:"schedule_control,omitempty"` +} + +// HasScheduleControl returns true when the runtime supports the schedule control feature. +func (f *Features) HasScheduleControl() bool { + return f != nil && f.ScheduleControl != nil +} + +// FeatureScheduleControl is a feature specifying that the runtime supports controlling the +// scheduling of batches. This means that the scheduler should only take priority into account and +// ignore weights, leaving it up to the runtime to decide which transactions to include. 
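Features.HasScheduleControl is written against a pointer receiver and checks for nil, so it is safe to call even when a runtime reports no features at all; a small illustrative fragment (the batch size value is arbitrary):

    var features *protocol.Features            // e.g. a runtime that advertises no features
    fmt.Println(features.HasScheduleControl()) // false: the nil receiver is handled

    features = &protocol.Features{
        ScheduleControl: &protocol.FeatureScheduleControl{InitialBatchSize: 50},
    }
    fmt.Println(features.HasScheduleControl()) // true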
+type FeatureScheduleControl struct { + // InitialBatchSize is the size of the initial batch of transactions. + InitialBatchSize uint32 `json:"initial_batch_size"` +} + +// RuntimeInfoResponse is a runtime info response message body. type RuntimeInfoResponse struct { - // ProtocolVersion is the runtime protocol version supported by the worker. + // ProtocolVersion is the runtime protocol version supported by the runtime. ProtocolVersion version.Version `json:"protocol_version"` // RuntimeVersion is the version of the runtime. RuntimeVersion version.Version `json:"runtime_version"` + + // Features describe the features supported by the runtime. + Features *Features `json:"features,omitempty"` } // RuntimeCapabilityTEERakInitRequest is a worker RFC 0009 CapabilityTEE @@ -281,8 +305,27 @@ func (b *ComputedBatch) String() string { return "" } +// ExecutionMode is the batch execution mode. +type ExecutionMode uint8 + +const ( + // ExecutionModeExecute is the execution mode where the batch of transactions is executed as-is + // without the ability to perform any modifications to the batch. + ExecutionModeExecute = 0 + // ExecutionModeSchedule is the execution mode where the runtime is in control of scheduling and + // may arbitrarily modify the batch during execution. + // + // This execution mode will only be used in case the runtime advertises to support the schedule + // control feature. In this case the call will only contain up to InitialBatchSize transactions + // and the runtime will need to request more if it needs more. + ExecutionModeSchedule = 1 +) + // RuntimeExecuteTxBatchRequest is a worker execute tx batch request message body. type RuntimeExecuteTxBatchRequest struct { + // Mode is the execution mode. + Mode ExecutionMode `json:"mode,omitempty"` + // ConsensusBlock is the consensus light block at the last finalized round // height (e.g., corresponding to .Block.Header.Round). ConsensusBlock consensus.LightBlock `json:"consensus_block"` @@ -311,6 +354,16 @@ type RuntimeExecuteTxBatchRequest struct { type RuntimeExecuteTxBatchResponse struct { Batch ComputedBatch `json:"batch"` BatchWeightLimits map[transaction.Weight]uint64 `json:"batch_weight_limits"` + + // TxHashes are the transaction hashes of the included batch. + TxHashes []hash.Hash `json:"tx_hashes,omitempty"` + // TxRejectHashes are the transaction hashes of transactions that should be immediately removed + // from the scheduling queue as they are invalid. + TxRejectHashes []hash.Hash `json:"tx_reject_hashes,omitempty"` + // TxInputRoot is the root hash of all transaction inputs. + TxInputRoot hash.Hash `json:"tx_input_root,omitempty"` + // TxInputWriteLog is the write log for generating transaction inputs. + TxInputWriteLog storage.WriteLog `json:"tx_input_write_log,omitempty"` } // RuntimeKeyManagerPolicyUpdateRequest is a runtime key manager policy request @@ -409,3 +462,19 @@ type HostFetchConsensusBlockRequest struct { type HostFetchConsensusBlockResponse struct { Block consensus.LightBlock `json:"block"` } + +// HostFetchTxBatchRequest is a request to host to fetch a further batch of transactions. +type HostFetchTxBatchRequest struct { + // Offset specifies the transaction hash that should serve as an offset when returning + // transactions from the pool. Transactions will be skipped until the given hash is encountered + // and only following transactions will be returned. + Offset *hash.Hash `json:"offset,omitempty"` + // Limit specifies the maximum size of the batch to request. 
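In ExecutionModeSchedule the host does not know the final inputs up front: it sends the initial prioritized batch with an empty IORoot, and the runtime reports the batch it actually settled on through the new TxHashes, TxInputRoot and TxInputWriteLog fields, with TxRejectHashes listing transactions the pool should drop immediately. A fragment sketching the request shape in this mode; initialBatch and the omitted block/epoch fields are assumed to come from the surrounding round state:

    rq := &protocol.Body{
        RuntimeExecuteTxBatchRequest: &protocol.RuntimeExecuteTxBatchRequest{
            Mode:   protocol.ExecutionModeSchedule,
            IORoot: hash.Hash{},  // not meaningful here; the runtime returns TxInputRoot instead
            Inputs: initialBatch, // up to InitialBatchSize transactions from the prioritized pool
            // ConsensusBlock, Block, RoundResults, Epoch and MaxMessages are set exactly as
            // in ExecutionModeExecute.
        },
    }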
+ Limit uint32 `json:"limit"` +} + +// HostFetchTxBatchResponse is a response from host fetching the given transaction batch. +type HostFetchTxBatchResponse struct { + // Batch is a batch of transactions. + Batch [][]byte `json:"batch,omitempty"` +} diff --git a/go/runtime/host/sandbox/sandbox.go b/go/runtime/host/sandbox/sandbox.go index 6a78ea7ff06..435e9bec29c 100644 --- a/go/runtime/host/sandbox/sandbox.go +++ b/go/runtime/host/sandbox/sandbox.go @@ -106,14 +106,31 @@ func (r *sandboxedRuntime) ID() common.Namespace { return r.rtCfg.RuntimeID } +// Implements host.Runtime. +func (r *sandboxedRuntime) GetInfo(ctx context.Context) (rsp *protocol.RuntimeInfoResponse, err error) { + callFn := func() error { + r.RLock() + defer r.RUnlock() + + if r.conn == nil { + return fmt.Errorf("runtime is not ready") + } + rsp, err = r.conn.GetInfo(ctx) + return err + } + + // Retry call in case the runtime is not yet ready. + err = backoff.Retry(callFn, backoff.WithContext(cmnBackoff.NewExponentialBackOff(), ctx)) + return +} + // Implements host.Runtime. func (r *sandboxedRuntime) Call(ctx context.Context, body *protocol.Body) (rsp *protocol.Body, err error) { callFn := func() error { r.RLock() - conn := r.conn - r.RUnlock() + defer r.RUnlock() - if conn == nil { + if r.conn == nil { return fmt.Errorf("runtime is not ready") } rsp, err = r.conn.Call(ctx, body) diff --git a/go/runtime/registry/host.go b/go/runtime/registry/host.go index b372886e667..558ac852c5a 100644 --- a/go/runtime/registry/host.go +++ b/go/runtime/registry/host.go @@ -18,6 +18,7 @@ import ( "github.com/oasisprotocol/oasis-core/go/roothash/api/block" "github.com/oasisprotocol/oasis-core/go/runtime/host" "github.com/oasisprotocol/oasis-core/go/runtime/host/protocol" + "github.com/oasisprotocol/oasis-core/go/runtime/txpool" storage "github.com/oasisprotocol/oasis-core/go/storage/api" "github.com/oasisprotocol/oasis-core/go/storage/mkvs/syncer" ) @@ -117,6 +118,9 @@ type RuntimeHostHandlerEnvironment interface { // GetKeyManagerClient returns the key manager client for this runtime. GetKeyManagerClient(ctx context.Context) (keymanagerClientApi.Client, error) + + // GetTxPool returns the transaction pool for this runtime. + GetTxPool(ctx context.Context) (txpool.TransactionPool, error) } // RuntimeHostHandler is a runtime host handler suitable for compute runtimes. It provides the @@ -207,6 +211,23 @@ func (h *runtimeHostHandler) Handle(ctx context.Context, body *protocol.Body) (* Block: *lb, }}, nil } + // Transaction pool. 
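Since the sandboxed GetInfo retries with exponential backoff while the runtime connection is not yet available, callers that must not block indefinitely can bound the call with a context; a sketch (the timeout value is illustrative):

    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    info, err := rt.GetInfo(ctx) // retried until the runtime is ready, the backoff gives up, or ctx expires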
+ if rq := body.HostFetchTxBatchRequest; rq != nil { + txPool, err := h.env.GetTxPool(ctx) + if err != nil { + return nil, err + } + + batch := txPool.GetPrioritizedBatch(rq.Offset, rq.Limit) + raw := make([][]byte, 0, len(batch)) + for _, tx := range batch { + raw = append(raw, tx.Raw()) + } + + return &protocol.Body{HostFetchTxBatchResponse: &protocol.HostFetchTxBatchResponse{ + Batch: raw, + }}, nil + } return nil, errMethodNotSupported } @@ -262,7 +283,6 @@ func (n *runtimeHostNotifier) watchPolicyUpdates() { n.logger.Debug("no key manager needed for this runtime") continue } - // GetStatus(context.Context, *registry.NamespaceQuery) (*Status, error) var err error st, err = n.consensus.KeyManager().GetStatus(n.ctx, ®istry.NamespaceQuery{ diff --git a/go/runtime/scheduling/api/api.go b/go/runtime/scheduling/api/api.go index dbbbc850351..6afa630c6d7 100644 --- a/go/runtime/scheduling/api/api.go +++ b/go/runtime/scheduling/api/api.go @@ -20,6 +20,14 @@ type Scheduler interface { // GetBatch returns a batch of scheduled transactions (if any is available). GetBatch(force bool) []*transaction.CheckedTransaction + // GetPrioritizedBatch returns a batch of transactions ordered by priority but without taking + // any weight limits into account. + // + // Offset specifies the transaction hash that should serve as an offset when returning + // transactions from the pool. Transactions will be skipped until the given hash is encountered + // and only following transactions will be returned. + GetPrioritizedBatch(offset *hash.Hash, limit uint32) []*transaction.CheckedTransaction + // GetKnownBatch gets a set of known transactions from the transaction pool. // // For any missing transactions nil will be returned in their place and the map of missing diff --git a/go/runtime/scheduling/simple/simple.go b/go/runtime/scheduling/simple/simple.go index 472e21b07e6..75b7796b1de 100644 --- a/go/runtime/scheduling/simple/simple.go +++ b/go/runtime/scheduling/simple/simple.go @@ -49,6 +49,10 @@ func (s *scheduler) GetBatch(force bool) []*transaction.CheckedTransaction { return s.txPool.GetBatch(force) } +func (s *scheduler) GetPrioritizedBatch(offset *hash.Hash, limit uint32) []*transaction.CheckedTransaction { + return s.txPool.GetPrioritizedBatch(offset, limit) +} + func (s *scheduler) GetKnownBatch(batch []hash.Hash) ([]*transaction.CheckedTransaction, map[hash.Hash]int) { return s.txPool.GetKnownBatch(batch) } diff --git a/go/runtime/scheduling/simple/txpool/api/api.go b/go/runtime/scheduling/simple/txpool/api/api.go index ace93b9ad05..ae6a8220bfc 100644 --- a/go/runtime/scheduling/simple/txpool/api/api.go +++ b/go/runtime/scheduling/simple/txpool/api/api.go @@ -33,6 +33,14 @@ type TxPool interface { // GetBatch gets a transaction batch from the transaction pool. GetBatch(force bool) []*transaction.CheckedTransaction + // GetPrioritizedBatch returns a batch of transactions ordered by priority but without taking + // any weight limits into account. + // + // Offset specifies the transaction hash that should serve as an offset when returning + // transactions from the pool. Transactions will be skipped until the given hash is encountered + // and only following transactions will be returned. + GetPrioritizedBatch(offset *hash.Hash, limit uint32) []*transaction.CheckedTransaction + // GetKnownBatch gets a set of known transactions from the transaction pool. 
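The offset semantics are the same at every layer (HostFetchTxBatchRequest, Scheduler, TxPool and the TransactionPool below): nil starts from the highest-priority transaction, and the hash of the last transaction already seen continues just after it. A sketch of paging through the pool in limit-sized chunks, assuming a txpool.TransactionPool handle; the helper is illustrative only:

    import (
        "github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
        "github.com/oasisprotocol/oasis-core/go/runtime/transaction"
        "github.com/oasisprotocol/oasis-core/go/runtime/txpool"
    )

    // drainByPriority pages through the pool in priority order, limit items at a time.
    func drainByPriority(pool txpool.TransactionPool, limit uint32) []*transaction.CheckedTransaction {
        var (
            all    []*transaction.CheckedTransaction
            offset *hash.Hash
        )
        for {
            batch := pool.GetPrioritizedBatch(offset, limit)
            if len(batch) == 0 {
                // Also reached when the offset entry was meanwhile removed from the pool,
                // in which case the lookup returns nothing.
                return all
            }
            all = append(all, batch...)
            last := batch[len(batch)-1].Hash()
            offset = &last
        }
    }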
// // For any missing transactions nil will be returned in their place and the map of missing diff --git a/go/runtime/scheduling/simple/txpool/priorityqueue/priority_queue.go b/go/runtime/scheduling/simple/txpool/priorityqueue/priority_queue.go index 9e6dca1ccaa..f38711710bf 100644 --- a/go/runtime/scheduling/simple/txpool/priorityqueue/priority_queue.go +++ b/go/runtime/scheduling/simple/txpool/priorityqueue/priority_queue.go @@ -172,6 +172,11 @@ func (q *priorityQueue) GetBatch(force bool) []*transaction.CheckedTransaction { func (q *priorityQueue) removeTxsLocked(items []*item) { for _, item := range items { + // Skip already removed items to avoid corrupting the list in case of duplicates. + if _, exists := q.transactions[item.tx.Hash()]; !exists { + continue + } + delete(q.transactions, item.tx.Hash()) q.priorityIndex.Delete(item) for k, v := range item.tx.Weights() { @@ -196,6 +201,57 @@ func (q *priorityQueue) removeTxsLocked(items []*item) { } } +// Implements api.TxPool. +func (q *priorityQueue) GetPrioritizedBatch(offset *hash.Hash, limit uint32) []*transaction.CheckedTransaction { + q.Lock() + defer q.Unlock() + + var ( + batch []*transaction.CheckedTransaction + toRemove []*item + offsetItem btree.Item + ) + if offset != nil { + var exists bool + offsetItem, exists = q.transactions[*offset] + if !exists { + // Offset does not exist so no items will be matched anyway. + return nil + } + } + q.priorityIndex.DescendLessOrEqual(offsetItem, func(i btree.Item) bool { + item := i.(*item) + + for w, l := range q.weightLimits { + txW := item.tx.Weight(w) + // Transaction weight greater than the limit. Drop the tx from the pool. + if txW > l { + toRemove = append(toRemove, item) + return true + } + } + + // Skip the offset item itself (if specified). + if txHash := item.tx.Hash(); txHash.Equal(offset) { + return true + } + + // Add the tx to the batch. + batch = append(batch, item.tx) + if uint32(len(batch)) >= limit { //nolint: gosimple + return false + } + return true + }) + + // Remove transactions discovered to be too big to even fit the batch. + // This can happen if weight limits changed after the transaction was + // already set to be scheduled. + q.removeTxsLocked(toRemove) + + return batch +} + // Implements api.TxPool. func (q *priorityQueue) GetKnownBatch(batch []hash.Hash) ([]*transaction.CheckedTransaction, map[hash.Hash]int) { q.Lock() diff --git a/go/runtime/scheduling/simple/txpool/tester.go b/go/runtime/scheduling/simple/txpool/tester.go index 333054975b1..fad79bb4c71 100644 --- a/go/runtime/scheduling/simple/txpool/tester.go +++ b/go/runtime/scheduling/simple/txpool/tester.go @@ -87,9 +87,10 @@ func testBasic(t *testing.T, pool api.TxPool) { require.EqualValues(t, 10, len(batch), "Batch size") require.EqualValues(t, 51, pool.Size(), "Size") - hashes := make([]hash.Hash, len(batch)) - for i, tx := range batch { - hashes[i] = tx.Hash() + hashes := make([]hash.Hash, 0, len(batch)) + for _, tx := range batch { + hashes = append(hashes, tx.Hash()) + hashes = append(hashes, tx.Hash()) // Duplicate to ensure this is handled correctly. 
} pool.RemoveBatch(hashes) require.EqualValues(t, 41, pool.Size(), "Size") @@ -115,6 +116,10 @@ func testGetBatch(t *testing.T, pool api.TxPool) { require.EqualValues(t, 1, len(batch), "Batch size") require.EqualValues(t, 1, pool.Size(), "Size") + batch = pool.GetPrioritizedBatch(nil, 10) + require.EqualValues(t, 1, len(batch), "Batch size") + require.EqualValues(t, 1, pool.Size(), "Size") + hashes := make([]hash.Hash, len(batch)) for i, tx := range batch { hashes[i] = tx.Hash() @@ -344,6 +349,35 @@ func testPriority(t *testing.T, pool api.TxPool) { "elements should be returned by priority", ) + batch = pool.GetPrioritizedBatch(nil, 2) + require.Len(t, batch, 2, "two transactions should be returned") + require.EqualValues( + t, + []*transaction.CheckedTransaction{ + txs[2], // 20 + txs[0], // 10 + }, + batch, + "elements should be returned by priority", + ) + + offsetTx := txs[2].Hash() + batch = pool.GetPrioritizedBatch(&offsetTx, 2) + require.Len(t, batch, 2, "two transactions should be returned") + require.EqualValues( + t, + []*transaction.CheckedTransaction{ + txs[0], // 10 + txs[1], // 5 + }, + batch, + "elements should be returned by priority", + ) + + offsetTx.Empty() + batch = pool.GetPrioritizedBatch(&offsetTx, 2) + require.Len(t, batch, 0, "no transactions should be returned on invalid hash") + // When the pool is full, a higher priority transaction should still get queued. highTx := transaction.NewCheckedTransaction( []byte("hello world 6"), diff --git a/go/runtime/txpool/txpool.go b/go/runtime/txpool/txpool.go index ec14c79b489..2c63ff8be6e 100644 --- a/go/runtime/txpool/txpool.go +++ b/go/runtime/txpool/txpool.go @@ -75,6 +75,14 @@ type TransactionPool interface { // GetScheduledBatch returns a batch of transactions ready for scheduling. GetScheduledBatch(force bool) []*transaction.CheckedTransaction + // GetPrioritizedBatch returns a batch of transactions ordered by priority but without taking + // any weight limits into account. + // + // Offset specifies the transaction hash that should serve as an offset when returning + // transactions from the pool. Transactions will be skipped until the given hash is encountered + // and only following transactions will be returned. + GetPrioritizedBatch(offset *hash.Hash, limit uint32) []*transaction.CheckedTransaction + // GetKnownBatch gets a set of known transactions from the transaction pool. 
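The two batch getters on TransactionPool serve different callers: GetScheduledBatch applies the node-side scheduling policy (batch size and weight limits), while GetPrioritizedBatch only orders by priority and caps the count, leaving everything else to the runtime. A small fragment contrasting the two, assuming a txpool.TransactionPool handle; the limit of 10 is arbitrary:

    // Node-side scheduling: subject to the configured batch/weight limits; with
    // force=false a batch is typically only returned once those conditions are met.
    scheduled := pool.GetScheduledBatch(false)

    // Runtime-driven scheduling: the top 10 transactions by priority, no weight limits applied.
    prioritized := pool.GetPrioritizedBatch(nil, 10)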
// // For any missing transactions nil will be returned in their place and the map of missing @@ -274,6 +282,13 @@ func (t *txPool) GetScheduledBatch(force bool) []*transaction.CheckedTransaction return t.scheduler.GetBatch(force) } +func (t *txPool) GetPrioritizedBatch(offset *hash.Hash, limit uint32) []*transaction.CheckedTransaction { + t.schedulerLock.Lock() + defer t.schedulerLock.Unlock() + + return t.scheduler.GetPrioritizedBatch(offset, limit) +} + func (t *txPool) GetKnownBatch(batch []hash.Hash) ([]*transaction.CheckedTransaction, map[hash.Hash]int) { t.schedulerLock.Lock() defer t.schedulerLock.Unlock() diff --git a/go/worker/common/committee/runtime_host.go b/go/worker/common/committee/runtime_host.go index d0193f55dad..f62efe4cfd0 100644 --- a/go/worker/common/committee/runtime_host.go +++ b/go/worker/common/committee/runtime_host.go @@ -8,6 +8,7 @@ import ( "github.com/oasisprotocol/oasis-core/go/runtime/host" "github.com/oasisprotocol/oasis-core/go/runtime/host/protocol" runtimeRegistry "github.com/oasisprotocol/oasis-core/go/runtime/registry" + "github.com/oasisprotocol/oasis-core/go/runtime/txpool" ) // Implements RuntimeHostHandlerFactory. @@ -38,6 +39,11 @@ func (env *nodeEnvironment) GetKeyManagerClient(ctx context.Context) (keymanager return env.n.KeyManagerClient, nil } +// Implements RuntimeHostHandlerEnvironment. +func (env *nodeEnvironment) GetTxPool(ctx context.Context) (txpool.TransactionPool, error) { + return env.n.TxPool, nil +} + // Implements RuntimeHostHandlerFactory. func (n *Node) NewRuntimeHostHandler() protocol.Handler { return runtimeRegistry.NewRuntimeHostHandler(&nodeEnvironment{n}, n.Runtime, n.Consensus) diff --git a/go/worker/compute/executor/committee/batch.go b/go/worker/compute/executor/committee/batch.go index 2b09ef6471e..20514f0b989 100644 --- a/go/worker/compute/executor/committee/batch.go +++ b/go/worker/compute/executor/committee/batch.go @@ -20,10 +20,18 @@ type unresolvedBatch struct { } func (ub *unresolvedBatch) String() string { - return fmt.Sprintf("UnresolvedBatch{hash: %s}", ub.proposal.Header.BatchHash) + switch { + case ub.proposal != nil: + return fmt.Sprintf("UnresolvedBatch{hash: %s}", ub.proposal.Header.BatchHash) + default: + return "UnresolvedBatch{?}" + } } func (ub *unresolvedBatch) hash() hash.Hash { + if ub.proposal == nil { + return hash.Hash{} + } return ub.proposal.Header.BatchHash } @@ -31,6 +39,9 @@ func (ub *unresolvedBatch) resolve(txPool txpool.TransactionPool) (transaction.R if ub.batch != nil { return ub.batch, nil } + if ub.proposal == nil { + return nil, fmt.Errorf("resolve called on unresolvable batch") + } if len(ub.proposal.Batch) == 0 { return transaction.RawBatch{}, nil } diff --git a/go/worker/compute/executor/committee/node.go b/go/worker/compute/executor/committee/node.go index 1c554169ea0..7a0ced41758 100644 --- a/go/worker/compute/executor/committee/node.go +++ b/go/worker/compute/executor/committee/node.go @@ -374,7 +374,7 @@ func (n *Node) HandleNewBlockLocked(blk *block.Block) { "header_io_root", header.IORoot, "proposed_io_root", state.proposedIORoot, "header_type", header.HeaderType, - "batch_size", len(state.raw), + "batch_size", len(state.txHashes), ) return } @@ -383,16 +383,11 @@ func (n *Node) HandleNewBlockLocked(blk *block.Block) { batchProcessingTime.With(n.getMetricLabels()).Observe(time.Since(state.batchStartTime).Seconds()) n.logger.Debug("removing processed batch from queue", - "batch_size", len(state.raw), + "batch_size", len(state.txHashes), "io_root", header.IORoot, ) - // 
Removed processed transactions from queue. - if err := n.removeTxBatch(state.raw); err != nil { - n.logger.Warn("failed removing processed batch from queue", - "err", err, - "batch_size", len(state.raw), - ) - } + // Remove processed transactions from queue. + n.commonNode.TxPool.RemoveTxBatch(state.txHashes) }() } @@ -437,19 +432,6 @@ func (n *Node) handleNewCheckedTransactions(txs []*transaction.CheckedTransactio } } -// removeTxBatch removes a batch from scheduling queue. -func (n *Node) removeTxBatch(batch transaction.RawBatch) error { - hashes := make([]hash.Hash, len(batch)) - for i, b := range batch { - hashes[i] = hash.NewFromBytes(b) - } - - // Remove transactions from the transaction pool. - n.commonNode.TxPool.RemoveTxBatch(hashes) - - return nil -} - func (n *Node) updateBatchWeightLimits(ctx context.Context, blk *block.Block, lb *consensus.LightBlock, epoch beacon.EpochTime) error { n.limitsLastUpdateLock.Lock() defer n.limitsLastUpdateLock.Unlock() @@ -603,7 +585,7 @@ func (n *Node) getRtStateAndRoundResults(ctx context.Context, height int64) (*ro return state, roundResults, nil } -func (n *Node) handleScheduleBatch(force bool) { +func (n *Node) handleScheduleBatch(force bool) { //nolint: gocyclo roundCtx, epoch, rtState, roundResults, blk, lb, err := func() ( context.Context, *committee.EpochSnapshot, @@ -675,9 +657,32 @@ func (n *Node) handleScheduleBatch(force bool) { return } + // Check what the runtime supports. + rt := n.commonNode.GetHostedRuntime() + if rt == nil { + n.logger.Debug("not scheduling a batch as the runtime is not yet ready") + return + } + rtInfo, err := rt.GetInfo(roundCtx) + if err != nil { + n.logger.Warn("not scheduling a batch as the runtime is broken", + "err", err, + ) + return + } + + var batch []*transaction.CheckedTransaction + switch { + case epoch.IsTransactionScheduler(blk.Header.Round) && rtInfo.Features.HasScheduleControl(): + // The runtime supports schedule control and we are the scheduler in this round. + batch = n.commonNode.TxPool.GetPrioritizedBatch(nil, rtInfo.Features.ScheduleControl.InitialBatchSize) + default: + // Just ask the scheduler for a batch of transactions. + batch = n.commonNode.TxPool.GetScheduledBatch(force) + } + // Ask the scheduler to get a batch of transactions for us and see if we should be proposing // a new batch to other nodes. - batch := n.commonNode.TxPool.GetScheduledBatch(force) switch { case len(batch) > 0: // We have some transactions, schedule batch. @@ -734,7 +739,16 @@ func (n *Node) handleScheduleBatch(force bool) { return } - n.logger.Debug("scheduling a batch", + // If the runtime supports schedule control we ask the runtime to schedule the batch and then + // propose that. + if rtInfo.Features.HasScheduleControl() { + n.startRuntimeBatchSchedulingLocked(rtState, roundResults, rt, batch) + return + } + + // Runtime does not support scheduling control, do our own scheduling. + + n.logger.Debug("runtime does not support scheduling control, scheduling batch", "batch_size", len(batch), "round_results", roundResults, ) @@ -775,40 +789,13 @@ func (n *Node) handleScheduleBatch(force bool) { } // Commit I/O tree to local storage. 
- - err = n.storage.Apply(roundCtx, &storage.ApplyRequest{ - Namespace: blk.Header.Namespace, - RootType: storage.RootTypeIO, - SrcRound: blk.Header.Round + 1, - SrcRoot: emptyRoot.Hash, - DstRound: blk.Header.Round + 1, - DstRoot: ioRoot, - WriteLog: ioWriteLog, - }) - if err != nil { + if err = n.schedulerStoreTransactions(roundCtx, blk, ioWriteLog, ioRoot); err != nil { n.logger.Error("failed to commit I/O tree to storage", "err", err, ) return } - // Create new proposal. - proposal := &commitment.Proposal{ - NodeID: n.commonNode.Identity.NodeSigner.Public(), - Header: commitment.ProposalHeader{ - Round: blk.Header.Round + 1, - PreviousHash: blk.Header.EncodedHash(), - BatchHash: ioRoot, - }, - Batch: txHashes, - } - if err = proposal.Sign(n.commonNode.Identity.NodeSigner, blk.Header.Namespace); err != nil { - n.logger.Error("failed to sign proposal header", - "err", err, - ) - return - } - n.commonNode.CrossNode.Lock() defer n.commonNode.CrossNode.Unlock() @@ -822,22 +809,128 @@ func (n *Node) handleScheduleBatch(force bool) { return } + proposal, err := n.schedulerCreateProposalLocked(roundCtx, ioRoot, txHashes) + if err != nil { + n.logger.Error("failed to create proposal", + "err", err, + ) + return + } + + // Also process the batch locally. + n.maybeStartProcessingBatchLocked(&unresolvedBatch{ + proposal: proposal, + batch: rawBatch, + }) +} + +func (n *Node) schedulerStoreTransactions(ctx context.Context, blk *block.Block, inputWriteLog storage.WriteLog, inputRoot hash.Hash) error { + var emptyRoot hash.Hash + emptyRoot.Empty() + + return n.storage.Apply(ctx, &storage.ApplyRequest{ + Namespace: blk.Header.Namespace, + RootType: storage.RootTypeIO, + SrcRound: blk.Header.Round + 1, + SrcRoot: emptyRoot, + DstRound: blk.Header.Round + 1, + DstRoot: inputRoot, + WriteLog: inputWriteLog, + }) +} + +func (n *Node) schedulerCreateProposalLocked(ctx context.Context, inputRoot hash.Hash, txHashes []hash.Hash) (*commitment.Proposal, error) { + blk := n.commonNode.CurrentBlock + + // Create new proposal. + proposal := &commitment.Proposal{ + NodeID: n.commonNode.Identity.NodeSigner.Public(), + Header: commitment.ProposalHeader{ + Round: blk.Header.Round + 1, + PreviousHash: blk.Header.EncodedHash(), + BatchHash: inputRoot, + }, + Batch: txHashes, + } + if err := proposal.Sign(n.commonNode.Identity.NodeSigner, blk.Header.Namespace); err != nil { + return nil, fmt.Errorf("failed to sign proposal header: %w", err) + } + n.logger.Debug("dispatching a new batch proposal", - "io_root", ioRoot, - "batch_size", len(batch), + "input_root", inputRoot, + "batch_size", len(txHashes), ) - n.commonNode.P2P.PublishCommittee(roundCtx, n.commonNode.Runtime.ID(), &p2p.CommitteeMessage{ + n.commonNode.P2P.PublishCommittee(ctx, n.commonNode.Runtime.ID(), &p2p.CommitteeMessage{ Epoch: n.commonNode.CurrentEpoch, Proposal: proposal, }) crash.Here(crashPointBatchPublishAfter) + return proposal, nil +} - // Also process the batch locally. - n.maybeStartProcessingBatchLocked(&unresolvedBatch{ - proposal: proposal, - batch: rawBatch, - }) +func (n *Node) startRuntimeBatchSchedulingLocked( + rtState *roothash.RuntimeState, + roundResults *roothash.RoundResults, + rt host.RichRuntime, + batch []*transaction.CheckedTransaction, +) { + n.logger.Debug("asking runtime to schedule batch", + "initial_batch_size", len(batch), + ) + + // Create batch processing context and channel for receiving the response. 
+ ctx, cancel := context.WithCancel(n.roundCtx) + done := make(chan *processedBatch, 1) + + batchStartTime := time.Now() + n.transitionLocked(StateProcessingBatch{&unresolvedBatch{}, batchStartTime, cancel, done, protocol.ExecutionModeSchedule}) + + // Request the worker host to process a batch. This is done in a separate + // goroutine so that the committee node can continue processing blocks. + blk := n.commonNode.CurrentBlock + consensusBlk := n.commonNode.CurrentConsensusBlock + epoch := n.commonNode.CurrentEpoch + + go func() { + defer close(done) + + initialBatch := make([][]byte, 0, len(batch)) + for _, tx := range batch { + initialBatch = append(initialBatch, tx.Raw()) + } + + // Ask the runtime to execute the batch. + rsp, err := n.runtimeExecuteTxBatch( + ctx, + rt, + protocol.ExecutionModeSchedule, + epoch, + consensusBlk, + blk, + rtState, + roundResults, + hash.Hash{}, // IORoot is ignored as it is yet to be determined. + initialBatch, + ) + if err != nil { + n.logger.Error("runtime batch execution failed", + "err", err, + ) + return + } + + // Remove any rejected transactions. + n.commonNode.TxPool.RemoveTxBatch(rsp.TxRejectHashes) + + // Submit response to the executor worker. + done <- &processedBatch{ + computed: &rsp.Batch, + txHashes: rsp.TxHashes, + txInputRoot: rsp.TxInputRoot, + txInputWriteLog: rsp.TxInputWriteLog, + } + }() } // Guarded by n.commonNode.CrossNode. @@ -974,6 +1067,95 @@ func (n *Node) startLocalStorageReplication( return ch } +func (n *Node) runtimeExecuteTxBatch( + ctx context.Context, + rt host.RichRuntime, + mode protocol.ExecutionMode, + epoch beacon.EpochTime, + consensusBlk *consensus.LightBlock, + blk *block.Block, + state *roothash.RuntimeState, + roundResults *roothash.RoundResults, + inputRoot hash.Hash, + inputs transaction.RawBatch, +) (*protocol.RuntimeExecuteTxBatchResponse, error) { + // Fetch any incoming messages. + inMsgs, err := n.commonNode.Consensus.RootHash().GetIncomingMessageQueue(ctx, &roothash.InMessageQueueRequest{ + RuntimeID: n.commonNode.Runtime.ID(), + Height: consensusBlk.Height, + }) + if err != nil { + n.logger.Error("failed to fetch incoming runtime message queue metadata", + "err", err, + ) + return nil, err + } + + rq := &protocol.Body{ + RuntimeExecuteTxBatchRequest: &protocol.RuntimeExecuteTxBatchRequest{ + Mode: mode, + ConsensusBlock: *consensusBlk, + RoundResults: roundResults, + IORoot: inputRoot, + Inputs: inputs, + InMessages: inMsgs, + Block: *blk, + Epoch: epoch, + MaxMessages: state.Runtime.Executor.MaxMessages, + }, + } + batchSize.With(n.getMetricLabels()).Observe(float64(len(inputs))) + + rtStartTime := time.Now() + defer func() { + batchRuntimeProcessingTime.With(n.getMetricLabels()).Observe(time.Since(rtStartTime).Seconds()) + }() + + rsp, err := rt.Call(ctx, rq) + switch { + case err == nil: + case errors.Is(err, context.Canceled): + // Context was canceled while the runtime was processing a request. + n.logger.Error("batch processing aborted by context, restarting runtime") + + // Abort the runtime, so we can start processing the next batch. 
+ abortCtx, cancel := context.WithTimeout(n.ctx, abortTimeout) + defer cancel() + + if err = rt.Abort(abortCtx, false); err != nil { + n.logger.Error("failed to abort the runtime", + "err", err, + ) + } + return nil, fmt.Errorf("batch processing aborted by context") + default: + n.logger.Error("error while sending batch processing request to runtime", + "err", err, + ) + return nil, err + } + crash.Here(crashPointBatchProcessStartAfter) + + if rsp.RuntimeExecuteTxBatchResponse == nil { + n.logger.Error("malformed response from runtime", + "response", rsp, + ) + return nil, fmt.Errorf("malformed response from runtime") + } + + // Update round batch weight limits. + n.limitsLastUpdateLock.Lock() + if err = n.commonNode.TxPool.UpdateWeightLimits(rsp.RuntimeExecuteTxBatchResponse.BatchWeightLimits); err != nil { + n.logger.Error("failed updating batch weight limits", + "err", err, + ) + } + n.limitsLastUpdate = blk.Header.Round + 1 + n.limitsLastUpdateLock.Unlock() + + return rsp.RuntimeExecuteTxBatchResponse, nil +} + // Guarded by n.commonNode.CrossNode. func (n *Node) startProcessingBatchLocked(batch *unresolvedBatch) { if n.commonNode.CurrentBlock == nil { @@ -1007,7 +1189,7 @@ func (n *Node) startProcessingBatchLocked(batch *unresolvedBatch) { done := make(chan *processedBatch, 1) batchStartTime := time.Now() - n.transitionLocked(StateProcessingBatch{batch, batchStartTime, cancel, done}) + n.transitionLocked(StateProcessingBatch{batch, batchStartTime, cancel, done, protocol.ExecutionModeExecute}) rt := n.commonNode.GetHostedRuntime() if rt == nil { @@ -1041,78 +1223,25 @@ func (n *Node) startProcessingBatchLocked(batch *unresolvedBatch) { // Optionally start local storage replication in parallel to batch dispatch. replicateCh := n.startLocalStorageReplication(ctx, blk, batch.hash(), resolvedBatch) - // Fetch any incoming messages. - inMsgs, err := n.commonNode.Consensus.RootHash().GetIncomingMessageQueue(ctx, &roothash.InMessageQueueRequest{ - RuntimeID: n.commonNode.Runtime.ID(), - Height: height, - }) + // Ask the runtime to execute the batch. + rsp, err := n.runtimeExecuteTxBatch( + ctx, + rt, + protocol.ExecutionModeExecute, + epoch, + consensusBlk, + blk, + state, + roundResults, + batch.hash(), + resolvedBatch, + ) if err != nil { - n.logger.Error("failed to fetch incoming runtime message queue metadata", - "err", err, - ) - return - } - - rq := &protocol.Body{ - RuntimeExecuteTxBatchRequest: &protocol.RuntimeExecuteTxBatchRequest{ - ConsensusBlock: *consensusBlk, - RoundResults: roundResults, - IORoot: batch.hash(), - Inputs: resolvedBatch, - InMessages: inMsgs, - Block: *blk, - Epoch: epoch, - MaxMessages: state.Runtime.Executor.MaxMessages, - }, - } - batchSize.With(n.getMetricLabels()).Observe(float64(len(resolvedBatch))) - - rtStartTime := time.Now() - defer func() { - batchRuntimeProcessingTime.With(n.getMetricLabels()).Observe(time.Since(rtStartTime).Seconds()) - }() - - rsp, err := rt.Call(ctx, rq) - switch { - case err == nil: - case errors.Is(err, context.Canceled): - // Context was canceled while the runtime was processing a request. - n.logger.Error("batch processing aborted by context, restarting runtime") - - // Abort the runtime, so we can start processing the next batch. 
- abortCtx, cancel := context.WithTimeout(n.ctx, abortTimeout) - defer cancel() - - if err = rt.Abort(abortCtx, false); err != nil { - n.logger.Error("failed to abort the runtime", - "err", err, - ) - } - return - default: - n.logger.Error("error while sending batch processing request to runtime", + n.logger.Error("runtime batch execution failed", "err", err, ) return } - crash.Here(crashPointBatchProcessStartAfter) - - if rsp.RuntimeExecuteTxBatchResponse == nil { - n.logger.Error("malformed response from runtime", - "response", rsp, - ) - return - } - - // Update round batch weight limits. - n.limitsLastUpdateLock.Lock() - if err = n.commonNode.TxPool.UpdateWeightLimits(rsp.RuntimeExecuteTxBatchResponse.BatchWeightLimits); err != nil { - n.logger.Error("failed updating batch weight limits", - "err", err, - ) - } - n.limitsLastUpdate = blk.Header.Round + 1 - n.limitsLastUpdateLock.Unlock() // Wait for replication to complete before proposing a batch to ensure that we can cleanly // apply any updates. @@ -1130,8 +1259,9 @@ func (n *Node) startProcessingBatchLocked(batch *unresolvedBatch) { // Submit response to the executor worker. done <- &processedBatch{ - computed: &rsp.RuntimeExecuteTxBatchResponse.Batch, + computed: &rsp.Batch, raw: resolvedBatch, + txHashes: rsp.TxHashes, } }() } @@ -1195,6 +1325,14 @@ func (n *Node) proposeBatch( ec.Messages = batch.Messages } + var inputRoot hash.Hash + switch { + case unresolved.proposal == nil: + inputRoot = processed.txInputRoot + default: + inputRoot = unresolved.hash() + } + // Commit I/O and state write logs to storage. storageErr := func() error { start := time.Now() @@ -1208,7 +1346,7 @@ func (n *Node) proposeBatch( Namespace: lastHeader.Namespace, RootType: storage.RootTypeIO, SrcRound: lastHeader.Round + 1, - SrcRoot: unresolved.hash(), + SrcRoot: inputRoot, DstRound: lastHeader.Round + 1, DstRoot: *batch.Header.IORoot, WriteLog: batch.IOWriteLog, @@ -1263,12 +1401,21 @@ func (n *Node) proposeBatch( return } + // Due to backwards compatibility with runtimes that don't provide transaction hashes as output + // we need to manually compute them here. + if len(processed.raw) > 0 && len(processed.txHashes) == 0 { + processed.txHashes = make([]hash.Hash, 0, len(processed.raw)) + for _, tx := range processed.raw { + processed.txHashes = append(processed.txHashes, hash.NewFromBytes(tx)) + } + } + switch storageErr { case nil: n.transitionLocked(StateWaitingForFinalize{ batchStartTime: state.batchStartTime, - raw: processed.raw, proposedIORoot: *ec.Header.IORoot, + txHashes: processed.txHashes, }) default: n.abortBatchLocked(storageErr) @@ -1456,8 +1603,34 @@ func (n *Node) handleProcessedBatch(batch *processedBatch, processingCh chan *pr roundCtx := n.roundCtx lastHeader := n.commonNode.CurrentBlock.Header - // Successfully processed a batch. - if batch != nil && batch.computed != nil { + switch { + case batch == nil || batch.computed == nil: + // There was an issue during batch processing. + case state.mode == protocol.ExecutionModeSchedule: + // Scheduling was processed successfully. + n.logger.Info("runtime has finished scheduling a batch", + "input_root", batch.txInputRoot, + "tx_hashes", batch.txHashes, + ) + err := n.schedulerStoreTransactions(roundCtx, n.commonNode.CurrentBlock, batch.txInputWriteLog, batch.txInputRoot) + if err != nil { + n.logger.Error("failed to store transaction", + "err", err, + ) + break + } + + // Create and submit a proposal. 
+ _, err = n.schedulerCreateProposalLocked(roundCtx, batch.txInputRoot, batch.txHashes) + if err != nil { + n.logger.Error("failed to create proposal", + "err", err, + ) + break + } + fallthrough + default: + // Batch was processed successfully. stateBatch := state.batch n.commonNode.CrossNode.Unlock() n.logger.Info("worker has finished processing a batch") diff --git a/go/worker/compute/executor/committee/state.go b/go/worker/compute/executor/committee/state.go index 478dea09df3..242cc2fae24 100644 --- a/go/worker/compute/executor/committee/state.go +++ b/go/worker/compute/executor/committee/state.go @@ -8,6 +8,7 @@ import ( roothash "github.com/oasisprotocol/oasis-core/go/roothash/api" "github.com/oasisprotocol/oasis-core/go/runtime/host/protocol" "github.com/oasisprotocol/oasis-core/go/runtime/transaction" + storage "github.com/oasisprotocol/oasis-core/go/storage/api" ) // StateName is a symbolic state without the attached values. @@ -198,11 +199,17 @@ type StateProcessingBatch struct { cancelFn context.CancelFunc // Channel which will provide the result. done chan *processedBatch + // Execution mode. + mode protocol.ExecutionMode } type processedBatch struct { computed *protocol.ComputedBatch raw transaction.RawBatch + + txHashes []hash.Hash + txInputRoot hash.Hash + txInputWriteLog storage.WriteLog } // Name returns the name of the state. @@ -225,8 +232,8 @@ func (s *StateProcessingBatch) cancel() { // StateWaitingForFinalize is the waiting for finalize state. type StateWaitingForFinalize struct { batchStartTime time.Time - raw transaction.RawBatch proposedIORoot hash.Hash + txHashes []hash.Hash } // Name returns the name of the state. diff --git a/runtime/src/config.rs b/runtime/src/config.rs index 537c8cd79c5..816cc92819d 100644 --- a/runtime/src/config.rs +++ b/runtime/src/config.rs @@ -1,5 +1,5 @@ //! Runtime configuration. -use crate::{common::version::Version, consensus::verifier::TrustRoot}; +use crate::{common::version::Version, consensus::verifier::TrustRoot, types::Features}; /// Global runtime configuration. #[derive(Clone, Debug, Default)] @@ -10,6 +10,8 @@ pub struct Config { pub trust_root: Option, /// Storage configuration. pub storage: Storage, + /// Advertised runtime features. + pub features: Option, } /// Storage-related configuration. diff --git a/runtime/src/dispatcher.rs b/runtime/src/dispatcher.rs index 5b08840601a..79e450113a3 100644 --- a/runtime/src/dispatcher.rs +++ b/runtime/src/dispatcher.rs @@ -44,7 +44,7 @@ use crate::{ types::TxnBatch, Context as TxnContext, }, - types::{Body, ComputedBatch, Error}, + types::{Body, ComputedBatch, Error, ExecutionMode}, }; /// Maximum amount of requests that can be in the dispatcher queue. @@ -111,6 +111,7 @@ impl From for Error { /// State related to dispatching a runtime transaction. 
struct TxDispatchState { + mode: ExecutionMode, consensus_block: LightBlock, consensus_verifier: Arc, header: Header, @@ -320,6 +321,7 @@ impl Dispatcher { .await } Body::RuntimeExecuteTxBatchRequest { + mode, consensus_block, round_results, io_root, @@ -339,6 +341,7 @@ impl Dispatcher { inputs.unwrap_or_default(), in_msgs, TxDispatchState { + mode, consensus_block, consensus_verifier: state.consensus_verifier, header: block.header, @@ -367,6 +370,7 @@ impl Dispatcher { inputs, vec![], TxDispatchState { + mode: ExecutionMode::Execute, consensus_block, consensus_verifier: state.consensus_verifier, header: block.header, @@ -395,6 +399,7 @@ impl Dispatcher { method, args, TxDispatchState { + mode: ExecutionMode::Execute, consensus_block, consensus_verifier: state.consensus_verifier, header, @@ -567,7 +572,18 @@ impl Dispatcher { state.max_messages, state.check_only, ); - let mut results = txn_dispatcher.execute_batch(txn_ctx, &inputs, &in_msgs)?; + + // Perform execution based on the passed mode. + let mut results = match state.mode { + ExecutionMode::Execute => { + // Just execute the batch. + txn_dispatcher.execute_batch(txn_ctx, &inputs, &in_msgs)? + } + ExecutionMode::Schedule => { + // Allow the runtime to arbitrarily update the batch. + txn_dispatcher.schedule_and_execute_batch(txn_ctx, &mut inputs, &in_msgs)? + } + }; // Finalize state. let (state_write_log, new_state_root) = overlay @@ -605,21 +621,21 @@ impl Dispatcher { .expect("add transaction must succeed"); } - let (_, old_io_root) = txn_tree + let (input_write_log, input_io_root) = txn_tree .commit(Context::create_child(&ctx)) .expect("io commit must succeed"); - if old_io_root != io_root { + if state.mode == ExecutionMode::Execute && input_io_root != io_root { panic!( "dispatcher: I/O root inconsistent with inputs (expected: {:?} got: {:?})", - io_root, old_io_root + io_root, input_io_root ); } - for (tx_hash, result) in hashes.drain(..).zip(results.results.drain(..)) { + for (tx_hash, result) in hashes.iter().zip(results.results.drain(..)) { txn_tree .add_output( Context::create_child(&ctx), - tx_hash, + *tx_hash, result.output, result.tags, ) @@ -680,6 +696,10 @@ impl Dispatcher { messages: results.messages, }, batch_weight_limits: results.batch_weight_limits, + tx_hashes: hashes, + tx_reject_hashes: results.tx_reject_hashes, + tx_input_root: input_io_root, + tx_input_write_log: input_write_log, }) } diff --git a/runtime/src/protocol.rs b/runtime/src/protocol.rs index c132e7c62aa..016e3464a81 100644 --- a/runtime/src/protocol.rs +++ b/runtime/src/protocol.rs @@ -464,6 +464,7 @@ impl Protocol { Ok(RuntimeInfoResponse { protocol_version: BUILD_INFO.protocol_version, runtime_version: self.config.version, + features: self.config.features.clone(), }) } diff --git a/runtime/src/transaction/dispatcher.rs b/runtime/src/transaction/dispatcher.rs index ef9a6a10f3f..c10f7f1db32 100644 --- a/runtime/src/transaction/dispatcher.rs +++ b/runtime/src/transaction/dispatcher.rs @@ -24,6 +24,23 @@ pub trait Dispatcher: Send + Sync { in_msgs: &[roothash::IncomingMessage], ) -> Result; + /// Schedule and execute transactions in the given batch. + /// + /// The passed batch is an initial batch. In case the runtime needs additional items it should + /// request them from the host. 
+ fn schedule_and_execute_batch( + &self, + _ctx: Context, + _initial_batch: &mut TxnBatch, + _in_msgs: &[roothash::IncomingMessage], + ) -> Result { + Err(RuntimeError::new( + "rhp/dispatcher", + 3, + "scheduling not supported", + )) + } + /// Check the transactions in the given batch for validity. fn check_batch( &self, @@ -62,6 +79,15 @@ impl Dispatcher for Box { T::execute_batch(&*self, ctx, batch, in_msgs) } + fn schedule_and_execute_batch( + &self, + ctx: Context, + initial_batch: &mut TxnBatch, + in_msgs: &[roothash::IncomingMessage], + ) -> Result { + T::schedule_and_execute_batch(&*self, ctx, initial_batch, in_msgs) + } + fn check_batch( &self, ctx: Context, @@ -93,6 +119,15 @@ impl Dispatcher for Arc { T::execute_batch(&*self, ctx, batch, in_msgs) } + fn schedule_and_execute_batch( + &self, + ctx: Context, + initial_batch: &mut TxnBatch, + in_msgs: &[roothash::IncomingMessage], + ) -> Result { + T::schedule_and_execute_batch(&*self, ctx, initial_batch, in_msgs) + } + fn check_batch( &self, ctx: Context, @@ -135,6 +170,10 @@ pub struct ExecuteBatchResult { /// Batch weight limits valid for next round. This is used as a fast-path, /// to avoid having the transaction scheduler query these on every round. pub batch_weight_limits: Option>, + /// Hashes of transactions to reject. + /// + /// Note that these are only taken into account in schedule execution mode. + pub tx_reject_hashes: Vec, } /// No-op dispatcher. @@ -162,6 +201,23 @@ impl Dispatcher for NoopDispatcher { block_tags: Tags::new(), batch_weight_limits: None, in_msgs_count: in_msgs.len(), + tx_reject_hashes: Vec::new(), + }) + } + + fn schedule_and_execute_batch( + &self, + _ctx: Context, + _initial_batch: &mut TxnBatch, + in_msgs: &[roothash::IncomingMessage], + ) -> Result { + Ok(ExecuteBatchResult { + results: Vec::new(), + messages: Vec::new(), + block_tags: Tags::new(), + batch_weight_limits: None, + in_msgs_count: in_msgs.len(), + tx_reject_hashes: Vec::new(), }) } diff --git a/runtime/src/types.rs b/runtime/src/types.rs index 18f04f66811..d7cbf24619b 100644 --- a/runtime/src/types.rs +++ b/runtime/src/types.rs @@ -155,6 +155,8 @@ pub enum Body { results: Vec, }, RuntimeExecuteTxBatchRequest { + #[cbor(optional, default)] + mode: ExecutionMode, consensus_block: LightBlock, round_results: roothash::RoundResults, io_root: Hash, @@ -170,6 +172,11 @@ pub enum Body { batch: ComputedBatch, #[cbor(optional)] batch_weight_limits: Option>, + + tx_hashes: Vec, + tx_reject_hashes: Vec, + tx_input_root: Hash, + tx_input_write_log: WriteLog, }, RuntimeKeyManagerPolicyUpdateRequest { signed_policy_raw: Vec, @@ -220,6 +227,15 @@ pub enum Body { HostFetchConsensusBlockResponse { block: LightBlock, }, + HostFetchTxBatchRequest { + #[cbor(optional)] + offset: Option, + limit: u32, + }, + HostFetchTxBatchResponse { + #[cbor(optional)] + batch: Option, + }, } /// A serializable error. @@ -274,11 +290,56 @@ pub struct RuntimeInfoRequest { pub local_config: BTreeMap, } +/// Set of supported runtime features. +#[derive(Clone, Debug, Default, cbor::Encode, cbor::Decode)] +pub struct Features { + /// Schedule control feature. + #[cbor(optional, default)] + pub schedule_control: Option, +} + +/// A feature specifying that the runtime supports controlling the scheduling of batches. This means +/// that the scheduler should only take priority into account and ignore weights, leaving it up to +/// the runtime to decide which transactions to include. 
+#[derive(Clone, Debug, cbor::Encode, cbor::Decode)] +pub struct FeatureScheduleControl { + /// Size of the initial batch of transactions. + pub initial_batch_size: u32, +} + /// Runtime information response. #[derive(Clone, Debug, cbor::Encode, cbor::Decode)] pub struct RuntimeInfoResponse { + /// The runtime protocol version supported by the runtime. pub protocol_version: Version, + + /// The version of the runtime. pub runtime_version: Version, + + /// Describes the features supported by the runtime. + #[cbor(optional, default)] + pub features: Option, +} + +/// Batch execution mode. +#[derive(Clone, Debug, PartialEq, Eq, cbor::Encode, cbor::Decode)] +pub enum ExecutionMode { + /// Execution mode where the batch of transactions is executed as-is without the ability to + /// perform and modifications to the batch. + Execute = 0, + /// Execution mode where the runtime is in control of scheduling and may arbitrarily modify the + /// batch during execution. + /// + /// This execution mode will only be used in case the runtime advertises to support the schedule + /// control feature. In this case the call will only contain up to InitialBatchSize transactions + /// and the runtime will need to request more if it needs more. + Schedule = 1, +} + +impl Default for ExecutionMode { + fn default() -> Self { + Self::Execute + } } /// Result of a CheckTx operation. diff --git a/tests/runtimes/simple-keyvalue/src/main.rs b/tests/runtimes/simple-keyvalue/src/main.rs index a3eb54adc5c..c9b25a28051 100644 --- a/tests/runtimes/simple-keyvalue/src/main.rs +++ b/tests/runtimes/simple-keyvalue/src/main.rs @@ -22,7 +22,7 @@ use oasis_core_runtime::{ types::TxnBatch, Context as TxnContext, }, - types::{CheckTxResult, Error as RuntimeError}, + types::{CheckTxResult, Error as RuntimeError, FeatureScheduleControl, Features}, version_from_cargo, Protocol, RpcDemux, RpcDispatcher, TxnDispatcher, }; use simple_keymanager::trusted_policy_signers; @@ -242,6 +242,45 @@ impl TxnDispatcher for Dispatcher { in_msgs_count: in_msgs.len(), block_tags: vec![], batch_weight_limits: None, + tx_reject_hashes: vec![], + }) + } + + fn schedule_and_execute_batch( + &self, + mut ctx: TxnContext, + batch: &mut TxnBatch, + in_msgs: &[IncomingMessage], + ) -> Result { + let mut ctx = Context { + core: &mut ctx, + host_info: &self.host_info, + key_manager: &self.key_manager, + messages: vec![], + }; + + Self::begin_block(&mut ctx)?; + + // Execute incoming messages. A real implementation should allocate resources for incoming + // messages and only execute as many messages as fits. + for in_msg in in_msgs { + Self::execute_in_msg(&mut ctx, in_msg); + } + + // Execute transactions. + // TODO: Actually do some batch reordering. + let mut results = vec![]; + for tx in batch.iter() { + results.push(Self::execute_tx(&mut ctx, tx)?); + } + + Ok(ExecuteBatchResult { + results, + messages: ctx.messages, + in_msgs_count: in_msgs.len(), + block_tags: vec![], + batch_weight_limits: None, + tx_reject_hashes: vec![], }) } @@ -298,6 +337,12 @@ pub fn main() { Config { version: version_from_cargo!(), trust_root, + features: Some(Features { + // Enable the schedule control feature. + schedule_control: Some(FeatureScheduleControl { + initial_batch_size: 10, + }), + }), ..Default::default() }, );
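Taken together, the proposer-side flow with schedule control enabled condenses to roughly the following. This is a simplified sketch of the logic spread across handleScheduleBatch, startRuntimeBatchSchedulingLocked and handleProcessedBatch above; locking, storage commits, proposal signing, replication and metrics are omitted, and the function name is illustrative:

    import (
        "context"
        "fmt"

        "github.com/oasisprotocol/oasis-core/go/runtime/host"
        "github.com/oasisprotocol/oasis-core/go/runtime/host/protocol"
        "github.com/oasisprotocol/oasis-core/go/runtime/txpool"
    )

    // scheduleControlRound sketches one proposer round when the runtime controls scheduling.
    func scheduleControlRound(ctx context.Context, rt host.Runtime, pool txpool.TransactionPool) error {
        info, err := rt.GetInfo(ctx)
        if err != nil {
            return err
        }
        if !info.Features.HasScheduleControl() {
            return fmt.Errorf("runtime does not support schedule control")
        }

        // Hand the runtime an initial prioritized batch; while executing in
        // ExecutionModeSchedule it may fetch more via HostFetchTxBatchRequest.
        batch := pool.GetPrioritizedBatch(nil, info.Features.ScheduleControl.InitialBatchSize)
        inputs := make([][]byte, 0, len(batch))
        for _, tx := range batch {
            inputs = append(inputs, tx.Raw())
        }

        rsp, err := rt.Call(ctx, &protocol.Body{
            RuntimeExecuteTxBatchRequest: &protocol.RuntimeExecuteTxBatchRequest{
                Mode:   protocol.ExecutionModeSchedule,
                Inputs: inputs, // remaining fields as for a normal execution round
            },
        })
        if err != nil {
            return err
        }
        result := rsp.RuntimeExecuteTxBatchResponse
        if result == nil {
            return fmt.Errorf("malformed response from runtime")
        }

        // Drop transactions the runtime rejected outright.
        pool.RemoveTxBatch(result.TxRejectHashes)

        // The caller would now store result.TxInputWriteLog under result.TxInputRoot,
        // publish a proposal containing result.TxHashes, and build its executor
        // commitment from result.Batch.
        return nil
    }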