Skip to content

Commit

Permalink
Sync Committee: Task Rescheduling
Browse files Browse the repository at this point in the history
* Declared `TaskExecError` with `TaskErrType` enum, integrated it into `TaskResult`;
* `TaskStorage.ProcessTaskResult`: resetting the task for later re-execution in case of a non-critical error;
* `TaskStateChangeHandler (proof_provider)`: do not propagate non-critical errors to the parent task;
* `TaskHandler (prover)`: tracking kernel signals and return codes of the child process (`proof-producer`);

* `TaskScheduler`: validating result against task entry before `OnTaskTerminated` call;
* `TaskStorage`: publishing metrics only after successful tx commit in `ProcessTaskResult` and `RescheduleHangingTasks` methods;

[refactoring]
* Declared `commonStorage` type reused by `TaskStorage`, `BlockStorage` and `TaskResultStorage`;
  • Loading branch information
zadykian committed Jan 28, 2025
1 parent 09aa5d5 commit 82ccb92
Show file tree
Hide file tree
Showing 26 changed files with 535 additions and 393 deletions.
5 changes: 5 additions & 0 deletions nil/common/check/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ func PanicIfNot(flag bool) {
}
}

// PanicIff is the inverse of PanicIfNotf: it triggers when flag is true.
// The format string and args are forwarded unchanged to PanicIfNotf,
// which is responsible for building the message and panicking.
func PanicIff(flag bool, format string, args ...interface{}) {
	// PanicIfNotf fires on a false condition, so hand it the negated flag.
	mustHold := !flag
	PanicIfNotf(mustHold, format, args...)
}

// PanicIfNotf panics on false with the given message.
func PanicIfNotf(flag bool, format string, args ...interface{}) {
if !flag {
Expand Down
19 changes: 14 additions & 5 deletions nil/services/synccommittee/Makefile.inc
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,22 @@ generate_synccommittee_mocks: \
$(root_sce)/internal/api/task_handler_generated_mock.go: $(root_sce)/internal/api/task_handler.go $(root_sce)/internal/types/prover_tasks.go
go generate $(root_sce)/internal/api

$(root_sce)/internal/api/task_request_handler_generated_mock.go: $(root_sce)/internal/api/task_request_handler.go $(root_sce)/internal/types/prover_tasks.go
$(root_sce)/internal/api/task_request_handler_generated_mock.go: \
$(root_sce)/internal/api/task_request_handler.go \
$(root_sce)/internal/types/task_result.go \
$(root_sce)/internal/types/prover_tasks.go
go generate $(root_sce)/internal/api

$(root_sce)/internal/api/task_state_change_handler_generated_mock.go: $(root_sce)/internal/api/task_state_change_handler.go $(root_sce)/internal/types/prover_tasks.go
$(root_sce)/internal/api/task_state_change_handler_generated_mock.go: \
$(root_sce)/internal/api/task_state_change_handler.go \
$(root_sce)/internal/types/prover_tasks.go
go generate $(root_sce)/internal/api

$(root_sce)/internal/scheduler/task_scheduler_generated_mock.go: \
$(root_sce)/internal/scheduler/task_scheduler.go \
$(root_sce)/internal/api/task_request_handler.go \
$(root_sce)/internal/types/prover_tasks.go \
$(root_sce)/internal/types/task_result.go \
$(root_sce)/public/task_debug_api.go \
$(root_sce)/public/task_view.go
go generate $(root_sce)/internal/scheduler
Expand All @@ -48,16 +54,19 @@ synccommittee_types: \
$(root_sce)/internal/types/proverresulttype_string.go \
$(root_sce)/internal/types/taskstatus_string.go \
$(root_sce)/internal/types/circuittype_string.go \
$(root_sce)/internal/types/taskerrtype_string.go \
$(root_sce)/public/taskdebugorder_string.go

$(root_sce)/internal/types/tasktype_string.go: $(root_sce)/internal/types/prover_tasks.go
$(root_sce)/internal/types/tasktype_string.go: $(root_sce)/internal/types/task_type.go
go generate $(root_sce)/internal/types
$(root_sce)/internal/types/proverresulttype_string.go: $(root_sce)/internal/types/prover_tasks.go
$(root_sce)/internal/types/proverresulttype_string.go: $(root_sce)/internal/types/task_result.go
go generate $(root_sce)/internal/types
$(root_sce)/internal/types/taskstatus_string.go: $(root_sce)/internal/types/prover_tasks.go
$(root_sce)/internal/types/taskstatus_string.go: $(root_sce)/internal/types/task_status.go
go generate $(root_sce)/internal/types
$(root_sce)/internal/types/circuittype_string.go: $(root_sce)/internal/types/prover_tasks.go
go generate $(root_sce)/internal/types
$(root_sce)/internal/types/taskerrtype_string.go: $(root_sce)/internal/types/errors.go
go generate $(root_sce)/internal/types
$(root_sce)/public/taskdebugorder_string.go: $(root_sce)/public/task_debug_api.go
go generate $(root_sce)/public

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package core

import (
"context"
"errors"
"testing"

"github.com/NilFoundation/nil/nil/common"
Expand Down Expand Up @@ -79,7 +78,7 @@ func (s *BlockTasksIntegrationTestSuite) Test_Provide_Tasks_And_Handle_Success_R
proofTasks, err := batch.CreateProofTasks(s.timer.NowTime())
s.Require().NoError(err)

err = s.taskStorage.AddTaskEntries(s.ctx, proofTasks)
err = s.taskStorage.AddTaskEntries(s.ctx, proofTasks...)
s.Require().NoError(err)

executorId := testaide.RandomExecutorId()
Expand Down Expand Up @@ -131,7 +130,7 @@ func (s *BlockTasksIntegrationTestSuite) Test_Provide_Tasks_And_Handle_Failure_R
proofTasks, err := batch.CreateProofTasks(s.timer.NowTime())
s.Require().NoError(err)

err = s.taskStorage.AddTaskEntries(s.ctx, proofTasks)
err = s.taskStorage.AddTaskEntries(s.ctx, proofTasks...)
s.Require().NoError(err)

executorId := testaide.RandomExecutorId()
Expand All @@ -157,7 +156,7 @@ func (s *BlockTasksIntegrationTestSuite) Test_Provide_Tasks_And_Handle_Failure_R
aggregateProofsFailed := types.NewFailureProviderTaskResult(
taskToExecute.Id,
executorId,
errors.New("something went wrong"),
types.NewTaskExecError(types.TaskErrProofFailed, "block proof failed"),
)

err = s.scheduler.SetTaskResult(s.ctx, aggregateProofsFailed)
Expand Down
10 changes: 6 additions & 4 deletions nil/services/synccommittee/core/task_state_change_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"

"github.com/NilFoundation/nil/nil/common/logging"
"github.com/NilFoundation/nil/nil/services/synccommittee/internal/api"
"github.com/NilFoundation/nil/nil/services/synccommittee/internal/log"
"github.com/NilFoundation/nil/nil/services/synccommittee/internal/storage"
Expand Down Expand Up @@ -33,14 +34,15 @@ func (h taskStateChangeHandler) OnTaskTerminated(ctx context.Context, task *type
return nil
}

if !result.IsSuccess {
log.NewTaskEvent(h.logger, zerolog.WarnLevel, task).
Str("errorText", result.ErrorText).
if !result.IsSuccess() {
log.NewTaskResultEvent(h.logger, zerolog.WarnLevel, result).
Msg("block proof task has failed, data won't be sent to the L1")
return nil
}

log.NewTaskEvent(h.logger, zerolog.InfoLevel, task).Msg("Proof batch completed")
log.NewTaskResultEvent(h.logger, zerolog.InfoLevel, result).
Stringer(logging.FieldBatchId, task.BatchId).
Msg("Proof batch completed")

blockId := types.NewBlockId(task.ShardId, task.BlockHash)

Expand Down
13 changes: 10 additions & 3 deletions nil/services/synccommittee/internal/log/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,16 @@ func NewTaskResultEvent(
result *types.TaskResult,
) *zerolog.Event {
//nolint:zerologlint // 'must be dispatched by Msg or Send method' error is ignored
return logger.WithLevel(level).
event := logger.WithLevel(level).
Stringer(logging.FieldTaskId, result.TaskId).
Stringer(logging.FieldTaskExecutorId, result.Sender).
Bool("isSuccess", result.IsSuccess).
Str("errorText", result.ErrorText)
Bool("isSuccess", result.IsSuccess())

if result.IsSuccess() {
return event
}

return event.
Str("errorText", result.Error.ErrText).
Stringer("errorType", result.Error.ErrType)
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ import (
"go.opentelemetry.io/otel/metric"
)

// Attribute keys attached to task-related metric measurements.
const (
// attrTaskType is the key under which a task's type is reported.
attrTaskType = "task.type"
// attrTaskExecutor is the key under which an executor id is reported
// (as an int64 value).
attrTaskExecutor = "task.executor.id"
)

type taskStorageMetricsHandler struct {
attributes metric.MeasurementOption

Expand Down Expand Up @@ -77,7 +82,7 @@ func (h *taskStorageMetricsHandler) RecordTaskTerminated(ctx context.Context, ta
taskAttributes := getTaskAttributes(taskEntry)
h.currentActiveTasks.Add(ctx, -1, h.attributes, taskAttributes)

if taskResult.IsSuccess {
if taskResult.IsSuccess() {
executionTimeMs := time.Since(*taskEntry.Started).Milliseconds()
h.taskExecutionTimeMs.Record(ctx, executionTimeMs, h.attributes, taskAttributes)
h.totalTasksSucceeded.Add(ctx, 1, h.attributes, taskAttributes)
Expand All @@ -86,20 +91,24 @@ func (h *taskStorageMetricsHandler) RecordTaskTerminated(ctx context.Context, ta
}
}

func (h *taskStorageMetricsHandler) RecordTaskRescheduled(ctx context.Context, taskEntry *types.TaskEntry) {
taskAttributes := getTaskAttributes(taskEntry)
func (h *taskStorageMetricsHandler) RecordTaskRescheduled(ctx context.Context, taskType types.TaskType, previousExecutor types.TaskExecutorId) {
taskAttributes := telattr.With(
attribute.Stringer(attrTaskType, taskType),
attribute.Int64(attrTaskExecutor, int64(previousExecutor)),
)

h.totalTasksRescheduled.Add(ctx, 1, h.attributes, taskAttributes)
h.currentActiveTasks.Add(ctx, -1, h.attributes, taskAttributes)
h.currentPendingTasks.Add(ctx, 1, h.attributes, taskAttributes)
}

func getTaskAttributes(task *types.TaskEntry) metric.MeasurementOption {
attributes := []attribute.KeyValue{
attribute.Stringer("task.type", task.Task.TaskType),
attribute.Stringer(attrTaskType, task.Task.TaskType),
}

if task.Owner != types.UnknownExecutorId {
attributes = append(attributes, attribute.Int64("task.executor.id", int64(task.Owner)))
attributes = append(attributes, attribute.Int64(attrTaskExecutor, int64(task.Owner)))
}

return telattr.With(attributes...)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ func (s *TaskSchedulerDebugRpcTestSuite) requestAndSendResult(
if completeSuccessfully {
taskResult = testaide.NewSuccessTaskResult(taskToExec.Id, executor)
} else {
taskResult = testaide.NewFailureTaskResult(taskToExec.Id, executor)
taskResult = testaide.NewNonRetryableErrorTaskResult(taskToExec.Id, executor)
}

err = s.storage.ProcessTaskResult(s.context, taskResult)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package rpc

import (
"errors"
"testing"

"github.com/NilFoundation/nil/nil/services/synccommittee/internal/api"
Expand Down Expand Up @@ -67,7 +66,11 @@ func (s *TaskRequestHandlerTestSuite) Test_TaskRequestHandler_UpdateTaskStatus()
},
{
"Failure_Result_Provider",
types.NewFailureProverTaskResult(types.NewTaskId(), testaide.RandomExecutorId(), errors.New("something went wrong")),
types.NewFailureProverTaskResult(
types.NewTaskId(),
testaide.RandomExecutorId(),
types.NewTaskExecError(types.TaskErrUnknown, "something went wrong"),
),
},
}

Expand Down
22 changes: 12 additions & 10 deletions nil/services/synccommittee/internal/scheduler/task_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,23 +98,24 @@ func (s *taskSchedulerImpl) SetTaskResult(ctx context.Context, result *types.Tas

entry, err := s.storage.TryGetTaskEntry(ctx, result.TaskId)
if err != nil {
s.onTaskResultError(ctx, err, result)
return err
return s.onTaskResultError(ctx, err, result)
}

if entry == nil {
log.NewTaskResultEvent(s.logger, zerolog.WarnLevel, result).Msg("received task result update for unknown task id")
return nil
}

if err = s.stateHandler.OnTaskTerminated(ctx, &entry.Task, result); err != nil {
s.onTaskResultError(ctx, err, result)
return fmt.Errorf("%w: %w", ErrFailedToProcessTaskResult, err)
if err := result.ValidateAgainst(entry); err != nil {
return s.onTaskResultError(ctx, err, result)
}

if err = s.storage.ProcessTaskResult(ctx, result); err != nil {
s.onTaskResultError(ctx, err, result)
return fmt.Errorf("%w: %w", ErrFailedToProcessTaskResult, err)
if err := s.stateHandler.OnTaskTerminated(ctx, &entry.Task, result); err != nil {
return s.onTaskResultError(ctx, err, result)
}

if err := s.storage.ProcessTaskResult(ctx, result); err != nil {
return s.onTaskResultError(ctx, err, result)
}

return nil
Expand Down Expand Up @@ -205,9 +206,10 @@ func (s *taskSchedulerImpl) GetTaskTree(ctx context.Context, taskId types.TaskId
return s.storage.GetTaskTreeView(ctx, taskId)
}

func (s *taskSchedulerImpl) onTaskResultError(ctx context.Context, err error, result *types.TaskResult) {
log.NewTaskResultEvent(s.logger, zerolog.ErrorLevel, result).Err(err).Msg("Failed to process task result")
func (s *taskSchedulerImpl) onTaskResultError(ctx context.Context, cause error, result *types.TaskResult) error {
log.NewTaskResultEvent(s.logger, zerolog.ErrorLevel, result).Err(cause).Msg("Failed to process task result")
s.recordError(ctx)
return fmt.Errorf("%w: %w", ErrFailedToProcessTaskResult, cause)
}

func (s *taskSchedulerImpl) Run(ctx context.Context) error {
Expand Down
Loading

0 comments on commit 82ccb92

Please sign in to comment.