Skip to content

Commit fc4f463

Browse files
committed
return task error
1 parent 1f4ed55 commit fc4f463

File tree

2 files changed

+32
-12
lines changed

2 files changed

+32
-12
lines changed

pkg/bloombuild/planner/planner.go

+30-10
Original file line numberDiff line numberDiff line change
@@ -728,6 +728,7 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
728728
return fmt.Errorf("dequeue() call resulted in nil response. builder: %s", builderID)
729729
}
730730
task := item.(*QueueTask)
731+
logger := log.With(logger, "task", task.ID)
731732

732733
queueTime := time.Since(task.queueTime)
733734
p.metrics.queueDuration.Observe(queueTime.Seconds())
@@ -739,7 +740,8 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
739740
continue
740741
}
741742

742-
if err := p.forwardTaskToBuilder(builder, builderID, task); err != nil {
743+
result, err := p.forwardTaskToBuilder(builder, builderID, task)
744+
if err != nil {
743745
maxRetries := p.limits.BloomTaskMaxRetries(task.Tenant)
744746
if maxRetries > 0 && task.timesEnqueued >= maxRetries {
745747
p.metrics.tasksFailed.Inc()
@@ -750,6 +752,10 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
750752
"maxRetries", maxRetries,
751753
"err", err,
752754
)
755+
task.resultsChannel <- &protos.TaskResult{
756+
TaskID: task.ID,
757+
Error: fmt.Errorf("task failed after max retries (%d): %w", maxRetries, err),
758+
}
753759
continue
754760
}
755761

@@ -758,12 +764,29 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
758764
p.metrics.taskLost.Inc()
759765
p.removePendingTask(task)
760766
level.Error(logger).Log("msg", "error re-enqueuing task. this task will be lost", "err", err)
767+
task.resultsChannel <- &protos.TaskResult{
768+
TaskID: task.ID,
769+
Error: fmt.Errorf("error re-enqueuing task: %w", err),
770+
}
761771
continue
762772
}
763773

764774
p.metrics.tasksRequeued.Inc()
765-
level.Error(logger).Log("msg", "error forwarding task to builder, Task requeued", "err", err)
775+
level.Error(logger).Log(
776+
"msg", "error forwarding task to builder, Task requeued",
777+
"retries", task.timesEnqueued,
778+
"err", err,
779+
)
780+
continue
766781
}
782+
783+
level.Debug(logger).Log(
784+
"msg", "task completed",
785+
"duration", time.Since(task.queueTime).Seconds(),
786+
"retries", task.timesEnqueued,
787+
)
788+
p.removePendingTask(task)
789+
task.resultsChannel <- result
767790
}
768791

769792
return errPlannerIsNotRunning
@@ -773,15 +796,13 @@ func (p *Planner) forwardTaskToBuilder(
773796
builder protos.PlannerForBuilder_BuilderLoopServer,
774797
builderID string,
775798
task *QueueTask,
776-
) error {
777-
defer p.removePendingTask(task)
778-
799+
) (*protos.TaskResult, error) {
779800
msg := &protos.PlannerToBuilder{
780801
Task: task.ToProtoTask(),
781802
}
782803

783804
if err := builder.Send(msg); err != nil {
784-
return fmt.Errorf("error sending task to builder (%s): %w", builderID, err)
805+
return nil, fmt.Errorf("error sending task to builder (%s): %w", builderID, err)
785806
}
786807

787808
// Launch a goroutine to wait for the response from the builder so we can
@@ -811,12 +832,11 @@ func (p *Planner) forwardTaskToBuilder(
811832
// Note: Errors from the result are not returned here since we don't retry tasks
812833
// that return with an error. I.e. we won't retry errors forwarded from the builder.
813834
// TODO(salvacorts): Filter and return errors that can be retried.
814-
task.resultsChannel <- result
815-
return nil
835+
return result, nil
816836
case err := <-errCh:
817-
return err
837+
return nil, err
818838
case <-timeout:
819-
return fmt.Errorf("timeout waiting for response from builder (%s)", builderID)
839+
return nil, fmt.Errorf("timeout waiting for response from builder (%s)", builderID)
820840
}
821841
}
822842

pkg/bloombuild/planner/planner_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -497,8 +497,8 @@ func Test_BuilderLoop(t *testing.T) {
497497
},
498498
} {
499499
t.Run(tc.name, func(t *testing.T) {
500-
logger := log.NewNopLogger()
501-
//logger := log.NewLogfmtLogger(os.Stdout)
500+
//logger := log.NewNopLogger()
501+
logger := log.NewLogfmtLogger(os.Stdout)
502502

503503
cfg := Config{
504504
PlanningInterval: 1 * time.Hour,

0 commit comments

Comments
 (0)