6
6
"flag"
7
7
"fmt"
8
8
"net/http"
9
+ "sort"
9
10
"strconv"
10
11
"strings"
11
12
"time"
@@ -25,13 +26,12 @@ var (
25
26
)
26
27
27
28
type Config struct {
28
- ConsumerGroup string `yaml:"consumer_group"`
29
- Interval time.Duration `yaml:"interval"`
30
- LookbackPeriod time.Duration `yaml:"lookback_period"`
31
- Strategy string `yaml:"strategy"`
32
- TargetRecordCount int64 `yaml:"target_record_count"`
33
- MaxJobsPlannedPerInterval int `yaml:"max_jobs_planned_per_interval"`
34
- JobQueueConfig JobQueueConfig `yaml:"job_queue"`
29
+ ConsumerGroup string `yaml:"consumer_group"`
30
+ Interval time.Duration `yaml:"interval"`
31
+ LookbackPeriod time.Duration `yaml:"lookback_period"`
32
+ Strategy string `yaml:"strategy"`
33
+ TargetRecordCount int64 `yaml:"target_record_count"`
34
+ JobQueueConfig JobQueueConfig `yaml:"job_queue"`
35
35
}
36
36
37
37
func (cfg * Config ) RegisterFlagsWithPrefix (prefix string , f * flag.FlagSet ) {
@@ -56,12 +56,6 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
56
56
RecordCountStrategy ,
57
57
),
58
58
)
59
- f .IntVar (
60
- & cfg .MaxJobsPlannedPerInterval ,
61
- prefix + "max-jobs-planned-per-interval" ,
62
- 100 ,
63
- "Maximum number of jobs that the planner can return." ,
64
- )
65
59
cfg .JobQueueConfig .RegisterFlags (f )
66
60
}
67
61
@@ -155,33 +149,18 @@ func (s *BlockScheduler) runOnce(ctx context.Context) error {
155
149
156
150
s .publishLagMetrics (lag )
157
151
158
- jobs , err := s .planner .Plan (ctx , s . cfg . MaxJobsPlannedPerInterval )
152
+ jobs , err := s .planner .Plan (ctx , 1 ) // TODO(owen-d): parallelize work within a partition
159
153
if err != nil {
160
154
level .Error (s .logger ).Log ("msg" , "failed to plan jobs" , "err" , err )
161
155
}
162
156
163
157
for _ , job := range jobs {
164
158
// TODO: end offset keeps moving each time we plan jobs, maybe we should not use it as part of the job ID
165
159
166
- logger := log .With (
167
- s .logger ,
168
- "job" , job .Job .ID (),
169
- "priority" , job .Priority ,
170
- )
171
-
172
- status , ok := s .queue .Exists (job .Job )
173
-
174
- // scheduler is unaware of incoming job; enqueue
175
- if ! ok {
176
- level .Debug (logger ).Log (
177
- "msg" , "job does not exist, enqueueing" ,
178
- )
179
-
180
- // enqueue
181
- if err := s .queue .Enqueue (job .Job , job .Priority ); err != nil {
182
- level .Error (logger ).Log ("msg" , "failed to enqueue job" , "err" , err )
183
- }
160
+ added , status , err := s .idempotentEnqueue (job )
184
161
162
+ // if we've either added or encountered an error, move on; we're done this cycle
163
+ if added || err != nil {
185
164
continue
186
165
}
187
166
@@ -232,6 +211,34 @@ func (s *BlockScheduler) HandleGetJob(ctx context.Context) (*types.Job, bool, er
232
211
}
233
212
}
234
213
214
+ // if added is true, the job was added to the queue, otherwise status is the current status of the job
215
+ func (s * BlockScheduler ) idempotentEnqueue (job * JobWithMetadata ) (added bool , status types.JobStatus , err error ) {
216
+ logger := log .With (
217
+ s .logger ,
218
+ "job" , job .Job .ID (),
219
+ "priority" , job .Priority ,
220
+ )
221
+
222
+ status , ok := s .queue .Exists (job .Job )
223
+
224
+ // scheduler is unaware of incoming job; enqueue
225
+ if ! ok {
226
+ level .Debug (logger ).Log (
227
+ "msg" , "job does not exist, enqueueing" ,
228
+ )
229
+
230
+ // enqueue
231
+ if err := s .queue .Enqueue (job .Job , job .Priority ); err != nil {
232
+ level .Error (logger ).Log ("msg" , "failed to enqueue job" , "err" , err )
233
+ return false , types .JobStatusUnknown , err
234
+ }
235
+
236
+ return true , types .JobStatusPending , nil
237
+ }
238
+
239
+ return false , status , nil
240
+ }
241
+
235
242
func (s * BlockScheduler ) HandleCompleteJob (ctx context.Context , job * types.Job , success bool ) (err error ) {
236
243
logger := log .With (s .logger , "job" , job .ID ())
237
244
@@ -243,6 +250,23 @@ func (s *BlockScheduler) HandleCompleteJob(ctx context.Context, job *types.Job,
243
250
); err == nil {
244
251
s .queue .MarkComplete (job .ID (), types .JobStatusComplete )
245
252
level .Info (logger ).Log ("msg" , "job completed successfully" )
253
+
254
+ // TODO(owen-d): cleaner way to enqueue next job for this partition,
255
+ // don't make it part of the response cycle to job completion, etc.
256
+ jobs , err := s .planner .Plan (ctx , 1 )
257
+ if err != nil {
258
+ level .Error (logger ).Log ("msg" , "failed to plan subsequent jobs" , "err" , err )
259
+ }
260
+
261
+ // find first job for this partition
262
+ nextJob := sort .Search (len (jobs ), func (i int ) bool {
263
+ return jobs [i ].Job .Partition () >= job .Partition ()
264
+ })
265
+
266
+ if nextJob < len (jobs ) && jobs [nextJob ].Job .Partition () == job .Partition () {
267
+ _ , _ , _ = s .idempotentEnqueue (jobs [nextJob ])
268
+ }
269
+
246
270
return nil
247
271
}
248
272
0 commit comments