Skip to content

Commit

Permalink
Merge #83472
Browse files Browse the repository at this point in the history
83472: kvstreamer: improve the avg response size heuristic r=yuzefovich a=yuzefovich

This commit improves the heuristic we use for estimating the average
response size. Previously, we used a simple average, and now we multiple
the average by 1.5.

Having this multiple is good for a couple of reasons:
- this allows us to fulfill requests that are slightly larger than the
current average. For example, imagine that we're processing three
requests of sizes 100B, 110B, and 120B sequentially, one at a time.
Without the multiple, after the first request, our estimate would be 100B
so the second request would come back empty (with ResumeNextBytes=110),
so we'd have to re-issue the second request. At that point the average is
105B, so the third request would again come back empty and need to be
re-issued with larger TargetBytes. Having the multiple allows us to
handle such a scenario without any requests coming back empty. In
particular, TPCH Q17 has similar setup.
- this allows us to slowly grow the TargetBytes parameter over time when
requests can be returned partially multiple times (i.e. Scan requests
spanning multiple rows). Consider a case when a single Scan request has
to return 1MB worth of data, but each row is only 100B. With the initial
estimate of 1KB, every request would always come back with exactly 10
rows, and the avg response size would always stay at 1KB. We'd end up
issuing 1000 of such requests. Having a multiple here allows us to grow
the estimate over time, reducing the total number of requests needed.

This multiple seems to fix the remaining perf regression on Q17 when
comparing against the streamer OFF config.

This commit also introduces a cluster setting that controls this
multiple. Value of 1.5 was chosen using `tpchvec/bench` and this
setting.

Additionally, I introduced a similar cluster setting for the initial avg
response size estimate (currently hard-coded at 1KiB) and used
`tpchvec/bench`, and it showed that 1KiB value is pretty good. It was
also the value mentioned in the RFC, so I decided to remove the
corresponding setting.

Addresses: #82159.

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
craig[bot] and yuzefovich committed Jun 29, 2022
2 parents 908e429 + 7645a1f commit 6bcabcf
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 14 deletions.
58 changes: 51 additions & 7 deletions pkg/kv/kvclient/kvstreamer/avg_response_estimator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,33 +10,77 @@

package kvstreamer

import "github.com/cockroachdb/cockroach/pkg/settings"

// avgResponseEstimator is a helper that estimates the average size of responses
// received by the Streamer. It is **not** thread-safe.
type avgResponseEstimator struct {
avgResponseSizeMultiple float64
// responseBytes tracks the total footprint of all responses that the
// Streamer has already received.
responseBytes int64
numResponses int64
responseBytes float64
numResponses float64
}

// TODO(yuzefovich): use the optimizer-driven estimates.
const initialAvgResponseSize = 1 << 10 // 1KiB
const (
// TODO(yuzefovich): use the optimizer-driven estimates.
initialAvgResponseSize = 1 << 10 // 1KiB
// This value was determined using tpchvec/bench test on all TPC-H queries.
defaultAvgResponseSizeMultiple = 1.5
)

// streamerAvgResponseSizeMultiple determines the multiple used when calculating
// the average response size.
var streamerAvgResponseSizeMultiple = settings.RegisterFloatSetting(
settings.TenantReadOnly,
"sql.distsql.streamer.avg_response_size_multiple",
"determines the multiple used when calculating the average response size by the streamer component",
defaultAvgResponseSizeMultiple,
)

func (e *avgResponseEstimator) init(sv *settings.Values) {
e.avgResponseSizeMultiple = streamerAvgResponseSizeMultiple.Get(sv)
}

func (e *avgResponseEstimator) getAvgResponseSize() int64 {
if e.numResponses == 0 {
return initialAvgResponseSize
}
// Note that we're multiplying the average by a response size multiple for a
// couple of reasons:
//
// 1. this allows us to fulfill requests that are slightly larger than the
// current average. For example, imagine that we're processing three
// requests of sizes 100B, 110B, and 120B sequentially, one at a time.
// Without the multiple, after the first request, our estimate would be 100B
// so the second request would come back empty (with ResumeNextBytes=110),
// so we'd have to re-issue the second request. At that point the average is
// 105B, so the third request would again come back empty and need to be
// re-issued with larger TargetBytes. Having the multiple allows us to
// handle such a scenario without any requests coming back empty. In
// particular, TPC-H Q17 has similar setup.
//
// 2. this allows us to slowly grow the TargetBytes parameter over time when
// requests can be returned partially multiple times (i.e. Scan requests
// spanning multiple rows). Consider a case when a single Scan request has
// to return 1MB worth of data, but each row is only 100B. With the initial
// estimate of 1KB, every request would always come back with exactly 10
// rows, and the avg response size would always stay at 1KB. We'd end up
// issuing 1000 of such requests. Having a multiple here allows us to grow
// the estimate over time, reducing the total number of requests needed
// (although the growth is still not fast enough).
//
// TODO(yuzefovich): we currently use a simple average over the received
// responses, but it is likely to be suboptimal because it would be unfair
// to "large" batches that come in late (i.e. it would not be reactive
// enough). Consider using another function here.
return e.responseBytes / e.numResponses
return int64(e.responseBytes / e.numResponses * e.avgResponseSizeMultiple)
}

// update updates the actual information of the estimator based on numResponses
// responses that took up responseBytes bytes and correspond to a single
// BatchResponse.
func (e *avgResponseEstimator) update(responseBytes int64, numResponses int64) {
e.responseBytes += responseBytes
e.numResponses += numResponses
e.responseBytes += float64(responseBytes)
e.numResponses += float64(numResponses)
}
18 changes: 11 additions & 7 deletions pkg/kv/kvclient/kvstreamer/avg_response_estimator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,29 @@ func TestAvgResponseEstimator(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

var e avgResponseEstimator
e := avgResponseEstimator{avgResponseSizeMultiple: defaultAvgResponseSizeMultiple}
withMultiple := func(s int64) int64 {
return int64(float64(s) * defaultAvgResponseSizeMultiple)
}

// Before receiving any responses, we should be using the initial estimate.
require.Equal(t, int64(initialAvgResponseSize), e.getAvgResponseSize())

// Simulate receiving a single response.
firstResponseSize := int64(42)
e.update(firstResponseSize, 1)
// The estimate should now be exactly the size of that single response.
require.Equal(t, firstResponseSize, e.getAvgResponseSize())
// The estimate should now be the size of that single response times
// defaultAvgResponseSizeMultiple.
require.Equal(t, withMultiple(firstResponseSize), e.getAvgResponseSize())

// Simulate receiving 100 small BatchResponses.
smallResponseSize := int64(63)
for i := 0; i < 100; i++ {
e.update(smallResponseSize*5, 5)
}
// The estimate should now be pretty close to the size of a single response
// in the small BatchResponse.
diff := smallResponseSize - e.getAvgResponseSize()
// in the small BatchResponse (after adjusting with the multiple).
diff := withMultiple(smallResponseSize) - e.getAvgResponseSize()
require.True(t, math.Abs(float64(diff))/float64(smallResponseSize) < 0.05)

// Now simulate receiving 10 large BatchResponses.
Expand All @@ -50,7 +54,7 @@ func TestAvgResponseEstimator(t *testing.T) {
e.update(largeResponseSize*1000, 1000)
}
// The estimate should now be pretty close to the size of a single response
// in the large BatchResponse.
diff = largeResponseSize - e.getAvgResponseSize()
// in the large BatchResponse (after adjusting with the multiple).
diff = withMultiple(largeResponseSize) - e.getAvgResponseSize()
require.True(t, math.Abs(float64(diff))/float64(smallResponseSize) < 0.15)
}
1 change: 1 addition & 0 deletions pkg/kv/kvclient/kvstreamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ func NewStreamer(
"single Streamer async concurrency",
uint64(streamerConcurrencyLimit.Get(&st.SV)),
)
s.mu.avgResponseEstimator.init(&st.SV)
return s
}

Expand Down

0 comments on commit 6bcabcf

Please sign in to comment.