Skip to content

Commit

Permalink
Merge #96230
Browse files Browse the repository at this point in the history
96230: kvcoord:  Fix multiple MuxRangeFeed client bugs r=miretskiy a=miretskiy

This PR addresses multiple issues with MuxRangeFeed client
implementation, identified by running various CDC stress tests.

The first issue was incorrect context used when establishing
MuxRangeFeed with a node.  When calling `startMuxRangeFeed`, the caller
provided context was used to establish the RPC with a node.
However, this context was meant to be used only for a single range
rangefeed -- and not for a node level MuxRangeFeed RPC.  As a result,
as soon as the first rangefeed completed (perhaps due to an error, such
as range split), the entire node level MuxRangeFeed, along with all of
the ranges running on that node, would be canceled.

The second issue is a race between consumer termination (consumer is a
single range rangefeed), and the "demux" loop.  There were really two
races here.  First, when we return an error to the caller (e.g. range
split), the caller simply executes cleanup and returns.  The cleanup
removes the information about the stream from the mux rangefeed, which
produces an error.  Secondly, it is possible for the demux loop to
observe additional events even after receving event error.
This is because the rangefeed processor (server side) termination is asynchronous.
It is possible that when the server shuts down (i.e. cancels) output
loop due to an error (range split), the internal buffer may still have
some events (such as checkpoint).  It's a race whether or not such
event will be sent on the RPC stream.  If such event arrives,
demux loop will block forever because the caller already terminated.

The fixes in this PR were verified by observing non-flaky 
execution over 25000 runs of multiple CDC tests(TestChangefeedInitialScanOnly, and others).

Informs #95781

Release note: None

Co-authored-by: Yevgeniy Miretskiy <[email protected]>
  • Loading branch information
craig[bot] and Yevgeniy Miretskiy committed Feb 3, 2023
2 parents f148476 + 9b67f2f commit ca70b82
Show file tree
Hide file tree
Showing 7 changed files with 289 additions and 198 deletions.
26 changes: 13 additions & 13 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,15 @@ func TestChangefeedReplanning(t *testing.T) {
DistSQL: &execinfra.TestingKnobs{
Changefeed: &TestingKnobs{
HandleDistChangefeedError: func(err error) error {
select {
case errChan <- err:
return err
default:
return nil
if errors.Is(err, sql.ErrPlanChanged) {
select {
case errChan <- err:
return err
default:
return nil
}
}
return nil
},
ShouldReplan: func(ctx context.Context, oldPlan, newPlan *sql.PhysicalPlan) bool {
select {
Expand Down Expand Up @@ -1018,13 +1021,11 @@ func TestChangefeedInitialScan(t *testing.T) {
for testName, changefeedStmt := range noInitialScanTests {
t.Run(testName, func(t *testing.T) {
sqlDB.Exec(t, `CREATE TABLE no_initial_scan (a INT PRIMARY KEY)`)
defer sqlDB.Exec(t, `DROP TABLE no_initial_scan`)
sqlDB.Exec(t, `INSERT INTO no_initial_scan VALUES (1)`)

noInitialScan := feed(t, f, changefeedStmt)
defer func() {
closeFeed(t, noInitialScan)
sqlDB.Exec(t, `DROP TABLE no_initial_scan`)
}()
defer closeFeed(t, noInitialScan)

expectResolvedTimestamp(t, noInitialScan)

Expand All @@ -1038,15 +1039,14 @@ func TestChangefeedInitialScan(t *testing.T) {
for testName, changefeedStmtFormat := range initialScanTests {
t.Run(testName, func(t *testing.T) {
sqlDB.Exec(t, `CREATE TABLE initial_scan (a INT PRIMARY KEY)`)
defer sqlDB.Exec(t, `DROP TABLE initial_scan`)
sqlDB.Exec(t, `INSERT INTO initial_scan VALUES (1), (2), (3)`)
var tsStr string
var i int
sqlDB.QueryRow(t, `SELECT count(*), cluster_logical_timestamp() from initial_scan`).Scan(&i, &tsStr)
initialScan := feed(t, f, fmt.Sprintf(changefeedStmtFormat, tsStr))
defer func() {
closeFeed(t, initialScan)
sqlDB.Exec(t, `DROP TABLE initial_scan`)
}()
defer closeFeed(t, initialScan)

assertPayloads(t, initialScan, []string{
`initial_scan: [1]->{"after": {"a": 1}}`,
`initial_scan: [2]->{"after": {"a": 2}}`,
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeedbase/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ var UseMuxRangeFeed = settings.RegisterBoolSetting(
settings.TenantWritable,
"changefeed.mux_rangefeed.enabled",
"if true, changefeed uses multiplexing rangefeed RPC",
false,
util.ConstantWithMetamorphicTestBool("changefeed.mux_rangefeed.enabled", false),
)

// EventConsumerWorkers specifies the maximum number of workers to use when
Expand Down
21 changes: 16 additions & 5 deletions pkg/ccl/changefeedccl/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
Expand Down Expand Up @@ -93,7 +94,10 @@ func readNextMessages(
if ctx.Err() != nil {
return nil, ctx.Err()
}
log.Infof(context.Background(), "About to read a message from %v (%T)", f, f)
if log.V(1) {
log.Infof(context.Background(), "About to read a message (%d out of %d) from %v (%T)",
len(actual), numMessages, f, f)
}
m, err := f.Next()
if log.V(1) {
if m != nil {
Expand Down Expand Up @@ -248,10 +252,17 @@ func assertPayloadsTimeout() time.Duration {
func withTimeout(
f cdctest.TestFeed, timeout time.Duration, fn func(ctx context.Context) error,
) error {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
defer stopFeedWhenDone(ctx, f)()
return fn(ctx)
var jobID jobspb.JobID
if jobFeed, ok := f.(cdctest.EnterpriseTestFeed); ok {
jobID = jobFeed.JobID()
}
return contextutil.RunWithTimeout(context.Background(),
fmt.Sprintf("withTimeout-%d", jobID), timeout,
func(ctx context.Context) error {
defer stopFeedWhenDone(ctx, f)()
return fn(ctx)
},
)
}

func assertPayloads(t testing.TB, f cdctest.TestFeed, expected []string) {
Expand Down
16 changes: 10 additions & 6 deletions pkg/ccl/changefeedccl/testfeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -904,6 +904,10 @@ func (c *tableFeed) Partitions() []string {
return []string{`0`, `1`, `2`}
}

func timeoutOp(op string, id jobspb.JobID) string {
return fmt.Sprintf("%s-%d", op, id)
}

// Next implements the TestFeed interface.
func (c *tableFeed) Next() (*cdctest.TestFeedMessage, error) {
// sinkSink writes all changes to a table with primary key of topic,
Expand All @@ -921,7 +925,7 @@ func (c *tableFeed) Next() (*cdctest.TestFeedMessage, error) {
}

if err := contextutil.RunWithTimeout(
context.Background(), "tableFeed.Next", timeout(),
context.Background(), timeoutOp("tableFeed.Next", c.jobID), timeout(),
func(ctx context.Context) error {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -1425,7 +1429,7 @@ func (c *cloudFeed) Next() (*cdctest.TestFeedMessage, error) {
}

if err := contextutil.RunWithTimeout(
context.Background(), "cloudFeed.Next", timeout(),
context.Background(), timeoutOp("cloudfeed.Next", c.jobID), timeout(),
func(ctx context.Context) error {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -1861,7 +1865,7 @@ func (k *kafkaFeed) Next() (*cdctest.TestFeedMessage, error) {
for {
var msg *sarama.ProducerMessage
if err := contextutil.RunWithTimeout(
context.Background(), "kafka.Next", timeout(),
context.Background(), timeoutOp("kafka.Next", k.jobID), timeout(),
func(ctx context.Context) error {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -2147,7 +2151,7 @@ func (f *webhookFeed) Next() (*cdctest.TestFeedMessage, error) {
}

if err := contextutil.RunWithTimeout(
context.Background(), "webhook.Next", timeout(),
context.Background(), timeoutOp("webhook.Next", f.jobID), timeout(),
func(ctx context.Context) error {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -2399,7 +2403,7 @@ func (p *pubsubFeed) Next() (*cdctest.TestFeedMessage, error) {
}

if err := contextutil.RunWithTimeout(
context.Background(), "pubsub.Next", timeout(),
context.Background(), timeoutOp("pubsub.Next", p.jobID), timeout(),
func(ctx context.Context) error {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -2448,7 +2452,7 @@ func stopFeedWhenDone(ctx context.Context, f cdctest.TestFeed) func() {
})
case jobFailedMarker:
go whenDone(func() {
t.jobFailed(context.Canceled)
t.jobFailed(errors.New("stopping job due to TestFeed timeout"))
})
}

Expand Down
1 change: 0 additions & 1 deletion pkg/kv/kvclient/kvcoord/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ go_library(
"//pkg/util/errorutil/unimplemented",
"//pkg/util/grpcutil",
"//pkg/util/hlc",
"//pkg/util/intsets",
"//pkg/util/iterutil",
"//pkg/util/limit",
"//pkg/util/log",
Expand Down
Loading

0 comments on commit ca70b82

Please sign in to comment.