From 0538d371ea76858d0668980194eaa7b7cc347f0f Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Tue, 18 Jan 2022 18:15:44 +0800 Subject: [PATCH] kv,puller(ticdc): add changefeed ID to kv client (#4373) ref pingcap/tiflow#3288 --- cdc/kv/client.go | 97 +++++++++++++++++++----- cdc/kv/client_bench_test.go | 4 +- cdc/kv/client_test.go | 52 ++++++------- cdc/kv/region_worker.go | 24 ++++-- cdc/kv/testing.go | 4 +- cdc/owner/ddl_puller.go | 2 + cdc/processor/pipeline/puller.go | 7 +- cdc/processor/pipeline/table.go | 3 +- cdc/processor/processor.go | 1 + cdc/puller/puller.go | 3 +- cdc/puller/puller_test.go | 3 +- pkg/regionspan/region_range_lock.go | 29 +++++-- pkg/regionspan/region_range_lock_test.go | 8 +- 13 files changed, 168 insertions(+), 69 deletions(-) diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 79be03f7a41..197d208bdac 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -315,6 +315,7 @@ type CDCClient struct { regionCache *tikv.RegionCache kvStorage tikv.Storage pdClock pdtime.Clock + changefeed string regionLimiters *regionEventFeedLimiters } @@ -327,6 +328,7 @@ func NewCDCClient( grpcPool GrpcPool, regionCache *tikv.RegionCache, pdClock pdtime.Clock, + changefeed string, ) (c CDCKVClient) { clusterID := pd.GetClusterID(ctx) @@ -337,6 +339,7 @@ func NewCDCClient( grpcPool: grpcPool, regionCache: regionCache, pdClock: pdClock, + changefeed: changefeed, regionLimiters: defaultRegionEventFeedLimiters, } return @@ -356,12 +359,16 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64) }() conn, err = c.grpcPool.GetConn(addr) if err != nil { - log.Info("get connection to store failed, retry later", zap.String("addr", addr), zap.Error(err)) + log.Info("get connection to store failed, retry later", + zap.String("addr", addr), zap.Error(err), + zap.String("changefeed", c.changefeed)) return } err = version.CheckStoreVersion(ctx, c.pd, storeID) if err != nil { - log.Error("check tikv version failed", zap.Error(err), zap.Uint64("storeID", storeID)) + log.Error("check tikv version failed", + zap.Error(err), zap.Uint64("storeID", storeID), + zap.String("changefeed", c.changefeed)) return } client := cdcpb.NewChangeDataClient(conn.ClientConn) @@ -369,14 +376,18 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64) streamClient, err = client.EventFeed(ctx) if err != nil { err = cerror.WrapError(cerror.ErrTiKVEventFeed, err) - log.Info("establish stream to store failed, retry later", zap.String("addr", addr), zap.Error(err)) + log.Info("establish stream to store failed, retry later", + zap.String("addr", addr), zap.Error(err), + zap.String("changefeed", c.changefeed)) return } stream = &eventFeedStream{ client: streamClient, conn: conn, } - log.Debug("created stream to store", zap.String("addr", addr)) + log.Debug("created stream to store", + zap.String("addr", addr), + zap.String("changefeed", c.changefeed)) return nil }, retry.WithBackoffBaseDelay(500), retry.WithMaxTries(2), retry.WithIsRetryableErr(cerror.IsRetryableError)) return @@ -470,6 +481,8 @@ func newEventFeedSession( ) *eventFeedSession { id := strconv.FormatUint(allocID(), 10) kvClientCfg := config.GetGlobalServerConfig().KVClient + rangeLock := regionspan.NewRegionRangeLock( + totalSpan.Start, totalSpan.End, startTs, client.changefeed) return &eventFeedSession{ client: client, totalSpan: totalSpan, @@ -479,7 +492,7 @@ func newEventFeedSession( errCh: make(chan regionErrorInfo, defaultRegionChanSize), requestRangeCh: make(chan rangeRequestTask, defaultRegionChanSize), 
rateLimitQueue: make([]regionErrorInfo, 0, defaultRegionRateLimitQueueSize), - rangeLock: regionspan.NewRegionRangeLock(totalSpan.Start, totalSpan.End, startTs), + rangeLock: rangeLock, enableOldValue: enableOldValue, lockResolver: lockResolver, isPullerInit: isPullerInit, @@ -496,7 +509,9 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error { eventFeedGauge.Inc() defer eventFeedGauge.Dec() - log.Debug("event feed started", zap.Stringer("span", s.totalSpan), zap.Uint64("ts", ts)) + log.Debug("event feed started", + zap.Stringer("span", s.totalSpan), zap.Uint64("ts", ts), + zap.String("changefeed", s.client.changefeed)) g, ctx := errgroup.WithContext(ctx) @@ -555,6 +570,7 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error { zapFieldAddr = zap.String("addr", errInfo.singleRegionInfo.rpcCtx.Addr) } log.Info("EventFeed retry rate limited", + zap.String("changefeed", s.client.changefeed), zap.Uint64("regionID", errInfo.singleRegionInfo.verID.GetID()), zap.Uint64("ts", errInfo.singleRegionInfo.ts), zap.String("changefeed", cfID), zap.Stringer("span", errInfo.span), @@ -608,6 +624,7 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single } case regionspan.LockRangeStatusStale: log.Info("request expired", + zap.String("changefeed", s.client.changefeed), zap.Uint64("regionID", sri.verID.GetID()), zap.Stringer("span", sri.span), zap.Reflect("retrySpans", res.RetryRanges)) @@ -655,7 +672,10 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single // error handling. This function is non blocking even if error channel is full. // CAUTION: Note that this should only be called in a context that the region has locked it's range. func (s *eventFeedSession) onRegionFail(ctx context.Context, errorInfo regionErrorInfo, revokeToken bool) { - log.Debug("region failed", zap.Uint64("regionID", errorInfo.verID.GetID()), zap.Error(errorInfo.err)) + log.Debug("region failed", + zap.Uint64("regionID", errorInfo.verID.GetID()), + zap.Error(errorInfo.err), + zap.String("changefeed", s.client.changefeed)) s.rangeLock.UnlockRange(errorInfo.span.Start, errorInfo.span.End, errorInfo.verID.GetID(), errorInfo.verID.GetVer(), errorInfo.ts) if revokeToken { s.regionRouter.Release(errorInfo.rpcCtx.Addr) @@ -720,7 +740,9 @@ func (s *eventFeedSession) requestRegionToStore( pendingRegions, ok = storePendingRegions[rpcCtx.Addr] if !ok { // Should never happen - log.Panic("pending regions is not found for store", zap.String("store", rpcCtx.Addr)) + log.Panic("pending regions is not found for store", + zap.String("changefeed", s.client.changefeed), + zap.String("store", rpcCtx.Addr)) } } else { // when a new stream is established, always create a new pending @@ -730,6 +752,7 @@ func (s *eventFeedSession) requestRegionToStore( storePendingRegions[rpcCtx.Addr] = pendingRegions storeID := rpcCtx.Peer.GetStoreId() log.Info("creating new stream to store to send request", + zap.String("changefeed", s.client.changefeed), zap.Uint64("regionID", sri.verID.GetID()), zap.Uint64("requestID", requestID), zap.Uint64("storeID", storeID), @@ -740,6 +763,7 @@ func (s *eventFeedSession) requestRegionToStore( if err != nil { // if get stream failed, maybe the store is down permanently, we should try to relocate the active store log.Warn("get grpc stream client failed", + zap.String("changefeed", s.client.changefeed), zap.Uint64("regionID", sri.verID.GetID()), zap.Uint64("requestID", requestID), zap.Uint64("storeID", storeID), @@ -773,7 +797,9 @@ 
func (s *eventFeedSession) requestRegionToStore( if s.isPullerInit.IsInitialized() { logReq = log.Info } - logReq("start new request", zap.Reflect("request", req), zap.String("addr", rpcCtx.Addr)) + logReq("start new request", + zap.String("changefeed", s.client.changefeed), + zap.Reflect("request", req), zap.String("addr", rpcCtx.Addr)) err = stream.client.Send(req) @@ -781,6 +807,7 @@ func (s *eventFeedSession) requestRegionToStore( // to do extra work here. if err != nil { log.Warn("send request to stream failed", + zap.String("changefeed", s.client.changefeed), zap.String("addr", rpcCtx.Addr), zap.Uint64("storeID", getStoreID(rpcCtx)), zap.Uint64("regionID", sri.verID.GetID()), @@ -788,7 +815,8 @@ func (s *eventFeedSession) requestRegionToStore( zap.Error(err)) err1 := stream.client.CloseSend() if err1 != nil { - log.Warn("failed to close stream", zap.Error(err1)) + log.Warn("failed to close stream", + zap.Error(err1), zap.String("changefeed", s.client.changefeed)) } // Delete the stream from the map so that the next time the store is accessed, the stream will be // re-established. @@ -837,7 +865,9 @@ func (s *eventFeedSession) dispatchRequest( s.regionChSizeGauge.Dec() } - log.Debug("dispatching region", zap.Uint64("regionID", sri.verID.GetID())) + log.Debug("dispatching region", + zap.String("changefeed", s.client.changefeed), + zap.Uint64("regionID", sri.verID.GetID())) // Send a resolved ts to event channel first, for two reasons: // 1. Since we have locked the region range, and have maintained correct @@ -869,6 +899,7 @@ func (s *eventFeedSession) dispatchRequest( if rpcCtx == nil { // The region info is invalid. Retry the span. log.Info("cannot get rpcCtx, retry span", + zap.String("changefeed", s.client.changefeed), zap.Uint64("regionID", sri.verID.GetID()), zap.Stringer("span", sri.span)) errInfo := newRegionErrorInfo(sri, &rpcCtxUnavailableErr{verID: sri.verID}) @@ -908,17 +939,27 @@ func (s *eventFeedSession) divideAndSendEventFeedToRegions( for _, region := range regions { if region.GetMeta() == nil { err = cerror.ErrMetaNotInRegion.GenWithStackByArgs() - log.Warn("batch load region", zap.Stringer("span", nextSpan), zap.Error(err)) + log.Warn("batch load region", + zap.Stringer("span", nextSpan), zap.Error(err), + zap.String("changefeed", s.client.changefeed), + ) return err } metas = append(metas, region.GetMeta()) } if !regionspan.CheckRegionsLeftCover(metas, nextSpan) { err = cerror.ErrRegionsNotCoverSpan.GenWithStackByArgs(nextSpan, metas) - log.Warn("ScanRegions", zap.Stringer("span", nextSpan), zap.Reflect("regions", metas), zap.Error(err)) + log.Warn("ScanRegions", + zap.Stringer("span", nextSpan), + zap.Reflect("regions", metas), zap.Error(err), + zap.String("changefeed", s.client.changefeed), + ) return err } - log.Debug("ScanRegions", zap.Stringer("span", nextSpan), zap.Reflect("regions", metas)) + log.Debug("ScanRegions", + zap.Stringer("span", nextSpan), + zap.Reflect("regions", metas), + zap.String("changefeed", s.client.changefeed)) return nil }, retry.WithBackoffMaxDelay(50), retry.WithMaxTries(100), retry.WithIsRetryableErr(cerror.IsRetryableError)) if retryErr != nil { @@ -931,13 +972,19 @@ func (s *eventFeedSession) divideAndSendEventFeedToRegions( if err != nil { return errors.Trace(err) } - log.Debug("get partialSpan", zap.Stringer("span", partialSpan), zap.Uint64("regionID", region.Id)) + log.Debug("get partialSpan", + zap.Stringer("span", partialSpan), + zap.Uint64("regionID", region.Id), + zap.String("changefeed", s.client.changefeed)) nextSpan.Start = 
region.EndKey sri := newSingleRegionInfo(tiRegion.VerID(), partialSpan, ts, nil) s.scheduleRegionRequest(ctx, sri) - log.Debug("partialSpan scheduled", zap.Stringer("span", partialSpan), zap.Uint64("regionID", region.Id)) + log.Debug("partialSpan scheduled", + zap.Stringer("span", partialSpan), + zap.Uint64("regionID", region.Id), + zap.String("changefeed", s.client.changefeed)) // return if no more regions if regionspan.EndCompare(nextSpan.Start, span.End) >= 0 { @@ -1023,17 +1070,21 @@ func (s *eventFeedSession) handleError(ctx context.Context, errInfo regionErrorI return errUnreachable } else if compatibility := innerErr.GetCompatibility(); compatibility != nil { log.Error("tikv reported compatibility error, which is not expected", + zap.String("changefeed", s.client.changefeed), zap.String("rpcCtx", errInfo.rpcCtx.String()), zap.Stringer("error", compatibility)) return cerror.ErrVersionIncompatible.GenWithStackByArgs(compatibility) } else if mismatch := innerErr.GetClusterIdMismatch(); mismatch != nil { log.Error("tikv reported the request cluster ID mismatch error, which is not expected", + zap.String("changefeed", s.client.changefeed), zap.Uint64("tikvCurrentClusterID", mismatch.Current), zap.Uint64("requestClusterID", mismatch.Request)) return cerror.ErrClusterIDMismatch.GenWithStackByArgs(mismatch.Current, mismatch.Request) } else { metricFeedUnknownErrorCounter.Inc() - log.Warn("receive empty or unknown error msg", zap.Stringer("error", innerErr)) + log.Warn("receive empty or unknown error msg", + zap.String("changefeed", s.client.changefeed), + zap.Stringer("error", innerErr)) } case *rpcCtxUnavailableErr: metricFeedRPCCtxUnavailable.Inc() @@ -1084,7 +1135,9 @@ func (s *eventFeedSession) receiveFromStream( // Cancel the pending regions if the stream failed. Otherwise it will remain unhandled in the pendingRegions list // however not registered in the new reconnected stream. 
defer func() { - log.Info("stream to store closed", zap.String("addr", addr), zap.Uint64("storeID", storeID)) + log.Info("stream to store closed", + zap.String("changefeed", s.client.changefeed), + zap.String("addr", addr), zap.Uint64("storeID", storeID)) failpoint.Inject("kvClientStreamCloseDelay", nil) @@ -1129,12 +1182,14 @@ func (s *eventFeedSession) receiveFromStream( if status.Code(errors.Cause(err)) == codes.Canceled { log.Debug( "receive from stream canceled", + zap.String("changefeed", s.client.changefeed), zap.String("addr", addr), zap.Uint64("storeID", storeID), ) } else { log.Warn( "failed to receive from stream", + zap.String("changefeed", s.client.changefeed), zap.String("addr", addr), zap.Uint64("storeID", storeID), zap.Error(err), @@ -1174,6 +1229,7 @@ func (s *eventFeedSession) receiveFromStream( regionCount = len(cevent.ResolvedTs.Regions) } log.Warn("change data event size too large", + zap.String("changefeed", s.client.changefeed), zap.Int("size", size), zap.Int("eventLen", len(cevent.Events)), zap.Int("resolved region count", regionCount)) } @@ -1209,6 +1265,7 @@ func (s *eventFeedSession) sendRegionChangeEvent( if ok { if state.requestID < event.RequestId { log.Debug("region state entry will be replaced because received message of newer requestID", + zap.String("changefeed", s.client.changefeed), zap.Uint64("regionID", event.RegionId), zap.Uint64("oldRequestID", state.requestID), zap.Uint64("requestID", event.RequestId), @@ -1216,6 +1273,7 @@ func (s *eventFeedSession) sendRegionChangeEvent( isNewSubscription = true } else if state.requestID > event.RequestId { log.Warn("drop event due to event belongs to a stale request", + zap.String("changefeed", s.client.changefeed), zap.Uint64("regionID", event.RegionId), zap.Uint64("requestID", event.RequestId), zap.Uint64("currRequestID", state.requestID), @@ -1232,6 +1290,7 @@ func (s *eventFeedSession) sendRegionChangeEvent( state, ok = pendingRegions.take(event.RequestId) if !ok { log.Warn("drop event due to region feed is removed", + zap.String("changefeed", s.client.changefeed), zap.Uint64("regionID", event.RegionId), zap.Uint64("requestID", event.RequestId), zap.String("addr", addr)) @@ -1242,6 +1301,7 @@ func (s *eventFeedSession) sendRegionChangeEvent( worker.setRegionState(event.RegionId, state) } else if state.isStopped() { log.Warn("drop event due to region feed stopped", + zap.String("changefeed", s.client.changefeed), zap.Uint64("regionID", event.RegionId), zap.Uint64("requestID", event.RequestId), zap.String("addr", addr)) @@ -1271,6 +1331,7 @@ func (s *eventFeedSession) sendResolvedTs( if ok { if state.isStopped() { log.Debug("drop resolved ts due to region feed stopped", + zap.String("changefeed", s.client.changefeed), zap.Uint64("regionID", regionID), zap.Uint64("requestID", state.requestID), zap.String("addr", addr)) diff --git a/cdc/kv/client_bench_test.go b/cdc/kv/client_bench_test.go index 61e04858c2f..3f7aa9b5666 100644 --- a/cdc/kv/client_bench_test.go +++ b/cdc/kv/client_bench_test.go @@ -192,7 +192,7 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) ( defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test()) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") eventCh := make(chan model.RegionFeedEvent, 1000000) wg.Add(1) go func() { @@ -282,7 +282,7 @@ func prepareBench(b *testing.B, regionNum 
int) ( defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test()) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") eventCh := make(chan model.RegionFeedEvent, 1000000) wg.Add(1) go func() { diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go index c86f49872d9..181de57e0e9 100644 --- a/cdc/kv/client_test.go +++ b/cdc/kv/client_test.go @@ -94,7 +94,7 @@ func (s *clientSuite) TestNewClient(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cli := NewCDCClient(context.Background(), pdClient, nil, grpcPool, regionCache, pdtime.NewClock4Test()) + cli := NewCDCClient(context.Background(), pdClient, nil, grpcPool, regionCache, pdtime.NewClock4Test(), "") c.Assert(cli, check.NotNil) } @@ -370,7 +370,7 @@ func (s *clientSuite) TestConnectOfflineTiKV(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(context.Background(), pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test()) + cdcClient := NewCDCClient(context.Background(), pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") // Take care of the eventCh, it's used to output resolvedTs event or kv event // It will stuck the normal routine eventCh := make(chan model.RegionFeedEvent, 50) @@ -470,7 +470,7 @@ func (s *clientSuite) TestRecvLargeMessageSize(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test()) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -568,7 +568,7 @@ func (s *clientSuite) TestHandleError(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test()) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -725,7 +725,7 @@ func (s *clientSuite) TestCompatibilityWithSameConn(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test()) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") eventCh := make(chan model.RegionFeedEvent, 50) var wg2 sync.WaitGroup wg2.Add(1) @@ -792,7 +792,7 @@ func (s *clientSuite) TestClusterIDMismatch(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test()) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") eventCh := make(chan model.RegionFeedEvent, 50) var wg2 sync.WaitGroup @@ -858,7 +858,7 @@ func (s *clientSuite) testHandleFeedEvent(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, 
grpcPool, regionCache, pdtime.NewClock4Test()) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -1314,7 +1314,7 @@ func (s *clientSuite) TestStreamSendWithError(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test()) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -1424,7 +1424,7 @@ func (s *clientSuite) testStreamRecvWithError(c *check.C, failpointStr string) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test()) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -1553,7 +1553,7 @@ func (s *clientSuite) TestStreamRecvWithErrorAndResolvedGoBack(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test()) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -1762,7 +1762,7 @@ func (s *clientSuite) TestIncompatibleTiKV(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test()) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") // NOTICE: eventCh may block the main logic of EventFeed eventCh := make(chan model.RegionFeedEvent, 128) wg.Add(1) @@ -1838,7 +1838,7 @@ func (s *clientSuite) TestNoPendingRegionError(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test()) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) @@ -1915,7 +1915,7 @@ func (s *clientSuite) TestDropStaleRequest(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test()) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -2024,7 +2024,7 @@ func (s *clientSuite) TestResolveLock(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test()) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -2124,7 +2124,7 @@ func (s *clientSuite) testEventCommitTsFallback(c *check.C, events []*cdcpb.Chan defer 
grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test()) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") eventCh := make(chan model.RegionFeedEvent, 50) var clientWg sync.WaitGroup clientWg.Add(1) @@ -2278,7 +2278,7 @@ func (s *clientSuite) testEventAfterFeedStop(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test()) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -2457,7 +2457,7 @@ func (s *clientSuite) TestOutOfRegionRangeEvent(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test()) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -2671,7 +2671,7 @@ func (s *clientSuite) TestResolveLockNoCandidate(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test()) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -2765,7 +2765,7 @@ func (s *clientSuite) TestFailRegionReentrant(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test()) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -2846,7 +2846,7 @@ func (s *clientSuite) TestClientV1UnlockRangeReentrant(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test()) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -2912,7 +2912,7 @@ func (s *clientSuite) testClientErrNoPendingRegion(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test()) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -2986,7 +2986,7 @@ func (s *clientSuite) testKVClientForceReconnect(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test()) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") eventCh := make(chan model.RegionFeedEvent, 50) 
wg.Add(1) go func() { @@ -3136,7 +3136,7 @@ func (s *clientSuite) TestConcurrentProcessRangeRequest(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test()) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") eventCh := make(chan model.RegionFeedEvent, 100) wg.Add(1) go func() { @@ -3251,7 +3251,7 @@ func (s *clientSuite) TestEvTimeUpdate(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test()) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -3372,7 +3372,7 @@ func (s *clientSuite) TestRegionWorkerExitWhenIsIdle(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test()) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -3463,7 +3463,7 @@ func (s *clientSuite) TestPrewriteNotMatchError(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test()) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") eventCh := make(chan model.RegionFeedEvent, 50) baseAllocatedID := currentRequestID() diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index 9f288418183..f3a19e1f45a 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -241,6 +241,7 @@ func (w *regionWorker) handleSingleRegionError(err error, state *regionFeedState } regionID := state.sri.verID.GetID() log.Info("single region event feed disconnected", + zap.String("changefeed", w.session.client.changefeed), zap.Uint64("regionID", regionID), zap.Uint64("requestID", state.requestID), zap.Stringer("span", state.sri.span), @@ -301,7 +302,8 @@ func (w *regionWorker) resolveLock(ctx context.Context) error { case <-advanceCheckTicker.C: currentTimeFromPD, err := w.session.client.pdClock.CurrentTime() if err != nil { - log.Warn("failed to get current version from PD", zap.Error(err)) + log.Warn("failed to get current version from PD", + zap.Error(err), zap.String("changefeed", w.session.client.changefeed)) continue } expired := make([]*regionTsInfo, 0) @@ -335,7 +337,7 @@ func (w *regionWorker) resolveLock(ctx context.Context) error { if sinceLastResolvedTs >= resolveLockInterval { sinceLastEvent := time.Since(rts.ts.eventTime) if sinceLastResolvedTs > reconnectInterval && sinceLastEvent > reconnectInterval { - log.Warn("kv client reconnect triggered", + log.Warn("kv client reconnect triggered", zap.String("changefeed", w.session.client.changefeed), zap.Duration("duration", sinceLastResolvedTs), zap.Duration("since last event", sinceLastResolvedTs)) return errReconnect } @@ -351,6 +353,7 @@ func (w *regionWorker) resolveLock(ctx context.Context) error { continue } log.Warn("region not receiving resolved event from tikv or resolved ts is not pushing for too long time, try to 
resolve lock", + zap.String("changefeed", w.session.client.changefeed), zap.Uint64("regionID", rts.regionID), zap.Stringer("span", state.getRegionSpan()), zap.Duration("duration", sinceLastResolvedTs), @@ -359,7 +362,9 @@ func (w *regionWorker) resolveLock(ctx context.Context) error { ) err = w.session.lockResolver.Resolve(ctx, rts.regionID, maxVersion) if err != nil { - log.Warn("failed to resolve lock", zap.Uint64("regionID", rts.regionID), zap.Error(err)) + log.Warn("failed to resolve lock", + zap.Uint64("regionID", rts.regionID), zap.Error(err), + zap.String("changefeed", w.session.client.changefeed)) continue } rts.ts.penalty = 0 @@ -387,7 +392,9 @@ func (w *regionWorker) processEvent(ctx context.Context, event *regionStatefulEv err = w.handleSingleRegionError(err, event.state) } case *cdcpb.Event_Admin_: - log.Info("receive admin event", zap.Stringer("event", event.changeEvent)) + log.Info("receive admin event", + zap.Stringer("event", event.changeEvent), + zap.String("changefeed", w.session.client.changefeed)) case *cdcpb.Event_Error: err = w.handleSingleRegionError( cerror.WrapError(cerror.ErrEventFeedEventError, &eventError{err: x.Error}), @@ -438,7 +445,8 @@ func (w *regionWorker) eventHandler(ctx context.Context) error { // event == nil means the region worker should exit and re-establish // all existing regions. if !ok || event == nil { - log.Info("region worker closed by error") + log.Info("region worker closed by error", + zap.String("changefeed", w.session.client.changefeed)) exitEventHandler = true return } @@ -579,7 +587,9 @@ func (w *regionWorker) cancelStream(delay time.Duration) { // to avoid too frequent region rebuilt. time.Sleep(delay) } else { - log.Warn("gRPC stream cancel func not found", zap.String("addr", w.storeAddr)) + log.Warn("gRPC stream cancel func not found", + zap.String("addr", w.storeAddr), + zap.String("changefeed", w.session.client.changefeed)) } } @@ -632,6 +642,7 @@ func (w *regionWorker) handleEventEntry( case cdcpb.Event_INITIALIZED: if time.Since(state.startFeedTime) > 20*time.Second { log.Warn("The time cost of initializing is too much", + zap.String("changefeed", w.session.client.changefeed), zap.Duration("duration", time.Since(state.startFeedTime)), zap.Uint64("regionID", regionID)) } @@ -738,6 +749,7 @@ func (w *regionWorker) handleResolvedTs( if resolvedTs < state.lastResolvedTs { log.Warn("The resolvedTs is fallen back in kvclient", + zap.String("changefeed", w.session.client.changefeed), zap.String("EventType", "RESOLVED"), zap.Uint64("resolvedTs", resolvedTs), zap.Uint64("lastResolvedTs", state.lastResolvedTs), diff --git a/cdc/kv/testing.go b/cdc/kv/testing.go index 6a85e1f79d2..61cb9c5862c 100644 --- a/cdc/kv/testing.go +++ b/cdc/kv/testing.go @@ -153,7 +153,7 @@ func TestSplit(t require.TestingT, pdCli pd.Client, storage tikv.Storage, kvStor grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) regionCache := tikv.NewRegionCache(pdCli) - cli := NewCDCClient(context.Background(), pdCli, storage, grpcPool, regionCache, pdtime.NewClock4Test()) + cli := NewCDCClient(context.Background(), pdCli, storage, grpcPool, regionCache, pdtime.NewClock4Test(), "") startTS := mustGetTimestamp(t, storage) @@ -243,7 +243,7 @@ func TestGetKVSimple(t require.TestingT, pdCli pd.Client, storage tikv.Storage, grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) regionCache := tikv.NewRegionCache(pdCli) - cli := NewCDCClient(context.Background(), pdCli, storage, grpcPool, regionCache, pdtime.NewClock4Test()) + cli := NewCDCClient(context.Background(), 
pdCli, storage, grpcPool, regionCache, pdtime.NewClock4Test(), "") startTS := mustGetTimestamp(t, storage) lockresolver := txnutil.NewLockerResolver(storage) diff --git a/cdc/owner/ddl_puller.go b/cdc/owner/ddl_puller.go index 18e0374aa6a..ffdabfd9c6c 100644 --- a/cdc/owner/ddl_puller.go +++ b/cdc/owner/ddl_puller.go @@ -80,6 +80,8 @@ func newDDLPuller(ctx cdcContext.Context, startTs uint64) (DDLPuller, error) { ctx.GlobalVars().RegionCache, kvStorage, ctx.GlobalVars().PDClock, + // Add "_ddl_puller" to make it different from table pullers. + ctx.ChangefeedVars().ID+"_ddl_puller", startTs, []regionspan.Span{regionspan.GetDDLSpan(), regionspan.GetAddIndexDDLSpan()}, false) } diff --git a/cdc/processor/pipeline/puller.go b/cdc/processor/pipeline/puller.go index c32edd68b03..d28e395f7ca 100644 --- a/cdc/processor/pipeline/puller.go +++ b/cdc/processor/pipeline/puller.go @@ -31,16 +31,20 @@ type pullerNode struct { tableID model.TableID replicaInfo *model.TableReplicaInfo + changefeed string cancel context.CancelFunc wg *errgroup.Group } func newPullerNode( - tableID model.TableID, replicaInfo *model.TableReplicaInfo, tableName string) pipeline.Node { + tableID model.TableID, replicaInfo *model.TableReplicaInfo, + tableName, changefeed string, +) pipeline.Node { return &pullerNode{ tableID: tableID, replicaInfo: replicaInfo, tableName: tableName, + changefeed: changefeed, } } @@ -75,6 +79,7 @@ func (n *pullerNode) InitWithWaitGroup(ctx pipeline.NodeContext, wg *errgroup.Gr ctx.GlobalVars().RegionCache, ctx.GlobalVars().KVStorage, ctx.GlobalVars().PDClock, + n.changefeed, n.replicaInfo.StartTs, n.tableSpan(ctx), true) n.wg.Go(func() error { ctx.Throw(errors.Trace(plr.Run(ctxC))) diff --git a/cdc/processor/pipeline/table.go b/cdc/processor/pipeline/table.go index c3a01e73270..b0a45e269d5 100644 --- a/cdc/processor/pipeline/table.go +++ b/cdc/processor/pipeline/table.go @@ -178,6 +178,7 @@ func NewTablePipeline(ctx cdcContext.Context, sink sink.Sink, targetTs model.Ts) TablePipeline { ctx, cancel := cdcContext.WithCancel(ctx) + changefeed := ctx.ChangefeedVars().ID replConfig := ctx.ChangefeedVars().Info.Config tablePipeline := &tablePipelineImpl{ tableID: tableID, @@ -206,7 +207,7 @@ func NewTablePipeline(ctx cdcContext.Context, newSorterNode(tableName, tableID, replicaInfo.StartTs, flowController, mounter, replConfig) sinkNode := newSinkNode(tableID, sink, replicaInfo.StartTs, targetTs, flowController) - p.AppendNode(ctx, "puller", newPullerNode(tableID, replicaInfo, tableName)) + p.AppendNode(ctx, "puller", newPullerNode(tableID, replicaInfo, tableName, changefeed)) p.AppendNode(ctx, "sorter", sorterNode) if cyclicEnabled { p.AppendNode(ctx, "cyclic", newCyclicMarkNode(replicaInfo.MarkTableID)) diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index ffe0d848f57..e4f9251acd3 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -672,6 +672,7 @@ func (p *processor) createAndDriveSchemaStorage(ctx cdcContext.Context) (entry.S ctx.GlobalVars().RegionCache, ctx.GlobalVars().KVStorage, ctx.GlobalVars().PDClock, + ctx.ChangefeedVars().ID, checkpointTs, ddlspans, false) meta, err := kv.GetSnapshotMeta(kvStorage, checkpointTs) if err != nil { diff --git a/cdc/puller/puller.go b/cdc/puller/puller.go index cc0f6b8cf61..a04831a0eba 100644 --- a/cdc/puller/puller.go +++ b/cdc/puller/puller.go @@ -73,6 +73,7 @@ func NewPuller( regionCache *tikv.RegionCache, kvStorage tidbkv.Storage, pdClock pdtime.Clock, + changefeed string, checkpointTs uint64, spans 
[]regionspan.Span, enableOldValue bool, @@ -89,7 +90,7 @@ func NewPuller( // the initial ts for frontier to 0. Once the puller level resolved ts // initialized, the ts should advance to a non-zero value. tsTracker := frontier.NewFrontier(0, comparableSpans...) - kvCli := kv.NewCDCKVClient(ctx, pdCli, tikvStorage, grpcPool, regionCache, pdClock) + kvCli := kv.NewCDCKVClient(ctx, pdCli, tikvStorage, grpcPool, regionCache, pdClock, changefeed) p := &pullerImpl{ kvCli: kvCli, kvStorage: tikvStorage, diff --git a/cdc/puller/puller_test.go b/cdc/puller/puller_test.go index 9c0ae6c069a..fe9c5d82da6 100644 --- a/cdc/puller/puller_test.go +++ b/cdc/puller/puller_test.go @@ -65,6 +65,7 @@ func newMockCDCKVClient( grpcPool kv.GrpcPool, regionCache *tikv.RegionCache, pdClock pdtime.Clock, + changefeed string, ) kv.CDCKVClient { return &mockCDCKVClient{ expectations: make(chan model.RegionFeedEvent, 1024), @@ -130,7 +131,7 @@ func (s *pullerSuite) newPullerForTest( regionCache := tikv.NewRegionCache(pdCli) defer regionCache.Close() plr := NewPuller( - ctx, pdCli, grpcPool, regionCache, store, pdtime.NewClock4Test(), + ctx, pdCli, grpcPool, regionCache, store, pdtime.NewClock4Test(), "", checkpointTs, spans, enableOldValue) wg.Add(1) go func() { diff --git a/pkg/regionspan/region_range_lock.go b/pkg/regionspan/region_range_lock.go index 5f5b97e3a7e..3792357e0ef 100644 --- a/pkg/regionspan/region_range_lock.go +++ b/pkg/regionspan/region_range_lock.go @@ -145,6 +145,7 @@ func allocID() uint64 { // version number, which should comes from the Region's Epoch version. The version is used to compare which range is // new and which is old if two ranges are overlapping. type RegionRangeLock struct { + changefeed string mu sync.Mutex rangeCheckpointTs *RangeTsMap rangeLock *btree.BTree @@ -154,8 +155,11 @@ type RegionRangeLock struct { } // NewRegionRangeLock creates a new RegionRangeLock. 
-func NewRegionRangeLock(startKey, endKey []byte, startTs uint64) *RegionRangeLock { +func NewRegionRangeLock( + startKey, endKey []byte, startTs uint64, changefeed string, +) *RegionRangeLock { return &RegionRangeLock{ + changefeed: changefeed, rangeCheckpointTs: NewRangeTsMap(startKey, endKey, startTs), rangeLock: btree.New(16), regionIDLock: make(map[uint64]*rangeLockEntry), @@ -215,7 +219,9 @@ func (l *RegionRangeLock) tryLockRange(startKey, endKey []byte, regionID, versio l.rangeLock.ReplaceOrInsert(newEntry) l.regionIDLock[regionID] = newEntry - log.Info("range locked", zap.Uint64("lockID", l.id), zap.Uint64("regionID", regionID), + log.Info("range locked", + zap.String("changefeed", l.changefeed), + zap.Uint64("lockID", l.id), zap.Uint64("regionID", regionID), zap.String("startKey", hex.EncodeToString(startKey)), zap.String("endKey", hex.EncodeToString(endKey)), zap.Uint64("checkpointTs", checkpointTs)) @@ -243,7 +249,9 @@ func (l *RegionRangeLock) tryLockRange(startKey, endKey []byte, regionID, versio retryRanges := make([]ComparableSpan, 0) currentRangeStartKey := startKey - log.Info("tryLockRange stale", zap.Uint64("lockID", l.id), zap.Uint64("regionID", regionID), + log.Info("tryLockRange stale", + zap.String("changefeed", l.changefeed), + zap.Uint64("lockID", l.id), zap.Uint64("regionID", regionID), zap.String("startKey", hex.EncodeToString(startKey)), zap.String("endKey", hex.EncodeToString(endKey)), zap.Strings("allOverlapping", overlapStr)) // DEBUG for _, r := range overlappingEntries { @@ -278,7 +286,9 @@ func (l *RegionRangeLock) tryLockRange(startKey, endKey []byte, regionID, versio } - log.Info("lock range blocked", zap.Uint64("lockID", l.id), zap.Uint64("regionID", regionID), + log.Info("lock range blocked", + zap.String("changefeed", l.changefeed), + zap.Uint64("lockID", l.id), zap.Uint64("regionID", regionID), zap.String("startKey", hex.EncodeToString(startKey)), zap.String("endKey", hex.EncodeToString(endKey)), zap.Strings("blockedBy", overlapStr)) // DEBUG return LockRangeResult{ @@ -324,6 +334,7 @@ func (l *RegionRangeLock) UnlockRange(startKey, endKey []byte, regionID, version if item == nil { log.Panic("unlocking a not locked range", + zap.String("changefeed", l.changefeed), zap.Uint64("regionID", regionID), zap.String("startKey", hex.EncodeToString(startKey)), zap.String("endKey", hex.EncodeToString(endKey)), @@ -334,6 +345,7 @@ func (l *RegionRangeLock) UnlockRange(startKey, endKey []byte, regionID, version entry := item.(*rangeLockEntry) if entry.regionID != regionID { log.Panic("unlocked a range but regionID mismatch", + zap.String("changefeed", l.changefeed), zap.Uint64("expectedRegionID", regionID), zap.Uint64("foundRegionID", entry.regionID), zap.String("startKey", hex.EncodeToString(startKey)), @@ -341,6 +353,7 @@ func (l *RegionRangeLock) UnlockRange(startKey, endKey []byte, regionID, version } if entry != l.regionIDLock[regionID] { log.Panic("range lock and region id lock mismatch when trying to unlock", + zap.String("changefeed", l.changefeed), zap.Uint64("unlockingRegionID", regionID), zap.String("rangeLockEntry", entry.String()), zap.String("regionIDLockEntry", l.regionIDLock[regionID].String())) @@ -348,8 +361,8 @@ func (l *RegionRangeLock) UnlockRange(startKey, endKey []byte, regionID, version delete(l.regionIDLock, regionID) if entry.version != version || !bytes.Equal(entry.endKey, endKey) { - log.Panic("unlocking region doesn't match the locked region. 
"+ - "Locked: [%v, %v), version %v; Unlocking: [%v, %v), %v", + log.Panic("unlocking region doesn't match the locked region", + zap.String("changefeed", l.changefeed), zap.Uint64("regionID", regionID), zap.String("startKey", hex.EncodeToString(startKey)), zap.String("endKey", hex.EncodeToString(endKey)), @@ -367,7 +380,9 @@ func (l *RegionRangeLock) UnlockRange(startKey, endKey []byte, regionID, version panic("unreachable") } l.rangeCheckpointTs.Set(startKey, endKey, checkpointTs) - log.Info("unlocked range", zap.Uint64("lockID", l.id), zap.Uint64("regionID", entry.regionID), + log.Info("unlocked range", + zap.String("changefeed", l.changefeed), + zap.Uint64("lockID", l.id), zap.Uint64("regionID", entry.regionID), zap.String("startKey", hex.EncodeToString(startKey)), zap.String("endKey", hex.EncodeToString(endKey)), zap.Uint64("checkpointTs", checkpointTs)) } diff --git a/pkg/regionspan/region_range_lock_test.go b/pkg/regionspan/region_range_lock_test.go index cfc891b228d..e159b6d412b 100644 --- a/pkg/regionspan/region_range_lock_test.go +++ b/pkg/regionspan/region_range_lock_test.go @@ -87,7 +87,7 @@ func TestRegionRangeLock(t *testing.T) { t.Parallel() ctx := context.TODO() - l := NewRegionRangeLock([]byte("a"), []byte("h"), math.MaxUint64) + l := NewRegionRangeLock([]byte("a"), []byte("h"), math.MaxUint64, "") mustLockRangeSuccess(ctx, t, l, "a", "e", 1, 1, math.MaxUint64) unlockRange(l, "a", "e", 1, 1, 100) @@ -104,7 +104,7 @@ func TestRegionRangeLock(t *testing.T) { func TestRegionRangeLockStale(t *testing.T) { t.Parallel() - l := NewRegionRangeLock([]byte("a"), []byte("z"), math.MaxUint64) + l := NewRegionRangeLock([]byte("a"), []byte("z"), math.MaxUint64, "") ctx := context.TODO() mustLockRangeSuccess(ctx, t, l, "c", "g", 1, 10, math.MaxUint64) mustLockRangeSuccess(ctx, t, l, "j", "n", 2, 8, math.MaxUint64) @@ -127,7 +127,7 @@ func TestRegionRangeLockLockingRegionID(t *testing.T) { t.Parallel() ctx := context.TODO() - l := NewRegionRangeLock([]byte("a"), []byte("z"), math.MaxUint64) + l := NewRegionRangeLock([]byte("a"), []byte("z"), math.MaxUint64, "") mustLockRangeSuccess(ctx, t, l, "c", "d", 1, 10, math.MaxUint64) mustLockRangeStale(ctx, t, l, "e", "f", 1, 5, "e", "f") @@ -163,7 +163,7 @@ func TestRegionRangeLockCanBeCancelled(t *testing.T) { t.Parallel() ctx, cancel := context.WithCancel(context.Background()) - l := NewRegionRangeLock([]byte("a"), []byte("z"), math.MaxUint64) + l := NewRegionRangeLock([]byte("a"), []byte("z"), math.MaxUint64, "") mustLockRangeSuccess(ctx, t, l, "g", "h", 1, 10, math.MaxUint64) wait := mustLockRangeWait(ctx, t, l, "g", "h", 1, 12) cancel()