diff --git a/go/oasis-node/node_test.go b/go/oasis-node/node_test.go index 4d545ceb6c5..08e5d80b3ec 100644 --- a/go/oasis-node/node_test.go +++ b/go/oasis-node/node_test.go @@ -356,7 +356,7 @@ func testSchedulerClient(t *testing.T, node *testNode) { } func testStaking(t *testing.T, node *testNode) { - stakingTests.StakingImplementationTests(t, node.Consensus.Staking(), node.Consensus, node.Identity, node.entity, testRuntimeID) + stakingTests.StakingImplementationTests(t, node.Consensus.Staking(), node.Consensus, node.Identity, node.entity) } func testStakingClient(t *testing.T, node *testNode) { diff --git a/go/oasis-test-runner/scenario/e2e/runtime/helpers_runtime.go b/go/oasis-test-runner/scenario/e2e/runtime/helpers_runtime.go index 6984545084c..abe4cb483e0 100644 --- a/go/oasis-test-runner/scenario/e2e/runtime/helpers_runtime.go +++ b/go/oasis-test-runner/scenario/e2e/runtime/helpers_runtime.go @@ -21,6 +21,7 @@ import ( "github.com/oasisprotocol/oasis-core/go/oasis-test-runner/rust" "github.com/oasisprotocol/oasis-core/go/oasis-test-runner/scenario/e2e" registry "github.com/oasisprotocol/oasis-core/go/registry/api" + roothash "github.com/oasisprotocol/oasis-core/go/roothash/api" commonWorker "github.com/oasisprotocol/oasis-core/go/worker/common/api" ) @@ -549,3 +550,51 @@ func (sc *Scenario) ensureReplicationWorked(ctx context.Context, km *oasis.Keyma } return fmt.Errorf("consensus state missing km status") } + +func (sc *Scenario) WaitNextRuntimeBlock(ch <-chan *roothash.AnnotatedBlock) (*roothash.AnnotatedBlock, error) { + sc.Logger.Info("waiting for next runtime block") + + timer := time.NewTimer(10 * time.Second) + + for { + select { + case blk, ok := <-ch: + if !ok { + return nil, fmt.Errorf("runtime block channel closed") + } + return blk, nil + case <-timer.C: + return nil, fmt.Errorf("failed to receive runtime block") + } + } +} + +func (sc *Scenario) WaitRuntimeBlock(ch <-chan *roothash.AnnotatedBlock, round uint64) (*roothash.AnnotatedBlock, error) { + sc.Logger.Info("waiting for runtime block", + "round", round, + ) + + timer := time.NewTimer(10 * time.Second) + + for { + select { + case blk, ok := <-ch: + if !ok { + return nil, fmt.Errorf("runtime block channel closed") + } + sc.Logger.Info("new runtime block", + "round", blk.Block.Header.Round, + ) + if blk.Block.Header.Round > round { + return nil, fmt.Errorf("runtime block round in the past") + } + if blk.Block.Header.Round < round { + // Skip old rounds. + continue + } + return blk, nil + case <-timer.C: + return nil, fmt.Errorf("failed to receive runtime block") + } + } +} diff --git a/go/oasis-test-runner/scenario/e2e/runtime/runtime_governance.go b/go/oasis-test-runner/scenario/e2e/runtime/runtime_governance.go index 49b1959271d..b0875093c66 100644 --- a/go/oasis-test-runner/scenario/e2e/runtime/runtime_governance.go +++ b/go/oasis-test-runner/scenario/e2e/runtime/runtime_governance.go @@ -287,18 +287,9 @@ func (sc *runtimeGovernanceImpl) Run(ctx context.Context, _ *env.Env) error { rtNonce++ // Wait for next round. - for { - select { - case blk := <-blkCh: - sc.Logger.Debug("round transition", "round", blk.Block.Header.Round) - if blk.Block.Header.Round <= meta.Round { - continue - } - case <-time.After(waitTimeout): - return fmt.Errorf("timed out waiting for runtime rounds") - } - - break + _, err = sc.WaitRuntimeBlock(blkCh, meta.Round+1) + if err != nil { + return err } // Verify that the descriptor was updated. @@ -346,18 +337,9 @@ func (sc *runtimeGovernanceImpl) Run(ctx context.Context, _ *env.Env) error { // The bogus update should cause the runtime to panic, which will result // in no more blocks being produced. - for { - select { - case blk := <-blkCh: - sc.Logger.Debug("round transition", "round", blk.Block.Header.Round) - if blk.Block.Header.Round <= meta.Round { - continue - } - return fmt.Errorf("unexpected block, the bogus update should cause the runtime to panic") - case <-time.After(waitTimeout): - } - - break + blk, err := sc.WaitRuntimeBlock(blkCh, meta.Round+1) + if err == nil { + return fmt.Errorf("unexpected round %d, the bogus update should cause the runtime to panic", blk.Block.Header.Round) } sc.Logger.Info("checking that the update didn't succeed") diff --git a/go/oasis-test-runner/scenario/e2e/runtime/runtime_message.go b/go/oasis-test-runner/scenario/e2e/runtime/runtime_message.go index c3a78bb110b..e0a05eed5b4 100644 --- a/go/oasis-test-runner/scenario/e2e/runtime/runtime_message.go +++ b/go/oasis-test-runner/scenario/e2e/runtime/runtime_message.go @@ -3,7 +3,6 @@ package runtime import ( "context" "fmt" - "time" beacon "github.com/oasisprotocol/oasis-core/go/beacon/api" "github.com/oasisprotocol/oasis-core/go/oasis-test-runner/env" @@ -14,12 +13,8 @@ import ( staking "github.com/oasisprotocol/oasis-core/go/staking/api" ) -var ( - // RuntimeMessage is the runtime message scenario. - RuntimeMessage scenario.Scenario = newRuntimeMessage() - - waitTimeout = 10 * time.Second -) +// RuntimeMessage is the runtime message scenario. +var RuntimeMessage scenario.Scenario = newRuntimeMessage() type runtimeMessageImpl struct { Scenario @@ -91,50 +86,47 @@ func (sc *runtimeMessageImpl) Run(ctx context.Context, _ *env.Env) error { ) latestRound := txMetaResponse.Round - sc.Logger.Debug("watching runtime round transitions") - var reachedMsgRound bool - for { - select { - case blk := <-blkCh: - round := blk.Block.Header.Round - sc.Logger.Debug("round transition", "round", round, "header", blk.Block.Header) - switch { - case round < latestRound: - // Skip old rounds. - continue - case round > latestRound+1: - // Only two rounds are expected. - return fmt.Errorf("unexpected runtime round: %d", round) - default: - if ht := blk.Block.Header.HeaderType; ht != block.Normal { - return fmt.Errorf("expected normal round, got: %d", ht) - } - txs, err := c.GetTransactions(ctx, &api.GetTransactionsRequest{ - RuntimeID: KeyValueRuntimeID, - Round: round, - }) - if err != nil { - return err - } - switch round { - case latestRound: - // Round with the submitted consensus_transfer transaction. - if len(txs) != 1 { - return fmt.Errorf("expected 1 transaction at round: %d, got: %d", round, len(txs)) - } - case latestRound + 1: - // Round with no transactions - triggered due to message results. - if len(txs) != 0 { - return fmt.Errorf("expected 0 transactions at round: %d, got: %d", round, len(txs)) - } - reachedMsgRound = true - } - } - case <-time.After(waitTimeout): - if !reachedMsgRound { - return fmt.Errorf("timed out waiting for runtime rounds") - } - return nil - } + // Round with the submitted consensus_transfer transaction. + blk, err := sc.WaitRuntimeBlock(blkCh, latestRound) + if err != nil { + return err + } + if ht := blk.Block.Header.HeaderType; ht != block.Normal { + return fmt.Errorf("expected normal round, got: %d", ht) + } + + txs, err := c.GetTransactions(ctx, &api.GetTransactionsRequest{ + RuntimeID: blk.Block.Header.Namespace, + Round: blk.Block.Header.Round, + }) + if err != nil { + return err + } + + if len(txs) != 1 { + return fmt.Errorf("expected 1 transaction at round: %d, got: %d", blk.Block.Header.Round, len(txs)) } + + // Round with no transactions - triggered due to message results. + blk, err = sc.WaitRuntimeBlock(blkCh, blk.Block.Header.Round+1) + if err != nil { + return err + } + if ht := blk.Block.Header.HeaderType; ht != block.Normal { + return fmt.Errorf("expected normal round, got: %d", ht) + } + + txs, err = c.GetTransactions(ctx, &api.GetTransactionsRequest{ + RuntimeID: blk.Block.Header.Namespace, + Round: blk.Block.Header.Round, + }) + if err != nil { + return err + } + + if len(txs) != 0 { + return fmt.Errorf("expected 0 transactions at round: %d, got: %d", blk.Block.Header.Round, len(txs)) + } + + return nil } diff --git a/go/roothash/tests/tester.go b/go/roothash/tests/tester.go index e8a6a51a203..53cd1775b08 100644 --- a/go/roothash/tests/tester.go +++ b/go/roothash/tests/tester.go @@ -1001,7 +1001,7 @@ func nextRuntimeBlock(ch <-chan *api.AnnotatedBlock, start *block.Block) (*api.A select { case blk, ok := <-ch: if !ok { - return nil, fmt.Errorf("block channel closed") + return nil, fmt.Errorf("runtime block channel closed") } if start != nil && blk.Block.Header.Round < start.Header.Round { continue @@ -1019,7 +1019,7 @@ func nextConsensusBlock(ch <-chan *consensusAPI.Block) (*consensusAPI.Block, err select { case blk, ok := <-ch: if !ok { - return nil, fmt.Errorf("block channel closed") + return nil, fmt.Errorf("consensus block channel closed") } return blk, nil case <-time.After(recvTimeout): diff --git a/go/staking/tests/tester.go b/go/staking/tests/tester.go index d4227af71bd..0aef99a6ca3 100644 --- a/go/staking/tests/tester.go +++ b/go/staking/tests/tester.go @@ -12,14 +12,12 @@ import ( beacon "github.com/oasisprotocol/oasis-core/go/beacon/api" beaconTests "github.com/oasisprotocol/oasis-core/go/beacon/tests" - "github.com/oasisprotocol/oasis-core/go/common" "github.com/oasisprotocol/oasis-core/go/common/entity" "github.com/oasisprotocol/oasis-core/go/common/identity" "github.com/oasisprotocol/oasis-core/go/common/quantity" consensusAPI "github.com/oasisprotocol/oasis-core/go/consensus/api" "github.com/oasisprotocol/oasis-core/go/consensus/api/transaction" cmtTests "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/tests" - "github.com/oasisprotocol/oasis-core/go/roothash/api/block" "github.com/oasisprotocol/oasis-core/go/staking/api" ) @@ -144,7 +142,6 @@ func StakingImplementationTests( consensus consensusAPI.Backend, identity *identity.Identity, entity *entity.Entity, - runtimeID common.Namespace, ) { for _, tc := range []struct { n string @@ -169,7 +166,7 @@ func StakingImplementationTests( // Separate test as it requires some arguments that others don't. t.Run("SlashConsensusEquivocation", func(t *testing.T) { state := newStakingTestsState(t, backend) - testSlashConsensusEquivocation(t, state, backend, consensus, identity, entity, runtimeID) + testSlashConsensusEquivocation(t, state, backend, consensus, identity, entity) }) } @@ -1185,7 +1182,6 @@ func testSlashConsensusEquivocation( consensus consensusAPI.Backend, ident *identity.Identity, ent *entity.Entity, - runtimeID common.Namespace, ) { ctx := context.Background() require := require.New(t) @@ -1231,11 +1227,6 @@ func testSlashConsensusEquivocation( t.Fatalf("failed to receive escrow event") } - // Subscribe to roothash blocks. - blocksCh, blocksSub, err := consensus.RootHash().WatchBlocks(ctx, runtimeID) - require.NoError(err, "WatchBlocks") - defer blocksSub.Close() - // Broadcast evidence. This is CometBFT-specific, if we ever have more than one // consensus backend, we need to change this part. blk, err := consensus.GetBlock(ctx, 1) @@ -1269,12 +1260,4 @@ WaitLoop: } } // XXX: no freezing is configured for this. - - // Wait for roothash block as re-scheduling must have taken place due to slashing. - select { - case blk := <-blocksCh: - require.Equal(block.EpochTransition, blk.Block.Header.HeaderType) - case <-time.After(recvTimeout): - t.Fatalf("failed to receive roothash block") - } } diff --git a/go/worker/compute/executor/committee/node.go b/go/worker/compute/executor/committee/node.go index 3a3b3226a3c..1b24b3cd89e 100644 --- a/go/worker/compute/executor/committee/node.go +++ b/go/worker/compute/executor/committee/node.go @@ -396,15 +396,17 @@ func (n *Node) scheduleBatch(ctx context.Context, round uint64, force bool) { batch := n.commonNode.TxPool.GetSchedulingSuggestion(rtInfo.Features.ScheduleControl.InitialBatchSize) defer n.commonNode.TxPool.FinishScheduling() switch { + case force: + // Batch flush timeout expired, schedule empty batch. case len(batch) > 0: // We have some transactions, schedule batch. - case force && len(n.roundResults.Messages) > 0: + case len(n.roundResults.Messages) > 0: // We have runtime message results (and batch timeout expired), schedule batch. - case force && inMsgMeta.Size > 0: + case inMsgMeta.Size > 0: // We have queued incoming runtime messages (and batch timeout expired), schedule batch. case n.rtState.LastNormalRound == n.rtState.GenesisBlock.Header.Round: // This is the runtime genesis, schedule batch. - case force && n.rtState.LastNormalHeight < n.epoch.GetEpochHeight(): + case n.rtState.LastNormalHeight < n.epoch.GetEpochHeight(): // No block in this epoch processed by runtime yet, schedule batch. default: // No need to schedule a batch. diff --git a/go/worker/compute/executor/tests/tester.go b/go/worker/compute/executor/tests/tester.go index 2de70a2299f..8aadaaaaeb1 100644 --- a/go/worker/compute/executor/tests/tester.go +++ b/go/worker/compute/executor/tests/tester.go @@ -158,7 +158,7 @@ func nextRuntimeBlock(ch <-chan *roothash.AnnotatedBlock, allowEmpty bool) (*api select { case blk, ok := <-ch: if !ok { - return nil, fmt.Errorf("block channel closed") + return nil, fmt.Errorf("runtime block channel closed") } if !allowEmpty && blk.Block.Header.IORoot.IsEmpty() { // Skip blocks without transactions.