Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
lightning: fix the bug that calculate unfinished ranges may miss some…
Browse files Browse the repository at this point in the history
… range (#1413) (#1416)
  • Loading branch information
ti-chi-bot authored Sep 13, 2021
1 parent 885b217 commit adff4c1
Showing 1 changed file with 29 additions and 33 deletions.
62 changes: 29 additions & 33 deletions pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -1862,13 +1862,18 @@ func (local *local) writeAndIngestByRanges(ctx context.Context, engineFile *File
var allErrLock sync.Mutex
var allErr error
var wg sync.WaitGroup

wg.Add(len(ranges))
metErr := atomic.NewBool(false)

for _, r := range ranges {
startKey := r.start
endKey := r.end
w := local.rangeConcurrency.Apply()
// if meet error here, skip try more here to allow fail fast.
if metErr.Load() {
local.rangeConcurrency.Recycle(w)
break
}
wg.Add(1)
go func(w *worker.Worker) {
defer func() {
local.rangeConcurrency.Recycle(w)
Expand All @@ -1895,6 +1900,9 @@ func (local *local) writeAndIngestByRanges(ctx context.Context, engineFile *File
allErrLock.Lock()
allErr = multierr.Append(allErr, err)
allErrLock.Unlock()
if err != nil {
metErr.Store(true)
}
}(w)
}

Expand Down Expand Up @@ -2021,43 +2029,31 @@ func sortAndMergeRanges(ranges []Range) []Range {
}

func filterOverlapRange(ranges []Range, finishedRanges []Range) []Range {
if len(finishedRanges) == 0 {
if len(ranges) == 0 || len(finishedRanges) == 0 {
return ranges
}

result := make([]Range, 0, len(ranges))
rIdx := 0
fIdx := 0
for rIdx < len(ranges) && fIdx < len(finishedRanges) {
if bytes.Compare(ranges[rIdx].end, finishedRanges[fIdx].start) <= 0 {
result = append(result, ranges[rIdx])
rIdx++
} else if bytes.Compare(ranges[rIdx].start, finishedRanges[fIdx].end) >= 0 {
fIdx++
} else if bytes.Compare(ranges[rIdx].start, finishedRanges[fIdx].start) < 0 {
result = append(result, Range{start: ranges[rIdx].start, end: finishedRanges[fIdx].start})
switch bytes.Compare(ranges[rIdx].end, finishedRanges[fIdx].end) {
case -1:
rIdx++
case 0:
rIdx++
fIdx++
case 1:
ranges[rIdx].start = finishedRanges[fIdx].end
fIdx++
result := make([]Range, 0)
for _, r := range ranges {
start := r.start
end := r.end
for len(finishedRanges) > 0 && bytes.Compare(finishedRanges[0].start, end) < 0 {
fr := finishedRanges[0]
if bytes.Compare(fr.start, start) > 0 {
result = append(result, Range{start: start, end: fr.start})
}
} else if bytes.Compare(ranges[rIdx].end, finishedRanges[fIdx].end) > 0 {
ranges[rIdx].start = finishedRanges[fIdx].end
fIdx++
} else {
rIdx++
if bytes.Compare(fr.end, start) > 0 {
start = fr.end
}
if bytes.Compare(fr.end, end) > 0 {
break
}
finishedRanges = finishedRanges[1:]
}
if bytes.Compare(start, end) < 0 {
result = append(result, Range{start: start, end: end})
}
}

if rIdx < len(ranges) {
result = append(result, ranges[rIdx:]...)
}

return result
}

Expand Down

0 comments on commit adff4c1

Please sign in to comment.