diff --git a/nil/common/check/check.go b/nil/common/check/check.go index 9ef79224a..2e27d2202 100644 --- a/nil/common/check/check.go +++ b/nil/common/check/check.go @@ -24,6 +24,11 @@ func PanicIfNot(flag bool) { } } +// PanicIff panics on true with the given message. +func PanicIff(flag bool, format string, args ...interface{}) { + PanicIfNotf(!flag, format, args...) +} + // PanicIfNotf panics on false with the given message. func PanicIfNotf(flag bool, format string, args ...interface{}) { if !flag { diff --git a/nil/services/synccommittee/Makefile.inc b/nil/services/synccommittee/Makefile.inc index eeb4ab893..3a08ca311 100644 --- a/nil/services/synccommittee/Makefile.inc +++ b/nil/services/synccommittee/Makefile.inc @@ -13,16 +13,22 @@ generate_synccommittee_mocks: \ $(root_sce)/internal/api/task_handler_generated_mock.go: $(root_sce)/internal/api/task_handler.go $(root_sce)/internal/types/prover_tasks.go go generate $(root_sce)/internal/api -$(root_sce)/internal/api/task_request_handler_generated_mock.go: $(root_sce)/internal/api/task_request_handler.go $(root_sce)/internal/types/prover_tasks.go +$(root_sce)/internal/api/task_request_handler_generated_mock.go: \ + $(root_sce)/internal/api/task_request_handler.go \ + $(root_sce)/internal/types/task_result.go \ + $(root_sce)/internal/types/prover_tasks.go go generate $(root_sce)/internal/api -$(root_sce)/internal/api/task_state_change_handler_generated_mock.go: $(root_sce)/internal/api/task_state_change_handler.go $(root_sce)/internal/types/prover_tasks.go +$(root_sce)/internal/api/task_state_change_handler_generated_mock.go: \ + $(root_sce)/internal/api/task_state_change_handler.go \ + $(root_sce)/internal/types/prover_tasks.go go generate $(root_sce)/internal/api $(root_sce)/internal/scheduler/task_scheduler_generated_mock.go: \ $(root_sce)/internal/scheduler/task_scheduler.go \ $(root_sce)/internal/api/task_request_handler.go \ $(root_sce)/internal/types/prover_tasks.go \ + $(root_sce)/internal/types/task_result.go \ $(root_sce)/public/task_debug_api.go \ $(root_sce)/public/task_view.go go generate $(root_sce)/internal/scheduler @@ -48,16 +54,19 @@ synccommittee_types: \ $(root_sce)/internal/types/proverresulttype_string.go \ $(root_sce)/internal/types/taskstatus_string.go \ $(root_sce)/internal/types/circuittype_string.go \ + $(root_sce)/internal/types/taskerrtype_string.go \ $(root_sce)/public/taskdebugorder_string.go -$(root_sce)/internal/types/tasktype_string.go: $(root_sce)/internal/types/prover_tasks.go +$(root_sce)/internal/types/tasktype_string.go: $(root_sce)/internal/types/task_type.go go generate $(root_sce)/internal/types -$(root_sce)/internal/types/proverresulttype_string.go: $(root_sce)/internal/types/prover_tasks.go +$(root_sce)/internal/types/proverresulttype_string.go: $(root_sce)/internal/types/task_result.go go generate $(root_sce)/internal/types -$(root_sce)/internal/types/taskstatus_string.go: $(root_sce)/internal/types/prover_tasks.go +$(root_sce)/internal/types/taskstatus_string.go: $(root_sce)/internal/types/task_status.go go generate $(root_sce)/internal/types $(root_sce)/internal/types/circuittype_string.go: $(root_sce)/internal/types/prover_tasks.go go generate $(root_sce)/internal/types +$(root_sce)/internal/types/taskerrtype_string.go: $(root_sce)/internal/types/errors.go + go generate $(root_sce)/internal/types $(root_sce)/public/taskdebugorder_string.go: $(root_sce)/public/task_debug_api.go go generate $(root_sce)/public diff --git a/nil/services/synccommittee/core/block_tasks_integration_test.go b/nil/services/synccommittee/core/block_tasks_integration_test.go index 4c4d6e3e6..9774d08f5 100644 --- a/nil/services/synccommittee/core/block_tasks_integration_test.go +++ b/nil/services/synccommittee/core/block_tasks_integration_test.go @@ -2,7 +2,6 @@ package core import ( "context" - "errors" "testing" "github.com/NilFoundation/nil/nil/common" @@ -79,7 +78,7 @@ func (s *BlockTasksIntegrationTestSuite) Test_Provide_Tasks_And_Handle_Success_R proofTasks, err := batch.CreateProofTasks(s.timer.NowTime()) s.Require().NoError(err) - err = s.taskStorage.AddTaskEntries(s.ctx, proofTasks) + err = s.taskStorage.AddTaskEntries(s.ctx, proofTasks...) s.Require().NoError(err) executorId := testaide.RandomExecutorId() @@ -131,7 +130,7 @@ func (s *BlockTasksIntegrationTestSuite) Test_Provide_Tasks_And_Handle_Failure_R proofTasks, err := batch.CreateProofTasks(s.timer.NowTime()) s.Require().NoError(err) - err = s.taskStorage.AddTaskEntries(s.ctx, proofTasks) + err = s.taskStorage.AddTaskEntries(s.ctx, proofTasks...) s.Require().NoError(err) executorId := testaide.RandomExecutorId() @@ -157,7 +156,7 @@ func (s *BlockTasksIntegrationTestSuite) Test_Provide_Tasks_And_Handle_Failure_R aggregateProofsFailed := types.NewFailureProviderTaskResult( taskToExecute.Id, executorId, - errors.New("something went wrong"), + types.NewTaskExecError(types.TaskErrProofFailed, "block proof failed"), ) err = s.scheduler.SetTaskResult(s.ctx, aggregateProofsFailed) diff --git a/nil/services/synccommittee/core/task_state_change_handler.go b/nil/services/synccommittee/core/task_state_change_handler.go index 08856686d..2e5a10f57 100644 --- a/nil/services/synccommittee/core/task_state_change_handler.go +++ b/nil/services/synccommittee/core/task_state_change_handler.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + "github.com/NilFoundation/nil/nil/common/logging" "github.com/NilFoundation/nil/nil/services/synccommittee/internal/api" "github.com/NilFoundation/nil/nil/services/synccommittee/internal/log" "github.com/NilFoundation/nil/nil/services/synccommittee/internal/storage" @@ -33,14 +34,15 @@ func (h taskStateChangeHandler) OnTaskTerminated(ctx context.Context, task *type return nil } - if !result.IsSuccess { - log.NewTaskEvent(h.logger, zerolog.WarnLevel, task). - Str("errorText", result.ErrorText). + if !result.IsSuccess() { + log.NewTaskResultEvent(h.logger, zerolog.WarnLevel, result). Msg("block proof task has failed, data won't be sent to the L1") return nil } - log.NewTaskEvent(h.logger, zerolog.InfoLevel, task).Msg("Proof batch completed") + log.NewTaskResultEvent(h.logger, zerolog.InfoLevel, result). + Stringer(logging.FieldBatchId, task.BatchId). + Msg("Proof batch completed") blockId := types.NewBlockId(task.ShardId, task.BlockHash) diff --git a/nil/services/synccommittee/internal/log/events.go b/nil/services/synccommittee/internal/log/events.go index 1b9ba7bf6..8b26a4fc4 100644 --- a/nil/services/synccommittee/internal/log/events.go +++ b/nil/services/synccommittee/internal/log/events.go @@ -28,9 +28,16 @@ func NewTaskResultEvent( result *types.TaskResult, ) *zerolog.Event { //nolint:zerologlint // 'must be dispatched by Msg or Send method' error is ignored - return logger.WithLevel(level). + event := logger.WithLevel(level). Stringer(logging.FieldTaskId, result.TaskId). Stringer(logging.FieldTaskExecutorId, result.Sender). - Bool("isSuccess", result.IsSuccess). - Str("errorText", result.ErrorText) + Bool("isSuccess", result.IsSuccess()) + + if result.IsSuccess() { + return event + } + + return event. + Str("errorText", result.Error.ErrText). + Stringer("errorType", result.Error.ErrType) } diff --git a/nil/services/synccommittee/internal/metrics/task_storage_metrics.go b/nil/services/synccommittee/internal/metrics/task_storage_metrics.go index 3ae329a04..d95140467 100644 --- a/nil/services/synccommittee/internal/metrics/task_storage_metrics.go +++ b/nil/services/synccommittee/internal/metrics/task_storage_metrics.go @@ -11,6 +11,11 @@ import ( "go.opentelemetry.io/otel/metric" ) +const ( + attrTaskType = "task.type" + attrTaskExecutor = "task.executor.id" +) + type taskStorageMetricsHandler struct { attributes metric.MeasurementOption @@ -77,7 +82,7 @@ func (h *taskStorageMetricsHandler) RecordTaskTerminated(ctx context.Context, ta taskAttributes := getTaskAttributes(taskEntry) h.currentActiveTasks.Add(ctx, -1, h.attributes, taskAttributes) - if taskResult.IsSuccess { + if taskResult.IsSuccess() { executionTimeMs := time.Since(*taskEntry.Started).Milliseconds() h.taskExecutionTimeMs.Record(ctx, executionTimeMs, h.attributes, taskAttributes) h.totalTasksSucceeded.Add(ctx, 1, h.attributes, taskAttributes) @@ -86,8 +91,12 @@ func (h *taskStorageMetricsHandler) RecordTaskTerminated(ctx context.Context, ta } } -func (h *taskStorageMetricsHandler) RecordTaskRescheduled(ctx context.Context, taskEntry *types.TaskEntry) { - taskAttributes := getTaskAttributes(taskEntry) +func (h *taskStorageMetricsHandler) RecordTaskRescheduled(ctx context.Context, taskType types.TaskType, previousExecutor types.TaskExecutorId) { + taskAttributes := telattr.With( + attribute.Stringer(attrTaskType, taskType), + attribute.Int64(attrTaskExecutor, int64(previousExecutor)), + ) + h.totalTasksRescheduled.Add(ctx, 1, h.attributes, taskAttributes) h.currentActiveTasks.Add(ctx, -1, h.attributes, taskAttributes) h.currentPendingTasks.Add(ctx, 1, h.attributes, taskAttributes) @@ -95,11 +104,11 @@ func (h *taskStorageMetricsHandler) RecordTaskRescheduled(ctx context.Context, t func getTaskAttributes(task *types.TaskEntry) metric.MeasurementOption { attributes := []attribute.KeyValue{ - attribute.Stringer("task.type", task.Task.TaskType), + attribute.Stringer(attrTaskType, task.Task.TaskType), } if task.Owner != types.UnknownExecutorId { - attributes = append(attributes, attribute.Int64("task.executor.id", int64(task.Owner))) + attributes = append(attributes, attribute.Int64(attrTaskExecutor, int64(task.Owner))) } return telattr.With(attributes...) diff --git a/nil/services/synccommittee/internal/rpc/task_debug_rpc_test.go b/nil/services/synccommittee/internal/rpc/task_debug_rpc_test.go index 23bf0e511..e694f3d59 100644 --- a/nil/services/synccommittee/internal/rpc/task_debug_rpc_test.go +++ b/nil/services/synccommittee/internal/rpc/task_debug_rpc_test.go @@ -363,7 +363,7 @@ func (s *TaskSchedulerDebugRpcTestSuite) requestAndSendResult( if completeSuccessfully { taskResult = testaide.NewSuccessTaskResult(taskToExec.Id, executor) } else { - taskResult = testaide.NewFailureTaskResult(taskToExec.Id, executor) + taskResult = testaide.NewNonRetryableErrorTaskResult(taskToExec.Id, executor) } err = s.storage.ProcessTaskResult(s.context, taskResult) diff --git a/nil/services/synccommittee/internal/rpc/task_request_handler_rpc_test.go b/nil/services/synccommittee/internal/rpc/task_request_handler_rpc_test.go index 2a099506f..3ae0d154f 100644 --- a/nil/services/synccommittee/internal/rpc/task_request_handler_rpc_test.go +++ b/nil/services/synccommittee/internal/rpc/task_request_handler_rpc_test.go @@ -1,7 +1,6 @@ package rpc import ( - "errors" "testing" "github.com/NilFoundation/nil/nil/services/synccommittee/internal/api" @@ -67,7 +66,11 @@ func (s *TaskRequestHandlerTestSuite) Test_TaskRequestHandler_UpdateTaskStatus() }, { "Failure_Result_Provider", - types.NewFailureProverTaskResult(types.NewTaskId(), testaide.RandomExecutorId(), errors.New("something went wrong")), + types.NewFailureProverTaskResult( + types.NewTaskId(), + testaide.RandomExecutorId(), + types.NewTaskExecError(types.TaskErrUnknown, "something went wrong"), + ), }, } diff --git a/nil/services/synccommittee/internal/scheduler/task_scheduler.go b/nil/services/synccommittee/internal/scheduler/task_scheduler.go index 706ca3763..8507b3311 100644 --- a/nil/services/synccommittee/internal/scheduler/task_scheduler.go +++ b/nil/services/synccommittee/internal/scheduler/task_scheduler.go @@ -98,8 +98,7 @@ func (s *taskSchedulerImpl) SetTaskResult(ctx context.Context, result *types.Tas entry, err := s.storage.TryGetTaskEntry(ctx, result.TaskId) if err != nil { - s.onTaskResultError(ctx, err, result) - return err + return s.onTaskResultError(ctx, err, result) } if entry == nil { @@ -107,14 +106,16 @@ func (s *taskSchedulerImpl) SetTaskResult(ctx context.Context, result *types.Tas return nil } - if err = s.stateHandler.OnTaskTerminated(ctx, &entry.Task, result); err != nil { - s.onTaskResultError(ctx, err, result) - return fmt.Errorf("%w: %w", ErrFailedToProcessTaskResult, err) + if err := result.ValidateAgainst(entry); err != nil { + return s.onTaskResultError(ctx, err, result) } - if err = s.storage.ProcessTaskResult(ctx, result); err != nil { - s.onTaskResultError(ctx, err, result) - return fmt.Errorf("%w: %w", ErrFailedToProcessTaskResult, err) + if err := s.stateHandler.OnTaskTerminated(ctx, &entry.Task, result); err != nil { + return s.onTaskResultError(ctx, err, result) + } + + if err := s.storage.ProcessTaskResult(ctx, result); err != nil { + return s.onTaskResultError(ctx, err, result) } return nil @@ -205,9 +206,10 @@ func (s *taskSchedulerImpl) GetTaskTree(ctx context.Context, taskId types.TaskId return s.storage.GetTaskTreeView(ctx, taskId) } -func (s *taskSchedulerImpl) onTaskResultError(ctx context.Context, err error, result *types.TaskResult) { - log.NewTaskResultEvent(s.logger, zerolog.ErrorLevel, result).Err(err).Msg("Failed to process task result") +func (s *taskSchedulerImpl) onTaskResultError(ctx context.Context, cause error, result *types.TaskResult) error { + log.NewTaskResultEvent(s.logger, zerolog.ErrorLevel, result).Err(cause).Msg("Failed to process task result") s.recordError(ctx) + return fmt.Errorf("%w: %w", ErrFailedToProcessTaskResult, cause) } func (s *taskSchedulerImpl) Run(ctx context.Context) error { diff --git a/nil/services/synccommittee/internal/storage/block_storage.go b/nil/services/synccommittee/internal/storage/block_storage.go index af3affc0d..3c01f44b0 100644 --- a/nil/services/synccommittee/internal/storage/block_storage.go +++ b/nil/services/synccommittee/internal/storage/block_storage.go @@ -61,11 +61,9 @@ type BlockStorageMetrics interface { } type blockStorage struct { - db db.DB - timer common.Timer - retryRunner common.RetryRunner - metrics BlockStorageMetrics - logger zerolog.Logger + commonStorage + timer common.Timer + metrics BlockStorageMetrics } func NewBlockStorage( @@ -75,19 +73,14 @@ func NewBlockStorage( logger zerolog.Logger, ) BlockStorage { return &blockStorage{ - db: database, - timer: timer, - retryRunner: badgerRetryRunner( - logger, - common.DoNotRetryIf(scTypes.ErrBlockMismatch), - ), - metrics: metrics, - logger: logger, + commonStorage: makeCommonStorage(database, logger, common.DoNotRetryIf(scTypes.ErrBlockMismatch)), + timer: timer, + metrics: metrics, } } func (bs *blockStorage) TryGetProvedStateRoot(ctx context.Context) (*common.Hash, error) { - tx, err := bs.db.CreateRoTx(ctx) + tx, err := bs.database.CreateRoTx(ctx) if err != nil { return nil, err } @@ -110,7 +103,7 @@ func (bs *blockStorage) getProvedStateRoot(tx db.RoTx) (*common.Hash, error) { } func (bs *blockStorage) SetProvedStateRoot(ctx context.Context, stateRoot common.Hash) error { - tx, err := bs.db.CreateRwTx(ctx) + tx, err := bs.database.CreateRwTx(ctx) if err != nil { return err } @@ -121,11 +114,11 @@ func (bs *blockStorage) SetProvedStateRoot(ctx context.Context, stateRoot common return err } - return tx.Commit() + return bs.commit(tx) } func (bs *blockStorage) TryGetLatestFetched(ctx context.Context) (*scTypes.MainBlockRef, error) { - tx, err := bs.db.CreateRoTx(ctx) + tx, err := bs.database.CreateRoTx(ctx) if err != nil { return nil, err } @@ -140,7 +133,7 @@ func (bs *blockStorage) TryGetLatestFetched(ctx context.Context) (*scTypes.MainB } func (bs *blockStorage) TryGetBlock(ctx context.Context, id scTypes.BlockId) (*jsonrpc.RPCBlock, error) { - tx, err := bs.db.CreateRoTx(ctx) + tx, err := bs.database.CreateRoTx(ctx) if err != nil { return nil, err } @@ -164,7 +157,7 @@ func (bs *blockStorage) SetBlockBatch(ctx context.Context, batch *scTypes.BlockB } func (bs *blockStorage) setBlockBatchImpl(ctx context.Context, batch *scTypes.BlockBatch) error { - tx, err := bs.db.CreateRwTx(ctx) + tx, err := bs.database.CreateRwTx(ctx) if err != nil { return err } @@ -188,7 +181,7 @@ func (bs *blockStorage) setBlockBatchImpl(ctx context.Context, batch *scTypes.Bl return err } - return tx.Commit() + return bs.commit(tx) } func (bs *blockStorage) putBlockTx(tx db.RwTx, batchId scTypes.BatchId, block *jsonrpc.RPCBlock) error { @@ -269,7 +262,7 @@ func (bs *blockStorage) SetBlockAsProved(ctx context.Context, id scTypes.BlockId } func (bs *blockStorage) setBlockAsProvedImpl(ctx context.Context, id scTypes.BlockId) (wasSet bool, err error) { - tx, err := bs.db.CreateRwTx(ctx) + tx, err := bs.database.CreateRwTx(ctx) if err != nil { return false, err } @@ -298,7 +291,7 @@ func (bs *blockStorage) setBlockAsProvedImpl(ctx context.Context, id scTypes.Blo return false, err } - if err := tx.Commit(); err != nil { + if err := bs.commit(tx); err != nil { return false, err } @@ -306,7 +299,7 @@ func (bs *blockStorage) setBlockAsProvedImpl(ctx context.Context, id scTypes.Blo } func (bs *blockStorage) TryGetNextProposalData(ctx context.Context) (*scTypes.ProposalData, error) { - tx, err := bs.db.CreateRoTx(ctx) + tx, err := bs.database.CreateRoTx(ctx) if err != nil { return nil, err } @@ -382,7 +375,7 @@ func (bs *blockStorage) SetBlockAsProposed(ctx context.Context, id scTypes.Block } func (bs *blockStorage) setBlockAsProposedImpl(ctx context.Context, id scTypes.BlockId) error { - tx, err := bs.db.CreateRwTx(ctx) + tx, err := bs.database.CreateRwTx(ctx) if err != nil { return err } @@ -421,7 +414,7 @@ func (bs *blockStorage) setBlockAsProposedImpl(ctx context.Context, id scTypes.B return err } - return tx.Commit() + return bs.commit(tx) } func isValidProposalCandidate(entry *blockEntry, parentHash common.Hash) bool { diff --git a/nil/services/synccommittee/internal/storage/common_storage.go b/nil/services/synccommittee/internal/storage/common_storage.go new file mode 100644 index 000000000..67d88a9de --- /dev/null +++ b/nil/services/synccommittee/internal/storage/common_storage.go @@ -0,0 +1,34 @@ +package storage + +import ( + "fmt" + + "github.com/NilFoundation/nil/nil/common" + "github.com/NilFoundation/nil/nil/internal/db" + "github.com/rs/zerolog" +) + +type commonStorage struct { + database db.DB + retryRunner common.RetryRunner + logger zerolog.Logger +} + +func makeCommonStorage( + database db.DB, + logger zerolog.Logger, + additionalRetryPolicies ...common.RetryPolicyFunc, +) commonStorage { + return commonStorage{ + database: database, + retryRunner: badgerRetryRunner(logger, additionalRetryPolicies...), + logger: logger, + } +} + +func (*commonStorage) commit(tx db.RwTx) error { + if err := tx.Commit(); err != nil { + return fmt.Errorf("failed to commit transaction: %w", err) + } + return nil +} diff --git a/nil/services/synccommittee/internal/storage/task_result_storage.go b/nil/services/synccommittee/internal/storage/task_result_storage.go index 7a89d1d6e..b01e4e856 100644 --- a/nil/services/synccommittee/internal/storage/task_result_storage.go +++ b/nil/services/synccommittee/internal/storage/task_result_storage.go @@ -7,7 +7,6 @@ import ( "errors" "fmt" - "github.com/NilFoundation/nil/nil/common" "github.com/NilFoundation/nil/nil/common/logging" "github.com/NilFoundation/nil/nil/internal/db" "github.com/NilFoundation/nil/nil/services/synccommittee/internal/types" @@ -36,16 +35,12 @@ func NewTaskResultStorage( logger zerolog.Logger, ) TaskResultStorage { return &taskResultStorage{ - database: db, - retryRunner: badgerRetryRunner(logger), - logger: logger, + commonStorage: makeCommonStorage(db, logger), } } type taskResultStorage struct { - database db.DB - retryRunner common.RetryRunner - logger zerolog.Logger + commonStorage } func (s *taskResultStorage) TryGetPending(ctx context.Context) (*types.TaskResult, error) { @@ -130,13 +125,6 @@ func (s *taskResultStorage) deleteImpl(ctx context.Context, taskId types.TaskId) return s.commit(tx) } -func (*taskResultStorage) commit(tx db.RwTx) error { - if err := tx.Commit(); err != nil { - return fmt.Errorf("failed to commit transaction: %w", err) - } - return nil -} - func marshallTaskResult(result *types.TaskResult) ([]byte, error) { bytes, err := json.Marshal(result) if err != nil { diff --git a/nil/services/synccommittee/internal/storage/task_result_storage_test.go b/nil/services/synccommittee/internal/storage/task_result_storage_test.go index 29a04b07d..66d72c62e 100644 --- a/nil/services/synccommittee/internal/storage/task_result_storage_test.go +++ b/nil/services/synccommittee/internal/storage/task_result_storage_test.go @@ -2,7 +2,6 @@ package storage import ( "context" - "errors" "testing" "github.com/NilFoundation/nil/nil/common/logging" @@ -69,7 +68,7 @@ func (s *TaskResultStorageSuite) Test_Delete_Same_Task_Result_N_Times() { result := types.NewFailureProviderTaskResult( types.NewTaskId(), testaide.RandomExecutorId(), - errors.New("something went wrong"), + types.NewTaskExecError(types.TaskErrUnknown, "something went wrong"), ) err := s.storage.Put(s.ctx, result) @@ -125,7 +124,7 @@ func newTaskResults() []*types.TaskResult { types.NewFailureProviderTaskResult( types.NewTaskId(), testaide.RandomExecutorId(), - errors.New("something went wrong"), + types.NewTaskExecError(types.TaskErrUnknown, "something went wrong"), ), types.NewSuccessProverTaskResult( types.NewTaskId(), @@ -141,7 +140,7 @@ func newTaskResults() []*types.TaskResult { types.NewFailureProverTaskResult( types.NewTaskId(), testaide.RandomExecutorId(), - errors.New("prover failed to handle task"), + types.NewTaskExecError(types.TaskErrIO, "prover failed to handle task"), ), } } diff --git a/nil/services/synccommittee/internal/storage/task_storage.go b/nil/services/synccommittee/internal/storage/task_storage.go index 9d88a34cc..d53191fb5 100644 --- a/nil/services/synccommittee/internal/storage/task_storage.go +++ b/nil/services/synccommittee/internal/storage/task_storage.go @@ -18,9 +18,11 @@ import ( ) // TaskEntriesTable BadgerDB tables, TaskId is used as a key -const ( - taskEntriesTable db.TableName = "task_entries" -) +const taskEntriesTable db.TableName = "task_entries" + +// rescheduledTasksPerTxLimit defines the maximum number of tasks that can be rescheduled +// in a single transaction of TaskStorage.RescheduleHangingTasks. +const rescheduledTasksPerTxLimit = 100 // TaskViewContainer is an interface for storing task view type TaskViewContainer interface { @@ -57,15 +59,13 @@ type TaskStorageMetrics interface { RecordTaskAdded(ctx context.Context, task *types.TaskEntry) RecordTaskStarted(ctx context.Context, taskEntry *types.TaskEntry) RecordTaskTerminated(ctx context.Context, taskEntry *types.TaskEntry, taskResult *types.TaskResult) - RecordTaskRescheduled(ctx context.Context, taskEntry *types.TaskEntry) + RecordTaskRescheduled(ctx context.Context, taskType types.TaskType, previousExecutor types.TaskExecutorId) } type taskStorage struct { - database db.DB - retryRunner common.RetryRunner - timer common.Timer - metrics TaskStorageMetrics - logger zerolog.Logger + commonStorage + timer common.Timer + metrics TaskStorageMetrics } func NewTaskStorage( @@ -75,14 +75,13 @@ func NewTaskStorage( logger zerolog.Logger, ) TaskStorage { return &taskStorage{ - database: db, - retryRunner: badgerRetryRunner( + commonStorage: makeCommonStorage( + db, logger, common.DoNotRetryIf(types.ErrTaskWrongExecutor, types.ErrTaskInvalidStatus, ErrTaskAlreadyExists), ), timer: timer, metrics: metrics, - logger: logger, } } @@ -142,7 +141,7 @@ func (st *taskStorage) addTaskEntriesImpl(ctx context.Context, tasks []*types.Ta return err } } - return tx.Commit() + return st.commit(tx) } func (st *taskStorage) addSingleTaskEntryTx(tx db.RwTx, entry *types.TaskEntry) error { @@ -182,12 +181,12 @@ func (st *taskStorage) GetTaskViews(ctx context.Context, destination TaskViewCon currentTime := st.timer.NowTime() - err = st.iterateOverTaskEntries(tx, func(entry *types.TaskEntry) error { + err = st.iterateOverTaskEntries(tx, func(entry *types.TaskEntry) (bool, error) { taskView := public.NewTaskView(entry, currentTime) if predicate(taskView) { destination.Add(taskView) } - return nil + return true, nil }) if err != nil { return fmt.Errorf("failed to retrieve tasks based on predicate: %w", err) @@ -254,16 +253,16 @@ func (st *taskStorage) GetTaskTreeView(ctx context.Context, rootTaskId types.Tas func (st *taskStorage) findTopPriorityTask(tx db.RoTx) (*types.TaskEntry, error) { var topPriorityTask *types.TaskEntry = nil - err := st.iterateOverTaskEntries(tx, func(entry *types.TaskEntry) error { + err := st.iterateOverTaskEntries(tx, func(entry *types.TaskEntry) (bool, error) { if entry.Status != types.WaitingForExecutor { - return nil + return true, nil } if entry.HasHigherPriorityThan(topPriorityTask) { topPriorityTask = entry } - return nil + return true, nil }) return topPriorityTask, err @@ -311,8 +310,8 @@ func (st *taskStorage) requestTaskToExecuteImpl(ctx context.Context, executor ty if err := st.putTaskEntry(tx, taskEntry); err != nil { return nil, fmt.Errorf("failed to update task entry: %w", err) } - if err = tx.Commit(); err != nil { - return nil, fmt.Errorf("failed to commit transaction: %w", err) + if err = st.commit(tx); err != nil { + return nil, err } return taskEntry, nil } @@ -342,13 +341,43 @@ func (st *taskStorage) processTaskResultImpl(ctx context.Context, res *types.Tas return err } + if err := res.ValidateAgainst(entry); err != nil { + return err + } + + if !res.IsSuccess() && res.Error.CanBeRetried() { + if err := st.rescheduleTaskTx(tx, entry, res.Error); err != nil { + return err + } + + if err := st.commit(tx); err != nil { + return err + } + + st.metrics.RecordTaskRescheduled(ctx, entry.Task.TaskType, res.Sender) + return nil + } + + if err := st.terminateTaskTx(tx, entry, res); err != nil { + return err + } + + if err := st.commit(tx); err != nil { + return err + } + + st.metrics.RecordTaskTerminated(ctx, entry, res) + return nil +} + +func (st *taskStorage) terminateTaskTx(tx db.RwTx, entry *types.TaskEntry, res *types.TaskResult) error { currentTime := st.timer.NowTime() if err := entry.Terminate(res, currentTime); err != nil { return err } - if res.IsSuccess { + if res.IsSuccess() { // We don't keep finished tasks in DB log.NewTaskResultEvent(st.logger, zerolog.DebugLevel, res). Msg("Task execution is completed successfully, removing it from the storage") @@ -360,14 +389,26 @@ func (st *taskStorage) processTaskResultImpl(ctx context.Context, res *types.Tas return err } - // Update all the tasks that are waiting for this result + if err := st.updateDependentsTx(tx, entry, res, currentTime); err != nil { + return err + } + + return nil +} + +func (st *taskStorage) updateDependentsTx( + tx db.RwTx, + entry *types.TaskEntry, + res *types.TaskResult, + currentTime time.Time, +) error { for taskId := range entry.Dependents { depEntry, err := st.extractTaskEntry(tx, taskId) if err != nil { return err } - resultEntry := types.NewTaskResultEntry(res, entry, currentTime) + resultEntry := types.NewTaskResultDetails(res, entry, currentTime) if err = depEntry.AddDependencyResult(*resultEntry); err != nil { return fmt.Errorf("failed to add dependency result to task with id=%s: %w", depEntry.Task.Id, err) @@ -377,70 +418,100 @@ func (st *taskStorage) processTaskResultImpl(ctx context.Context, res *types.Tas return err } } - if err := tx.Commit(); err != nil { - return err - } - - st.metrics.RecordTaskTerminated(ctx, entry, res) return nil } +type rescheduledTask struct { + taskType types.TaskType + previousExecutor types.TaskExecutorId +} + func (st *taskStorage) RescheduleHangingTasks(ctx context.Context, taskExecutionTimeout time.Duration) error { - return st.retryRunner.Do(ctx, func(ctx context.Context) error { - return st.rescheduleHangingTasksImpl(ctx, taskExecutionTimeout) + var rescheduled []rescheduledTask + err := st.retryRunner.Do(ctx, func(ctx context.Context) error { + var err error + rescheduled, err = st.rescheduleHangingTasksImpl(ctx, taskExecutionTimeout) + return err }) + if err != nil { + return err + } + + for _, entry := range rescheduled { + st.metrics.RecordTaskRescheduled(ctx, entry.taskType, entry.previousExecutor) + } + return nil } func (st *taskStorage) rescheduleHangingTasksImpl( ctx context.Context, taskExecutionTimeout time.Duration, -) error { +) (rescheduled []rescheduledTask, err error) { tx, err := st.database.CreateRwTx(ctx) if err != nil { - return err + return nil, err } defer tx.Rollback() - err = st.iterateOverTaskEntries(tx, func(entry *types.TaskEntry) error { + currentTime := st.timer.NowTime() + + err = st.iterateOverTaskEntries(tx, func(entry *types.TaskEntry) (bool, error) { if entry.Status != types.Running { - return nil + return true, nil } - currentTime := st.timer.NowTime() executionTime := currentTime.Sub(*entry.Started) if executionTime <= taskExecutionTimeout { - return nil + return true, nil } - st.metrics.RecordTaskRescheduled(ctx, entry) - - if err := st.rescheduleTaskTx(tx, entry, executionTime); err != nil { - return err + previousExecutor := entry.Owner + timeoutErr := types.NewTaskErrTimeout(executionTime, taskExecutionTimeout) + if err := st.rescheduleTaskTx(tx, entry, timeoutErr); err != nil { + return false, err } - return nil + rescheduled = append(rescheduled, rescheduledTask{entry.Task.TaskType, previousExecutor}) + shouldContinue := len(rescheduled) < rescheduledTasksPerTxLimit + return shouldContinue, nil }) if err != nil { - return err + return nil, err + } + + if err := st.commit(tx); err != nil { + return nil, err } - return tx.Commit() + return rescheduled, nil } -func (st *taskStorage) rescheduleTaskTx(tx db.RwTx, entry *types.TaskEntry, executionTime time.Duration) error { +func (st *taskStorage) rescheduleTaskTx( + tx db.RwTx, + entry *types.TaskEntry, + cause *types.TaskExecError, +) error { log.NewTaskEvent(st.logger, zerolog.WarnLevel, &entry.Task). + Err(cause). Stringer(logging.FieldTaskExecutorId, entry.Owner). - Dur(logging.FieldTaskExecTime, executionTime). - Msg("Task execution timeout, rescheduling") + Int("retryCount", entry.RetryCount). + Msg("Task execution error, rescheduling") if err := entry.ResetRunning(); err != nil { return fmt.Errorf("failed to reset task: %w", err) } - return st.putTaskEntry(tx, entry) + if err := st.putTaskEntry(tx, entry); err != nil { + return fmt.Errorf("failed to put rescheduled task: %w", err) + } + + return nil } -func (*taskStorage) iterateOverTaskEntries(tx db.RoTx, action func(entry *types.TaskEntry) error) error { +func (*taskStorage) iterateOverTaskEntries( + tx db.RoTx, + action func(entry *types.TaskEntry) (shouldContinue bool, err error), +) error { iter, err := tx.Range(taskEntriesTable, nil, nil) if err != nil { return err @@ -456,10 +527,13 @@ func (*taskStorage) iterateOverTaskEntries(tx db.RoTx, action func(entry *types. if err = gob.NewDecoder(bytes.NewBuffer(val)).Decode(&entry); err != nil { return fmt.Errorf("%w: failed to decode task with id %v: %w", ErrSerializationFailed, string(key), err) } - err = action(entry) + shouldContinue, err := action(entry) if err != nil { return err } + if !shouldContinue { + return nil + } } return nil diff --git a/nil/services/synccommittee/internal/storage/task_storage_test.go b/nil/services/synccommittee/internal/storage/task_storage_test.go index b4bee776a..52243f5f5 100644 --- a/nil/services/synccommittee/internal/storage/task_storage_test.go +++ b/nil/services/synccommittee/internal/storage/task_storage_test.go @@ -2,7 +2,6 @@ package storage import ( "context" - "errors" "sync" "sync/atomic" "testing" @@ -29,6 +28,8 @@ type TaskStorageSuite struct { ctx context.Context } +type taskResultFactory func(taskId types.TaskId, executorId types.TaskExecutorId) *types.TaskResult + func TestTaskStorageSuite(t *testing.T) { t.Parallel() suite.Run(t, new(TaskStorageSuite)) @@ -309,38 +310,72 @@ func (s *TaskStorageSuite) Test_ProcessTaskResult_Concurrently() { } func (s *TaskStorageSuite) Test_ProcessTaskResult_InvalidStateChange() { - testCases := []struct { - name string - oldStatus types.TaskStatus - }{ - {"WaitingForInput", types.WaitingForInput}, - {"WaitingForExecutor", types.WaitingForExecutor}, - {"Failed", types.Failed}, + taskStatuses := []types.TaskStatus{ + types.WaitingForInput, + types.WaitingForExecutor, + types.Failed, } - for _, testCase := range testCases { - s.Run(testCase.name+"_TrySetSuccess", func() { - s.tryToChangeStatus(testCase.oldStatus, true, false, types.ErrTaskInvalidStatus) - }) - s.Run(testCase.name+"_TrySetFailure", func() { - s.tryToChangeStatus(testCase.oldStatus, false, false, types.ErrTaskInvalidStatus) - }) + taskResults := getTestTaskResults(false) + + for _, taskStatus := range taskStatuses { + for _, result := range taskResults { + s.Run(taskStatus.String()+"_"+result.name, func() { + s.tryToChangeStatus(taskStatus, result.factory, types.ErrTaskInvalidStatus) + }) + } } } func (s *TaskStorageSuite) Test_ProcessTaskResult_WrongExecutor() { - s.Run("TrySetSuccess", func() { - s.tryToChangeStatus(types.Running, true, true, types.ErrTaskWrongExecutor) - }) - s.Run("TrySetFailure", func() { - s.tryToChangeStatus(types.Running, false, true, types.ErrTaskWrongExecutor) - }) + taskResults := getTestTaskResults(true) + + for _, result := range taskResults { + s.Run(result.name, func() { + s.tryToChangeStatus(types.Running, result.factory, types.ErrTaskWrongExecutor) + }) + } +} + +func getTestTaskResults(useDifferentExecutorId bool) []struct { + name string + factory taskResultFactory +} { + substituteId := func(execId types.TaskExecutorId) types.TaskExecutorId { + if useDifferentExecutorId { + return testaide.RandomExecutorId() + } + return execId + } + + return []struct { + name string + factory taskResultFactory + }{ + { + "Success", + func(taskId types.TaskId, executorId types.TaskExecutorId) *types.TaskResult { + return testaide.NewSuccessTaskResult(taskId, substituteId(executorId)) + }, + }, + { + "RetryableError", + func(taskId types.TaskId, executorId types.TaskExecutorId) *types.TaskResult { + return testaide.NewRetryableErrorTaskResult(taskId, substituteId(executorId)) + }, + }, + { + "NonRetryableError", + func(taskId types.TaskId, executorId types.TaskExecutorId) *types.TaskResult { + return testaide.NewNonRetryableErrorTaskResult(taskId, substituteId(executorId)) + }, + }, + } } func (s *TaskStorageSuite) tryToChangeStatus( oldStatus types.TaskStatus, - trySetSuccess bool, - useDifferentExecutorId bool, + resultToSubmit taskResultFactory, expectedError error, ) { s.T().Helper() @@ -351,17 +386,7 @@ func (s *TaskStorageSuite) tryToChangeStatus( err := s.ts.AddTaskEntries(s.ctx, taskEntry) s.Require().NoError(err) - if useDifferentExecutorId { - executorId = testaide.RandomExecutorId() - } - - var taskResult *types.TaskResult - if trySetSuccess { - taskResult = types.NewSuccessProverTaskResult(taskEntry.Task.Id, executorId, types.TaskResultAddresses{}, types.TaskResultData{}) - } else { - taskResult = types.NewFailureProverTaskResult(taskEntry.Task.Id, executorId, errors.New("some error")) - } - + taskResult := resultToSubmit(taskEntry.Task.Id, executorId) err = s.ts.ProcessTaskResult(s.ctx, taskResult) s.Require().ErrorIs(err, expectedError) } diff --git a/nil/services/synccommittee/internal/testaide/prover_tasks.go b/nil/services/synccommittee/internal/testaide/prover_tasks.go index 85754cdb3..3b243ecec 100644 --- a/nil/services/synccommittee/internal/testaide/prover_tasks.go +++ b/nil/services/synccommittee/internal/testaide/prover_tasks.go @@ -4,7 +4,6 @@ package testaide import ( "crypto/rand" - "errors" "math" "math/big" "time" @@ -93,10 +92,18 @@ func NewSuccessTaskResult(taskId types.TaskId, executor types.TaskExecutorId) *t ) } -func NewFailureTaskResult(taskId types.TaskId, executor types.TaskExecutorId) *types.TaskResult { +func NewRetryableErrorTaskResult(taskId types.TaskId, executor types.TaskExecutorId) *types.TaskResult { return types.NewFailureProverTaskResult( taskId, executor, - errors.New("something went wrong"), + types.NewTaskExecError(types.TaskErrUnknown, "something went wrong"), + ) +} + +func NewNonRetryableErrorTaskResult(taskId types.TaskId, executor types.TaskExecutorId) *types.TaskResult { + return types.NewFailureProverTaskResult( + taskId, + executor, + types.NewTaskExecError(types.TaskErrProofFailed, "failed to proof block"), ) } diff --git a/nil/services/synccommittee/internal/types/errors.go b/nil/services/synccommittee/internal/types/errors.go index 2a6c5856f..269998af8 100644 --- a/nil/services/synccommittee/internal/types/errors.go +++ b/nil/services/synccommittee/internal/types/errors.go @@ -3,6 +3,9 @@ package types import ( "errors" "fmt" + "time" + + "github.com/NilFoundation/nil/nil/common/check" ) var ( @@ -11,12 +14,84 @@ var ( ) var ( - ErrTaskInvalidStatus = errors.New("task has invalid status") - ErrTaskWrongExecutor = errors.New("task belongs to another executor") - ErrUnexpectedTaskType = errors.New("unexpected task type") - ErrBlockProofTaskFailed = errors.New("block proof task failed") + ErrTaskInvalidStatus = errors.New("task has invalid status") + ErrTaskWrongExecutor = errors.New("task belongs to another executor") ) -func UnexpectedTaskType(task *Task) error { - return fmt.Errorf("%w: taskType=%d, taskId=%s", ErrUnexpectedTaskType, task.TaskType, task.Id) +type TaskErrType int8 + +const ( + _ TaskErrType = iota + + // TaskErrTimeout indicates that a task failed as it exceeded the specified timeout duration. + TaskErrTimeout + + // TaskErrIO indicates an error related to input/output operations (RPC calls / Disk storage access / Network). + TaskErrIO + + // TaskErrInvalidInputData indicates an error caused by invalid or malformed input data to a task. + TaskErrInvalidInputData + + // TaskErrProofFailed indicates that a proof generation stage failed due to block inconsistency. + TaskErrProofFailed + + // TaskErrTerminated indicates that the task was explicitly terminated prior to completion. + TaskErrTerminated + + // TaskErrNotSupportedType indicates that an unsupported or unrecognized type was encountered by the executor. + TaskErrNotSupportedType + + // TaskErrUnknown indicates an unspecified task error. + TaskErrUnknown +) + +var RetryableErrors = map[TaskErrType]bool{ + TaskErrIO: true, + TaskErrTerminated: true, + TaskErrUnknown: true, +} + +type TaskExecError struct { + ErrType TaskErrType `json:"errCode"` + ErrText string `json:"errText"` +} + +func (e *TaskExecError) Error() string { + return fmt.Sprintf("%s: %s", e.ErrType, e.ErrText) +} + +func (e *TaskExecError) CanBeRetried() bool { + return RetryableErrors[e.ErrType] +} + +func NewTaskExecError(errType TaskErrType, errText string) *TaskExecError { + return &TaskExecError{ErrType: errType, ErrText: errText} +} + +func NewTaskExecErrorf(errType TaskErrType, format string, args ...interface{}) *TaskExecError { + return &TaskExecError{ErrType: errType, ErrText: fmt.Sprintf(format, args...)} +} + +func NewTaskErrTimeout(execTime, execTimeout time.Duration) *TaskExecError { + return NewTaskExecErrorf( + TaskErrTimeout, + "execution timeout exceeded: execTime=%s, execTimeout=%s", execTime, execTimeout, + ) +} + +func NewTaskErrNotSupportedType(task *Task) *TaskExecError { + return NewTaskExecErrorf(TaskErrNotSupportedType, "taskType=%d, taskId=%s", task.TaskType, task.Id) +} + +func NewTaskErrChildFailed(childResult *TaskResult) *TaskExecError { + check.PanicIff(childResult.IsSuccess(), "childResult is not failed") + + return NewTaskExecErrorf( + childResult.Error.ErrType, + "child prover task failed: childTaskId=%s, errorText=%s", childResult.TaskId, childResult.Error.ErrText, + ) +} + +func NewTaskErrUnknown(cause error) *TaskExecError { + return NewTaskExecErrorf(TaskErrUnknown, "task execution failed: %s", cause.Error()) } diff --git a/nil/services/synccommittee/internal/types/generate.go b/nil/services/synccommittee/internal/types/generate.go index c2781c1bf..9627098a7 100644 --- a/nil/services/synccommittee/internal/types/generate.go +++ b/nil/services/synccommittee/internal/types/generate.go @@ -4,3 +4,4 @@ package types //go:generate stringer -type=ProverResultType -trimprefix=ProverResultType //go:generate stringer -type=TaskStatus -trimprefix=TaskStatus //go:generate stringer -type=CircuitType -trimprefix=Circuit +//go:generate stringer -type=TaskErrType -trimprefix=TaskErr diff --git a/nil/services/synccommittee/internal/types/prover_tasks.go b/nil/services/synccommittee/internal/types/prover_tasks.go index ff0be51d8..5f445a4fd 100644 --- a/nil/services/synccommittee/internal/types/prover_tasks.go +++ b/nil/services/synccommittee/internal/types/prover_tasks.go @@ -3,8 +3,6 @@ package types import ( "errors" "fmt" - "maps" - "slices" "strconv" "time" @@ -15,48 +13,6 @@ import ( "github.com/google/uuid" ) -// TaskType Tasks have different types, it affects task input and priority -type TaskType uint8 - -const ( - TaskTypeNone TaskType = iota - AggregateProofs - ProofBlock - PartialProve - AggregatedChallenge - CombinedQ - AggregatedFRI - FRIConsistencyChecks - MergeProof -) - -var TaskTypes = map[string]TaskType{ - "AggregateProofs": AggregateProofs, - "ProofBlock": ProofBlock, - "PartialProve": PartialProve, - "AggregatedChallenge": AggregatedChallenge, - "CombinedQ": CombinedQ, - "AggregatedFRI": AggregatedFRI, - "FRIConsistencyChecks": FRIConsistencyChecks, - "MergeProof": MergeProof, -} - -func (t *TaskType) Set(str string) error { - if v, ok := TaskTypes[str]; ok { - *t = v - return nil - } - return fmt.Errorf("unknown task type: %s", str) -} - -func (*TaskType) Type() string { - return "TaskType" -} - -func (*TaskType) PossibleValues() []string { - return slices.Collect(maps.Keys(TaskTypes)) -} - type CircuitType uint8 const ( @@ -107,28 +63,6 @@ func (*TaskId) Type() string { return "TaskId" } -// Task results can have different types -type ProverResultType uint8 - -const ( - _ ProverResultType = iota - PartialProof - CommitmentState - PartialProofChallenges - AssignmentTableDescription - ThetaPower - PreprocessedCommonData - AggregatedChallenges - CombinedQPolynomial - AggregatedFRIProof - ProofOfWork - ConsistencyCheckChallenges - LPCConsistencyCheckProof - FinalProof - BlockProof - AggregatedProof -) - type TaskExecutorId uint32 const UnknownExecutorId TaskExecutorId = 0 @@ -150,8 +84,6 @@ func (*TaskExecutorId) Type() string { return "TaskExecutorId" } -type TaskResultAddresses map[ProverResultType]string - type TaskIdSet map[TaskId]bool func NewTaskIdSet() TaskIdSet { @@ -165,91 +97,6 @@ func (s TaskIdSet) Put(id TaskId) { // todo: declare separate task types for ProofProvider and Prover // https://www.notion.so/nilfoundation/Generic-Tasks-in-SyncCommittee-10ac614852608028b7ffcfd910deeef7?pvs=4 -type TaskResultData []byte - -// TaskResult represents the result of a task provided via RPC by the executor with id = TaskResult.Sender. -type TaskResult struct { - TaskId TaskId `json:"taskId"` - IsSuccess bool `json:"isSuccess"` - ErrorText string `json:"errorText,omitempty"` - Sender TaskExecutorId `json:"sender"` - DataAddresses TaskResultAddresses `json:"dataAddresses,omitempty"` - Data TaskResultData `json:"binaryData,omitempty"` -} - -func NewSuccessProviderTaskResult( - taskId TaskId, - proofProviderId TaskExecutorId, - dataAddresses TaskResultAddresses, - binaryData TaskResultData, -) *TaskResult { - return &TaskResult{ - TaskId: taskId, - IsSuccess: true, - Sender: proofProviderId, - DataAddresses: dataAddresses, - Data: binaryData, - } -} - -func NewFailureProviderTaskResult( - taskId TaskId, - proofProviderId TaskExecutorId, - err error, -) *TaskResult { - return &TaskResult{ - TaskId: taskId, - IsSuccess: false, - Sender: proofProviderId, - ErrorText: fmt.Sprintf("failed to proof block: %v", err), - } -} - -func NewSuccessProverTaskResult( - taskId TaskId, - sender TaskExecutorId, - dataAddresses TaskResultAddresses, - binaryData TaskResultData, -) *TaskResult { - return &TaskResult{ - TaskId: taskId, - IsSuccess: true, - Sender: sender, - DataAddresses: dataAddresses, - Data: binaryData, - } -} - -func NewFailureProverTaskResult( - taskId TaskId, - sender TaskExecutorId, - err error, -) *TaskResult { - return &TaskResult{ - TaskId: taskId, - Sender: sender, - IsSuccess: false, - ErrorText: fmt.Sprintf("failed to generate proof: %v", err), - } -} - -// TaskResultDetails represents the result of a task, extending TaskResult with additional task-specific metadata. -type TaskResultDetails struct { - TaskResult - TaskType TaskType `json:"type"` - CircuitType CircuitType `json:"circuitType"` - ExecutionTime time.Duration `json:"executionTime"` -} - -func NewTaskResultEntry(result *TaskResult, taskEntry *TaskEntry, currentTime time.Time) *TaskResultDetails { - return &TaskResultDetails{ - TaskResult: *result, - TaskType: taskEntry.Task.TaskType, - CircuitType: taskEntry.Task.CircuitType, - ExecutionTime: *taskEntry.ExecutionTime(currentTime), - } -} - // Task contains all the necessary data for either Prover or ProofProvider to perform computation type Task struct { Id TaskId `json:"id"` @@ -265,40 +112,6 @@ type Task struct { DependencyResults map[TaskId]TaskResultDetails `json:"dependencyResults"` } -type TaskStatus uint8 - -const ( - TaskStatusNone TaskStatus = iota - WaitingForInput - WaitingForExecutor - Running - Failed - Completed -) - -var TaskStatuses = map[string]TaskStatus{ - "WaitingForInput": WaitingForInput, - "WaitingForExecutor": WaitingForExecutor, - "Running": Running, - "Failed": Failed, -} - -func (t *TaskStatus) Set(str string) error { - if v, ok := TaskStatuses[str]; ok { - *t = v - return nil - } - return fmt.Errorf("unknown task status: %s", str) -} - -func (*TaskStatus) Type() string { - return "TaskStatus" -} - -func (*TaskStatus) PossibleValues() []string { - return slices.Collect(maps.Keys(TaskStatuses)) -} - // TaskEntry Wrapper for task to hold metadata like task status and dependencies type TaskEntry struct { // Task: task to be executed @@ -324,6 +137,9 @@ type TaskEntry struct { // Status: current status of the task Status TaskStatus + + // RetryCount specifies the number of times the task execution has been retried + RetryCount int } // AddDependency adds a dependency to the current task entry and updates the dependents and pending dependencies. @@ -352,7 +168,7 @@ func (t *TaskEntry) AddDependencyResult(res TaskResultDetails) error { } t.Task.DependencyResults[res.TaskId] = res - if res.IsSuccess { + if res.IsSuccess() { delete(t.PendingDependencies, res.TaskId) } if len(t.PendingDependencies) == 0 { @@ -381,19 +197,12 @@ func (t *TaskEntry) Start(executorId TaskExecutorId, currentTime time.Time) erro // Terminate transitions the status of a running task to Completed or Failed based on the input. func (t *TaskEntry) Terminate(result *TaskResult, currentTime time.Time) error { - if t.Owner != result.Sender { - return fmt.Errorf( - "%w: taskId=%v, taskStatus=%v, taskOwner=%v, requestSenderId=%v", - ErrTaskWrongExecutor, t.Task.Id, t.Status, t.Owner, result.Sender, - ) - } - - if t.Status != Running { - return errTaskInvalidStatus(t, "Terminate") + if err := result.ValidateAgainst(t); err != nil { + return err } var newStatus TaskStatus - if result.IsSuccess { + if result.IsSuccess() { newStatus = Completed } else { newStatus = Failed @@ -413,6 +222,7 @@ func (t *TaskEntry) ResetRunning() error { t.Started = nil t.Status = WaitingForExecutor t.Owner = UnknownExecutorId + t.RetryCount++ return nil } @@ -445,9 +255,9 @@ func (t *TaskEntry) HasHigherPriorityThan(other *TaskEntry) bool { return true } if t.Created != other.Created { - return t.Created.Before(t.Created) + return t.Created.Before(other.Created) } - return t.Task.TaskType < t.Task.TaskType + return t.Task.TaskType < other.Task.TaskType } // AsNewChildEntry creates a new TaskEntry with a new TaskId and sets the ParentTaskId to the current task's Id. diff --git a/nil/services/synccommittee/internal/types/task_result.go b/nil/services/synccommittee/internal/types/task_result.go index b22107f4b..ede75a5b2 100644 --- a/nil/services/synccommittee/internal/types/task_result.go +++ b/nil/services/synccommittee/internal/types/task_result.go @@ -1,6 +1,7 @@ package types import ( + "fmt" "time" "github.com/NilFoundation/nil/nil/common/check" @@ -40,8 +41,29 @@ type TaskResult struct { Data TaskResultData `json:"binaryData,omitempty"` } -func (t *TaskResult) IsSuccess() bool { - return t.Error == nil +func (r *TaskResult) IsSuccess() bool { + return r.Error == nil +} + +// ValidateAgainst checks the correctness of the TaskResult +// against the given TaskEntry and returns an error if invalid. +func (r *TaskResult) ValidateAgainst(entry *TaskEntry) error { + if r.TaskId != entry.Task.Id { + return fmt.Errorf("task result's taskId=%s does not match task entry's taskId=%s", r.TaskId, entry.Task.Id) + } + + if r.Sender == UnknownExecutorId || r.Sender != entry.Owner { + return fmt.Errorf( + "%w: taskId=%v, taskStatus=%v, taskOwner=%v, requestSenderId=%v", + ErrTaskWrongExecutor, entry.Task.Id, entry.Status, entry.Owner, r.Sender, + ) + } + + if entry.Status != Running { + return errTaskInvalidStatus(entry, "Validate") + } + + return nil } func NewSuccessProviderTaskResult( diff --git a/nil/services/synccommittee/proofprovider/task_handler.go b/nil/services/synccommittee/proofprovider/task_handler.go index e3f44bab1..fa966a271 100644 --- a/nil/services/synccommittee/proofprovider/task_handler.go +++ b/nil/services/synccommittee/proofprovider/task_handler.go @@ -29,7 +29,7 @@ func newTaskHandler( func (h *taskHandler) Handle(ctx context.Context, _ types.TaskExecutorId, task *types.Task) error { if (task.TaskType != types.ProofBlock) && (task.TaskType != types.AggregateProofs) { - return types.UnexpectedTaskType(task) + return types.NewTaskErrNotSupportedType(task) } log.NewTaskEvent(h.logger, zerolog.InfoLevel, task).Msg("Creating proof tasks for block") diff --git a/nil/services/synccommittee/proofprovider/task_handler_test.go b/nil/services/synccommittee/proofprovider/task_handler_test.go index d17d38633..8caa129d1 100644 --- a/nil/services/synccommittee/proofprovider/task_handler_test.go +++ b/nil/services/synccommittee/proofprovider/task_handler_test.go @@ -73,10 +73,10 @@ func (s *TaskHandlerTestSuite) TestReturnErrorOnUnexpectedTaskType() { s.Run(testCase.name, func() { task := testaide.NewTaskOfType(testCase.taskType) err := s.taskHandler.Handle(s.context, executorId, task) - s.Require().ErrorIs( + s.Require().ErrorContains( err, - types.ErrUnexpectedTaskType, - "taskHandler should have returned ErrUnexpectedTaskType on task of type %d", testCase.taskType, + types.TaskErrNotSupportedType.String(), + "taskHandler should have returned TaskErrNotSupportedType on task of type %d", testCase.taskType, ) }) } @@ -186,7 +186,7 @@ func (s *TaskHandlerTestSuite) requestTask(executorId types.TaskExecutorId, avai // Set result for task func (s *TaskHandlerTestSuite) completeTask(sender types.TaskExecutorId, id types.TaskId) { s.T().Helper() - result := &types.TaskResult{TaskId: id, IsSuccess: true, Sender: sender} + result := &types.TaskResult{TaskId: id, Sender: sender} err := s.taskStorage.ProcessTaskResult(s.context, result) s.Require().NoError(err) } diff --git a/nil/services/synccommittee/proofprovider/task_state_change_handler.go b/nil/services/synccommittee/proofprovider/task_state_change_handler.go index c95677b99..f83e23fa9 100644 --- a/nil/services/synccommittee/proofprovider/task_state_change_handler.go +++ b/nil/services/synccommittee/proofprovider/task_state_change_handler.go @@ -2,9 +2,8 @@ package proofprovider import ( "context" - "errors" - "fmt" + "github.com/NilFoundation/nil/nil/common/logging" "github.com/NilFoundation/nil/nil/services/synccommittee/internal/api" "github.com/NilFoundation/nil/nil/services/synccommittee/internal/log" "github.com/NilFoundation/nil/nil/services/synccommittee/internal/storage" @@ -12,8 +11,6 @@ import ( "github.com/rs/zerolog" ) -var ErrChildTaskFailed = errors.New("child prover task failed") - type taskStateChangeHandler struct { resultStorage storage.TaskResultStorage currentExecutorId types.TaskExecutorId @@ -43,18 +40,29 @@ func (h taskStateChangeHandler) OnTaskTerminated(ctx context.Context, task *type return nil } - if !result.IsSuccess { - log.NewTaskEvent(h.logger, zerolog.WarnLevel, task).Msgf("Prover task has failed") + if !result.IsSuccess() && result.Error.CanBeRetried() { + log.NewTaskResultEvent(h.logger, zerolog.WarnLevel, result). + Stringer(logging.FieldTaskParentId, task.ParentTaskId). + Msgf("Child task will be rescheduled for retry, parent is not updated") + return nil } var parentTaskResult *types.TaskResult - if result.IsSuccess { - parentTaskResult = types.NewSuccessProviderTaskResult(*task.ParentTaskId, h.currentExecutorId, result.DataAddresses, result.Data) + if result.IsSuccess() { + parentTaskResult = types.NewSuccessProviderTaskResult( + *task.ParentTaskId, + h.currentExecutorId, + result.DataAddresses, + result.Data, + ) } else { + log.NewTaskResultEvent(h.logger, zerolog.WarnLevel, result). + Stringer(logging.FieldTaskParentId, task.ParentTaskId). + Msgf("Prover task cannot be retried, parent will marked as failed") parentTaskResult = types.NewFailureProviderTaskResult( *task.ParentTaskId, h.currentExecutorId, - fmt.Errorf("%w: childTaskId=%s, errorText=%s", ErrChildTaskFailed, task.Id, result.ErrorText), + types.NewTaskErrChildFailed(result), ) } diff --git a/nil/services/synccommittee/prover/internal/constants/proof_producer_codes.go b/nil/services/synccommittee/prover/internal/constants/proof_producer_codes.go new file mode 100644 index 000000000..4dac37273 --- /dev/null +++ b/nil/services/synccommittee/prover/internal/constants/proof_producer_codes.go @@ -0,0 +1,24 @@ +package constants + +import ( + "github.com/NilFoundation/nil/nil/services/synccommittee/internal/types" +) + +// ProofProducerResultCode represents the result codes for proof-producer binary. +// Correspond to the values defined here: [TODO: add link to command_step.hpp] +type ProofProducerResultCode int + +const ( + ProofProducerSuccess ProofProducerResultCode = 0 + ProofProducerIOError ProofProducerResultCode = 10 + ProofProducerInvalidInput ProofProducerResultCode = 20 + ProofProducerProverError ProofProducerResultCode = 30 + ProofProducerUnknownError ProofProducerResultCode = 0xFF +) + +var ProofProducerErrors = map[ProofProducerResultCode]types.TaskErrType{ + ProofProducerIOError: types.TaskErrIO, + ProofProducerInvalidInput: types.TaskErrInvalidInputData, + ProofProducerProverError: types.TaskErrProofFailed, + ProofProducerUnknownError: types.TaskErrUnknown, +} diff --git a/nil/services/synccommittee/prover/task_handler.go b/nil/services/synccommittee/prover/task_handler.go index 04ab756bf..c13b3ba52 100644 --- a/nil/services/synccommittee/prover/task_handler.go +++ b/nil/services/synccommittee/prover/task_handler.go @@ -10,6 +10,7 @@ import ( "path/filepath" "slices" "strings" + "syscall" "github.com/NilFoundation/nil/nil/client" "github.com/NilFoundation/nil/nil/common" @@ -19,6 +20,7 @@ import ( "github.com/NilFoundation/nil/nil/services/synccommittee/internal/log" "github.com/NilFoundation/nil/nil/services/synccommittee/internal/storage" "github.com/NilFoundation/nil/nil/services/synccommittee/internal/types" + "github.com/NilFoundation/nil/nil/services/synccommittee/prover/internal/constants" "github.com/NilFoundation/nil/nil/services/synccommittee/prover/tracer" "github.com/rs/zerolog" ) @@ -438,14 +440,18 @@ func (h *taskHandler) makeCommandForTask(ctx context.Context, task *types.Task) func (h *taskHandler) Handle(ctx context.Context, executorId types.TaskExecutorId, task *types.Task) error { if task.TaskType == types.ProofBlock { - err := types.UnexpectedTaskType(task) - taskResult := types.NewFailureProverTaskResult(task.Id, executorId, fmt.Errorf("failed to create command for task: %w", err)) + err := types.NewTaskErrNotSupportedType(task) + taskResult := types.NewFailureProverTaskResult(task.Id, executorId, err) log.NewTaskEvent(h.logger, zerolog.ErrorLevel, task).Err(err).Msg("failed to create command for task") return h.resultStorage.Put(ctx, taskResult) } desc, err := h.makeCommandForTask(ctx, task) if err != nil { - taskResult := types.NewFailureProverTaskResult(task.Id, executorId, fmt.Errorf("failed to create command for task: %w", err)) + taskResult := types.NewFailureProverTaskResult( + task.Id, + executorId, + types.NewTaskExecErrorf(types.TaskErrUnknown, "failed to create command for task: %s", err.Error()), + ) log.NewTaskEvent(h.logger, zerolog.ErrorLevel, task). Err(err). Msg("failed to create command for task") @@ -461,14 +467,20 @@ func (h *taskHandler) Handle(ctx context.Context, executorId types.TaskExecutorI cmdString := strings.Join(cmd.Args, " ") h.logger.Info().Msgf("Run command %v\n", cmdString) err := cmd.Run() + h.logger.Trace().Msgf("Task execution stdout:\n%v\n", stdout.String()) if err != nil { - taskResult := types.NewFailureProverTaskResult(task.Id, executorId, fmt.Errorf("task execution failed: %w", err)) - timeSpent := h.timer.NowTime().Sub(startTime) log.NewTaskEvent(h.logger, zerolog.ErrorLevel, task). Str("commandText", cmdString). - Dur(logging.FieldTaskExecTime, timeSpent). + Dur(logging.FieldTaskExecTime, h.timer.NowTime().Sub(startTime)). Msgf("Task execution failed, stderr:\n%s\n", stderr.String()) + + taskResult := types.NewFailureProverTaskResult( + task.Id, + executorId, + h.cmdErrToTaskExec(err), + ) + return h.resultStorage.Put(ctx, taskResult) } } @@ -481,3 +493,32 @@ func (h *taskHandler) Handle(ctx context.Context, executorId types.TaskExecutorI taskResult := types.NewSuccessProverTaskResult(task.Id, executorId, desc.expectedResult, desc.binaryExpectedResults) return h.resultStorage.Put(ctx, taskResult) } + +// TODO RESC: move to command_executor once https://github.com/NilFoundation/nil/pull/3 is merged +func (h *taskHandler) cmdErrToTaskExec(err error) *types.TaskExecError { + var exitErr *exec.ExitError + if !errors.As(err, &exitErr) { + return types.NewTaskErrUnknown(err) + } + + status, ok := exitErr.Sys().(syscall.WaitStatus) + if !ok { + h.logger.Warn().Err(err).Msg("failed to get syscall.WaitStatus from exec.ExitError") + return types.NewTaskErrUnknown(err) + } + + if status.Signaled() { + signal := status.Signal() + return types.NewTaskExecErrorf(types.TaskErrTerminated, "process terminated by signal %s", signal) + } + + resultCode := constants.ProofProducerResultCode(status.ExitStatus()) + var taskErrType types.TaskErrType + if errType, ok := constants.ProofProducerErrors[resultCode]; ok { + taskErrType = errType + } else { + taskErrType = types.TaskErrUnknown + } + + return types.NewTaskExecErrorf(taskErrType, "process exited with code %d", resultCode) +} diff --git a/nil/services/synccommittee/public/task_view.go b/nil/services/synccommittee/public/task_view.go index 01362bf1d..a0de2c967 100644 --- a/nil/services/synccommittee/public/task_view.go +++ b/nil/services/synccommittee/public/task_view.go @@ -97,12 +97,17 @@ func NewTaskTreeFromEntry(taskEntry *types.TaskEntry, currentTime time.Time) *Ta func NewTaskTreeFromResult(result *types.TaskResultDetails) *TaskTreeView { var taskStatus TaskStatus - if result.IsSuccess { + if result.IsSuccess() { taskStatus = types.Completed } else { taskStatus = types.Failed } + var errorText string + if result.Error != nil { + errorText = result.Error.ErrText + } + return &TaskTreeView{ TaskViewCommon: TaskViewCommon{ Id: result.TaskId, @@ -113,7 +118,7 @@ func NewTaskTreeFromResult(result *types.TaskResultDetails) *TaskTreeView { Owner: result.Sender, Status: taskStatus, }, - ResultErrorText: result.ErrorText, + ResultErrorText: errorText, Dependencies: emptyDependencies(), } }