diff --git a/pkg/kv/kvclient/kvstreamer/avg_response_estimator.go b/pkg/kv/kvclient/kvstreamer/avg_response_estimator.go index 56c45eff01d4..60597f53547f 100644 --- a/pkg/kv/kvclient/kvstreamer/avg_response_estimator.go +++ b/pkg/kv/kvclient/kvstreamer/avg_response_estimator.go @@ -10,33 +10,77 @@ package kvstreamer +import "github.com/cockroachdb/cockroach/pkg/settings" + // avgResponseEstimator is a helper that estimates the average size of responses // received by the Streamer. It is **not** thread-safe. type avgResponseEstimator struct { + avgResponseSizeMultiple float64 // responseBytes tracks the total footprint of all responses that the // Streamer has already received. - responseBytes int64 - numResponses int64 + responseBytes float64 + numResponses float64 } -// TODO(yuzefovich): use the optimizer-driven estimates. -const initialAvgResponseSize = 1 << 10 // 1KiB +const ( + // TODO(yuzefovich): use the optimizer-driven estimates. + initialAvgResponseSize = 1 << 10 // 1KiB + // This value was determined using tpchvec/bench test on all TPC-H queries. + defaultAvgResponseSizeMultiple = 1.5 +) + +// streamerAvgResponseSizeMultiple determines the multiple used when calculating +// the average response size. +var streamerAvgResponseSizeMultiple = settings.RegisterFloatSetting( + settings.TenantReadOnly, + "sql.distsql.streamer.avg_response_size_multiple", + "determines the multiple used when calculating the average response size by the streamer component", + defaultAvgResponseSizeMultiple, +) + +func (e *avgResponseEstimator) init(sv *settings.Values) { + e.avgResponseSizeMultiple = streamerAvgResponseSizeMultiple.Get(sv) +} func (e *avgResponseEstimator) getAvgResponseSize() int64 { if e.numResponses == 0 { return initialAvgResponseSize } + // Note that we're multiplying the average by a response size multiple for a + // couple of reasons: + // + // 1. this allows us to fulfill requests that are slightly larger than the + // current average. For example, imagine that we're processing three + // requests of sizes 100B, 110B, and 120B sequentially, one at a time. + // Without the multiple, after the first request, our estimate would be 100B + // so the second request would come back empty (with ResumeNextBytes=110), + // so we'd have to re-issue the second request. At that point the average is + // 105B, so the third request would again come back empty and need to be + // re-issued with larger TargetBytes. Having the multiple allows us to + // handle such a scenario without any requests coming back empty. In + // particular, TPC-H Q17 has similar setup. + // + // 2. this allows us to slowly grow the TargetBytes parameter over time when + // requests can be returned partially multiple times (i.e. Scan requests + // spanning multiple rows). Consider a case when a single Scan request has + // to return 1MB worth of data, but each row is only 100B. With the initial + // estimate of 1KB, every request would always come back with exactly 10 + // rows, and the avg response size would always stay at 1KB. We'd end up + // issuing 1000 of such requests. Having a multiple here allows us to grow + // the estimate over time, reducing the total number of requests needed + // (although the growth is still not fast enough). + // // TODO(yuzefovich): we currently use a simple average over the received // responses, but it is likely to be suboptimal because it would be unfair // to "large" batches that come in late (i.e. it would not be reactive // enough). Consider using another function here. - return e.responseBytes / e.numResponses + return int64(e.responseBytes / e.numResponses * e.avgResponseSizeMultiple) } // update updates the actual information of the estimator based on numResponses // responses that took up responseBytes bytes and correspond to a single // BatchResponse. func (e *avgResponseEstimator) update(responseBytes int64, numResponses int64) { - e.responseBytes += responseBytes - e.numResponses += numResponses + e.responseBytes += float64(responseBytes) + e.numResponses += float64(numResponses) } diff --git a/pkg/kv/kvclient/kvstreamer/avg_response_estimator_test.go b/pkg/kv/kvclient/kvstreamer/avg_response_estimator_test.go index 7c3337f59f26..f510d2e0c2b5 100644 --- a/pkg/kv/kvclient/kvstreamer/avg_response_estimator_test.go +++ b/pkg/kv/kvclient/kvstreamer/avg_response_estimator_test.go @@ -23,7 +23,10 @@ func TestAvgResponseEstimator(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - var e avgResponseEstimator + e := avgResponseEstimator{avgResponseSizeMultiple: defaultAvgResponseSizeMultiple} + withMultiple := func(s int64) int64 { + return int64(float64(s) * defaultAvgResponseSizeMultiple) + } // Before receiving any responses, we should be using the initial estimate. require.Equal(t, int64(initialAvgResponseSize), e.getAvgResponseSize()) @@ -31,8 +34,9 @@ func TestAvgResponseEstimator(t *testing.T) { // Simulate receiving a single response. firstResponseSize := int64(42) e.update(firstResponseSize, 1) - // The estimate should now be exactly the size of that single response. - require.Equal(t, firstResponseSize, e.getAvgResponseSize()) + // The estimate should now be the size of that single response times + // defaultAvgResponseSizeMultiple. + require.Equal(t, withMultiple(firstResponseSize), e.getAvgResponseSize()) // Simulate receiving 100 small BatchResponses. smallResponseSize := int64(63) @@ -40,8 +44,8 @@ func TestAvgResponseEstimator(t *testing.T) { e.update(smallResponseSize*5, 5) } // The estimate should now be pretty close to the size of a single response - // in the small BatchResponse. - diff := smallResponseSize - e.getAvgResponseSize() + // in the small BatchResponse (after adjusting with the multiple). + diff := withMultiple(smallResponseSize) - e.getAvgResponseSize() require.True(t, math.Abs(float64(diff))/float64(smallResponseSize) < 0.05) // Now simulate receiving 10 large BatchResponses. @@ -50,7 +54,7 @@ func TestAvgResponseEstimator(t *testing.T) { e.update(largeResponseSize*1000, 1000) } // The estimate should now be pretty close to the size of a single response - // in the large BatchResponse. - diff = largeResponseSize - e.getAvgResponseSize() + // in the large BatchResponse (after adjusting with the multiple). + diff = withMultiple(largeResponseSize) - e.getAvgResponseSize() require.True(t, math.Abs(float64(diff))/float64(smallResponseSize) < 0.15) } diff --git a/pkg/kv/kvclient/kvstreamer/streamer.go b/pkg/kv/kvclient/kvstreamer/streamer.go index 3a7dd8ddb366..575a00c76ec5 100644 --- a/pkg/kv/kvclient/kvstreamer/streamer.go +++ b/pkg/kv/kvclient/kvstreamer/streamer.go @@ -334,6 +334,7 @@ func NewStreamer( "single Streamer async concurrency", uint64(streamerConcurrencyLimit.Get(&st.SV)), ) + s.mu.avgResponseEstimator.init(&st.SV) return s }