Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(blooms): Limit task retries in bloom planner #13139

Merged
merged 2 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -3439,6 +3439,12 @@ shard_streams:
# CLI flag: -bloom-build.builder-response-timeout
[bloom_build_builder_response_timeout: <duration> | default = 0s]

# Experimental. Maximum number of retries for a failed task. If a task fails
# more than this number of times, it is considered failed and will not be
# retried. A value of 0 disables this limit.
# CLI flag: -bloom-build.task-max-retries
[bloom_build_task_max_retries: <int> | default = 3]

# Experimental. Length of the n-grams created when computing blooms from log
# lines.
# CLI flag: -bloom-compactor.ngram-length
Expand Down
1 change: 1 addition & 0 deletions pkg/bloombuild/planner/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Limits interface {
BloomSplitSeriesKeyspaceBy(tenantID string) int
BloomBuildMaxBuilders(tenantID string) int
BuilderResponseTimeout(tenantID string) time.Duration
BloomTaskMaxRetries(tenantID string) int
}

type QueueLimits struct {
Expand Down
7 changes: 7 additions & 0 deletions pkg/bloombuild/planner/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type Metrics struct {
inflightRequests prometheus.Summary
tasksRequeued prometheus.Counter
taskLost prometheus.Counter
tasksFailed prometheus.Counter

buildStarted prometheus.Counter
buildCompleted *prometheus.CounterVec
Expand Down Expand Up @@ -79,6 +80,12 @@ func NewMetrics(
Name: "tasks_lost_total",
Help: "Total number of tasks lost due to not being picked up by a builder and failed to be requeued.",
}),
tasksFailed: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "tasks_failed_total",
Help: "Total number of tasks that failed to be processed by builders (after the configured retries).",
}),

buildStarted: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Expand Down
15 changes: 15 additions & 0 deletions pkg/bloombuild/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,7 @@ func (p *Planner) totalPendingTasks() (total int) {
// enqueueTask refreshes the tenant's timestamp in activeUsers with the current
// time and then hands the task to the tasks queue.
//
// The callback given to Enqueue increments the task's enqueue counter and
// registers the task as pending.
// NOTE(review): presumably the queue only runs the callback once the task has
// been accepted — confirm against the queue implementation.
//
// The error returned by Enqueue is passed back to the caller unchanged.
func (p *Planner) enqueueTask(task *Task) error {
	tenant := task.Tenant
	p.activeUsers.UpdateUserTimestamp(tenant, time.Now())

	onEnqueued := func() {
		task.timesEnqueued++
		p.addPendingTask(task)
	}

	return p.tasksQueue.Enqueue(tenant, nil, task, onEnqueued)
}
Expand Down Expand Up @@ -582,9 +583,23 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
}

if err := p.forwardTaskToBuilder(builder, builderID, task); err != nil {
maxRetries := p.limits.BloomTaskMaxRetries(task.Tenant)
if maxRetries > 0 && task.timesEnqueued >= maxRetries {
p.metrics.tasksFailed.Inc()
p.removePendingTask(task)
level.Error(logger).Log(
"msg", "task failed after max retries",
"retries", task.timesEnqueued,
"maxRetries", maxRetries,
"err", err,
)
continue
}

// Re-queue the task if the builder is failing to process the tasks
if err := p.enqueueTask(task); err != nil {
p.metrics.taskLost.Inc()
p.removePendingTask(task)
level.Error(logger).Log("msg", "error re-enqueuing task. this task will be lost", "err", err)
continue
}
Expand Down
46 changes: 36 additions & 10 deletions pkg/bloombuild/planner/planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,9 @@ func Test_BuilderLoop(t *testing.T) {
expectedBuilderLoopError error

// modifyBuilder should leave the builder in a state where it will either not return or return an error
modifyBuilder func(builder *fakeBuilder)
modifyBuilder func(builder *fakeBuilder)
shouldConsumeAfterModify bool

// resetBuilder should reset the builder to a state where it will return no errors
resetBuilder func(builder *fakeBuilder)
}{
Expand Down Expand Up @@ -434,6 +436,15 @@ func Test_BuilderLoop(t *testing.T) {
builder.SetReturnErrorMsg(false)
},
},
{
name: "exceed max retries",
limits: &fakeLimits{maxRetries: 1},
expectedBuilderLoopError: errPlannerIsNotRunning,
modifyBuilder: func(builder *fakeBuilder) {
builder.SetReturnError(true)
},
shouldConsumeAfterModify: true,
},
{
name: "timeout",
limits: &fakeLimits{
Expand Down Expand Up @@ -518,14 +529,24 @@ func Test_BuilderLoop(t *testing.T) {
require.NoError(t, err)
}

// Tasks should not be consumed
require.Neverf(
t, func() bool {
return planner.totalPendingTasks() == 0
},
5*time.Second, 10*time.Millisecond,
"all tasks were consumed but they should not be",
)
if tc.shouldConsumeAfterModify {
require.Eventuallyf(
t, func() bool {
return planner.totalPendingTasks() == 0
},
5*time.Second, 10*time.Millisecond,
"tasks not consumed, pending: %d", planner.totalPendingTasks(),
)
} else {
require.Neverf(
t, func() bool {
return planner.totalPendingTasks() == 0
},
5*time.Second, 10*time.Millisecond,
"all tasks were consumed but they should not be",
)
}

}

if tc.resetBuilder != nil {
Expand Down Expand Up @@ -655,7 +676,8 @@ func (f *fakeBuilder) Recv() (*protos.BuilderToPlanner, error) {
}

// fakeLimits is a configurable stand-in for the planner limits used by the
// tests in this file.
type fakeLimits struct {
	// timeout is presumably the value served by BuilderResponseTimeout
	// (that method's body is not visible here — confirm).
	timeout time.Duration
	// maxRetries is the value served by BloomTaskMaxRetries.
	maxRetries int
}

func (f *fakeLimits) BuilderResponseTimeout(_ string) time.Duration {
Expand All @@ -674,6 +696,10 @@ func (f *fakeLimits) BloomBuildMaxBuilders(_ string) int {
return 0
}

// BloomTaskMaxRetries reports the retry limit stored on the fake. The tenant
// argument is ignored.
func (f *fakeLimits) BloomTaskMaxRetries(_ string) int {
	limit := f.maxRetries
	return limit
}

func parseDayTime(s string) config.DayTime {
t, err := time.Parse("2006-01-02", s)
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions pkg/bloombuild/planner/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ type Task struct {
*protos.Task

// Tracking
queueTime time.Time
ctx context.Context
timesEnqueued int
queueTime time.Time
ctx context.Context
}

func NewTask(ctx context.Context, queueTime time.Time, task *protos.Task) *Task {
Expand Down
6 changes: 6 additions & 0 deletions pkg/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ type Limits struct {
BloomSplitSeriesKeyspaceBy int `yaml:"bloom_split_series_keyspace_by" json:"bloom_split_series_keyspace_by" category:"experimental"`
BloomBuildMaxBuilders int `yaml:"bloom_build_max_builders" json:"bloom_build_max_builders" category:"experimental"`
BuilderResponseTimeout time.Duration `yaml:"bloom_build_builder_response_timeout" json:"bloom_build_builder_response_timeout" category:"experimental"`
BloomTaskMaxRetries int `yaml:"bloom_build_task_max_retries" json:"bloom_build_task_max_retries" category:"experimental"`

BloomNGramLength int `yaml:"bloom_ngram_length" json:"bloom_ngram_length" category:"experimental"`
BloomNGramSkip int `yaml:"bloom_ngram_skip" json:"bloom_ngram_skip" category:"experimental"`
Expand Down Expand Up @@ -391,6 +392,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&l.BloomSplitSeriesKeyspaceBy, "bloom-build.split-keyspace-by", 256, "Experimental. Number of splits to create for the series keyspace when building blooms. The series keyspace is split into this many parts to parallelize bloom creation.")
f.IntVar(&l.BloomBuildMaxBuilders, "bloom-build.max-builders", 0, "Experimental. Maximum number of builders to use when building blooms. 0 allows unlimited builders.")
f.DurationVar(&l.BuilderResponseTimeout, "bloom-build.builder-response-timeout", 0, "Experimental. Timeout for a builder to finish a task. If a builder does not respond within this time, it is considered failed and the task will be requeued. 0 disables the timeout.")
f.IntVar(&l.BloomTaskMaxRetries, "bloom-build.task-max-retries", 3, "Experimental. Maximum number of retries for a failed task. If a task fails more than this number of times, it is considered failed and will not be retried. A value of 0 disables this limit.")

_ = l.BloomCompactorMaxBloomSize.Set(defaultBloomCompactorMaxBloomSize)
f.Var(&l.BloomCompactorMaxBloomSize, "bloom-compactor.max-bloom-size",
Expand Down Expand Up @@ -1005,6 +1007,10 @@ func (o *Overrides) BuilderResponseTimeout(userID string) time.Duration {
return o.getOverridesForUser(userID).BuilderResponseTimeout
}

// BloomTaskMaxRetries returns the BloomTaskMaxRetries value from the limits
// that getOverridesForUser resolves for userID.
func (o *Overrides) BloomTaskMaxRetries(userID string) int {
	tenantLimits := o.getOverridesForUser(userID)
	return tenantLimits.BloomTaskMaxRetries
}

// BloomNGramLength returns the BloomNGramLength value from the limits that
// getOverridesForUser resolves for userID.
func (o *Overrides) BloomNGramLength(userID string) int {
	tenantLimits := o.getOverridesForUser(userID)
	return tenantLimits.BloomNGramLength
}
Expand Down
Loading