From 0be08e7e78bd8c6e42885bf374ff604f0ea8926e Mon Sep 17 00:00:00 2001
From: Yevgeniy Miretskiy
Date: Tue, 26 Sep 2023 16:27:50 -0400
Subject: [PATCH] kvcoord: Fix mux rangefeed startup deadlock

Previous PR #110919 modified rangefeed startup logic to rely on a rate
limiter instead of a semaphore. The issue exposed by the catchup scan
rate limiter is that it allows many more catchup scans to be started
(100/sec vs 8 prior).

If the range resides on the local node, a local "bypass" RPC is created
instead (rpc/context.go); this bypass is implemented via buffered
channels. The deadlock occurs when the client attempts to send a
request to the server while holding the lock, but blocks because the
channel is full (i.e. 128 requests are already outstanding). The server
blocks for the same reason: the client mux goroutine, which consumes
rangefeed messages from the server, is itself blocked attempting to
acquire the lock in order to look up the stream for the rangefeed
message.

The fix moves the state shared by the sender and the consumer -- namely
the stream map -- outside of the sender lock.

The PR also adds a test that reliably fails under deadlock detection
without the fix.

Fixes #111111
Fixes #111165

Release note: None
---
 pkg/kv/kvclient/kvcoord/BUILD.bazel           |  1 +
 .../kvcoord/dist_sender_mux_rangefeed.go      | 85 +++++++++++--------
 .../kvclient/kvcoord/dist_sender_rangefeed.go | 47 ++++++++--
 .../kvcoord/dist_sender_rangefeed_test.go     | 64 ++++++++++++++
 pkg/rpc/context.go                            | 14 ++-
 5 files changed, 168 insertions(+), 43 deletions(-)

diff --git a/pkg/kv/kvclient/kvcoord/BUILD.bazel b/pkg/kv/kvclient/kvcoord/BUILD.bazel
index d194c0cfb938..f086883a67fa 100644
--- a/pkg/kv/kvclient/kvcoord/BUILD.bazel
+++ b/pkg/kv/kvclient/kvcoord/BUILD.bazel
@@ -228,6 +228,7 @@ go_test(
         "@com_github_cockroachdb_errors//errutil",
         "@com_github_cockroachdb_redact//:redact",
         "@com_github_golang_mock//gomock",
+        "@com_github_sasha_s_go_deadlock//:go-deadlock",
         "@com_github_stretchr_testify//assert",
         "@com_github_stretchr_testify//require",
         "@org_golang_google_grpc//:go_default_library",
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
index 013e0de08cd0..c8934f06640e 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
@@ -117,12 +117,13 @@ func muxRangeFeed(
 type muxStream struct {
 	nodeID roachpb.NodeID
 
+	streams syncutil.IntMap // streamID -> *activeMuxRangeFeed
+
 	// mu must be held when starting rangefeed.
 	mu struct {
 		syncutil.Mutex
-		sender  rangeFeedRequestSender
-		streams map[int64]*activeMuxRangeFeed
-		closed  bool
+		sender rangeFeedRequestSender
+		closed bool
 	}
 }
 
@@ -290,7 +291,7 @@ func (s *activeMuxRangeFeed) start(ctx context.Context, m *rangefeedMuxer) error
 
 	conn, err := m.establishMuxConnection(ctx, rpcClient, args.Replica.NodeID)
 	if err == nil {
-		err = conn.startRangeFeed(streamID, s, &args)
+		err = conn.startRangeFeed(streamID, s, &args, m.cfg.knobs.beforeSendRequest)
 	}
 
 	if err != nil {
@@ -380,7 +381,6 @@ func (m *rangefeedMuxer) startNodeMuxRangeFeed(
 
 	ms := muxStream{nodeID: nodeID}
 	ms.mu.sender = mux
-	ms.mu.streams = make(map[int64]*activeMuxRangeFeed)
 	if err := future.MustSet(stream, muxStreamOrError{stream: &ms}); err != nil {
 		return err
 	}
@@ -474,7 +474,7 @@ func (m *rangefeedMuxer) receiveEventsFromNode(
 			if active.catchupRes != nil {
 				m.metrics.Errors.RangefeedErrorCatchup.Inc(1)
 			}
-			ms.deleteStream(event.StreamID)
+			ms.streams.Delete(event.StreamID)
 			// Restart rangefeed on another goroutine. Restart might be a bit
 			// expensive, particularly if we have to resolve span. We do not want
 			// to block receiveEventsFromNode for too long.
@@ -555,53 +555,66 @@ func (m *rangefeedMuxer) restartActiveRangeFeed(
 // on this node connection. If no error returned, registers stream
 // with this connection. Otherwise, stream is not registered.
 func (c *muxStream) startRangeFeed(
-	streamID int64, stream *activeMuxRangeFeed, req *kvpb.RangeFeedRequest,
-) error {
+	streamID int64, stream *activeMuxRangeFeed, req *kvpb.RangeFeedRequest, beforeSend func(),
+) (retErr error) {
 	// NB: lock must be held for the duration of this method.
+	// The reasons for this are twofold:
+	// 1. Send calls must be protected against concurrent calls.
+	// 2. The muxStream may be in the process of restart -- that is receiveEventsFromNode just
+	//    returned an error. When that happens, muxStream is closed, and all rangefeeds
+	//    belonging to this muxStream are restarted. The lock here synchronizes with the close()
+	//    call so that we either observe the fact that muxStream is closed when this method runs,
+	//    or that the close waits until this call completes.
+	// Note also, the Send method may block. That's alright. If the call is blocked because
+	// the server side just returned an error, then, the send call should abort and cause an
+	// error to be returned, releasing the lock, and letting close proceed.
 	c.mu.Lock()
 	defer c.mu.Unlock()
 
+	// As soon as we issue Send below, the stream may return an event or an error that
+	// may be seen by the event consumer (receiveEventsFromNode).
+	// Therefore, we update streams map immediately, but undo this insert in case of an error,
+	// which is returned to the caller for retry.
+	c.streams.Store(streamID, unsafe.Pointer(stream))
+
+	defer func() {
+		if retErr != nil {
+			// undo stream registration.
+			c.streams.Delete(streamID)
+		}
+	}()
+
 	if c.mu.closed {
 		return net.ErrClosed
 	}
 
-	// Concurrent Send calls are not thread safe; thus Send calls must be
-	// synchronized.
-	if err := c.mu.sender.Send(req); err != nil {
-		return err
+	if beforeSend != nil {
+		beforeSend()
 	}
 
-	// As soon as we issue Send above, the stream may return an error that
-	// may be seen by the event consumer (receiveEventsFromNode).
-	// Therefore, we update streams map under the lock to ensure that the
-	// receiver will be able to observe this stream.
-	c.mu.streams[streamID] = stream
-	return nil
+	return c.mu.sender.Send(req)
 }
 
-func (c *muxStream) lookupStream(streamID int64) (a *activeMuxRangeFeed) {
-	c.mu.Lock()
-	a = c.mu.streams[streamID]
-	c.mu.Unlock()
-	return a
-}
-
-func (c *muxStream) deleteStream(streamID int64) {
-	c.mu.Lock()
-	delete(c.mu.streams, streamID)
-	c.mu.Unlock()
+func (c *muxStream) lookupStream(streamID int64) *activeMuxRangeFeed {
+	v, ok := c.streams.Load(streamID)
+	if ok {
+		return (*activeMuxRangeFeed)(v)
+	}
+	return nil
 }
 
 // close closes mux stream returning the list of active range feeds.
-func (c *muxStream) close() []*activeMuxRangeFeed {
+func (c *muxStream) close() (toRestart []*activeMuxRangeFeed) {
+	// NB: lock must be held for the duration of this method to synchronize with startRangeFeed.
 	c.mu.Lock()
+	defer c.mu.Unlock()
+
 	c.mu.closed = true
-	toRestart := make([]*activeMuxRangeFeed, 0, len(c.mu.streams))
-	for _, a := range c.mu.streams {
-		toRestart = append(toRestart, a)
-	}
-	c.mu.streams = nil
-	c.mu.Unlock()
+
+	c.streams.Range(func(_ int64, v unsafe.Pointer) bool {
+		toRestart = append(toRestart, (*activeMuxRangeFeed)(v))
+		return true
+	})
 
 	return toRestart
 }
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
index fed832f45d60..6b0b45d25b7e 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -101,6 +101,8 @@ type rangeFeedConfig struct {
 		// captureMuxRangeFeedRequestSender is a callback invoked when mux
 		// rangefeed establishes connection to the node.
 		captureMuxRangeFeedRequestSender func(nodeID roachpb.NodeID, sender func(req *kvpb.RangeFeedRequest) error)
+		// beforeSendRequest is a mux rangefeed callback invoked prior to sending rangefeed request.
+		beforeSendRequest func()
 	}
 }
 
@@ -973,6 +975,14 @@ func TestingWithMuxRangeFeedRequestSenderCapture(
 	})
 }
 
+// TestingWithBeforeSendRequest returns a test only option that invokes
+// function before sending rangefeed request.
+func TestingWithBeforeSendRequest(fn func()) RangeFeedOption {
+	return optionFunc(func(c *rangeFeedConfig) {
+		c.knobs.beforeSendRequest = fn
+	})
+}
+
 // TestingMakeRangeFeedMetrics exposes makeDistSenderRangeFeedMetrics for test use.
 var TestingMakeRangeFeedMetrics = makeDistSenderRangeFeedMetrics
 
@@ -983,10 +993,15 @@ type catchupScanRateLimiter struct {
 }
 
 func newCatchupScanRateLimiter(sv *settings.Values) *catchupScanRateLimiter {
-	rl := &catchupScanRateLimiter{sv: sv}
-	rl.limit = getCatchupRateLimit(rl.sv)
-	rl.pacer = quotapool.NewRateLimiter("distSenderCatchupLimit", rl.limit, 0 /* smooth rate */)
-	return rl
+	const slowAcquisitionThreshold = 5 * time.Second
+	lim := getCatchupRateLimit(sv)
+	return &catchupScanRateLimiter{
+		sv:    sv,
+		limit: lim,
+		pacer: quotapool.NewRateLimiter(
+			"distSenderCatchupLimit", lim, 0, /* smooth rate limit without burst */
+			quotapool.OnSlowAcquisition(slowAcquisitionThreshold, logSlowCatchupScanAcquisition(slowAcquisitionThreshold))),
+	}
 }
 
 func getCatchupRateLimit(sv *settings.Values) quotapool.Limit {
@@ -1001,8 +1016,28 @@ func (rl *catchupScanRateLimiter) Pace(ctx context.Context) error {
 	// Take opportunity to update limits if they have changed.
 	if lim := getCatchupRateLimit(rl.sv); lim != rl.limit {
 		rl.limit = lim
-		rl.pacer.UpdateLimit(lim, 0)
+		rl.pacer.UpdateLimit(lim, 0 /* smooth rate limit without burst */)
 	}
-
 	return rl.pacer.WaitN(ctx, 1)
 }
+
+// logSlowCatchupScanAcquisition is a function returning a quotapool.SlowAcquisitionFunction.
+// It differs from the quotapool.LogSlowAcquisition in that only some of slow acquisition
+// events are logged to reduce log spam.
+func logSlowCatchupScanAcquisition(loggingMinInterval time.Duration) quotapool.SlowAcquisitionFunc {
+	logSlowAcquire := log.Every(loggingMinInterval)
+
+	return func(ctx context.Context, poolName string, r quotapool.Request, start time.Time) func() {
+		shouldLog := logSlowAcquire.ShouldLog()
+		if shouldLog {
+			log.Warningf(ctx, "have been waiting %s attempting to acquire catchup scan quota",
+				timeutil.Since(start))
+		}
+
+		return func() {
+			if shouldLog {
+				log.Infof(ctx, "acquired catchup quota after %s", timeutil.Since(start))
+			}
+		}
+	}
+}
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go
index fb1f44311e83..da6eb46baf08 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go
@@ -44,6 +44,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/errors"
+	"github.com/sasha-s/go-deadlock"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"google.golang.org/grpc"
@@ -258,6 +259,9 @@ func channelWaitWithTimeout(t *testing.T, ch chan struct{}) {
 	if util.RaceEnabled {
 		timeOut *= 10
 	}
+	if syncutil.DeadlockEnabled {
+		timeOut = 2 * deadlock.Opts.DeadlockTimeout
+	}
 	select {
 	case <-ch:
 	case <-time.After(timeOut):
@@ -1052,3 +1056,63 @@ func TestMuxRangeFeedCanCloseStream(t *testing.T) {
 		// Mux rangefeed should retry, and thus we expect frontier to keep advancing.
 	}
 }
+
+// TestMuxRangeFeedDoesNotDeadlockWithLocalStreams verifies mux rangefeed does not
+// deadlock when running against many local ranges. Local ranges use local RPC
+// bypass (rpc/context.go) which utilize buffered channels for client/server streaming
+// RPC communication.
+func TestMuxRangeFeedDoesNotDeadlockWithLocalStreams(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	if !syncutil.DeadlockEnabled {
+		t.Log("skipping test: it requires deadlock detection enabled.")
+		return
+	}
+
+	// Lower syncutil deadlock timeout.
+	deadlock.Opts.DeadlockTimeout = 2 * time.Minute
+
+	// Make deadlock more likely: use unbuffered channel.
+	defer rpc.TestingSetLocalStreamChannelBufferSize(0)()
+
+	ctx := context.Background()
+	tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
+		ReplicationMode: base.ReplicationManual,
+	})
+	defer tc.Stopper().Stop(ctx)
+
+	ts := tc.Server(0).ApplicationLayer()
+	sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0))
+
+	// Insert 1000 rows, and split them into many ranges.
+	sqlDB.ExecMultiple(t,
+		`SET CLUSTER SETTING kv.rangefeed.enabled = true`,
+		`SET CLUSTER SETTING kv.closed_timestamp.target_duration='100ms'`,
+		`ALTER DATABASE defaultdb CONFIGURE ZONE USING num_replicas = 1`,
+		`CREATE TABLE foo (key INT PRIMARY KEY)`,
+	)
+
+	startFrom := ts.Clock().Now()
+
+	sqlDB.ExecMultiple(t,
+		`INSERT INTO foo (key) SELECT * FROM generate_series(1, 1000)`,
+		`ALTER TABLE foo SPLIT AT (SELECT * FROM generate_series(100, 900, 20))`,
+	)
+
+	fooDesc := desctestutils.TestingGetPublicTableDescriptor(
+		ts.DB(), keys.SystemSQLCodec, "defaultdb", "foo")
+	fooSpan := fooDesc.PrimaryIndexSpan(keys.SystemSQLCodec)
+
+	allSeen, onValue := observeNValues(1000)
+	closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, startFrom, onValue, true,
+		kvcoord.WithMuxRangeFeed(),
+		kvcoord.TestingWithBeforeSendRequest(func() {
+			// Prior to sending rangefeed request, block for just a bit
+			// to make deadlock more likely.
+			time.Sleep(100 * time.Millisecond)
+		}),
+	)
+	defer closeFeed()
+	channelWaitWithTimeout(t, allSeen)
+}
diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go
index f4950b35ebad..2646c8e74b7b 100644
--- a/pkg/rpc/context.go
+++ b/pkg/rpc/context.go
@@ -19,6 +19,7 @@ import (
 	"io"
 	"math"
 	"net"
+	"sync/atomic"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
@@ -1258,6 +1259,17 @@ type pipe struct {
 	errC  chan error
 }
 
+// buffer size for channel used to connect local streaming rpcs.
+var localStreamChannelBufferSize int64 = 128 // accessed atomically.
+
+// TestingSetLocalStreamChannelBufferSize overrides channel buffer size
+// used for streaming RPCs.
+func TestingSetLocalStreamChannelBufferSize(s int64) func() {
+	old := atomic.LoadInt64(&localStreamChannelBufferSize)
+	atomic.StoreInt64(&localStreamChannelBufferSize, s)
+	return func() { atomic.StoreInt64(&localStreamChannelBufferSize, old) }
+}
+
 // makePipe creates a pipe and return it as its two ends.
 //
 // assignPtr is a function that implements *dst = *src for the type of the
@@ -1267,7 +1279,7 @@ type pipe struct {
 // (i.e. interface{}) way.
 func makePipe(assignPtr func(dst interface{}, src interface{})) (pipeWriter, pipeReader) {
 	p := &pipe{
-		respC: make(chan interface{}, 128),
+		respC: make(chan interface{}, atomic.LoadInt64(&localStreamChannelBufferSize)),
 		errC:  make(chan error, 1),
 	}
 	w := pipeWriter{pipe: p}
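
To make the failure mode described in the commit message concrete, here is a small,
self-contained Go sketch. It is illustrative only and not part of the patch; the names
reqC, respC, and streams are hypothetical stand-ins for the request/response channels of
the local RPC bypass and the mux stream map. A sender registers a stream and writes a
request while holding a mutex; a "server" echoes each request back; a consumer must take
the same mutex to look up per-stream state for each response. Once both small buffers
fill while the sender holds the lock, the program wedges the same way the rangefeed
client did.

package main

import (
	"fmt"
	"sync"
)

func main() {
	// Tiny buffers stand in for the 128-slot channels in rpc/context.go;
	// smaller buffers make the wedge easy to hit.
	const bufSize = 1
	reqC := make(chan int, bufSize)
	respC := make(chan int, bufSize)

	var mu sync.Mutex
	streams := map[int]bool{}

	// "Server": echoes every request back as a response.
	go func() {
		for id := range reqC {
			respC <- id // blocks once respC is full
		}
		close(respC)
	}()

	// Client consumer (the receiveEventsFromNode analogue): takes mu to look
	// up the stream for each response.
	done := make(chan struct{})
	go func() {
		defer close(done)
		for id := range respC {
			mu.Lock() // blocks while the sender below holds mu
			_ = streams[id]
			mu.Unlock()
		}
	}()

	// Client sender (the startRangeFeed analogue): registers the stream and
	// sends the request while holding mu. Once reqC and respC are both full
	// and the consumer is parked on mu.Lock, this send blocks while holding
	// mu and the program typically wedges.
	for id := 0; id < 1000; id++ {
		mu.Lock()
		streams[id] = true
		reqC <- id
		mu.Unlock()
	}
	close(reqC)
	<-done
	fmt.Println("finished without wedging (timing dependent)")
}

The patch breaks this cycle by taking the per-stream registry out from under the sender's
mutex (the streams syncutil.IntMap field), so the consumer never has to wait on the lock
that a blocked sender is holding.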