Skip to content

Commit

Permalink
go/worker/compute: Emit empty blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
kostko authored and peternose committed Oct 5, 2023
1 parent 70c2b1c commit 2a53cf8
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 101 deletions.
2 changes: 1 addition & 1 deletion go/oasis-node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ func testSchedulerClient(t *testing.T, node *testNode) {
}

func testStaking(t *testing.T, node *testNode) {
stakingTests.StakingImplementationTests(t, node.Consensus.Staking(), node.Consensus, node.Identity, node.entity, testRuntimeID)
stakingTests.StakingImplementationTests(t, node.Consensus.Staking(), node.Consensus, node.Identity, node.entity)
}

func testStakingClient(t *testing.T, node *testNode) {
Expand Down
49 changes: 49 additions & 0 deletions go/oasis-test-runner/scenario/e2e/runtime/helpers_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/rust"
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/scenario/e2e"
registry "github.com/oasisprotocol/oasis-core/go/registry/api"
roothash "github.com/oasisprotocol/oasis-core/go/roothash/api"
commonWorker "github.com/oasisprotocol/oasis-core/go/worker/common/api"
)

Expand Down Expand Up @@ -549,3 +550,51 @@ func (sc *Scenario) ensureReplicationWorked(ctx context.Context, km *oasis.Keyma
}
return fmt.Errorf("consensus state missing km status")
}

func (sc *Scenario) WaitNextRuntimeBlock(ch <-chan *roothash.AnnotatedBlock) (*roothash.AnnotatedBlock, error) {
sc.Logger.Info("waiting for next runtime block")

timer := time.NewTimer(10 * time.Second)

for {
select {
case blk, ok := <-ch:
if !ok {
return nil, fmt.Errorf("runtime block channel closed")
}
return blk, nil
case <-timer.C:
return nil, fmt.Errorf("failed to receive runtime block")
}
}
}

func (sc *Scenario) WaitRuntimeBlock(ch <-chan *roothash.AnnotatedBlock, round uint64) (*roothash.AnnotatedBlock, error) {
sc.Logger.Info("waiting for runtime block",
"round", round,
)

timer := time.NewTimer(10 * time.Second)

for {
select {
case blk, ok := <-ch:
if !ok {
return nil, fmt.Errorf("runtime block channel closed")
}
sc.Logger.Info("new runtime block",
"round", blk.Block.Header.Round,
)
if blk.Block.Header.Round > round {
return nil, fmt.Errorf("runtime block round in the past")
}
if blk.Block.Header.Round < round {
// Skip old rounds.
continue
}
return blk, nil
case <-timer.C:
return nil, fmt.Errorf("failed to receive runtime block")
}
}
}
30 changes: 6 additions & 24 deletions go/oasis-test-runner/scenario/e2e/runtime/runtime_governance.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,18 +287,9 @@ func (sc *runtimeGovernanceImpl) Run(ctx context.Context, _ *env.Env) error {
rtNonce++

// Wait for next round.
for {
select {
case blk := <-blkCh:
sc.Logger.Debug("round transition", "round", blk.Block.Header.Round)
if blk.Block.Header.Round <= meta.Round {
continue
}
case <-time.After(waitTimeout):
return fmt.Errorf("timed out waiting for runtime rounds")
}

break
_, err = sc.WaitRuntimeBlock(blkCh, meta.Round+1)
if err != nil {
return err
}

// Verify that the descriptor was updated.
Expand Down Expand Up @@ -346,18 +337,9 @@ func (sc *runtimeGovernanceImpl) Run(ctx context.Context, _ *env.Env) error {

// The bogus update should cause the runtime to panic, which will result
// in no more blocks being produced.
for {
select {
case blk := <-blkCh:
sc.Logger.Debug("round transition", "round", blk.Block.Header.Round)
if blk.Block.Header.Round <= meta.Round {
continue
}
return fmt.Errorf("unexpected block, the bogus update should cause the runtime to panic")
case <-time.After(waitTimeout):
}

break
blk, err := sc.WaitRuntimeBlock(blkCh, meta.Round+1)
if err == nil {
return fmt.Errorf("unexpected round %d, the bogus update should cause the runtime to panic", blk.Block.Header.Round)
}

sc.Logger.Info("checking that the update didn't succeed")
Expand Down
96 changes: 44 additions & 52 deletions go/oasis-test-runner/scenario/e2e/runtime/runtime_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package runtime
import (
"context"
"fmt"
"time"

beacon "github.com/oasisprotocol/oasis-core/go/beacon/api"
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/env"
Expand All @@ -14,12 +13,8 @@ import (
staking "github.com/oasisprotocol/oasis-core/go/staking/api"
)

var (
// RuntimeMessage is the runtime message scenario.
RuntimeMessage scenario.Scenario = newRuntimeMessage()

waitTimeout = 10 * time.Second
)
// RuntimeMessage is the runtime message scenario.
var RuntimeMessage scenario.Scenario = newRuntimeMessage()

type runtimeMessageImpl struct {
Scenario
Expand Down Expand Up @@ -91,50 +86,47 @@ func (sc *runtimeMessageImpl) Run(ctx context.Context, _ *env.Env) error {
)
latestRound := txMetaResponse.Round

sc.Logger.Debug("watching runtime round transitions")
var reachedMsgRound bool
for {
select {
case blk := <-blkCh:
round := blk.Block.Header.Round
sc.Logger.Debug("round transition", "round", round, "header", blk.Block.Header)
switch {
case round < latestRound:
// Skip old rounds.
continue
case round > latestRound+1:
// Only two rounds are expected.
return fmt.Errorf("unexpected runtime round: %d", round)
default:
if ht := blk.Block.Header.HeaderType; ht != block.Normal {
return fmt.Errorf("expected normal round, got: %d", ht)
}
txs, err := c.GetTransactions(ctx, &api.GetTransactionsRequest{
RuntimeID: KeyValueRuntimeID,
Round: round,
})
if err != nil {
return err
}
switch round {
case latestRound:
// Round with the submitted consensus_transfer transaction.
if len(txs) != 1 {
return fmt.Errorf("expected 1 transaction at round: %d, got: %d", round, len(txs))
}
case latestRound + 1:
// Round with no transactions - triggered due to message results.
if len(txs) != 0 {
return fmt.Errorf("expected 0 transactions at round: %d, got: %d", round, len(txs))
}
reachedMsgRound = true
}
}
case <-time.After(waitTimeout):
if !reachedMsgRound {
return fmt.Errorf("timed out waiting for runtime rounds")
}
return nil
}
// Round with the submitted consensus_transfer transaction.
blk, err := sc.WaitRuntimeBlock(blkCh, latestRound)
if err != nil {
return err
}
if ht := blk.Block.Header.HeaderType; ht != block.Normal {
return fmt.Errorf("expected normal round, got: %d", ht)
}

txs, err := c.GetTransactions(ctx, &api.GetTransactionsRequest{
RuntimeID: blk.Block.Header.Namespace,
Round: blk.Block.Header.Round,
})
if err != nil {
return err
}

if len(txs) != 1 {
return fmt.Errorf("expected 1 transaction at round: %d, got: %d", blk.Block.Header.Round, len(txs))
}

// Round with no transactions - triggered due to message results.
blk, err = sc.WaitRuntimeBlock(blkCh, blk.Block.Header.Round+1)
if err != nil {
return err
}
if ht := blk.Block.Header.HeaderType; ht != block.Normal {
return fmt.Errorf("expected normal round, got: %d", ht)
}

txs, err = c.GetTransactions(ctx, &api.GetTransactionsRequest{
RuntimeID: blk.Block.Header.Namespace,
Round: blk.Block.Header.Round,
})
if err != nil {
return err
}

if len(txs) != 0 {
return fmt.Errorf("expected 0 transactions at round: %d, got: %d", blk.Block.Header.Round, len(txs))
}

return nil
}
4 changes: 2 additions & 2 deletions go/roothash/tests/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -1001,7 +1001,7 @@ func nextRuntimeBlock(ch <-chan *api.AnnotatedBlock, start *block.Block) (*api.A
select {
case blk, ok := <-ch:
if !ok {
return nil, fmt.Errorf("block channel closed")
return nil, fmt.Errorf("runtime block channel closed")
}
if start != nil && blk.Block.Header.Round < start.Header.Round {
continue
Expand All @@ -1019,7 +1019,7 @@ func nextConsensusBlock(ch <-chan *consensusAPI.Block) (*consensusAPI.Block, err
select {
case blk, ok := <-ch:
if !ok {
return nil, fmt.Errorf("block channel closed")
return nil, fmt.Errorf("consensus block channel closed")
}
return blk, nil
case <-time.After(recvTimeout):
Expand Down
19 changes: 1 addition & 18 deletions go/staking/tests/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,12 @@ import (

beacon "github.com/oasisprotocol/oasis-core/go/beacon/api"
beaconTests "github.com/oasisprotocol/oasis-core/go/beacon/tests"
"github.com/oasisprotocol/oasis-core/go/common"
"github.com/oasisprotocol/oasis-core/go/common/entity"
"github.com/oasisprotocol/oasis-core/go/common/identity"
"github.com/oasisprotocol/oasis-core/go/common/quantity"
consensusAPI "github.com/oasisprotocol/oasis-core/go/consensus/api"
"github.com/oasisprotocol/oasis-core/go/consensus/api/transaction"
cmtTests "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/tests"
"github.com/oasisprotocol/oasis-core/go/roothash/api/block"
"github.com/oasisprotocol/oasis-core/go/staking/api"
)

Expand Down Expand Up @@ -144,7 +142,6 @@ func StakingImplementationTests(
consensus consensusAPI.Backend,
identity *identity.Identity,
entity *entity.Entity,
runtimeID common.Namespace,
) {
for _, tc := range []struct {
n string
Expand All @@ -169,7 +166,7 @@ func StakingImplementationTests(
// Separate test as it requires some arguments that others don't.
t.Run("SlashConsensusEquivocation", func(t *testing.T) {
state := newStakingTestsState(t, backend)
testSlashConsensusEquivocation(t, state, backend, consensus, identity, entity, runtimeID)
testSlashConsensusEquivocation(t, state, backend, consensus, identity, entity)
})
}

Expand Down Expand Up @@ -1185,7 +1182,6 @@ func testSlashConsensusEquivocation(
consensus consensusAPI.Backend,
ident *identity.Identity,
ent *entity.Entity,
runtimeID common.Namespace,
) {
ctx := context.Background()
require := require.New(t)
Expand Down Expand Up @@ -1231,11 +1227,6 @@ func testSlashConsensusEquivocation(
t.Fatalf("failed to receive escrow event")
}

// Subscribe to roothash blocks.
blocksCh, blocksSub, err := consensus.RootHash().WatchBlocks(ctx, runtimeID)
require.NoError(err, "WatchBlocks")
defer blocksSub.Close()

// Broadcast evidence. This is CometBFT-specific, if we ever have more than one
// consensus backend, we need to change this part.
blk, err := consensus.GetBlock(ctx, 1)
Expand Down Expand Up @@ -1269,12 +1260,4 @@ WaitLoop:
}
}
// XXX: no freezing is configured for this.

// Wait for roothash block as re-scheduling must have taken place due to slashing.
select {
case blk := <-blocksCh:
require.Equal(block.EpochTransition, blk.Block.Header.HeaderType)
case <-time.After(recvTimeout):
t.Fatalf("failed to receive roothash block")
}
}
8 changes: 5 additions & 3 deletions go/worker/compute/executor/committee/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,15 +396,17 @@ func (n *Node) scheduleBatch(ctx context.Context, round uint64, force bool) {
batch := n.commonNode.TxPool.GetSchedulingSuggestion(rtInfo.Features.ScheduleControl.InitialBatchSize)
defer n.commonNode.TxPool.FinishScheduling()
switch {
case force:
// Batch flush timeout expired, schedule empty batch.
case len(batch) > 0:
// We have some transactions, schedule batch.
case force && len(n.roundResults.Messages) > 0:
case len(n.roundResults.Messages) > 0:
// We have runtime message results (and batch timeout expired), schedule batch.
case force && inMsgMeta.Size > 0:
case inMsgMeta.Size > 0:
// We have queued incoming runtime messages (and batch timeout expired), schedule batch.
case n.rtState.LastNormalRound == n.rtState.GenesisBlock.Header.Round:
// This is the runtime genesis, schedule batch.
case force && n.rtState.LastNormalHeight < n.epoch.GetEpochHeight():
case n.rtState.LastNormalHeight < n.epoch.GetEpochHeight():
// No block in this epoch processed by runtime yet, schedule batch.
default:
// No need to schedule a batch.
Expand Down
2 changes: 1 addition & 1 deletion go/worker/compute/executor/tests/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func nextRuntimeBlock(ch <-chan *roothash.AnnotatedBlock, allowEmpty bool) (*api
select {
case blk, ok := <-ch:
if !ok {
return nil, fmt.Errorf("block channel closed")
return nil, fmt.Errorf("runtime block channel closed")
}
if !allowEmpty && blk.Block.Header.IORoot.IsEmpty() {
// Skip blocks without transactions.
Expand Down

0 comments on commit 2a53cf8

Please sign in to comment.