Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: generate and send region job more smoothly (#42780) #43035

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions br/pkg/lightning/backend/local/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,11 @@ go_test(
"//br/pkg/lightning/backend",
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/common",
<<<<<<< HEAD
"//br/pkg/lightning/glue",
=======
"//br/pkg/lightning/config",
>>>>>>> 44aa4cf3cd1 (lightning: generate and send region job more smoothly (#42780))
"//br/pkg/lightning/log",
"//br/pkg/lightning/mydump",
"//br/pkg/membuf",
Expand Down
81 changes: 7 additions & 74 deletions br/pkg/lightning/backend/local/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,13 @@ type Engine struct {
// flush and ingest sst hold the rlock, other operation hold the wlock.
mutex sync.RWMutex

ctx context.Context
cancel context.CancelFunc
sstDir string
sstMetasChan chan metaOrFlush
ingestErr common.OnceError
wg sync.WaitGroup
sstIngester sstIngester
finishedRanges syncedRanges
ctx context.Context
cancel context.CancelFunc
sstDir string
sstMetasChan chan metaOrFlush
ingestErr common.OnceError
wg sync.WaitGroup
sstIngester sstIngester

// sst seq lock
seqLock sync.Mutex
Expand Down Expand Up @@ -907,72 +906,6 @@ func (e *Engine) loadEngineMeta() error {
return nil
}

// sortAndMergeRanges sort the ranges and merge range that overlaps with each other into a single range.
func sortAndMergeRanges(ranges []Range) []Range {
if len(ranges) == 0 {
return ranges
}

slices.SortFunc(ranges, func(i, j Range) bool {
return bytes.Compare(i.start, j.start) < 0
})

curEnd := ranges[0].end
i := 0
for j := 1; j < len(ranges); j++ {
if bytes.Compare(curEnd, ranges[j].start) >= 0 {
if bytes.Compare(curEnd, ranges[j].end) < 0 {
curEnd = ranges[j].end
}
} else {
ranges[i].end = curEnd
i++
ranges[i].start = ranges[j].start
curEnd = ranges[j].end
}
}
ranges[i].end = curEnd
return ranges[:i+1]
}

func filterOverlapRange(ranges []Range, finishedRanges []Range) []Range {
if len(ranges) == 0 || len(finishedRanges) == 0 {
return ranges
}

result := make([]Range, 0)
for _, r := range ranges {
start := r.start
end := r.end
for len(finishedRanges) > 0 && bytes.Compare(finishedRanges[0].start, end) < 0 {
fr := finishedRanges[0]
if bytes.Compare(fr.start, start) > 0 {
result = append(result, Range{start: start, end: fr.start})
}
if bytes.Compare(fr.end, start) > 0 {
start = fr.end
}
if bytes.Compare(fr.end, end) > 0 {
break
}
finishedRanges = finishedRanges[1:]
}
if bytes.Compare(start, end) < 0 {
result = append(result, Range{start: start, end: end})
}
}
return result
}

func (e *Engine) unfinishedRanges(ranges []Range) []Range {
e.finishedRanges.Lock()
defer e.finishedRanges.Unlock()

e.finishedRanges.ranges = sortAndMergeRanges(e.finishedRanges.ranges)

return filterOverlapRange(ranges, e.finishedRanges.ranges)
}

func (e *Engine) newKVIter(ctx context.Context, opts *pebble.IterOptions) Iter {
if bytes.Compare(opts.LowerBound, normalIterStartKey) < 0 {
newOpts := *opts
Expand Down
Loading