Commit
feat(repair): fallback to no-batching token ranges on error or being out of window

This is a safety valve ensuring that repair keeps working even when it encounters many errors or can't make progress because of a tight scheduler window.
Michal-Leszczynski authored and karol-kokoszka committed Jun 20, 2024
1 parent 08e1824 commit 79eaa58
Showing 2 changed files with 92 additions and 9 deletions.
21 changes: 12 additions & 9 deletions pkg/service/repair/generator.go
@@ -44,6 +44,7 @@ type generatorTools struct {
submitter submitter[job, jobResult]
ringDescriber scyllaclient.RingDescriber
stop *atomic.Bool
batching bool
logger log.Logger
}

@@ -284,15 +285,17 @@ func (tg *tableGenerator) newJob() (job, bool) {
}

func (tg *tableGenerator) getRangesToRepair(allRanges []scyllaclient.TokenRange, intensity int) []scyllaclient.TokenRange {
// Sending batched ranges in a single job results in better shard utilization.
// With intensity=10, SM would normally send a job consisting of 10 ranges.
// It might happen that repairing 1 range takes more time than repairing the remaining 9.
// Then SM would be waiting for a repair job which repairs only 1 range,
// while the given replica set could be repairing 9 additional ranges at the same time.
// Because of that, we send all ranges (limited to 1000 for safety) owned by the given replica set in a single repair job.
// Intensity is then controlled by the ranges_parallelism repair param.
const limit = 1000
intensity = min(len(allRanges), limit)
if tg.batching {
// Sending batched ranges in a single job results in better shard utilization.
// With intensity=10, SM would normally send a job consisting of 10 ranges.
// It might happen that repairing 1 range takes more time than repairing the remaining 9.
// Then SM would be waiting for a repair job which repairs only 1 range,
// while the given replica set could be repairing 9 additional ranges at the same time.
// Because of that, we send all ranges (limited to 1000 for safety) owned by the given replica set in a single repair job.
// Intensity is then controlled by the ranges_parallelism repair param.
const limit = 1000
intensity = min(len(allRanges), limit)
}
if tg.JobType != normalJobType {
intensity = len(allRanges)
}
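To make the intent of the guarded branch above easier to see, here is a minimal, self-contained sketch of the effective per-job range count; effectiveRangesPerJob and its sample values are illustrative only and not part of the commit:

package main

import "fmt"

// effectiveRangesPerJob is a hypothetical helper (not part of the diff) that
// mirrors how getRangesToRepair picks the number of ranges packed into a
// single repair job once the batching flag is taken into account.
func effectiveRangesPerJob(totalRanges, intensity int, batching bool) int {
	const limit = 1000 // same safety cap as in the diff
	if batching {
		intensity = min(totalRanges, limit)
	}
	if intensity > totalRanges {
		intensity = totalRanges
	}
	return intensity
}

func main() {
	fmt.Println(effectiveRangesPerJob(250, 10, false)) // 10 ranges per job
	fmt.Println(effectiveRangesPerJob(250, 10, true))  // all 250 ranges in one job
	fmt.Println(effectiveRangesPerJob(5000, 10, true)) // capped at the 1000-range safety limit
}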
80 changes: 80 additions & 0 deletions pkg/service/repair/service.go
@@ -274,6 +274,15 @@ func (s *Service) Repair(ctx context.Context, clusterID, taskID, runID uuid.UUID
return errors.Wrap(err, "create generator")
}

batching, err := shouldBatchRanges(s.session, clusterID, taskID, runID)
if err != nil {
s.logger.Error(ctx, "Couldn't check if batching token ranges is safe", "error", err)
batching = false
} else {
s.logger.Info(ctx, "Checked if batching token ranges is safe", "result", batching)
}
gen.batching = batching

done := make(chan struct{}, 1)
go func() {
select {
@@ -452,3 +461,74 @@ func (s *Service) SetParallel(ctx context.Context, clusterID uuid.UUID, parallel
}).ExecRelease()
return errors.Wrap(err, "update db")
}

func shouldBatchRanges(session gocqlx.Session, clusterID, taskID, runID uuid.UUID) (bool, error) {
prevIDs, err := getAllPrevRunIDs(session, clusterID, taskID, runID)
if err != nil {
return false, err
}
if len(prevIDs) == 0 {
return true, nil
}

q := qb.Select(table.SchedulerTaskRun.Name()).Columns(
"status",
).Where(
qb.Eq("cluster_id"),
qb.Eq("type"),
qb.Eq("task_id"),
qb.Eq("id"),
).Query(session)
defer q.Release()

var status string
for _, id := range prevIDs {
err := q.BindMap(qb.M{
"cluster_id": clusterID,
"type": "repair",
"task_id": taskID,
"id": id,
}).Scan(&status)
if err != nil {
return false, errors.Wrap(err, "get prev run status")
}
// Fall back to no-batching when any of the previous runs:
// - finished with an error
// - fell out of the scheduler window
if status == "WAITING" || status == "ERROR" {
return false, nil
}
}

return true, nil
}

func getAllPrevRunIDs(session gocqlx.Session, clusterID, taskID, runID uuid.UUID) ([]uuid.UUID, error) {
q := qb.Select(table.RepairRun.Name()).Columns(
"prev_id",
).Where(
qb.Eq("cluster_id"),
qb.Eq("task_id"),
qb.Eq("id"),
).Query(session)
defer q.Release()

var out []uuid.UUID
var prevID uuid.UUID
for {
err := q.BindMap(qb.M{
"cluster_id": clusterID,
"task_id": taskID,
"id": runID,
}).Scan(&prevID)
if err != nil {
return nil, errors.Wrap(err, "get prev run id")
}
if prevID == uuid.Nil {
return out, nil
}

out = append(out, prevID)
runID = prevID
}
}
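
To summarize the decision in isolation, here is a minimal, database-free sketch of the rule implemented by shouldBatchRanges; batchingAllowed and the sample status values are hypothetical and not part of the commit ("DONE" stands in for any status other than WAITING or ERROR):

package main

import "fmt"

// batchingAllowed is a hypothetical, database-free restatement of the decision
// made by shouldBatchRanges: batching is used only for the first run of a task,
// or when none of the previous runs ended in ERROR or were left WAITING
// (i.e. pushed out of their scheduler window).
func batchingAllowed(prevRunStatuses []string) bool {
	for _, status := range prevRunStatuses {
		if status == "WAITING" || status == "ERROR" {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(batchingAllowed(nil))                       // true: no previous runs
	fmt.Println(batchingAllowed([]string{"DONE", "DONE"}))  // true: clean history, batching is safe
	fmt.Println(batchingAllowed([]string{"DONE", "ERROR"})) // false: fall back to no-batching
	fmt.Println(batchingAllowed([]string{"WAITING"}))       // false: fall back to no-batching
}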
