Skip to content

Commit

Permalink
Merge pull request #3213 from oasisprotocol/kostko/fix/rt-round-timeout
Browse files Browse the repository at this point in the history
go/consensus/tendermint/apps/roothash: Clear timeout in emitEmptyBlock
  • Loading branch information
kostko authored Aug 26, 2020
2 parents 21c6f98 + a7c5872 commit f4db6a5
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 42 deletions.
1 change: 1 addition & 0 deletions .changelog/3213.trivial.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/consensus/tendermint/apps/roothash: Clear timeout in emitEmptyBlock
60 changes: 45 additions & 15 deletions go/consensus/tendermint/apps/roothash/roothash.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,9 @@ func (app *rootHashApplication) onCommitteeChanged(ctx *tmapi.Context, epoch epo

// Emit an empty epoch transition block in the new round. This is required so that
// the clients can be sure what state is final when an epoch transition occurs.
app.emitEmptyBlock(ctx, rtState, block.EpochTransition)
if err = app.emitEmptyBlock(ctx, rtState, block.EpochTransition); err != nil {
return fmt.Errorf("failed to emit empty block: %w", err)
}

// Set the executor pool.
rtState.ExecutorPool = executorPool
Expand Down Expand Up @@ -189,7 +191,9 @@ func (app *rootHashApplication) suspendUnpaidRuntime(
rtState.ExecutorPool = nil

// Emity an empty block signalling that the runtime was suspended.
app.emitEmptyBlock(ctx, rtState, block.Suspended)
if err := app.emitEmptyBlock(ctx, rtState, block.Suspended); err != nil {
return fmt.Errorf("failed to emit empty block: %w", err)
}

return nil
}
Expand Down Expand Up @@ -230,12 +234,19 @@ func (app *rootHashApplication) prepareNewCommittees(
return
}

func (app *rootHashApplication) emitEmptyBlock(ctx *tmapi.Context, runtime *roothashState.RuntimeState, hdrType block.HeaderType) {
func (app *rootHashApplication) emitEmptyBlock(ctx *tmapi.Context, runtime *roothashState.RuntimeState, hdrType block.HeaderType) error {
blk := block.NewEmptyBlock(runtime.CurrentBlock, uint64(ctx.Now().Unix()), hdrType)

runtime.CurrentBlock = blk
runtime.CurrentBlockHeight = ctx.BlockHeight()
if runtime.ExecutorPool != nil {
// Clear timeout if there was one scheduled.
if runtime.ExecutorPool.NextTimeout != commitment.TimeoutNever {
state := roothashState.NewMutableState(ctx.State())
if err := state.ClearRoundTimeout(ctx, runtime.Runtime.ID, runtime.ExecutorPool.NextTimeout); err != nil {
return fmt.Errorf("failed to clear round timeout: %w", err)
}
}
runtime.ExecutorPool.ResetCommitments()
}

Expand All @@ -248,6 +259,7 @@ func (app *rootHashApplication) emitEmptyBlock(ctx *tmapi.Context, runtime *root
Attribute(KeyFinalized, cbor.Marshal(tagV)).
Attribute(KeyRuntimeID, ValueRuntimeID(runtime.Runtime.ID)),
)
return nil
}

func (app *rootHashApplication) ExecuteTx(ctx *tmapi.Context, tx *transaction.Transaction) error {
Expand Down Expand Up @@ -413,11 +425,13 @@ func (app *rootHashApplication) processRoundTimeout(ctx *tmapi.Context, state *r
return nil
}

func (app *rootHashApplication) tryFinalizeExecutor(
// tryFinalizeExecutorCommits tries to finalize the executor commitments into a new runtime block.
// The caller must take care of clearing and scheduling the round timeouts.
func (app *rootHashApplication) tryFinalizeExecutorCommits(
ctx *tmapi.Context,
rtState *roothashState.RuntimeState,
forced bool,
) *block.Block {
) (*block.Block, error) {
runtime := rtState.Runtime
blockNr := rtState.CurrentBlock.Header.Round

Expand All @@ -437,16 +451,17 @@ func (app *rootHashApplication) tryFinalizeExecutor(
blk.Header.StateRoot = hdr.StateRoot
// Messages omitted on purpose.

// Timeout will be cleared by caller.
rtState.ExecutorPool.ResetCommitments()

return blk
return blk, nil
case commitment.ErrStillWaiting:
// Need more commits.
ctx.Logger().Debug("insufficient commitments for finality, waiting",
"round", blockNr,
)

return nil
return nil, nil
case commitment.ErrDiscrepancyDetected:
// Discrepancy has been detected.
ctx.Logger().Warn("executor discrepancy detected",
Expand All @@ -465,7 +480,7 @@ func (app *rootHashApplication) tryFinalizeExecutor(
Attribute(KeyExecutionDiscrepancyDetected, cbor.Marshal(tagV)).
Attribute(KeyRuntimeID, ValueRuntimeID(runtime.ID)),
)
return nil
return nil, nil
default:
}

Expand All @@ -476,11 +491,14 @@ func (app *rootHashApplication) tryFinalizeExecutor(
logging.LogEvent, roothash.LogEventRoundFailed,
)

app.emitEmptyBlock(ctx, rtState, block.RoundFailed)
return nil
if err := app.emitEmptyBlock(ctx, rtState, block.RoundFailed); err != nil {
return nil, fmt.Errorf("failed to emit empty block: %w", err)
}

return nil, nil
}

func (app *rootHashApplication) postProcessFinalizedBlock(ctx *tmapi.Context, rtState *roothashState.RuntimeState, blk *block.Block) {
func (app *rootHashApplication) postProcessFinalizedBlock(ctx *tmapi.Context, rtState *roothashState.RuntimeState, blk *block.Block) error {
sc := ctx.StartCheckpoint()
defer sc.Close()

Expand All @@ -498,9 +516,11 @@ func (app *rootHashApplication) postProcessFinalizedBlock(ctx *tmapi.Context, rt
)

// Substitute empty block.
app.emitEmptyBlock(ctx, rtState, block.RoundFailed)
if err := app.emitEmptyBlock(ctx, rtState, block.RoundFailed); err != nil {
return fmt.Errorf("failed to emit empty block: %w", err)
}

return
return nil
}
}

Expand All @@ -519,6 +539,7 @@ func (app *rootHashApplication) postProcessFinalizedBlock(ctx *tmapi.Context, rt
Attribute(KeyFinalized, cbor.Marshal(tagV)).
Attribute(KeyRuntimeID, ValueRuntimeID(rtState.Runtime.ID)),
)
return nil
}

func (app *rootHashApplication) tryFinalizeBlock(
Expand All @@ -527,6 +548,10 @@ func (app *rootHashApplication) tryFinalizeBlock(
forced bool,
) (err error) {
defer func(previousTimeout int64) {
if err != nil {
return
}

// Do not re-arm the round timeout if the timeout has not changed.
nextTimeout := rtState.ExecutorPool.NextTimeout
if previousTimeout == nextTimeout {
Expand Down Expand Up @@ -558,12 +583,17 @@ func (app *rootHashApplication) tryFinalizeBlock(
}
}(rtState.ExecutorPool.NextTimeout)

finalizedBlock := app.tryFinalizeExecutor(ctx, rtState, forced)
finalizedBlock, err := app.tryFinalizeExecutorCommits(ctx, rtState, forced)
if err != nil {
return fmt.Errorf("failed to finalize executor commits: %w", err)
}
if finalizedBlock == nil {
return nil
}

app.postProcessFinalizedBlock(ctx, rtState, finalizedBlock)
if err = app.postProcessFinalizedBlock(ctx, rtState, finalizedBlock); err != nil {
return fmt.Errorf("failed to post process finalized block: %w", err)
}
return nil
}

Expand Down
36 changes: 29 additions & 7 deletions go/consensus/tendermint/apps/roothash/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,27 +57,49 @@ func NewImmutableState(ctx context.Context, state api.ApplicationQueryState, ver
return &ImmutableState{is}, nil
}

// RuntimesWithRoundTimeouts returns the runtimes that have round timeouts scheduled at the given
// height.
func (s *ImmutableState) RuntimesWithRoundTimeouts(ctx context.Context, height int64) ([]common.Namespace, error) {
func (s *ImmutableState) runtimesWithRoundTimeouts(ctx context.Context, height *int64) ([]common.Namespace, []int64, error) {
it := s.is.NewIterator(ctx)
defer it.Close()

var startKey []byte
if height == nil {
startKey = roundTimeoutQueueKeyFmt.Encode()
} else {
startKey = roundTimeoutQueueKeyFmt.Encode(height)
}

var runtimeIDs []common.Namespace
for it.Seek(roundTimeoutQueueKeyFmt.Encode(height)); it.Valid(); it.Next() {
var heights []int64
for it.Seek(startKey); it.Valid(); it.Next() {
var decHeight int64
if !roundTimeoutQueueKeyFmt.Decode(it.Key(), &decHeight) || decHeight != height {
if !roundTimeoutQueueKeyFmt.Decode(it.Key(), &decHeight) || (height != nil && decHeight != *height) {
break
}

var runtimeID common.Namespace
if err := runtimeID.UnmarshalBinary(it.Value()); err != nil {
return nil, api.UnavailableStateError(err)
return nil, nil, api.UnavailableStateError(err)
}

runtimeIDs = append(runtimeIDs, runtimeID)
if height == nil {
heights = append(heights, decHeight)
}
}
return runtimeIDs, nil
return runtimeIDs, heights, nil
}

// RuntimesWithRoundTimeouts returns the runtimes that have round timeouts scheduled at the given
// height.
func (s *ImmutableState) RuntimesWithRoundTimeouts(ctx context.Context, height int64) ([]common.Namespace, error) {
runtimeIDs, _, err := s.runtimesWithRoundTimeouts(ctx, &height)
return runtimeIDs, err
}

// RuntimesWithRoundTimeoutsAny returns the runtimes that have round timeouts scheduled at any
// height.
func (s *ImmutableState) RuntimesWithRoundTimeoutsAny(ctx context.Context) ([]common.Namespace, []int64, error) {
return s.runtimesWithRoundTimeouts(ctx, nil)
}

// RuntimeState returns the roothash runtime state for a specific runtime.
Expand Down
17 changes: 17 additions & 0 deletions go/consensus/tendermint/apps/supplementarysanity/checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,31 @@ func checkRootHash(ctx *abciAPI.Context, now epochtime.EpochTime) error {
}

blocks := make(map[common.Namespace]*block.Block)
runtimesByID := make(map[common.Namespace]*roothashState.RuntimeState)
for _, rt := range runtimes {
blocks[rt.Runtime.ID] = rt.CurrentBlock
runtimesByID[rt.Runtime.ID] = rt
}
err = roothash.SanityCheckBlocks(blocks)
if err != nil {
return fmt.Errorf("SanityCheckBlocks: %w", err)
}

// Make sure that runtime timeout state is consistent with actual timeouts.
runtimeIDs, heights, err := st.RuntimesWithRoundTimeoutsAny(ctx)
if err != nil {
return fmt.Errorf("RuntimesWithRoundTimeoutsAny: %w", err)
}
for i, id := range runtimeIDs {
height := heights[i]
if height < ctx.BlockHeight() {
return fmt.Errorf("round timeout for runtime %s was scheduled at %d but did not trigger", id, height)
}
if !runtimesByID[id].ExecutorPool.IsTimeout(height) {
return fmt.Errorf("runtime %s scheduled for timeout at %d but would not actually trigger", id, height)
}
}

// nothing to check yet
return nil
}
Expand Down
Loading

0 comments on commit f4db6a5

Please sign in to comment.