
cdc: add elastic CPU control to CDC event processing #91554

Merged (3 commits, Nov 29, 2022)

Conversation


@jayshrivastava jayshrivastava commented Nov 8, 2022

admission: make Pacer type available in SQL server config

Currently, the Pacer type is only used within KV, but will be used by SQL
in future changes. For example, code for encoding/decoding CDC events resides
in distSQL and is CPU intensive, so there is a plan to integrate admission
control into it (#90089).
This change makes the Pacer type available to the SQL layer via the
execinfra.ServerConfig.

Because the Pacer was previously only used by KV, it lived in the kvadmission
package. Since this change makes it available outside of KV, it is moved to
the admission package.

Furthermore, this change adds a new method,
ElasticCPUGrantCoordinator.NewPacer, to instantiate new Pacer structs.
Since the ElasticCPUGrantCoordinator implements several features not relevant
to SQL, this change passes the coordinator to the SQL server config as
the interface PacerMaker, which makes only the NewPacer method accessible.

Currently, tenant servers do not create grant coordinators for admission
control. This change retains that behavior, except it passes a nil
ElasticCPUGrantCoordinator, which creates nil/noop Pacers. Adding these
coordinators to tenant servers is a change outside the scope of this commit and
is left as a TODO.

Release note: None

cdc: add elastic CPU control to CDC event processing

Previously, the CPU-bound work of CDC event processing (encoding /
decoding rows) had the potential to consume a lot of CPU and
disrupt foreground SQL traffic. This change adds elastic CPU control
to event processing so that it does not use excessive CPU and
starve foreground traffic.

This change also adds two new, non-public cluster settings: one enables or
disables CPU control for CDC event processing, and the other controls the
requested grant size, measured in CPU time.

Fixes: #90089

Release note: None

roachtest: add initial scan only case to elastic cdc

Previously, this roachtest would not test changefeeds running with
initial_scan_only. This option tends to have a significant impact on
foreground latency due to high CPU usage, so it should be included
in this test, which measures CPU usage and foreground latency while
changefeeds are running.

Release note: None
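
A minimal Go sketch of the PacerMaker shape described in the first commit message above. The stub types and field names are placeholders rather than the actual pkg/util/admission definitions; only the narrow-interface idea and the nil-coordinator behavior come from the description.

package admission

import "time"

// Stub stand-ins for the real types in pkg/util/admission.
type WorkInfo struct{ /* tenant ID, priority, etc. (elided) */ }
type Pacer struct {
	unit time.Duration
	wi   WorkInfo
}
type ElasticCPUGrantCoordinator struct{ /* grant/queue machinery elided */ }

// PacerMaker is the narrow interface handed to the SQL server config, so
// SQL code can construct pacers without seeing the rest of the coordinator.
type PacerMaker interface {
	NewPacer(unit time.Duration, wi WorkInfo) *Pacer
}

// NewPacer on a nil coordinator (the tenant-server case described above)
// returns a nil *Pacer, whose methods behave as no-ops.
func (e *ElasticCPUGrantCoordinator) NewPacer(unit time.Duration, wi WorkInfo) *Pacer {
	if e == nil {
		return nil
	}
	return &Pacer{unit: unit, wi: wi}
}

var _ PacerMaker = (*ElasticCPUGrantCoordinator)(nil)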

@cockroach-teamcity (Member)

This change is Reviewable

@jayshrivastava (Contributor, Author)

Before

  • value Encoding took 18.89% of CPU time
  • CPU was near 100% utilized
  • SQL service latency well over 100ms, peaking above 1s


After

  • value Encoding took 8.27% of CPU time
  • CPU at ~70% utilization
  • SQL service latency well under 100ms


@jayshrivastava jayshrivastava left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained


pkg/ccl/changefeedccl/changefeedbase/settings.go line 270 at r1 (raw file):

		"before consuming events. after fully utilizing this CPU time, it will"+
		"request more",
	20*time.Millisecond,

I'm unsure what a reasonable default here would be. Event encoding / decoding time depends on the schema of the row. We also don't want Pace to wait for quota on every loop iteration.

@jayshrivastava jayshrivastava marked this pull request as ready for review November 9, 2022 14:11
@jayshrivastava jayshrivastava requested review from a team as code owners November 9, 2022 14:11
@andrewbaptist andrewbaptist left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @jayshrivastava and @miretskiy)


-- commits line 5 at r1:
nit: typo "This change makes the controller"


pkg/ccl/changefeedccl/event_processing.go line 604 at r1 (raw file):

				return c.setWorkerError(err)
			}
			c.decInFlight()

Minor: I'm assuming pacer.Pace and consumer.ConsumeEvent don't normally return an err, however, if they do, it would be better to call c.decInFlight() before returning. Otherwise, the count will never decrease.


pkg/ccl/changefeedccl/changefeedbase/settings.go line 245 at r1 (raw file):

	"changefeed.event_consumer_workers",
	"the number of workers to use when processing events: <0 disables, "+
		"0 assigns a reasonable default, >0 assigns the setting value. for expirimental/core "+

nit: expirimental -> experimental


pkg/ccl/changefeedccl/changefeedbase/settings.go line 254 at r1 (raw file):

	settings.TenantWritable,
	"changefeed.event_consumer_worker_queue_size",
	"if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of events"+

nit: same issue here - add a space between "events" and the closing quote, i.e. "of events "


pkg/ccl/changefeedccl/changefeedbase/settings.go line 267 at r1 (raw file):

	settings.TenantWritable,
	"changefeed.event_consumer_pacer_request_size",
	"an event consumer worker will perform a blocking request for CPU time"+

nit: CPU time" -> CPU time " - add a space after "time"; in the generated docs it currently becomes "timebefore". The same applies on the next line, after "will".


pkg/ccl/changefeedccl/changefeedbase/settings.go line 270 at r1 (raw file):

Previously, jayshrivastava (Jayant Shrivastava) wrote…

I'm unsure what a reasonable default here would be. Event encoding / decoding time depends on the schema of the row. We also don't want Pace to wait for quota on every loop iteration.

minor: 9ms is likely a better default (assuming testing shows it works well). That is just under the length of the goroutine preemption time so on a busy system (which is when the Pacer will introduce pauses) it will cooperatively give up the CPU rather than waiting for preemption.

@irfansharif irfansharif requested a review from a team November 9, 2022 15:19
@miretskiy (Contributor)

I'm unsure what a reasonable default here would be. Event encoding / decoding time depends on the schema of the row. We also don't want Pace to wait for quota on every loop iteration.

I suggest that we run tpcc or, even better, tpc-e. The elastic limiter already keeps track of a few metrics (requests/returned); what would be nice is to add a histogram metric to keep track of how much real time was used. Alternatively, add a metric to the changefeed code to capture the amount of time spent performing encoding + emit into the sink (since we may do compression). At any rate, whatever metric is used -- take the 95th percentile as the configuration parameter.

// milliseconds of CPU time.
var EventConsumerPacerRequestSize = settings.RegisterDurationSetting(
	settings.TenantWritable,
	"changefeed.event_consumer_pacer_request_size",
Contributor

nit: wdyt about changefeed.cpu.per_event_allocation or something similar?

// We cannot have a separate encoder and sink for parquet format (see
// parquet_sink_cloudstorage.go). Because of this the current nprox solution
// does not work for parquet format.
//
//TODO (ganeshb) Add support for parallel encoding
// TODO (jayshrivastava) enable parallel consumers for sinkless changefeeds.
Contributor

Do you remember why this was disabled in the first place?

Contributor Author

I replied in my latest comment

Contributor

Having hard time finding it, I'm afraid.

Contributor Author

I remember doing a small fix for this, but we dropped it because the nprocs PR was big enough already and core changefeeds are still in beta. The problem is changeAggregator.Next() calls ConsumeEvent and immediately expects some data to be synchronously placed in the output buffer afterwards. Thus, async encoding/emitting doesn't work.
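
A toy Go illustration of that constraint; the types and the next function below are hypothetical models of changeAggregator.Next and the consumer, meant only to show why an asynchronous consumer leaves Next with nothing to return.

package main

import "fmt"

type event struct{ key string }
type row struct{ encoded string }

// next models changeAggregator.Next: it assumes that once consume(ev) returns,
// the encoded row is already in outBuf (a synchronous contract).
func next(ev event, consume func(event), outBuf *[]row) (row, bool) {
	consume(ev)
	if len(*outBuf) == 0 {
		// An async consumer only enqueues the event for a worker goroutine,
		// so the buffer is still empty here and Next has nothing to return.
		return row{}, false
	}
	r := (*outBuf)[0]
	*outBuf = (*outBuf)[1:]
	return r, true
}

func main() {
	var buf []row

	// Synchronous consumer: encodes in place, so Next sees the row.
	sync := func(ev event) { buf = append(buf, row{encoded: ev.key}) }
	if r, ok := next(event{key: "a"}, sync, &buf); ok {
		fmt.Println("sync consumer produced:", r.encoded)
	}

	// Async consumer: hands the event to a worker; the buffer stays empty by
	// the time Next checks it.
	pending := make(chan event, 1)
	async := func(ev event) { pending <- ev }
	if _, ok := next(event{key: "b"}, async, &buf); !ok {
		fmt.Println("async consumer: no row available synchronously")
	}
}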

func makePacer(ac kvadmission.Controller, ru time.Duration) wrappedPacer {
	pacer := wrappedPacer{}

	if ac != nil {
Contributor

is this a test-only thing (that ac is nil)?

//
// TODO(jayant): remove wrappedPacer and makePacer once tenant servers come
// bootstrapped with admission control. call newPacer with the the correct tenantID.
func makePacer(ac kvadmission.Controller, ru time.Duration) wrappedPacer {
Contributor

I wonder if controller itself could have a "NewPacer" method? It can even return a no-op pacer.

// Request CPU time to use for event consumption, block if this time is
// unavailable. If there is unused CPU time left from the last call to
// Pace, then use that time instead of blocking.
if err := pacer.Pace(ctx); err != nil {
Contributor

@irfansharif should review this usage -- if it's sane, etc (it does look okay to me).

@irfansharif irfansharif left a comment

The results look great! Few minor notes, I'll leave the rubber stamping to others.

  • Can we address the TODO here as part of this patch? Maybe every time iters % 2 == 0 run the changefeed with initial_scan = 'only' instead.

    // TODO(irfansharif): Add a version of this test
    // with initial_scan = 'only' to demonstrate the
    // need+efficacy of using elastic CPU control in
    // changefeed workers. That too has a severe effect
    // on scheduling latencies.

  • kvadmission is supposed to only house admission control utilities for the KV package. It's a thin layer between pkg/kv/kvserver and pkg/admission. The only reason the Pacer type was found there was because the only use of it was under pkg/kv, but that's no longer the case. The only reason kvadmission is depended on in pkg/server is because it's an injected dependency. Let's improve it all by:

    • Moving kvadmission.Pacer into pkg/util/admission.
    • The way to construct a pacer should be .NewPacer(unit time.Duration, wi admission.WorkInfo) hanging off the ElasticCPUGrantCoordinator type. You should have access to this type here.
      gcoords.Elastic,
    • What should we actually hang off of type sqlServerArgs struct? kvadmission.Controller is too much, it's doing too many things that are irrelevant to SQL. We just want the ability to construct a pacer. Feel free to define a narrow type admission.PacerConstructor interface with just the NewPacer signature above that the ElasticCPUGrantCoordinator tacitly implements.
    • If we dropped the dependency on kvadmission.Controller, we wouldn't need special casing for the missing tenant integration. Here for example we have a link to some admission queue that also works in tenant code. We should be able to do similar plumbing for this elastic CPU controller as well:
      sqlSQLResponseAdmissionQ: gcoords.Regular.GetWorkQueue(admission.SQLSQLResponseWork),
  • Add a cluster setting to disable this specific form of pacing, the narrow equivalent of kvadmission.rangefeed_catchup_scan_elastic_control.enabled. Default it to true, but during incidents it's always good to have a kill-switch.

  • Do we need the wrappedPacer type at all? A nil Pacer is something you can call .Close and .Pace on without it doing anything -- try it! We use this property when the cluster setting above is disabled. (See the nil-receiver sketch after this list.)

  • In the pacer error path, I would recommend a different fallback: just log semi-infrequently and continue processing. Failing to provide latency isolation is sometimes less bad than failing parallel workers altogether. See what we're doing here:

    if err := i.pacer.Pace(ctx); err != nil {
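
As referenced in the wrappedPacer bullet above, a small self-contained Go illustration of the nil-receiver idiom being suggested; the method bodies are simplified stand-ins for the real admission.Pacer.

package main

import (
	"context"
	"fmt"
)

// pacer is a stand-in for admission.Pacer. Its methods check for a nil
// receiver, so a nil *pacer is a usable no-op.
type pacer struct{ granted bool }

func (p *pacer) Pace(ctx context.Context) error {
	if p == nil {
		return nil // no pacing configured: do nothing
	}
	// ... the real implementation would wait for elastic CPU quota here ...
	p.granted = true
	return ctx.Err()
}

func (p *pacer) Close() {
	if p == nil {
		return
	}
	// ... the real implementation would return unused quota here ...
}

func main() {
	var p *pacer // nil: e.g. CPU control disabled, or no grant coordinator
	if err := p.Pace(context.Background()); err != nil {
		panic(err)
	}
	p.Close()
	fmt.Println("nil pacer calls are safe no-ops")
}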

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @andrewbaptist, @irfansharif, @jayshrivastava, and @miretskiy)


pkg/ccl/changefeedccl/event_processing.go line 568 at r2 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

I wonder if controller itself could have a "NewPacer" method? It can even return a no-op pacer.

+1 to NewPacer, perhaps as an interface, and not hanging off the kvadmission.Controller thing (see top-level comment). A nil pacer is still something you can call .Close or .Pace on BTW, see top-level comment. That's a "no-op" pacer.


pkg/ccl/changefeedccl/event_processing.go line 571 at r2 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

is this a test only thing(that ac is nil)?

We shouldn't need this, see top-level comment.


pkg/ccl/changefeedccl/event_processing.go line 597 at r2 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

@irfansharif should review this usage -- if it's sane, etc (it does look okay to me).

See comment above around error handling. I'll defer to you two. Could we call if err := pacer.Pace(ctx); err != nil { outside the switch block? Just in case we add new case statements here.


pkg/ccl/changefeedccl/changefeedbase/settings.go line 270 at r1 (raw file):

Previously, andrewbaptist (Andrew Baptist) wrote…

minor: 9ms is likely a better default (assuming testing shows it works well). That is just under the length of the goroutine preemption time so on a busy system (which is when the Pacer will introduce pauses) it will cooperatively give up the CPU rather than waiting for preemption.

I actually think 9ms is too little for this tight loop -- it's better to do as much work as possible without interacting with AC too often. I'm not sure I understand the practical difference between cooperatively giving up the CPU and waiting for pre-emption; we'll still get pre-empted. BTW Andrew, pre-emption in the Go scheduler could also just immediately put it on another inactive core, or not even pre-empt it if there's no waiting work on that core. There are some scheduler traces in #unified-overload-control to back me up.

I'll recommend something similar to what @miretskiy is saying: run an experiment with TPC-C, or TPC-E and see what the aggregate CPU-time from this tight loop typically is (I would just log.Infof it -- quicker than a histogram). I suspect it's 100ms+. Pick something like 100ms or 50ms and call it a day. We can always re-evaluate defaults if need be. I also don't think this upfront estimate matters much in practice because we correct for this error term post-hoc.
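
One way to collect that number, sketched below; the measurement approach (the grunning package plus log.Infof) matches what the author reports using further down, but the surrounding types and the next function are assumed stand-ins, not the actual changefeed worker code.

package scratch

import (
	"context"
	"time"

	"github.com/cockroachdb/cockroach/pkg/util/grunning"
	"github.com/cockroachdb/cockroach/pkg/util/log"
)

// Stand-ins for the changefeed worker's real types; only grunning and log are
// real packages here.
type kvEvent struct{}
type eventConsumer interface {
	ConsumeEvent(ctx context.Context, ev kvEvent) error
}

// measureConsumeCPU drains events through the consumer and logs the aggregate
// CPU time spent in ConsumeEvent. grunning.Time reports the calling
// goroutine's running time.
func measureConsumeCPU(ctx context.Context, consumer eventConsumer, next func() (kvEvent, bool)) {
	var totalCPU time.Duration
	for {
		ev, ok := next()
		if !ok {
			break
		}
		start := grunning.Time()
		if err := consumer.ConsumeEvent(ctx, ev); err != nil {
			log.Warningf(ctx, "consume failed: %v", err)
			break
		}
		totalCPU += grunning.Time() - start
	}
	log.Infof(ctx, "aggregate CPU time in ConsumeEvent: %s", totalCPU)
}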

@jayshrivastava jayshrivastava requested a review from a team November 10, 2022 18:58
@jayshrivastava jayshrivastava requested review from a team as code owners November 10, 2022 18:58
@blathers-crl blathers-crl bot requested a review from irfansharif November 10, 2022 18:58
@jayshrivastava jayshrivastava force-pushed the elastic-nprocs branch 2 times, most recently from 58b9ab3 to eb3e4d8 Compare November 10, 2022 21:51
@sumeerbhola sumeerbhola left a comment

Reviewed 10 of 17 files at r3, 1 of 3 files at r4, 3 of 7 files at r5, 5 of 5 files at r6.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @irfansharif, @jayshrivastava, and @miretskiy)


pkg/ccl/changefeedccl/changefeedbase/settings.go line 270 at r1 (raw file):

Previously, irfansharif (irfan sharif) wrote…

I actually think 9ms is too little for this tight loop -- it's better to do as much work as possible without interacting with AC too often. I'm not sure I understand the practical difference between cooperatively giving up the CPU and waiting for pre-emption; we'll still get pre-empted. BTW Andrew, pre-emption in the Go scheduler could also just immediately put it on another inactive core, or not even pre-empt it if there's no waiting work on that core. There are some scheduler traces in #unified-overload-control to back me up.

I'll recommend something similar to what @miretskiy is saying: run an experiment with TPC-C, or TPC-E and see what the aggregate CPU-time from this tight loop typically is (I would just log.Infof it -- quicker than a histogram). I suspect it's 100ms+. Pick something like 100ms or 50ms and call it a day. We can always re-evaluate defaults if need be. I also don't think this upfront estimate matters much in practice because we correct for this error term post-hoc.

+1 to making this 50+ms, and deciding based on measurement, since interacting with AC is not super cheap.

@jayshrivastava jayshrivastava force-pushed the elastic-nprocs branch 5 times, most recently from 1370310 to 18d5769 Compare November 11, 2022 18:48
@jayshrivastava jayshrivastava left a comment

Addressing Irfan's Comments:

  • Addressing the TODO

  • kvadmission / admission package improvements

    • I moved the Pacer to its own file in the admission package
    • I added ElasticCPUGrantCoordinator.NewPacer
    • I added the new PacerMaker interface to hide some of the ElasticCPUGrantCoordinator's functionality
    • Unfortunately, the example you gave with the sqlSQLResponseAdmissionQ seems to be a noop with tenant servers. It only gets initialized for non-tenant servers in pkg/server/server.go and remains uninitialized in pkg/server/tenant.go. Every time it's used (such as below), it's wrapped with a nil check. I think it's best to keep this functionality until we initialize grant coordinators for tenant servers, which can be done in a separate patch. I left a TODO for this.
      if i.admissionQ != nil {
      	if _, err := i.admissionQ.Admit(i.Ctx, i.admissionInfo); err != nil {
      		// err includes the case of context cancellation while waiting for
      		// admission.
      		colexecerror.ExpectedError(err)
      	}
      }

      if admissionQ != nil {
      	if _, err := admissionQ.Admit(ctx, flowBase.admissionInfo); err != nil {
      		return processMessageResult{err: err, consumerClosed: false}
      	}
      }
  • Kill switch

    • I updated the existing setting to disable pacing if the value is set to 0
  • Wrapped Pacer

    • I removed this type. You're right - the nil pacer can be used as a noop pacer. The scheme I implemented is that tenant servers will be bootstrapped with a nil ElasticCPUGrantCoordinator, whose NewPacer method returns a nil pacer. Then this pacer is used in CDC event processing. CDC event processing is not aware if the pacer is nil or not. Whenever we decide to initialize a coordinator for tenant servers, the CDC code won't require changes.
  • Pacer Errors

    • I updated the code to log Pacer errors infrequently instead of returning them

@miretskiy, do we still want metrics for time spent encoding? You can see my comment below regarding 50ms. I did a small experiment with logging.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @irfansharif, @miretskiy, and @sumeerbhola)


pkg/ccl/changefeedccl/event_processing.go line 604 at r1 (raw file):

Previously, andrewbaptist (Andrew Baptist) wrote…

Minor: I'm assuming pacer.Pace and consumer.ConsumeEvent don't normally return an err, however, if they do, it would be better to call c.decInFlight() before returning. Otherwise, the count will never decrease.

I tried doing this and some tests deadlocked. Calling c.decInFlight() less often is definitely the more cautious approach. If there's a terminal error, the number of in-flight events doesn't matter much.


pkg/ccl/changefeedccl/event_processing.go line 119 at r2 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

make this todo more specific -- it's about parquet.

Done.


pkg/ccl/changefeedccl/event_processing.go line 124 at r2 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

Do you remember why this was disabled in the first place?

I remember doing a small fix for this, but we dropped it because the nprocs PR was big enough already and core changefeeds are still in beta. The problem is changeAggregator.Next() calls ConsumeEvent and immediately expects some data to be synchronously placed in the output buffer afterwards. Thus, async encoding/emitting doesn't work.


pkg/ccl/changefeedccl/event_processing.go line 125 at r2 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

I believe you've updated settings description already?

Yes.


pkg/ccl/changefeedccl/event_processing.go line 568 at r2 (raw file):

Previously, irfansharif (irfan sharif) wrote…

+1 to NewPacer, perhaps as an interface, and not hanging off the kvadmission.Controller thing (see top-level comment). A nil pacer is still something you can call .Close or .Pace on BTW, see top-level comment. That's a "no-op" pacer.

I refactored this quite a bit. We can have nil PacerMaker and nil Pacer, both of which basically operate as noops for tenant servers. More info in my top-level reply.


pkg/ccl/changefeedccl/event_processing.go line 573 at r2 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

this looks strange -- aren't you going to immediately close pacer?

Yes. That was a mistake. It's fixed now.


pkg/ccl/changefeedccl/event_processing.go line 597 at r2 (raw file):

Previously, irfansharif (irfan sharif) wrote…

See comment above around error handling. I'll defer to you two. Could we call if err := pacer.Pace(ctx); err != nil { outside the switch block? Just in case we add new case statements here.

Done.


pkg/ccl/changefeedccl/changefeedbase/settings.go line 270 at r1 (raw file):

Previously, sumeerbhola wrote…

+1 to making this 50+ms, and deciding based on measurement, since interacting with AC is not super cheap.

So I did some testing as @irfansharif mentioned. I ran TPCC w/ 1000 warehouses with an initial scan on tpcc.order_line. Also tried a changefeed with the UPDATED, DIFF, RESOLVED options. The average CPU time per call to ConsumeEvent (measured with the grunning pkg) was 17391.80ns and 21452.73ns respectively. I think this measurement (and even a p95 measurement) depends a lot on the schema of the table and size of each row, so we should take it with a grain of salt.

I was a bit confused about what you meant by aggregate CPU time. The cumulative CPU time would just be a function of how many events we need to process, which does not seem like a useful measurement. I measured this to be 219990339846ns and 274068743360ns respectively... Large numbers because we ran a changefeed on a large table.

I'm also confused about why we ran the experiment with TPCC. I feel that the CPU time spent encoding events would be the same regardless of an external workload running. Unless we include context switching overhead in the requested grant size for some reason...

Nevertheless, it sounds like everyone's okay with 50ms. I updated the setting to default to 50ms. It means we can process ~2500 events before requesting another grant from AC (well, events coming from order_line table since each row takes ~20000ns). At the end of the day, it's a default setting and it can be changed for a particular table/schema.

I also sanity checked that the graphs look similar with 50ms.


pkg/ccl/changefeedccl/changefeedbase/settings.go line 266 at r2 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

nit: wdyt about changefeed.cpu.per_event_allocation or something similar?

I can do changefeed.cpu.per_event_consumer_worker_allocation. I don't want to use per_event since we will encode many events during that time.


pkg/ccl/changefeedccl/changefeedbase/settings.go line 272 at r2 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

I'm okay keeping this setting; I'm not sure we need it withPublic -- it's very low level.

Sure. I removed withPublic.

@jayshrivastava jayshrivastava force-pushed the elastic-nprocs branch 2 times, most recently from c61a7c0 to d625d49 Compare November 11, 2022 21:55
@miretskiy miretskiy left a comment

cdc PR needs to be updated to no longer mention server/sql as the type of the change.

// We cannot have a separate encoder and sink for parquet format (see
// parquet_sink_cloudstorage.go). Because of this the current nprox solution
// does not work for parquet format.
//
//TODO (ganeshb) Add support for parallel encoding
// TODO (jayshrivastava) enable parallel consumers for sinkless changefeeds.
Contributor

Having hard time finding it, I'm afraid.

	}
	pacer = cfg.PacerMaker.NewPacer(
		pacerRequestUnit,
		admission.WorkInfo{
Contributor

This workInfo struct should probably have some sort of a tag (an op-string or whatnot) identifying the queue. I know that at this point the pacer does not export per-queue stats, but it probably should.
A TODO would be fine (in the admission code).

Contributor Author

I'm worried about what would happen if you tag a WorkInfo with one queue's info, but send it to another. Also, maybe queues should collect stats instead of the Pacer type, because they are the underlying mechanism of the Pacer type.

It may make more sense to tag WorkInfo with the type/origin of the work and have queues collect metrics about the work coming in. We can group queues by their granter. For example, collect stats about cdc work being granted CPU time. @sumeerbhola, would you mind sharing your opinion about this?

@jayshrivastava jayshrivastava changed the title server/cdc: add elastic CPU control to CDC event processing cdc: add elastic CPU control to CDC event processing Nov 14, 2022
@sumeerbhola sumeerbhola left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @irfansharif, @jayshrivastava, and @miretskiy)


pkg/ccl/changefeedccl/changefeedbase/settings.go line 270 at r1 (raw file):

Previously, jayshrivastava (Jayant Shrivastava) wrote…

So I did some testing as @irfansharif mentioned. I ran TPCC w/ 1000 warehouses with an initial scan on tpcc.order_line. Also tried a changefeed with the UPDATED, DIFF, RESOLVED options. The average CPU time per call to ConsumeEvent (measured with the grunning pkg) was 17391.80ns and 21452.73ns respectively. I think this measurement (and even a p95 measurement) depends a lot on the schema of the table and size of each row, so we should take it with a grain of salt.

I was a bit confused about what you meant by aggregate CPU time. The cumulative CPU time would just be a function of how many events we need to process, which does not seem like a useful measurement. I measured this to be 219990339846ns and 274068743360ns respectively... Large numbers because we ran a changefeed on a large table.

I'm also confused about why we ran the experiment with TPCC. I feel that the CPU time spent encoding events would be the same regardless of an external workload running. Unless we include context switching overhead in the requested grant size for some reason...

Nevertheless, it sounds like everyone's okay with 50ms. I updated the setting to default to 50ms. It means we can process ~2500 events before requesting another grant from AC (well, events coming from order_line table since each row takes ~20000ns). At the end of the day, it's a default setting and it can be changed for a particular table/schema.

I also sanity checked that the graphs look similar with 50ms.

Regarding "aggregate CPU-time", I'll give my take on it and @irfansharif may possibly differ.

Normally, the high CPU work that integrates with admission control takes the form of a request that is evaluated by doing a tight CPU intensive loop e.g. KV BatchRequest processing where the request is an ExportRequest. We can measure the goroutine running time of these, and have found that they frequently consume > 200ms for a single request. The admission.ElasticCPUWorkHandle was used to make these requests stop doing more after 100ms and return (with a resumption point that the caller would use to send the next request). The CatchupIterator.CatchUpScan is different in that it is not a request-response pattern, but one can measure the CPU time spent in that method since it loops and calls outputFn. The Pacer (that wraps the elastic handle and queue) is used to seek re-admission after every 100ms of CPU work.
So my notion of "aggregate CPU-time" for the initial scan case in this PR was similar to those: how much total CPU time do we spend in the goroutine that is consuming these events produced via the concurrent scans and calling kvEventToRowConsumer.ConsumeEvent. The effectiveness of using a 50ms granularity for AC reduces if the CPU time spent in this goroutine is typically less than 50ms.

An underlying assumption is that because of the concurrent scans this goroutine is typically not blocked in waiting state for an event -- if it was frequently blocked because the scans were not feeding it enough work, its latency impact on others would be inherently lower (in that it would not be competing as aggressively for CPU in the goroutine scheduler).
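
A self-contained toy version of the request-response pattern described above, with a local stand-in for the elastic CPU work handle; the type, method names, and per-item costs are illustrative assumptions, not the real admission API.

package main

import (
	"fmt"
	"time"
)

// handle is a local stand-in for an elastic CPU work handle: it tracks CPU
// time charged against a fixed allotment. The real handle measures goroutine
// running time; here each item is charged a fixed cost.
type handle struct{ used, allotted time.Duration }

func (h *handle) overLimit() bool { return h.used >= h.allotted }

// evalChunk models request-response work (e.g. an ExportRequest): process
// items until the allotment is exhausted, then return a resume index so the
// caller can issue the next request from that point.
func evalChunk(h *handle, items []string, perItem time.Duration) (resume int) {
	for i := range items {
		_ = items[i] // CPU-intensive work would happen here
		h.used += perItem
		if h.overLimit() {
			return i + 1
		}
	}
	return len(items)
}

func main() {
	items := make([]string, 1000)
	h := &handle{allotted: 100 * time.Millisecond}
	fmt.Println("resume index:", evalChunk(h, items, 2*time.Millisecond))
	// A long-lived loop like the changefeed consumer has no request to return
	// from, which is why it instead calls Pacer.Pace before each event and the
	// Pacer seeks re-admission once the granted CPU time is used up.
}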

@jayshrivastava jayshrivastava force-pushed the elastic-nprocs branch 2 times, most recently from afb66c4 to d3bd357 Compare November 14, 2022 16:30
@jayshrivastava jayshrivastava left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @irfansharif, @miretskiy, and @sumeerbhola)


pkg/ccl/changefeedccl/changefeedbase/settings.go line 270 at r1 (raw file):

Previously, sumeerbhola wrote…

Regarding "aggregate CPU-time", I'll give my take on it and @irfansharif may possibly differ.

Normally, the high CPU work that integrates with admission control takes the form of a request that is evaluated by doing a tight CPU intensive loop e.g. KV BatchRequest processing where the request is an ExportRequest. We can measure the goroutine running time of these, and have found that they frequently consume > 200ms for a single request. The admission.ElasticCPUWorkHandle was used to make these requests stop doing more after 100ms and return (with a resumption point that the caller would use to send the next request). The CatchupIterator.CatchUpScan is different in that it is not a request-response pattern, but one can measure the CPU time spent in that method since it loops and calls outputFn. The Pacer (that wraps the elastic handle and queue) is used to seek re-admission after every 100ms of CPU work.
So my notion of "aggregate CPU-time" for the initial scan case in this PR was similar to those: how much total CPU time do we spend in the goroutine that is consuming these events produced via the concurrent scans and calling kvEventToRowConsumer.ConsumeEvent. The effectiveness of using a 50ms granularity for AC reduces if the CPU time spent in this goroutine is typically less than 50ms.

An underlying assumption is that because of the concurrent scans this goroutine is typically not blocked in waiting state for an event -- if it was frequently blocked because the scans were not feeding it enough work, its latency impact on others would be inherently lower (in that it would not be competing as aggressively for CPU in the goroutine scheduler).

Thanks for the context! Fortunately, in this scenario, we have goroutines which run for much longer than 50ms, and are typically not blocked.

@miretskiy (Contributor)

Thanks for the context! Fortunately, in this scenario, we have goroutines which run for much longer than 50ms, and are typically not blocked.

True that they run for longer than 50ms -- they run forever, hopefully -- but they certainly can get blocked (i.e. trying to emit to a sink which is blocked). I don't think this changes your answer though, does it?

@miretskiy miretskiy left a comment

Giving 👍 since I'm going to be leaving soon on vaca.
You still need to get +1 from Irfan/Sumeer.

@jayshrivastava (Contributor, Author)

@miretskiy TYFR! I'll wait on another ✅ before merging. Also, you're right, the consumers get blocked on emitting. I meant to say that they don't get blocked waiting for events to come through from kv scans, as Sumeer mentioned.

@sumeerbhola (Collaborator)

The initial scan also finishes at some point, yes? My mental model is clearly very fuzzy.

@miretskiy (Contributor)

miretskiy commented Nov 17, 2022 via email

@jayshrivastava (Contributor, Author)

jayshrivastava commented Nov 17, 2022

I pushed another commit which adds the initial_scan_only case, to address @irfansharif's TODO. The results are good:

Before (latest master cockroach binary, my roachtest binary): [screenshot]

After (my cockroach binary, my roachtest binary): [screenshot]

@irfansharif irfansharif left a comment

:lgtm_strong:, only left nits below. Thanks a ton for working through it.

I updated the existing setting to disable pacing if the value is set to 0

[mega nit] Mind just adding another setting instead? Something like kvadmission.rangefeed_catchup_scan_elastic_control.enabled as suggested above. It's easier to grep for .enabled when looking for cluster settings.
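
A sketch of what a separate .enabled kill-switch could look like, mirroring the existing registrations in changefeedbase/settings.go; the setting name and description below are assumptions, not necessarily what was ultimately committed.

// EventConsumerElasticCPUControlEnabled is the suggested kill-switch. The
// name and wording here are placeholders.
var EventConsumerElasticCPUControlEnabled = settings.RegisterBoolSetting(
	settings.TenantWritable,
	"changefeed.cpu.per_event_elastic_control.enabled",
	"determines whether changefeed event processing integrates with elastic CPU control",
	true, // default on; can be flipped off during incidents
)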

Reviewed 8 of 17 files at r3, 1 of 3 files at r4, 1 of 7 files at r5, 9 of 9 files at r7, 3 of 6 files at r8, 1 of 3 files at r9, 2 of 2 files at r10, 1 of 1 files at r11, all commit messages.
Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @jayshrivastava, @miretskiy, and @sumeerbhola)


pkg/ccl/changefeedccl/event_processing.go line 134 at r9 (raw file):

Previously, jayshrivastava (Jayant Shrivastava) wrote…

I'm worried about what would happen if you tag a WorkInfo with one queue's info, but send it to another. Also, maybe queues should collect stats instead of the Pacer type, because they are the underlying mechanism of the Pacer type.

It may make more sense to tag WorkInfo with the type/origin of the work and have queues collect metrics about the work coming in. We can group queues by their granter. For example, collect stats about cdc work being granted CPU time. @sumeerbhola, would you mind sharing your opinion about this?

I doubt we'll export per "user-of-Pacer" type statistics, so this change as-is is probably just fine for now/the immediate future. It's pretty clear from metrics today which process is using these elastic CPU tokens since there are only a handful of uses (backups, changefeed parallel workers, catch-up scans, additional pebble compression threads). Can revisit if it becomes useful to segment.


pkg/ccl/changefeedccl/event_processing.go line 127 at r10 (raw file):

		// Passing a nil Pacer is effectively a noop Pacer if
		// CPU control is disabled.
		var pacer *admission.Pacer = nil

[nit] The = nil is unnecessary.


pkg/ccl/changefeedccl/changefeedbase/settings.go line 270 at r1 (raw file):

Previously, jayshrivastava (Jayant Shrivastava) wrote…

Thanks for the context! Fortunately, in this scenario, we have goroutines which run for much longer than 50ms, and are typically not blocked.

This all sounds right to me. And also, great results!


pkg/cmd/roachtest/tests/admission_control_elastic_cdc.go line 127 at r11 (raw file):

								} else {
									createChangefeedStmt = "CREATE CHANGEFEED FOR tpcc.order_line, tpcc.stock, tpcc.customer " +
										"INTO 'null://' WITH initial_scan_only"

[nit] initial_scan = 'only' is the preferred form I think: #79324.


pkg/server/tenant.go line 880 at r7 (raw file):

	}

	// TODO(jayant): generate admission.NewGrantCoordinators and pass them

Can you name me in this TODO instead? And it should reference NewGrantCoordinatorSQL instead -- which like you've seen, is not hooked up. Also, [nit] the = nil is unnecessary


pkg/sql/execinfra/server_config.go line 196 at r7 (raw file):

	RangeStatsFetcher eval.RangeStatsFetcher

	PacerMaker admission.PacerMaker

Add a comment.


pkg/util/admission/grant_coordinator.go line 1020 at r7 (raw file):

// PacerMaker is used to construct a new admission.Pacer.
//
// PacerMaker interface should be used in the SQL layer in lieu of

Drop this paragraph, there's no need to talk about call sites which are ever changing. It's also talking a lot about the implementation which is distracting. Move this interface definition to admission.go which holds the other interfaces in this package. Call this PacerFactory instead (we have various "factory" interfaces elsewhere in CRDB) -- rename instances of "pacerMaker" to "admissionPacerFactory" when you do.


pkg/util/admission/grant_coordinator.go line 1024 at r7 (raw file):

// implements several features which are irrelevant to SQL.
type PacerMaker interface {
	// NewPacer constructs a new admission.Pacer, which may be nil.

Drop this comment, it's a single method interface and the top-level comment is enough. Also, there's no need to scare users off by mentioning "nil". All methods support the nil receiver so it should be irrelevant to users.


pkg/util/admission/grant_coordinator.go line 1028 at r7 (raw file):

}

// NewPacer implements the PacerMaker interface.

Add one of these interface assertions somewhere

var _ PacerFactory = &ElasticCPUGrantCoordinator{}

@jayshrivastava jayshrivastava left a comment

Thanks for reviewing!

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @andrewbaptist, @irfansharif, @miretskiy, and @sumeerbhola)


pkg/server/tenant.go line 880 at r7 (raw file):

Previously, irfansharif (irfan sharif) wrote…

Can you name me in this TODO instead? And it should reference NewGrantCoordinatorSQL instead -- which like you've seen, is not hooked up. Also, [nit] the = nil is unnecessary

Done. This nil is necessary, though. Otherwise, the nil check in pacerFactory.NewPacer fails.

@jayshrivastava (Contributor, Author)

bors r+

@craig craig bot merged commit edc6fda into cockroachdb:master Nov 29, 2022
@blathers-crl

blathers-crl bot commented Nov 29, 2022

Encountered an error creating backports. Some common things that can go wrong:

  1. The backport branch might have already existed.
  2. There was a merge conflict.
  3. The backport branch contained merge commits.

You might need to create your backport manually using the backport tool.


error creating merge commit from 80a7ed9 to blathers/backport-release-22.2-91554: POST https://api.github.com/repos/cockroachdb/cockroach/merges: 409 Merge conflict []

you may need to manually resolve merge conflicts with the backport tool.

Backport to branch 22.2.x failed. See errors above.


🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is otan.

@craig (Contributor)

craig bot commented Nov 29, 2022

Build succeeded:

@shermanCRL (Contributor)

We should default this to on for v23.1 (master) and to off for the v22.2 backport.

Successfully merging this pull request may close these issues:

admission,changefeedccl: integrate parallel workers w/ elastic cpu control