From 0e220820d520dad09d37de05bff6435a55c46a6a Mon Sep 17 00:00:00 2001 From: Jernej Kos Date: Tue, 25 Aug 2020 19:06:26 +0200 Subject: [PATCH 1/2] go/consensus/tendermint: Add roothash sanity checks --- .../tendermint/apps/roothash/state/state.go | 36 +++++++++++++++---- .../apps/supplementarysanity/checks.go | 17 +++++++++ 2 files changed, 46 insertions(+), 7 deletions(-) diff --git a/go/consensus/tendermint/apps/roothash/state/state.go b/go/consensus/tendermint/apps/roothash/state/state.go index dd740218e8b..071aebe1cda 100644 --- a/go/consensus/tendermint/apps/roothash/state/state.go +++ b/go/consensus/tendermint/apps/roothash/state/state.go @@ -57,27 +57,49 @@ func NewImmutableState(ctx context.Context, state api.ApplicationQueryState, ver return &ImmutableState{is}, nil } -// RuntimesWithRoundTimeouts returns the runtimes that have round timeouts scheduled at the given -// height. -func (s *ImmutableState) RuntimesWithRoundTimeouts(ctx context.Context, height int64) ([]common.Namespace, error) { +func (s *ImmutableState) runtimesWithRoundTimeouts(ctx context.Context, height *int64) ([]common.Namespace, []int64, error) { it := s.is.NewIterator(ctx) defer it.Close() + var startKey []byte + if height == nil { + startKey = roundTimeoutQueueKeyFmt.Encode() + } else { + startKey = roundTimeoutQueueKeyFmt.Encode(height) + } + var runtimeIDs []common.Namespace - for it.Seek(roundTimeoutQueueKeyFmt.Encode(height)); it.Valid(); it.Next() { + var heights []int64 + for it.Seek(startKey); it.Valid(); it.Next() { var decHeight int64 - if !roundTimeoutQueueKeyFmt.Decode(it.Key(), &decHeight) || decHeight != height { + if !roundTimeoutQueueKeyFmt.Decode(it.Key(), &decHeight) || (height != nil && decHeight != *height) { break } var runtimeID common.Namespace if err := runtimeID.UnmarshalBinary(it.Value()); err != nil { - return nil, api.UnavailableStateError(err) + return nil, nil, api.UnavailableStateError(err) } runtimeIDs = append(runtimeIDs, runtimeID) + if height == nil { + heights = append(heights, decHeight) + } } - return runtimeIDs, nil + return runtimeIDs, heights, nil +} + +// RuntimesWithRoundTimeouts returns the runtimes that have round timeouts scheduled at the given +// height. +func (s *ImmutableState) RuntimesWithRoundTimeouts(ctx context.Context, height int64) ([]common.Namespace, error) { + runtimeIDs, _, err := s.runtimesWithRoundTimeouts(ctx, &height) + return runtimeIDs, err +} + +// RuntimesWithRoundTimeoutsAny returns the runtimes that have round timeouts scheduled at any +// height. +func (s *ImmutableState) RuntimesWithRoundTimeoutsAny(ctx context.Context) ([]common.Namespace, []int64, error) { + return s.runtimesWithRoundTimeouts(ctx, nil) } // RuntimeState returns the roothash runtime state for a specific runtime. diff --git a/go/consensus/tendermint/apps/supplementarysanity/checks.go b/go/consensus/tendermint/apps/supplementarysanity/checks.go index 4a05236033b..f36f7d17217 100644 --- a/go/consensus/tendermint/apps/supplementarysanity/checks.go +++ b/go/consensus/tendermint/apps/supplementarysanity/checks.go @@ -83,14 +83,31 @@ func checkRootHash(ctx *abciAPI.Context, now epochtime.EpochTime) error { } blocks := make(map[common.Namespace]*block.Block) + runtimesByID := make(map[common.Namespace]*roothashState.RuntimeState) for _, rt := range runtimes { blocks[rt.Runtime.ID] = rt.CurrentBlock + runtimesByID[rt.Runtime.ID] = rt } err = roothash.SanityCheckBlocks(blocks) if err != nil { return fmt.Errorf("SanityCheckBlocks: %w", err) } + // Make sure that runtime timeout state is consistent with actual timeouts. + runtimeIDs, heights, err := st.RuntimesWithRoundTimeoutsAny(ctx) + if err != nil { + return fmt.Errorf("RuntimesWithRoundTimeoutsAny: %w", err) + } + for i, id := range runtimeIDs { + height := heights[i] + if height < ctx.BlockHeight() { + return fmt.Errorf("round timeout for runtime %s was scheduled at %d but did not trigger", id, height) + } + if !runtimesByID[id].ExecutorPool.IsTimeout(height) { + return fmt.Errorf("runtime %s scheduled for timeout at %d but would not actually trigger", id, height) + } + } + // nothing to check yet return nil } From a7c5872e3be394e932c71d27cf6d54fc1f300083 Mon Sep 17 00:00:00 2001 From: Jernej Kos Date: Tue, 25 Aug 2020 19:08:04 +0200 Subject: [PATCH 2/2] go/consensus/tendermint/apps/roothash: Clear timeout in emitEmptyBlock --- .changelog/3213.trivial.md | 1 + .../tendermint/apps/roothash/roothash.go | 60 ++++++--- go/roothash/tests/tester.go | 117 +++++++++++++++--- 3 files changed, 143 insertions(+), 35 deletions(-) create mode 100644 .changelog/3213.trivial.md diff --git a/.changelog/3213.trivial.md b/.changelog/3213.trivial.md new file mode 100644 index 00000000000..efc82d0c21b --- /dev/null +++ b/.changelog/3213.trivial.md @@ -0,0 +1 @@ +go/consensus/tendermint/apps/roothash: Clear timeout in emitEmptyBlock diff --git a/go/consensus/tendermint/apps/roothash/roothash.go b/go/consensus/tendermint/apps/roothash/roothash.go index b9f346da76c..534b42eb5b0 100644 --- a/go/consensus/tendermint/apps/roothash/roothash.go +++ b/go/consensus/tendermint/apps/roothash/roothash.go @@ -155,7 +155,9 @@ func (app *rootHashApplication) onCommitteeChanged(ctx *tmapi.Context, epoch epo // Emit an empty epoch transition block in the new round. This is required so that // the clients can be sure what state is final when an epoch transition occurs. - app.emitEmptyBlock(ctx, rtState, block.EpochTransition) + if err = app.emitEmptyBlock(ctx, rtState, block.EpochTransition); err != nil { + return fmt.Errorf("failed to emit empty block: %w", err) + } // Set the executor pool. rtState.ExecutorPool = executorPool @@ -189,7 +191,9 @@ func (app *rootHashApplication) suspendUnpaidRuntime( rtState.ExecutorPool = nil // Emity an empty block signalling that the runtime was suspended. - app.emitEmptyBlock(ctx, rtState, block.Suspended) + if err := app.emitEmptyBlock(ctx, rtState, block.Suspended); err != nil { + return fmt.Errorf("failed to emit empty block: %w", err) + } return nil } @@ -230,12 +234,19 @@ func (app *rootHashApplication) prepareNewCommittees( return } -func (app *rootHashApplication) emitEmptyBlock(ctx *tmapi.Context, runtime *roothashState.RuntimeState, hdrType block.HeaderType) { +func (app *rootHashApplication) emitEmptyBlock(ctx *tmapi.Context, runtime *roothashState.RuntimeState, hdrType block.HeaderType) error { blk := block.NewEmptyBlock(runtime.CurrentBlock, uint64(ctx.Now().Unix()), hdrType) runtime.CurrentBlock = blk runtime.CurrentBlockHeight = ctx.BlockHeight() if runtime.ExecutorPool != nil { + // Clear timeout if there was one scheduled. + if runtime.ExecutorPool.NextTimeout != commitment.TimeoutNever { + state := roothashState.NewMutableState(ctx.State()) + if err := state.ClearRoundTimeout(ctx, runtime.Runtime.ID, runtime.ExecutorPool.NextTimeout); err != nil { + return fmt.Errorf("failed to clear round timeout: %w", err) + } + } runtime.ExecutorPool.ResetCommitments() } @@ -248,6 +259,7 @@ func (app *rootHashApplication) emitEmptyBlock(ctx *tmapi.Context, runtime *root Attribute(KeyFinalized, cbor.Marshal(tagV)). Attribute(KeyRuntimeID, ValueRuntimeID(runtime.Runtime.ID)), ) + return nil } func (app *rootHashApplication) ExecuteTx(ctx *tmapi.Context, tx *transaction.Transaction) error { @@ -413,11 +425,13 @@ func (app *rootHashApplication) processRoundTimeout(ctx *tmapi.Context, state *r return nil } -func (app *rootHashApplication) tryFinalizeExecutor( +// tryFinalizeExecutorCommits tries to finalize the executor commitments into a new runtime block. +// The caller must take care of clearing and scheduling the round timeouts. +func (app *rootHashApplication) tryFinalizeExecutorCommits( ctx *tmapi.Context, rtState *roothashState.RuntimeState, forced bool, -) *block.Block { +) (*block.Block, error) { runtime := rtState.Runtime blockNr := rtState.CurrentBlock.Header.Round @@ -437,16 +451,17 @@ func (app *rootHashApplication) tryFinalizeExecutor( blk.Header.StateRoot = hdr.StateRoot // Messages omitted on purpose. + // Timeout will be cleared by caller. rtState.ExecutorPool.ResetCommitments() - return blk + return blk, nil case commitment.ErrStillWaiting: // Need more commits. ctx.Logger().Debug("insufficient commitments for finality, waiting", "round", blockNr, ) - return nil + return nil, nil case commitment.ErrDiscrepancyDetected: // Discrepancy has been detected. ctx.Logger().Warn("executor discrepancy detected", @@ -465,7 +480,7 @@ func (app *rootHashApplication) tryFinalizeExecutor( Attribute(KeyExecutionDiscrepancyDetected, cbor.Marshal(tagV)). Attribute(KeyRuntimeID, ValueRuntimeID(runtime.ID)), ) - return nil + return nil, nil default: } @@ -476,11 +491,14 @@ func (app *rootHashApplication) tryFinalizeExecutor( logging.LogEvent, roothash.LogEventRoundFailed, ) - app.emitEmptyBlock(ctx, rtState, block.RoundFailed) - return nil + if err := app.emitEmptyBlock(ctx, rtState, block.RoundFailed); err != nil { + return nil, fmt.Errorf("failed to emit empty block: %w", err) + } + + return nil, nil } -func (app *rootHashApplication) postProcessFinalizedBlock(ctx *tmapi.Context, rtState *roothashState.RuntimeState, blk *block.Block) { +func (app *rootHashApplication) postProcessFinalizedBlock(ctx *tmapi.Context, rtState *roothashState.RuntimeState, blk *block.Block) error { sc := ctx.StartCheckpoint() defer sc.Close() @@ -498,9 +516,11 @@ func (app *rootHashApplication) postProcessFinalizedBlock(ctx *tmapi.Context, rt ) // Substitute empty block. - app.emitEmptyBlock(ctx, rtState, block.RoundFailed) + if err := app.emitEmptyBlock(ctx, rtState, block.RoundFailed); err != nil { + return fmt.Errorf("failed to emit empty block: %w", err) + } - return + return nil } } @@ -519,6 +539,7 @@ func (app *rootHashApplication) postProcessFinalizedBlock(ctx *tmapi.Context, rt Attribute(KeyFinalized, cbor.Marshal(tagV)). Attribute(KeyRuntimeID, ValueRuntimeID(rtState.Runtime.ID)), ) + return nil } func (app *rootHashApplication) tryFinalizeBlock( @@ -527,6 +548,10 @@ func (app *rootHashApplication) tryFinalizeBlock( forced bool, ) (err error) { defer func(previousTimeout int64) { + if err != nil { + return + } + // Do not re-arm the round timeout if the timeout has not changed. nextTimeout := rtState.ExecutorPool.NextTimeout if previousTimeout == nextTimeout { @@ -558,12 +583,17 @@ func (app *rootHashApplication) tryFinalizeBlock( } }(rtState.ExecutorPool.NextTimeout) - finalizedBlock := app.tryFinalizeExecutor(ctx, rtState, forced) + finalizedBlock, err := app.tryFinalizeExecutorCommits(ctx, rtState, forced) + if err != nil { + return fmt.Errorf("failed to finalize executor commits: %w", err) + } if finalizedBlock == nil { return nil } - app.postProcessFinalizedBlock(ctx, rtState, finalizedBlock) + if err = app.postProcessFinalizedBlock(ctx, rtState, finalizedBlock); err != nil { + return fmt.Errorf("failed to post process finalized block: %w", err) + } return nil } diff --git a/go/roothash/tests/tester.go b/go/roothash/tests/tester.go index c805069dee9..3e9afd62ce0 100644 --- a/go/roothash/tests/tester.go +++ b/go/roothash/tests/tester.go @@ -15,7 +15,6 @@ import ( "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" "github.com/oasisprotocol/oasis-core/go/common/crypto/signature" "github.com/oasisprotocol/oasis-core/go/common/identity" - "github.com/oasisprotocol/oasis-core/go/common/pubsub" consensusAPI "github.com/oasisprotocol/oasis-core/go/consensus/api" epochtime "github.com/oasisprotocol/oasis-core/go/epochtime/api" epochtimeTests "github.com/oasisprotocol/oasis-core/go/epochtime/tests" @@ -100,9 +99,11 @@ func RootHashImplementationTests(t *testing.T, backend api.Backend, consensus co t.Run("RoundTimeout", func(t *testing.T) { testRoundTimeout(t, backend, consensus, identity, rtStates) }) - } - // TODO: Test the various failures. + t.Run("RoundTimeoutWithEpochTransition", func(t *testing.T) { + testRoundTimeoutWithEpochTransition(t, backend, consensus, identity, rtStates) + }) + } } func testGenesisBlock(t *testing.T, backend api.Backend, state *runtimeState) { @@ -153,17 +154,11 @@ func testEpochTransitionBlock(t *testing.T, backend api.Backend, consensus conse v.genesisBlock = genesisBlock } - // Advance the epoch, get the committee. - epoch, err := consensus.EpochTime().GetEpoch(context.Background(), consensusAPI.HeightLatest) - require.NoError(err, "GetEpoch") - // Subscribe to blocks for all of the runtimes. var blkChannels []<-chan *api.AnnotatedBlock for i := range states { v := states[i] - var ch <-chan *api.AnnotatedBlock - var sub *pubsub.Subscription - ch, sub, err = backend.WatchBlocks(v.rt.Runtime.ID) + ch, sub, err := backend.WatchBlocks(v.rt.Runtime.ID) require.NoError(err, "WatchBlocks") defer sub.Close() @@ -177,27 +172,33 @@ func testEpochTransitionBlock(t *testing.T, backend api.Backend, consensus conse // Check for the expected post-epoch transition events. for i, state := range states { blkCh := blkChannels[i] - state.testEpochTransitionBlock(t, consensus.Scheduler(), epoch, blkCh) + state.testEpochTransitionBlock(t, consensus, blkCh) } // Check if GetGenesisBlock still returns the correct genesis block. for i := range states { - var blk *block.Block - blk, err = backend.GetGenesisBlock(context.Background(), states[i].rt.Runtime.ID, consensusAPI.HeightLatest) + blk, err := backend.GetGenesisBlock(context.Background(), states[i].rt.Runtime.ID, consensusAPI.HeightLatest) require.NoError(err, "GetGenesisBlock") require.EqualValues(0, blk.Header.Round, "retrieved block is genesis block") } } -func (s *runtimeState) testEpochTransitionBlock(t *testing.T, scheduler scheduler.Backend, epoch epochtime.EpochTime, ch <-chan *api.AnnotatedBlock) { - require := require.New(t) - +func (s *runtimeState) refreshCommittees(t *testing.T, consensus consensusAPI.Backend) { nodes := make(map[signature.PublicKey]*registryTests.TestNode) for _, node := range s.rt.TestNodes() { nodes[node.Node.ID] = node } - s.executorCommittee, s.storageCommittee = mustGetCommittee(t, s.rt, epoch+1, scheduler, nodes) + epoch, err := consensus.EpochTime().GetEpoch(context.Background(), consensusAPI.HeightLatest) + require.NoError(t, err, "GetEpoch") + + s.executorCommittee, s.storageCommittee = mustGetCommittee(t, s.rt, epoch, consensus.Scheduler(), nodes) +} + +func (s *runtimeState) testEpochTransitionBlock(t *testing.T, consensus consensusAPI.Backend, ch <-chan *api.AnnotatedBlock) { + require := require.New(t) + + s.refreshCommittees(t, consensus) // Wait to receive an epoch transition block. for { @@ -227,13 +228,14 @@ func testSuccessfulRound(t *testing.T, backend api.Backend, consensus consensusA } } -func (s *runtimeState) generateExecutorCommitments(t *testing.T, identity *identity.Identity, child *block.Block) ( +func (s *runtimeState) generateExecutorCommitments(t *testing.T, consensus consensusAPI.Backend, identity *identity.Identity, child *block.Block) ( parent *block.Block, executorCommits []commitment.ExecutorCommitment, executorNodes []*registryTests.TestNode, ) { require := require.New(t) + s.refreshCommittees(t, consensus) rt, executorCommittee := s.rt, s.executorCommittee dataDir, err := ioutil.TempDir("", "oasis-storage-test_") @@ -357,7 +359,7 @@ func (s *runtimeState) testSuccessfulRound(t *testing.T, backend api.Backend, co defer cancel() // Generate and submit all executor commitments. - parent, executorCommits, executorNodes := s.generateExecutorCommitments(t, identity, child) + parent, executorCommits, executorNodes := s.generateExecutorCommitments(t, consensus, identity, child) tx := api.NewExecutorCommitTx(0, nil, s.rt.Runtime.ID, executorCommits) err = consensusAPI.SignAndSubmitTx(ctx, consensus, executorNodes[0].Signer, tx) require.NoError(err, "ExecutorCommit") @@ -432,7 +434,7 @@ func (s *runtimeState) testRoundTimeout(t *testing.T, backend api.Backend, conse defer cancel() // Only submit a single commitment to cause a timeout. - _, executorCommits, executorNodes := s.generateExecutorCommitments(t, identity, child) + _, executorCommits, executorNodes := s.generateExecutorCommitments(t, consensus, identity, child) tx := api.NewExecutorCommitTx(0, nil, s.rt.Runtime.ID, executorCommits[:1]) err = consensusAPI.SignAndSubmitTx(ctx, consensus, executorNodes[0].Signer, tx) require.NoError(err, "ExecutorCommit") @@ -486,6 +488,81 @@ WaitForRoundTimeoutBlocks: } } +func testRoundTimeoutWithEpochTransition(t *testing.T, backend api.Backend, consensus consensusAPI.Backend, identity *identity.Identity, states []*runtimeState) { + for _, state := range states { + state.testRoundTimeoutWithEpochTransition(t, backend, consensus, identity) + } +} + +func (s *runtimeState) testRoundTimeoutWithEpochTransition(t *testing.T, backend api.Backend, consensus consensusAPI.Backend, identity *identity.Identity) { + require := require.New(t) + + child, err := backend.GetLatestBlock(context.Background(), s.rt.Runtime.ID, consensusAPI.HeightLatest) + require.NoError(err, "GetLatestBlock") + + ch, sub, err := backend.WatchBlocks(s.rt.Runtime.ID) + require.NoError(err, "WatchBlocks") + defer sub.Close() + + ctx, cancel := context.WithTimeout(context.Background(), recvTimeout) + defer cancel() + + // Only submit a single commitment to cause a timeout. + _, executorCommits, executorNodes := s.generateExecutorCommitments(t, consensus, identity, child) + tx := api.NewExecutorCommitTx(0, nil, s.rt.Runtime.ID, executorCommits[:1]) + err = consensusAPI.SignAndSubmitTx(ctx, consensus, executorNodes[0].Signer, tx) + require.NoError(err, "ExecutorCommit") + + consBlkCh, consBlkSub, err := consensus.WatchBlocks(context.Background()) + require.NoError(err, "WatchBlocks") + defer consBlkSub.Close() + + var startBlock int64 +WaitForRoundTimeoutBlocks: + for { + select { + case blk := <-consBlkCh: + if blk == nil { + t.Fatalf("block channel closed before reaching round timeout") + } + if startBlock == 0 { + startBlock = blk.Height + } + if blk.Height-startBlock > s.rt.Runtime.Executor.RoundTimeout/2 { + break WaitForRoundTimeoutBlocks + } + case <-time.After(recvTimeout): + t.Fatalf("failed to receive consensus block") + } + } + + // Trigger an epoch transition while the timeout is armed. + timeSource := consensus.EpochTime().(epochtime.SetableBackend) + epochtimeTests.MustAdvanceEpoch(t, timeSource, 1) + + // Ensure that the epoch transition was processed correctly. + for { + select { + case blk := <-ch: + header := blk.Block.Header + + // Skip initial round. + if header.Round == child.Header.Round { + continue + } + + // Next round must be an epoch transition. + require.EqualValues(child.Header.Round+1, header.Round, "block round") + require.EqualValues(block.EpochTransition, header.HeaderType, "block header type must be EpochTransition") + + // Nothing more to do after the block was received. + return + case <-time.After(recvTimeout): + t.Fatalf("failed to receive runtime block") + } + } +} + type testCommittee struct { committee *scheduler.Committee workers []*registryTests.TestNode