Merge pull request #5161 from oasisprotocol/kostko/fix/p2p-proposal-fwd
go/worker/compute: Do not drop valid proposals
kostko authored Feb 3, 2023
2 parents ed09477 + 1aaa8d7 commit 58f17c6
Showing 5 changed files with 134 additions and 111 deletions.
7 changes: 7 additions & 0 deletions .changelog/5161.bugfix.md
@@ -0,0 +1,7 @@
go/worker/compute: Do not drop valid proposals

Previously, proposals could be dropped instead of being forwarded via
P2P gossip when the local node's consensus view was slightly behind,
even though the proposals themselves were valid. With smaller committees
and certain topologies this could result in some nodes never receiving
the proposals.
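
At a high level, the fix parks such proposals in a small queue keyed by round and retries them whenever a new block arrives, instead of returning an error that stops P2P propagation. A minimal sketch of that queue-and-retry pattern, using simplified stand-in types rather than the actual oasis-core structures:

```go
package main

import "fmt"

// Proposal is a simplified stand-in for commitment.Proposal.
type Proposal struct{ Round uint64 }

type node struct {
	currentRound uint64
	pending      []Proposal // proposals for rounds we have not caught up to yet
}

// handleProposal processes a proposal if the local view has caught up;
// otherwise it queues the proposal for a later retry instead of dropping it.
func (n *node) handleProposal(p Proposal) {
	if p.Round != n.currentRound+1 {
		n.pending = append(n.pending, p) // local view is behind; park it
		return
	}
	fmt.Println("processing proposal for round", p.Round)
}

// onNewBlock advances the local view and retries any queued proposals.
func (n *node) onNewBlock(round uint64) {
	n.currentRound = round
	kept := n.pending[:0]
	for _, p := range n.pending {
		switch {
		case p.Round <= round: // stale: the round has already passed, drop
		case p.Round == round+1:
			fmt.Println("processing queued proposal for round", p.Round)
		default:
			kept = append(kept, p) // still in the future; keep waiting
		}
	}
	n.pending = kept
}

func main() {
	n := &node{currentRound: 4}
	n.handleProposal(Proposal{Round: 6}) // view is behind: queued, not dropped
	n.onNewBlock(5)                      // caught up: queued proposal is processed
}
```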
94 changes: 28 additions & 66 deletions go/worker/compute/executor/committee/node.go
@@ -37,10 +37,7 @@ import (
var (
errSeenNewerBlock = fmt.Errorf("executor: seen newer block")
errRuntimeAborted = fmt.Errorf("executor: runtime aborted batch processing")
errIncompatibleHeader = p2pError.Permanent(fmt.Errorf("executor: incompatible header"))
errBatchTooLarge = p2pError.Permanent(fmt.Errorf("executor: batch too large"))
errIncorrectRole = fmt.Errorf("executor: incorrect role")
errIncorrectState = fmt.Errorf("executor: incorrect state")
errMsgFromNonTxnSched = fmt.Errorf("executor: received txn scheduler dispatch msg from non-txn scheduler")

// proposeTimeoutDelay is the duration to wait before submitting the propose timeout request.
@@ -117,6 +114,7 @@ type Node struct { // nolint: maligned
// Guarded by .commonNode.CrossNode.
proposingTimeout bool
missingTxsCancel context.CancelFunc
pendingProposals *pendingProposals

commonNode *committee.Node
commonCfg commonWorker.Config
@@ -206,12 +204,13 @@ func (n *Node) getMetricLabels() prometheus.Labels {
}
}

func (n *Node) queueBatchBlocking(ctx context.Context, proposal *commitment.Proposal) error {
func (n *Node) processProposal(ctx context.Context, proposal *commitment.Proposal) error {
rt, err := n.commonNode.Runtime.ActiveDescriptor(ctx)
if err != nil {
n.logger.Warn("failed to fetch active runtime descriptor",
"err", err,
)
// Do not forward the proposal further as we are unable to validate it.
return p2pError.Permanent(err)
}

@@ -221,6 +220,7 @@ func (n *Node) queueBatchBlocking(ctx context.Context, proposal *commitment.Prop
"max_batch_size", rt.TxnScheduler.MaxBatchSize,
"batch_size", len(proposal.Batch),
)
// Do not forward the proposal further as it seems invalid.
return errBatchTooLarge
}

@@ -231,7 +231,7 @@ func (n *Node) queueBatchBlocking(ctx context.Context, proposal *commitment.Prop

n.commonNode.CrossNode.Lock()
defer n.commonNode.CrossNode.Unlock()
return n.handleExternalBatchLocked(batch)
return n.handleProposalLocked(batch)
}

func (n *Node) bumpReselect() {
@@ -308,32 +308,6 @@ func (n *Node) HandleNewBlockLocked(blk *block.Block) {

// Perform actions based on current state.
switch state := n.state.(type) {
case StateWaitingForBlock:
// Check if this was the block we were waiting for.
currentHash := header.EncodedHash()
if currentHash.Equal(&state.batch.proposal.Header.PreviousHash) {
n.logger.Info("received block needed for batch processing")
clearProposalQueue = false
n.maybeStartProcessingBatchLocked(state.batch)
break
}

// Check if the new block is for the same or newer round than the
// one we are waiting for. In this case, we should abort as the
// block will never be seen.
curRound := header.Round
waitRound := state.batch.proposal.Header.Round - 1
if curRound >= waitRound {
n.logger.Warn("seen newer block while waiting for block")
n.transitionLocked(StateWaitingForBatch{})
break
}

// Continue waiting for block.
n.logger.Info("still waiting for block",
"current_round", curRound,
"wait_round", waitRound,
)
case StateWaitingForTxs:
// Stop waiting for transactions and start a new round.
n.logger.Warn("considering previous proposal invalid due to missing transactions")
@@ -396,6 +370,9 @@ func (n *Node) HandleNewBlockLocked(blk *block.Block) {

n.commonNode.TxPool.WakeupScheduler()
}

// Check if we have any pending proposals and attempt to handle them.
n.handlePendingProposalsLocked()
}

func (n *Node) proposeTimeoutLocked(roundCtx context.Context) error {
@@ -1313,45 +1290,29 @@ func (n *Node) HandleNewEventLocked(ev *roothash.Event) {
}

// Guarded by n.commonNode.CrossNode.
func (n *Node) handleExternalBatchLocked(batch *unresolvedBatch) error {
// If we are not waiting for a batch, don't do anything.
if _, ok := n.state.(StateWaitingForBatch); !ok {
return errIncorrectState
}

epoch := n.commonNode.Group.GetEpochSnapshot()

// We can only receive external batches if we are an executor member.
if !epoch.IsExecutorMember() {
n.logger.Error("got external batch while in incorrect role")
return errIncorrectRole
}
func (n *Node) handleProposalLocked(batch *unresolvedBatch) error {
n.logger.Debug("handling a new batch proposal",
"proposer", batch.proposal.NodeID,
"round", batch.proposal.Header.Round,
)

// Check if we have the correct block -- in this case, start processing the batch.
currentHash := n.commonNode.CurrentBlock.Header.EncodedHash()
if currentHash.Equal(&batch.proposal.Header.PreviousHash) {
n.maybeStartProcessingBatchLocked(batch)
return nil
}
// TODO: Handle proposal equivocation.

// Check if the current block is older than the block we are expected to base
// our batch on. In case it is equal or newer, but different, discard the batch.
curRound := n.commonNode.CurrentBlock.Header.Round
waitRound := batch.proposal.Header.Round - 1
if curRound >= waitRound {
n.logger.Warn("got external batch based on incompatible header",
"previous_hash", batch.proposal.Header.PreviousHash,
"round", batch.proposal.Header.Round,
)
return errIncompatibleHeader
if _, ok := n.state.(StateWaitingForBatch); !ok {
// Currently not waiting for batch.
} else if epoch := n.commonNode.Group.GetEpochSnapshot(); !epoch.IsExecutorMember() {
// Currently not an executor committee member.
} else {
// Maybe process if we have the correct block.
currentHash := n.commonNode.CurrentBlock.Header.EncodedHash()
if currentHash.Equal(&batch.proposal.Header.PreviousHash) {
n.maybeStartProcessingBatchLocked(batch)
return nil // Forward proposal.
}
}

// Wait for the correct block to arrive.
n.transitionLocked(StateWaitingForBlock{
batch: batch,
})

return nil
// When the checks fail, add the proposal to the queue of pending proposals for a later retry.
return n.addPendingProposalLocked(batch)
}

// nudgeAvailabilityLocked checks whether the executor worker should declare itself available.
@@ -1624,6 +1585,7 @@ func NewNode(
commonCfg: commonCfg,
roleProvider: roleProvider,
committeeTopic: committeeTopic,
pendingProposals: newPendingProposals(),
ctx: ctx,
cancelCtx: cancel,
stopCh: make(chan struct{}),
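Note the restructured `handleProposalLocked` above: conditions that previously returned errors (`errIncorrectState`, `errIncorrectRole`) and thereby stopped gossip are now non-fatal fall-throughs into the pending queue, and `nil` is returned on success so the proposal keeps propagating. A condensed, illustrative restatement of that control flow (simplified signature, not the actual method):

```go
// Condensed sketch of the new decision flow; the real method operates on
// oasis-core types and is called with the CrossNode lock held.
func handleProposal(waitingForBatch, isExecutorMember, haveCorrectBlock bool,
	process func(), queueForRetry func() error) error {
	if waitingForBatch && isExecutorMember && haveCorrectBlock {
		process()
		return nil // Processed; a nil error lets P2P forward the proposal.
	}
	// Cannot process right now: queue for retry on the next block rather
	// than returning an error, so valid proposals keep propagating.
	return queueForRetry()
}
```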
14 changes: 3 additions & 11 deletions go/worker/compute/executor/committee/p2p.go
@@ -68,16 +68,12 @@ func (h *committeeMsgHandler) HandleMessage(ctx context.Context, peerID signatur
crash.Here(crashPointBatchReceiveAfter)

proposal := cm.Proposal

epoch := h.n.commonNode.Group.GetEpochSnapshot()
h.n.commonNode.CrossNode.Lock()
round := h.n.commonNode.CurrentBlock.Header.Round
h.n.commonNode.CrossNode.Unlock()

// Before opening the signed dispatch message, verify that it was actually signed by the
// current transaction scheduler.
if err := epoch.VerifyTxnSchedulerSigner(proposal.NodeID, round); err != nil {
// Not signed by the current txn scheduler.
if err := epoch.VerifyTxnSchedulerSigner(proposal.NodeID, proposal.Header.Round-1); err != nil {
// Not signed by the transaction scheduler for the round, do not forward.
return errMsgFromNonTxnSched
}

@@ -86,11 +82,7 @@ func (h *committeeMsgHandler) HandleMessage(ctx context.Context, peerID signatur
return p2pError.Permanent(err)
}

err := h.n.queueBatchBlocking(ctx, proposal)
if err != nil {
return err
}
return nil
return h.n.processProposal(ctx, proposal)
default:
return p2pError.ErrUnhandledMessage
}
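The other notable change in this handler: the transaction scheduler signature is now verified against the round the proposal itself builds on (`proposal.Header.Round-1`) rather than the node's local view of the current round, so a node whose consensus view lags no longer rejects proposals signed by the legitimate scheduler of a newer round. A hedged sketch of the distinction, with hypothetical names standing in for `epoch.VerifyTxnSchedulerSigner`:

```go
// verifyProposalSigner contrasts the old and new checks; verify is a
// hypothetical stand-in for epoch.VerifyTxnSchedulerSigner.
func verifyProposalSigner(localRound, proposalRound uint64, verify func(round uint64) error) error {
	_ = localRound // Old code checked against this possibly-lagging local view.

	// New code checks against the round the proposal builds on, which does
	// not depend on how far the local node has synced.
	return verify(proposalRound - 1)
}
```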
96 changes: 96 additions & 0 deletions go/worker/compute/executor/committee/proposals.go
@@ -0,0 +1,96 @@
package committee

import (
"fmt"

"github.com/google/btree"

p2pError "github.com/oasisprotocol/oasis-core/go/p2p/error"
)

// maxPendingProposals is the maximum number of pending proposals that can be queued.
const maxPendingProposals = 16

// pendingProposals is a priority queue of pending proposals, ordered by round.
type pendingProposals struct {
q *btree.BTreeG[*unresolvedBatch]
}

func proposalLessFunc(a, b *unresolvedBatch) bool {
return a.proposal.Header.Round < b.proposal.Header.Round
}

func newPendingProposals() *pendingProposals {
return &pendingProposals{
q: btree.NewG(2, proposalLessFunc),
}
}

// addPendingProposalLocked adds a new pending proposal that MUST HAVE already undergone basic
// validity checks and is therefore considered a valid proposal for the given round, but the node's
// local consensus view may not yet be ready to process the proposal.
//
// Must be called with the n.commonNode.CrossNode lock held.
func (n *Node) addPendingProposalLocked(batch *unresolvedBatch) error {
currentRound := n.commonNode.CurrentBlock.Header.Round
round := batch.proposal.Header.Round

// Drop any past proposals.
if round <= currentRound {
return p2pError.Permanent(fmt.Errorf("proposal round is in the past")) // Do not forward.
}

n.pendingProposals.q.ReplaceOrInsert(batch)

// In case of overflow, remove the proposal that is the most in the future.
if n.pendingProposals.q.Len() > maxPendingProposals {
removed, _ := n.pendingProposals.q.DeleteMax()
if removed == batch {
return p2pError.Permanent(fmt.Errorf("proposal queue overflow")) // Do not forward.
}
}

return nil
}

// prunePendingProposalsLocked prunes any proposals which are not valid anymore.
//
// Must be called with the n.commonNode.CrossNode lock held.
func (n *Node) prunePendingProposalsLocked() {
currentRound := n.commonNode.CurrentBlock.Header.Round

for {
batch, ok := n.pendingProposals.q.Min()
if !ok {
break
}
if batch.proposal.Header.Round > currentRound {
// All further proposals are valid.
break
}

// Remove invalid proposals.
n.pendingProposals.q.DeleteMin()
}
}

// handlePendingProposalsLocked attempts to handle any pending proposals. At most one proposal is
// handled.
//
// Must be called with the n.commonNode.CrossNode lock held.
func (n *Node) handlePendingProposalsLocked() {
// Prune any invalid pending proposals.
n.prunePendingProposalsLocked()

// Dequeue the next proposal.
batch, ok := n.pendingProposals.q.DeleteMin()
if !ok {
return
}

// Ignoring the error is fine, because the proposal is either handled (no error) or added
// back to the queue (no error since an overflow cannot happen given we just removed it).
// Since we checked above that the proposal is valid there is no other option.
_ = n.handleProposalLocked(batch)
}
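
For reference, a small standalone example of how `btree.BTreeG` orders items under a less-func like `proposalLessFunc` above (this assumes `github.com/google/btree` v1.1+, which added the generic API):

```go
package main

import (
	"fmt"

	"github.com/google/btree"
)

type item struct{ round uint64 }

func main() {
	// Order items by round, mirroring proposalLessFunc.
	q := btree.NewG(2, func(a, b *item) bool { return a.round < b.round })

	q.ReplaceOrInsert(&item{round: 7})
	q.ReplaceOrInsert(&item{round: 3})
	q.ReplaceOrInsert(&item{round: 5})

	// Min yields the lowest round, i.e. the next proposal to retry.
	if it, ok := q.Min(); ok {
		fmt.Println("next round:", it.round) // next round: 3
	}
	// DeleteMax evicts the round furthest in the future on overflow.
	if it, ok := q.DeleteMax(); ok {
		fmt.Println("evicted round:", it.round) // evicted round: 7
	}
}
```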
34 changes: 0 additions & 34 deletions go/worker/compute/executor/committee/state.go
@@ -19,8 +19,6 @@ const (
NotReady StateName = "NotReady"
// WaitingForBatch is the name of StateWaitingForBatch.
WaitingForBatch = "WaitingForBatch"
// WaitingForBlock is the name of StateWaitingForBlock.
WaitingForBlock = "WaitingForBlock"
// WaitingForEvent is the name of StateWaitingForEvent.
WaitingForEvent = "WaitingForEvent"
// WaitingForTxs is the name of the StateWaitingForTxs.
Expand All @@ -44,8 +42,6 @@ var validStateTransitions = map[StateName][]StateName{
// Transitions from WaitingForBatch state.
WaitingForBatch: {
WaitingForBatch,
// Received batch, need to catch up current block.
WaitingForBlock,
// Received batch, current block is up to date.
ProcessingBatch,
// Received batch, waiting for discrepancy event.
@@ -56,20 +52,6 @@ var validStateTransitions = map[StateName][]StateName{
NotReady,
},

// Transitions from WaitingForBlock state.
WaitingForBlock: {
// Abort: seen newer block while waiting for block.
WaitingForBatch,
// Seen block that we were waiting for.
ProcessingBatch,
// Seen block that we were waiting for, waiting for discrepancy event.
WaitingForEvent,
// Seen block that we were waiting for, waiting for transactions.
WaitingForTxs,
// Epoch transition occurred and we are no longer in the committee.
NotReady,
},

// Transitions from WaitingForEvent state.
WaitingForEvent: {
// Abort: seen newer block while waiting for event.
@@ -142,22 +124,6 @@ func (s StateWaitingForBatch) String() string {
return string(s.Name())
}

// StateWaitingForBlock is the waiting for block state.
type StateWaitingForBlock struct {
// Batch that is waiting to be processed.
batch *unresolvedBatch
}

// Name returns the name of the state.
func (s StateWaitingForBlock) Name() StateName {
return WaitingForBlock
}

// String returns a string representation of the state.
func (s StateWaitingForBlock) String() string {
return string(s.Name())
}

// StateWaitingForEvent is the waiting for event state.
type StateWaitingForEvent struct {
// Batch that is being processed.
