changefeedccl: use numcpu >> 2 workers for event consumers
Previously, a default value of 8 was used for the kvevent parallel consumer. This default was chosen because we observed performance improvements in a 15-node, 32 vCPU cluster as we increased this parameter up to 8; beyond 8, the improvements were much smaller.

The issue with a default of 8 is that on smaller machines, 8 workers can be too much overhead, especially since the work is CPU-intensive.

This change updates the default to runtime.NumCPU() >> 2 workers, which aligns with using 8 workers on 32 vCPU machines.

Fixes cockroachdb#89589
Epic: none

Release note: None
jayshrivastava committed Oct 12, 2022
1 parent e5b68cf commit bafa186
Showing 4 changed files with 18 additions and 5 deletions.
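As an illustration of the new default, here is a minimal standalone sketch; the helper name and the sample CPU counts are assumptions, while the shift-and-floor logic comes from the patch below:

```go
package main

import "fmt"

// quarterCPUs mirrors the default added by this commit:
// NumCPU >> 2 (integer divide by four), floored at one worker.
func quarterCPUs(numCPU int) int64 {
	n := numCPU >> 2
	if n < 1 {
		return 1
	}
	return int64(n)
}

func main() {
	for _, cpus := range []int{2, 4, 8, 16, 32, 64} {
		fmt.Printf("%2d vCPUs -> %2d workers\n", cpus, quarterCPUs(cpus))
	}
	// 32 vCPUs -> 8 workers, matching the previous fixed default,
	// while a 4 vCPU machine now gets 1 worker instead of 8.
}
```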
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
@@ -12,7 +12,7 @@ bulkio.backup.read_with_priority_after duration 1m0s amount of time since the re
 bulkio.stream_ingestion.minimum_flush_interval duration 5s the minimum timestamp between flushes; flushes may still occur if internal buffers fill up
 changefeed.balance_range_distribution.enable boolean false if enabled, the ranges are balanced equally among all nodes
 changefeed.event_consumer_worker_queue_size integer 16 if changefeed.event_consumer_workers is enabled, this setting sets the maximum number of events which a worker can buffer
-changefeed.event_consumer_workers integer 8 the number of workers to use when processing events; 0 or 1 disables
+changefeed.event_consumer_workers integer 0 the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value
 changefeed.fast_gzip.enabled boolean true use fast gzip implementation
 changefeed.node_throttle_config string specifies node level throttling configuration for all changefeeds
 changefeed.schema_feed.read_with_priority_after duration 1m0s retry with high priority if we were not able to read descriptors for too long; 0 disables
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
@@ -18,7 +18,7 @@
 <tr><td><code>bulkio.stream_ingestion.minimum_flush_interval</code></td><td>duration</td><td><code>5s</code></td><td>the minimum timestamp between flushes; flushes may still occur if internal buffers fill up</td></tr>
 <tr><td><code>changefeed.balance_range_distribution.enable</code></td><td>boolean</td><td><code>false</code></td><td>if enabled, the ranges are balanced equally among all nodes</td></tr>
 <tr><td><code>changefeed.event_consumer_worker_queue_size</code></td><td>integer</td><td><code>16</code></td><td>if changefeed.event_consumer_workers is enabled, this setting sets the maximum number of events which a worker can buffer</td></tr>
-<tr><td><code>changefeed.event_consumer_workers</code></td><td>integer</td><td><code>8</code></td><td>the number of workers to use when processing events; 0 or 1 disables</td></tr>
+<tr><td><code>changefeed.event_consumer_workers</code></td><td>integer</td><td><code>0</code></td><td>the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value</td></tr>
 <tr><td><code>changefeed.fast_gzip.enabled</code></td><td>boolean</td><td><code>true</code></td><td>use fast gzip implementation</td></tr>
 <tr><td><code>changefeed.node_throttle_config</code></td><td>string</td><td><code></code></td><td>specifies node level throttling configuration for all changefeeds</td></tr>
 <tr><td><code>changefeed.schema_feed.read_with_priority_after</code></td><td>duration</td><td><code>1m0s</code></td><td>retry with high priority if we were not able to read descriptors for too long; 0 disables</td></tr>
6 changes: 3 additions & 3 deletions pkg/ccl/changefeedccl/changefeedbase/settings.go
@@ -238,9 +238,9 @@ var UseMuxRangeFeed = settings.RegisterBoolSetting(
 var EventConsumerWorkers = settings.RegisterIntSetting(
 	settings.TenantWritable,
 	"changefeed.event_consumer_workers",
-	"the number of workers to use when processing events; 0 or 1 disables",
-	int64(util.ConstantWithMetamorphicTestRange("changefeed.consumer_max_workers", 8, 0, 32)),
-	settings.NonNegativeInt,
+	"the number of workers to use when processing events: <0 disables, "+
+		"0 assigns a reasonable default, >0 assigns the setting value",
+	0,
 ).WithPublic()
 
 // EventConsumerWorkerQueueSize specifies the maximum number of events a worker can buffer.
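The tri-state semantics can be read off the new description; below is a hedged sketch of how a caller might resolve it. The helper name resolveWorkerCount is hypothetical and not part of the patch; the real wiring lands in event_processing.go below.

```go
package changefeedccl

import "runtime"

// resolveWorkerCount illustrates the setting's three cases: a negative
// value disables parallel consumption, zero picks a CPU-based default,
// and a positive value is used as given. Hypothetical helper, not part
// of this commit.
func resolveWorkerCount(setting int64) int64 {
	switch {
	case setting < 0:
		return 0 // disabled: a single, non-parallel consumer is used
	case setting == 0:
		// Same computation as defaultNumWorkers below: NumCPU/4, min 1.
		if n := runtime.NumCPU() >> 2; n >= 1 {
			return int64(n)
		}
		return 1
	default:
		return setting
	}
}
```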
13 changes: 13 additions & 0 deletions pkg/ccl/changefeedccl/event_processing.go
@@ -12,6 +12,7 @@ import (
 	"context"
 	"hash"
 	"hash/crc32"
+	"runtime"
 
 	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval"
 	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
@@ -103,6 +104,10 @@ func newEventConsumer(
 
 	// TODO (jayshrivastava) enable parallel consumers for sinkless changefeeds
 	numWorkers := changefeedbase.EventConsumerWorkers.Get(&cfg.Settings.SV)
+	if numWorkers == 0 {
+		// Pick a reasonable default.
+		numWorkers = defaultNumWorkers()
+	}
 	if numWorkers <= 1 || isSinkless {
 		c, err := makeConsumer(sink, spanFrontier)
 		if err != nil {
@@ -134,6 +139,14 @@
 	return c, ss, nil
 }
 
+func defaultNumWorkers() int64 {
+	idealNumber := runtime.NumCPU() >> 2
+	if idealNumber < 1 {
+		return 1
+	}
+	return int64(idealNumber)
+}
+
 func makeHasher() hash.Hash32 {
 	return crc32.New(crc32.MakeTable(crc32.IEEE))
 }