diff --git a/nil/services/synccommittee/core/aggregator.go b/nil/services/synccommittee/core/aggregator.go index ed0cbb237..d8d8ed878 100644 --- a/nil/services/synccommittee/core/aggregator.go +++ b/nil/services/synccommittee/core/aggregator.go @@ -33,6 +33,7 @@ type AggregatorTaskStorage interface { type AggregatorBlockStorage interface { TryGetLatestFetched(ctx context.Context) (*types.MainBlockRef, error) + TryGetLatestBatchId(ctx context.Context) (*types.BatchId, error) SetBlockBatch(ctx context.Context, batch *types.BlockBatch) error } @@ -226,7 +227,12 @@ func (agg *aggregator) createBlockBatch(ctx context.Context, mainShardBlock *jso childBlocks = append(childBlocks, childBlock) } - return types.NewBlockBatch(mainShardBlock, childBlocks) + latestBatchId, err := agg.blockStorage.TryGetLatestBatchId(ctx) + if err != nil { + return nil, fmt.Errorf("error reading latest batch id: %w", err) + } + + return types.NewBlockBatch(latestBatchId, mainShardBlock, childBlocks) } // handleBlockBatch checks the validity of a block and stores it if valid. diff --git a/nil/services/synccommittee/core/block_batch_test.go b/nil/services/synccommittee/core/block_batch_test.go index b9476a97d..554705115 100644 --- a/nil/services/synccommittee/core/block_batch_test.go +++ b/nil/services/synccommittee/core/block_batch_test.go @@ -100,7 +100,8 @@ func (s *BlockBatchTestSuite) TestNewBlockBatch() { for _, testCase := range testCases { s.Run(testCase.name, func() { - batch, err := types.NewBlockBatch(testCase.mainShardBlock, testCase.childBlocks) + parentBatchId := types.NewBatchId() + batch, err := types.NewBlockBatch(&parentBatchId, testCase.mainShardBlock, testCase.childBlocks) testCase.errPredicate(err) if err != nil { diff --git a/nil/services/synccommittee/internal/storage/block_storage.go b/nil/services/synccommittee/internal/storage/block_storage.go index 59428e6c8..536d4a785 100644 --- a/nil/services/synccommittee/internal/storage/block_storage.go +++ b/nil/services/synccommittee/internal/storage/block_storage.go @@ -32,6 +32,10 @@ const ( // Key: mainShardKey, Value: scTypes.MainBlockRef. latestFetchedTable db.TableName = "latest_fetched" + // latestBatchIdTable stores identifier of the latest saved batch. + // Key: mainShardKey, Value: scTypes.BatchId. + latestBatchIdTable db.TableName = "latest_batch_id" + // stateRootTable stores the latest ProvedStateRoot (single value). // Key: mainShardKey, Value: common.Hash. stateRootTable db.TableName = "state_root" @@ -44,10 +48,15 @@ const ( var mainShardKey = makeShardKey(types.MainShardId) type blockEntry struct { - Block jsonrpc.RPCBlock `json:"block"` - IsProved bool `json:"isProved"` - BatchId scTypes.BatchId `json:"batchId"` - FetchedAt time.Time `json:"fetchedAt"` + Block jsonrpc.RPCBlock `json:"block"` + IsProved bool `json:"isProved"` + BatchId scTypes.BatchId `json:"batchId"` + ParentBatchId *scTypes.BatchId `json:"parentBatchId"` + FetchedAt time.Time `json:"fetchedAt"` +} + +func (e *blockEntry) ParentId() scTypes.BlockId { + return scTypes.ParentBlockId(&e.Block) } type BlockStorageMetrics interface { @@ -119,6 +128,51 @@ func (bs *BlockStorage) SetProvedStateRoot(ctx context.Context, stateRoot common return bs.commit(tx) } +// TryGetLatestBatchId retrieves the ID of the latest created batch +// or returns nil if: +// a) No batches have been created yet, or +// b) A full storage reset (starting from the first batch) has been triggered. +func (bs *BlockStorage) TryGetLatestBatchId(ctx context.Context) (*scTypes.BatchId, error) { + tx, err := bs.database.CreateRoTx(ctx) + if err != nil { + return nil, err + } + defer tx.Rollback() + return bs.getLatestBatchIdTx(tx) +} + +func (bs *BlockStorage) getLatestBatchIdTx(tx db.RoTx) (*scTypes.BatchId, error) { + bytes, err := tx.Get(latestBatchIdTable, mainShardKey) + if bytes == nil || errors.Is(err, db.ErrKeyNotFound) { + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("failed to get latest batch id: %w", err) + } + var batchId scTypes.BatchId + if err := batchId.UnmarshalText(bytes); err != nil { + return nil, err + } + return &batchId, nil +} + +func (bs *BlockStorage) putLatestBatchIdTx(tx db.RwTx, batchId *scTypes.BatchId) error { + var bytes []byte + + if batchId != nil { + var err error + bytes, err = batchId.MarshalText() + if err != nil { + return err + } + } + + if err := tx.Put(latestBatchIdTable, mainShardKey, bytes); err != nil { + return fmt.Errorf("failed to put latest batch id: %w", err) + } + return nil +} + func (bs *BlockStorage) TryGetLatestFetched(ctx context.Context) (*scTypes.MainBlockRef, error) { tx, err := bs.database.CreateRoTx(ctx) if err != nil { @@ -165,12 +219,12 @@ func (bs *BlockStorage) setBlockBatchImpl(ctx context.Context, batch *scTypes.Bl } defer tx.Rollback() - if err := bs.putBlockTx(tx, batch.Id, batch.MainShardBlock); err != nil { + if err := bs.putBlockTx(tx, batch, batch.MainShardBlock); err != nil { return err } for _, childBlock := range batch.ChildBlocks { - if err := bs.putBlockTx(tx, batch.Id, childBlock); err != nil { + if err := bs.putBlockTx(tx, batch, childBlock); err != nil { return err } } @@ -183,12 +237,16 @@ func (bs *BlockStorage) setBlockBatchImpl(ctx context.Context, batch *scTypes.Bl return err } + if err := bs.putLatestBatchIdTx(tx, &batch.Id); err != nil { + return err + } + return bs.commit(tx) } -func (bs *BlockStorage) putBlockTx(tx db.RwTx, batchId scTypes.BatchId, block *jsonrpc.RPCBlock) error { +func (bs *BlockStorage) putBlockTx(tx db.RwTx, batch *scTypes.BlockBatch, block *jsonrpc.RPCBlock) error { currentTime := bs.timer.NowTime() - entry := blockEntry{Block: *block, BatchId: batchId, FetchedAt: currentTime} + entry := blockEntry{Block: *block, BatchId: batch.Id, ParentBatchId: batch.ParentId, FetchedAt: currentTime} value, err := marshallEntry(&entry) if err != nil { return err @@ -525,7 +583,7 @@ func (bs *BlockStorage) resetProgressImpl(ctx context.Context, firstMainHashToPu } if scTypes.IdFromBlock(&entry.Block) == startingId { - err := bs.resetLatestFetchedToParentOf(tx, entry) + err := bs.resetToParent(tx, entry) if err != nil { return err } @@ -539,7 +597,7 @@ func (bs *BlockStorage) resetProgressImpl(ctx context.Context, firstMainHashToPu return bs.commit(tx) } -func (bs *BlockStorage) resetLatestFetchedToParentOf(tx db.RwTx, entry *blockEntry) error { +func (bs *BlockStorage) resetToParent(tx db.RwTx, entry *blockEntry) error { refToParent, err := scTypes.GetMainParentRef(&entry.Block) if err != nil { return fmt.Errorf("failed to get main block parent ref: %w", err) @@ -547,6 +605,10 @@ func (bs *BlockStorage) resetLatestFetchedToParentOf(tx db.RwTx, entry *blockEnt if err := bs.putLatestFetchedBlockTx(tx, types.MainShardId, refToParent); err != nil { return fmt.Errorf("failed to reset latest fetched block: %w", err) } + if err := bs.putLatestBatchIdTx(tx, entry.ParentBatchId); err != nil { + return fmt.Errorf("failed to reset latest batch id: %w", err) + } + return nil } @@ -644,7 +706,7 @@ func (bs *BlockStorage) deleteMainBlockWithChildren(tx db.RwTx, mainShardEntry * } func (bs *BlockStorage) deleteBlock(tx db.RwTx, entry *blockEntry) error { - parentId := scTypes.ParentBlockId(&entry.Block) + parentId := entry.ParentId() err := tx.Delete(blockParentIdxTable, parentId.Bytes()) if err != nil && !errors.Is(err, db.ErrKeyNotFound) { return fmt.Errorf("failed to delete parent idx entry, parentId=%s: %w", parentId, err) diff --git a/nil/services/synccommittee/internal/storage/block_storage_test.go b/nil/services/synccommittee/internal/storage/block_storage_test.go index 97893bbe3..6e350fc89 100644 --- a/nil/services/synccommittee/internal/storage/block_storage_test.go +++ b/nil/services/synccommittee/internal/storage/block_storage_test.go @@ -88,6 +88,24 @@ func (s *BlockStorageTestSuite) TestSetBlockBatchSequentially_GetConcurrently() waitGroup.Wait() } +func (s *BlockStorageTestSuite) Test_LatestBatchId() { + const blocksCount = 5 + batches := testaide.NewBatchesSequence(blocksCount) + + latestBatchId, err := s.bs.TryGetLatestBatchId(s.ctx) + s.Require().NoError(err) + s.Require().Nil(latestBatchId) + + for _, batch := range batches { + err := s.bs.SetBlockBatch(s.ctx, batch) + s.Require().NoError(err) + + latestBatchId, err := s.bs.TryGetLatestBatchId(s.ctx) + s.Require().NoError(err) + s.Equal(&batch.Id, latestBatchId) + } +} + func (s *BlockStorageTestSuite) TestGetLastFetchedBlock() { // initially latestFetched should be empty latestFetched, err := s.bs.TryGetLatestFetched(s.ctx) @@ -383,6 +401,8 @@ func (s *BlockStorageTestSuite) Test_ResetProgress_Block_Does_Not_Exists() { latestFetchedBeforeReset, err := s.bs.TryGetLatestFetched(s.ctx) s.Require().NoError(err) + latestBatchIdBeforeReset, err := s.bs.TryGetLatestBatchId(s.ctx) + s.Require().NoError(err) nonExistentBlockHash := testaide.RandomHash() err = s.bs.ResetProgress(s.ctx, nonExistentBlockHash) @@ -399,6 +419,10 @@ func (s *BlockStorageTestSuite) Test_ResetProgress_Block_Does_Not_Exists() { latestFetchedAfterReset, err := s.bs.TryGetLatestFetched(s.ctx) s.Require().NoError(err) s.Require().Equal(latestFetchedBeforeReset, latestFetchedAfterReset) + + latestBatchIdAfterReset, err := s.bs.TryGetLatestBatchId(s.ctx) + s.Require().NoError(err) + s.Require().Equal(latestBatchIdBeforeReset, latestBatchIdAfterReset) } func (s *BlockStorageTestSuite) Test_ResetProgress() { @@ -460,4 +484,8 @@ func (s *BlockStorageTestSuite) testResetProgress(passedHashIdx int) { actualLatestFetched, err := s.bs.TryGetLatestFetched(s.ctx) s.Require().NoError(err) s.Require().Equal(expectedLatestFetched, actualLatestFetched) + + actualLatestBatchId, err := s.bs.TryGetLatestBatchId(s.ctx) + s.Require().NoError(err) + s.Require().Equal(batches[passedHashIdx].ParentId, actualLatestBatchId) } diff --git a/nil/services/synccommittee/internal/testaide/blocks.go b/nil/services/synccommittee/internal/testaide/blocks.go index bb97a59d5..ff3c102ba 100644 --- a/nil/services/synccommittee/internal/testaide/blocks.go +++ b/nil/services/synccommittee/internal/testaide/blocks.go @@ -72,9 +72,10 @@ func NewBatchesSequence(batchesCount int) []*scTypes.BlockBatch { nextBatch.MainShardBlock.Number = 0 nextBatch.MainShardBlock.ParentHash = common.EmptyHash } else { - prevMainBlock := batches[len(batches)-1].MainShardBlock - nextBatch.MainShardBlock.ParentHash = prevMainBlock.Hash - nextBatch.MainShardBlock.Number = prevMainBlock.Number + 1 + prevBatch := batches[len(batches)-1] + nextBatch.MainShardBlock.ParentHash = prevBatch.MainShardBlock.Hash + nextBatch.MainShardBlock.Number = prevBatch.MainShardBlock.Number + 1 + nextBatch.ParentId = &prevBatch.Id } batches = append(batches, nextBatch) } @@ -93,7 +94,7 @@ func NewBlockBatch(childBlocksCount int) *scTypes.BlockBatch { mainBlock.ChildBlocks = append(mainBlock.ChildBlocks, block.Hash) } - batch, err := scTypes.NewBlockBatch(mainBlock, childBlocks) + batch, err := scTypes.NewBlockBatch(nil, mainBlock, childBlocks) if err != nil { panic(err) } diff --git a/nil/services/synccommittee/internal/types/block_batch.go b/nil/services/synccommittee/internal/types/block_batch.go index 7091fef97..fa286b962 100644 --- a/nil/services/synccommittee/internal/types/block_batch.go +++ b/nil/services/synccommittee/internal/types/block_batch.go @@ -19,7 +19,7 @@ func NewBatchId() BatchId { return BatchId(uuid.New()) } func (id BatchId) String() string { return uuid.UUID(id).String() } func (id BatchId) Bytes() []byte { return []byte(id.String()) } -// MarshalText implements the encoding.TextMarshller interface for BatchId. +// MarshalText implements the encoding.TextMarshaler interface for BatchId. func (id BatchId) MarshalText() ([]byte, error) { uuidValue := uuid.UUID(id) return []byte(uuidValue.String()), nil @@ -45,17 +45,23 @@ func (id *BatchId) Set(val string) error { type BlockBatch struct { Id BatchId `json:"id"` + ParentId *BatchId `json:"parentId"` MainShardBlock *jsonrpc.RPCBlock `json:"mainShardBlock"` ChildBlocks []*jsonrpc.RPCBlock `json:"childBlocks"` } -func NewBlockBatch(mainShardBlock *jsonrpc.RPCBlock, childBlocks []*jsonrpc.RPCBlock) (*BlockBatch, error) { +func NewBlockBatch( + parentId *BatchId, + mainShardBlock *jsonrpc.RPCBlock, + childBlocks []*jsonrpc.RPCBlock, +) (*BlockBatch, error) { if err := validateBatch(mainShardBlock, childBlocks); err != nil { return nil, err } return &BlockBatch{ Id: NewBatchId(), + ParentId: parentId, MainShardBlock: mainShardBlock, ChildBlocks: childBlocks, }, nil