diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index 1332aad46c50..15174f25195b 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -28,6 +28,7 @@
kv.allocator.load_based_rebalancing | enumeration | 2 | whether to rebalance based on the distribution of QPS across stores [off = 0, leases = 1, leases and replicas = 2] |
kv.allocator.qps_rebalance_threshold | float | 0.25 | minimum fraction away from the mean a store's QPS (queries per second) can be before it is considered overfull or underfull |
kv.allocator.range_rebalance_threshold | float | 0.05 | minimum fraction away from the mean a store's range count can be before it is considered overfull or underfull |
+kv.bulk_io_write.concurrent_addsstable_requests | integer | 1 | number of AddSSTable requests a store will handle concurrently before queuing |
kv.bulk_io_write.concurrent_export_requests | integer | 3 | number of export requests a store will handle concurrently before queuing |
kv.bulk_io_write.concurrent_import_requests | integer | 1 | number of import requests a store will handle concurrently before queuing |
kv.bulk_io_write.max_rate | byte size | 8.0 EiB | the rate limit (bytes/sec) to use for writes to disk on behalf of bulk io ops |
diff --git a/pkg/ccl/storageccl/export.go b/pkg/ccl/storageccl/export.go
index 66e7382628f6..af378dcc4817 100644
--- a/pkg/ccl/storageccl/export.go
+++ b/pkg/ccl/storageccl/export.go
@@ -68,10 +68,10 @@ func evalExport(
reply.StartTime = gcThreshold
}
- if err := cArgs.EvalCtx.GetLimiters().ConcurrentExports.Begin(ctx); err != nil {
+ if err := cArgs.EvalCtx.GetLimiters().ConcurrentExportRequests.Begin(ctx); err != nil {
return result.Result{}, err
}
- defer cArgs.EvalCtx.GetLimiters().ConcurrentExports.Finish()
+ defer cArgs.EvalCtx.GetLimiters().ConcurrentExportRequests.Finish()
makeExportStorage := !args.ReturnSST || (args.Storage != roachpb.ExportStorage{})
if makeExportStorage || log.V(1) {
diff --git a/pkg/ccl/storageccl/import.go b/pkg/ccl/storageccl/import.go
index 5e9d3ed43308..e46bac1cbcfb 100644
--- a/pkg/ccl/storageccl/import.go
+++ b/pkg/ccl/storageccl/import.go
@@ -78,10 +78,10 @@ func evalImport(ctx context.Context, cArgs batcheval.CommandArgs) (*roachpb.Impo
return nil, errors.Wrap(err, "make key rewriter")
}
- if err := cArgs.EvalCtx.GetLimiters().ConcurrentImports.Begin(ctx); err != nil {
+ if err := cArgs.EvalCtx.GetLimiters().ConcurrentImportRequests.Begin(ctx); err != nil {
return nil, err
}
- defer cArgs.EvalCtx.GetLimiters().ConcurrentImports.Finish()
+ defer cArgs.EvalCtx.GetLimiters().ConcurrentImportRequests.Finish()
var iters []engine.SimpleIterator
for _, file := range args.Files {
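
The renames in export.go and import.go above are mechanical: both commands wrap their evaluation in the same acquire/release discipline. Below is a minimal, self-contained sketch of the Begin/Finish contract those call sites rely on, using a semaphore stand-in for `limit.ConcurrentRequestLimiter` (illustrative only, not the real implementation):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// sketchLimiter mimics the contract the call sites above rely on: Begin
// blocks until a slot frees up or the context is canceled; Finish releases
// the slot. It is a stand-in for limit.ConcurrentRequestLimiter, not the
// real type.
type sketchLimiter struct{ sem chan struct{} }

func newSketchLimiter(n int) *sketchLimiter {
	return &sketchLimiter{sem: make(chan struct{}, n)}
}

func (l *sketchLimiter) Begin(ctx context.Context) error {
	select {
	case l.sem <- struct{}{}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func (l *sketchLimiter) Finish() { <-l.sem }

func main() {
	lim := newSketchLimiter(1) // limit of 1, matching the new setting's default
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	if err := lim.Begin(ctx); err != nil {
		fmt.Println("gave up waiting in the queue:", err)
		return
	}
	defer lim.Finish()
	fmt.Println("evaluating under the limiter")
}
```

Begin queues the caller until a slot frees up, so canceling the request's context is the only way out of the queue; the deferred Finish returns the slot on every exit path.
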
diff --git a/pkg/roachpb/batch.go b/pkg/roachpb/batch.go
index 7bcbedcdde85..fd68da609924 100644
--- a/pkg/roachpb/batch.go
+++ b/pkg/roachpb/batch.go
@@ -217,6 +217,16 @@ func (ba *BatchRequest) IsSingleComputeChecksumRequest() bool {
return false
}
+// IsSingleAddSSTableRequest returns true iff the batch contains a single
+// request, and that request is an AddSSTableRequest.
+func (ba *BatchRequest) IsSingleAddSSTableRequest() bool {
+ if ba.IsSingleRequest() {
+ _, ok := ba.Requests[0].GetInner().(*AddSSTableRequest)
+ return ok
+ }
+ return false
+}
+
// IsCompleteTransaction determines whether a batch contains every write in a
transaction.
func (ba *BatchRequest) IsCompleteTransaction() bool {
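
The new predicate follows the same shape as siblings like `IsSingleComputeChecksumRequest`: confirm the batch holds exactly one request, then type-assert its payload. A self-contained sketch of that pattern, with `Request`, `AddSSTable`, and `Put` as illustrative stand-ins for the roachpb union types:

```go
package main

import "fmt"

// Request, AddSSTable, and Put stand in for roachpb's request union; only
// the shape of the check matters here.
type Request interface{ method() string }

type AddSSTable struct{}

func (AddSSTable) method() string { return "AddSSTable" }

type Put struct{}

func (Put) method() string { return "Put" }

type Batch struct{ Requests []Request }

// isSingleAddSSTable mirrors IsSingleAddSSTableRequest: exactly one request,
// and that request must be an AddSSTable.
func (b *Batch) isSingleAddSSTable() bool {
	if len(b.Requests) == 1 {
		_, ok := b.Requests[0].(AddSSTable)
		return ok
	}
	return false
}

func main() {
	fmt.Println((&Batch{Requests: []Request{AddSSTable{}}}).isSingleAddSSTable())        // true
	fmt.Println((&Batch{Requests: []Request{AddSSTable{}, Put{}}}).isSingleAddSSTable()) // false
}
```
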
diff --git a/pkg/storage/batcheval/eval_context.go b/pkg/storage/batcheval/eval_context.go
index b2064837c22d..82a3fbf18bf9 100644
--- a/pkg/storage/batcheval/eval_context.go
+++ b/pkg/storage/batcheval/eval_context.go
@@ -34,9 +34,10 @@ import (
// Limiters is the collection of per-store limits used during cmd evaluation.
type Limiters struct {
- BulkIOWriteRate *rate.Limiter
- ConcurrentImports limit.ConcurrentRequestLimiter
- ConcurrentExports limit.ConcurrentRequestLimiter
+ BulkIOWriteRate *rate.Limiter
+ ConcurrentImportRequests limit.ConcurrentRequestLimiter
+ ConcurrentExportRequests limit.ConcurrentRequestLimiter
+ ConcurrentAddSSTableRequests limit.ConcurrentRequestLimiter
// concurrentRangefeedIters is a semaphore used to limit the number of
// rangefeeds in the "catch-up" state across the store. The "catch-up" state
// is a temporary state at the beginning of a rangefeed which is expensive
diff --git a/pkg/storage/store.go b/pkg/storage/store.go
index 58345fb1094e..75fcdf4a918a 100644
--- a/pkg/storage/store.go
+++ b/pkg/storage/store.go
@@ -123,6 +123,13 @@ var importRequestsLimit = settings.RegisterPositiveIntSetting(
1,
)
+// addSSTableRequestLimit limits concurrent AddSSTable requests.
+var addSSTableRequestLimit = settings.RegisterPositiveIntSetting(
+ "kv.bulk_io_write.concurrent_addsstable_requests",
+ "number of AddSSTable requests a store will handle concurrently before queuing",
+ 1,
+)
+
// concurrentRangefeedItersLimit limits concurrent rangefeed catchup iterators.
var concurrentRangefeedItersLimit = settings.RegisterPositiveIntSetting(
"kv.rangefeed.concurrent_catchup_iterators",
@@ -827,13 +834,13 @@ func NewStore(cfg StoreConfig, eng engine.Engine, nodeDesc *roachpb.NodeDescript
bulkIOWriteLimit.SetOnChange(&cfg.Settings.SV, func() {
s.limiters.BulkIOWriteRate.SetLimit(rate.Limit(bulkIOWriteLimit.Get(&cfg.Settings.SV)))
})
- s.limiters.ConcurrentImports = limit.MakeConcurrentRequestLimiter(
+ s.limiters.ConcurrentImportRequests = limit.MakeConcurrentRequestLimiter(
"importRequestLimiter", int(importRequestsLimit.Get(&cfg.Settings.SV)),
)
importRequestsLimit.SetOnChange(&cfg.Settings.SV, func() {
- s.limiters.ConcurrentImports.SetLimit(int(importRequestsLimit.Get(&cfg.Settings.SV)))
+ s.limiters.ConcurrentImportRequests.SetLimit(int(importRequestsLimit.Get(&cfg.Settings.SV)))
})
- s.limiters.ConcurrentExports = limit.MakeConcurrentRequestLimiter(
+ s.limiters.ConcurrentExportRequests = limit.MakeConcurrentRequestLimiter(
"exportRequestLimiter", int(ExportRequestsLimit.Get(&cfg.Settings.SV)),
)
// On low-CPU instances, a default limit value may still allow ExportRequests
@@ -847,7 +854,13 @@ func NewStore(cfg StoreConfig, eng engine.Engine, nodeDesc *roachpb.NodeDescript
if limit > exportCores {
limit = exportCores
}
- s.limiters.ConcurrentExports.SetLimit(limit)
+ s.limiters.ConcurrentExportRequests.SetLimit(limit)
+ })
+ s.limiters.ConcurrentAddSSTableRequests = limit.MakeConcurrentRequestLimiter(
+ "addSSTableRequestLimiter", int(addSSTableRequestLimit.Get(&cfg.Settings.SV)),
+ )
+ addSSTableRequestLimit.SetOnChange(&cfg.Settings.SV, func() {
+ s.limiters.ConcurrentAddSSTableRequests.SetLimit(int(addSSTableRequestLimit.Get(&cfg.Settings.SV)))
})
s.limiters.ConcurrentRangefeedIters = limit.MakeConcurrentRequestLimiter(
"rangefeedIterLimiter", int(concurrentRangefeedItersLimit.Get(&cfg.Settings.SV)),
@@ -2735,6 +2748,15 @@ func (s *Store) Send(
}
}
+ // Limit the number of concurrent AddSSTable requests, since they're expensive
+ // and block all other writes to the same span.
+ if ba.IsSingleAddSSTableRequest() {
+ if err := s.limiters.ConcurrentAddSSTableRequests.Begin(ctx); err != nil {
+ return nil, roachpb.NewError(err)
+ }
+ defer s.limiters.ConcurrentAddSSTableRequests.Finish()
+ }
+
if err := ba.SetActiveTimestamp(s.Clock().Now); err != nil {
return nil, roachpb.NewError(err)
}