Skip to content

Commit

Permalink
Sync Committee: Task-related Refactoring
Browse files Browse the repository at this point in the history
* `TaskStorage`: Replaced two `AddSingle(...)` and `AddTaskEntries(..)` methods with single variadic one;
* Minor refactoring of task-related functions and methods;
* `AddTaskEntries(...)` methods now return `ErrSerializationFailed` is case if task with a given id already exists;
* `BlockStorage.SetBlockAsProved`: checking if block is already marked as proved;
* Extracted `TaskResult`, `TaskStatus` and `TaskType` into separate files;
  • Loading branch information
zadykian committed Jan 26, 2025
1 parent 7004435 commit 09aa5d5
Show file tree
Hide file tree
Showing 13 changed files with 378 additions and 104 deletions.
2 changes: 1 addition & 1 deletion nil/services/synccommittee/core/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (agg *Aggregator) createProofTasks(ctx context.Context, batch *types.BlockB
return fmt.Errorf("error creating proof tasks, mainHash=%s: %w", batch.MainShardBlock.Hash, err)
}

if err := agg.taskStorage.AddTaskEntries(ctx, proofTasks); err != nil {
if err := agg.taskStorage.AddTaskEntries(ctx, proofTasks...); err != nil {
return fmt.Errorf("error adding task entries, mainHash=%s: %w", batch.MainShardBlock.Hash, err)
}

Expand Down
10 changes: 5 additions & 5 deletions nil/services/synccommittee/internal/rpc/task_debug_rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ type getTaskTestCase struct {

func (s *TaskSchedulerDebugRpcTestSuite) Test_Get_Tasks() {
entries := newTaskEntries(s.timer.NowTime())
err := s.storage.AddTaskEntries(s.context, entries)
err := s.storage.AddTaskEntries(s.context, entries...)
s.Require().NoError(err)

testCases := []getTaskTestCase{
Expand Down Expand Up @@ -234,7 +234,7 @@ func (s *TaskSchedulerDebugRpcTestSuite) Test_Get_Task_Tree_Empty_Storage() {

func (s *TaskSchedulerDebugRpcTestSuite) Test_Get_Task_Tree_Not_Found() {
entries := newTaskEntries(s.timer.NowTime())
err := s.storage.AddTaskEntries(s.context, entries)
err := s.storage.AddTaskEntries(s.context, entries...)
s.Require().NoError(err)

someRootId := types.NewTaskId()
Expand All @@ -246,7 +246,7 @@ func (s *TaskSchedulerDebugRpcTestSuite) Test_Get_Task_Tree_Not_Found() {
func (s *TaskSchedulerDebugRpcTestSuite) Test_Get_Task_Tree_No_Dependencies() {
now := s.timer.NowTime()
entry := testaide.NewTaskEntry(now, running, testaide.RandomExecutorId())
err := s.storage.AddSingleTaskEntry(s.context, *entry)
err := s.storage.AddTaskEntries(s.context, entry)
s.Require().NoError(err)

taskTree, err := s.rpcClient.GetTaskTree(s.context, entry.Task.Id)
Expand Down Expand Up @@ -282,7 +282,7 @@ func (s *TaskSchedulerDebugRpcTestSuite) Test_Get_Task_Tree_With_Dependencies()
taskE := testaide.NewTaskEntry(now, types.Running, testaide.RandomExecutorId())
taskC.AddDependency(taskE)

err := s.storage.AddTaskEntries(s.context, []*types.TaskEntry{taskA, taskB, taskC, taskD, taskE})
err := s.storage.AddTaskEntries(s.context, taskA, taskB, taskC, taskD, taskE)
s.Require().NoError(err)

taskATree, err := s.rpcClient.GetTaskTree(s.context, taskA.Task.Id)
Expand Down Expand Up @@ -326,7 +326,7 @@ func (s *TaskSchedulerDebugRpcTestSuite) Test_Get_Task_Tree_With_Terminated_Depe
taskC := testaide.NewTaskEntry(now.Add(-1*time.Minute), types.WaitingForExecutor, types.UnknownExecutorId)
taskA.AddDependency(taskC)

err := s.storage.AddTaskEntries(s.context, []*types.TaskEntry{taskA, taskB, taskC})
err := s.storage.AddTaskEntries(s.context, taskA, taskB, taskC)
s.Require().NoError(err)

executor := testaide.RandomExecutorId()
Expand Down
35 changes: 23 additions & 12 deletions nil/services/synccommittee/internal/storage/block_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,40 +258,51 @@ func (bs *blockStorage) setProposeParentHash(tx db.RwTx, block *jsonrpc.RPCBlock
}

func (bs *blockStorage) SetBlockAsProved(ctx context.Context, id scTypes.BlockId) error {
if err := bs.setBlockAsProvedImpl(ctx, id); err != nil {
wasSet, err := bs.setBlockAsProvedImpl(ctx, id)
if err != nil {
return err
}
bs.metrics.RecordMainBlockProved(ctx)
if wasSet {
bs.metrics.RecordMainBlockProved(ctx)
}
return nil
}

func (bs *blockStorage) setBlockAsProvedImpl(ctx context.Context, id scTypes.BlockId) error {
func (bs *blockStorage) setBlockAsProvedImpl(ctx context.Context, id scTypes.BlockId) (wasSet bool, err error) {
tx, err := bs.db.CreateRwTx(ctx)
if err != nil {
return err
return false, err
}
defer tx.Rollback()

entry, err := bs.getBlockEntry(tx, id)
if err != nil {
return err
return false, err
}

if entry == nil {
return fmt.Errorf("block with id=%s is not found", id.String())
return false, fmt.Errorf("block with id=%s is not found", id)
}
if entry.IsProved {
bs.logger.Debug().Stringer("blockId", id).Msg("block is already marked as proved")
return false, nil
}

entry.IsProved = true
value, err := marshallEntry(entry)
if err != nil {
return err
return false, err
}

if err := tx.Put(blocksTable, id.Bytes(), value); err != nil {
return err
return false, err
}

return tx.Commit()
if err := tx.Commit(); err != nil {
return false, err
}

return true, nil
}

func (bs *blockStorage) TryGetNextProposalData(ctx context.Context) (*scTypes.ProposalData, error) {
Expand Down Expand Up @@ -321,7 +332,7 @@ func (bs *blockStorage) TryGetNextProposalData(ctx context.Context) (*scTypes.Pr

var mainShardEntry *blockEntry
err = iterateOverEntries(tx, func(entry *blockEntry) (bool, error) {
if isValidProposalCandidate(entry, parentHash) {
if isValidProposalCandidate(entry, *parentHash) {
mainShardEntry = entry
return false, nil
}
Expand Down Expand Up @@ -413,10 +424,10 @@ func (bs *blockStorage) setBlockAsProposedImpl(ctx context.Context, id scTypes.B
return tx.Commit()
}

func isValidProposalCandidate(entry *blockEntry, parentHash *common.Hash) bool {
func isValidProposalCandidate(entry *blockEntry, parentHash common.Hash) bool {
return entry.Block.ShardId == types.MainShardId &&
entry.IsProved &&
entry.Block.ParentHash == *parentHash
entry.Block.ParentHash == parentHash
}

// getParentOfNextToPropose retrieves parent's hash of the next block to propose
Expand Down
15 changes: 15 additions & 0 deletions nil/services/synccommittee/internal/storage/block_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,21 @@ func (s *BlockStorageTestSuite) TestSetBlockAsProved() {
s.Require().NoError(err)
}

func (s *BlockStorageTestSuite) Test_SetBlockAsProved_Multiple_Times() {
batch := testaide.NewBlockBatch(3)
mainBlockId := scTypes.IdFromBlock(batch.MainShardBlock)
err := s.bs.SetBlockBatch(s.ctx, batch)
s.Require().NoError(err)

for range 3 {
err := s.bs.SetBlockAsProved(s.ctx, mainBlockId)
s.Require().NoError(err)
}

err = s.bs.SetBlockAsProposed(s.ctx, mainBlockId)
s.Require().NoError(err)
}

func (s *BlockStorageTestSuite) TestSetBlockAsProposed_DoesNotExist() {
randomId := testaide.RandomBlockId()
err := s.bs.SetBlockAsProposed(s.ctx, randomId)
Expand Down
6 changes: 5 additions & 1 deletion nil/services/synccommittee/internal/storage/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,8 @@ package storage

import "errors"

var ErrSerializationFailed = errors.New("failed to serialize/deserialize object")
var (
ErrTaskAlreadyExists = errors.New("task with a given identifier already exists")
ErrSerializationFailed = errors.New("failed to serialize/deserialize object")
errNilTaskEntry = errors.New("task entry cannot be nil")
)
Loading

0 comments on commit 09aa5d5

Please sign in to comment.