global sort: raise maximum overlapping threshold (pingcap#50264)
lance6716 authored Jan 22, 2024
1 parent 0fc0084 commit 25afff4
Showing 17 changed files with 365 additions and 76 deletions.
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/external/BUILD.bazel
@@ -75,6 +75,7 @@ go_test(
     deps = [
         "//br/pkg/lightning/backend/kv",
         "//br/pkg/lightning/common",
+        "//br/pkg/lightning/config",
         "//br/pkg/lightning/log",
         "//br/pkg/membuf",
         "//br/pkg/storage",
50 changes: 40 additions & 10 deletions br/pkg/lightning/backend/external/engine.go
@@ -54,6 +54,35 @@ type memKVsAndBuffers struct {
 	values       [][]byte
 	memKVBuffers []*membuf.Buffer
 	size         int
+	droppedSize  int
+
+	// temporary fields to store KVs to reduce slice allocations.
+	keysPerFile        [][][]byte
+	valuesPerFile      [][][]byte
+	droppedSizePerFile []int
 }
+
+func (b *memKVsAndBuffers) build() {
+	sumKVCnt := 0
+	for _, keys := range b.keysPerFile {
+		sumKVCnt += len(keys)
+	}
+	b.keys = make([][]byte, 0, sumKVCnt)
+	b.values = make([][]byte, 0, sumKVCnt)
+	for i := range b.keysPerFile {
+		b.keys = append(b.keys, b.keysPerFile[i]...)
+		b.keysPerFile[i] = nil
+		b.values = append(b.values, b.valuesPerFile[i]...)
+		b.valuesPerFile[i] = nil
+	}
+	b.keysPerFile = nil
+	b.valuesPerFile = nil
+
+	b.droppedSize = 0
+	for _, size := range b.droppedSizePerFile {
+		b.droppedSize += size
+	}
+	b.droppedSizePerFile = nil
+}
 
 // Engine stored sorted key/value pairs in an external storage.
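Staging KVs per file and flattening once in build() means each reader goroutine appends a single slice under the lock, and the final keys/values slices are allocated exactly once at their final size. A minimal, self-contained sketch of that flattening pattern (the flatten helper and main are illustrative, not the committed API):

package main

import "fmt"

// flatten pre-sizes the destination and releases each per-file slice as soon
// as it has been copied, mirroring what memKVsAndBuffers.build does above.
func flatten(perFile [][][]byte) [][]byte {
	total := 0
	for _, keys := range perFile {
		total += len(keys)
	}
	out := make([][]byte, 0, total) // single allocation at the final size
	for i := range perFile {
		out = append(out, perFile[i]...)
		perFile[i] = nil // drop the reference so GC can reclaim it early
	}
	return out
}

func main() {
	perFile := [][][]byte{
		{[]byte("a"), []byte("c")},
		{[]byte("b")},
	}
	fmt.Println(len(flatten(perFile))) // 3
}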
@@ -182,14 +211,11 @@ func getFilesReadConcurrency(
 	startKey, endKey []byte,
 ) ([]uint64, []uint64, error) {
 	result := make([]uint64, len(statsFiles))
-	startOffs, err := seekPropsOffsets(ctx, startKey, statsFiles, storage, false)
-	if err != nil {
-		return nil, nil, err
-	}
-	endOffs, err := seekPropsOffsets(ctx, endKey, statsFiles, storage, false)
+	offsets, err := seekPropsOffsets(ctx, []kv.Key{startKey, endKey}, statsFiles, storage, false)
 	if err != nil {
 		return nil, nil, err
 	}
+	startOffs, endOffs := offsets[0], offsets[1]
 	for i := range statsFiles {
 		result[i] = (endOffs[i] - startOffs[i]) / uint64(ConcurrentReaderBufferSizePerConc)
 		result[i] = max(result[i], 1)
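Folding the startKey and endKey lookups into a single seekPropsOffsets call halves the round trips to external storage; the result is indexed first by key, then by file. A sketch of the batched-lookup shape against one sorted property index (batchSeek is a hypothetical stand-in, not the real function, which reads property blocks from storage):

package main

import (
	"fmt"
	"sort"
)

// batchSeek resolves several keys against one sorted offset index in a single
// call, the way the patched getFilesReadConcurrency asks for the start and end
// offsets together instead of seeking twice.
func batchSeek(propKeys []string, propOffsets []uint64, wanted []string) []uint64 {
	out := make([]uint64, len(wanted))
	for i, w := range wanted {
		// index of the last property key <= w; 0 if w precedes all keys
		j := sort.Search(len(propKeys), func(k int) bool { return propKeys[k] > w })
		if j > 0 {
			out[i] = propOffsets[j-1]
		}
	}
	return out
}

func main() {
	keys := []string{"b", "d", "f"}
	offs := []uint64{0, 100, 200}
	fmt.Println(batchSeek(keys, offs, []string{"a", "e"})) // [0 100]
}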
@@ -225,10 +251,14 @@ func (e *Engine) loadBatchRegionData(ctx context.Context, startKey, endKey []byt
 	if err != nil {
 		return err
 	}
+	e.memKVsAndBuffers.build()
+
 	readSecond := time.Since(readStart).Seconds()
 	readDurHist.Observe(readSecond)
 	logutil.Logger(ctx).Info("reading external storage in loadBatchRegionData",
-		zap.Duration("cost time", time.Since(readStart)))
+		zap.Duration("cost time", time.Since(readStart)),
+		zap.Int("droppedSize", e.memKVsAndBuffers.droppedSize))
+
 	sortStart := time.Now()
 	oldSortyGor := sorty.MaxGor
 	sorty.MaxGor = uint64(e.workerConcurrency * 2)
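The hunk above caps sorty's goroutine fan-out at twice the engine's worker concurrency before sorting the loaded KVs. sorty drives the sort through a less-swap closure; a self-contained sketch of the same pairwise key/value sort (the import path is assumed to be the jfcg/sorty/v2 module TiDB vendors):

package main

import (
	"bytes"
	"fmt"

	"github.com/jfcg/sorty/v2"
)

func main() {
	keys := [][]byte{[]byte("b"), []byte("c"), []byte("a")}
	values := [][]byte{[]byte("2"), []byte("3"), []byte("1")}

	sorty.MaxGor = 2 // cap the sorting goroutines, as loadBatchRegionData does
	sorty.Sort(len(keys), func(i, k, r, s int) bool {
		if bytes.Compare(keys[i], keys[k]) < 0 {
			if r != s { // sorty asks us to swap elements r and s when r != s
				keys[r], keys[s] = keys[s], keys[r]
				values[r], values[s] = values[s], values[r]
			}
			return true
		}
		return false
	})
	fmt.Println(string(keys[0]), string(keys[1]), string(keys[2])) // a b c
}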
@@ -293,9 +323,9 @@ func (e *Engine) LoadIngestData(
 	regionRanges []common.Range,
 	outCh chan<- common.DataAndRange,
 ) error {
-	// currently we assume the region size is 96MB and will download 96MB*40 = 3.8GB
+	// currently we assume the region size is 96MB and will download 96MB*32 = 3GB
 	// data at once
-	regionBatchSize := 40
+	regionBatchSize := 32
 	failpoint.Inject("LoadIngestDataBatchSize", func(val failpoint.Value) {
 		regionBatchSize = val.(int)
 	})
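The new figure is 32 regions x 96MB = 3072MB, about 3GB per batch (the old 40 x 96MB was about 3.8GB). Tests can still shrink the batch through the failpoint above; a sketch of enabling it (the fully qualified failpoint path follows the usual package-path convention and is an assumption):

package main

import (
	"fmt"

	"github.com/pingcap/failpoint"
)

func main() {
	// The fully qualified name is an assumption based on the package path.
	fp := "github.com/pingcap/tidb/br/pkg/lightning/backend/external/LoadIngestDataBatchSize"
	if err := failpoint.Enable(fp, "return(2)"); err != nil {
		fmt.Println("enable failed:", err)
		return
	}
	defer func() { _ = failpoint.Disable(fp) }()
	// ... run the code under test; failpoint.Inject in LoadIngestData will
	// now see val = 2 and use regionBatchSize = 2.
}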
@@ -338,11 +368,11 @@ func (e *Engine) createMergeIter(ctx context.Context, start kv.Key) (*MergeKVIte
 		logger.Info("no stats files",
 			zap.String("startKey", hex.EncodeToString(start)))
 	} else {
-		offs, err := seekPropsOffsets(ctx, start, e.statsFiles, e.storage, e.checkHotspot)
+		offs, err := seekPropsOffsets(ctx, []kv.Key{start}, e.statsFiles, e.storage, e.checkHotspot)
 		if err != nil {
 			return nil, errors.Trace(err)
 		}
-		offsets = offs
+		offsets = offs[0]
 		logger.Debug("seek props offsets",
 			zap.Uint64s("offsets", offsets),
 			zap.String("startKey", hex.EncodeToString(start)),
12 changes: 7 additions & 5 deletions br/pkg/lightning/backend/external/iter.go
@@ -653,7 +653,7 @@ func newMergePropBaseIter(
 	// we are rely on the caller have reduced the overall overlapping to less than
 	// MergeSortOverlapThreshold for []MultipleFilesStat. And we are going to open
 	// about 8000 connection to read files.
-	preOpenLimit := limit * 8
+	preOpenLimit := limit * 2
 	preOpenLimit = min(preOpenLimit, int64(len(multiStat.Filenames)))
 	preOpenCh := make(chan chan readerAndError, preOpenLimit-limit)
 	closeCh := make(chan struct{})
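preOpenCh gives the iterator a bounded read-ahead window: preOpenLimit - limit readers may be opened ahead of the one being consumed, so lowering the multiplier from 8x to 2x directly shrinks the number of simultaneously open connections. A minimal sketch of the bounded-window idea (the names and the string stand-in for a reader are illustrative):

package main

import "fmt"

func main() {
	files := []string{"f0", "f1", "f2", "f3", "f4", "f5"}
	limit := int64(2)
	preOpenLimit := limit * 2 // the diff's new multiplier

	// The channel capacity bounds how far the opener can run ahead.
	preOpened := make(chan string, preOpenLimit-limit)
	go func() {
		defer close(preOpened)
		for _, f := range files {
			preOpened <- "reader(" + f + ")" // stands in for newStatsReader
		}
	}()
	for r := range preOpened {
		fmt.Println("consume", r) // blocks the opener once the window is full
	}
}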
@@ -672,7 +672,7 @@
 		go func() {
 			defer close(asyncTask)
 			defer wg.Done()
-			rd, err := newStatsReader(ctx, exStorage, path, 500*1024)
+			rd, err := newStatsReader(ctx, exStorage, path, 250*1024)
 			select {
 			case <-closeCh:
 				_ = rd.Close()
@@ -690,10 +690,12 @@
 			if !ok {
 				continue
 			}
-			_ = t.r.close()
+			if t.err == nil {
+				_ = t.r.close()
+			}
 		}
 		t, ok := <-asyncTask
-		if ok {
+		if ok && t.err == nil {
 			_ = t.r.close()
 		}
 		return
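The new t.err == nil guards matter because a task whose newStatsReader call failed carries no reader to close; closing unconditionally while draining would dereference nil. A runnable reduction of the fixed shape (the result type and drain helper are illustrative):

package main

import (
	"errors"
	"fmt"
	"os"
)

type result struct {
	f   *os.File // nil when err is set
	err error
}

// drain closes only successfully opened readers, like the patched cleanup
// loop in newMergePropBaseIter.
func drain(ch <-chan result) {
	for t := range ch {
		if t.err == nil {
			_ = t.f.Close()
		}
	}
}

func main() {
	ch := make(chan result, 2)
	f, _ := os.Open(os.DevNull)
	ch <- result{f: f}
	ch <- result{err: errors.New("open failed")} // f is nil here
	close(ch)
	drain(ch)
	fmt.Println("drained without a nil-pointer panic")
}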
@@ -707,7 +709,7 @@
 	for i := 0; i < int(limit); i++ {
 		path := multiStat.Filenames[i][1]
 		readerOpeners = append(readerOpeners, func() (*statReaderProxy, error) {
-			rd, err := newStatsReader(ctx, exStorage, path, 500*1024)
+			rd, err := newStatsReader(ctx, exStorage, path, 250*1024)
 			if err != nil {
 				return nil, err
 			}
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/external/merge_v2.go
@@ -129,6 +129,7 @@ func MergeOverlappingFilesV2(
 			logutil.Logger(ctx).Warn("read all data failed", zap.Error(err1))
 			return
 		}
+		loaded.build()
 		readTime := time.Since(now)
 		now = time.Now()
 		sorty.MaxGor = uint64(concurrency)
18 changes: 15 additions & 3 deletions br/pkg/lightning/backend/external/reader.go
@@ -47,6 +47,14 @@ func readAllData(
 	)
 	defer func() {
 		task.End(zap.ErrorLevel, err)
+		if err != nil {
+			output.keysPerFile = nil
+			output.valuesPerFile = nil
+			for _, b := range output.memKVBuffers {
+				b.Destroy()
+			}
+			output.memKVBuffers = nil
+		}
 	}()
 
 	concurrences, startOffsets, err := getFilesReadConcurrency(
@@ -67,7 +75,8 @@
 		return err
 	}
 	eg, egCtx := util.NewErrorGroupWithRecoverWithCtx(ctx)
-	// TODO(lance6716): limit the concurrency of eg to 30 does not help
+	// limit the concurrency to avoid open too many connections at the same time
+	eg.SetLimit(1000)
 	for i := range dataFiles {
 		i := i
 		eg.Go(func() error {
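eg.SetLimit is the standard errgroup concurrency cap: at most 1000 file reads, and therefore roughly at most 1000 storage connections, are in flight at once. A plain-errgroup sketch of the pattern (the TiDB helper wraps errgroup with panic recovery; this sketch uses the x/sync package directly):

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	eg, _ := errgroup.WithContext(context.Background())
	eg.SetLimit(3) // at most 3 goroutines in flight, like eg.SetLimit(1000) above

	files := []string{"f0", "f1", "f2", "f3", "f4"}
	for i := range files {
		i := i
		eg.Go(func() error {
			fmt.Println("reading", files[i]) // stands in for readOneFile
			return nil
		})
	}
	if err := eg.Wait(); err != nil {
		fmt.Println("read failed:", err)
	}
}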
@@ -126,6 +135,7 @@ func readOneFile(
 	keys := make([][]byte, 0, 1024)
 	values := make([][]byte, 0, 1024)
 	size := 0
+	droppedSize := 0
 
 	for {
 		k, v, err := rd.nextKV()
@@ -136,6 +146,7 @@
 			return err
 		}
 		if bytes.Compare(k, startKey) < 0 {
+			droppedSize += len(k) + len(v)
 			continue
 		}
 		if bytes.Compare(k, endKey) >= 0 {
@@ -149,10 +160,11 @@
 	}
 	readAndSortDurHist.Observe(time.Since(ts).Seconds())
 	output.mu.Lock()
-	output.keys = append(output.keys, keys...)
-	output.values = append(output.values, values...)
+	output.keysPerFile = append(output.keysPerFile, keys)
+	output.valuesPerFile = append(output.valuesPerFile, values)
 	output.memKVBuffers = append(output.memKVBuffers, memBuf)
 	output.size += size
+	output.droppedSizePerFile = append(output.droppedSizePerFile, droppedSize)
 	output.mu.Unlock()
 	return nil
 }
151 changes: 151 additions & 0 deletions br/pkg/lightning/backend/external/split_test.go
@@ -19,13 +19,21 @@ import (
 	"bytes"
 	"context"
 	"fmt"
 	"math"
+	"net/http"
+	_ "net/http/pprof"
+	"slices"
 	"testing"
 	"time"
 
+	"github.com/docker/go-units"
+	"github.com/google/uuid"
+	"github.com/pingcap/tidb/br/pkg/lightning/config"
 	"github.com/pingcap/tidb/br/pkg/storage"
 	"github.com/pingcap/tidb/pkg/kv"
+	"github.com/pingcap/tidb/pkg/util/size"
 	"github.com/stretchr/testify/require"
 	"golang.org/x/exp/rand"
+	"golang.org/x/sync/errgroup"
 )
 
 func TestGeneralProperties(t *testing.T) {
@@ -351,3 +359,146 @@ func TestExactlyKeyNum(t *testing.T) {
 	require.Equal(t, statFiles, splitStatFiles)
 	require.Equal(t, [][]byte{[]byte("key001"), []byte("key002")}, splitKeys)
 }
+
+func Test3KFilesRangeSplitter(t *testing.T) {
+	store := openTestingStorage(t)
+	ctx := context.Background()
+
+	// use HTTP pprof to debug
+	go func() {
+		http.ListenAndServe("localhost:6060", nil)
+	}()
+
+	// test the case that after one round merge step, we have 3000 stat files. In
+	// current merge step parameters, we will merge 4000 files of 256MB into 16
+	// files, so we directly write 4000*256MB/16 = 64GB data to onefile writer.
+	fileNum := 3000
+	statCh := make(chan []MultipleFilesStat, fileNum)
+	onClose := func(s *WriterSummary) {
+		statCh <- s.MultipleFilesStats
+	}
+
+	eg := errgroup.Group{}
+	eg.SetLimit(30)
+	for i := 0; i < fileNum; i++ {
+		i := i
+		eg.Go(func() error {
+			w := NewWriterBuilder().
+				SetMemorySizeLimit(DefaultMemSizeLimit).
+				SetBlockSize(32*units.MiB). // dataKVGroupBlockSize
+				SetWriterBatchCount(8*1024).
+				SetPropKeysDistance(8*1024).
+				SetPropSizeDistance(size.MB).
+				SetOnCloseFunc(onClose).
+				BuildOneFile(store, "/mock-test", uuid.New().String())
+			err := w.Init(ctx, int64(5*size.MB))
+			require.NoError(t, err)
+			// we don't need data files
+			err = w.dataWriter.Close(ctx)
+			require.NoError(t, err)
+			w.dataWriter = storage.NoopWriter{}
+
+			kvSize := 20 * size.KB
+			keySize := size.KB
+			key := make([]byte, keySize)
+			key[keySize-1] = byte(i % 256)
+			key[keySize-2] = byte(i / 256)
+			minKey := slices.Clone(key)
+			var maxKey []byte
+
+			memSize := uint64(0)
+			for j := 0; j < int(64*size.GB/kvSize); j++ {
+
+				// copied from OneFileWriter.WriteRow
+
+				if memSize >= DefaultMemSizeLimit {
+					memSize = 0
+					w.kvStore.Close()
+					encodedStat := w.rc.encode()
+					_, err := w.statWriter.Write(ctx, encodedStat)
+					if err != nil {
+						return err
+					}
+					w.rc.reset()
+					// the new prop should have the same offset with kvStore.
+					w.rc.currProp.offset = w.kvStore.offset
+				}
+				if len(w.rc.currProp.firstKey) == 0 {
+					w.rc.currProp.firstKey = key
+				}
+				w.rc.currProp.lastKey = key
+
+				memSize += kvSize
+				w.totalSize += kvSize
+				w.rc.currProp.size += kvSize - 2*lengthBytes
+				w.rc.currProp.keys++
+
+				if w.rc.currProp.size >= w.rc.propSizeDist ||
+					w.rc.currProp.keys >= w.rc.propKeysDist {
+					newProp := *w.rc.currProp
+					w.rc.props = append(w.rc.props, &newProp)
+					// reset currProp, and start to update this prop.
+					w.rc.currProp.firstKey = nil
+					w.rc.currProp.offset = memSize
+					w.rc.currProp.keys = 0
+					w.rc.currProp.size = 0
+				}
+
+				if j == int(64*size.GB/kvSize)-1 {
+					maxKey = slices.Clone(key)
+				}
+
+				// increase the key
+
+				for k := keySize - 3; k >= 0; k-- {
+					key[k]++
+					if key[k] != 0 {
+						break
+					}
+				}
+			}
+
+			// copied from mergeOverlappingFilesInternal
+			var stat MultipleFilesStat
+			stat.Filenames = append(stat.Filenames,
+				[2]string{w.dataFile, w.statFile})
+			stat.build([]kv.Key{minKey}, []kv.Key{maxKey})
+			statCh <- []MultipleFilesStat{stat}
+			return w.Close(ctx)
+		})
+	}
+
+	require.NoError(t, eg.Wait())
+
+	multiStat := make([]MultipleFilesStat, 0, fileNum)
+	for i := 0; i < fileNum; i++ {
+		multiStat = append(multiStat, <-statCh...)
+	}
+	splitter, err := NewRangeSplitter(
+		ctx,
+		multiStat,
+		store,
+		int64(config.DefaultBatchSize),
+		int64(math.MaxInt64),
+		int64(config.SplitRegionSize),
+		int64(config.SplitRegionKeys),
+		false,
+	)
+	require.NoError(t, err)
+	var lastEndKey []byte
+	for {
+		endKey, _, statFiles, _, err := splitter.SplitOneRangesGroup()
+		require.NoError(t, err)
+		require.Greater(t, len(statFiles), 0)
+		if endKey == nil {
+			break
+		}
+		if lastEndKey != nil {
+			cmp := bytes.Compare(endKey, lastEndKey)
+			require.Equal(t, 1, cmp, "endKey: %v, lastEndKey: %v", endKey, lastEndKey)
+		}
+		lastEndKey = slices.Clone(endKey)
+	}
+	err = splitter.Close()
+	require.NoError(t, err)
+}
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/external/stat_reader.go
@@ -26,7 +26,7 @@ type statsReader struct {
 }
 
 func newStatsReader(ctx context.Context, store storage.ExternalStorage, name string, bufSize int) (*statsReader, error) {
-	sr, err := openStoreReaderAndSeek(ctx, store, name, 0, 0)
+	sr, err := openStoreReaderAndSeek(ctx, store, name, 0, 250*1024)
	if err != nil {
		return nil, err
	}
3 changes: 2 additions & 1 deletion br/pkg/lightning/backend/external/testutil.go
@@ -81,8 +81,9 @@ func testReadAndCompare(
 		bufPool,
 		loaded,
 	)
-
 	require.NoError(t, err)
+	loaded.build()
+
 	// check kvs sorted
 	sorty.MaxGor = uint64(8)
 	sorty.Sort(len(loaded.keys), func(i, k, r, s int) bool {