Skip to content

Commit

Permalink
# Sync Committee: Transition to Batch-Level Operations
Browse files Browse the repository at this point in the history
## Overview
This update replaces block-level operations with batch-level processing across the Sync Committee services to:
- Simplify task cancellation logic.
- Support the new multi-block batching model.

## Block Storage Changes
- Introduced `batchEntry` struct to hold references to blocks, stored in a new dedicated table.
- Removed the `IsProved` field from `blockEntry`; proved status is now tracked at the batch level instead of per block.
- Replaced `SetBlockAsProved(BlockId)` with `SetBatchAsProved(BatchId)`.
- Updated `TryGetNextProposalData()` to use the main shard block hash as the state root instead of `ChildBlocksRootHash`.
- Replaced `SetBlockAsProposed(BlockId)` with `SetBatchAsProposed(BatchId)`.

## Partial Reset Enhancements
- The reset starting point is now identified by `BatchId` instead of the main shard block hash.
- Switched from block-based iteration to a mechanism using lightweight batch headers (`batchEntry`), reducing the impact of individual block size on reset transaction size.
- Adjusted the default capacity limit from **200 blocks** to **100 batches**.
  • Loading branch information
zadykian committed Mar 7, 2025
1 parent 29105fd commit feb1cd7
Show file tree
Hide file tree
Showing 15 changed files with 425 additions and 300 deletions.
4 changes: 2 additions & 2 deletions nil/services/synccommittee/core/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type aggregator struct {
blockStorage AggregatorBlockStorage
taskStorage AggregatorTaskStorage
batchCommitter batches.BatchCommitter
resetter reset.StateResetter
resetter *reset.StateResetter
timer common.Timer
metrics AggregatorMetrics
workerAction *concurrent.Suspendable
Expand All @@ -70,7 +70,7 @@ func NewAggregator(
rpcClient client.Client,
blockStorage AggregatorBlockStorage,
taskStorage AggregatorTaskStorage,
resetter reset.StateResetter,
resetter *reset.StateResetter,
timer common.Timer,
logger zerolog.Logger,
metrics AggregatorMetrics,
Expand Down
6 changes: 3 additions & 3 deletions nil/services/synccommittee/core/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,12 @@ func (s *AggregatorTestSuite) Test_Main_Parent_Hash_Mismatch() {
for _, provedBatch := range batches[:2] {
err := s.blockStorage.SetBlockBatch(s.ctx, provedBatch)
s.Require().NoError(err)
err = s.blockStorage.SetBlockAsProved(s.ctx, scTypes.IdFromBlock(provedBatch.MainShardBlock))
err = s.blockStorage.SetBatchAsProved(s.ctx, provedBatch.Id)
s.Require().NoError(err)
}

// Set first batch as proposed, latestProvedStateRoot value is updated
err := s.blockStorage.SetBlockAsProposed(s.ctx, scTypes.IdFromBlock(batches[0].MainShardBlock))
err := s.blockStorage.SetBatchAsProposed(s.ctx, batches[0].Id)
s.Require().NoError(err)
latestProved, err := s.blockStorage.TryGetProvedStateRoot(s.ctx)
s.Require().NoError(err)
Expand Down Expand Up @@ -208,7 +208,7 @@ func (s *AggregatorTestSuite) Test_Fetch_Next_Valid() {

func (s *AggregatorTestSuite) Test_Block_Storage_Capacity_Exceeded() {
// only one test batch can fit in the storage
storageConfig := storage.NewBlockStorageConfig(testaide.BatchSize)
storageConfig := storage.NewBlockStorageConfig(1)
blockStorage := s.newTestBlockStorage(storageConfig)

batches := testaide.NewBatchesSequence(2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,6 @@ func newTestSuccessProviderResult(taskToExecute *types.Task, executorId types.Ta

type noopStateResetLauncher struct{}

func (l *noopStateResetLauncher) LaunchPartialResetWithSuspension(_ context.Context, _ common.Hash) error {
func (l *noopStateResetLauncher) LaunchPartialResetWithSuspension(_ context.Context, _ types.BatchId) error {
return nil
}
24 changes: 12 additions & 12 deletions nil/services/synccommittee/core/proposer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/NilFoundation/nil/nil/common"
"github.com/NilFoundation/nil/nil/common/concurrent"
"github.com/NilFoundation/nil/nil/internal/types"
"github.com/NilFoundation/nil/nil/services/synccommittee/internal/metrics"
"github.com/NilFoundation/nil/nil/services/synccommittee/internal/rollupcontract"
"github.com/NilFoundation/nil/nil/services/synccommittee/internal/srv"
Expand All @@ -25,7 +24,7 @@ type ProposerStorage interface {

TryGetNextProposalData(ctx context.Context) (*scTypes.ProposalData, error)

SetBlockAsProposed(ctx context.Context, id scTypes.BlockId) error
SetBatchAsProposed(ctx context.Context, id scTypes.BatchId) error
}

type ProposerMetrics interface {
Expand Down Expand Up @@ -106,12 +105,14 @@ func (p *proposer) Run(ctx context.Context, started chan<- struct{}) error {
return err
}

close(started)
if started != nil {
close(started)
}

concurrent.RunTickerLoop(ctx, p.params.ProposingInterval,
func(ctx context.Context) {
if err := p.proposeNextBlock(ctx); err != nil {
p.logger.Error().Err(err).Msg("error during proved blocks proposing")
if err := p.proposeNextBatch(ctx); err != nil {
p.logger.Error().Err(err).Msg("error during proved batches proposing")
p.metrics.RecordError(ctx, p.Name())
return
}
Expand Down Expand Up @@ -194,7 +195,7 @@ func (p *proposer) updateStoredStateRoot(ctx context.Context, stateRoot common.H
return nil
}

func (p *proposer) proposeNextBlock(ctx context.Context) error {
func (p *proposer) proposeNextBatch(ctx context.Context) error {
if p.rollupContractWrapper == nil {
err := p.initializeProvedStateRoot(ctx)
if err != nil {
Expand All @@ -203,22 +204,21 @@ func (p *proposer) proposeNextBlock(ctx context.Context) error {
}
data, err := p.storage.TryGetNextProposalData(ctx)
if err != nil {
return fmt.Errorf("failed get next block to propose: %w", err)
return fmt.Errorf("failed get next proposal data: %w", err)
}
if data == nil {
p.logger.Debug().Msg("no block to propose")
p.logger.Debug().Msg("no batches to propose")
return nil
}

err = p.sendProof(ctx, data)
if err != nil {
return fmt.Errorf("failed to send proof to L1 for block with hash=%s: %w", data.MainShardBlockHash, err)
return fmt.Errorf("failed to send proof to L1 for batch with id=%s: %w", data.BatchId, err)
}

blockId := scTypes.NewBlockId(types.MainShardId, data.MainShardBlockHash)
err = p.storage.SetBlockAsProposed(ctx, blockId)
err = p.storage.SetBatchAsProposed(ctx, data.BatchId)
if err != nil {
return fmt.Errorf("failed set block with hash=%s as proposed: %w", data.MainShardBlockHash, err)
return fmt.Errorf("failed set batch with id=%s as proposed: %w", data.BatchId, err)
}
return nil
}
Expand Down
55 changes: 33 additions & 22 deletions nil/services/synccommittee/core/reset/resetter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,58 +2,69 @@ package reset

import (
"context"
"fmt"

"github.com/NilFoundation/nil/nil/common"
"github.com/NilFoundation/nil/nil/common/logging"
scTypes "github.com/NilFoundation/nil/nil/services/synccommittee/internal/types"
"github.com/rs/zerolog"
)

type StateResetter interface {
type BatchResetter interface {
// ResetProgressPartial resets Sync Committee's block processing progress
// to a point preceding main shard block with the specified hash.
ResetProgressPartial(ctx context.Context, firstMainHashToPurge common.Hash) error
// to a point preceding batch with the specified ID.
ResetProgressPartial(ctx context.Context, firstBatchToPurge scTypes.BatchId) (purgedBatches []scTypes.BatchId, err error)

// ResetProgressNotProved resets Sync Committee's progress for all not yet proven blocks.
ResetProgressNotProved(ctx context.Context) error
}

func NewStateResetter(logger zerolog.Logger, resetters ...StateResetter) StateResetter {
return &compositeStateResetter{
resetters: resetters,
logger: logger,
func NewStateResetter(logger zerolog.Logger, batchResetter BatchResetter) *StateResetter {
return &StateResetter{
batchResetter: batchResetter,
logger: logger,
}
}

type compositeStateResetter struct {
resetters []StateResetter
logger zerolog.Logger
type StateResetter struct {
batchResetter BatchResetter
logger zerolog.Logger
}

func (r *compositeStateResetter) ResetProgressPartial(ctx context.Context, firstMainHashToPurge common.Hash) error {
func (r *StateResetter) ResetProgressPartial(ctx context.Context, failedBatchId scTypes.BatchId) error {
r.logger.Info().
Stringer(logging.FieldBlockMainChainHash, firstMainHashToPurge).
Stringer(logging.FieldBatchId, failedBatchId).
Msg("Started partial progress reset")

for _, resetter := range r.resetters {
if err := resetter.ResetProgressPartial(ctx, firstMainHashToPurge); err != nil {
return err
purgedBatchIds, err := r.batchResetter.ResetProgressPartial(ctx, failedBatchId)
if err != nil {
return err
}

for _, batchId := range purgedBatchIds {
// Tasks associated with the failed batch should not be cancelled at this point,
// they will be marked as failed later
if batchId == failedBatchId {
continue
}

// todo: cancel tasks in the storage

r.logger.Info().Stringer(logging.FieldBatchId, batchId).Msg("Cancelled batch tasks")
// todo: push cancellation requests to executors
}

r.logger.Info().
Stringer(logging.FieldBlockMainChainHash, firstMainHashToPurge).
Stringer(logging.FieldBatchId, failedBatchId).
Msg("Finished partial progress reset")

return nil
}

func (r *compositeStateResetter) ResetProgressNotProved(ctx context.Context) error {
func (r *StateResetter) ResetProgressNotProved(ctx context.Context) error {
r.logger.Info().Msg("Started not proven progress reset")

for _, resetter := range r.resetters {
if err := resetter.ResetProgressNotProved(ctx); err != nil {
return err
}
if err := r.batchResetter.ResetProgressNotProved(ctx); err != nil {
return fmt.Errorf("failed to reset progress not proved batches: %w", err)
}

r.logger.Info().Msg("Finished not proven progress reset")
Expand Down
20 changes: 10 additions & 10 deletions nil/services/synccommittee/core/reset/state_reset_launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"fmt"
"time"

"github.com/NilFoundation/nil/nil/common"
"github.com/NilFoundation/nil/nil/common/logging"
scTypes "github.com/NilFoundation/nil/nil/services/synccommittee/internal/types"
"github.com/rs/zerolog"
)

Expand All @@ -27,14 +27,14 @@ type Service interface {

type stateResetLauncher struct {
blockFetcher BlockFetcher
resetter StateResetter
resetter *StateResetter
service Service
logger zerolog.Logger
}

func NewResetLauncher(
blockFetcher BlockFetcher,
resetter StateResetter,
resetter *StateResetter,
service Service,
logger zerolog.Logger,
) *stateResetLauncher {
Expand All @@ -46,22 +46,22 @@ func NewResetLauncher(
}
}

func (l *stateResetLauncher) LaunchPartialResetWithSuspension(ctx context.Context, firstMainHashToPurge common.Hash) error {
func (l *stateResetLauncher) LaunchPartialResetWithSuspension(ctx context.Context, failedBatchId scTypes.BatchId) error {
l.logger.Info().
Stringer(logging.FieldBlockMainChainHash, firstMainHashToPurge).
Stringer(logging.FieldBlockMainChainHash, failedBatchId).
Msg("Launching state reset process")

if err := l.blockFetcher.Pause(ctx); err != nil {
return fmt.Errorf("failed to pause block fetching: %w", err)
}

if err := l.resetter.ResetProgressPartial(ctx, firstMainHashToPurge); err != nil {
l.onResetError(ctx, err, firstMainHashToPurge)
if err := l.resetter.ResetProgressPartial(ctx, failedBatchId); err != nil {
l.onResetError(ctx, err, failedBatchId)
return nil
}

l.logger.Info().
Stringer(logging.FieldBlockMainChainHash, firstMainHashToPurge).
Stringer(logging.FieldBlockMainChainHash, failedBatchId).
Msgf("State reset completed, block fetching will be resumed after %s", fetchResumeDelay)

detachedCtx := context.WithoutCancel(ctx)
Expand All @@ -72,9 +72,9 @@ func (l *stateResetLauncher) LaunchPartialResetWithSuspension(ctx context.Contex
}

func (l *stateResetLauncher) onResetError(
ctx context.Context, resetErr error, failedMainBlockHash common.Hash,
ctx context.Context, resetErr error, failedBatchId scTypes.BatchId,
) {
l.logger.Error().Err(resetErr).Stringer(logging.FieldBlockMainChainHash, failedMainBlockHash).Send()
l.logger.Error().Err(resetErr).Stringer(logging.FieldBatchId, failedBatchId).Send()
l.resumeBlockFetching(ctx)
}

Expand Down
2 changes: 1 addition & 1 deletion nil/services/synccommittee/core/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func New(cfg *Config, database db.DB, ethClient rollupcontract.EthClient) (*Sync
blockStorage := storage.NewBlockStorage(database, storage.DefaultBlockStorageConfig(), timer, metricsHandler, logger)
taskStorage := storage.NewTaskStorage(database, timer, metricsHandler, logger)

// todo: add reset logic to TaskStorage (implement StateResetter interface) and pass it here in https://github.com/NilFoundation/nil/pull/419
// todo: add reset logic to TaskStorage and pass it here
stateResetter := reset.NewStateResetter(logger, blockStorage)

agg := NewAggregator(
Expand Down
21 changes: 9 additions & 12 deletions nil/services/synccommittee/core/task_state_change_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,34 @@ import (
"context"
"fmt"

"github.com/NilFoundation/nil/nil/common"
"github.com/NilFoundation/nil/nil/common/logging"
"github.com/NilFoundation/nil/nil/services/synccommittee/internal/api"
"github.com/NilFoundation/nil/nil/services/synccommittee/internal/log"
"github.com/NilFoundation/nil/nil/services/synccommittee/internal/types"
"github.com/rs/zerolog"
)

type ProvedBlockSetter interface {
SetBlockAsProved(ctx context.Context, blockId types.BlockId) error
type ProvedBatchSetter interface {
SetBatchAsProved(ctx context.Context, batchId types.BatchId) error
}

type StateResetLauncher interface {
LaunchPartialResetWithSuspension(ctx context.Context, failedMainBlockHash common.Hash) error
LaunchPartialResetWithSuspension(ctx context.Context, failedBatchId types.BatchId) error
}

type taskStateChangeHandler struct {
blockSetter ProvedBlockSetter
batchSetter ProvedBatchSetter
stateResetLauncher StateResetLauncher
logger zerolog.Logger
}

func newTaskStateChangeHandler(
blockSetter ProvedBlockSetter,
batchSetter ProvedBatchSetter,
stateResetLauncher StateResetLauncher,
logger zerolog.Logger,
) api.TaskStateChangeHandler {
return &taskStateChangeHandler{
blockSetter: blockSetter,
batchSetter: batchSetter,
stateResetLauncher: stateResetLauncher,
logger: logger,
}
Expand All @@ -53,7 +52,7 @@ func (h *taskStateChangeHandler) OnTaskTerminated(ctx context.Context, task *typ
default:
log.NewTaskResultEvent(h.logger, zerolog.WarnLevel, result).
Msg("task execution failed with critical error, state will be reset")
return h.stateResetLauncher.LaunchPartialResetWithSuspension(ctx, task.BlockHash)
return h.stateResetLauncher.LaunchPartialResetWithSuspension(ctx, task.BatchId)
}
}

Expand All @@ -68,10 +67,8 @@ func (h *taskStateChangeHandler) onTaskSuccess(ctx context.Context, task *types.
Stringer(logging.FieldBatchId, task.BatchId).
Msg("Proof batch completed")

blockId := types.NewBlockId(task.ShardId, task.BlockHash)

if err := h.blockSetter.SetBlockAsProved(ctx, blockId); err != nil {
return fmt.Errorf("failed to set block with id=%s as proved: %w", blockId, err)
if err := h.batchSetter.SetBatchAsProved(ctx, task.BatchId); err != nil {
return fmt.Errorf("failed to set batch with id=%s as proved: %w", task.BatchId, err)
}

return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type SyncCommitteeMetricsHandler struct {
blockBatchSize telemetry.Histogram

// BlockStorageMetrics
totalMainBlocksProved telemetry.Counter
totalBatchesProved telemetry.Counter

// ProposerMetrics
totalL1TxSent telemetry.Counter
Expand Down Expand Up @@ -66,7 +66,7 @@ func (h *SyncCommitteeMetricsHandler) init(attributes metric.MeasurementOption,
func (h *SyncCommitteeMetricsHandler) initBlockStorageMetrics(meter telemetry.Meter) error {
var err error

if h.totalMainBlocksProved, err = meter.Int64Counter(namespace + "total_main_blocks_proved"); err != nil {
if h.totalBatchesProved, err = meter.Int64Counter(namespace + "total_batches_proved"); err != nil {
return err
}

Expand Down Expand Up @@ -112,8 +112,8 @@ func (h *SyncCommitteeMetricsHandler) RecordBlockBatchSize(ctx context.Context,
h.blockBatchSize.Record(ctx, batchSize, h.attributes)
}

func (h *SyncCommitteeMetricsHandler) RecordMainBlockProved(ctx context.Context) {
h.totalMainBlocksProved.Add(ctx, 1, h.attributes)
func (h *SyncCommitteeMetricsHandler) RecordBatchProved(ctx context.Context) {
h.totalBatchesProved.Add(ctx, 1, h.attributes)
}

func (h *SyncCommitteeMetricsHandler) RecordProposerTxSent(ctx context.Context, proposalData *types.ProposalData) {
Expand Down
Loading

0 comments on commit feb1cd7

Please sign in to comment.