kvcoord: Eliminate 1 Go routine from MuxRangeFeed
Prior to this PR, the server-side `MuxRangeFeed`
implementation spawned a separate goroutine executing
a single RangeFeed for each incoming request.

This is wasteful and unnecessary.
Instead of blocking and waiting for a single RangeFeed to complete,
rangefeed-related functions now return a promise to deliver a
`*roachpb.Error` once the rangefeed completes
(`future.Future[*roachpb.Error]`).

Prior to this change, MuxRangeFeed would spin up 4 goroutines
per range.  With this PR, the number is down to 3.
This improvement is particularly important when executing
rangefeeds against large tables (tens to hundreds of thousands of ranges).
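
To illustrate the promise-based approach described above, here is a minimal,
self-contained Go sketch of the pattern. It uses a toy single-assignment
future and is not the actual `pkg/util/future` API or rangefeed plumbing;
`errPromise` and `processor` are purely illustrative names.

```go
package main

import (
	"errors"
	"fmt"
)

// errPromise is a toy single-assignment future carrying an error result.
type errPromise struct {
	done chan struct{}
	err  error
}

func newErrPromise() *errPromise { return &errPromise{done: make(chan struct{})} }

// complete fulfills the promise; it must be called exactly once.
func (p *errPromise) complete(err error) { p.err = err; close(p.done) }

// Done returns a channel that is closed once the promise is fulfilled.
func (p *errPromise) Done() <-chan struct{} { return p.done }

// Err returns the result; only valid after Done() has been closed.
func (p *errPromise) Err() error { return p.err }

// processor stands in for the rangefeed processor, which already runs its
// own goroutines; it owns the registrations and fulfills their promises
// when the corresponding feeds finish.
type processor struct{ regs []*errPromise }

// register returns immediately with a promise instead of blocking the
// caller until the feed completes, so the caller does not need to park a
// goroutine per request just to wait for the error.
func (pr *processor) register() *errPromise {
	p := newErrPromise()
	pr.regs = append(pr.regs, p)
	return p
}

func main() {
	pr := &processor{}

	// "MuxRangeFeed" side: start two feeds without spawning a goroutine each.
	f1, f2 := pr.register(), pr.register()

	// Later, the processor completes them from its own existing loop.
	pr.regs[0].complete(nil)
	pr.regs[1].complete(errors.New("feed terminated"))

	<-f1.Done()
	<-f2.Done()
	fmt.Println("feed 1:", f1.Err(), "| feed 2:", f2.Err())
}
```

The caller keeps only a cheap handle per feed and waits on it wherever
convenient, which is what lets MuxRangeFeed drop the per-request waiting
goroutine.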

Informs #96395
Epic: None

Release note (enterprise change): Changefeeds running with
the `changefeed.mux_rangefeed.enabled` setting set to true are
more efficient, particularly when executing against large tables.
Yevgeniy Miretskiy committed Feb 8, 2023
1 parent 06972d0 commit 5aff403
Showing 13 changed files with 218 additions and 181 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
@@ -180,6 +180,7 @@ go_library(
"//pkg/util/encoding",
"//pkg/util/envutil",
"//pkg/util/errorutil",
"//pkg/util/future",
"//pkg/util/grpcutil",
"//pkg/util/grunning",
"//pkg/util/hlc",
34 changes: 25 additions & 9 deletions pkg/kv/kvserver/client_replica_circuit_breaker_test.go
@@ -396,10 +396,6 @@ func TestReplicaCircuitBreaker_Liveness_QuorumLoss(t *testing.T) {

type dummyStream struct {
name string
- t interface {
- Helper()
- Logf(string, ...interface{})
- }
ctx context.Context
recv chan *roachpb.RangeFeedEvent
}
@@ -410,9 +406,9 @@ func (s *dummyStream) Context() context.Context {

func (s *dummyStream) Send(ev *roachpb.RangeFeedEvent) error {
if ev.Val == nil && ev.Error == nil {
- s.t.Logf("%s: ignoring event: %v", s.name, ev)
return nil
}

select {
case <-s.ctx.Done():
return s.ctx.Err()
@@ -421,6 +417,26 @@ func (s *dummyStream) Send(ev *roachpb.RangeFeedEvent) error {
}
}

+ func waitReplicaRangeFeed(
+ 	ctx context.Context,
+ 	r *kvserver.Replica,
+ 	req *roachpb.RangeFeedRequest,
+ 	stream roachpb.RangeFeedEventSink,
+ ) error {
+ 	f := r.RangeFeedPromise(req, stream, nil /* pacer */)
+ 	defer f.Join()
+ 	select {
+ 	case <-ctx.Done():
+ 		return ctx.Err()
+ 	case <-f.Done():
+ 		pErr, err := f.Get()
+ 		if err != nil {
+ 			return err
+ 		}
+ 		return pErr.GoError()
+ 	}
+ }

// This test verifies that RangeFeed bypasses the circuit breaker. When the
// breaker is tripped, existing RangeFeeds remain in place and new ones can be
// started. When the breaker untrips, these feeds can make progress. The test
@@ -440,9 +456,9 @@ func TestReplicaCircuitBreaker_RangeFeed(t *testing.T) {

ctx, cancel := context.WithCancel(ctx)
defer cancel()
- stream1 := &dummyStream{t: t, ctx: ctx, name: "rangefeed1", recv: make(chan *roachpb.RangeFeedEvent)}
+ stream1 := &dummyStream{ctx: ctx, name: "rangefeed1", recv: make(chan *roachpb.RangeFeedEvent)}
require.NoError(t, tc.Stopper().RunAsyncTask(ctx, "stream1", func(ctx context.Context) {
- err := tc.repls[0].RangeFeed(args, stream1, nil /* pacer */).GoError()
+ err := waitReplicaRangeFeed(ctx, tc.repls[0].Replica, args, stream1)
if ctx.Err() != nil {
return // main goroutine stopping
}
@@ -494,9 +510,9 @@ func TestReplicaCircuitBreaker_RangeFeed(t *testing.T) {

// Start another stream during the "outage" to make sure it isn't rejected by
// the breaker.
- stream2 := &dummyStream{t: t, ctx: ctx, name: "rangefeed2", recv: make(chan *roachpb.RangeFeedEvent)}
+ stream2 := &dummyStream{ctx: ctx, name: "rangefeed2", recv: make(chan *roachpb.RangeFeedEvent)}
require.NoError(t, tc.Stopper().RunAsyncTask(ctx, "stream2", func(ctx context.Context) {
- err := tc.repls[0].RangeFeed(args, stream2, nil /* pacer */).GoError()
+ err := waitReplicaRangeFeed(ctx, tc.repls[0].Replica, args, stream2)
if ctx.Err() != nil {
return // main goroutine stopping
}
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/rangefeed/BUILD.bazel
@@ -24,6 +24,7 @@ go_library(
"//pkg/util/admission",
"//pkg/util/bufalloc",
"//pkg/util/envutil",
"//pkg/util/future",
"//pkg/util/hlc",
"//pkg/util/interval",
"//pkg/util/log",
@@ -64,6 +65,7 @@ go_test(
"//pkg/testutils/skip",
"//pkg/util",
"//pkg/util/encoding",
"//pkg/util/future",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/rangefeed/processor.go
@@ -18,6 +18,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/future"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
@@ -473,23 +474,22 @@ func (p *Processor) Register(
catchUpIterConstructor CatchUpIteratorConstructor,
withDiff bool,
stream Stream,
- errC chan<- *roachpb.Error,
- ) (bool, *Filter) {
+ ) (bool, *Filter, future.Future[*roachpb.Error]) {
// Synchronize the event channel so that this registration doesn't see any
// events that were consumed before this registration was called. Instead,
// it should see these events during its catch up scan.
p.syncEventC()

r := newRegistration(
span.AsRawSpanWithNoLocals(), startTS, catchUpIterConstructor, withDiff,
- p.Config.EventChanCap, p.Metrics, stream, errC,
+ p.Config.EventChanCap, p.Metrics, stream,
)
select {
case p.regC <- r:
// Wait for response.
- return true, <-p.filterResC
+ return true, <-p.filterResC, r.done
case <-p.stoppedC:
- return false, nil
+ return false, nil, r.done
}
}
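
For context on the `Register` signature change in the hunk above, where the
caller-supplied `errC chan<- *roachpb.Error` parameter is replaced by a
returned `future.Future[*roachpb.Error]`, here is a hedged, self-contained
sketch of the two shapes. A plain buffered channel stands in for the future;
`registerOld`, `registerNew`, and `filter` are illustrative names, not the
real kvserver API.

```go
package main

import "fmt"

type filter struct{} // stand-in for rangefeed's *Filter

// Old shape: the caller allocates errC, passes it in, and must arrange to
// drain it somewhere (often a dedicated goroutine).
func registerOld(errC chan<- error) (bool, *filter) {
	go func() { errC <- nil }() // completion is reported asynchronously
	return true, &filter{}
}

// New shape: the registration returns a single-use completion channel it
// owns (the moral equivalent of future.Future[*roachpb.Error]), so the
// caller just holds a handle and waits or selects whenever convenient.
func registerNew() (bool, *filter, <-chan error) {
	done := make(chan error, 1)
	go func() { done <- nil }() // fulfilled later by the owner
	return true, &filter{}, done
}

func main() {
	errC := make(chan error, 1)
	ok, _ := registerOld(errC)
	fmt.Println("old shape:", ok, <-errC)

	ok, _, done := registerNew()
	fmt.Println("new shape:", ok, <-done)
}
```

The practical difference is ownership: the completion signal is allocated and
fulfilled by the registration side, so callers no longer have to wire up and
drain their own error channel.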
