Skip to content

Commit

Permalink
colexec: extend external sort benchmark for top K case
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
yuzefovich committed Jun 16, 2021
1 parent 8c4ae60 commit d2b2bc6
Showing 1 changed file with 52 additions and 40 deletions.
92 changes: 52 additions & 40 deletions pkg/sql/colexec/external_sort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,50 +392,62 @@ func BenchmarkExternalSort(b *testing.B) {

for _, nBatches := range []int{1 << 1, 1 << 4, 1 << 8} {
for _, nCols := range []int{1, 2, 4} {
for _, spillForced := range []bool{false, true} {
flowCtx.Cfg.TestingKnobs.ForceDiskSpill = spillForced
name := fmt.Sprintf("rows=%d/cols=%d/spilled=%t", nBatches*coldata.BatchSize(), nCols, spillForced)
b.Run(name, func(b *testing.B) {
// 8 (bytes / int64) * nBatches (number of batches) * coldata.BatchSize() (rows /
// batch) * nCols (number of columns / row).
b.SetBytes(int64(8 * nBatches * coldata.BatchSize() * nCols))
typs := make([]*types.T, nCols)
for i := range typs {
typs[i] = types.Int
for _, topK := range []bool{false, true} {
for _, spillForced := range []bool{false, true} {
flowCtx.Cfg.TestingKnobs.ForceDiskSpill = spillForced
var topKSubstring string
if topK {
topKSubstring = "topK/"
}
batch := testAllocator.NewMemBatchWithMaxCapacity(typs)
batch.SetLength(coldata.BatchSize())
ordCols := make([]execinfrapb.Ordering_Column, nCols)
for i := range ordCols {
ordCols[i].ColIdx = uint32(i)
ordCols[i].Direction = execinfrapb.Ordering_Column_Direction(rng.Int() % 2)
col := batch.ColVec(i).Int64()
for j := 0; j < coldata.BatchSize(); j++ {
col[j] = rng.Int63() % int64((i*1024)+1)
name := fmt.Sprintf("rows=%d/cols=%d/%sspilled=%t", nBatches*coldata.BatchSize(), nCols, topKSubstring, spillForced)
b.Run(name, func(b *testing.B) {
// 8 (bytes / int64) * nBatches (number of batches) * coldata.BatchSize() (rows /
// batch) * nCols (number of columns / row).
b.SetBytes(int64(8 * nBatches * coldata.BatchSize() * nCols))
typs := make([]*types.T, nCols)
for i := range typs {
typs[i] = types.Int
}
}
b.ResetTimer()
for n := 0; n < b.N; n++ {
source := colexectestutils.NewFiniteBatchSource(testAllocator, batch, typs, nBatches)
var spilled bool
sorter, accounts, monitors, _, err := createDiskBackedSorter(
ctx, flowCtx, []colexecop.Operator{source}, typs, ordCols,
0 /* matchLen */, 0 /* k */, func() { spilled = true },
0 /* numForcedRepartitions */, false /* delegateFDAcquisitions */, queueCfg, &colexecop.TestingSemaphore{},
)
memAccounts = append(memAccounts, accounts...)
memMonitors = append(memMonitors, monitors...)
if err != nil {
b.Fatal(err)
batch := testAllocator.NewMemBatchWithMaxCapacity(typs)
batch.SetLength(coldata.BatchSize())
ordCols := make([]execinfrapb.Ordering_Column, nCols)
for i := range ordCols {
ordCols[i].ColIdx = uint32(i)
ordCols[i].Direction = execinfrapb.Ordering_Column_Direction(rng.Int() % 2)
col := batch.ColVec(i).Int64()
for j := 0; j < coldata.BatchSize(); j++ {
col[j] = rng.Int63() % int64((i*1024)+1)
}
}
sorter.Init(ctx)
for out := sorter.Next(); out.Length() != 0; out = sorter.Next() {
b.ResetTimer()
for n := 0; n < b.N; n++ {
source := colexectestutils.NewFiniteBatchSource(testAllocator, batch, typs, nBatches)
var spilled bool
k := uint64(0)
if topK {
// Pick the same value for K as we do in the
// in-memory top K sort benchmark.
k = 128
}
sorter, accounts, monitors, _, err := createDiskBackedSorter(
ctx, flowCtx, []colexecop.Operator{source}, typs, ordCols,
0 /* matchLen */, k, func() { spilled = true },
0 /* numForcedRepartitions */, false /* delegateFDAcquisitions */, queueCfg, &colexecop.TestingSemaphore{},
)
memAccounts = append(memAccounts, accounts...)
memMonitors = append(memMonitors, monitors...)
if err != nil {
b.Fatal(err)
}
sorter.Init(ctx)
for out := sorter.Next(); out.Length() != 0; out = sorter.Next() {
}
require.Equal(b, spillForced, spilled, fmt.Sprintf(
"expected: spilled=%t\tactual: spilled=%t", spillForced, spilled,
))
}
require.Equal(b, spillForced, spilled, fmt.Sprintf(
"expected: spilled=%t\tactual: spilled=%t", spillForced, spilled,
))
}
})
})
}
}
}
}
Expand Down

0 comments on commit d2b2bc6

Please sign in to comment.