Skip to content

Commit

Permalink
runtime/src/enclave_rpc: Support peer feedback for concurrent requests
Browse files Browse the repository at this point in the history
  • Loading branch information
peternose committed Sep 30, 2024
1 parent 1e1a2f6 commit c874465
Show file tree
Hide file tree
Showing 9 changed files with 444 additions and 155 deletions.
1 change: 1 addition & 0 deletions .changelog/5872.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
runtime/src/enclave_rpc: Support peer feedback for concurrent requests
2 changes: 1 addition & 1 deletion go/p2p/rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type PeerFeedback interface {
// The peer will be ignored during peer selection.
RecordBadPeer()

// PeerID returns the id of the peer.
// PeerID returns the ID of the peer.
PeerID() core.PeerID
}

Expand Down
25 changes: 22 additions & 3 deletions go/runtime/host/protocol/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ type Body struct {
// Host interface.
HostRPCCallRequest *HostRPCCallRequest `json:",omitempty"`
HostRPCCallResponse *HostRPCCallResponse `json:",omitempty"`
HostSubmitPeerFeedbackRequest *HostSubmitPeerFeedbackRequest `json:",omitempty"`
HostSubmitPeerFeedbackResponse *Empty `json:",omitempty"`
HostStorageSyncRequest *HostStorageSyncRequest `json:",omitempty"`
HostStorageSyncResponse *HostStorageSyncResponse `json:",omitempty"`
HostLocalStorageGetRequest *HostLocalStorageGetRequest `json:",omitempty"`
Expand Down Expand Up @@ -474,13 +476,15 @@ type RuntimeConsensusSyncRequest struct {

// HostRPCCallRequest is a host RPC call request message body.
type HostRPCCallRequest struct {
Endpoint string `json:"endpoint"`
Request []byte `json:"request"`
Kind enclaverpc.Kind `json:"kind,omitempty"`
Endpoint string `json:"endpoint"`
RequestID uint64 `json:"request_id"`
Request []byte `json:"request"`
Kind enclaverpc.Kind `json:"kind,omitempty"`

// Nodes are optional node identities in case the request should be forwarded to specific
// node instances and not to randomly chosen ones as selected by the host.
Nodes []signature.PublicKey `json:"nodes"`

// PeerFeedback contains optional peer feedback for the last RPC call under the given endpoint.
//
// This enables the runtime to notify the node whether the given peer should continue to be used
Expand All @@ -498,6 +502,21 @@ type HostRPCCallResponse struct {
Node signature.PublicKey `json:"node"`
}

// HostSubmitPeerFeedbackRequest is a host submit peer feedback request message body.
type HostSubmitPeerFeedbackRequest struct {
// Endpoint is the RPC endpoint.
Endpoint string `json:"endpoint"`

// RequestID is the ID of the RPC request to which the feedback belongs.
RequestID uint64 `json:"request_id"`

// PeerFeedback contains feedback related to the given RPC call.
//
// This enables the runtime to notify the node whether the given peer should continue to be used
// or not based on higher-level logic that lives in the runtime.
PeerFeedback enclaverpc.PeerFeedback `json:"peer_feedback"`
}

// HostStorageEndpoint is the host storage endpoint.
type HostStorageEndpoint uint8

Expand Down
24 changes: 22 additions & 2 deletions go/runtime/keymanager/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,32 @@ const EnclaveRPCEndpoint = "key-manager"

// Client is the key manager client interface.
type Client interface {
// CallEnclave calls the key manager via remote EnclaveRPC.
// CallEnclaveDeprecated calls the key manager via remote EnclaveRPC.
//
// The node to which the call will be routed is chosen at random from the key manager committee
// members. The latter can be restricted by specifying a non-empty list of allowed nodes.
//
// The provided peer feedback is optional feedback on the peer that handled the last EnclaveRPC
// request (if any) which may be used to inform the routing decision.
CallEnclave(ctx context.Context, data []byte, nodes []signature.PublicKey, kind enclaverpc.Kind, pf *enclaverpc.PeerFeedback) ([]byte, signature.PublicKey, error)
//
// Deprecated: This method is deprecated and will be removed in future versions.
CallEnclaveDeprecated(ctx context.Context, data []byte, nodes []signature.PublicKey, kind enclaverpc.Kind, pf *enclaverpc.PeerFeedback) ([]byte, signature.PublicKey, error)

// CallEnclave calls the key manager via remote enclave RPC.
//
// The node to which the call will be routed is chosen at random from the key manager committee
// members. The latter can be restricted by specifying a non-empty list of allowed nodes.
CallEnclave(ctx context.Context, requestID uint64, data []byte, nodes []signature.PublicKey, kind enclaverpc.Kind) (*EnclaveResponse, error)

// SubmitPeerFeedback submits peer feedback for the given request.
SubmitPeerFeedback(requestID uint64, feedback enclaverpc.PeerFeedback)
}

// EnclaveResponse is the enclave response.
type EnclaveResponse struct {
// Data contains the actual response data.
Data []byte

// Node is the public key of the node that generated the response.
Node signature.PublicKey
}
47 changes: 44 additions & 3 deletions go/runtime/registry/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
beacon "github.com/oasisprotocol/oasis-core/go/beacon/api"
"github.com/oasisprotocol/oasis-core/go/common"
"github.com/oasisprotocol/oasis-core/go/common/cbor"
"github.com/oasisprotocol/oasis-core/go/common/crypto/signature"
"github.com/oasisprotocol/oasis-core/go/common/identity"
"github.com/oasisprotocol/oasis-core/go/common/logging"
"github.com/oasisprotocol/oasis-core/go/common/node"
Expand Down Expand Up @@ -246,19 +247,56 @@ func (h *runtimeHostHandler) handleHostRPCCall(
if err != nil {
return nil, err
}
res, node, err := kmCli.CallEnclave(ctx, rq.Request, rq.Nodes, rq.Kind, rq.PeerFeedback)

if rq.RequestID == 0 {
var (
res []byte
node signature.PublicKey
)

res, node, err = kmCli.CallEnclaveDeprecated(ctx, rq.Request, rq.Nodes, rq.Kind, rq.PeerFeedback) //nolint:staticcheck // Suppress SA1019 deprecation warning
if err != nil {
return nil, err
}

return &protocol.HostRPCCallResponse{
Response: res,
Node: node,
}, nil
}

res, err := kmCli.CallEnclave(ctx, rq.RequestID, rq.Request, rq.Nodes, rq.Kind)
if err != nil {
return nil, err
}

return &protocol.HostRPCCallResponse{
Response: res,
Node: node,
Response: res.Data,
Node: res.Node,
}, nil
default:
return nil, fmt.Errorf("endpoint not supported")
}
}

func (h *runtimeHostHandler) handleHostSubmitPeerFeedback(
rq *protocol.HostSubmitPeerFeedbackRequest,
) (*protocol.Empty, error) {
switch rq.Endpoint {
case runtimeKeymanager.EnclaveRPCEndpoint:
kmCli, err := h.env.GetKeyManagerClient()
if err != nil {
return nil, err
}

kmCli.SubmitPeerFeedback(rq.RequestID, rq.PeerFeedback)

return &protocol.Empty{}, nil
default:
return nil, fmt.Errorf("endpoint not supported")
}
}

func (h *runtimeHostHandler) handleHostStorageSync(
ctx context.Context,
rq *protocol.HostStorageSyncRequest,
Expand Down Expand Up @@ -508,6 +546,9 @@ func (h *runtimeHostHandler) Handle(ctx context.Context, rq *protocol.Body) (*pr
case rq.HostRPCCallRequest != nil:
// RPC.
rsp.HostRPCCallResponse, err = h.handleHostRPCCall(ctx, rq.HostRPCCallRequest)
case rq.HostSubmitPeerFeedbackRequest != nil:
// Peer feedback.
rsp.HostSubmitPeerFeedbackResponse, err = h.handleHostSubmitPeerFeedback(rq.HostSubmitPeerFeedbackRequest)
case rq.HostStorageSyncRequest != nil:
// Storage sync.
rsp.HostStorageSyncResponse, err = h.handleHostStorageSync(ctx, rq.HostStorageSyncRequest)
Expand Down
133 changes: 127 additions & 6 deletions go/worker/common/committee/keymanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import (
"context"
"fmt"
"sync"
"time"

"github.com/libp2p/go-libp2p/core"
"golang.org/x/exp/maps"

"github.com/oasisprotocol/oasis-core/go/common"
"github.com/oasisprotocol/oasis-core/go/common/cache/lru"
"github.com/oasisprotocol/oasis-core/go/common/crypto/signature"
"github.com/oasisprotocol/oasis-core/go/common/logging"
cmSync "github.com/oasisprotocol/oasis-core/go/common/sync"
Expand All @@ -18,9 +20,20 @@ import (
"github.com/oasisprotocol/oasis-core/go/p2p/rpc"
registry "github.com/oasisprotocol/oasis-core/go/registry/api"
enclaverpc "github.com/oasisprotocol/oasis-core/go/runtime/enclaverpc/api"
runtimeKeymanager "github.com/oasisprotocol/oasis-core/go/runtime/keymanager/api"
keymanagerP2P "github.com/oasisprotocol/oasis-core/go/worker/keymanager/p2p"
)

const (
// peerFeedbackCacheSize is the maximum number of peer feedbacks
// in the cache.
peerFeedbackCacheSize = 100

// maxPeerFeedbackAge is the maximum age of peer feedback in the cache
// before it is discarded.
maxPeerFeedbackAge = time.Minute
)

// KeyManagerClientWrapper is a wrapper for the key manager P2P client that handles deferred
// initialization after the key manager runtime ID is known.
//
Expand All @@ -37,6 +50,7 @@ type KeyManagerClientWrapper struct {
logger *logging.Logger

lastPeerFeedback rpc.PeerFeedback
peerFeedbacks *lru.Cache
}

// Initialized returns a channel that gets closed when the client is initialized.
Expand Down Expand Up @@ -84,10 +98,11 @@ func (km *KeyManagerClientWrapper) SetKeyManagerID(id *common.Namespace) {
}

km.lastPeerFeedback = nil
km.peerFeedbacks.Clear()
}

// CallEnclave implements runtimeKeymanager.Client.
func (km *KeyManagerClientWrapper) CallEnclave(
// CallEnclaveDeprecated implements runtimeKeymanager.Client.
func (km *KeyManagerClientWrapper) CallEnclaveDeprecated(
ctx context.Context,
data []byte,
nodes []signature.PublicKey,
Expand Down Expand Up @@ -160,13 +175,109 @@ func (km *KeyManagerClientWrapper) CallEnclave(
return rsp.Data, node, nil
}

// CallEnclave implements runtimeKeymanager.Client.
func (km *KeyManagerClientWrapper) CallEnclave(
ctx context.Context,
requestID uint64,
data []byte,
nodes []signature.PublicKey,
kind enclaverpc.Kind,
) (*runtimeKeymanager.EnclaveResponse, error) {
cli, err := km.getKeyManagerClient()
if err != nil {
return nil, err
}

// Call only members of the key manager committee. If no nodes are given, use all members.
kmNodes := km.nt.Nodes(nodes)
if len(kmNodes) == 0 && len(nodes) > 0 {
return nil, fmt.Errorf("nodes not in committee")
}
peers := maps.Keys(kmNodes)

req := &keymanagerP2P.CallEnclaveRequest{
Data: data,
Kind: kind,
}

rsp, feedback, err := cli.CallEnclave(ctx, req, peers)
if err != nil {
return nil, err
}

node, ok := kmNodes[feedback.PeerID()]
if !ok {
return nil, fmt.Errorf("unknown peer id")
}

info := peerFeedbackInfo{
requestID: requestID,
feedback: feedback,
timestamp: time.Now(),
}

// Put is expected to never fail since byte capacity is not enabled.
_ = km.peerFeedbacks.Put(requestID, &info)

return &runtimeKeymanager.EnclaveResponse{
Data: rsp.Data,
Node: node,
}, nil
}

// SubmitPeerFeedback implements runtimeKeymanager.Client.
func (km *KeyManagerClientWrapper) SubmitPeerFeedback(requestID uint64, feedback enclaverpc.PeerFeedback) {
var info *peerFeedbackInfo

// Pop peer feedback info.
item, ok := km.peerFeedbacks.Peek(requestID)
if ok {
_ = km.peerFeedbacks.Remove(requestID)
info = item.(*peerFeedbackInfo)
}

// Discard expired feedbacks.
valid := ok && time.Since(info.timestamp) <= maxPeerFeedbackAge

km.logger.Debug("received peer feedback from runtime",
"request_id", requestID,
"peer_feedback", feedback,
"valid", valid,
)

if !valid {
return
}

switch feedback {
case enclaverpc.PeerFeedbackSuccess:
info.feedback.RecordSuccess()
case enclaverpc.PeerFeedbackFailure:
info.feedback.RecordFailure()
case enclaverpc.PeerFeedbackBadPeer:
info.feedback.RecordBadPeer()
default:
}
}

func (km *KeyManagerClientWrapper) getKeyManagerClient() (keymanagerP2P.Client, error) {
km.l.Lock()
defer km.l.Unlock()

if km.cli == nil {
return nil, fmt.Errorf("key manager not available")
}
return km.cli, nil
}

// NewKeyManagerClientWrapper creates a new key manager client wrapper.
func NewKeyManagerClientWrapper(p2p p2p.Service, consensus consensus.Backend, chainContext string, logger *logging.Logger) *KeyManagerClientWrapper {
return &KeyManagerClientWrapper{
p2p: p2p,
consensus: consensus,
chainContext: chainContext,
logger: logger,
p2p: p2p,
consensus: consensus,
chainContext: chainContext,
logger: logger,
peerFeedbacks: lru.New(lru.Capacity(peerFeedbackCacheSize, false)),
}
}

Expand Down Expand Up @@ -317,3 +428,13 @@ func newKeyManagerNodeTracker(p2p p2p.Service, consensus consensus.Backend, keym
logger: logging.GetLogger("worker/common/committee/keymanager/nodetracker"),
}
}

// peerFeedbackInfo stores information related to peer feedback.
type peerFeedbackInfo struct {
// requestID is the ID of the request.
requestID uint64
// feedback holds the peer feedback stored in this node.
feedback rpc.PeerFeedback
// timestamp is the time when the feedback was added to the cache.
timestamp time.Time
}
Loading

0 comments on commit c874465

Please sign in to comment.