Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

go/consensus/tendermint/apps/roothash: Clear timeout in emitEmptyBlock #3213

Merged
merged 2 commits into from
Aug 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changelog/3213.trivial.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/consensus/tendermint/apps/roothash: Clear timeout in emitEmptyBlock
60 changes: 45 additions & 15 deletions go/consensus/tendermint/apps/roothash/roothash.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,9 @@ func (app *rootHashApplication) onCommitteeChanged(ctx *tmapi.Context, epoch epo

// Emit an empty epoch transition block in the new round. This is required so that
// the clients can be sure what state is final when an epoch transition occurs.
app.emitEmptyBlock(ctx, rtState, block.EpochTransition)
if err = app.emitEmptyBlock(ctx, rtState, block.EpochTransition); err != nil {
return fmt.Errorf("failed to emit empty block: %w", err)
}

// Set the executor pool.
rtState.ExecutorPool = executorPool
Expand Down Expand Up @@ -189,7 +191,9 @@ func (app *rootHashApplication) suspendUnpaidRuntime(
rtState.ExecutorPool = nil

// Emity an empty block signalling that the runtime was suspended.
app.emitEmptyBlock(ctx, rtState, block.Suspended)
if err := app.emitEmptyBlock(ctx, rtState, block.Suspended); err != nil {
return fmt.Errorf("failed to emit empty block: %w", err)
}

return nil
}
Expand Down Expand Up @@ -230,12 +234,19 @@ func (app *rootHashApplication) prepareNewCommittees(
return
}

func (app *rootHashApplication) emitEmptyBlock(ctx *tmapi.Context, runtime *roothashState.RuntimeState, hdrType block.HeaderType) {
func (app *rootHashApplication) emitEmptyBlock(ctx *tmapi.Context, runtime *roothashState.RuntimeState, hdrType block.HeaderType) error {
blk := block.NewEmptyBlock(runtime.CurrentBlock, uint64(ctx.Now().Unix()), hdrType)

runtime.CurrentBlock = blk
runtime.CurrentBlockHeight = ctx.BlockHeight()
if runtime.ExecutorPool != nil {
// Clear timeout if there was one scheduled.
if runtime.ExecutorPool.NextTimeout != commitment.TimeoutNever {
state := roothashState.NewMutableState(ctx.State())
if err := state.ClearRoundTimeout(ctx, runtime.Runtime.ID, runtime.ExecutorPool.NextTimeout); err != nil {
return fmt.Errorf("failed to clear round timeout: %w", err)
}
}
runtime.ExecutorPool.ResetCommitments()
}

Expand All @@ -248,6 +259,7 @@ func (app *rootHashApplication) emitEmptyBlock(ctx *tmapi.Context, runtime *root
Attribute(KeyFinalized, cbor.Marshal(tagV)).
Attribute(KeyRuntimeID, ValueRuntimeID(runtime.Runtime.ID)),
)
return nil
}

func (app *rootHashApplication) ExecuteTx(ctx *tmapi.Context, tx *transaction.Transaction) error {
Expand Down Expand Up @@ -413,11 +425,13 @@ func (app *rootHashApplication) processRoundTimeout(ctx *tmapi.Context, state *r
return nil
}

func (app *rootHashApplication) tryFinalizeExecutor(
// tryFinalizeExecutorCommits tries to finalize the executor commitments into a new runtime block.
// The caller must take care of clearing and scheduling the round timeouts.
func (app *rootHashApplication) tryFinalizeExecutorCommits(
ctx *tmapi.Context,
rtState *roothashState.RuntimeState,
forced bool,
) *block.Block {
) (*block.Block, error) {
runtime := rtState.Runtime
blockNr := rtState.CurrentBlock.Header.Round

Expand All @@ -437,16 +451,17 @@ func (app *rootHashApplication) tryFinalizeExecutor(
blk.Header.StateRoot = hdr.StateRoot
// Messages omitted on purpose.

// Timeout will be cleared by caller.
rtState.ExecutorPool.ResetCommitments()

return blk
return blk, nil
case commitment.ErrStillWaiting:
// Need more commits.
ctx.Logger().Debug("insufficient commitments for finality, waiting",
"round", blockNr,
)

return nil
return nil, nil
case commitment.ErrDiscrepancyDetected:
// Discrepancy has been detected.
ctx.Logger().Warn("executor discrepancy detected",
Expand All @@ -465,7 +480,7 @@ func (app *rootHashApplication) tryFinalizeExecutor(
Attribute(KeyExecutionDiscrepancyDetected, cbor.Marshal(tagV)).
Attribute(KeyRuntimeID, ValueRuntimeID(runtime.ID)),
)
return nil
return nil, nil
default:
}

Expand All @@ -476,11 +491,14 @@ func (app *rootHashApplication) tryFinalizeExecutor(
logging.LogEvent, roothash.LogEventRoundFailed,
)

app.emitEmptyBlock(ctx, rtState, block.RoundFailed)
return nil
if err := app.emitEmptyBlock(ctx, rtState, block.RoundFailed); err != nil {
return nil, fmt.Errorf("failed to emit empty block: %w", err)
}

return nil, nil
}

func (app *rootHashApplication) postProcessFinalizedBlock(ctx *tmapi.Context, rtState *roothashState.RuntimeState, blk *block.Block) {
func (app *rootHashApplication) postProcessFinalizedBlock(ctx *tmapi.Context, rtState *roothashState.RuntimeState, blk *block.Block) error {
sc := ctx.StartCheckpoint()
defer sc.Close()

Expand All @@ -498,9 +516,11 @@ func (app *rootHashApplication) postProcessFinalizedBlock(ctx *tmapi.Context, rt
)

// Substitute empty block.
app.emitEmptyBlock(ctx, rtState, block.RoundFailed)
if err := app.emitEmptyBlock(ctx, rtState, block.RoundFailed); err != nil {
return fmt.Errorf("failed to emit empty block: %w", err)
}

return
return nil
}
}

Expand All @@ -519,6 +539,7 @@ func (app *rootHashApplication) postProcessFinalizedBlock(ctx *tmapi.Context, rt
Attribute(KeyFinalized, cbor.Marshal(tagV)).
Attribute(KeyRuntimeID, ValueRuntimeID(rtState.Runtime.ID)),
)
return nil
}

func (app *rootHashApplication) tryFinalizeBlock(
Expand All @@ -527,6 +548,10 @@ func (app *rootHashApplication) tryFinalizeBlock(
forced bool,
) (err error) {
defer func(previousTimeout int64) {
if err != nil {
return
}

// Do not re-arm the round timeout if the timeout has not changed.
nextTimeout := rtState.ExecutorPool.NextTimeout
if previousTimeout == nextTimeout {
Expand Down Expand Up @@ -558,12 +583,17 @@ func (app *rootHashApplication) tryFinalizeBlock(
}
}(rtState.ExecutorPool.NextTimeout)

finalizedBlock := app.tryFinalizeExecutor(ctx, rtState, forced)
finalizedBlock, err := app.tryFinalizeExecutorCommits(ctx, rtState, forced)
if err != nil {
return fmt.Errorf("failed to finalize executor commits: %w", err)
}
if finalizedBlock == nil {
return nil
}

app.postProcessFinalizedBlock(ctx, rtState, finalizedBlock)
if err = app.postProcessFinalizedBlock(ctx, rtState, finalizedBlock); err != nil {
return fmt.Errorf("failed to post process finalized block: %w", err)
}
return nil
}

Expand Down
36 changes: 29 additions & 7 deletions go/consensus/tendermint/apps/roothash/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,27 +57,49 @@ func NewImmutableState(ctx context.Context, state api.ApplicationQueryState, ver
return &ImmutableState{is}, nil
}

// RuntimesWithRoundTimeouts returns the runtimes that have round timeouts scheduled at the given
// height.
func (s *ImmutableState) RuntimesWithRoundTimeouts(ctx context.Context, height int64) ([]common.Namespace, error) {
func (s *ImmutableState) runtimesWithRoundTimeouts(ctx context.Context, height *int64) ([]common.Namespace, []int64, error) {
it := s.is.NewIterator(ctx)
defer it.Close()

var startKey []byte
if height == nil {
startKey = roundTimeoutQueueKeyFmt.Encode()
} else {
startKey = roundTimeoutQueueKeyFmt.Encode(height)
}

var runtimeIDs []common.Namespace
for it.Seek(roundTimeoutQueueKeyFmt.Encode(height)); it.Valid(); it.Next() {
var heights []int64
for it.Seek(startKey); it.Valid(); it.Next() {
var decHeight int64
if !roundTimeoutQueueKeyFmt.Decode(it.Key(), &decHeight) || decHeight != height {
if !roundTimeoutQueueKeyFmt.Decode(it.Key(), &decHeight) || (height != nil && decHeight != *height) {
break
}

var runtimeID common.Namespace
if err := runtimeID.UnmarshalBinary(it.Value()); err != nil {
return nil, api.UnavailableStateError(err)
return nil, nil, api.UnavailableStateError(err)
}

runtimeIDs = append(runtimeIDs, runtimeID)
if height == nil {
heights = append(heights, decHeight)
}
}
return runtimeIDs, nil
return runtimeIDs, heights, nil
}

// RuntimesWithRoundTimeouts returns the runtimes that have round timeouts scheduled at the given
// height.
func (s *ImmutableState) RuntimesWithRoundTimeouts(ctx context.Context, height int64) ([]common.Namespace, error) {
runtimeIDs, _, err := s.runtimesWithRoundTimeouts(ctx, &height)
return runtimeIDs, err
}

// RuntimesWithRoundTimeoutsAny returns the runtimes that have round timeouts scheduled at any
// height.
func (s *ImmutableState) RuntimesWithRoundTimeoutsAny(ctx context.Context) ([]common.Namespace, []int64, error) {
return s.runtimesWithRoundTimeouts(ctx, nil)
}

// RuntimeState returns the roothash runtime state for a specific runtime.
Expand Down
17 changes: 17 additions & 0 deletions go/consensus/tendermint/apps/supplementarysanity/checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,31 @@ func checkRootHash(ctx *abciAPI.Context, now epochtime.EpochTime) error {
}

blocks := make(map[common.Namespace]*block.Block)
runtimesByID := make(map[common.Namespace]*roothashState.RuntimeState)
for _, rt := range runtimes {
blocks[rt.Runtime.ID] = rt.CurrentBlock
runtimesByID[rt.Runtime.ID] = rt
}
err = roothash.SanityCheckBlocks(blocks)
if err != nil {
return fmt.Errorf("SanityCheckBlocks: %w", err)
}

// Make sure that runtime timeout state is consistent with actual timeouts.
runtimeIDs, heights, err := st.RuntimesWithRoundTimeoutsAny(ctx)
if err != nil {
return fmt.Errorf("RuntimesWithRoundTimeoutsAny: %w", err)
}
for i, id := range runtimeIDs {
height := heights[i]
if height < ctx.BlockHeight() {
return fmt.Errorf("round timeout for runtime %s was scheduled at %d but did not trigger", id, height)
}
if !runtimesByID[id].ExecutorPool.IsTimeout(height) {
return fmt.Errorf("runtime %s scheduled for timeout at %d but would not actually trigger", id, height)
}
}

// nothing to check yet
return nil
}
Expand Down
Loading