Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync Committee: Transition to Batch-Level Operations #505

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions nil/services/synccommittee/core/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type aggregator struct {
blockStorage AggregatorBlockStorage
taskStorage AggregatorTaskStorage
batchCommitter batches.BatchCommitter
resetter reset.StateResetter
resetter *reset.StateResetter
timer common.Timer
metrics AggregatorMetrics
workerAction *concurrent.Suspendable
Expand All @@ -70,7 +70,7 @@ func NewAggregator(
rpcClient client.Client,
blockStorage AggregatorBlockStorage,
taskStorage AggregatorTaskStorage,
resetter reset.StateResetter,
resetter *reset.StateResetter,
timer common.Timer,
logger zerolog.Logger,
metrics AggregatorMetrics,
Expand Down
6 changes: 3 additions & 3 deletions nil/services/synccommittee/core/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,12 @@ func (s *AggregatorTestSuite) Test_Main_Parent_Hash_Mismatch() {
for _, provedBatch := range batches[:2] {
err := s.blockStorage.SetBlockBatch(s.ctx, provedBatch)
s.Require().NoError(err)
err = s.blockStorage.SetBlockAsProved(s.ctx, scTypes.IdFromBlock(provedBatch.MainShardBlock))
err = s.blockStorage.SetBatchAsProved(s.ctx, provedBatch.Id)
s.Require().NoError(err)
}

// Set first batch as proposed, latestProvedStateRoot value is updated
err := s.blockStorage.SetBlockAsProposed(s.ctx, scTypes.IdFromBlock(batches[0].MainShardBlock))
err := s.blockStorage.SetBatchAsProposed(s.ctx, batches[0].Id)
s.Require().NoError(err)
latestProved, err := s.blockStorage.TryGetProvedStateRoot(s.ctx)
s.Require().NoError(err)
Expand Down Expand Up @@ -208,7 +208,7 @@ func (s *AggregatorTestSuite) Test_Fetch_Next_Valid() {

func (s *AggregatorTestSuite) Test_Block_Storage_Capacity_Exceeded() {
// only one test batch can fit in the storage
storageConfig := storage.NewBlockStorageConfig(testaide.BatchSize)
storageConfig := storage.NewBlockStorageConfig(1)
blockStorage := s.newTestBlockStorage(storageConfig)

batches := testaide.NewBatchesSequence(2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,6 @@ func newTestSuccessProviderResult(taskToExecute *types.Task, executorId types.Ta

// noopStateResetLauncher is a field-less stub whose only visible method does nothing.
type noopStateResetLauncher struct{}

// LaunchPartialResetWithSuspension ignores both of its arguments and always returns nil.
// NOTE(review): the two signature lines below are the removed and the added side of the diff;
// they differ only in the type of the second parameter (common.Hash vs types.BatchId).
func (l *noopStateResetLauncher) LaunchPartialResetWithSuspension(_ context.Context, _ common.Hash) error {
func (l *noopStateResetLauncher) LaunchPartialResetWithSuspension(_ context.Context, _ types.BatchId) error {
return nil
}
24 changes: 12 additions & 12 deletions nil/services/synccommittee/core/proposer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/NilFoundation/nil/nil/common"
"github.com/NilFoundation/nil/nil/common/concurrent"
"github.com/NilFoundation/nil/nil/internal/types"
"github.com/NilFoundation/nil/nil/services/synccommittee/internal/metrics"
"github.com/NilFoundation/nil/nil/services/synccommittee/internal/rollupcontract"
"github.com/NilFoundation/nil/nil/services/synccommittee/internal/srv"
Expand All @@ -25,7 +24,7 @@ type ProposerStorage interface {

TryGetNextProposalData(ctx context.Context) (*scTypes.ProposalData, error)

SetBlockAsProposed(ctx context.Context, id scTypes.BlockId) error
SetBatchAsProposed(ctx context.Context, id scTypes.BatchId) error
}

type ProposerMetrics interface {
Expand Down Expand Up @@ -106,12 +105,14 @@ func (p *proposer) Run(ctx context.Context, started chan<- struct{}) error {
return err
}

close(started)
if started != nil {
close(started)
}

concurrent.RunTickerLoop(ctx, p.params.ProposingInterval,
func(ctx context.Context) {
if err := p.proposeNextBlock(ctx); err != nil {
p.logger.Error().Err(err).Msg("error during proved blocks proposing")
if err := p.proposeNextBatch(ctx); err != nil {
p.logger.Error().Err(err).Msg("error during proved batches proposing")
p.metrics.RecordError(ctx, p.Name())
return
}
Expand Down Expand Up @@ -194,7 +195,7 @@ func (p *proposer) updateStoredStateRoot(ctx context.Context, stateRoot common.H
return nil
}

func (p *proposer) proposeNextBlock(ctx context.Context) error {
func (p *proposer) proposeNextBatch(ctx context.Context) error {
if p.rollupContractWrapper == nil {
err := p.initializeProvedStateRoot(ctx)
if err != nil {
Expand All @@ -203,22 +204,21 @@ func (p *proposer) proposeNextBlock(ctx context.Context) error {
}
data, err := p.storage.TryGetNextProposalData(ctx)
if err != nil {
return fmt.Errorf("failed get next block to propose: %w", err)
return fmt.Errorf("failed get next proposal data: %w", err)
}
if data == nil {
p.logger.Debug().Msg("no block to propose")
p.logger.Debug().Msg("no batches to propose")
return nil
}

err = p.sendProof(ctx, data)
if err != nil {
return fmt.Errorf("failed to send proof to L1 for block with hash=%s: %w", data.MainShardBlockHash, err)
return fmt.Errorf("failed to send proof to L1 for batch with id=%s: %w", data.BatchId, err)
}

blockId := scTypes.NewBlockId(types.MainShardId, data.MainShardBlockHash)
err = p.storage.SetBlockAsProposed(ctx, blockId)
err = p.storage.SetBatchAsProposed(ctx, data.BatchId)
if err != nil {
return fmt.Errorf("failed set block with hash=%s as proposed: %w", data.MainShardBlockHash, err)
return fmt.Errorf("failed set batch with id=%s as proposed: %w", data.BatchId, err)
}
return nil
}
Expand Down
55 changes: 33 additions & 22 deletions nil/services/synccommittee/core/reset/resetter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,58 +2,69 @@ package reset

import (
"context"
"fmt"

"github.com/NilFoundation/nil/nil/common"
"github.com/NilFoundation/nil/nil/common/logging"
scTypes "github.com/NilFoundation/nil/nil/services/synccommittee/internal/types"
"github.com/rs/zerolog"
)

// BatchResetter is the dependency that the concrete StateResetter delegates to
// for both the partial and the not-proved reset.
// NOTE(review): removed (StateResetter interface, hash-based) and added (BatchResetter,
// batch-ID-based) diff lines are interleaved in this span.
type StateResetter interface {
type BatchResetter interface {
// ResetProgressPartial resets Sync Committee's block processing progress
// to a point preceding main shard block with the specified hash.
ResetProgressPartial(ctx context.Context, firstMainHashToPurge common.Hash) error
// to a point preceding batch with the specified ID.
// In the added signature the purged batch IDs are returned alongside the error.
ResetProgressPartial(ctx context.Context, firstBatchToPurge scTypes.BatchId) (purgedBatches []scTypes.BatchId, err error)

// ResetProgressNotProved resets Sync Committee's progress for all not yet proven blocks.
ResetProgressNotProved(ctx context.Context) error
}

// NewStateResetter builds a resetter from a logger and its underlying reset dependency.
// NOTE(review): the removed variant accepted a variadic list of resetters and returned the
// StateResetter interface; the added variant accepts a single BatchResetter and returns a
// pointer to the concrete StateResetter struct. Both sides of the diff are shown below.
func NewStateResetter(logger zerolog.Logger, resetters ...StateResetter) StateResetter {
return &compositeStateResetter{
resetters: resetters,
logger: logger,
func NewStateResetter(logger zerolog.Logger, batchResetter BatchResetter) *StateResetter {
return &StateResetter{
batchResetter: batchResetter,
logger: logger,
}
}

// StateResetter (previously compositeStateResetter) holds the reset dependency and the logger
// used by ResetProgressPartial and ResetProgressNotProved.
// NOTE(review): removed and added struct definitions are interleaved in this span.
type compositeStateResetter struct {
resetters []StateResetter
logger zerolog.Logger
type StateResetter struct {
// batchResetter performs the actual purge; its results drive the per-batch loop in ResetProgressPartial.
batchResetter BatchResetter
logger zerolog.Logger
}

// ResetProgressPartial logs the start of a partial reset, asks the batch resetter to purge
// progress starting at failedBatchId, walks the returned batch IDs, and logs completion.
// An error from the batch resetter is returned unchanged; otherwise the result is nil.
// NOTE(review): this span is a flattened diff — the removed compositeStateResetter lines
// (the loop over r.resetters) are interleaved with the added StateResetter lines.
func (r *compositeStateResetter) ResetProgressPartial(ctx context.Context, firstMainHashToPurge common.Hash) error {
func (r *StateResetter) ResetProgressPartial(ctx context.Context, failedBatchId scTypes.BatchId) error {
r.logger.Info().
Stringer(logging.FieldBlockMainChainHash, firstMainHashToPurge).
Stringer(logging.FieldBatchId, failedBatchId).
Msg("Started partial progress reset")

for _, resetter := range r.resetters {
if err := resetter.ResetProgressPartial(ctx, firstMainHashToPurge); err != nil {
return err
purgedBatchIds, err := r.batchResetter.ResetProgressPartial(ctx, failedBatchId)
if err != nil {
return err
}

for _, batchId := range purgedBatchIds {
// Tasks associated with the failed batch should not be cancelled at this point,
// they will be marked as failed later
if batchId == failedBatchId {
continue
}

// todo: cancel tasks in the storage

// NOTE(review): this message is emitted even though both cancellation steps
// around it are still todos, so the log currently reports work that is not done.
r.logger.Info().Stringer(logging.FieldBatchId, batchId).Msg("Cancelled batch tasks")
// todo: push cancellation requests to executors
}

r.logger.Info().
Stringer(logging.FieldBlockMainChainHash, firstMainHashToPurge).
Stringer(logging.FieldBatchId, failedBatchId).
Msg("Finished partial progress reset")

return nil
}

func (r *compositeStateResetter) ResetProgressNotProved(ctx context.Context) error {
func (r *StateResetter) ResetProgressNotProved(ctx context.Context) error {
r.logger.Info().Msg("Started not proven progress reset")

for _, resetter := range r.resetters {
if err := resetter.ResetProgressNotProved(ctx); err != nil {
return err
}
if err := r.batchResetter.ResetProgressNotProved(ctx); err != nil {
return fmt.Errorf("failed to reset progress not proved batches: %w", err)
}

r.logger.Info().Msg("Finished not proven progress reset")
Expand Down
20 changes: 10 additions & 10 deletions nil/services/synccommittee/core/reset/state_reset_launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"fmt"
"time"

"github.com/NilFoundation/nil/nil/common"
"github.com/NilFoundation/nil/nil/common/logging"
scTypes "github.com/NilFoundation/nil/nil/services/synccommittee/internal/types"
"github.com/rs/zerolog"
)

Expand All @@ -27,14 +27,14 @@ type Service interface {

// stateResetLauncher groups the collaborators used by LaunchPartialResetWithSuspension:
// the block fetcher it pauses, the resetter it invokes, a service handle, and a logger.
// NOTE(review): the two resetter lines are the removed (interface value) and added
// (pointer to the concrete struct) sides of the diff.
type stateResetLauncher struct {
blockFetcher BlockFetcher
resetter StateResetter
resetter *StateResetter
service Service
logger zerolog.Logger
}

func NewResetLauncher(
blockFetcher BlockFetcher,
resetter StateResetter,
resetter *StateResetter,
service Service,
logger zerolog.Logger,
) *stateResetLauncher {
Expand All @@ -46,22 +46,22 @@ func NewResetLauncher(
}
}

func (l *stateResetLauncher) LaunchPartialResetWithSuspension(ctx context.Context, firstMainHashToPurge common.Hash) error {
func (l *stateResetLauncher) LaunchPartialResetWithSuspension(ctx context.Context, failedBatchId scTypes.BatchId) error {
l.logger.Info().
Stringer(logging.FieldBlockMainChainHash, firstMainHashToPurge).
Stringer(logging.FieldBlockMainChainHash, failedBatchId).
Msg("Launching state reset process")

if err := l.blockFetcher.Pause(ctx); err != nil {
return fmt.Errorf("failed to pause block fetching: %w", err)
}

if err := l.resetter.ResetProgressPartial(ctx, firstMainHashToPurge); err != nil {
l.onResetError(ctx, err, firstMainHashToPurge)
if err := l.resetter.ResetProgressPartial(ctx, failedBatchId); err != nil {
l.onResetError(ctx, err, failedBatchId)
return nil
}

l.logger.Info().
Stringer(logging.FieldBlockMainChainHash, firstMainHashToPurge).
Stringer(logging.FieldBlockMainChainHash, failedBatchId).
Msgf("State reset completed, block fetching will be resumed after %s", fetchResumeDelay)

detachedCtx := context.WithoutCancel(ctx)
Expand All @@ -72,9 +72,9 @@ func (l *stateResetLauncher) LaunchPartialResetWithSuspension(ctx context.Contex
}

// onResetError logs resetErr at error level together with the identifier of the failed
// reset target, then calls resumeBlockFetching.
// NOTE(review): the added line logs the batch ID under logging.FieldBatchId, whereas the
// added lines in LaunchPartialResetWithSuspension still pass failedBatchId under
// logging.FieldBlockMainChainHash — confirm which log key is intended.
// The parameter and logging lines below each appear twice (removed, then added).
func (l *stateResetLauncher) onResetError(
ctx context.Context, resetErr error, failedMainBlockHash common.Hash,
ctx context.Context, resetErr error, failedBatchId scTypes.BatchId,
) {
l.logger.Error().Err(resetErr).Stringer(logging.FieldBlockMainChainHash, failedMainBlockHash).Send()
l.logger.Error().Err(resetErr).Stringer(logging.FieldBatchId, failedBatchId).Send()
l.resumeBlockFetching(ctx)
}

Expand Down
2 changes: 1 addition & 1 deletion nil/services/synccommittee/core/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func New(cfg *Config, database db.DB, ethClient rollupcontract.EthClient) (*Sync
blockStorage := storage.NewBlockStorage(database, storage.DefaultBlockStorageConfig(), timer, metricsHandler, logger)
taskStorage := storage.NewTaskStorage(database, timer, metricsHandler, logger)

// todo: add reset logic to TaskStorage (implement StateResetter interface) and pass it here in https://github.com/NilFoundation/nil/pull/419
// todo: add reset logic to TaskStorage and pass it here
stateResetter := reset.NewStateResetter(logger, blockStorage)

agg := NewAggregator(
Expand Down
21 changes: 9 additions & 12 deletions nil/services/synccommittee/core/task_state_change_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,34 @@ import (
"context"
"fmt"

"github.com/NilFoundation/nil/nil/common"
"github.com/NilFoundation/nil/nil/common/logging"
"github.com/NilFoundation/nil/nil/services/synccommittee/internal/api"
"github.com/NilFoundation/nil/nil/services/synccommittee/internal/log"
"github.com/NilFoundation/nil/nil/services/synccommittee/internal/types"
"github.com/rs/zerolog"
)

// ProvedBatchSetter (previously ProvedBlockSetter) is the storage hook the task state change
// handler calls from onTaskSuccess to mark a unit of work as proved.
// NOTE(review): removed (block-level) and added (batch-level) diff lines are interleaved.
type ProvedBlockSetter interface {
SetBlockAsProved(ctx context.Context, blockId types.BlockId) error
type ProvedBatchSetter interface {
SetBatchAsProved(ctx context.Context, batchId types.BatchId) error
}

// StateResetLauncher is invoked by the task state change handler when a task fails with a
// critical error. The two method lines are the removed (main block hash) and added (batch ID)
// sides of the diff.
type StateResetLauncher interface {
LaunchPartialResetWithSuspension(ctx context.Context, failedMainBlockHash common.Hash) error
LaunchPartialResetWithSuspension(ctx context.Context, failedBatchId types.BatchId) error
}

// taskStateChangeHandler holds the proved-state setter, the reset launcher and the logger
// used when reacting to task results.
// NOTE(review): blockSetter is the removed field, batchSetter the added one.
type taskStateChangeHandler struct {
blockSetter ProvedBlockSetter
batchSetter ProvedBatchSetter
stateResetLauncher StateResetLauncher
logger zerolog.Logger
}

func newTaskStateChangeHandler(
blockSetter ProvedBlockSetter,
batchSetter ProvedBatchSetter,
stateResetLauncher StateResetLauncher,
logger zerolog.Logger,
) api.TaskStateChangeHandler {
return &taskStateChangeHandler{
blockSetter: blockSetter,
batchSetter: batchSetter,
stateResetLauncher: stateResetLauncher,
logger: logger,
}
Expand All @@ -53,7 +52,7 @@ func (h *taskStateChangeHandler) OnTaskTerminated(ctx context.Context, task *typ
default:
log.NewTaskResultEvent(h.logger, zerolog.WarnLevel, result).
Msg("task execution failed with critical error, state will be reset")
return h.stateResetLauncher.LaunchPartialResetWithSuspension(ctx, task.BlockHash)
return h.stateResetLauncher.LaunchPartialResetWithSuspension(ctx, task.BatchId)
}
}

Expand All @@ -68,10 +67,8 @@ func (h *taskStateChangeHandler) onTaskSuccess(ctx context.Context, task *types.
Stringer(logging.FieldBatchId, task.BatchId).
Msg("Proof batch completed")

blockId := types.NewBlockId(task.ShardId, task.BlockHash)

if err := h.blockSetter.SetBlockAsProved(ctx, blockId); err != nil {
return fmt.Errorf("failed to set block with id=%s as proved: %w", blockId, err)
if err := h.batchSetter.SetBatchAsProved(ctx, task.BatchId); err != nil {
return fmt.Errorf("failed to set batch with id=%s as proved: %w", task.BatchId, err)
}

return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type SyncCommitteeMetricsHandler struct {
blockBatchSize telemetry.Histogram

// BlockStorageMetrics
totalMainBlocksProved telemetry.Counter
totalBatchesProved telemetry.Counter

// ProposerMetrics
totalL1TxSent telemetry.Counter
Expand Down Expand Up @@ -66,7 +66,7 @@ func (h *SyncCommitteeMetricsHandler) init(attributes metric.MeasurementOption,
func (h *SyncCommitteeMetricsHandler) initBlockStorageMetrics(meter telemetry.Meter) error {
var err error

if h.totalMainBlocksProved, err = meter.Int64Counter(namespace + "total_main_blocks_proved"); err != nil {
if h.totalBatchesProved, err = meter.Int64Counter(namespace + "total_batches_proved"); err != nil {
return err
}

Expand Down Expand Up @@ -112,8 +112,8 @@ func (h *SyncCommitteeMetricsHandler) RecordBlockBatchSize(ctx context.Context,
h.blockBatchSize.Record(ctx, batchSize, h.attributes)
}

// RecordBatchProved (previously RecordMainBlockProved) adds 1 to the proved counter,
// tagging the measurement with the handler's attributes.
// NOTE(review): each line below appears twice — removed (main-block naming) then added
// (batch naming).
func (h *SyncCommitteeMetricsHandler) RecordMainBlockProved(ctx context.Context) {
h.totalMainBlocksProved.Add(ctx, 1, h.attributes)
func (h *SyncCommitteeMetricsHandler) RecordBatchProved(ctx context.Context) {
h.totalBatchesProved.Add(ctx, 1, h.attributes)
}

func (h *SyncCommitteeMetricsHandler) RecordProposerTxSent(ctx context.Context, proposalData *types.ProposalData) {
Expand Down
Loading