diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 3a4a6d43ee9b..7b6cc9f7ed68 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -134,12 +134,15 @@ func TestChangefeedReplanning(t *testing.T) { DistSQL: &execinfra.TestingKnobs{ Changefeed: &TestingKnobs{ HandleDistChangefeedError: func(err error) error { - select { - case errChan <- err: - return err - default: - return nil + if errors.Is(err, sql.ErrPlanChanged) { + select { + case errChan <- err: + return err + default: + return nil + } } + return nil }, ShouldReplan: func(ctx context.Context, oldPlan, newPlan *sql.PhysicalPlan) bool { select { @@ -1018,13 +1021,11 @@ func TestChangefeedInitialScan(t *testing.T) { for testName, changefeedStmt := range noInitialScanTests { t.Run(testName, func(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE no_initial_scan (a INT PRIMARY KEY)`) + defer sqlDB.Exec(t, `DROP TABLE no_initial_scan`) sqlDB.Exec(t, `INSERT INTO no_initial_scan VALUES (1)`) noInitialScan := feed(t, f, changefeedStmt) - defer func() { - closeFeed(t, noInitialScan) - sqlDB.Exec(t, `DROP TABLE no_initial_scan`) - }() + defer closeFeed(t, noInitialScan) expectResolvedTimestamp(t, noInitialScan) @@ -1038,15 +1039,14 @@ func TestChangefeedInitialScan(t *testing.T) { for testName, changefeedStmtFormat := range initialScanTests { t.Run(testName, func(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE initial_scan (a INT PRIMARY KEY)`) + defer sqlDB.Exec(t, `DROP TABLE initial_scan`) sqlDB.Exec(t, `INSERT INTO initial_scan VALUES (1), (2), (3)`) var tsStr string var i int sqlDB.QueryRow(t, `SELECT count(*), cluster_logical_timestamp() from initial_scan`).Scan(&i, &tsStr) initialScan := feed(t, f, fmt.Sprintf(changefeedStmtFormat, tsStr)) - defer func() { - closeFeed(t, initialScan) - sqlDB.Exec(t, `DROP TABLE initial_scan`) - }() + defer closeFeed(t, initialScan) + assertPayloads(t, initialScan, []string{ `initial_scan: [1]->{"after": {"a": 1}}`, `initial_scan: [2]->{"after": {"a": 2}}`, diff --git a/pkg/ccl/changefeedccl/changefeedbase/settings.go b/pkg/ccl/changefeedccl/changefeedbase/settings.go index 2d30d92dcb9b..abb8292b3867 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/settings.go +++ b/pkg/ccl/changefeedccl/changefeedbase/settings.go @@ -224,7 +224,7 @@ var UseMuxRangeFeed = settings.RegisterBoolSetting( settings.TenantWritable, "changefeed.mux_rangefeed.enabled", "if true, changefeed uses multiplexing rangefeed RPC", - false, + util.ConstantWithMetamorphicTestBool("changefeed.mux_rangefeed.enabled", false), ) // EventConsumerWorkers specifies the maximum number of workers to use when diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go index e15962dd1887..3725041c17a0 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -47,6 +47,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" @@ -93,7 +94,10 @@ func readNextMessages( if ctx.Err() != nil { return nil, ctx.Err() } - log.Infof(context.Background(), "About to read a message from %v (%T)", f, f) + if log.V(1) { + log.Infof(context.Background(), "About to read a message (%d out of 
%d) from %v (%T)", + len(actual), numMessages, f, f) + } m, err := f.Next() if log.V(1) { if m != nil { @@ -248,10 +252,17 @@ func assertPayloadsTimeout() time.Duration { func withTimeout( f cdctest.TestFeed, timeout time.Duration, fn func(ctx context.Context) error, ) error { - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - defer stopFeedWhenDone(ctx, f)() - return fn(ctx) + var jobID jobspb.JobID + if jobFeed, ok := f.(cdctest.EnterpriseTestFeed); ok { + jobID = jobFeed.JobID() + } + return contextutil.RunWithTimeout(context.Background(), + fmt.Sprintf("withTimeout-%d", jobID), timeout, + func(ctx context.Context) error { + defer stopFeedWhenDone(ctx, f)() + return fn(ctx) + }, + ) } func assertPayloads(t testing.TB, f cdctest.TestFeed, expected []string) { diff --git a/pkg/ccl/changefeedccl/testfeed_test.go b/pkg/ccl/changefeedccl/testfeed_test.go index a27f066faac4..6a725acc4786 100644 --- a/pkg/ccl/changefeedccl/testfeed_test.go +++ b/pkg/ccl/changefeedccl/testfeed_test.go @@ -904,6 +904,10 @@ func (c *tableFeed) Partitions() []string { return []string{`0`, `1`, `2`} } +func timeoutOp(op string, id jobspb.JobID) string { + return fmt.Sprintf("%s-%d", op, id) +} + // Next implements the TestFeed interface. func (c *tableFeed) Next() (*cdctest.TestFeedMessage, error) { // sinkSink writes all changes to a table with primary key of topic, @@ -921,7 +925,7 @@ func (c *tableFeed) Next() (*cdctest.TestFeedMessage, error) { } if err := contextutil.RunWithTimeout( - context.Background(), "tableFeed.Next", timeout(), + context.Background(), timeoutOp("tableFeed.Next", c.jobID), timeout(), func(ctx context.Context) error { select { case <-ctx.Done(): @@ -1425,7 +1429,7 @@ func (c *cloudFeed) Next() (*cdctest.TestFeedMessage, error) { } if err := contextutil.RunWithTimeout( - context.Background(), "cloudFeed.Next", timeout(), + context.Background(), timeoutOp("cloudfeed.Next", c.jobID), timeout(), func(ctx context.Context) error { select { case <-ctx.Done(): @@ -1861,7 +1865,7 @@ func (k *kafkaFeed) Next() (*cdctest.TestFeedMessage, error) { for { var msg *sarama.ProducerMessage if err := contextutil.RunWithTimeout( - context.Background(), "kafka.Next", timeout(), + context.Background(), timeoutOp("kafka.Next", k.jobID), timeout(), func(ctx context.Context) error { select { case <-ctx.Done(): @@ -2147,7 +2151,7 @@ func (f *webhookFeed) Next() (*cdctest.TestFeedMessage, error) { } if err := contextutil.RunWithTimeout( - context.Background(), "webhook.Next", timeout(), + context.Background(), timeoutOp("webhook.Next", f.jobID), timeout(), func(ctx context.Context) error { select { case <-ctx.Done(): @@ -2399,7 +2403,7 @@ func (p *pubsubFeed) Next() (*cdctest.TestFeedMessage, error) { } if err := contextutil.RunWithTimeout( - context.Background(), "pubsub.Next", timeout(), + context.Background(), timeoutOp("pubsub.Next", p.jobID), timeout(), func(ctx context.Context) error { select { case <-ctx.Done(): @@ -2448,7 +2452,7 @@ func stopFeedWhenDone(ctx context.Context, f cdctest.TestFeed) func() { }) case jobFailedMarker: go whenDone(func() { - t.jobFailed(context.Canceled) + t.jobFailed(errors.New("stopping job due to TestFeed timeout")) }) } diff --git a/pkg/kv/kvclient/kvcoord/BUILD.bazel b/pkg/kv/kvclient/kvcoord/BUILD.bazel index a86cce08135c..3a50039cf22c 100644 --- a/pkg/kv/kvclient/kvcoord/BUILD.bazel +++ b/pkg/kv/kvclient/kvcoord/BUILD.bazel @@ -70,7 +70,6 @@ go_library( "//pkg/util/errorutil/unimplemented", "//pkg/util/grpcutil", "//pkg/util/hlc", - 
"//pkg/util/intsets", "//pkg/util/iterutil", "//pkg/util/limit", "//pkg/util/log", diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go index 4f72216ec51a..555300c2a469 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go @@ -12,13 +12,18 @@ package kvcoord import ( "context" + "sync" + "sync/atomic" + "unsafe" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" - "github.com/cockroachdb/cockroach/pkg/util/intsets" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/pprofutil" "github.com/cockroachdb/cockroach/pkg/util/syncutil" - "github.com/cockroachdb/errors" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/logtags" ) // rangefeedMuxer is responsible for coordination and management of mux @@ -28,45 +33,55 @@ type rangefeedMuxer struct { // eventCh receives events from all active muxStreams. eventCh chan *roachpb.MuxRangeFeedEvent - // Context group controlling execution of MuxRangeFeed calls. + // Context group controlling execution of MuxRangeFeed calls. When this group + // cancels, the entire muxer shuts down. The goroutines started in `g` will + // always return `nil` errors except when they detect that the mux is shutting + // down. g ctxgroup.Group - // State pertaining to actively executing MuxRangeFeeds. + // When g cancels, demuxLoopDone gets closed. + demuxLoopDone chan struct{} + mu struct { syncutil.Mutex - // terminalErr set when a terminal error occurs. - // Subsequent calls to startMuxRangeFeed will return this error. - terminalErr error - - // Each call to start new range feed gets a unique ID which is echoed back - // by MuxRangeFeed rpc. This is done as a safety mechanism to make sure - // that we always send the event to the correct consumer -- even if the - // range feed is terminated and re-established rapidly. - nextStreamID int64 - - // muxClient contains a nodeID->MuxRangeFeedClient. - muxClients map[roachpb.NodeID]*muxClientState - // producers maps streamID to the event producer -- data sent back to the - // consumer of range feed events. - producers map[int64]*channelRangeFeedEventProducer + // map of active MuxRangeFeed clients. + clients map[roachpb.NodeID]*muxClientState } + + // Each call to start new range feed gets a unique ID which is echoed back + // by MuxRangeFeed rpc. This is done as a safety mechanism to make sure + // that we always send the event to the correct consumer -- even if the + // range feed is terminated and re-established rapidly. + // Accessed atomically. + seqID int64 + + // producers is a map of all rangefeeds running across all nodes. + // streamID -> *channelRangeFeedEventProducer. + producers syncutil.IntMap } +// muxClientState is the state maintained for each MuxRangeFeed rpc. type muxClientState struct { - client roachpb.Internal_MuxRangeFeedClient - streams intsets.Fast - cancel context.CancelFunc + initCtx termCtx // signaled when client ready to be used. + doneCtx terminationContext // signaled when client shuts down. + + // RPC state. Valid only after initCtx.Done(). + client roachpb.Internal_MuxRangeFeedClient + cancel context.CancelFunc + + // Number of consumers (ranges) running on this node; accessed under rangefeedMuxer lock. 
+ numStreams int } func newRangefeedMuxer(g ctxgroup.Group) *rangefeedMuxer { m := &rangefeedMuxer{ - eventCh: make(chan *roachpb.MuxRangeFeedEvent), - g: g, + eventCh: make(chan *roachpb.MuxRangeFeedEvent), + demuxLoopDone: make(chan struct{}), + g: g, } - m.mu.muxClients = make(map[roachpb.NodeID]*muxClientState) - m.mu.producers = make(map[int64]*channelRangeFeedEventProducer) + m.mu.clients = make(map[roachpb.NodeID]*muxClientState) m.g.GoCtx(m.demuxLoop) return m @@ -75,214 +90,266 @@ func newRangefeedMuxer(g ctxgroup.Group) *rangefeedMuxer { // channelRangeFeedEventProducer is a rangeFeedEventProducer which receives // events on input channel, and returns events when Recv is called. type channelRangeFeedEventProducer struct { - ctx context.Context - termErrCh chan struct{} // Signalled to propagate terminal error the consumer. - termErr error // Set when terminal error occurs. - eventCh chan *roachpb.RangeFeedEvent + // Event producer utilizes two contexts: + // + // - callerCtx connected to singleRangeFeed, i.e. a context that will cancel + // if a single-range rangefeed fails (range stuck, parent ctx cancels). + // - muxClientCtx connected to receiveEventsFromNode, i.e. a streaming RPC to + // a node serving multiple rangefeeds. This cancels if, for example, the + // remote node goes down or there are networking issues. + // + // When singleRangeFeed blocks in Recv(), we have to respect cancellations in + // both contexts. The implementation of Recv() on this type does this. + callerCtx context.Context + muxClientCtx terminationContext + + streamID int64 // stream ID for this producer. + eventCh chan *roachpb.RangeFeedEvent // consumer event channel. } +var _ roachpb.RangeFeedEventProducer = (*channelRangeFeedEventProducer)(nil) + // Recv implements rangeFeedEventProducer interface. func (c *channelRangeFeedEventProducer) Recv() (*roachpb.RangeFeedEvent, error) { select { - case <-c.ctx.Done(): - return nil, c.ctx.Err() - case <-c.termErrCh: - return nil, c.termErr + case <-c.callerCtx.Done(): + return nil, c.callerCtx.Err() + case <-c.muxClientCtx.Done(): + return nil, c.muxClientCtx.Err() case e := <-c.eventCh: return e, nil } } -var _ roachpb.RangeFeedEventProducer = (*channelRangeFeedEventProducer)(nil) - // startMuxRangeFeed begins the execution of rangefeed for the specified // RangeFeedRequest. // The passed in client is only needed to establish MuxRangeFeed RPC. func (m *rangefeedMuxer) startMuxRangeFeed( ctx context.Context, client rpc.RestrictedInternalClient, req *roachpb.RangeFeedRequest, -) (_ roachpb.RangeFeedEventProducer, cleanup func(), _ error) { - producer, rpcClient, streamID, cleanup, err := m.connect(ctx, client, req.Replica.NodeID) +) (roachpb.RangeFeedEventProducer, func(), error) { + ms, err := m.establishMuxConnection(ctx, client, req.Replica.NodeID) if err != nil { - return nil, cleanup, err + return nil, nil, err } - req.StreamID = streamID - if err := rpcClient.Send(req); err != nil { - return nil, cleanup, err + + req.StreamID = atomic.AddInt64(&m.seqID, 1) + streamCtx := logtags.AddTag(ctx, "stream", req.StreamID) + producer := &channelRangeFeedEventProducer{ + callerCtx: streamCtx, + muxClientCtx: ms.doneCtx, + streamID: req.StreamID, + eventCh: make(chan *roachpb.RangeFeedEvent), } - return producer, cleanup, nil -} + m.producers.Store(req.StreamID, unsafe.Pointer(producer)) -// connect establishes MuxRangeFeed connection for the specified node, re-using -// the existing one if one exists. 
Returns event producer, RPC client to send -// requests on, the streamID which should be used when sending request, and a -// cleanup function. Cleanup function never nil, and must always be invoked, -// even if error is returned. -func (m *rangefeedMuxer) connect( - ctx context.Context, client rpc.RestrictedInternalClient, nodeID roachpb.NodeID, -) ( - _ roachpb.RangeFeedEventProducer, - _ roachpb.Internal_MuxRangeFeedClient, - streamID int64, - cleanup func(), - _ error, -) { - m.mu.Lock() - defer m.mu.Unlock() + if log.V(1) { + log.Info(streamCtx, "starting rangefeed") + } - streamID = m.mu.nextStreamID - m.mu.nextStreamID++ + cleanup := func() { + m.producers.Delete(req.StreamID) - cleanup = func() { m.mu.Lock() defer m.mu.Unlock() - delete(m.mu.producers, streamID) - // Cleanup mux state if it exists; it may be nil if this function exits - // early (for example if MuxRangeFeed call fails, before muxState - // initialized). - muxState := m.mu.muxClients[nodeID] - if muxState != nil { - muxState.streams.Remove(int(streamID)) - if muxState.streams.Len() == 0 { - // This node no longer has any active streams. - // Delete node from the muxClient list, and gracefully - // shutdown consumer go routine. - delete(m.mu.muxClients, nodeID) - muxState.cancel() + ms.numStreams-- + if ms.numStreams == 0 { + delete(m.mu.clients, req.Replica.NodeID) + if log.V(1) { + log.InfofDepth(streamCtx, 1, "shut down inactive mux for node %d", req.Replica.NodeID) } + ms.cancel() } } - if m.mu.terminalErr != nil { - return nil, nil, streamID, cleanup, m.mu.terminalErr + if err := ms.client.Send(req); err != nil { + cleanup() + return nil, nil, err } + return producer, cleanup, nil +} - var found bool - ms, found := m.mu.muxClients[nodeID] - +// establishMuxConnection establishes MuxRangeFeed RPC with the node. +func (m *rangefeedMuxer) establishMuxConnection( + ctx context.Context, client rpc.RestrictedInternalClient, nodeID roachpb.NodeID, +) (*muxClientState, error) { + // NB: the `ctx` in scope here belongs to a client for a single range feed, and must + // not influence the lifetime of the mux connection. At the time of writing, the caller + // is `singleRangeFeed` which calls into this method through its streamProducerFactory + // argument. + m.mu.Lock() + ms, found := m.mu.clients[nodeID] if !found { - ctx, cancel := context.WithCancel(ctx) - stream, err := client.MuxRangeFeed(ctx) - if err != nil { - cancel() - return nil, nil, streamID, cleanup, err - } - - ms = &muxClientState{client: stream, cancel: cancel} - m.mu.muxClients[nodeID] = ms + // Initialize muxClientState. + // Only initCtx is initialized here since we need to block on it. + // The rest of the initialization happens in startNodeMuxRangeFeed. + ms = &muxClientState{initCtx: makeTerminationContext()} + // Kick off client initialization on another Go routine. + // It is important that we start MuxRangeFeed RPC using long-lived + // context available in the main context group used for this muxer. m.g.GoCtx(func(ctx context.Context) error { - defer cancel() - return m.receiveEventsFromNode(ctx, nodeID, stream) + return m.startNodeMuxRangeFeed(ctx, ms, client, nodeID) }) + m.mu.clients[nodeID] = ms } + ms.numStreams++ + m.mu.Unlock() - // Start RangeFeed for this request. - ms.streams.Add(int(streamID)) + // Ensure mux client is ready. 
+ select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-ms.initCtx.Done(): + return ms, ms.initCtx.Err() + } +} - producer := &channelRangeFeedEventProducer{ - ctx: ctx, - termErrCh: make(chan struct{}), - eventCh: make(chan *roachpb.RangeFeedEvent), +// startNodeMuxRangeFeedLocked establishes MuxRangeFeed RPC with the node. +func (m *rangefeedMuxer) startNodeMuxRangeFeed( + ctx context.Context, + ms *muxClientState, + client rpc.RestrictedInternalClient, + nodeID roachpb.NodeID, +) error { + ctx = logtags.AddTag(ctx, "mux_n", nodeID) + // Add "generation" number to the context so that log messages and stacks can + // differentiate between multiple instances of mux rangefeed Go routine + // (this can happen when one was shutdown, then re-established). + ctx = logtags.AddTag(ctx, "gen", atomic.AddInt64(&m.seqID, 1)) + ctx, restore := pprofutil.SetProfilerLabelsFromCtxTags(ctx) + defer restore() + + if log.V(1) { + log.Info(ctx, "Establishing MuxRangeFeed") + start := timeutil.Now() + defer func() { + log.Infof(ctx, "MuxRangeFeed terminating after %s", timeutil.Since(start)) + }() + } + + doneCtx := makeTerminationContext() + ms.doneCtx = &doneCtx + ctx, cancel := context.WithCancel(ctx) + + ms.cancel = func() { + cancel() + doneCtx.close(context.Canceled) } - m.mu.producers[streamID] = producer - return producer, ms.client, streamID, cleanup, nil + defer ms.cancel() + + // NB: it is important that this Go routine never returns an error. Errors + // should be propagated to the caller either via initCtx.err, or doneCtx.err. + // We do this to make sure that this error does not kill entire context group. + // We want the caller (singleRangeFeed) to decide if this error is retry-able. + var err error + ms.client, err = client.MuxRangeFeed(ctx) + ms.initCtx.close(err) + + if err == nil { + doneCtx.close(m.receiveEventsFromNode(ctx, ms)) + } + + // We propagated error to the caller via init/done context. + return nil //nolint:returnerrcheck } // demuxLoop de-multiplexes events and sends them to appropriate rangefeed event // consumer. -func (m *rangefeedMuxer) demuxLoop(ctx context.Context) error { +func (m *rangefeedMuxer) demuxLoop(ctx context.Context) (retErr error) { + defer close(m.demuxLoopDone) + for { select { case <-ctx.Done(): return ctx.Err() case e := <-m.eventCh: - m.mu.Lock() - producer, found := m.mu.producers[e.StreamID] - m.mu.Unlock() - - if !found { - return m.shutdownWithError(errors.AssertionFailedf( - "expected to find consumer for streamID=%d", e.StreamID), - ) + var producer *channelRangeFeedEventProducer + if v, found := m.producers.Load(e.StreamID); found { + producer = (*channelRangeFeedEventProducer)(v) + } + + // The stream may already have terminated (either producer is nil, or + // producer.muxClientCtx.Done()). That's fine -- we may have encountered range + // split or similar rangefeed error, causing the caller to exit (and + // terminate this stream), but the server side stream termination is async + // and probabilistic (rangefeed registration output loop may have a + // checkpoint event available, *and* it may have context cancellation, but + // which one executes is a coin flip) and so it is possible that we may + // see additional event(s) arriving for a stream that is no longer active. 
+ if producer == nil { + if log.V(1) { + log.Infof(ctx, "received stray event stream %d: %v", e.StreamID, e) + } + continue } select { case <-ctx.Done(): return ctx.Err() case producer.eventCh <- &e.RangeFeedEvent: + case <-producer.muxClientCtx.Done(): + if log.V(1) { + log.Infof(ctx, "received stray event stream %d: %v", e.StreamID, e) + } } } } } +// terminationContext (inspired by context.Context) describes +// termination information. +type terminationContext interface { + Done() <-chan struct{} + Err() error +} + +// termCtx implements terminationContext, and allows error to be set. +type termCtx struct { + sync.Once + done chan struct{} + err error +} + +func makeTerminationContext() termCtx { + return termCtx{done: make(chan struct{})} +} + +func (tc *termCtx) Done() <-chan struct{} { + return tc.done +} +func (tc *termCtx) Err() error { + return tc.err +} + +// close closes this context with specified error. +func (tc *termCtx) close(err error) { + tc.Do(func() { + tc.err = err + close(tc.done) + }) +} + // receiveEventsFromNode receives mux rangefeed events, and forwards them to the -// consumer channel. -func (m *rangefeedMuxer) receiveEventsFromNode( - ctx context.Context, nodeID roachpb.NodeID, stream roachpb.Internal_MuxRangeFeedClient, -) error { +// demuxLoop. +// Passed in context must be the context used to create ms.client. +func (m *rangefeedMuxer) receiveEventsFromNode(ctx context.Context, ms *muxClientState) error { for { - event, streamErr := stream.Recv() + event, streamErr := ms.client.Recv() if streamErr != nil { - m.propagateStreamTerminationErrorToConsumers(nodeID, streamErr) - // Since the stream error is handled above, we return nil to gracefully shut down - // this go routine. - return nil //nolint:returnerrcheck + return streamErr } select { case <-ctx.Done(): - return ctx.Err() + // Normally, when ctx is done, we would receive streamErr above. + // But it's possible that the context was canceled right after the last Recv(), + // and in that case we must exit. + return nil + case <-m.demuxLoopDone: + // demuxLoop exited, and so should we (happens when main context group completes) + return nil case m.eventCh <- event: } } } - -// propagateStreamTerminationErrorToConsumers called when mux stream running on -// a node encountered an error. All consumers will receive the stream -// termination error and will handle it appropriately. -func (m *rangefeedMuxer) propagateStreamTerminationErrorToConsumers( - nodeID roachpb.NodeID, streamErr error, -) { - // Grab muxStream associated with the node, and clear it out. - m.mu.Lock() - defer m.mu.Unlock() - - ms, streamFound := m.mu.muxClients[nodeID] - delete(m.mu.muxClients, nodeID) - // Note: it's okay if the stream is not found; this can happen if the - // nodeID was already removed from muxClients because we're trying to gracefully - // shutdown receiveEventsFromNode go routine. - if streamFound { - ms.streams.ForEach(func(streamID int) { - p := m.mu.producers[int64(streamID)] - delete(m.mu.producers, int64(streamID)) - if p != nil { - p.termErr = streamErr - close(p.termErrCh) - } - }) - } - -} - -// shutdownWithError terminates this rangefeedMuxer with a terminal error -// (usually an assertion failure). It's a bit unwieldy way to propagate this -// error to the caller, but as soon as one of consumers notices this terminal -// error, the context should be cancelled. -// Returns the terminal error passed in. 
-func (m *rangefeedMuxer) shutdownWithError(terminalErr error) error { - // Okay to run this under the lock since as soon as a consumer sees this terminal - // error, the whole rangefeed should terminate. - m.mu.Lock() - defer m.mu.Unlock() - - m.mu.terminalErr = terminalErr - for id, p := range m.mu.producers { - p.termErr = terminalErr - close(p.termErrCh) - delete(m.mu.producers, id) - } - - return terminalErr -} diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index 241541621f75..15ed56b07d5c 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -416,6 +416,10 @@ func (ds *DistSender) partialRangeFeed( } // Establish a RangeFeed for a single Range. + if log.V(1) { + log.Infof(ctx, "RangeFeed starting for range %d@%s (%s)", token.Desc().RangeID, startAfter, span) + } + maxTS, err := ds.singleRangeFeed( ctx, span, startAfter, withDiff, token.Desc(), catchupSem, eventCh, streamProducerFactory, active.onRangeEvent, cfg) @@ -427,8 +431,8 @@ func (ds *DistSender) partialRangeFeed( active.setLastError(err) if log.V(1) { - log.Infof(ctx, "RangeFeed %s disconnected with last checkpoint %s ago: %v", - span, timeutil.Since(startAfter.GoTime()), err) + log.Infof(ctx, "RangeFeed %s@%s disconnected with last checkpoint %s ago: %v", + span, startAfter, timeutil.Since(startAfter.GoTime()), err) } switch { case errors.HasType(err, (*roachpb.StoreNotFoundError)(nil)) || @@ -507,10 +511,15 @@ func (ds *DistSender) singleRangeFeed( streamProducerFactory rangeFeedEventProducerFactory, onRangeEvent onRangeEventCb, cfg rangeFeedConfig, -) (hlc.Timestamp, error) { +) (_ hlc.Timestamp, retErr error) { // Ensure context is cancelled on all errors, to prevent gRPC stream leaks. ctx, cancelFeed := context.WithCancel(ctx) - defer cancelFeed() + defer func() { + if log.V(1) { + log.Infof(ctx, "singleRangeFeed terminating with err=%v", retErr) + } + cancelFeed() + }() admissionPri := admissionpb.BulkNormalPri if cfg.overSystemTable { @@ -640,6 +649,7 @@ func (ds *DistSender) singleRangeFeed( event, err = stream.Recv() return err }); err != nil { + log.VErrEventf(ctx, 2, "RPC error: %s", err) if err == io.EOF { return args.Timestamp, nil }
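
The helpers_test.go change above swaps a hand-rolled context.WithTimeout for contextutil.RunWithTimeout with an operation name that embeds the changefeed job ID, so a stuck feed fails with an error naming the specific operation that timed out. The sketch below illustrates that effect with a simplified stand-in, runWithTimeout, rather than the contextutil implementation; the job ID and operation name are hypothetical.

package main

import (
	"context"
	"fmt"
	"time"
)

// runWithTimeout is a simplified stand-in for contextutil.RunWithTimeout: it
// runs fn under a deadline and, on timeout, wraps the error with the operation
// name so the failure identifies which feed/job stalled.
func runWithTimeout(
	parent context.Context, op string, timeout time.Duration, fn func(ctx context.Context) error,
) error {
	ctx, cancel := context.WithTimeout(parent, timeout)
	defer cancel()
	if err := fn(ctx); err != nil {
		if ctx.Err() != nil {
			return fmt.Errorf("operation %q timed out after %s: %w", op, timeout, err)
		}
		return err
	}
	return nil
}

func main() {
	const jobID = 12345 // hypothetical changefeed job ID
	err := runWithTimeout(context.Background(),
		fmt.Sprintf("tableFeed.Next-%d", jobID), 50*time.Millisecond,
		func(ctx context.Context) error {
			<-ctx.Done() // simulate a feed that never produces a message
			return ctx.Err()
		})
	fmt.Println(err)
	// operation "tableFeed.Next-12345" timed out after 50ms: context deadline exceeded
}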
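
The termCtx/terminationContext type added in dist_sender_mux_rangefeed.go separates per-node stream shutdown from per-range caller cancellation: channelRangeFeedEventProducer.Recv selects on both. Below is a self-contained sketch of the same pattern; the names (terminationSignal, recvEvent) are illustrative only and not part of the CockroachDB API.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// terminationSignal is a done channel closed exactly once, carrying the error
// that caused termination -- the same idea as termCtx in the patch.
type terminationSignal struct {
	once sync.Once
	done chan struct{}
	err  error
}

func newTerminationSignal() *terminationSignal {
	return &terminationSignal{done: make(chan struct{})}
}

// close records err and unblocks all waiters; subsequent calls are no-ops.
func (t *terminationSignal) close(err error) {
	t.once.Do(func() {
		t.err = err
		close(t.done)
	})
}

func (t *terminationSignal) Done() <-chan struct{} { return t.done }
func (t *terminationSignal) Err() error            { return t.err }

// recvEvent mirrors channelRangeFeedEventProducer.Recv: while waiting for the
// next event it honors both the per-stream caller context and the per-node
// termination signal.
func recvEvent(
	callerCtx context.Context, nodeDone *terminationSignal, events <-chan string,
) (string, error) {
	select {
	case <-callerCtx.Done():
		return "", callerCtx.Err()
	case <-nodeDone.Done():
		return "", nodeDone.Err()
	case e := <-events:
		return e, nil
	}
}

func main() {
	events := make(chan string, 1)
	events <- "checkpoint"
	nodeDone := newTerminationSignal()

	e, err := recvEvent(context.Background(), nodeDone, events)
	fmt.Println(e, err) // checkpoint <nil>

	// Simulate the node-level stream failing: every stream on that node sees
	// the same error the next time it blocks in recvEvent.
	nodeDone.close(errors.New("node unavailable"))
	_, err = recvEvent(context.Background(), nodeDone, events)
	fmt.Println(err) // node unavailable
}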
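
The rewritten muxer keys producers by stream ID (a syncutil.IntMap of unsafe pointers) and has demuxLoop drop "stray" events for streams that were already cleaned up, since server-side stream termination is asynchronous. A rough sketch of that routing behavior, with sync.Map standing in for syncutil.IntMap and string payloads in place of RangeFeedEvents:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// event carries a payload tagged with the stream it belongs to, loosely
// mirroring the StreamID field on roachpb.MuxRangeFeedEvent.
type event struct {
	streamID int64
	payload  string
}

type demuxer struct {
	producers sync.Map // streamID (int64) -> chan string
	eventCh   chan event
}

func (d *demuxer) register(streamID int64) chan string {
	ch := make(chan string, 1)
	d.producers.Store(streamID, ch)
	return ch
}

func (d *demuxer) unregister(streamID int64) { d.producers.Delete(streamID) }

// demuxLoop routes incoming events to the channel registered for their stream.
// Events for streams that were already cleaned up are dropped rather than
// treated as fatal, matching the stray-event handling in the patch.
func (d *demuxer) demuxLoop(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case e := <-d.eventCh:
			v, ok := d.producers.Load(e.streamID)
			if !ok {
				fmt.Printf("dropping stray event for stream %d\n", e.streamID)
				continue
			}
			out := v.(chan string)
			select {
			case <-ctx.Done():
				return ctx.Err()
			case out <- e.payload:
			}
		}
	}
}

func main() {
	d := &demuxer{eventCh: make(chan event)}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go d.demuxLoop(ctx)

	ch := d.register(7)
	d.eventCh <- event{streamID: 7, payload: "checkpoint"}
	fmt.Println(<-ch) // checkpoint

	d.unregister(7)
	d.eventCh <- event{streamID: 7, payload: "late event"} // dropped by demuxLoop
	time.Sleep(50 * time.Millisecond)                      // let the drop message print before exiting
}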