From a011ed72482f1466e3260ca9413977d0a3152207 Mon Sep 17 00:00:00 2001 From: ywqzzy <592838129@qq.com> Date: Wed, 13 Dec 2023 15:31:53 +0800 Subject: [PATCH 1/6] tmp save --- .../lightning/backend/external/sort_test.go | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100644 br/pkg/lightning/backend/external/sort_test.go diff --git a/br/pkg/lightning/backend/external/sort_test.go b/br/pkg/lightning/backend/external/sort_test.go new file mode 100644 index 0000000000000..90ac254e3856a --- /dev/null +++ b/br/pkg/lightning/backend/external/sort_test.go @@ -0,0 +1,21 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package external + +import "testing" + +func TestGlobalSortLocalBasic(t *testing.T) { + +} From bea008dd3e85ee29e6e401f93e8daf6b622de1d6 Mon Sep 17 00:00:00 2001 From: ywqzzy <592838129@qq.com> Date: Wed, 13 Dec 2023 16:16:03 +0800 Subject: [PATCH 2/6] add whole process test --- .../lightning/backend/external/sort_test.go | 299 +++++++++++++++++- 1 file changed, 298 insertions(+), 1 deletion(-) diff --git a/br/pkg/lightning/backend/external/sort_test.go b/br/pkg/lightning/backend/external/sort_test.go index 90ac254e3856a..5f55104b84ae5 100644 --- a/br/pkg/lightning/backend/external/sort_test.go +++ b/br/pkg/lightning/backend/external/sort_test.go @@ -14,8 +14,305 @@ package external -import "testing" +import ( + "bytes" + "context" + "fmt" + "math" + "testing" + "time" + + "github.com/jfcg/sorty/v2" + "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" + "github.com/pingcap/tidb/br/pkg/lightning/common" + "github.com/pingcap/tidb/br/pkg/membuf" + "github.com/pingcap/tidb/br/pkg/storage" + dbkv "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/util/size" + "github.com/stretchr/testify/require" + "golang.org/x/exp/rand" +) + +// like scheduler code in add index and import into. +func splitDataAndStatFiles(datas []string, stats []string) ([][]string, [][]string) { + dataGroup := make([][]string, 0, 10) + statGroup := make([][]string, 0, 10) + + start := 0 + step := 10 + for start < len(datas) { + end := start + step + if end > len(datas) { + end = len(datas) + } + dataGroup = append(dataGroup, datas[start:end]) + statGroup = append(statGroup, stats[start:end]) + start = end + } + return dataGroup, statGroup +} func TestGlobalSortLocalBasic(t *testing.T) { + // 1. write data step + seed := time.Now().Unix() + rand.Seed(uint64(seed)) + t.Logf("seed: %d", seed) + ctx := context.Background() + memStore := storage.NewMemStorage() + memSizeLimit := (rand.Intn(10) + 1) * 400 + lastStepDatas := make([]string, 0, 10) + lastStepStats := make([]string, 0, 10) + var startKey, endKey dbkv.Key + + closeFn := func(s *WriterSummary) { + for _, stat := range s.MultipleFilesStats { + for i := range stat.Filenames { + lastStepDatas = append(lastStepDatas, stat.Filenames[i][0]) + lastStepStats = append(lastStepStats, stat.Filenames[i][1]) + } + } + if len(startKey) == 0 && len(endKey) == 0 { + startKey = s.Min.Clone() + endKey = s.Max.Clone().Next() + } + startKey = BytesMin(startKey, s.Min.Clone()) + endKey = BytesMax(endKey, s.Max.Clone().Next()) + } + + w := NewWriterBuilder(). + SetPropSizeDistance(100). + SetPropKeysDistance(2). + SetMemorySizeLimit(uint64(memSizeLimit)). + SetBlockSize(memSizeLimit). + SetOnCloseFunc(closeFn). + Build(memStore, "/test", "0") + + writer := NewEngineWriter(w) + kvCnt := rand.Intn(10) + 10000 + kvs := make([]common.KvPair, kvCnt) + for i := 0; i < kvCnt; i++ { + kvs[i] = common.KvPair{ + Key: []byte(fmt.Sprintf("key%05d", i)), + Val: []byte("56789"), + } + } + + require.NoError(t, writer.AppendRows(ctx, nil, kv.MakeRowsFromKvPairs(kvs))) + _, err := writer.Close(ctx) + require.NoError(t, err) + + // 2. read and sort step + splitter, err := NewRangeSplitter( + ctx, + lastStepDatas, + lastStepStats, + memStore, + int64(memSizeLimit), // make the group small for testing + math.MaxInt64, + 4*1024*1024*1024, + math.MaxInt64, + true, + ) + require.NoError(t, err) + + bufPool := membuf.NewPool() + loaded := &memKVsAndBuffers{} + kvIdx := 0 + + for { + endKeyOfGroup, dataFilesOfGroup, statFilesOfGroup, _, err := splitter.SplitOneRangesGroup() + require.NoError(t, err) + curEnd := endKeyOfGroup + if len(endKeyOfGroup) == 0 { + curEnd = dbkv.Key(kvs[len(kvs)-1].Key).Next() + } + + err = readAllData( + ctx, + memStore, + dataFilesOfGroup, + statFilesOfGroup, + startKey, + curEnd, + bufPool, + loaded, + ) + + require.NoError(t, err) + // check kvs sorted + sorty.MaxGor = uint64(8) + sorty.Sort(len(loaded.keys), func(i, k, r, s int) bool { + if bytes.Compare(loaded.keys[i], loaded.keys[k]) < 0 { // strict comparator like < or > + if r != s { + loaded.keys[r], loaded.keys[s] = loaded.keys[s], loaded.keys[r] + loaded.values[r], loaded.values[s] = loaded.values[s], loaded.values[r] + } + return true + } + return false + }) + for i, key := range loaded.keys { + require.EqualValues(t, kvs[kvIdx].Key, key) + require.EqualValues(t, kvs[kvIdx].Val, loaded.values[i]) + kvIdx++ + } + + // release + loaded.keys = nil + loaded.values = nil + loaded.memKVBuffers = nil + copy(startKey, curEnd) + + if len(endKeyOfGroup) == 0 { + break + } + } + + err = splitter.Close() + require.NoError(t, err) +} + +func TestGlobalSortLocalWithMerge(t *testing.T) { + // 1. write data step + seed := time.Now().Unix() + rand.Seed(uint64(seed)) + t.Logf("seed: %d", seed) + ctx := context.Background() + memStore := storage.NewMemStorage() + memSizeLimit := (rand.Intn(10) + 1) * 400 + + w := NewWriterBuilder(). + SetPropSizeDistance(100). + SetPropKeysDistance(2). + SetMemorySizeLimit(uint64(memSizeLimit)). + SetBlockSize(memSizeLimit). + Build(memStore, "/test", "0") + + writer := NewEngineWriter(w) + kvCnt := rand.Intn(10) + 10000 + kvs := make([]common.KvPair, kvCnt) + for i := 0; i < kvCnt; i++ { + kvs[i] = common.KvPair{ + Key: []byte(fmt.Sprintf("key%05d", i)), + Val: []byte("56789"), + } + } + + require.NoError(t, writer.AppendRows(ctx, nil, kv.MakeRowsFromKvPairs(kvs))) + _, err := writer.Close(ctx) + require.NoError(t, err) + // 2. merge step + + datas, stats, err := GetAllFileNames(ctx, memStore, "") + require.NoError(t, err) + + dataGroup, _ := splitDataAndStatFiles(datas, stats) + + lastStepDatas := make([]string, 0, 10) + lastStepStats := make([]string, 0, 10) + var startKey, endKey dbkv.Key + + closeFn := func(s *WriterSummary) { + for _, stat := range s.MultipleFilesStats { + for i := range stat.Filenames { + lastStepDatas = append(lastStepDatas, stat.Filenames[i][0]) + lastStepStats = append(lastStepStats, stat.Filenames[i][1]) + } + + } + if len(startKey) == 0 && len(endKey) == 0 { + startKey = s.Min.Clone() + endKey = s.Max.Clone().Next() + } + startKey = BytesMin(startKey, s.Min.Clone()) + endKey = BytesMax(endKey, s.Max.Clone().Next()) + } + + for _, group := range dataGroup { + MergeOverlappingFiles( + ctx, + group, + memStore, + int64(5*size.MB), + 100, + "/test2", + 52, + 8*1024, + 100, + 2, + closeFn, + 1, + true, + ) + } + + // 3. read and sort step + splitter, err := NewRangeSplitter( + ctx, + datas, + stats, + memStore, + int64(memSizeLimit), // make the group small for testing + math.MaxInt64, + 4*1024*1024*1024, + math.MaxInt64, + true, + ) + require.NoError(t, err) + + bufPool := membuf.NewPool() + loaded := &memKVsAndBuffers{} + kvIdx := 0 + + for { + endKeyOfGroup, dataFilesOfGroup, statFilesOfGroup, _, err := splitter.SplitOneRangesGroup() + require.NoError(t, err) + curEnd := endKeyOfGroup + if len(endKeyOfGroup) == 0 { + curEnd = dbkv.Key(kvs[len(kvs)-1].Key).Next() + } + + err = readAllData( + ctx, + memStore, + dataFilesOfGroup, + statFilesOfGroup, + startKey, + curEnd, + bufPool, + loaded, + ) + + require.NoError(t, err) + // check kvs sorted + sorty.MaxGor = uint64(8) + sorty.Sort(len(loaded.keys), func(i, k, r, s int) bool { + if bytes.Compare(loaded.keys[i], loaded.keys[k]) < 0 { // strict comparator like < or > + if r != s { + loaded.keys[r], loaded.keys[s] = loaded.keys[s], loaded.keys[r] + loaded.values[r], loaded.values[s] = loaded.values[s], loaded.values[r] + } + return true + } + return false + }) + for i, key := range loaded.keys { + require.EqualValues(t, kvs[kvIdx].Key, key) + require.EqualValues(t, kvs[kvIdx].Val, loaded.values[i]) + kvIdx++ + } + + // release + loaded.keys = nil + loaded.values = nil + loaded.memKVBuffers = nil + copy(startKey, curEnd) + + if len(endKeyOfGroup) == 0 { + break + } + } + err = splitter.Close() + require.NoError(t, err) } From 4aba59f31f30b519bb757643e8759bac831a29de Mon Sep 17 00:00:00 2001 From: ywqzzy <592838129@qq.com> Date: Wed, 13 Dec 2023 17:02:13 +0800 Subject: [PATCH 3/6] update --- br/pkg/lightning/backend/external/BUILD.bazel | 2 ++ .../lightning/backend/external/sort_test.go | 36 +++++++++++-------- 2 files changed, 24 insertions(+), 14 deletions(-) diff --git a/br/pkg/lightning/backend/external/BUILD.bazel b/br/pkg/lightning/backend/external/BUILD.bazel index 7f0c692eefa33..cb146e46226af 100644 --- a/br/pkg/lightning/backend/external/BUILD.bazel +++ b/br/pkg/lightning/backend/external/BUILD.bazel @@ -58,6 +58,7 @@ go_test( "file_test.go", "iter_test.go", "onefile_writer_test.go", + "sort_test.go", "split_test.go", "util_test.go", "writer_test.go", @@ -81,6 +82,7 @@ go_test( "@com_github_aws_aws_sdk_go//service/s3", "@com_github_cockroachdb_pebble//:pebble", "@com_github_docker_go_units//:go-units", + "@com_github_google_uuid//:uuid", "@com_github_jfcg_sorty_v2//:sorty", "@com_github_johannesboyne_gofakes3//:gofakes3", "@com_github_johannesboyne_gofakes3//backend/s3mem", diff --git a/br/pkg/lightning/backend/external/sort_test.go b/br/pkg/lightning/backend/external/sort_test.go index 5f55104b84ae5..29872aca78a25 100644 --- a/br/pkg/lightning/backend/external/sort_test.go +++ b/br/pkg/lightning/backend/external/sort_test.go @@ -17,11 +17,12 @@ package external import ( "bytes" "context" - "fmt" "math" + "slices" "testing" "time" + "github.com/google/uuid" "github.com/jfcg/sorty/v2" "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" "github.com/pingcap/tidb/br/pkg/lightning/common" @@ -33,7 +34,7 @@ import ( "golang.org/x/exp/rand" ) -// like scheduler code in add index and import into. +// like scheduler code for merge sort step in add index and import into. func splitDataAndStatFiles(datas []string, stats []string) ([][]string, [][]string) { dataGroup := make([][]string, 0, 10) statGroup := make([][]string, 0, 10) @@ -92,10 +93,13 @@ func TestGlobalSortLocalBasic(t *testing.T) { kvs := make([]common.KvPair, kvCnt) for i := 0; i < kvCnt; i++ { kvs[i] = common.KvPair{ - Key: []byte(fmt.Sprintf("key%05d", i)), + Key: []byte(uuid.New().String()), Val: []byte("56789"), } } + slices.SortFunc(kvs, func(i, j common.KvPair) int { + return bytes.Compare(i.Key, j.Key) + }) require.NoError(t, writer.AppendRows(ctx, nil, kv.MakeRowsFromKvPairs(kvs))) _, err := writer.Close(ctx) @@ -122,9 +126,9 @@ func TestGlobalSortLocalBasic(t *testing.T) { for { endKeyOfGroup, dataFilesOfGroup, statFilesOfGroup, _, err := splitter.SplitOneRangesGroup() require.NoError(t, err) - curEnd := endKeyOfGroup + curEnd := dbkv.Key(endKeyOfGroup).Clone() if len(endKeyOfGroup) == 0 { - curEnd = dbkv.Key(kvs[len(kvs)-1].Key).Next() + curEnd = dbkv.Key(kvs[len(kvs)-1].Key).Next().Clone() } err = readAllData( @@ -161,7 +165,7 @@ func TestGlobalSortLocalBasic(t *testing.T) { loaded.keys = nil loaded.values = nil loaded.memKVBuffers = nil - copy(startKey, curEnd) + startKey = curEnd.Clone() if len(endKeyOfGroup) == 0 { break @@ -193,16 +197,20 @@ func TestGlobalSortLocalWithMerge(t *testing.T) { kvs := make([]common.KvPair, kvCnt) for i := 0; i < kvCnt; i++ { kvs[i] = common.KvPair{ - Key: []byte(fmt.Sprintf("key%05d", i)), + Key: []byte(uuid.New().String()), Val: []byte("56789"), } } + slices.SortFunc(kvs, func(i, j common.KvPair) int { + return bytes.Compare(i.Key, j.Key) + }) + require.NoError(t, writer.AppendRows(ctx, nil, kv.MakeRowsFromKvPairs(kvs))) _, err := writer.Close(ctx) require.NoError(t, err) - // 2. merge step + // 2. merge step datas, stats, err := GetAllFileNames(ctx, memStore, "") require.NoError(t, err) @@ -236,7 +244,7 @@ func TestGlobalSortLocalWithMerge(t *testing.T) { int64(5*size.MB), 100, "/test2", - 52, + 100, 8*1024, 100, 2, @@ -249,8 +257,8 @@ func TestGlobalSortLocalWithMerge(t *testing.T) { // 3. read and sort step splitter, err := NewRangeSplitter( ctx, - datas, - stats, + lastStepDatas, + lastStepStats, memStore, int64(memSizeLimit), // make the group small for testing math.MaxInt64, @@ -267,9 +275,9 @@ func TestGlobalSortLocalWithMerge(t *testing.T) { for { endKeyOfGroup, dataFilesOfGroup, statFilesOfGroup, _, err := splitter.SplitOneRangesGroup() require.NoError(t, err) - curEnd := endKeyOfGroup + curEnd := dbkv.Key(endKeyOfGroup).Clone() if len(endKeyOfGroup) == 0 { - curEnd = dbkv.Key(kvs[len(kvs)-1].Key).Next() + curEnd = dbkv.Key(kvs[len(kvs)-1].Key).Next().Clone() } err = readAllData( @@ -306,7 +314,7 @@ func TestGlobalSortLocalWithMerge(t *testing.T) { loaded.keys = nil loaded.values = nil loaded.memKVBuffers = nil - copy(startKey, curEnd) + startKey = curEnd.Clone() if len(endKeyOfGroup) == 0 { break From d368e9ded52b7e14c0ee85b79da0fac70ffe2c20 Mon Sep 17 00:00:00 2001 From: ywqzzy <592838129@qq.com> Date: Thu, 14 Dec 2023 10:59:25 +0800 Subject: [PATCH 4/6] add testutil --- br/pkg/lightning/backend/external/BUILD.bazel | 4 +- .../lightning/backend/external/reader_test.go | 91 +---------- .../lightning/backend/external/sort_test.go | 141 +----------------- br/pkg/lightning/backend/external/testutil.go | 109 ++++++++++++++ 4 files changed, 122 insertions(+), 223 deletions(-) create mode 100644 br/pkg/lightning/backend/external/testutil.go diff --git a/br/pkg/lightning/backend/external/BUILD.bazel b/br/pkg/lightning/backend/external/BUILD.bazel index 9cae59ea2f03a..fd540a71a8f2e 100644 --- a/br/pkg/lightning/backend/external/BUILD.bazel +++ b/br/pkg/lightning/backend/external/BUILD.bazel @@ -15,6 +15,7 @@ go_library( "reader.go", "split.go", "stat_reader.go", + "testutil.go", "util.go", "writer.go", ], @@ -40,6 +41,7 @@ go_library( "@com_github_jfcg_sorty_v2//:sorty", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", + "@com_github_stretchr_testify//require", "@org_golang_x_sync//errgroup", "@org_uber_go_atomic//:atomic", "@org_uber_go_zap//:zap", @@ -59,8 +61,8 @@ go_test( "file_test.go", "iter_test.go", "onefile_writer_test.go", - "sort_test.go", "reader_test.go", + "sort_test.go", "split_test.go", "util_test.go", "writer_test.go", diff --git a/br/pkg/lightning/backend/external/reader_test.go b/br/pkg/lightning/backend/external/reader_test.go index 05be20abc14ab..1f3f791a31fcb 100644 --- a/br/pkg/lightning/backend/external/reader_test.go +++ b/br/pkg/lightning/backend/external/reader_test.go @@ -18,99 +18,18 @@ import ( "bytes" "context" "fmt" - "math" "slices" "testing" "time" - "github.com/jfcg/sorty/v2" "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" "github.com/pingcap/tidb/br/pkg/lightning/common" - "github.com/pingcap/tidb/br/pkg/membuf" "github.com/pingcap/tidb/br/pkg/storage" - dbkv "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/util/size" "github.com/stretchr/testify/require" "golang.org/x/exp/rand" ) -func testReadAndCompare( - ctx context.Context, - t *testing.T, - kvs []common.KvPair, - store storage.ExternalStorage, - memSizeLimit int) { - datas, stats, err := GetAllFileNames(ctx, store, "") - require.NoError(t, err) - - splitter, err := NewRangeSplitter( - ctx, - datas, - stats, - store, - int64(memSizeLimit), // make the group small for testing - math.MaxInt64, - 4*1024*1024*1024, - math.MaxInt64, - true, - ) - require.NoError(t, err) - - bufPool := membuf.NewPool() - loaded := &memKVsAndBuffers{} - curStart := kvs[0].Key - kvIdx := 0 - - for { - endKeyOfGroup, dataFilesOfGroup, statFilesOfGroup, _, err := splitter.SplitOneRangesGroup() - require.NoError(t, err) - curEnd := endKeyOfGroup - if len(endKeyOfGroup) == 0 { - curEnd = dbkv.Key(kvs[len(kvs)-1].Key).Next() - } - - err = readAllData( - ctx, - store, - dataFilesOfGroup, - statFilesOfGroup, - curStart, - curEnd, - bufPool, - loaded, - ) - - require.NoError(t, err) - // check kvs sorted - sorty.MaxGor = uint64(8) - sorty.Sort(len(loaded.keys), func(i, k, r, s int) bool { - if bytes.Compare(loaded.keys[i], loaded.keys[k]) < 0 { // strict comparator like < or > - if r != s { - loaded.keys[r], loaded.keys[s] = loaded.keys[s], loaded.keys[r] - loaded.values[r], loaded.values[s] = loaded.values[s], loaded.values[r] - } - return true - } - return false - }) - for i, key := range loaded.keys { - require.EqualValues(t, kvs[kvIdx].Key, key) - require.EqualValues(t, kvs[kvIdx].Val, loaded.values[i]) - kvIdx++ - } - - // release - loaded.keys = nil - loaded.values = nil - loaded.memKVBuffers = nil - copy(curStart, curEnd) - - if len(endKeyOfGroup) == 0 { - break - } - } -} - func TestReadAllDataBasic(t *testing.T) { seed := time.Now().Unix() rand.Seed(uint64(seed)) @@ -144,7 +63,10 @@ func TestReadAllDataBasic(t *testing.T) { return bytes.Compare(i.Key, j.Key) }) - testReadAndCompare(ctx, t, kvs, memStore, memSizeLimit) + datas, stats, err := GetAllFileNames(ctx, memStore, "") + require.NoError(t, err) + + testReadAndCompare(ctx, t, kvs, memStore, datas, stats, kvs[0].Key, memSizeLimit) } func TestReadAllOneFile(t *testing.T) { @@ -179,5 +101,8 @@ func TestReadAllOneFile(t *testing.T) { slices.SortFunc(kvs, func(i, j common.KvPair) int { return bytes.Compare(i.Key, j.Key) }) - testReadAndCompare(ctx, t, kvs, memStore, memSizeLimit) + + datas, stats, err := GetAllFileNames(ctx, memStore, "") + require.NoError(t, err) + testReadAndCompare(ctx, t, kvs, memStore, datas, stats, kvs[0].Key, memSizeLimit) } diff --git a/br/pkg/lightning/backend/external/sort_test.go b/br/pkg/lightning/backend/external/sort_test.go index 29872aca78a25..667dbcd51fd7a 100644 --- a/br/pkg/lightning/backend/external/sort_test.go +++ b/br/pkg/lightning/backend/external/sort_test.go @@ -17,16 +17,13 @@ package external import ( "bytes" "context" - "math" "slices" "testing" "time" "github.com/google/uuid" - "github.com/jfcg/sorty/v2" "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" "github.com/pingcap/tidb/br/pkg/lightning/common" - "github.com/pingcap/tidb/br/pkg/membuf" "github.com/pingcap/tidb/br/pkg/storage" dbkv "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/util/size" @@ -106,74 +103,7 @@ func TestGlobalSortLocalBasic(t *testing.T) { require.NoError(t, err) // 2. read and sort step - splitter, err := NewRangeSplitter( - ctx, - lastStepDatas, - lastStepStats, - memStore, - int64(memSizeLimit), // make the group small for testing - math.MaxInt64, - 4*1024*1024*1024, - math.MaxInt64, - true, - ) - require.NoError(t, err) - - bufPool := membuf.NewPool() - loaded := &memKVsAndBuffers{} - kvIdx := 0 - - for { - endKeyOfGroup, dataFilesOfGroup, statFilesOfGroup, _, err := splitter.SplitOneRangesGroup() - require.NoError(t, err) - curEnd := dbkv.Key(endKeyOfGroup).Clone() - if len(endKeyOfGroup) == 0 { - curEnd = dbkv.Key(kvs[len(kvs)-1].Key).Next().Clone() - } - - err = readAllData( - ctx, - memStore, - dataFilesOfGroup, - statFilesOfGroup, - startKey, - curEnd, - bufPool, - loaded, - ) - - require.NoError(t, err) - // check kvs sorted - sorty.MaxGor = uint64(8) - sorty.Sort(len(loaded.keys), func(i, k, r, s int) bool { - if bytes.Compare(loaded.keys[i], loaded.keys[k]) < 0 { // strict comparator like < or > - if r != s { - loaded.keys[r], loaded.keys[s] = loaded.keys[s], loaded.keys[r] - loaded.values[r], loaded.values[s] = loaded.values[s], loaded.values[r] - } - return true - } - return false - }) - for i, key := range loaded.keys { - require.EqualValues(t, kvs[kvIdx].Key, key) - require.EqualValues(t, kvs[kvIdx].Val, loaded.values[i]) - kvIdx++ - } - - // release - loaded.keys = nil - loaded.values = nil - loaded.memKVBuffers = nil - startKey = curEnd.Clone() - - if len(endKeyOfGroup) == 0 { - break - } - } - - err = splitter.Close() - require.NoError(t, err) + testReadAndCompare(ctx, t, kvs, memStore, lastStepDatas, lastStepStats, startKey, memSizeLimit) } func TestGlobalSortLocalWithMerge(t *testing.T) { @@ -255,72 +185,5 @@ func TestGlobalSortLocalWithMerge(t *testing.T) { } // 3. read and sort step - splitter, err := NewRangeSplitter( - ctx, - lastStepDatas, - lastStepStats, - memStore, - int64(memSizeLimit), // make the group small for testing - math.MaxInt64, - 4*1024*1024*1024, - math.MaxInt64, - true, - ) - require.NoError(t, err) - - bufPool := membuf.NewPool() - loaded := &memKVsAndBuffers{} - kvIdx := 0 - - for { - endKeyOfGroup, dataFilesOfGroup, statFilesOfGroup, _, err := splitter.SplitOneRangesGroup() - require.NoError(t, err) - curEnd := dbkv.Key(endKeyOfGroup).Clone() - if len(endKeyOfGroup) == 0 { - curEnd = dbkv.Key(kvs[len(kvs)-1].Key).Next().Clone() - } - - err = readAllData( - ctx, - memStore, - dataFilesOfGroup, - statFilesOfGroup, - startKey, - curEnd, - bufPool, - loaded, - ) - - require.NoError(t, err) - // check kvs sorted - sorty.MaxGor = uint64(8) - sorty.Sort(len(loaded.keys), func(i, k, r, s int) bool { - if bytes.Compare(loaded.keys[i], loaded.keys[k]) < 0 { // strict comparator like < or > - if r != s { - loaded.keys[r], loaded.keys[s] = loaded.keys[s], loaded.keys[r] - loaded.values[r], loaded.values[s] = loaded.values[s], loaded.values[r] - } - return true - } - return false - }) - for i, key := range loaded.keys { - require.EqualValues(t, kvs[kvIdx].Key, key) - require.EqualValues(t, kvs[kvIdx].Val, loaded.values[i]) - kvIdx++ - } - - // release - loaded.keys = nil - loaded.values = nil - loaded.memKVBuffers = nil - startKey = curEnd.Clone() - - if len(endKeyOfGroup) == 0 { - break - } - } - - err = splitter.Close() - require.NoError(t, err) + testReadAndCompare(ctx, t, kvs, memStore, lastStepDatas, lastStepStats, startKey, memSizeLimit) } diff --git a/br/pkg/lightning/backend/external/testutil.go b/br/pkg/lightning/backend/external/testutil.go new file mode 100644 index 0000000000000..97fbfd5baa54c --- /dev/null +++ b/br/pkg/lightning/backend/external/testutil.go @@ -0,0 +1,109 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package external + +import ( + "bytes" + "context" + "math" + "testing" + + "github.com/jfcg/sorty/v2" + "github.com/pingcap/tidb/br/pkg/lightning/common" + "github.com/pingcap/tidb/br/pkg/membuf" + "github.com/pingcap/tidb/br/pkg/storage" + dbkv "github.com/pingcap/tidb/pkg/kv" + "github.com/stretchr/testify/require" +) + +func testReadAndCompare( + ctx context.Context, + t *testing.T, + kvs []common.KvPair, + store storage.ExternalStorage, + datas []string, + stats []string, + startKey dbkv.Key, + memSizeLimit int) { + + splitter, err := NewRangeSplitter( + ctx, + datas, + stats, + store, + int64(memSizeLimit), // make the group small for testing + math.MaxInt64, + 4*1024*1024*1024, + math.MaxInt64, + true, + ) + require.NoError(t, err) + + bufPool := membuf.NewPool() + loaded := &memKVsAndBuffers{} + curStart := startKey.Clone() + kvIdx := 0 + + for { + endKeyOfGroup, dataFilesOfGroup, statFilesOfGroup, _, err := splitter.SplitOneRangesGroup() + require.NoError(t, err) + curEnd := dbkv.Key(endKeyOfGroup).Clone() + if len(endKeyOfGroup) == 0 { + curEnd = dbkv.Key(kvs[len(kvs)-1].Key).Next() + } + + err = readAllData( + ctx, + store, + dataFilesOfGroup, + statFilesOfGroup, + curStart, + curEnd, + bufPool, + loaded, + ) + + require.NoError(t, err) + // check kvs sorted + sorty.MaxGor = uint64(8) + sorty.Sort(len(loaded.keys), func(i, k, r, s int) bool { + if bytes.Compare(loaded.keys[i], loaded.keys[k]) < 0 { // strict comparator like < or > + if r != s { + loaded.keys[r], loaded.keys[s] = loaded.keys[s], loaded.keys[r] + loaded.values[r], loaded.values[s] = loaded.values[s], loaded.values[r] + } + return true + } + return false + }) + for i, key := range loaded.keys { + require.EqualValues(t, kvs[kvIdx].Key, key) + require.EqualValues(t, kvs[kvIdx].Val, loaded.values[i]) + kvIdx++ + } + + // release + loaded.keys = nil + loaded.values = nil + loaded.memKVBuffers = nil + curStart = curEnd.Clone() + + if len(endKeyOfGroup) == 0 { + break + } + } + err = splitter.Close() + require.NoError(t, err) +} From ebe1c15c559c3448150028e196a723ff6cb4275d Mon Sep 17 00:00:00 2001 From: ywqzzy <592838129@qq.com> Date: Thu, 14 Dec 2023 11:02:12 +0800 Subject: [PATCH 5/6] refine --- br/pkg/lightning/backend/external/reader_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/br/pkg/lightning/backend/external/reader_test.go b/br/pkg/lightning/backend/external/reader_test.go index 1f3f791a31fcb..e7fcd6c5e0586 100644 --- a/br/pkg/lightning/backend/external/reader_test.go +++ b/br/pkg/lightning/backend/external/reader_test.go @@ -104,5 +104,6 @@ func TestReadAllOneFile(t *testing.T) { datas, stats, err := GetAllFileNames(ctx, memStore, "") require.NoError(t, err) + testReadAndCompare(ctx, t, kvs, memStore, datas, stats, kvs[0].Key, memSizeLimit) } From 74790ee0b4337435e261dd387ff46c35d4fd5715 Mon Sep 17 00:00:00 2001 From: ywqzzy <592838129@qq.com> Date: Thu, 14 Dec 2023 11:04:07 +0800 Subject: [PATCH 6/6] refine --- .../lightning/backend/external/sort_test.go | 19 ------------------ br/pkg/lightning/backend/external/testutil.go | 20 +++++++++++++++++++ 2 files changed, 20 insertions(+), 19 deletions(-) diff --git a/br/pkg/lightning/backend/external/sort_test.go b/br/pkg/lightning/backend/external/sort_test.go index 667dbcd51fd7a..7ff2f9e20853a 100644 --- a/br/pkg/lightning/backend/external/sort_test.go +++ b/br/pkg/lightning/backend/external/sort_test.go @@ -31,25 +31,6 @@ import ( "golang.org/x/exp/rand" ) -// like scheduler code for merge sort step in add index and import into. -func splitDataAndStatFiles(datas []string, stats []string) ([][]string, [][]string) { - dataGroup := make([][]string, 0, 10) - statGroup := make([][]string, 0, 10) - - start := 0 - step := 10 - for start < len(datas) { - end := start + step - if end > len(datas) { - end = len(datas) - } - dataGroup = append(dataGroup, datas[start:end]) - statGroup = append(statGroup, stats[start:end]) - start = end - } - return dataGroup, statGroup -} - func TestGlobalSortLocalBasic(t *testing.T) { // 1. write data step seed := time.Now().Unix() diff --git a/br/pkg/lightning/backend/external/testutil.go b/br/pkg/lightning/backend/external/testutil.go index 97fbfd5baa54c..0879021a32917 100644 --- a/br/pkg/lightning/backend/external/testutil.go +++ b/br/pkg/lightning/backend/external/testutil.go @@ -107,3 +107,23 @@ func testReadAndCompare( err = splitter.Close() require.NoError(t, err) } + +// split data and stat files into groups for merge step. +// like scheduler code for merge sort step in add index and import into. +func splitDataAndStatFiles(datas []string, stats []string) ([][]string, [][]string) { + dataGroup := make([][]string, 0, 10) + statGroup := make([][]string, 0, 10) + + start := 0 + step := 10 + for start < len(datas) { + end := start + step + if end > len(datas) { + end = len(datas) + } + dataGroup = append(dataGroup, datas[start:end]) + statGroup = append(statGroup, stats[start:end]) + start = end + } + return dataGroup, statGroup +}