Commit a508c2d

fix(block-builder): min job size support in planning

1 parent 5757404 commit a508c2d

3 files changed: +74 -17 lines

3 files changed

+74
-17
lines changed

pkg/blockbuilder/scheduler/scheduler.go

+16 -2
@@ -149,15 +149,27 @@ func (s *BlockScheduler) runOnce(ctx context.Context) error {
 
 	s.publishLagMetrics(lag)
 
-	jobs, err := s.planner.Plan(ctx, 1) // TODO(owen-d): parallelize work within a partition
+	// TODO(owen-d): parallelize work within a partition
+	// TODO(owen-d): skip small jobs unless they're stale,
+	// e.g. a partition which is no longer being written to shouldn't be orphaned
+	jobs, err := s.planner.Plan(ctx, 1, 0)
 	if err != nil {
 		level.Error(s.logger).Log("msg", "failed to plan jobs", "err", err)
 	}
+	level.Info(s.logger).Log("msg", "planned jobs", "count", len(jobs))
 
 	for _, job := range jobs {
 		// TODO: end offset keeps moving each time we plan jobs, maybe we should not use it as part of the job ID
 
 		added, status, err := s.idempotentEnqueue(job)
+		level.Info(s.logger).Log(
+			"msg", "enqueued job",
+			"added", added,
+			"status", status.String(),
+			"err", err,
+			"partition", job.Job.Partition(),
+			"num_offsets", job.Offsets().Max-job.Offsets().Min,
+		)
 
 		// if we've either added or encountered an error, move on; we're done this cycle
 		if added || err != nil {
@@ -253,7 +265,9 @@ func (s *BlockScheduler) HandleCompleteJob(ctx context.Context, job *types.Job,
 
 	// TODO(owen-d): cleaner way to enqueue next job for this partition,
 	// don't make it part of the response cycle to job completion, etc.
-	jobs, err := s.planner.Plan(ctx, 1)
+	// NB(owen-d): only immediately enqueue another job for this partition if
+	// the job is full. Otherwise, we'd repeatedly enqueue tiny jobs with a few records.
+	jobs, err := s.planner.Plan(ctx, 1, int(s.cfg.TargetRecordCount))
 	if err != nil {
 		level.Error(logger).Log("msg", "failed to plan subsequent jobs", "err", err)
 	}
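
The effect of these two call sites is that the same planner is asked for work with two different minimum sizes: the periodic runOnce pass uses 0 (any pending records eventually deserve a job), while HandleCompleteJob passes the configured target record count so a partition only gets an immediate follow-up job when a full job's worth of records has accumulated; smaller remainders are left for a later periodic pass. A minimal sketch of that calling pattern, with the Planner signature taken from the interface change below and the surrounding names (job, scheduler, echoPlanner) invented for illustration:

package main

import (
	"context"
	"fmt"
)

// job is an illustrative stand-in for the scheduler's job type.
type job struct {
	partition int32
	min, max  int64
}

// Planner mirrors the shape of the updated interface: planning now takes a
// minimum job size alongside the per-partition job cap.
type Planner interface {
	Plan(ctx context.Context, maxJobsPerPartition, minOffsetsPerJob int) ([]job, error)
}

// scheduler is a simplified stand-in for BlockScheduler.
type scheduler struct {
	planner           Planner
	targetRecordCount int64
}

// runOnce plans with a minimum of 0: even a handful of pending records is
// worth a job on the periodic pass.
func (s *scheduler) runOnce(ctx context.Context) ([]job, error) {
	return s.planner.Plan(ctx, 1, 0)
}

// handleCompleteJob only re-plans a partition immediately when a full job's
// worth of records is available; smaller remainders wait for a later runOnce.
func (s *scheduler) handleCompleteJob(ctx context.Context) ([]job, error) {
	return s.planner.Plan(ctx, 1, int(s.targetRecordCount))
}

// echoPlanner just reports the minimum it was asked to honor.
type echoPlanner struct{}

func (echoPlanner) Plan(_ context.Context, _, minOffsetsPerJob int) ([]job, error) {
	fmt.Println("planning with minOffsetsPerJob =", minOffsetsPerJob)
	return nil, nil
}

func main() {
	s := &scheduler{planner: echoPlanner{}, targetRecordCount: 1000}
	s.runOnce(context.Background())           // prints: planning with minOffsetsPerJob = 0
	s.handleCompleteJob(context.Background()) // prints: planning with minOffsetsPerJob = 1000
}

With this split, a partition whose backlog stays below the target count still gets drained, just on the scheduler's regular planning interval rather than immediately after each job completion.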

pkg/blockbuilder/scheduler/strategy.go

+9 -2
@@ -19,7 +19,7 @@ type OffsetReader interface {
 
 type Planner interface {
 	Name() string
-	Plan(ctx context.Context, maxJobsPerPartition int) ([]*JobWithMetadata, error)
+	Plan(ctx context.Context, maxJobsPerPartition int, minOffsetsPerJob int) ([]*JobWithMetadata, error)
 }
 
 const (
@@ -51,7 +51,8 @@ func (p *RecordCountPlanner) Name() string {
 	return RecordCountStrategy
 }
 
-func (p *RecordCountPlanner) Plan(ctx context.Context, maxJobsPerPartition int) ([]*JobWithMetadata, error) {
+func (p *RecordCountPlanner) Plan(ctx context.Context, maxJobsPerPartition int, minOffsetsPerJob int) ([]*JobWithMetadata, error) {
+	level.Info(p.logger).Log("msg", "planning jobs", "max_jobs_per_partition", maxJobsPerPartition, "target_record_count", p.targetRecordCount, "lookback_period", p.lookbackPeriod.String())
 	offsets, err := p.offsetReader.GroupLag(ctx, p.lookbackPeriod)
 	if err != nil {
 		level.Error(p.logger).Log("msg", "failed to get group lag", "err", err)
@@ -80,6 +81,12 @@ func (p *RecordCountPlanner) Plan(ctx context.Context, maxJobsPerPartition int)
 		}
 
 		currentEnd := min(currentStart+p.targetRecordCount, endOffset)
+
+		// Skip creating job if it's smaller than minimum size
+		if currentEnd-currentStart < int64(minOffsetsPerJob) {
+			break
+		}
+
 		job := NewJobWithMetadata(
 			types.NewJob(partitionOffset.Partition, types.Offsets{
 				Min: currentStart,
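
In other words, the planner still walks each partition's lag from the current start offset toward the end offset in chunks of at most targetRecordCount, but it now stops as soon as the remaining chunk would be smaller than minOffsetsPerJob, leaving that tail for a later planning pass. A self-contained sketch of that chunking loop (simplified; planOffsets and the offsets struct are illustrative names, not the real RecordCountPlanner code):

package main

import "fmt"

// offsets is a simplified stand-in for types.Offsets.
type offsets struct{ min, max int64 }

// planOffsets models the loop in RecordCountPlanner.Plan: split the range
// [start, end) into jobs of at most targetRecordCount offsets, and stop once
// a chunk would fall below minOffsetsPerJob.
func planOffsets(start, end, targetRecordCount int64, minOffsetsPerJob int) []offsets {
	var jobs []offsets
	for currentStart := start; currentStart < end; {
		currentEnd := min(currentStart+targetRecordCount, end)

		// Skip creating a job if it's smaller than the minimum size.
		if currentEnd-currentStart < int64(minOffsetsPerJob) {
			break
		}

		jobs = append(jobs, offsets{min: currentStart, max: currentEnd})
		currentStart = currentEnd
	}
	return jobs
}

func main() {
	// 230 pending offsets, target 100, minimum 40: two full jobs are planned
	// and the 30-offset tail is left for a later pass.
	fmt.Println(planOffsets(0, 230, 100, 40)) // [{0 100} {100 200}]

	// 30 pending offsets with the same minimum: nothing is planned yet.
	fmt.Println(planOffsets(0, 30, 100, 40)) // []
}

Note that the break only ever drops the final chunk of a partition: every chunk before it is exactly targetRecordCount offsets long, so only the last one can fall below the minimum.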

pkg/blockbuilder/scheduler/strategy_test.go

+49 -13
@@ -30,14 +30,16 @@ func compareJobs(t *testing.T, expected, actual *JobWithMetadata) {
 
 func TestRecordCountPlanner_Plan(t *testing.T) {
 	for _, tc := range []struct {
-		name         string
-		recordCount  int64
-		expectedJobs []*JobWithMetadata
-		groupLag     map[int32]kadm.GroupMemberLag
+		name             string
+		recordCount      int64
+		minOffsetsPerJob int
+		expectedJobs     []*JobWithMetadata
+		groupLag         map[int32]kadm.GroupMemberLag
 	}{
 		{
-			name:        "single partition, single job",
-			recordCount: 100,
+			name:             "single partition, single job",
+			recordCount:      100,
+			minOffsetsPerJob: 0,
 			groupLag: map[int32]kadm.GroupMemberLag{
 				0: {
 					Commit: kadm.Offset{
@@ -57,8 +59,9 @@ func TestRecordCountPlanner_Plan(t *testing.T) {
 			},
 		},
 		{
-			name:        "single partition, multiple jobs",
-			recordCount: 50,
+			name:             "single partition, multiple jobs",
+			recordCount:      50,
+			minOffsetsPerJob: 0,
 			groupLag: map[int32]kadm.GroupMemberLag{
 				0: {
 					Commit: kadm.Offset{
@@ -82,8 +85,9 @@ func TestRecordCountPlanner_Plan(t *testing.T) {
 			},
 		},
 		{
-			name:        "multiple partitions",
-			recordCount: 100,
+			name:             "multiple partitions",
+			recordCount:      100,
+			minOffsetsPerJob: 0,
 			groupLag: map[int32]kadm.GroupMemberLag{
 				0: {
 					Commit: kadm.Offset{
@@ -120,8 +124,9 @@ func TestRecordCountPlanner_Plan(t *testing.T) {
 			},
 		},
 		{
-			name:        "no lag",
-			recordCount: 100,
+			name:             "no lag",
+			recordCount:      100,
+			minOffsetsPerJob: 0,
 			groupLag: map[int32]kadm.GroupMemberLag{
 				0: {
 					Commit: kadm.Offset{
@@ -135,6 +140,37 @@ func TestRecordCountPlanner_Plan(t *testing.T) {
 			},
 			expectedJobs: nil,
 		},
+		{
+			name:             "skip small jobs",
+			recordCount:      100,
+			minOffsetsPerJob: 40,
+			groupLag: map[int32]kadm.GroupMemberLag{
+				0: {
+					Commit: kadm.Offset{
+						At: 100,
+					},
+					End: kadm.ListedOffset{
+						Offset: 130, // Only 30 records available, less than minimum
+					},
+					Partition: 0,
+				},
+				1: {
+					Commit: kadm.Offset{
+						At: 200,
+					},
+					End: kadm.ListedOffset{
+						Offset: 300, // 100 records available, more than minimum
+					},
+					Partition: 1,
+				},
+			},
+			expectedJobs: []*JobWithMetadata{
+				NewJobWithMetadata(
+					types.NewJob(1, types.Offsets{Min: 201, Max: 300}),
+					99, // priority is total remaining: 300-201
+				),
+			},
+		},
 	} {
 		t.Run(tc.name, func(t *testing.T) {
 			mockReader := &mockOffsetReader{
@@ -147,7 +183,7 @@ func TestRecordCountPlanner_Plan(t *testing.T) {
 			}
 			require.NoError(t, cfg.Validate())
 			planner := NewRecordCountPlanner(mockReader, tc.recordCount, time.Hour, log.NewNopLogger())
-			jobs, err := planner.Plan(context.Background(), 0)
+			jobs, err := planner.Plan(context.Background(), 0, tc.minOffsetsPerJob)
 			require.NoError(t, err)
 
 			require.Equal(t, len(tc.expectedJobs), len(jobs))
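
For the new "skip small jobs" case, the expected output follows from the lag table: partition 0 has only about 30 pending records (130 - 100), below the minimum of 40, so no job is planned for it, while partition 1 has about 100 (300 - 200) and yields one job spanning offsets 201-300 with priority 99 (300 - 201). A small standalone check of that arithmetic using the same kadm types, assuming jobs start one past the committed offset as the expected Min of 201 suggests (illustrative only, not the planner's actual code):

package main

import (
	"fmt"

	"github.com/twmb/franz-go/pkg/kadm"
)

func main() {
	// The lag table from the "skip small jobs" case above.
	lag := map[int32]kadm.GroupMemberLag{
		0: {Commit: kadm.Offset{At: 100}, End: kadm.ListedOffset{Offset: 130}, Partition: 0},
		1: {Commit: kadm.Offset{At: 200}, End: kadm.ListedOffset{Offset: 300}, Partition: 1},
	}
	const minOffsetsPerJob = 40

	for _, l := range lag {
		// Assume planning starts one past the committed offset.
		start, end := l.Commit.At+1, l.End.Offset
		if end-start < minOffsetsPerJob {
			fmt.Printf("partition %d: %d offsets pending, below minimum, skipped\n", l.Partition, end-start)
			continue
		}
		fmt.Printf("partition %d: job for offsets %d..%d (priority %d)\n", l.Partition, start, end, end-start)
	}
}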
