From 7645a1fa0394f87a473468d62aca689aba32cebb Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Mon, 27 Jun 2022 14:55:11 -0700 Subject: [PATCH] kvstreamer: improve the avg response size heuristic This commit improves the heuristic we use for estimating the average response size. Previously, we used a simple average, and now we multiple the average by 1.5. Having this multiple is good for a couple of reasons: - this allows us to fulfill requests that are slightly larger than the current average. For example, imagine that we're processing three requests of sizes 100B, 110B, and 120B sequentially, one at a time. Without the multiple, after the first request, our estimate would be 100B so the second request would come back empty (with ResumeNextBytes=110), so we'd have to re-issue the second request. At that point the average is 105B, so the third request would again come back empty and need to be re-issued with larger TargetBytes. Having the multiple allows us to handle such a scenario without any requests coming back empty. In particular, TPCH Q17 has similar setup. - this allows us to slowly grow the TargetBytes parameter over time when requests can be returned partially multiple times (i.e. Scan requests spanning multiple rows). Consider a case when a single Scan request has to return 1MB worth of data, but each row is only 100B. With the initial estimate of 1KB, every request would always come back with exactly 10 rows, and the avg response size would always stay at 1KB. We'd end up issuing 1000 of such requests. Having a multiple here allows us to grow the estimate over time, reducing the total number of requests needed. This multiple seems to fix the remaining perf regression on Q17 when comparing against the streamer OFF config. This commit also introduces a cluster setting that controls this multiple. Value of 1.5 was chosen using `tpchvec/bench` and this setting. Additionally, I introduced a similar cluster setting for the initial avg response size estimate (currently hard-coded at 1KiB) and used `tpchvec/bench`, and it showed that 1KiB value is pretty good. It was also the value mentioned in the RFC, so I decided to remove the corresponding setting. Release note: None --- .../kvstreamer/avg_response_estimator.go | 58 ++++++++++++++++--- .../kvstreamer/avg_response_estimator_test.go | 18 +++--- pkg/kv/kvclient/kvstreamer/streamer.go | 1 + 3 files changed, 63 insertions(+), 14 deletions(-) diff --git a/pkg/kv/kvclient/kvstreamer/avg_response_estimator.go b/pkg/kv/kvclient/kvstreamer/avg_response_estimator.go index 56c45eff01d4..60597f53547f 100644 --- a/pkg/kv/kvclient/kvstreamer/avg_response_estimator.go +++ b/pkg/kv/kvclient/kvstreamer/avg_response_estimator.go @@ -10,33 +10,77 @@ package kvstreamer +import "github.com/cockroachdb/cockroach/pkg/settings" + // avgResponseEstimator is a helper that estimates the average size of responses // received by the Streamer. It is **not** thread-safe. type avgResponseEstimator struct { + avgResponseSizeMultiple float64 // responseBytes tracks the total footprint of all responses that the // Streamer has already received. - responseBytes int64 - numResponses int64 + responseBytes float64 + numResponses float64 } -// TODO(yuzefovich): use the optimizer-driven estimates. -const initialAvgResponseSize = 1 << 10 // 1KiB +const ( + // TODO(yuzefovich): use the optimizer-driven estimates. + initialAvgResponseSize = 1 << 10 // 1KiB + // This value was determined using tpchvec/bench test on all TPC-H queries. + defaultAvgResponseSizeMultiple = 1.5 +) + +// streamerAvgResponseSizeMultiple determines the multiple used when calculating +// the average response size. +var streamerAvgResponseSizeMultiple = settings.RegisterFloatSetting( + settings.TenantReadOnly, + "sql.distsql.streamer.avg_response_size_multiple", + "determines the multiple used when calculating the average response size by the streamer component", + defaultAvgResponseSizeMultiple, +) + +func (e *avgResponseEstimator) init(sv *settings.Values) { + e.avgResponseSizeMultiple = streamerAvgResponseSizeMultiple.Get(sv) +} func (e *avgResponseEstimator) getAvgResponseSize() int64 { if e.numResponses == 0 { return initialAvgResponseSize } + // Note that we're multiplying the average by a response size multiple for a + // couple of reasons: + // + // 1. this allows us to fulfill requests that are slightly larger than the + // current average. For example, imagine that we're processing three + // requests of sizes 100B, 110B, and 120B sequentially, one at a time. + // Without the multiple, after the first request, our estimate would be 100B + // so the second request would come back empty (with ResumeNextBytes=110), + // so we'd have to re-issue the second request. At that point the average is + // 105B, so the third request would again come back empty and need to be + // re-issued with larger TargetBytes. Having the multiple allows us to + // handle such a scenario without any requests coming back empty. In + // particular, TPC-H Q17 has similar setup. + // + // 2. this allows us to slowly grow the TargetBytes parameter over time when + // requests can be returned partially multiple times (i.e. Scan requests + // spanning multiple rows). Consider a case when a single Scan request has + // to return 1MB worth of data, but each row is only 100B. With the initial + // estimate of 1KB, every request would always come back with exactly 10 + // rows, and the avg response size would always stay at 1KB. We'd end up + // issuing 1000 of such requests. Having a multiple here allows us to grow + // the estimate over time, reducing the total number of requests needed + // (although the growth is still not fast enough). + // // TODO(yuzefovich): we currently use a simple average over the received // responses, but it is likely to be suboptimal because it would be unfair // to "large" batches that come in late (i.e. it would not be reactive // enough). Consider using another function here. - return e.responseBytes / e.numResponses + return int64(e.responseBytes / e.numResponses * e.avgResponseSizeMultiple) } // update updates the actual information of the estimator based on numResponses // responses that took up responseBytes bytes and correspond to a single // BatchResponse. func (e *avgResponseEstimator) update(responseBytes int64, numResponses int64) { - e.responseBytes += responseBytes - e.numResponses += numResponses + e.responseBytes += float64(responseBytes) + e.numResponses += float64(numResponses) } diff --git a/pkg/kv/kvclient/kvstreamer/avg_response_estimator_test.go b/pkg/kv/kvclient/kvstreamer/avg_response_estimator_test.go index 7c3337f59f26..f510d2e0c2b5 100644 --- a/pkg/kv/kvclient/kvstreamer/avg_response_estimator_test.go +++ b/pkg/kv/kvclient/kvstreamer/avg_response_estimator_test.go @@ -23,7 +23,10 @@ func TestAvgResponseEstimator(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - var e avgResponseEstimator + e := avgResponseEstimator{avgResponseSizeMultiple: defaultAvgResponseSizeMultiple} + withMultiple := func(s int64) int64 { + return int64(float64(s) * defaultAvgResponseSizeMultiple) + } // Before receiving any responses, we should be using the initial estimate. require.Equal(t, int64(initialAvgResponseSize), e.getAvgResponseSize()) @@ -31,8 +34,9 @@ func TestAvgResponseEstimator(t *testing.T) { // Simulate receiving a single response. firstResponseSize := int64(42) e.update(firstResponseSize, 1) - // The estimate should now be exactly the size of that single response. - require.Equal(t, firstResponseSize, e.getAvgResponseSize()) + // The estimate should now be the size of that single response times + // defaultAvgResponseSizeMultiple. + require.Equal(t, withMultiple(firstResponseSize), e.getAvgResponseSize()) // Simulate receiving 100 small BatchResponses. smallResponseSize := int64(63) @@ -40,8 +44,8 @@ func TestAvgResponseEstimator(t *testing.T) { e.update(smallResponseSize*5, 5) } // The estimate should now be pretty close to the size of a single response - // in the small BatchResponse. - diff := smallResponseSize - e.getAvgResponseSize() + // in the small BatchResponse (after adjusting with the multiple). + diff := withMultiple(smallResponseSize) - e.getAvgResponseSize() require.True(t, math.Abs(float64(diff))/float64(smallResponseSize) < 0.05) // Now simulate receiving 10 large BatchResponses. @@ -50,7 +54,7 @@ func TestAvgResponseEstimator(t *testing.T) { e.update(largeResponseSize*1000, 1000) } // The estimate should now be pretty close to the size of a single response - // in the large BatchResponse. - diff = largeResponseSize - e.getAvgResponseSize() + // in the large BatchResponse (after adjusting with the multiple). + diff = withMultiple(largeResponseSize) - e.getAvgResponseSize() require.True(t, math.Abs(float64(diff))/float64(smallResponseSize) < 0.15) } diff --git a/pkg/kv/kvclient/kvstreamer/streamer.go b/pkg/kv/kvclient/kvstreamer/streamer.go index 6418c7e3964b..1104742d0dbd 100644 --- a/pkg/kv/kvclient/kvstreamer/streamer.go +++ b/pkg/kv/kvclient/kvstreamer/streamer.go @@ -334,6 +334,7 @@ func NewStreamer( "single Streamer async concurrency", uint64(streamerConcurrencyLimit.Get(&st.SV)), ) + s.mu.avgResponseEstimator.init(&st.SV) return s }