diff --git a/nil/services/synccommittee/core/aggregator.go b/nil/services/synccommittee/core/aggregator.go index 8f6890b90..3de36504a 100644 --- a/nil/services/synccommittee/core/aggregator.go +++ b/nil/services/synccommittee/core/aggregator.go @@ -60,7 +60,7 @@ type aggregator struct { blockStorage AggregatorBlockStorage taskStorage AggregatorTaskStorage batchCommitter batches.BatchCommitter - resetter reset.StateResetter + resetter *reset.StateResetter timer common.Timer metrics AggregatorMetrics workerAction *concurrent.Suspendable @@ -70,7 +70,7 @@ func NewAggregator( rpcClient client.Client, blockStorage AggregatorBlockStorage, taskStorage AggregatorTaskStorage, - resetter reset.StateResetter, + resetter *reset.StateResetter, timer common.Timer, logger zerolog.Logger, metrics AggregatorMetrics, diff --git a/nil/services/synccommittee/core/aggregator_test.go b/nil/services/synccommittee/core/aggregator_test.go index 91e4a4cf8..6d6ffb429 100644 --- a/nil/services/synccommittee/core/aggregator_test.go +++ b/nil/services/synccommittee/core/aggregator_test.go @@ -142,12 +142,12 @@ func (s *AggregatorTestSuite) Test_Main_Parent_Hash_Mismatch() { for _, provedBatch := range batches[:2] { err := s.blockStorage.SetBlockBatch(s.ctx, provedBatch) s.Require().NoError(err) - err = s.blockStorage.SetBlockAsProved(s.ctx, scTypes.IdFromBlock(provedBatch.MainShardBlock)) + err = s.blockStorage.SetBatchAsProved(s.ctx, provedBatch.Id) s.Require().NoError(err) } // Set first batch as proposed, latestProvedStateRoot value is updated - err := s.blockStorage.SetBlockAsProposed(s.ctx, scTypes.IdFromBlock(batches[0].MainShardBlock)) + err := s.blockStorage.SetBatchAsProposed(s.ctx, batches[0].Id) s.Require().NoError(err) latestProved, err := s.blockStorage.TryGetProvedStateRoot(s.ctx) s.Require().NoError(err) @@ -208,7 +208,7 @@ func (s *AggregatorTestSuite) Test_Fetch_Next_Valid() { func (s *AggregatorTestSuite) Test_Block_Storage_Capacity_Exceeded() { // only one test batch can fit in the storage - storageConfig := storage.NewBlockStorageConfig(testaide.BatchSize) + storageConfig := storage.NewBlockStorageConfig(1) blockStorage := s.newTestBlockStorage(storageConfig) batches := testaide.NewBatchesSequence(2) diff --git a/nil/services/synccommittee/core/block_tasks_integration_test.go b/nil/services/synccommittee/core/block_tasks_integration_test.go index 420e2c914..72c52c256 100644 --- a/nil/services/synccommittee/core/block_tasks_integration_test.go +++ b/nil/services/synccommittee/core/block_tasks_integration_test.go @@ -186,6 +186,6 @@ func newTestSuccessProviderResult(taskToExecute *types.Task, executorId types.Ta type noopStateResetLauncher struct{} -func (l *noopStateResetLauncher) LaunchPartialResetWithSuspension(_ context.Context, _ common.Hash) error { +func (l *noopStateResetLauncher) LaunchPartialResetWithSuspension(_ context.Context, _ types.BatchId) error { return nil } diff --git a/nil/services/synccommittee/core/proposer.go b/nil/services/synccommittee/core/proposer.go index 9f6abbd8f..00c4c5807 100644 --- a/nil/services/synccommittee/core/proposer.go +++ b/nil/services/synccommittee/core/proposer.go @@ -8,7 +8,6 @@ import ( "github.com/NilFoundation/nil/nil/common" "github.com/NilFoundation/nil/nil/common/concurrent" - "github.com/NilFoundation/nil/nil/internal/types" "github.com/NilFoundation/nil/nil/services/synccommittee/internal/metrics" "github.com/NilFoundation/nil/nil/services/synccommittee/internal/rollupcontract" "github.com/NilFoundation/nil/nil/services/synccommittee/internal/srv" @@ -25,7 +24,7 @@ type ProposerStorage interface { TryGetNextProposalData(ctx context.Context) (*scTypes.ProposalData, error) - SetBlockAsProposed(ctx context.Context, id scTypes.BlockId) error + SetBatchAsProposed(ctx context.Context, id scTypes.BatchId) error } type ProposerMetrics interface { @@ -106,12 +105,14 @@ func (p *proposer) Run(ctx context.Context, started chan<- struct{}) error { return err } - close(started) + if started != nil { + close(started) + } concurrent.RunTickerLoop(ctx, p.params.ProposingInterval, func(ctx context.Context) { - if err := p.proposeNextBlock(ctx); err != nil { - p.logger.Error().Err(err).Msg("error during proved blocks proposing") + if err := p.proposeNextBatch(ctx); err != nil { + p.logger.Error().Err(err).Msg("error during proved batches proposing") p.metrics.RecordError(ctx, p.Name()) return } @@ -194,7 +195,7 @@ func (p *proposer) updateStoredStateRoot(ctx context.Context, stateRoot common.H return nil } -func (p *proposer) proposeNextBlock(ctx context.Context) error { +func (p *proposer) proposeNextBatch(ctx context.Context) error { if p.rollupContractWrapper == nil { err := p.initializeProvedStateRoot(ctx) if err != nil { @@ -203,22 +204,21 @@ func (p *proposer) proposeNextBlock(ctx context.Context) error { } data, err := p.storage.TryGetNextProposalData(ctx) if err != nil { - return fmt.Errorf("failed get next block to propose: %w", err) + return fmt.Errorf("failed get next proposal data: %w", err) } if data == nil { - p.logger.Debug().Msg("no block to propose") + p.logger.Debug().Msg("no batches to propose") return nil } err = p.sendProof(ctx, data) if err != nil { - return fmt.Errorf("failed to send proof to L1 for block with hash=%s: %w", data.MainShardBlockHash, err) + return fmt.Errorf("failed to send proof to L1 for batch with id=%s: %w", data.BatchId, err) } - blockId := scTypes.NewBlockId(types.MainShardId, data.MainShardBlockHash) - err = p.storage.SetBlockAsProposed(ctx, blockId) + err = p.storage.SetBatchAsProposed(ctx, data.BatchId) if err != nil { - return fmt.Errorf("failed set block with hash=%s as proposed: %w", data.MainShardBlockHash, err) + return fmt.Errorf("failed set batch with id=%s as proposed: %w", data.BatchId, err) } return nil } diff --git a/nil/services/synccommittee/core/reset/resetter.go b/nil/services/synccommittee/core/reset/resetter.go index 59163e047..54b7b1c51 100644 --- a/nil/services/synccommittee/core/reset/resetter.go +++ b/nil/services/synccommittee/core/reset/resetter.go @@ -2,58 +2,69 @@ package reset import ( "context" + "fmt" - "github.com/NilFoundation/nil/nil/common" "github.com/NilFoundation/nil/nil/common/logging" + scTypes "github.com/NilFoundation/nil/nil/services/synccommittee/internal/types" "github.com/rs/zerolog" ) -type StateResetter interface { +type BatchResetter interface { // ResetProgressPartial resets Sync Committee's block processing progress - // to a point preceding main shard block with the specified hash. - ResetProgressPartial(ctx context.Context, firstMainHashToPurge common.Hash) error + // to a point preceding batch with the specified ID. + ResetProgressPartial(ctx context.Context, firstBatchToPurge scTypes.BatchId) (purgedBatches []scTypes.BatchId, err error) // ResetProgressNotProved resets Sync Committee's progress for all not yet proven blocks. ResetProgressNotProved(ctx context.Context) error } -func NewStateResetter(logger zerolog.Logger, resetters ...StateResetter) StateResetter { - return &compositeStateResetter{ - resetters: resetters, - logger: logger, +func NewStateResetter(logger zerolog.Logger, batchResetter BatchResetter) *StateResetter { + return &StateResetter{ + batchResetter: batchResetter, + logger: logger, } } -type compositeStateResetter struct { - resetters []StateResetter - logger zerolog.Logger +type StateResetter struct { + batchResetter BatchResetter + logger zerolog.Logger } -func (r *compositeStateResetter) ResetProgressPartial(ctx context.Context, firstMainHashToPurge common.Hash) error { +func (r *StateResetter) ResetProgressPartial(ctx context.Context, failedBatchId scTypes.BatchId) error { r.logger.Info(). - Stringer(logging.FieldBlockMainChainHash, firstMainHashToPurge). + Stringer(logging.FieldBatchId, failedBatchId). Msg("Started partial progress reset") - for _, resetter := range r.resetters { - if err := resetter.ResetProgressPartial(ctx, firstMainHashToPurge); err != nil { - return err + purgedBatchIds, err := r.batchResetter.ResetProgressPartial(ctx, failedBatchId) + if err != nil { + return err + } + + for _, batchId := range purgedBatchIds { + // Tasks associated with the failed batch should not be cancelled at this point, + // they will be marked as failed later + if batchId == failedBatchId { + continue } + + // todo: cancel tasks in the storage + + r.logger.Info().Stringer(logging.FieldBatchId, batchId).Msg("Cancelled batch tasks") + // todo: push cancellation requests to executors } r.logger.Info(). - Stringer(logging.FieldBlockMainChainHash, firstMainHashToPurge). + Stringer(logging.FieldBatchId, failedBatchId). Msg("Finished partial progress reset") return nil } -func (r *compositeStateResetter) ResetProgressNotProved(ctx context.Context) error { +func (r *StateResetter) ResetProgressNotProved(ctx context.Context) error { r.logger.Info().Msg("Started not proven progress reset") - for _, resetter := range r.resetters { - if err := resetter.ResetProgressNotProved(ctx); err != nil { - return err - } + if err := r.batchResetter.ResetProgressNotProved(ctx); err != nil { + return fmt.Errorf("failed to reset progress not proved batches: %w", err) } r.logger.Info().Msg("Finished not proven progress reset") diff --git a/nil/services/synccommittee/core/reset/state_reset_launcher.go b/nil/services/synccommittee/core/reset/state_reset_launcher.go index 7612ac4e6..7e1605289 100644 --- a/nil/services/synccommittee/core/reset/state_reset_launcher.go +++ b/nil/services/synccommittee/core/reset/state_reset_launcher.go @@ -5,8 +5,8 @@ import ( "fmt" "time" - "github.com/NilFoundation/nil/nil/common" "github.com/NilFoundation/nil/nil/common/logging" + scTypes "github.com/NilFoundation/nil/nil/services/synccommittee/internal/types" "github.com/rs/zerolog" ) @@ -27,14 +27,14 @@ type Service interface { type stateResetLauncher struct { blockFetcher BlockFetcher - resetter StateResetter + resetter *StateResetter service Service logger zerolog.Logger } func NewResetLauncher( blockFetcher BlockFetcher, - resetter StateResetter, + resetter *StateResetter, service Service, logger zerolog.Logger, ) *stateResetLauncher { @@ -46,22 +46,22 @@ func NewResetLauncher( } } -func (l *stateResetLauncher) LaunchPartialResetWithSuspension(ctx context.Context, firstMainHashToPurge common.Hash) error { +func (l *stateResetLauncher) LaunchPartialResetWithSuspension(ctx context.Context, failedBatchId scTypes.BatchId) error { l.logger.Info(). - Stringer(logging.FieldBlockMainChainHash, firstMainHashToPurge). + Stringer(logging.FieldBlockMainChainHash, failedBatchId). Msg("Launching state reset process") if err := l.blockFetcher.Pause(ctx); err != nil { return fmt.Errorf("failed to pause block fetching: %w", err) } - if err := l.resetter.ResetProgressPartial(ctx, firstMainHashToPurge); err != nil { - l.onResetError(ctx, err, firstMainHashToPurge) + if err := l.resetter.ResetProgressPartial(ctx, failedBatchId); err != nil { + l.onResetError(ctx, err, failedBatchId) return nil } l.logger.Info(). - Stringer(logging.FieldBlockMainChainHash, firstMainHashToPurge). + Stringer(logging.FieldBlockMainChainHash, failedBatchId). Msgf("State reset completed, block fetching will be resumed after %s", fetchResumeDelay) detachedCtx := context.WithoutCancel(ctx) @@ -72,9 +72,9 @@ func (l *stateResetLauncher) LaunchPartialResetWithSuspension(ctx context.Contex } func (l *stateResetLauncher) onResetError( - ctx context.Context, resetErr error, failedMainBlockHash common.Hash, + ctx context.Context, resetErr error, failedBatchId scTypes.BatchId, ) { - l.logger.Error().Err(resetErr).Stringer(logging.FieldBlockMainChainHash, failedMainBlockHash).Send() + l.logger.Error().Err(resetErr).Stringer(logging.FieldBatchId, failedBatchId).Send() l.resumeBlockFetching(ctx) } diff --git a/nil/services/synccommittee/core/service.go b/nil/services/synccommittee/core/service.go index 3f28784ba..1c395170f 100644 --- a/nil/services/synccommittee/core/service.go +++ b/nil/services/synccommittee/core/service.go @@ -41,7 +41,7 @@ func New(cfg *Config, database db.DB, ethClient rollupcontract.EthClient) (*Sync blockStorage := storage.NewBlockStorage(database, storage.DefaultBlockStorageConfig(), timer, metricsHandler, logger) taskStorage := storage.NewTaskStorage(database, timer, metricsHandler, logger) - // todo: add reset logic to TaskStorage (implement StateResetter interface) and pass it here in https://github.com/NilFoundation/nil/pull/419 + // todo: add reset logic to TaskStorage and pass it here stateResetter := reset.NewStateResetter(logger, blockStorage) agg := NewAggregator( diff --git a/nil/services/synccommittee/core/task_state_change_handler.go b/nil/services/synccommittee/core/task_state_change_handler.go index 73dfbab15..6724c5175 100644 --- a/nil/services/synccommittee/core/task_state_change_handler.go +++ b/nil/services/synccommittee/core/task_state_change_handler.go @@ -4,7 +4,6 @@ import ( "context" "fmt" - "github.com/NilFoundation/nil/nil/common" "github.com/NilFoundation/nil/nil/common/logging" "github.com/NilFoundation/nil/nil/services/synccommittee/internal/api" "github.com/NilFoundation/nil/nil/services/synccommittee/internal/log" @@ -12,27 +11,27 @@ import ( "github.com/rs/zerolog" ) -type ProvedBlockSetter interface { - SetBlockAsProved(ctx context.Context, blockId types.BlockId) error +type ProvedBatchSetter interface { + SetBatchAsProved(ctx context.Context, batchId types.BatchId) error } type StateResetLauncher interface { - LaunchPartialResetWithSuspension(ctx context.Context, failedMainBlockHash common.Hash) error + LaunchPartialResetWithSuspension(ctx context.Context, failedBatchId types.BatchId) error } type taskStateChangeHandler struct { - blockSetter ProvedBlockSetter + batchSetter ProvedBatchSetter stateResetLauncher StateResetLauncher logger zerolog.Logger } func newTaskStateChangeHandler( - blockSetter ProvedBlockSetter, + batchSetter ProvedBatchSetter, stateResetLauncher StateResetLauncher, logger zerolog.Logger, ) api.TaskStateChangeHandler { return &taskStateChangeHandler{ - blockSetter: blockSetter, + batchSetter: batchSetter, stateResetLauncher: stateResetLauncher, logger: logger, } @@ -53,7 +52,7 @@ func (h *taskStateChangeHandler) OnTaskTerminated(ctx context.Context, task *typ default: log.NewTaskResultEvent(h.logger, zerolog.WarnLevel, result). Msg("task execution failed with critical error, state will be reset") - return h.stateResetLauncher.LaunchPartialResetWithSuspension(ctx, task.BlockHash) + return h.stateResetLauncher.LaunchPartialResetWithSuspension(ctx, task.BatchId) } } @@ -68,10 +67,8 @@ func (h *taskStateChangeHandler) onTaskSuccess(ctx context.Context, task *types. Stringer(logging.FieldBatchId, task.BatchId). Msg("Proof batch completed") - blockId := types.NewBlockId(task.ShardId, task.BlockHash) - - if err := h.blockSetter.SetBlockAsProved(ctx, blockId); err != nil { - return fmt.Errorf("failed to set block with id=%s as proved: %w", blockId, err) + if err := h.batchSetter.SetBatchAsProved(ctx, task.BatchId); err != nil { + return fmt.Errorf("failed to set batch with id=%s as proved: %w", task.BatchId, err) } return nil diff --git a/nil/services/synccommittee/internal/metrics/sync_committee_metrics.go b/nil/services/synccommittee/internal/metrics/sync_committee_metrics.go index f47488899..361579bb2 100644 --- a/nil/services/synccommittee/internal/metrics/sync_committee_metrics.go +++ b/nil/services/synccommittee/internal/metrics/sync_committee_metrics.go @@ -21,7 +21,7 @@ type SyncCommitteeMetricsHandler struct { blockBatchSize telemetry.Histogram // BlockStorageMetrics - totalMainBlocksProved telemetry.Counter + totalBatchesProved telemetry.Counter // ProposerMetrics totalL1TxSent telemetry.Counter @@ -66,7 +66,7 @@ func (h *SyncCommitteeMetricsHandler) init(attributes metric.MeasurementOption, func (h *SyncCommitteeMetricsHandler) initBlockStorageMetrics(meter telemetry.Meter) error { var err error - if h.totalMainBlocksProved, err = meter.Int64Counter(namespace + "total_main_blocks_proved"); err != nil { + if h.totalBatchesProved, err = meter.Int64Counter(namespace + "total_batches_proved"); err != nil { return err } @@ -112,8 +112,8 @@ func (h *SyncCommitteeMetricsHandler) RecordBlockBatchSize(ctx context.Context, h.blockBatchSize.Record(ctx, batchSize, h.attributes) } -func (h *SyncCommitteeMetricsHandler) RecordMainBlockProved(ctx context.Context) { - h.totalMainBlocksProved.Add(ctx, 1, h.attributes) +func (h *SyncCommitteeMetricsHandler) RecordBatchProved(ctx context.Context) { + h.totalBatchesProved.Add(ctx, 1, h.attributes) } func (h *SyncCommitteeMetricsHandler) RecordProposerTxSent(ctx context.Context, proposalData *types.ProposalData) { diff --git a/nil/services/synccommittee/internal/storage/block_storage.go b/nil/services/synccommittee/internal/storage/block_storage.go index 5af54ce73..01c2a98c9 100644 --- a/nil/services/synccommittee/internal/storage/block_storage.go +++ b/nil/services/synccommittee/internal/storage/block_storage.go @@ -24,9 +24,13 @@ const ( // Key: scTypes.BlockId (block's own id), Value: blockEntry. blocksTable db.TableName = "blocks" - // blockParentIdxTable is used for indexing blocks by their parent ids. - // Key: scTypes.BlockId (block's parent id), Value: scTypes.BlockId (block's own id); - blockParentIdxTable db.TableName = "blocks_parent_hash_idx" + // batchesTable stores blocks batches produced by the Sync Committee. + // Key: scTypes.BatchId, Value: batchEntry. + batchesTable db.TableName = "batches" + + // batchParentIdxTable is used for indexing batches by their parent ids. + // Key: scTypes.BatchId (batch's parent id), Value: scTypes.BatchId (batch's own id); + batchParentIdxTable db.TableName = "blocks_parent_hash_idx" // latestFetchedTable stores reference to the latest main shard block. // Key: mainShardKey, Value: scTypes.MainBlockRef. @@ -44,35 +48,61 @@ const ( // Key: mainShardKey, Value: common.Hash. nextToProposeTable db.TableName = "next_to_propose_parent_hash" - // storedBlocksCountTable stores the count of blocks that have been persisted in the database. + // storedBatchesCountTable stores the count of batches that have been persisted in the database. // Key: mainShardKey, Value: uint32. - storedBlocksCountTable db.TableName = "stored_blocks_count" + storedBatchesCountTable db.TableName = "stored_batches_count" ) var mainShardKey = makeShardKey(types.MainShardId) +type batchEntry struct { + Id scTypes.BatchId `json:"batchId"` + ParentId *scTypes.BatchId `json:"parentBatchId,omitempty"` + MainParentHash common.Hash `json:"mainParentHash"` + + MainBlockId scTypes.BlockId `json:"mainBlockId"` + ExecBlockIds []scTypes.BlockId `json:"execBlockIds"` + + IsProved bool `json:"isProved,omitempty"` + CreatedAt time.Time `json:"createdAt"` +} + +func newBatchEntry(batch *scTypes.BlockBatch, createdAt time.Time) *batchEntry { + execBlockIds := make([]scTypes.BlockId, 0, len(batch.ChildBlocks)) + for _, childBlock := range batch.ChildBlocks { + execBlockIds = append(execBlockIds, scTypes.IdFromBlock(childBlock)) + } + + return &batchEntry{ + Id: batch.Id, + ParentId: batch.ParentId, + MainParentHash: batch.MainShardBlock.ParentHash, + MainBlockId: scTypes.IdFromBlock(batch.MainShardBlock), + ExecBlockIds: execBlockIds, + CreatedAt: createdAt, + } +} + type blockEntry struct { - Block jsonrpc.RPCBlock `json:"block"` - IsProved bool `json:"isProved"` - BatchId scTypes.BatchId `json:"batchId"` - ParentBatchId *scTypes.BatchId `json:"parentBatchId"` - FetchedAt time.Time `json:"fetchedAt"` + Block jsonrpc.RPCBlock `json:"block"` + BatchId scTypes.BatchId `json:"batchId"` + FetchedAt time.Time `json:"fetchedAt"` } func newBlockEntry(block *jsonrpc.RPCBlock, containingBatch *scTypes.BlockBatch, fetchedAt time.Time) *blockEntry { return &blockEntry{ - Block: *block, - BatchId: containingBatch.Id, - ParentBatchId: containingBatch.ParentId, - FetchedAt: fetchedAt, + Block: *block, + BatchId: containingBatch.Id, + FetchedAt: fetchedAt, } } type BlockStorageMetrics interface { - RecordMainBlockProved(ctx context.Context) + RecordBatchProved(ctx context.Context) } type BlockStorageConfig struct { + // CapacityLimit defines the maximum number of stored batches. CapacityLimit uint32 } @@ -83,7 +113,7 @@ func NewBlockStorageConfig(capacityLimit uint32) BlockStorageConfig { } func DefaultBlockStorageConfig() BlockStorageConfig { - return NewBlockStorageConfig(200) + return NewBlockStorageConfig(100) } type BlockStorage struct { @@ -104,7 +134,10 @@ func NewBlockStorage( commonStorage: makeCommonStorage( database, logger, - common.DoNotRetryIf(scTypes.ErrBlockMismatch, scTypes.ErrBlockNotFound, scTypes.ErrBatchMismatch), + common.DoNotRetryIf( + scTypes.ErrBatchMismatch, scTypes.ErrBlockNotFound, scTypes.ErrBatchNotFound, scTypes.ErrBatchNotProved, + ErrStateRootNotInitialized, + ), ), config: config, timer: timer, @@ -261,23 +294,10 @@ func (bs *BlockStorage) setBlockBatchImpl(ctx context.Context, batch *scTypes.Bl } defer tx.Rollback() - if err := bs.addStoredCountTx(tx, int32(batch.BlocksCount())); err != nil { - return err - } - - currentTime := bs.timer.NowTime() - mainEntry := newBlockEntry(batch.MainShardBlock, batch, currentTime) - if err := bs.putBlockTx(tx, mainEntry); err != nil { + if err := bs.putBatchWithBlockTx(tx, batch); err != nil { return err } - for _, childBlock := range batch.ChildBlocks { - childEntry := newBlockEntry(childBlock, batch, currentTime) - if err := bs.putBlockTx(tx, childEntry); err != nil { - return err - } - } - if err := bs.setProposeParentHash(tx, batch.MainShardBlock); err != nil { return err } @@ -372,41 +392,36 @@ func (bs *BlockStorage) setProposeParentHash(tx db.RwTx, block *jsonrpc.RPCBlock return bs.setParentOfNextToPropose(tx, block.ParentHash) } -func (bs *BlockStorage) SetBlockAsProved(ctx context.Context, id scTypes.BlockId) error { - wasSet, err := bs.setBlockAsProvedImpl(ctx, id) +func (bs *BlockStorage) SetBatchAsProved(ctx context.Context, batchId scTypes.BatchId) error { + wasSet, err := bs.setBatchAsProvedImpl(ctx, batchId) if err != nil { return err } if wasSet { - bs.metrics.RecordMainBlockProved(ctx) + bs.metrics.RecordBatchProved(ctx) } return nil } -func (bs *BlockStorage) setBlockAsProvedImpl(ctx context.Context, id scTypes.BlockId) (wasSet bool, err error) { +func (bs *BlockStorage) setBatchAsProvedImpl(ctx context.Context, batchId scTypes.BatchId) (wasSet bool, err error) { tx, err := bs.database.CreateRwTx(ctx) if err != nil { return false, err } defer tx.Rollback() - entry, err := bs.getBlockEntry(tx, id, true) + entry, err := bs.getBatchTx(tx, batchId) if err != nil { return false, err } if entry.IsProved { - bs.logger.Debug().Stringer("blockId", id).Msg("block is already marked as proved") + bs.logger.Debug().Stringer(logging.FieldBatchId, batchId).Msg("batch is already marked as proved") return false, nil } entry.IsProved = true - value, err := marshallEntry(entry) - if err != nil { - return false, err - } - - if err := tx.Put(blocksTable, id.Bytes(), value); err != nil { + if err := bs.putBatchTx(tx, entry); err != nil { return false, err } @@ -429,7 +444,7 @@ func (bs *BlockStorage) TryGetNextProposalData(ctx context.Context) (*scTypes.Pr return nil, err } if currentProvedStateRoot == nil { - return nil, errors.New("proved state root was not initialized") + return nil, ErrStateRootNotInitialized } parentHash, err := bs.getParentOfNextToPropose(tx) @@ -442,30 +457,38 @@ func (bs *BlockStorage) TryGetNextProposalData(ctx context.Context) (*scTypes.Pr return nil, nil } - var mainShardEntry *blockEntry - for entry, err := range bs.storedBlocksIter(tx) { + var proposalCandidate *batchEntry + for entry, err := range bs.getStoredBatchesSeq(tx) { if err != nil { return nil, err } if isValidProposalCandidate(entry, *parentHash) { - mainShardEntry = entry + proposalCandidate = entry break } } - if mainShardEntry == nil { - bs.logger.Debug().Stringer("parentHash", parentHash).Msg("no proved main shard block found") + if proposalCandidate == nil { + bs.logger.Debug().Stringer("parentHash", parentHash).Msg("no proved batch found") return nil, nil } - transactions := scTypes.BlockTransactions(&mainShardEntry.Block) + return bs.createProposalDataTx(tx, proposalCandidate, currentProvedStateRoot) +} - childIds, err := scTypes.ChildBlockIds(&mainShardEntry.Block) +func (bs *BlockStorage) createProposalDataTx( + tx db.RoTx, + proposalCandidate *batchEntry, + currentProvedStateRoot *common.Hash, +) (*scTypes.ProposalData, error) { + mainBlockEntry, err := bs.getBlockEntry(tx, proposalCandidate.MainBlockId, true) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to get main block with id=%s: %w", proposalCandidate.MainBlockId, err) } - for _, childId := range childIds { + transactions := scTypes.BlockTransactions(&mainBlockEntry.Block) + + for _, childId := range proposalCandidate.ExecBlockIds { childEntry, err := bs.getBlockEntry(tx, childId, true) if err != nil { return nil, fmt.Errorf("failed to get child block with id=%s: %w", childId, err) @@ -476,41 +499,51 @@ func (bs *BlockStorage) TryGetNextProposalData(ctx context.Context) (*scTypes.Pr } return &scTypes.ProposalData{ - MainShardBlockHash: mainShardEntry.Block.Hash, + BatchId: proposalCandidate.Id, + MainShardBlockHash: mainBlockEntry.Block.Hash, Transactions: transactions, OldProvedStateRoot: *currentProvedStateRoot, - NewProvedStateRoot: mainShardEntry.Block.ChildBlocksRootHash, - MainBlockFetchedAt: mainShardEntry.FetchedAt, + NewProvedStateRoot: mainBlockEntry.Block.Hash, + MainBlockFetchedAt: mainBlockEntry.FetchedAt, }, nil } -func (bs *BlockStorage) SetBlockAsProposed(ctx context.Context, id scTypes.BlockId) error { +func (bs *BlockStorage) SetBatchAsProposed(ctx context.Context, id scTypes.BatchId) error { return bs.retryRunner.Do(ctx, func(ctx context.Context) error { - return bs.setBlockAsProposedImpl(ctx, id) + return bs.setBatchAsProposedImpl(ctx, id) }) } -func (bs *BlockStorage) setBlockAsProposedImpl(ctx context.Context, id scTypes.BlockId) error { +func (bs *BlockStorage) setBatchAsProposedImpl(ctx context.Context, id scTypes.BatchId) error { tx, err := bs.database.CreateRwTx(ctx) if err != nil { return err } defer tx.Rollback() - mainShardEntry, err := bs.getBlockEntry(tx, id, true) + batch, err := bs.getBatchTx(tx, id) if err != nil { return err } - if err := bs.validateMainShardEntry(tx, id, mainShardEntry); err != nil { + if !batch.IsProved { + return fmt.Errorf("%w, id=%s", scTypes.ErrBatchNotProved, id) + } + + mainShardEntry, err := bs.getBlockEntry(tx, batch.MainBlockId, true) + if err != nil { return err } - if err := bs.deleteMainBlockWithChildren(tx, mainShardEntry); err != nil { + if err := bs.validateMainShardEntry(tx, mainShardEntry); err != nil { return err } - if err := tx.Put(stateRootTable, mainShardKey, mainShardEntry.Block.ChildBlocksRootHash.Bytes()); err != nil { + if err := bs.deleteBatchWithBlocksTx(tx, batch); err != nil { + return err + } + + if err := tx.Put(stateRootTable, mainShardKey, mainShardEntry.Block.Hash.Bytes()); err != nil { return fmt.Errorf("failed to put state root: %w", err) } @@ -521,10 +554,8 @@ func (bs *BlockStorage) setBlockAsProposedImpl(ctx context.Context, id scTypes.B return bs.commit(tx) } -func isValidProposalCandidate(entry *blockEntry, parentHash common.Hash) bool { - return entry.Block.ShardId == types.MainShardId && - entry.IsProved && - entry.Block.ParentHash == parentHash +func isValidProposalCandidate(batch *batchEntry, parentHash common.Hash) bool { + return batch.IsProved && batch.MainParentHash == parentHash } // getParentOfNextToPropose retrieves parent's hash of the next block to propose @@ -552,19 +583,13 @@ func (bs *BlockStorage) setParentOfNextToPropose(tx db.RwTx, hash common.Hash) e return nil } -func (bs *BlockStorage) validateMainShardEntry(tx db.RoTx, id scTypes.BlockId, entry *blockEntry) error { - if entry == nil { - return fmt.Errorf("block with id=%s is not found", id.String()) - } +func (bs *BlockStorage) validateMainShardEntry(tx db.RoTx, entry *blockEntry) error { + id := scTypes.IdFromBlock(&entry.Block) if entry.Block.ShardId != types.MainShardId { return fmt.Errorf("block with id=%s is not from main shard", id.String()) } - if !entry.IsProved { - return fmt.Errorf("block with id=%s is not proved", id.String()) - } - parentHash, err := bs.getParentOfNextToPropose(tx) if err != nil { return err @@ -614,97 +639,118 @@ func (bs *BlockStorage) putLatestFetchedBlockTx(tx db.RwTx, shardId types.ShardI return nil } -// ResetProgressPartial resets the block storage state starting from the given main block hash: +// ResetProgressPartial resets the block storage state starting from the batch with given ID: // -// 1. Sets the latest fetched block reference to the parent of the block with hash == firstMainHashToPurge. +// 1. Picks first main shard block [B] from the batch with the given ID. +// +// 2. Sets the latest fetched block reference to the parent of the block [B]. // If the specified block is the first block in the chain, the new latest fetched value will be nil. // -// 2. Deletes all main and corresponding exec shard blocks starting from the block with hash == firstMainHashToPurge. -func (bs *BlockStorage) ResetProgressPartial(ctx context.Context, firstMainHashToPurge common.Hash) error { - return bs.retryRunner.Do(ctx, func(ctx context.Context) error { - return bs.resetProgressPartialImpl(ctx, firstMainHashToPurge) +// 3. Deletes all main and corresponding exec shard blocks starting from the block [B]. +func (bs *BlockStorage) ResetProgressPartial( + ctx context.Context, + firstBatchToPurge scTypes.BatchId, +) (purgedBatches []scTypes.BatchId, err error) { + err = bs.retryRunner.Do(ctx, func(ctx context.Context) error { + var err error + purgedBatches, err = bs.resetProgressPartialImpl(ctx, firstBatchToPurge) + return err }) + return } -func (bs *BlockStorage) resetProgressPartialImpl(ctx context.Context, firstMainHashToPurge common.Hash) error { +func (bs *BlockStorage) resetProgressPartialImpl( + ctx context.Context, + firstBatchToPurge scTypes.BatchId, +) (purgedBatches []scTypes.BatchId, err error) { tx, err := bs.database.CreateRwTx(ctx) if err != nil { - return err + return nil, err } defer tx.Rollback() - startingId := scTypes.NewBlockId(types.MainShardId, firstMainHashToPurge) - - startingEntry, err := bs.getBlockEntry(tx, startingId, true) + startingBatch, err := bs.getBatchTx(tx, firstBatchToPurge) if err != nil { - return err + return nil, err } - if err := bs.resetToParent(tx, startingEntry); err != nil { - return err + + if err := bs.resetToParent(tx, startingBatch); err != nil { + return nil, err } - for entry, err := range bs.getChainSequence(tx, startingId) { + for batch, err := range bs.getBatchesSequence(tx, firstBatchToPurge) { if err != nil { - return err + return nil, err } - if err := bs.deleteMainBlockWithChildren(tx, entry); err != nil { - return err + if err := bs.deleteBatchWithBlocksTx(tx, batch); err != nil { + return nil, err } + + purgedBatches = append(purgedBatches, batch.Id) } - return bs.commit(tx) + if err := bs.commit(tx); err != nil { + return nil, err + } + + return purgedBatches, nil } -func (bs *BlockStorage) resetToParent(tx db.RwTx, entry *blockEntry) error { - refToParent, err := scTypes.GetMainParentRef(&entry.Block) +func (bs *BlockStorage) resetToParent(tx db.RwTx, batch *batchEntry) error { + mainBlockEntry, err := bs.getBlockEntry(tx, batch.MainBlockId, true) + if err != nil { + return err + } + + refToParent, err := scTypes.GetMainParentRef(&mainBlockEntry.Block) if err != nil { return fmt.Errorf("failed to get main block parent ref: %w", err) } if err := bs.putLatestFetchedBlockTx(tx, types.MainShardId, refToParent); err != nil { return fmt.Errorf("failed to reset latest fetched block: %w", err) } - if err := bs.putLatestBatchIdTx(tx, entry.ParentBatchId); err != nil { + if err := bs.putLatestBatchIdTx(tx, batch.ParentId); err != nil { return fmt.Errorf("failed to reset latest batch id: %w", err) } return nil } -// getChainSequence iterates through a chain of blocks, starting from the block with the given id. -// It uses blockParentIdxTable to retrieve parent-child connections between blocks. -func (bs *BlockStorage) getChainSequence(tx db.RoTx, startingId scTypes.BlockId) iter.Seq2[*blockEntry, error] { - return func(yield func(*blockEntry, error) bool) { - startBlock, err := bs.getBlockEntry(tx, startingId, true) +// getBatchesSequence iterates through a chain of batches, starting from the batch with the given id. +// It uses batchParentIdxTable to retrieve parent-child connections between batches. +func (bs *BlockStorage) getBatchesSequence(tx db.RoTx, startingId scTypes.BatchId) iter.Seq2[*batchEntry, error] { + return func(yield func(*batchEntry, error) bool) { + startBatch, err := bs.getBatchTx(tx, startingId) if err != nil { yield(nil, err) return } - if !yield(startBlock, nil) { + if !yield(startBatch, nil) { return } - nextParentId := scTypes.IdFromBlock(&startBlock.Block) + nextParentId := startBatch.Id for { - nextIdBytes, err := tx.Get(blockParentIdxTable, nextParentId.Bytes()) + nextIdBytes, err := tx.Get(batchParentIdxTable, nextParentId.Bytes()) if err != nil && !errors.Is(err, db.ErrKeyNotFound) { - yield(nil, fmt.Errorf("failed to get parent idx entry, parentId=%s: %w", nextParentId, err)) + yield(nil, fmt.Errorf("failed to get parent batch idx entry, parentId=%s: %w", nextParentId, err)) return } if nextIdBytes == nil { break } - nextBlockEntry, err := bs.getBlockEntryBytesId(tx, nextIdBytes, true) + nextBatchEntry, err := bs.getBatchBytesIdTx(tx, nextIdBytes, true) if err != nil { yield(nil, err) return } - if !yield(nextBlockEntry, nil) { + if !yield(nextBatchEntry, nil) { return } - nextParentId = scTypes.IdFromBlock(&nextBlockEntry.Block) + nextParentId = nextBatchEntry.Id } } } @@ -716,11 +762,11 @@ func (bs *BlockStorage) getChainSequence(tx db.RoTx, startingId scTypes.BlockId) // 2. Deletes all main not yet proved blocks from the storage. func (bs *BlockStorage) ResetProgressNotProved(ctx context.Context) error { return bs.retryRunner.Do(ctx, func(ctx context.Context) error { - return bs.resetProgressNotProvenImpl(ctx) + return bs.resetProgressNotProvedImpl(ctx) }) } -func (bs *BlockStorage) resetProgressNotProvenImpl(ctx context.Context) error { +func (bs *BlockStorage) resetProgressNotProvedImpl(ctx context.Context) error { tx, err := bs.database.CreateRwTx(ctx) if err != nil { return err @@ -731,15 +777,15 @@ func (bs *BlockStorage) resetProgressNotProvenImpl(ctx context.Context) error { return fmt.Errorf("failed to reset latest fetched block: %w", err) } - for entry, err := range bs.storedBlocksIter(tx) { + for batch, err := range bs.getStoredBatchesSeq(tx) { if err != nil { return err } - if entry.Block.ShardId != types.MainShardId || entry.IsProved { + if batch.IsProved { continue } - if err := bs.deleteMainBlockWithChildren(tx, entry); err != nil { + if err := bs.deleteBatchWithBlocksTx(tx, batch); err != nil { return err } } @@ -771,7 +817,7 @@ func (bs *BlockStorage) getBlockEntryBytesId(tx db.RoTx, idBytes []byte, require return nil, fmt.Errorf("failed to get block with id=%s: %w", hex.EncodeToString(idBytes), err) } - entry, err := unmarshallEntry(idBytes, value) + entry, err := unmarshallEntry[blockEntry](idBytes, value) if err != nil { return nil, err } @@ -779,70 +825,137 @@ func (bs *BlockStorage) getBlockEntryBytesId(tx db.RoTx, idBytes []byte, require return entry, nil } -func (bs *BlockStorage) deleteMainBlockWithChildren(tx db.RwTx, mainShardEntry *blockEntry) error { - childIds, err := scTypes.ChildBlockIds(&mainShardEntry.Block) - if err != nil { +func (bs *BlockStorage) putBatchWithBlockTx(tx db.RwTx, batch *scTypes.BlockBatch) error { + if err := bs.addStoredCountTx(tx, 1); err != nil { return err } - blocksCount := int32(len(childIds) + 1) - if err := bs.addStoredCountTx(tx, -blocksCount); err != nil { + if batch.ParentId != nil { + err := tx.Put(batchParentIdxTable, batch.ParentId.Bytes(), batch.Id.Bytes()) + if err != nil && !errors.Is(err, db.ErrKeyNotFound) { + return fmt.Errorf( + "failed to put parent batch idx entry, batchId=%s, parentId=%s,: %w", batch.Id, batch.ParentId, err, + ) + } + } + + currentTime := bs.timer.NowTime() + + entry := newBatchEntry(batch, currentTime) + if err := bs.putBatchTx(tx, entry); err != nil { return err } - for _, childId := range childIds { - childEntry, err := bs.getBlockEntry(tx, childId, true) - if err != nil { - return fmt.Errorf("failed to get child block with id=%s: %w", childId, err) - } - if err := bs.deleteBlock(tx, childEntry); err != nil { + mainEntry := newBlockEntry(batch.MainShardBlock, batch, currentTime) + if err := bs.putBlockTx(tx, mainEntry); err != nil { + return err + } + + for _, childBlock := range batch.ChildBlocks { + childEntry := newBlockEntry(childBlock, batch, currentTime) + if err := bs.putBlockTx(tx, childEntry); err != nil { return err } } - if err := bs.deleteBlock(tx, mainShardEntry); err != nil { + return nil +} + +func (bs *BlockStorage) putBatchTx(tx db.RwTx, entry *batchEntry) error { + value, err := marshallEntry(entry) + if err != nil { + return fmt.Errorf("%w, id=%s", err, entry.Id) + } + + if err := tx.Put(batchesTable, entry.Id.Bytes(), value); err != nil { + return fmt.Errorf("failed to put batch with id=%s: %w", entry.Id, err) + } + + return nil +} + +func (bs *BlockStorage) getBatchTx(tx db.RoTx, id scTypes.BatchId) (*batchEntry, error) { + return bs.getBatchBytesIdTx(tx, id.Bytes(), true) +} + +func (bs *BlockStorage) getBatchBytesIdTx(tx db.RoTx, idBytes []byte, required bool) (*batchEntry, error) { + value, err := tx.Get(batchesTable, idBytes) + + switch { + case err == nil: + break + case errors.Is(err, db.ErrKeyNotFound) && required: + return nil, fmt.Errorf("%w, id=%s", scTypes.ErrBatchNotFound, hex.EncodeToString(idBytes)) + case errors.Is(err, db.ErrKeyNotFound): + return nil, nil + default: + return nil, fmt.Errorf("failed to get batch with id=%s: %w", hex.EncodeToString(idBytes), err) + } + + entry, err := unmarshallEntry[batchEntry](idBytes, value) + if err != nil { + return nil, err + } + + return entry, nil +} + +func (bs *BlockStorage) deleteBatchWithBlocksTx(tx db.RwTx, batch *batchEntry) error { + if err := bs.addStoredCountTx(tx, -1); err != nil { return err } + if err := tx.Delete(batchesTable, batch.Id.Bytes()); err != nil { + return fmt.Errorf("failed to delete batch with id=%s: %w", batch.Id, err) + } + + if batch.ParentId != nil { + err := tx.Delete(batchParentIdxTable, batch.ParentId.Bytes()) + if err != nil && !errors.Is(err, db.ErrKeyNotFound) { + return fmt.Errorf("failed to delete parent batch idx entry, parentId=%s: %w", batch.ParentId, err) + } + } + + if err := bs.deleteBlock(tx, batch.MainBlockId); err != nil { + return err + } + + for _, childId := range batch.ExecBlockIds { + if err := bs.deleteBlock(tx, childId); err != nil { + return err + } + } + return nil } func (bs *BlockStorage) putBlockTx(tx db.RwTx, entry *blockEntry) error { value, err := marshallEntry(entry) if err != nil { - return err + return fmt.Errorf("%w, hash=%s", err, entry.Block.Hash) } blockId := scTypes.IdFromBlock(&entry.Block) if err := tx.Put(blocksTable, blockId.Bytes(), value); err != nil { return fmt.Errorf("failed to put block %s: %w", blockId.String(), err) } - parentId := scTypes.ParentBlockId(&entry.Block) - if err := tx.Put(blockParentIdxTable, parentId.Bytes(), blockId.Bytes()); err != nil { - return fmt.Errorf("failed to put parent idx entry, parentId=%s: %w", parentId, err) - } return nil } -func (bs *BlockStorage) deleteBlock(tx db.RwTx, entry *blockEntry) error { - parentBlockId := scTypes.ParentBlockId(&entry.Block) - err := tx.Delete(blockParentIdxTable, parentBlockId.Bytes()) - if err != nil && !errors.Is(err, db.ErrKeyNotFound) { - return fmt.Errorf("failed to delete parent idx entry, parentId=%s: %w", parentBlockId, err) - } - - blockId := scTypes.IdFromBlock(&entry.Block) - if err := tx.Delete(blocksTable, blockId.Bytes()); err != nil { +func (bs *BlockStorage) deleteBlock(tx db.RwTx, blockId scTypes.BlockId) error { + err := tx.Delete(blocksTable, blockId.Bytes()) + if err != nil { return fmt.Errorf("failed to delete block with id=%s: %w", blockId, err) } return nil } -func (*BlockStorage) storedBlocksIter(tx db.RoTx) iter.Seq2[*blockEntry, error] { - return func(yield func(*blockEntry, error) bool) { - txIter, err := tx.Range(blocksTable, nil, nil) +// getStoredBatchesSeq returns a sequence of stored batches in an arbitrary order. +func (*BlockStorage) getStoredBatchesSeq(tx db.RoTx) iter.Seq2[*batchEntry, error] { + return func(yield func(*batchEntry, error) bool) { + txIter, err := tx.Range(batchesTable, nil, nil) if err != nil { yield(nil, err) return @@ -855,7 +968,7 @@ func (*BlockStorage) storedBlocksIter(tx db.RoTx) iter.Seq2[*blockEntry, error] yield(nil, err) return } - entry, err := unmarshallEntry(key, val) + entry, err := unmarshallEntry[batchEntry](key, val) if err != nil { yield(nil, err) return @@ -869,70 +982,71 @@ func (*BlockStorage) storedBlocksIter(tx db.RoTx) iter.Seq2[*blockEntry, error] } func (bs *BlockStorage) addStoredCountTx(tx db.RwTx, delta int32) error { - currentBlocksCount, err := bs.getBlocksCountTx(tx) + currentBatchesCount, err := bs.getBatchesCountTx(tx) if err != nil { return err } - signed := int32(currentBlocksCount) + delta + signed := int32(currentBatchesCount) + delta if signed < 0 { return fmt.Errorf( - "blocks count cannot be negative: delta=%d, current blocks count=%d", delta, currentBlocksCount, + "batches count cannot be negative: delta=%d, current blocks count=%d", delta, currentBatchesCount, ) } - newBlocksCount := uint32(signed) - if newBlocksCount > bs.config.CapacityLimit { + newBatchesCount := uint32(signed) + if newBatchesCount > bs.config.CapacityLimit { return fmt.Errorf( "%w: delta is %d, current storage size is %d, capacity limit is %d", - ErrCapacityLimitReached, delta, currentBlocksCount, bs.config.CapacityLimit, + ErrCapacityLimitReached, delta, currentBatchesCount, bs.config.CapacityLimit, ) } - return bs.putBlocksCountTx(tx, newBlocksCount) + return bs.putBatchesCountTx(tx, newBatchesCount) } -func (bs *BlockStorage) getBlocksCountTx(tx db.RoTx) (uint32, error) { - bytes, err := tx.Get(storedBlocksCountTable, mainShardKey) +func (bs *BlockStorage) getBatchesCountTx(tx db.RoTx) (uint32, error) { + bytes, err := tx.Get(storedBatchesCountTable, mainShardKey) switch { case err == nil: break case errors.Is(err, db.ErrKeyNotFound): return 0, nil default: - return 0, fmt.Errorf("failed to get blocks count: %w", err) + return 0, fmt.Errorf("failed to get batches count: %w", err) } count := binary.LittleEndian.Uint32(bytes) return count, nil } -func (bs *BlockStorage) putBlocksCountTx(tx db.RwTx, newValue uint32) error { +func (bs *BlockStorage) putBatchesCountTx(tx db.RwTx, newValue uint32) error { bytes := make([]byte, 4) binary.LittleEndian.PutUint32(bytes, newValue) - err := tx.Put(storedBlocksCountTable, mainShardKey, bytes) + err := tx.Put(storedBatchesCountTable, mainShardKey, bytes) if err != nil { - return fmt.Errorf("failed to put blocks count: %w (newValue is %d)", err, newValue) + return fmt.Errorf("failed to put batches count: %w (newValue is %d)", err, newValue) } return nil } -func marshallEntry(entry *blockEntry) ([]byte, error) { +func marshallEntry[E any](entry *E) ([]byte, error) { bytes, err := json.Marshal(entry) if err != nil { return nil, fmt.Errorf( - "%w: failed to encode block with hash %s: %w", ErrSerializationFailed, entry.Block.Hash, err, + "%w: failed to marshall entry: %w", ErrSerializationFailed, err, ) } return bytes, nil } -func unmarshallEntry(key []byte, val []byte) (*blockEntry, error) { - entry := &blockEntry{} +func unmarshallEntry[E any](key []byte, val []byte) (*E, error) { + entry := new(E) + if err := json.Unmarshal(val, entry); err != nil { return nil, fmt.Errorf( - "%w: failed to unmarshall block entry with id=%s: %w", ErrSerializationFailed, hex.EncodeToString(key), err, + "%w: failed to unmarshall entry with id=%s: %w", ErrSerializationFailed, hex.EncodeToString(key), err, ) } diff --git a/nil/services/synccommittee/internal/storage/block_storage_test.go b/nil/services/synccommittee/internal/storage/block_storage_test.go index 75fb78db5..e2f24b483 100644 --- a/nil/services/synccommittee/internal/storage/block_storage_test.go +++ b/nil/services/synccommittee/internal/storage/block_storage_test.go @@ -10,7 +10,6 @@ import ( "github.com/NilFoundation/nil/nil/common/check" "github.com/NilFoundation/nil/nil/common/logging" "github.com/NilFoundation/nil/nil/internal/db" - "github.com/NilFoundation/nil/nil/internal/types" "github.com/NilFoundation/nil/nil/services/rpc/jsonrpc" "github.com/NilFoundation/nil/nil/services/synccommittee/internal/metrics" "github.com/NilFoundation/nil/nil/services/synccommittee/internal/testaide" @@ -93,7 +92,7 @@ func (s *BlockStorageTestSuite) TestSetBlockBatchSequentially_GetConcurrently() } func (s *BlockStorageTestSuite) Test_SetBlockBatch_Capacity_Exceeded() { - allowedBatchesCount := int(DefaultBlockStorageConfig().CapacityLimit) / testaide.BatchSize + allowedBatchesCount := int(DefaultBlockStorageConfig().CapacityLimit) batches := testaide.NewBatchesSequence(allowedBatchesCount) for _, batch := range batches { @@ -111,10 +110,10 @@ func (s *BlockStorageTestSuite) Test_SetBlockBatch_Capacity_Exceeded() { s.Require().ErrorIs(err, ErrCapacityLimitReached) } -func (s *BlockStorageTestSuite) Test_SetBlockBatch_Free_Capacity_On_SetBlockAsProposed() { +func (s *BlockStorageTestSuite) Test_SetBlockBatch_Free_Capacity_On_SetBatchAsProposed() { batches := testaide.NewBatchesSequence(2) - capacityLimit := batches[0].BlocksCount() + const capacityLimit = 1 config := NewBlockStorageConfig(capacityLimit) storage := s.newTestBlockStorage(config) @@ -124,10 +123,10 @@ func (s *BlockStorageTestSuite) Test_SetBlockBatch_Free_Capacity_On_SetBlockAsPr err = storage.SetBlockBatch(s.ctx, batches[1]) s.Require().ErrorIs(err, ErrCapacityLimitReached) - provedMainBlockId := scTypes.IdFromBlock(batches[0].MainShardBlock) - err = storage.SetBlockAsProved(s.ctx, provedMainBlockId) + provedBatchId := batches[0].Id + err = storage.SetBatchAsProved(s.ctx, provedBatchId) s.Require().NoError(err) - err = storage.SetBlockAsProposed(s.ctx, provedMainBlockId) + err = storage.SetBatchAsProposed(s.ctx, provedBatchId) s.Require().NoError(err) err = storage.SetBlockBatch(s.ctx, batches[1]) @@ -183,52 +182,51 @@ func (s *BlockStorageTestSuite) TestGetLastFetchedBlock() { s.Require().Equal(batch.MainShardBlock.Hash, latestFetched.Hash) } -func (s *BlockStorageTestSuite) TestSetBlockAsProved_DoesNotExist() { - randomId := testaide.RandomBlockId() - err := s.bs.SetBlockAsProved(s.ctx, randomId) - s.Require().Errorf(err, "block with id=%s is not found", randomId.String()) +func (s *BlockStorageTestSuite) Test_SetBatchAsProved_Batch_Does_Not_Exist() { + randomId := scTypes.NewBatchId() + err := s.bs.SetBatchAsProved(s.ctx, randomId) + s.Require().ErrorIs(err, scTypes.ErrBatchNotFound) } -func (s *BlockStorageTestSuite) TestSetBlockAsProved() { +func (s *BlockStorageTestSuite) Test_SetBatchAsProved() { batch := testaide.NewBlockBatch(3) err := s.bs.SetBlockBatch(s.ctx, batch) s.Require().NoError(err) - err = s.bs.SetBlockAsProved(s.ctx, scTypes.IdFromBlock(batch.MainShardBlock)) + err = s.bs.SetBatchAsProved(s.ctx, batch.Id) s.Require().NoError(err) } -func (s *BlockStorageTestSuite) Test_SetBlockAsProved_Multiple_Times() { +func (s *BlockStorageTestSuite) Test_SetBatchAsProved_Multiple_Times() { batch := testaide.NewBlockBatch(3) - mainBlockId := scTypes.IdFromBlock(batch.MainShardBlock) err := s.bs.SetBlockBatch(s.ctx, batch) s.Require().NoError(err) for range 3 { - err := s.bs.SetBlockAsProved(s.ctx, mainBlockId) + err := s.bs.SetBatchAsProved(s.ctx, batch.Id) s.Require().NoError(err) } - err = s.bs.SetBlockAsProposed(s.ctx, mainBlockId) + err = s.bs.SetBatchAsProposed(s.ctx, batch.Id) s.Require().NoError(err) } -func (s *BlockStorageTestSuite) TestSetBlockAsProposed_DoesNotExist() { - randomId := testaide.RandomBlockId() - err := s.bs.SetBlockAsProposed(s.ctx, randomId) - s.Require().Errorf(err, "block with id=%s is not found", randomId.String()) +func (s *BlockStorageTestSuite) Test_SetBatchAsProposed_Batch_Does_Not_Exist() { + randomId := scTypes.NewBatchId() + err := s.bs.SetBatchAsProposed(s.ctx, randomId) + s.Require().ErrorIs(err, scTypes.ErrBatchNotFound) } -func (s *BlockStorageTestSuite) TestSetBlockAsProposed_IsNotProved() { +func (s *BlockStorageTestSuite) Test_SetBatchAsProposed_Batch_Is_Not_Proved() { batch := testaide.NewBlockBatch(3) err := s.bs.SetBlockBatch(s.ctx, batch) s.Require().NoError(err) - err = s.bs.SetBlockAsProposed(s.ctx, scTypes.IdFromBlock(batch.MainShardBlock)) - s.Require().Errorf(err, "block with hash=%s is not proved", batch.MainShardBlock.Hash.String()) + err = s.bs.SetBatchAsProposed(s.ctx, batch.Id) + s.Require().ErrorIs(err, scTypes.ErrBatchNotProved) } -func (s *BlockStorageTestSuite) TestSetBlockBatch_ParentHashMismatch() { +func (s *BlockStorageTestSuite) Test_SetBlockBatch_ParentHashMismatch() { prevBatch := testaide.NewBlockBatch(4) err := s.bs.SetBlockBatch(s.ctx, prevBatch) @@ -281,10 +279,9 @@ func (s *BlockStorageTestSuite) TestSetBlockBatch_ParentMismatch() { } } -func (s *BlockStorageTestSuite) TestSetBlockAsProposed_WithExecutionShardBlocks() { +func (s *BlockStorageTestSuite) Test_SetBatchAsProposed_WithExecutionShardBlocks() { const childBlocksCount = 3 batch := testaide.NewBlockBatch(childBlocksCount) - mainBlockId := scTypes.IdFromBlock(batch.MainShardBlock) err := s.bs.SetBlockBatch(s.ctx, batch) s.Require().NoError(err) @@ -296,10 +293,10 @@ func (s *BlockStorageTestSuite) TestSetBlockAsProposed_WithExecutionShardBlocks( err = s.bs.SetBlockBatch(s.ctx, nextBatch) s.Require().NoError(err) - err = s.bs.SetBlockAsProved(s.ctx, mainBlockId) + err = s.bs.SetBatchAsProved(s.ctx, batch.Id) s.Require().NoError(err) - err = s.bs.SetBlockAsProposed(s.ctx, mainBlockId) + err = s.bs.SetBatchAsProposed(s.ctx, batch.Id) s.Require().NoError(err) allBlocks := make([]*jsonrpc.RPCBlock, 0) @@ -313,13 +310,13 @@ func (s *BlockStorageTestSuite) TestSetBlockAsProposed_WithExecutionShardBlocks( } } -func (s *BlockStorageTestSuite) TestTryGetNextProposalData_NotInitializedStateRoot() { +func (s *BlockStorageTestSuite) Test_TryGetNextProposalData_NotInitializedStateRoot() { data, err := s.bs.TryGetNextProposalData(s.ctx) s.Require().Nil(data) s.Require().Error(err, "proved state root was not initialized") } -func (s *BlockStorageTestSuite) TestTryGetNextProposalData_BlockParentHashNotSet() { +func (s *BlockStorageTestSuite) Test_TryGetNextProposalData_BlockParentHashNotSet() { err := s.bs.SetProvedStateRoot(s.ctx, testaide.RandomHash()) s.Require().NoError(err) @@ -328,7 +325,7 @@ func (s *BlockStorageTestSuite) TestTryGetNextProposalData_BlockParentHashNotSet s.Require().NoError(err) } -func (s *BlockStorageTestSuite) TestTryGetNextProposalData_NoProvedMainShardBlockFound() { +func (s *BlockStorageTestSuite) Test_TryGetNextProposalData_NoProvedMainShardBlockFound() { err := s.bs.SetProvedStateRoot(s.ctx, testaide.RandomHash()) s.Require().NoError(err) @@ -341,7 +338,7 @@ func (s *BlockStorageTestSuite) TestTryGetNextProposalData_NoProvedMainShardBloc s.Require().NoError(err) } -func (s *BlockStorageTestSuite) TestTryGetNextProposalData_Collect_Transactions() { +func (s *BlockStorageTestSuite) Test_TryGetNextProposalData_Collect_Transactions() { err := s.bs.SetProvedStateRoot(s.ctx, testaide.RandomHash()) s.Require().NoError(err, "failed to set initial state root") @@ -358,7 +355,7 @@ func (s *BlockStorageTestSuite) TestTryGetNextProposalData_Collect_Transactions( err = s.bs.SetBlockBatch(s.ctx, batch) s.Require().NoError(err) - err = s.bs.SetBlockAsProved(s.ctx, scTypes.IdFromBlock(batch.MainShardBlock)) + err = s.bs.SetBatchAsProved(s.ctx, batch.Id) s.Require().NoError(err) data, err := s.bs.TryGetNextProposalData(s.ctx) @@ -367,7 +364,7 @@ func (s *BlockStorageTestSuite) TestTryGetNextProposalData_Collect_Transactions( s.Require().Len(data.Transactions, expectedTxCount) } -func (s *BlockStorageTestSuite) TestTryGetNextProposalData_Concurrently() { +func (s *BlockStorageTestSuite) Test_TryGetNextProposalData_Concurrently() { initialStateRoot := testaide.RandomHash() err := s.bs.SetProvedStateRoot(s.ctx, initialStateRoot) s.Require().NoError(err, "failed to set initial state root") @@ -384,11 +381,11 @@ func (s *BlockStorageTestSuite) TestTryGetNextProposalData_Concurrently() { waitGroup := sync.WaitGroup{} waitGroup.Add(blocksCount + 1) - // concurrently set blocks as proved - for _, block := range batches { + // concurrently set batches as proved + for _, batch := range batches { go func() { - err := s.bs.SetBlockAsProved(s.ctx, scTypes.IdFromBlock(block.MainShardBlock)) - s.NoError(err, "failed to set block as proved") + err := s.bs.SetBatchAsProved(s.ctx, batch.Id) + s.NoError(err, "failed to set batch as proved") waitGroup.Done() }() } @@ -413,9 +410,8 @@ func (s *BlockStorageTestSuite) TestTryGetNextProposalData_Concurrently() { } receivedData = append(receivedData, data) - blockId := scTypes.NewBlockId(types.MainShardId, data.MainShardBlockHash) - err = s.bs.SetBlockAsProposed(s.ctx, blockId) - s.NoError(err, "failed to set block as proposed") + err = s.bs.SetBatchAsProposed(s.ctx, data.BatchId) + s.NoError(err, "failed to set batch as proposed") } } waitGroup.Done() @@ -439,12 +435,12 @@ func (s *BlockStorageTestSuite) TestTryGetNextProposalData_Concurrently() { s.Equal(batch.MainShardBlock.Hash, data.MainShardBlockHash, txn("MainShardBlockHash")) s.Len(data.Transactions, expectedTxCount, txn("Transactions count")) - s.Equal(batch.MainShardBlock.ChildBlocksRootHash, data.NewProvedStateRoot, txn("NewProvedStateRoot")) + s.Equal(batch.MainShardBlock.Hash, data.NewProvedStateRoot, txn("NewProvedStateRoot")) if idx == 0 { s.Equal(initialStateRoot, data.OldProvedStateRoot, txn("OldProvedStateRoot")) } else { - s.Equal(batches[idx-1].MainShardBlock.ChildBlocksRootHash, data.OldProvedStateRoot, txn("OldProvedStateRoot")) + s.Equal(batches[idx-1].MainShardBlock.Hash, data.OldProvedStateRoot, txn("OldProvedStateRoot")) } } } @@ -464,9 +460,10 @@ func (s *BlockStorageTestSuite) Test_ResetProgressPartial_Block_Does_Not_Exists( latestBatchIdBeforeReset, err := s.bs.TryGetLatestBatchId(s.ctx) s.Require().NoError(err) - nonExistentBlockHash := testaide.RandomHash() - err = s.bs.ResetProgressPartial(s.ctx, nonExistentBlockHash) - s.Require().ErrorIs(err, scTypes.ErrBlockNotFound) + nonExistentBatchId := scTypes.NewBatchId() + purgedBatches, err := s.bs.ResetProgressPartial(s.ctx, nonExistentBatchId) + s.Require().ErrorIs(err, scTypes.ErrBatchNotFound) + s.Require().Empty(purgedBatches) for _, batch := range batches { s.requireBatch(batch, false) @@ -502,7 +499,7 @@ func (s *BlockStorageTestSuite) Test_ResetProgressPartial() { } } -func (s *BlockStorageTestSuite) testResetProgressPartial(passedHashIdx int) { +func (s *BlockStorageTestSuite) testResetProgressPartial(firstBatchToPurgeIdx int) { s.T().Helper() batches := testaide.NewBatchesSequence(resetTestBatchesCount) @@ -512,20 +509,24 @@ func (s *BlockStorageTestSuite) testResetProgressPartial(passedHashIdx int) { s.Require().NoError(err) } - firstMainHashToPurge := batches[passedHashIdx].MainShardBlock.Hash - err := s.bs.ResetProgressPartial(s.ctx, firstMainHashToPurge) + firstBatchIdToPurge := batches[firstBatchToPurgeIdx].Id + purgedBatches, err := s.bs.ResetProgressPartial(s.ctx, firstBatchIdToPurge) s.Require().NoError(err) + s.Require().Len(purgedBatches, len(batches)-firstBatchToPurgeIdx) + for i, batch := range batches[firstBatchToPurgeIdx:] { + s.Require().Equal(batch.Id, purgedBatches[i]) + } for i, batch := range batches { - shouldBePurged := i >= passedHashIdx + shouldBePurged := i >= firstBatchToPurgeIdx s.requireBatch(batch, shouldBePurged) } var expectedLatestFetched *scTypes.MainBlockRef - if passedHashIdx == 0 { + if firstBatchToPurgeIdx == 0 { expectedLatestFetched = nil } else { - expectedLatestFetched, _ = scTypes.NewBlockRef(batches[passedHashIdx-1].MainShardBlock) + expectedLatestFetched, _ = scTypes.NewBlockRef(batches[firstBatchToPurgeIdx-1].MainShardBlock) } actualLatestFetched, err := s.bs.TryGetLatestFetched(s.ctx) @@ -534,7 +535,7 @@ func (s *BlockStorageTestSuite) testResetProgressPartial(passedHashIdx int) { actualLatestBatchId, err := s.bs.TryGetLatestBatchId(s.ctx) s.Require().NoError(err) - s.Require().Equal(batches[passedHashIdx].ParentId, actualLatestBatchId) + s.Require().Equal(batches[firstBatchToPurgeIdx].ParentId, actualLatestBatchId) } func (s *BlockStorageTestSuite) Test_ResetProgressNonProved() { @@ -548,7 +549,7 @@ func (s *BlockStorageTestSuite) Test_ResetProgressNonProved() { const provedBatchesCount = 3 provedBatches := batches[:provedBatchesCount] for _, batch := range provedBatches { - err := s.bs.SetBlockAsProved(s.ctx, scTypes.IdFromBlock(batch.MainShardBlock)) + err := s.bs.SetBatchAsProved(s.ctx, batch.Id) s.Require().NoError(err) } @@ -568,12 +569,12 @@ func (s *BlockStorageTestSuite) Test_ResetProgressNonProved() { } } -func (s *BlockStorageTestSuite) Test_ResetProgressNonProved_10K_Blocks_To_Purge() { - capacityLimit := uint32(10_000) +func (s *BlockStorageTestSuite) Test_ResetProgressNonProved_1K_Batches_To_Purge() { + capacityLimit := uint32(1_000) config := NewBlockStorageConfig(capacityLimit) storage := s.newTestBlockStorage(config) - batchesCount := int(capacityLimit) / testaide.BatchSize + batchesCount := int(capacityLimit) batches := testaide.NewBatchesSequence(batchesCount) for _, batch := range batches { @@ -589,7 +590,9 @@ func (s *BlockStorageTestSuite) Test_ResetProgressNonProved_10K_Blocks_To_Purge( s.Require().Nil(latestFetched) for _, batch := range batches { - s.requireBatch(batch, true) + fromStorage, err := s.bs.TryGetBlock(s.ctx, scTypes.IdFromBlock(batch.MainShardBlock)) + s.Require().NoError(err) + s.Require().Nil(fromStorage) } } diff --git a/nil/services/synccommittee/internal/storage/errors.go b/nil/services/synccommittee/internal/storage/errors.go index bbc515629..ba4db1149 100644 --- a/nil/services/synccommittee/internal/storage/errors.go +++ b/nil/services/synccommittee/internal/storage/errors.go @@ -3,8 +3,9 @@ package storage import "errors" var ( - ErrTaskAlreadyExists = errors.New("task with a given identifier already exists") - ErrSerializationFailed = errors.New("failed to serialize/deserialize object") - ErrCapacityLimitReached = errors.New("storage capacity limit reached") - errNilTaskEntry = errors.New("task entry cannot be nil") + ErrStateRootNotInitialized = errors.New("proved state root is not initialized") + ErrTaskAlreadyExists = errors.New("task with a given identifier already exists") + ErrSerializationFailed = errors.New("failed to serialize/deserialize object") + ErrCapacityLimitReached = errors.New("storage capacity limit reached") + errNilTaskEntry = errors.New("task entry cannot be nil") ) diff --git a/nil/services/synccommittee/internal/types/block_batch.go b/nil/services/synccommittee/internal/types/block_batch.go index b63d24a13..4075b1c55 100644 --- a/nil/services/synccommittee/internal/types/block_batch.go +++ b/nil/services/synccommittee/internal/types/block_batch.go @@ -11,8 +11,10 @@ import ( ) var ( - ErrBatchNotReady = errors.New("batch is not ready for handling") - ErrBatchMismatch = errors.New("batch mismatch") + ErrBatchNotReady = errors.New("batch is not ready for handling") + ErrBatchMismatch = errors.New("batch mismatch") + ErrBatchNotProved = errors.New("batch is not proved") + ErrBlockMismatch = errors.New("block mismatch") ) // BatchId Unique ID of a batch of blocks. diff --git a/nil/services/synccommittee/internal/types/blocks.go b/nil/services/synccommittee/internal/types/blocks.go index 9fd26c848..57c603cd8 100644 --- a/nil/services/synccommittee/internal/types/blocks.go +++ b/nil/services/synccommittee/internal/types/blocks.go @@ -130,10 +130,6 @@ func IdFromBlock(block *jsonrpc.RPCBlock) BlockId { return BlockId{block.ShardId, block.Hash} } -func ParentBlockId(block *jsonrpc.RPCBlock) BlockId { - return BlockId{block.ShardId, block.ParentHash} -} - func ChildBlockIds(mainShardBlock *jsonrpc.RPCBlock) ([]BlockId, error) { if mainShardBlock == nil { return nil, errors.New("mainShardBlock cannot be nil") @@ -216,6 +212,7 @@ func NewTransaction(transaction *jsonrpc.RPCInTransaction) *PrunedTransaction { } type ProposalData struct { + BatchId BatchId MainShardBlockHash common.Hash Transactions []*PrunedTransaction OldProvedStateRoot common.Hash diff --git a/nil/services/synccommittee/internal/types/errors.go b/nil/services/synccommittee/internal/types/errors.go index 152391243..5b9b41cd8 100644 --- a/nil/services/synccommittee/internal/types/errors.go +++ b/nil/services/synccommittee/internal/types/errors.go @@ -10,7 +10,7 @@ import ( var ( ErrBlockNotFound = errors.New("block with the specified id is not found") - ErrBlockMismatch = errors.New("block mismatch") + ErrBatchNotFound = errors.New("batch with the specified id is not found") ErrBlockProcessing = errors.New("block processing error") )