Skip to content

Commit 6a78b3b

Browse files
ashwanthgolimveitas
authored andcommitted
fix(block-scheduler): init record planner correctly (grafana#15390)
1 parent 9c4ac59 commit 6a78b3b

File tree

5 files changed

+49
-26
lines changed

5 files changed

+49
-26
lines changed

pkg/blockbuilder/scheduler/scheduler.go

+11-5
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ type Config struct {
2828
Interval time.Duration `yaml:"interval"`
2929
LookbackPeriod time.Duration `yaml:"lookback_period"`
3030
Strategy string `yaml:"strategy"`
31-
planner Planner `yaml:"-"` // validated planner
3231
TargetRecordCount int64 `yaml:"target_record_count"`
3332
}
3433

@@ -74,7 +73,6 @@ func (cfg *Config) Validate() error {
7473
if cfg.TargetRecordCount <= 0 {
7574
return errors.New("target record count must be a non-zero value")
7675
}
77-
cfg.planner = NewRecordCountPlanner(cfg.TargetRecordCount)
7876
default:
7977
return fmt.Errorf("invalid strategy: %s", cfg.Strategy)
8078
}
@@ -96,17 +94,25 @@ type BlockScheduler struct {
9694
}
9795

9896
// NewScheduler creates a new scheduler instance
99-
func NewScheduler(cfg Config, queue *JobQueue, offsetManager partition.OffsetManager, logger log.Logger, r prometheus.Registerer) *BlockScheduler {
97+
func NewScheduler(cfg Config, queue *JobQueue, offsetManager partition.OffsetManager, logger log.Logger, r prometheus.Registerer) (*BlockScheduler, error) {
98+
var planner Planner
99+
switch cfg.Strategy {
100+
case RecordCountStrategy:
101+
planner = NewRecordCountPlanner(offsetManager, cfg.TargetRecordCount, cfg.LookbackPeriod, logger)
102+
default:
103+
return nil, fmt.Errorf("invalid strategy: %s", cfg.Strategy)
104+
}
105+
100106
s := &BlockScheduler{
101107
cfg: cfg,
102-
planner: cfg.planner,
108+
planner: planner,
103109
offsetManager: offsetManager,
104110
logger: logger,
105111
metrics: NewMetrics(r),
106112
queue: queue,
107113
}
108114
s.Service = services.NewBasicService(nil, s.running, nil)
109-
return s
115+
return s, nil
110116
}
111117

112118
func (s *BlockScheduler) running(ctx context.Context) error {

pkg/blockbuilder/scheduler/scheduler_test.go

+23-13
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,17 @@ func (m *mockOffsetManager) Commit(_ context.Context, _ int32, _ int64) error {
4040
return nil
4141
}
4242

43-
func newTestEnv(builderID string) *testEnv {
43+
func newTestEnv(builderID string) (*testEnv, error) {
4444
queue := NewJobQueue()
4545
mockOffsetMgr := &mockOffsetManager{
4646
topic: "test-topic",
4747
consumerGroup: "test-group",
4848
}
49-
scheduler := NewScheduler(Config{}, queue, mockOffsetMgr, log.NewNopLogger(), prometheus.NewRegistry())
49+
scheduler, err := NewScheduler(Config{Strategy: RecordCountStrategy}, queue, mockOffsetMgr, log.NewNopLogger(), prometheus.NewRegistry())
50+
if err != nil {
51+
return nil, err
52+
}
53+
5054
transport := types.NewMemoryTransport(scheduler)
5155
builder := NewWorker(builderID, transport)
5256

@@ -55,16 +59,20 @@ func newTestEnv(builderID string) *testEnv {
5559
scheduler: scheduler,
5660
transport: transport,
5761
builder: builder,
58-
}
62+
}, err
5963
}
6064

6165
func TestScheduleAndProcessJob(t *testing.T) {
62-
env := newTestEnv("test-builder-1")
66+
env, err := newTestEnv("test-builder-1")
67+
if err != nil {
68+
t.Fatalf("failed to create test environment: %v", err)
69+
}
70+
6371
ctx := context.Background()
6472

6573
// Create and enqueue a test job
6674
job := types.NewJob(1, types.Offsets{Min: 100, Max: 200})
67-
err := env.queue.Enqueue(job, 100)
75+
err = env.queue.Enqueue(job, 100)
6876
if err != nil {
6977
t.Fatalf("failed to enqueue job: %v", err)
7078
}
@@ -98,21 +106,27 @@ func TestScheduleAndProcessJob(t *testing.T) {
98106
}
99107

100108
func TestContextCancellation(t *testing.T) {
101-
env := newTestEnv("test-builder-1")
109+
env, err := newTestEnv("test-builder-1")
110+
if err != nil {
111+
t.Fatalf("failed to create test environment: %v", err)
112+
}
102113
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
103114
defer cancel()
104115

105116
// Try to get job after context timeout
106117
time.Sleep(20 * time.Millisecond)
107-
_, _, err := env.builder.GetJob(ctx)
118+
_, _, err = env.builder.GetJob(ctx)
108119
if err == nil {
109120
t.Error("expected error from cancelled context")
110121
}
111122
}
112123

113124
func TestMultipleBuilders(t *testing.T) {
114125
// Create first environment
115-
env1 := newTestEnv("test-builder-1")
126+
env1, err := newTestEnv("test-builder-1")
127+
if err != nil {
128+
t.Fatalf("failed to create test environment: %v", err)
129+
}
116130
// Create second builder using same scheduler
117131
builder2 := NewWorker("test-builder-2", env1.transport)
118132

@@ -123,7 +137,7 @@ func TestMultipleBuilders(t *testing.T) {
123137
job2 := types.NewJob(2, types.Offsets{Min: 300, Max: 400})
124138

125139
// Enqueue jobs
126-
err := env1.queue.Enqueue(job1, 100)
140+
err = env1.queue.Enqueue(job1, 100)
127141
if err != nil {
128142
t.Fatalf("failed to enqueue job1: %v", err)
129143
}
@@ -268,10 +282,6 @@ func TestConfig_Validate(t *testing.T) {
268282
if err != nil {
269283
t.Errorf("Validate() error = %v, wantErr nil", err)
270284
}
271-
// Check that planner is set for valid configs
272-
if tt.cfg.planner == nil {
273-
t.Error("Validate() did not set planner for valid config")
274-
}
275285
})
276286
}
277287
}

pkg/blockbuilder/scheduler/strategy.go

+8-3
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package scheduler
33
import (
44
"context"
55
"sort"
6+
"time"
67

78
"github.com/go-kit/log"
89
"github.com/go-kit/log/level"
@@ -13,7 +14,7 @@ import (
1314

1415
// OffsetReader is an interface to list offsets for all partitions of a topic from Kafka.
1516
type OffsetReader interface {
16-
GroupLag(context.Context) (map[int32]kadm.GroupMemberLag, error)
17+
GroupLag(context.Context, time.Duration) (map[int32]kadm.GroupMemberLag, error)
1718
}
1819

1920
type Planner interface {
@@ -32,13 +33,17 @@ var validStrategies = []string{
3233
// tries to consume upto targetRecordCount records per partition
3334
type RecordCountPlanner struct {
3435
targetRecordCount int64
36+
lookbackPeriod time.Duration
3537
offsetReader OffsetReader
3638
logger log.Logger
3739
}
3840

39-
func NewRecordCountPlanner(targetRecordCount int64) *RecordCountPlanner {
41+
func NewRecordCountPlanner(offsetReader OffsetReader, targetRecordCount int64, lookbackPeriod time.Duration, logger log.Logger) *RecordCountPlanner {
4042
return &RecordCountPlanner{
4143
targetRecordCount: targetRecordCount,
44+
lookbackPeriod: lookbackPeriod,
45+
offsetReader: offsetReader,
46+
logger: logger,
4247
}
4348
}
4449

@@ -47,7 +52,7 @@ func (p *RecordCountPlanner) Name() string {
4752
}
4853

4954
func (p *RecordCountPlanner) Plan(ctx context.Context) ([]*JobWithMetadata, error) {
50-
offsets, err := p.offsetReader.GroupLag(ctx)
55+
offsets, err := p.offsetReader.GroupLag(ctx, p.lookbackPeriod)
5156
if err != nil {
5257
level.Error(p.logger).Log("msg", "failed to get group lag", "err", err)
5358
return nil, err

pkg/blockbuilder/scheduler/strategy_test.go

+3-4
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"testing"
66
"time"
77

8+
"github.com/go-kit/log"
89
"github.com/stretchr/testify/require"
910
"github.com/twmb/franz-go/pkg/kadm"
1011

@@ -15,7 +16,7 @@ type mockOffsetReader struct {
1516
groupLag map[int32]kadm.GroupMemberLag
1617
}
1718

18-
func (m *mockOffsetReader) GroupLag(_ context.Context) (map[int32]kadm.GroupMemberLag, error) {
19+
func (m *mockOffsetReader) GroupLag(_ context.Context, _ time.Duration) (map[int32]kadm.GroupMemberLag, error) {
1920
return m.groupLag, nil
2021
}
2122

@@ -145,9 +146,7 @@ func TestRecordCountPlanner_Plan(t *testing.T) {
145146
TargetRecordCount: tc.recordCount,
146147
}
147148
require.NoError(t, cfg.Validate())
148-
149-
planner := NewRecordCountPlanner(tc.recordCount)
150-
planner.offsetReader = mockReader
149+
planner := NewRecordCountPlanner(mockReader, tc.recordCount, time.Hour, log.NewNopLogger())
151150

152151
jobs, err := planner.Plan(context.Background())
153152
require.NoError(t, err)

pkg/loki/modules.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -1877,13 +1877,16 @@ func (t *Loki) initBlockScheduler() (services.Service, error) {
18771877
return nil, fmt.Errorf("creating kafka offset manager: %w", err)
18781878
}
18791879

1880-
s := blockscheduler.NewScheduler(
1880+
s, err := blockscheduler.NewScheduler(
18811881
t.Cfg.BlockScheduler,
18821882
blockscheduler.NewJobQueueWithLogger(logger),
18831883
offsetManager,
18841884
logger,
18851885
prometheus.DefaultRegisterer,
18861886
)
1887+
if err != nil {
1888+
return nil, err
1889+
}
18871890

18881891
blockprotos.RegisterSchedulerServiceServer(
18891892
t.Server.GRPC,

0 commit comments

Comments
 (0)