cdc: add elastic CPU control to CDC event processing #91554

Merged: 3 commits, Nov 29, 2022
4 changes: 2 additions & 2 deletions docs/generated/settings/settings-for-tenants.txt
@@ -11,8 +11,8 @@ bulkio.backup.read_timeout duration 5m0s amount of time after which a read attem
bulkio.backup.read_with_priority_after duration 1m0s amount of time since the read-as-of time above which a BACKUP should use priority when retrying reads
bulkio.stream_ingestion.minimum_flush_interval duration 5s the minimum timestamp between flushes; flushes may still occur if internal buffers fill up
changefeed.balance_range_distribution.enable boolean false if enabled, the ranges are balanced equally among all nodes
changefeed.event_consumer_worker_queue_size integer 16 if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of eventswhich a worker can buffer
changefeed.event_consumer_workers integer 0 the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value
changefeed.event_consumer_worker_queue_size integer 16 if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of events which a worker can buffer
changefeed.event_consumer_workers integer 0 the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled
changefeed.fast_gzip.enabled boolean true use fast gzip implementation
changefeed.node_throttle_config string specifies node level throttling configuration for all changefeeeds
changefeed.schema_feed.read_with_priority_after duration 1m0s retry with high priority if we were not able to read descriptors for too long; 0 disables
4 changes: 2 additions & 2 deletions docs/generated/settings/settings.html
@@ -17,8 +17,8 @@
<tr><td><code>bulkio.backup.read_with_priority_after</code></td><td>duration</td><td><code>1m0s</code></td><td>amount of time since the read-as-of time above which a BACKUP should use priority when retrying reads</td></tr>
<tr><td><code>bulkio.stream_ingestion.minimum_flush_interval</code></td><td>duration</td><td><code>5s</code></td><td>the minimum timestamp between flushes; flushes may still occur if internal buffers fill up</td></tr>
<tr><td><code>changefeed.balance_range_distribution.enable</code></td><td>boolean</td><td><code>false</code></td><td>if enabled, the ranges are balanced equally among all nodes</td></tr>
<tr><td><code>changefeed.event_consumer_worker_queue_size</code></td><td>integer</td><td><code>16</code></td><td>if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of eventswhich a worker can buffer</td></tr>
<tr><td><code>changefeed.event_consumer_workers</code></td><td>integer</td><td><code>0</code></td><td>the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value</td></tr>
<tr><td><code>changefeed.event_consumer_worker_queue_size</code></td><td>integer</td><td><code>16</code></td><td>if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of events which a worker can buffer</td></tr>
<tr><td><code>changefeed.event_consumer_workers</code></td><td>integer</td><td><code>0</code></td><td>the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled</td></tr>
<tr><td><code>changefeed.fast_gzip.enabled</code></td><td>boolean</td><td><code>true</code></td><td>use fast gzip implementation</td></tr>
<tr><td><code>changefeed.node_throttle_config</code></td><td>string</td><td><code></code></td><td>specifies node level throttling configuration for all changefeeeds</td></tr>
<tr><td><code>changefeed.schema_feed.read_with_priority_after</code></td><td>duration</td><td><code>1m0s</code></td><td>retry with high priority if we were not able to read descriptors for too long; 0 disables</td></tr>
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
@@ -102,6 +102,8 @@ go_library(
"//pkg/sql/sqlutil",
"//pkg/sql/types",
"//pkg/util",
"//pkg/util/admission",
"//pkg/util/admission/admissionpb",
"//pkg/util/bitarray",
"//pkg/util/bufalloc",
"//pkg/util/cache",
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/bench_test.go
@@ -250,7 +250,7 @@ func createBenchmarkChangefeed(
serverCfg := s.DistSQLServer().(*distsql.ServerImpl).ServerConfig
eventConsumer, err := newKVEventToRowConsumer(ctx, &serverCfg, nil, sf, initialHighWater,
sink, encoder, makeChangefeedConfigFromJobDetails(details),
execinfrapb.Expression{}, TestingKnobs{}, nil)
execinfrapb.Expression{}, TestingKnobs{}, nil, nil)

if err != nil {
return nil, nil, err
28 changes: 26 additions & 2 deletions pkg/ccl/changefeedccl/changefeedbase/settings.go
@@ -242,16 +242,40 @@ var EventConsumerWorkers = settings.RegisterIntSetting(
settings.TenantWritable,
"changefeed.event_consumer_workers",
"the number of workers to use when processing events: <0 disables, "+
"0 assigns a reasonable default, >0 assigns the setting value",
"0 assigns a reasonable default, >0 assigns the setting value. for experimental/core "+
"changefeeds and changefeeds using parquet format, this is disabled",
0,
).WithPublic()

// EventConsumerWorkerQueueSize specifies the maximum number of events a worker buffer.
var EventConsumerWorkerQueueSize = settings.RegisterIntSetting(
settings.TenantWritable,
"changefeed.event_consumer_worker_queue_size",
"if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of events"+
"if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of events "+
"which a worker can buffer",
int64(util.ConstantWithMetamorphicTestRange("changefeed.event_consumer_worker_queue_size", 16, 0, 16)),
settings.NonNegativeInt,
).WithPublic()

// EventConsumerPacerRequestSize specifies how often (measured in CPU time)
// that event consumer workers request CPU time from admission control.
// For example, every N milliseconds of CPU work, request N more
// milliseconds of CPU time.
var EventConsumerPacerRequestSize = settings.RegisterDurationSetting(
settings.TenantWritable,
"changefeed.cpu.per_event_consumer_worker_allocation",
"an event consumer worker will perform a blocking request for CPU time "+
"before consuming events. after fully utilizing this CPU time, it will "+
"request more",
50*time.Millisecond,
settings.PositiveDuration,
)

// EventConsumerElasticCPUControlEnabled determines whether changefeed event
// processing integrates with elastic CPU control.
var EventConsumerElasticCPUControlEnabled = settings.RegisterBoolSetting(
settings.TenantWritable,
"changefeed.cpu.per_event_elastic_control.enabled",
"determines whether changefeed event processing integrates with elastic CPU control",
true,
)
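
For context, these two settings are consulted once per event-consumer worker when the worker is built. The following is a minimal sketch, not part of the diff: the helper name makeWorkerPacer and its standalone framing are hypothetical, while the calls it makes (roachpb.TenantFromContext, AdmissionPacerFactory.NewPacer, the WorkInfo fields) mirror the newEventConsumer changes shown later in event_processing.go.

```go
package changefeedccl // hypothetical placement, mirrors event_processing.go

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
	"github.com/cockroachdb/cockroach/pkg/util/admission"
	"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

// makeWorkerPacer is a hypothetical helper showing how the two settings above
// feed into the per-worker pacer. It returns nil when elastic CPU control is
// disabled; the PR relies on a nil *admission.Pacer behaving as a no-op.
func makeWorkerPacer(ctx context.Context, cfg *execinfra.ServerConfig) *admission.Pacer {
	if !changefeedbase.EventConsumerElasticCPUControlEnabled.Get(&cfg.Settings.SV) {
		return nil
	}
	tenantID, ok := roachpb.TenantFromContext(ctx)
	if !ok {
		tenantID = roachpb.SystemTenantID
	}
	return cfg.AdmissionPacerFactory.NewPacer(
		// Every N of CPU time consumed, block and request N more (50ms by default).
		changefeedbase.EventConsumerPacerRequestSize.Get(&cfg.Settings.SV),
		admission.WorkInfo{
			TenantID:        tenantID,
			Priority:        admissionpb.BulkNormalPri,
			CreateTime:      timeutil.Now().UnixNano(),
			BypassAdmission: false,
		},
	)
}
```

The nil-pacer convention keeps the hot path simple: callers invoke Pace and Close unconditionally, and the nil receiver turns both into no-ops when elastic control is switched off.
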
68 changes: 65 additions & 3 deletions pkg/ccl/changefeedccl/event_processing.go
@@ -13,15 +13,19 @@ import (
"hash"
"hash/crc32"
"runtime"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/bufalloc"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -32,6 +36,10 @@ import (
"github.com/cockroachdb/errors"
)

// pacerLogEvery is used for logging errors instead of returning terminal
// errors when pacer.Pace returns an error.
var pacerLogEvery log.EveryN = log.Every(100 * time.Millisecond)

// eventContext holds metadata pertaining to event.
type eventContext struct {
updated, mvcc hlc.Timestamp
Expand Down Expand Up @@ -62,6 +70,17 @@ type kvEventToRowConsumer struct {

topicDescriptorCache map[TopicIdentifier]TopicDescriptor
topicNamer *TopicNamer

// This pacer is used to incorporate event consumption to elastic CPU
// control. This helps ensure that event encoding/decoding does not throttle
// foreground SQL traffic.
//
// Note that for pacer to function correctly,
// kvEventToRowConsumer.ConsumeEvent must be called by the same goroutine in a
// tight loop.
//
// The pacer is closed by kvEventToRowConsumer.Close.
pacer *admission.Pacer
}

func newEventConsumer(
@@ -85,6 +104,9 @@
return nil, nil, err
}

pacerRequestUnit := changefeedbase.EventConsumerPacerRequestSize.Get(&cfg.Settings.SV)
enablePacer := changefeedbase.EventConsumerElasticCPUControlEnabled.Get(&cfg.Settings.SV)

makeConsumer := func(s EventSink, frontier frontier) (eventConsumer, error) {
var err error
encoder, err := getEncoder(encodingOpts, feed.Targets)
@@ -100,22 +122,44 @@
}
}

// Passing a nil Pacer is effectively a noop Pacer if
// CPU control is disabled.
var pacer *admission.Pacer = nil
if enablePacer {
tenantID, ok := roachpb.TenantFromContext(ctx)
if !ok {
tenantID = roachpb.SystemTenantID
}
pacer = cfg.AdmissionPacerFactory.NewPacer(
pacerRequestUnit,
admission.WorkInfo{
Contributor: This workInfo struct should probably have some sort of a tag (an op-string or whatnot) identifying the queue. I know that at this point, the pacer does not export per-queue stats, but it probably should. A TODO would be fine (in the admission code).

Contributor Author: I'm worried about what would happen if you tag a WorkInfo with one queue's info but send it to another. Also, maybe queues should collect stats instead of the Pacer type, because they are the underlying mechanism of the Pacer type. It may make more sense to tag WorkInfo with the type/origin of the work and have queues collect metrics about the work coming in. We can group queues by their granter. For example, collect stats about CDC work being granted CPU time. @sumeerbhola, would you mind sharing your opinion about this?

TenantID: tenantID,
Priority: admissionpb.BulkNormalPri,
CreateTime: timeutil.Now().UnixNano(),
BypassAdmission: false,
},
)
}

return newKVEventToRowConsumer(ctx, cfg, evalCtx, frontier, cursor, s,
encoder, details, expr, knobs, topicNamer)
encoder, details, expr, knobs, topicNamer, pacer)
}

// TODO (jayshrivastava) enable parallel consumers for sinkless changefeeds
numWorkers := changefeedbase.EventConsumerWorkers.Get(&cfg.Settings.SV)
if numWorkers == 0 {
// Pick a reasonable default.
numWorkers = defaultNumWorkers()
}

// The descriptions for event_consumer_worker settings should also be updated
// when these TODOs are completed.
//
// TODO (ganeshb) Add support for parallel encoding when using parquet.
// We cannot have a separate encoder and sink for parquet format (see
// parquet_sink_cloudstorage.go). Because of this the current nprox solution
// does not work for parquet format.
//
//TODO (ganeshb) Add support for parallel encoding
// TODO (jayshrivastava) enable parallel consumers for sinkless changefeeds.
Contributor: Do you remember why this was disabled in the first place?

Contributor Author: I replied in my latest comment.

Contributor: Having a hard time finding it, I'm afraid.

Contributor Author: I remember doing a small fix for this, but we dropped it because the nprocs PR was big enough already and core changefeeds are still in beta. The problem is that changeAggregator.Next() calls ConsumeEvent and immediately expects some data to be synchronously placed in the output buffer afterwards. Thus, async encoding/emitting doesn't work.

if numWorkers <= 1 || isSinkless || encodingOpts.Format == changefeedbase.OptFormatParquet {
c, err := makeConsumer(sink, spanFrontier)
if err != nil {
Expand Down Expand Up @@ -174,6 +218,7 @@ func newKVEventToRowConsumer(
expr execinfrapb.Expression,
knobs TestingKnobs,
topicNamer *TopicNamer,
pacer *admission.Pacer,
) (*kvEventToRowConsumer, error) {
includeVirtual := details.Opts.IncludeVirtual()
keyOnly := details.Opts.KeyOnly()
Expand Down Expand Up @@ -215,6 +260,7 @@ func newKVEventToRowConsumer(
evaluator: evaluator,
safeExpr: safeExpr,
encodingFormat: encodingOpts.Format,
pacer: pacer,
}, nil
}

Expand Down Expand Up @@ -242,6 +288,15 @@ func (c *kvEventToRowConsumer) ConsumeEvent(ctx context.Context, ev kvevent.Even
return errors.AssertionFailedf("expected kv ev, got %v", ev.Type())
}

// Request CPU time to use for event consumption, block if this time is
// unavailable. If there is unused CPU time left from the last call to
// Pace, then use that time instead of blocking.
if err := c.pacer.Pace(ctx); err != nil {
if pacerLogEvery.ShouldLog() {
log.Errorf(ctx, "automatic pacing: %v", err)
}
}

schemaTimestamp := ev.KV().Value.Timestamp
prevSchemaTimestamp := schemaTimestamp
mvccTimestamp := ev.MVCCTimestamp()
Expand Down Expand Up @@ -382,6 +437,7 @@ func (c *kvEventToRowConsumer) ConsumeEvent(ctx context.Context, ev kvevent.Even
// Close is a noop for the kvEventToRowConsumer because it
// has no goroutines in flight.
func (c *kvEventToRowConsumer) Close() error {
c.pacer.Close()
return nil
}

Expand Down Expand Up @@ -527,6 +583,12 @@ func (c *parallelEventConsumer) startWorkers() error {
func (c *parallelEventConsumer) workerLoop(
ctx context.Context, consumer eventConsumer, id int64,
) error {
defer func() {
err := consumer.Close()
if err != nil {
log.Errorf(ctx, "closing consumer: %v", err)
}
}()
for {
select {
case <-ctx.Done():
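
The pacer contract spelled out in the comments above (Pace must be called from a single goroutine in a tight loop, and the pacer must be closed when the consumer shuts down) can be summarized in a minimal sketch. This is not code from the PR: the function name, the events channel, and the processOneEvent callback are hypothetical stand-ins for the real plumbing in kvEventToRowConsumer.ConsumeEvent and parallelEventConsumer.workerLoop.

```go
package changefeedccl // hypothetical placement, mirrors event_processing.go

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
	"github.com/cockroachdb/cockroach/pkg/util/admission"
	"github.com/cockroachdb/cockroach/pkg/util/log"
)

// pacedWorkerLoop is a simplified, single-goroutine consume loop illustrating
// how a worker paces its own CPU usage.
func pacedWorkerLoop(
	ctx context.Context,
	pacer *admission.Pacer, // may be nil when elastic CPU control is disabled
	events <-chan kvevent.Event,
	processOneEvent func(context.Context, kvevent.Event) error,
) error {
	// Release the pacer when the worker exits, as the real code does via the
	// deferred consumer.Close in workerLoop.
	defer pacer.Close()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case ev := <-events:
			// Block until elastic CPU time is granted; leftover time from the
			// previous grant is consumed first. Pacing errors are advisory and
			// must not terminate the changefeed, so they are only logged.
			if err := pacer.Pace(ctx); err != nil {
				log.Errorf(ctx, "automatic pacing: %v", err)
			}
			if err := processOneEvent(ctx, ev); err != nil {
				return err
			}
		}
	}
}
```

Because pacing failures are only logged (rate-limited by pacerLogEvery), an admission-control problem degrades to the unpaced, pre-PR behavior instead of stalling or failing changefeeds.
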
18 changes: 10 additions & 8 deletions pkg/cmd/roachtest/tests/admission_control_elastic_cdc.go
@@ -114,21 +114,23 @@ func registerElasticControlForCDC(r registry.Registry) {
time.Sleep(stopFeedsDuration) // buffer for cancellations to take effect/show up in metrics

t.Status(fmt.Sprintf("during: round %d: creating %d changefeeds (<%s)", i, changefeeds, time.Minute))

for j := 0; j < changefeeds; j++ {
stmtWithCursor := fmt.Sprintf(`
var createChangefeedStmt string
if i%2 == 0 {
createChangefeedStmt = fmt.Sprintf(`
CREATE CHANGEFEED FOR tpcc.order_line, tpcc.stock, tpcc.customer
INTO 'null://' WITH cursor = '-%ds'
`, int64(float64(i+1)*padDuration.Seconds())) // scanning as far back as possible (~ when the workload started)
if _, err := db.ExecContext(ctx, stmtWithCursor); err != nil {
} else {
createChangefeedStmt = "CREATE CHANGEFEED FOR tpcc.order_line, tpcc.stock, tpcc.customer " +
"INTO 'null://' WITH initial_scan = 'only'"
}

if _, err := db.ExecContext(ctx, createChangefeedStmt); err != nil {
return err
}
}

// TODO(irfansharif): Add a version of this test
// with initial_scan = 'only' to demonstrate the
// need+efficacy of using elastic CPU control in
// changefeed workers. That too has a severe effect
// on scheduling latencies.
}
return nil
})