Skip to content

Commit

Permalink
Sync Committee: Add batch ID tracking and parent-child batch linking
Browse files Browse the repository at this point in the history
* Introduced parent-child links between block batches (`BlockBatch.ParentId` field). First batch generated by SyncCommittee is expected to have nil parent;

* Introduced functionality to track the latest batch ID in `BlockStorage` and link block batches through parent batch IDs;
  • Loading branch information
zadykian committed Feb 24, 2025
1 parent 1eb5bf9 commit 40abc33
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 19 deletions.
8 changes: 7 additions & 1 deletion nil/services/synccommittee/core/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type AggregatorTaskStorage interface {

type AggregatorBlockStorage interface {
TryGetLatestFetched(ctx context.Context) (*types.MainBlockRef, error)
TryGetLatestBatchId(ctx context.Context) (*types.BatchId, error)
SetBlockBatch(ctx context.Context, batch *types.BlockBatch) error
}

Expand Down Expand Up @@ -226,7 +227,12 @@ func (agg *aggregator) createBlockBatch(ctx context.Context, mainShardBlock *jso
childBlocks = append(childBlocks, childBlock)
}

return types.NewBlockBatch(mainShardBlock, childBlocks)
latestBatchId, err := agg.blockStorage.TryGetLatestBatchId(ctx)
if err != nil {
return nil, fmt.Errorf("error reading latest batch id: %w", err)
}

return types.NewBlockBatch(latestBatchId, mainShardBlock, childBlocks)
}

// handleBlockBatch checks the validity of a block and stores it if valid.
Expand Down
3 changes: 2 additions & 1 deletion nil/services/synccommittee/core/block_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ func (s *BlockBatchTestSuite) TestNewBlockBatch() {

for _, testCase := range testCases {
s.Run(testCase.name, func() {
batch, err := types.NewBlockBatch(testCase.mainShardBlock, testCase.childBlocks)
parentBatchId := types.NewBatchId()
batch, err := types.NewBlockBatch(&parentBatchId, testCase.mainShardBlock, testCase.childBlocks)
testCase.errPredicate(err)

if err != nil {
Expand Down
84 changes: 73 additions & 11 deletions nil/services/synccommittee/internal/storage/block_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ const (
// Key: mainShardKey, Value: scTypes.MainBlockRef.
latestFetchedTable db.TableName = "latest_fetched"

// latestBatchIdTable stores identifier of the latest saved batch.
// Key: mainShardKey, Value: scTypes.BatchId.
latestBatchIdTable db.TableName = "latest_batch_id"

// stateRootTable stores the latest ProvedStateRoot (single value).
// Key: mainShardKey, Value: common.Hash.
stateRootTable db.TableName = "state_root"
Expand All @@ -44,10 +48,15 @@ const (
var mainShardKey = makeShardKey(types.MainShardId)

type blockEntry struct {
Block jsonrpc.RPCBlock `json:"block"`
IsProved bool `json:"isProved"`
BatchId scTypes.BatchId `json:"batchId"`
FetchedAt time.Time `json:"fetchedAt"`
Block jsonrpc.RPCBlock `json:"block"`
IsProved bool `json:"isProved"`
BatchId scTypes.BatchId `json:"batchId"`
ParentBatchId *scTypes.BatchId `json:"parentBatchId"`
FetchedAt time.Time `json:"fetchedAt"`
}

func (e *blockEntry) ParentId() scTypes.BlockId {
return scTypes.ParentBlockId(&e.Block)
}

type BlockStorageMetrics interface {
Expand Down Expand Up @@ -119,6 +128,51 @@ func (bs *BlockStorage) SetProvedStateRoot(ctx context.Context, stateRoot common
return bs.commit(tx)
}

// TryGetLatestBatchId retrieves the ID of the latest created batch
// or returns nil if:
// a) No batches have been created yet, or
// b) A full storage reset (starting from the first batch) has been triggered.
func (bs *BlockStorage) TryGetLatestBatchId(ctx context.Context) (*scTypes.BatchId, error) {
tx, err := bs.database.CreateRoTx(ctx)
if err != nil {
return nil, err
}
defer tx.Rollback()
return bs.getLatestBatchIdTx(tx)
}

func (bs *BlockStorage) getLatestBatchIdTx(tx db.RoTx) (*scTypes.BatchId, error) {
bytes, err := tx.Get(latestBatchIdTable, mainShardKey)
if bytes == nil || errors.Is(err, db.ErrKeyNotFound) {
return nil, nil
}
if err != nil {
return nil, fmt.Errorf("failed to get latest batch id: %w", err)
}
var batchId scTypes.BatchId
if err := batchId.UnmarshalText(bytes); err != nil {
return nil, err
}
return &batchId, nil
}

func (bs *BlockStorage) putLatestBatchIdTx(tx db.RwTx, batchId *scTypes.BatchId) error {
var bytes []byte

if batchId != nil {
var err error
bytes, err = batchId.MarshalText()
if err != nil {
return err
}
}

if err := tx.Put(latestBatchIdTable, mainShardKey, bytes); err != nil {
return fmt.Errorf("failed to put latest batch id: %w", err)
}
return nil
}

func (bs *BlockStorage) TryGetLatestFetched(ctx context.Context) (*scTypes.MainBlockRef, error) {
tx, err := bs.database.CreateRoTx(ctx)
if err != nil {
Expand Down Expand Up @@ -165,12 +219,12 @@ func (bs *BlockStorage) setBlockBatchImpl(ctx context.Context, batch *scTypes.Bl
}
defer tx.Rollback()

if err := bs.putBlockTx(tx, batch.Id, batch.MainShardBlock); err != nil {
if err := bs.putBlockTx(tx, batch, batch.MainShardBlock); err != nil {
return err
}

for _, childBlock := range batch.ChildBlocks {
if err := bs.putBlockTx(tx, batch.Id, childBlock); err != nil {
if err := bs.putBlockTx(tx, batch, childBlock); err != nil {
return err
}
}
Expand All @@ -183,12 +237,16 @@ func (bs *BlockStorage) setBlockBatchImpl(ctx context.Context, batch *scTypes.Bl
return err
}

if err := bs.putLatestBatchIdTx(tx, &batch.Id); err != nil {
return err
}

return bs.commit(tx)
}

func (bs *BlockStorage) putBlockTx(tx db.RwTx, batchId scTypes.BatchId, block *jsonrpc.RPCBlock) error {
func (bs *BlockStorage) putBlockTx(tx db.RwTx, batch *scTypes.BlockBatch, block *jsonrpc.RPCBlock) error {
currentTime := bs.timer.NowTime()
entry := blockEntry{Block: *block, BatchId: batchId, FetchedAt: currentTime}
entry := blockEntry{Block: *block, BatchId: batch.Id, ParentBatchId: batch.ParentId, FetchedAt: currentTime}
value, err := marshallEntry(&entry)
if err != nil {
return err
Expand Down Expand Up @@ -525,7 +583,7 @@ func (bs *BlockStorage) resetProgressImpl(ctx context.Context, firstMainHashToPu
}

if scTypes.IdFromBlock(&entry.Block) == startingId {
err := bs.resetLatestFetchedToParentOf(tx, entry)
err := bs.resetToParent(tx, entry)
if err != nil {
return err
}
Expand All @@ -539,14 +597,18 @@ func (bs *BlockStorage) resetProgressImpl(ctx context.Context, firstMainHashToPu
return bs.commit(tx)
}

func (bs *BlockStorage) resetLatestFetchedToParentOf(tx db.RwTx, entry *blockEntry) error {
func (bs *BlockStorage) resetToParent(tx db.RwTx, entry *blockEntry) error {
refToParent, err := scTypes.GetMainParentRef(&entry.Block)
if err != nil {
return fmt.Errorf("failed to get main block parent ref: %w", err)
}
if err := bs.putLatestFetchedBlockTx(tx, types.MainShardId, refToParent); err != nil {
return fmt.Errorf("failed to reset latest fetched block: %w", err)
}
if err := bs.putLatestBatchIdTx(tx, entry.ParentBatchId); err != nil {
return fmt.Errorf("failed to reset latest batch id: %w", err)
}

return nil
}

Expand Down Expand Up @@ -644,7 +706,7 @@ func (bs *BlockStorage) deleteMainBlockWithChildren(tx db.RwTx, mainShardEntry *
}

func (bs *BlockStorage) deleteBlock(tx db.RwTx, entry *blockEntry) error {
parentId := scTypes.ParentBlockId(&entry.Block)
parentId := entry.ParentId()
err := tx.Delete(blockParentIdxTable, parentId.Bytes())
if err != nil && !errors.Is(err, db.ErrKeyNotFound) {
return fmt.Errorf("failed to delete parent idx entry, parentId=%s: %w", parentId, err)
Expand Down
28 changes: 28 additions & 0 deletions nil/services/synccommittee/internal/storage/block_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,24 @@ func (s *BlockStorageTestSuite) TestSetBlockBatchSequentially_GetConcurrently()
waitGroup.Wait()
}

func (s *BlockStorageTestSuite) Test_LatestBatchId() {
const blocksCount = 5
batches := testaide.NewBatchesSequence(blocksCount)

latestBatchId, err := s.bs.TryGetLatestBatchId(s.ctx)
s.Require().NoError(err)
s.Require().Nil(latestBatchId)

for _, batch := range batches {
err := s.bs.SetBlockBatch(s.ctx, batch)
s.Require().NoError(err)

latestBatchId, err := s.bs.TryGetLatestBatchId(s.ctx)
s.Require().NoError(err)
s.Equal(&batch.Id, latestBatchId)
}
}

func (s *BlockStorageTestSuite) TestGetLastFetchedBlock() {
// initially latestFetched should be empty
latestFetched, err := s.bs.TryGetLatestFetched(s.ctx)
Expand Down Expand Up @@ -383,6 +401,8 @@ func (s *BlockStorageTestSuite) Test_ResetProgress_Block_Does_Not_Exists() {

latestFetchedBeforeReset, err := s.bs.TryGetLatestFetched(s.ctx)
s.Require().NoError(err)
latestBatchIdBeforeReset, err := s.bs.TryGetLatestBatchId(s.ctx)
s.Require().NoError(err)

nonExistentBlockHash := testaide.RandomHash()
err = s.bs.ResetProgress(s.ctx, nonExistentBlockHash)
Expand All @@ -399,6 +419,10 @@ func (s *BlockStorageTestSuite) Test_ResetProgress_Block_Does_Not_Exists() {
latestFetchedAfterReset, err := s.bs.TryGetLatestFetched(s.ctx)
s.Require().NoError(err)
s.Require().Equal(latestFetchedBeforeReset, latestFetchedAfterReset)

latestBatchIdAfterReset, err := s.bs.TryGetLatestBatchId(s.ctx)
s.Require().NoError(err)
s.Require().Equal(latestBatchIdBeforeReset, latestBatchIdAfterReset)
}

func (s *BlockStorageTestSuite) Test_ResetProgress() {
Expand Down Expand Up @@ -460,4 +484,8 @@ func (s *BlockStorageTestSuite) testResetProgress(passedHashIdx int) {
actualLatestFetched, err := s.bs.TryGetLatestFetched(s.ctx)
s.Require().NoError(err)
s.Require().Equal(expectedLatestFetched, actualLatestFetched)

actualLatestBatchId, err := s.bs.TryGetLatestBatchId(s.ctx)
s.Require().NoError(err)
s.Require().Equal(batches[passedHashIdx].ParentId, actualLatestBatchId)
}
9 changes: 5 additions & 4 deletions nil/services/synccommittee/internal/testaide/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,10 @@ func NewBatchesSequence(batchesCount int) []*scTypes.BlockBatch {
nextBatch.MainShardBlock.Number = 0
nextBatch.MainShardBlock.ParentHash = common.EmptyHash
} else {
prevMainBlock := batches[len(batches)-1].MainShardBlock
nextBatch.MainShardBlock.ParentHash = prevMainBlock.Hash
nextBatch.MainShardBlock.Number = prevMainBlock.Number + 1
prevBatch := batches[len(batches)-1]
nextBatch.MainShardBlock.ParentHash = prevBatch.MainShardBlock.Hash
nextBatch.MainShardBlock.Number = prevBatch.MainShardBlock.Number + 1
nextBatch.ParentId = &prevBatch.Id
}
batches = append(batches, nextBatch)
}
Expand All @@ -93,7 +94,7 @@ func NewBlockBatch(childBlocksCount int) *scTypes.BlockBatch {
mainBlock.ChildBlocks = append(mainBlock.ChildBlocks, block.Hash)
}

batch, err := scTypes.NewBlockBatch(mainBlock, childBlocks)
batch, err := scTypes.NewBlockBatch(nil, mainBlock, childBlocks)
if err != nil {
panic(err)
}
Expand Down
10 changes: 8 additions & 2 deletions nil/services/synccommittee/internal/types/block_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func NewBatchId() BatchId { return BatchId(uuid.New()) }
func (id BatchId) String() string { return uuid.UUID(id).String() }
func (id BatchId) Bytes() []byte { return []byte(id.String()) }

// MarshalText implements the encoding.TextMarshller interface for BatchId.
// MarshalText implements the encoding.TextMarshaler interface for BatchId.
func (id BatchId) MarshalText() ([]byte, error) {
uuidValue := uuid.UUID(id)
return []byte(uuidValue.String()), nil
Expand All @@ -45,17 +45,23 @@ func (id *BatchId) Set(val string) error {

type BlockBatch struct {
Id BatchId `json:"id"`
ParentId *BatchId `json:"parentId"`
MainShardBlock *jsonrpc.RPCBlock `json:"mainShardBlock"`
ChildBlocks []*jsonrpc.RPCBlock `json:"childBlocks"`
}

func NewBlockBatch(mainShardBlock *jsonrpc.RPCBlock, childBlocks []*jsonrpc.RPCBlock) (*BlockBatch, error) {
func NewBlockBatch(
parentId *BatchId,
mainShardBlock *jsonrpc.RPCBlock,
childBlocks []*jsonrpc.RPCBlock,
) (*BlockBatch, error) {
if err := validateBatch(mainShardBlock, childBlocks); err != nil {
return nil, err
}

return &BlockBatch{
Id: NewBatchId(),
ParentId: parentId,
MainShardBlock: mainShardBlock,
ChildBlocks: childBlocks,
}, nil
Expand Down

0 comments on commit 40abc33

Please sign in to comment.