Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for runtime schedule control #4438

Merged
merged 5 commits into from
Jan 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .changelog/4438.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Add support for runtime schedule control

This feature gives runtimes control over scheduling transactions inside a
block, so the runtimes may implement their own more advanced policies.
12 changes: 12 additions & 0 deletions go/oasis-test-runner/scenario/e2e/runtime/keymanager_restart.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,18 @@ func newKmRestartImpl() scenario.Scenario {
}
}

func (sc *kmRestartImpl) Fixture() (*oasis.NetworkFixture, error) {
f, err := sc.runtimeImpl.Fixture()
if err != nil {
return nil, err
}

// The round is allowed to fail until the keymanager becomes available after restart.
f.Network.DefaultLogWatcherHandlerFactories = nil

return f, nil
}

func (sc *kmRestartImpl) Clone() scenario.Scenario {
return &kmRestartImpl{
runtimeImpl: *sc.runtimeImpl.Clone().(*runtimeImpl),
Expand Down
3 changes: 3 additions & 0 deletions go/runtime/host/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ type Runtime interface {
// ID is the runtime identifier.
ID() common.Namespace

// GetInfo retrieves the runtime information.
GetInfo(ctx context.Context) (*protocol.RuntimeInfoResponse, error)

// Call sends a request message to the runtime over the Runtime Host Protocol and waits for the
// response (which may be a failure).
Call(ctx context.Context, body *protocol.Body) (*protocol.Body, error)
Expand Down
10 changes: 10 additions & 0 deletions go/runtime/host/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
"github.com/oasisprotocol/oasis-core/go/common/errors"
"github.com/oasisprotocol/oasis-core/go/common/pubsub"
"github.com/oasisprotocol/oasis-core/go/common/version"
"github.com/oasisprotocol/oasis-core/go/roothash/api/commitment"
"github.com/oasisprotocol/oasis-core/go/runtime/host"
"github.com/oasisprotocol/oasis-core/go/runtime/host/protocol"
Expand Down Expand Up @@ -43,6 +44,15 @@ func (r *runtime) ID() common.Namespace {
return r.runtimeID
}

// Implements host.Runtime.
func (r *runtime) GetInfo(ctx context.Context) (rsp *protocol.RuntimeInfoResponse, err error) {
return &protocol.RuntimeInfoResponse{
ProtocolVersion: version.RuntimeHostProtocol,
RuntimeVersion: version.MustFromString("0.0.0"),
Features: nil,
}, nil
}

// Implements host.Runtime.
func (r *runtime) Call(ctx context.Context, body *protocol.Body) (*protocol.Body, error) {
switch {
Expand Down
17 changes: 17 additions & 0 deletions go/runtime/host/protocol/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ type Connection interface {
// Close closes the connection.
Close()

// GetInfo retrieves the runtime information.
GetInfo(ctx context.Context) (*RuntimeInfoResponse, error)

// Call sends a request to the other side and returns the response or error.
Call(ctx context.Context, body *Body) (*Body, error)

Expand Down Expand Up @@ -200,6 +203,8 @@ type connection struct { // nolint: maligned
pendingRequests map[uint64]chan *Body
nextRequestID uint64

info *RuntimeInfoResponse

outCh chan *Message
closeCh chan struct{}
quitWg sync.WaitGroup
Expand Down Expand Up @@ -254,6 +259,17 @@ func (c *connection) Close() {
c.quitWg.Wait()
}

// Implements Connection.
func (c *connection) GetInfo(ctx context.Context) (*RuntimeInfoResponse, error) {
c.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't this just use defer?

defer c.Unlock()

if c.info == nil {
return nil, ErrNotReady
}
return c.info, nil
}

// Implements Connection.
func (c *connection) Call(ctx context.Context, body *Body) (*Body, error) {
if c.getState() != stateReady {
Expand Down Expand Up @@ -551,6 +567,7 @@ func (c *connection) InitHost(ctx context.Context, conn net.Conn, hi *HostInfo)
// Transition the protocol state to Ready.
c.Lock()
c.setStateLocked(stateReady)
c.info = info
c.Unlock()

return &rtVersion, nil
Expand Down
6 changes: 6 additions & 0 deletions go/runtime/host/protocol/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ func TestEchoRequestResponse(t *testing.T) {
require.Panics(func() { _ = protoA.InitGuest(context.Background(), connA) }, "connection reinit should panic")
require.Panics(func() { _, _ = protoB.InitHost(context.Background(), connB, &HostInfo{}) }, "connection reinit should panic")
require.Panics(func() { _ = protoB.InitGuest(context.Background(), connB) }, "connection reinit should panic")

_, err = protoA.GetInfo(context.Background())
require.Error(err, "GetInfo should fail for guest connections")
info, err := protoB.GetInfo(context.Background())
require.NoError(err, "GetInfo should succeed for host connections")
require.EqualValues(version.RuntimeHostProtocol, info.ProtocolVersion)
}

func TestBigMessage(t *testing.T) {
Expand Down
73 changes: 71 additions & 2 deletions go/runtime/host/protocol/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ type Body struct {
HostLocalStorageSetResponse *Empty `json:",omitempty"`
HostFetchConsensusBlockRequest *HostFetchConsensusBlockRequest `json:",omitempty"`
HostFetchConsensusBlockResponse *HostFetchConsensusBlockResponse `json:",omitempty"`
HostFetchTxBatchRequest *HostFetchTxBatchRequest `json:",omitempty"`
HostFetchTxBatchResponse *HostFetchTxBatchResponse `json:",omitempty"`
}

// Type returns the message type by determining the name of the first non-nil member.
Expand Down Expand Up @@ -151,13 +153,35 @@ type RuntimeInfoRequest struct {
LocalConfig map[string]interface{} `json:"local_config,omitempty"`
}

// RuntimeInfoResponse is a worker info response message body.
// Features is a set of supported runtime features.
type Features struct {
// ScheduleControl is the schedule control feature.
ScheduleControl *FeatureScheduleControl `json:"schedule_control,omitempty"`
}

// HasScheduleControl returns true when the runtime supports the schedule control feature.
func (f *Features) HasScheduleControl() bool {
return f != nil && f.ScheduleControl != nil
}

// FeatureScheduleControl is a feature specifying that the runtime supports controlling the
// scheduling of batches. This means that the scheduler should only take priority into account and
// ignore weights, leaving it up to the runtime to decide which transactions to include.
type FeatureScheduleControl struct {
// InitialBatchSize is the size of the initial batch of transactions.
InitialBatchSize uint32 `json:"initial_batch_size"`
}

// RuntimeInfoResponse is a runtime info response message body.
type RuntimeInfoResponse struct {
// ProtocolVersion is the runtime protocol version supported by the worker.
// ProtocolVersion is the runtime protocol version supported by the runtime.
ProtocolVersion version.Version `json:"protocol_version"`

// RuntimeVersion is the version of the runtime.
RuntimeVersion version.Version `json:"runtime_version"`

// Features describe the features supported by the runtime.
Features *Features `json:"features,omitempty"`
}

// RuntimeCapabilityTEERakInitRequest is a worker RFC 0009 CapabilityTEE
Expand Down Expand Up @@ -281,8 +305,27 @@ func (b *ComputedBatch) String() string {
return "<ComputedBatch>"
}

// ExecutionMode is the batch execution mode.
type ExecutionMode uint8

const (
// ExecutionModeExecute is the execution mode where the batch of transactions is executed as-is
// without the ability to perform any modifications to the batch.
ExecutionModeExecute = 0
// ExecutionModeSchedule is the execution mode where the runtime is in control of scheduling and
// may arbitrarily modify the batch during execution.
//
// This execution mode will only be used in case the runtime advertises to support the schedule
// control feature. In this case the call will only contain up to InitialBatchSize transactions
// and the runtime will need to request more if it needs more.
ExecutionModeSchedule = 1
)

// RuntimeExecuteTxBatchRequest is a worker execute tx batch request message body.
type RuntimeExecuteTxBatchRequest struct {
// Mode is the execution mode.
Mode ExecutionMode `json:"mode,omitempty"`

// ConsensusBlock is the consensus light block at the last finalized round
// height (e.g., corresponding to .Block.Header.Round).
ConsensusBlock consensus.LightBlock `json:"consensus_block"`
Expand Down Expand Up @@ -311,6 +354,16 @@ type RuntimeExecuteTxBatchRequest struct {
type RuntimeExecuteTxBatchResponse struct {
Batch ComputedBatch `json:"batch"`
BatchWeightLimits map[transaction.Weight]uint64 `json:"batch_weight_limits"`

// TxHashes are the transaction hashes of the included batch.
TxHashes []hash.Hash `json:"tx_hashes,omitempty"`
// TxRejectHashes are the transaction hashes of transactions that should be immediately removed
// from the scheduling queue as they are invalid.
TxRejectHashes []hash.Hash `json:"tx_reject_hashes,omitempty"`
// TxInputRoot is the root hash of all transaction inputs.
TxInputRoot hash.Hash `json:"tx_input_root,omitempty"`
// TxInputWriteLog is the write log for generating transaction inputs.
TxInputWriteLog storage.WriteLog `json:"tx_input_write_log,omitempty"`
}

// RuntimeKeyManagerPolicyUpdateRequest is a runtime key manager policy request
Expand Down Expand Up @@ -409,3 +462,19 @@ type HostFetchConsensusBlockRequest struct {
type HostFetchConsensusBlockResponse struct {
Block consensus.LightBlock `json:"block"`
}

// HostFetchTxBatchRequest is a request to host to fetch a further batch of transactions.
type HostFetchTxBatchRequest struct {
// Offset specifies the transaction hash that should serve as an offset when returning
// transactions from the pool. Transactions will be skipped until the given hash is encountered
// and only following transactions will be returned.
Offset *hash.Hash `json:"offset,omitempty"`
// Limit specifies the maximum size of the batch to request.
Limit uint32 `json:"limit"`
}

// HostFetchTxBatchResponse is a response from host fetching the given transaction batch.
type HostFetchTxBatchResponse struct {
// Batch is a batch of transactions.
Batch [][]byte `json:"batch,omitempty"`
}
23 changes: 20 additions & 3 deletions go/runtime/host/sandbox/sandbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,31 @@ func (r *sandboxedRuntime) ID() common.Namespace {
return r.rtCfg.RuntimeID
}

// Implements host.Runtime.
func (r *sandboxedRuntime) GetInfo(ctx context.Context) (rsp *protocol.RuntimeInfoResponse, err error) {
callFn := func() error {
r.RLock()
defer r.RUnlock()

if r.conn == nil {
return fmt.Errorf("runtime is not ready")
}
rsp, err = r.conn.GetInfo(ctx)
return err
}

// Retry call in case the runtime is not yet ready.
err = backoff.Retry(callFn, backoff.WithContext(cmnBackoff.NewExponentialBackOff(), ctx))
return
}

// Implements host.Runtime.
func (r *sandboxedRuntime) Call(ctx context.Context, body *protocol.Body) (rsp *protocol.Body, err error) {
callFn := func() error {
r.RLock()
conn := r.conn
r.RUnlock()
defer r.RUnlock()

if conn == nil {
if r.conn == nil {
return fmt.Errorf("runtime is not ready")
}
rsp, err = r.conn.Call(ctx, body)
Expand Down
22 changes: 21 additions & 1 deletion go/runtime/registry/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/oasisprotocol/oasis-core/go/roothash/api/block"
"github.com/oasisprotocol/oasis-core/go/runtime/host"
"github.com/oasisprotocol/oasis-core/go/runtime/host/protocol"
"github.com/oasisprotocol/oasis-core/go/runtime/txpool"
storage "github.com/oasisprotocol/oasis-core/go/storage/api"
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/syncer"
)
Expand Down Expand Up @@ -117,6 +118,9 @@ type RuntimeHostHandlerEnvironment interface {

// GetKeyManagerClient returns the key manager client for this runtime.
GetKeyManagerClient(ctx context.Context) (keymanagerClientApi.Client, error)

// GetTxPool returns the transaction pool for this runtime.
GetTxPool(ctx context.Context) (txpool.TransactionPool, error)
}

// RuntimeHostHandler is a runtime host handler suitable for compute runtimes. It provides the
Expand Down Expand Up @@ -207,6 +211,23 @@ func (h *runtimeHostHandler) Handle(ctx context.Context, body *protocol.Body) (*
Block: *lb,
}}, nil
}
// Transaction pool.
if rq := body.HostFetchTxBatchRequest; rq != nil {
txPool, err := h.env.GetTxPool(ctx)
if err != nil {
return nil, err
}

batch := txPool.GetPrioritizedBatch(rq.Offset, rq.Limit)
raw := make([][]byte, 0, len(batch))
for _, tx := range batch {
raw = append(raw, tx.Raw())
}

return &protocol.Body{HostFetchTxBatchResponse: &protocol.HostFetchTxBatchResponse{
Batch: raw,
}}, nil
}

return nil, errMethodNotSupported
}
Expand Down Expand Up @@ -262,7 +283,6 @@ func (n *runtimeHostNotifier) watchPolicyUpdates() {
n.logger.Debug("no key manager needed for this runtime")
continue
}
// GetStatus(context.Context, *registry.NamespaceQuery) (*Status, error)

var err error
st, err = n.consensus.KeyManager().GetStatus(n.ctx, &registry.NamespaceQuery{
Expand Down
8 changes: 8 additions & 0 deletions go/runtime/scheduling/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ type Scheduler interface {
// GetBatch returns a batch of scheduled transactions (if any is available).
GetBatch(force bool) []*transaction.CheckedTransaction

// GetPrioritizedBatch returns a batch of transactions ordered by priority but without taking
// any weight limits into account.
//
// Offset specifies the transaction hash that should serve as an offset when returning
// transactions from the pool. Transactions will be skipped until the given hash is encountered
// and only following transactions will be returned.
GetPrioritizedBatch(offset *hash.Hash, limit uint32) []*transaction.CheckedTransaction

// GetKnownBatch gets a set of known transactions from the transaction pool.
//
// For any missing transactions nil will be returned in their place and the map of missing
Expand Down
4 changes: 4 additions & 0 deletions go/runtime/scheduling/simple/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ func (s *scheduler) GetBatch(force bool) []*transaction.CheckedTransaction {
return s.txPool.GetBatch(force)
}

func (s *scheduler) GetPrioritizedBatch(offset *hash.Hash, limit uint32) []*transaction.CheckedTransaction {
return s.txPool.GetPrioritizedBatch(offset, limit)
}

func (s *scheduler) GetKnownBatch(batch []hash.Hash) ([]*transaction.CheckedTransaction, map[hash.Hash]int) {
return s.txPool.GetKnownBatch(batch)
}
Expand Down
8 changes: 8 additions & 0 deletions go/runtime/scheduling/simple/txpool/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ type TxPool interface {
// GetBatch gets a transaction batch from the transaction pool.
GetBatch(force bool) []*transaction.CheckedTransaction

// GetPrioritizedBatch returns a batch of transactions ordered by priority but without taking
// any weight limits into account.
//
// Offset specifies the transaction hash that should serve as an offset when returning
// transactions from the pool. Transactions will be skipped until the given hash is encountered
// and only following transactions will be returned.
GetPrioritizedBatch(offset *hash.Hash, limit uint32) []*transaction.CheckedTransaction

// GetKnownBatch gets a set of known transactions from the transaction pool.
//
// For any missing transactions nil will be returned in their place and the map of missing
Expand Down
Loading