diff --git a/cdc/capture.go b/cdc/capture.go index 031a3c1a0cd..5010365fb81 100644 --- a/cdc/capture.go +++ b/cdc/capture.go @@ -27,6 +27,7 @@ import ( cerror "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/security" "github.com/pingcap/ticdc/pkg/util" + pd "github.com/tikv/pd/client" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/clientv3/concurrency" "go.etcd.io/etcd/mvcc" @@ -49,6 +50,7 @@ type processorOpts struct { // Capture represents a Capture server, it monitors the changefeed information in etcd and schedules Task on it. type Capture struct { etcdClient kv.CDCEtcdClient + pdCli pd.Client credential *security.Credential processors map[string]*processor @@ -67,6 +69,7 @@ type Capture struct { func NewCapture( ctx context.Context, pdEndpoints []string, + pdCli pd.Client, credential *security.Credential, advertiseAddr string, opts *processorOpts, @@ -126,6 +129,7 @@ func NewCapture( election: elec, info: info, opts: opts, + pdCli: pdCli, } return @@ -245,7 +249,7 @@ func (c *Capture) assignTask(ctx context.Context, task *Task) (*processor, error zap.String("changefeed", task.ChangeFeedID)) p, err := runProcessor( - ctx, c.credential, c.session, *cf, task.ChangeFeedID, *c.info, task.CheckpointTS, c.opts.flushCheckpointInterval) + ctx, c.pdCli, c.credential, c.session, *cf, task.ChangeFeedID, *c.info, task.CheckpointTS, c.opts.flushCheckpointInterval) if err != nil { log.Error("run processor failed", zap.String("changefeed", task.ChangeFeedID), diff --git a/cdc/owner_test.go b/cdc/owner_test.go index b486cd19ff5..ddf89b6a862 100644 --- a/cdc/owner_test.go +++ b/cdc/owner_test.go @@ -504,7 +504,7 @@ func (s *ownerSuite) TestHandleAdmin(c *check.C) { defer sink.Close() //nolint:errcheck sampleCF.sink = sink - capture, err := NewCapture(ctx, []string{s.clientURL.String()}, + capture, err := NewCapture(ctx, []string{s.clientURL.String()}, nil, &security.Credential{}, "127.0.0.1:12034", &processorOpts{flushCheckpointInterval: time.Millisecond * 200}) c.Assert(err, check.IsNil) err = capture.Campaign(ctx) @@ -806,7 +806,7 @@ func (s *ownerSuite) TestWatchCampaignKey(c *check.C) { defer s.TearDownTest(c) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - capture, err := NewCapture(ctx, []string{s.clientURL.String()}, + capture, err := NewCapture(ctx, []string{s.clientURL.String()}, nil, &security.Credential{}, "127.0.0.1:12034", &processorOpts{}) c.Assert(err, check.IsNil) err = capture.Campaign(ctx) @@ -864,7 +864,7 @@ func (s *ownerSuite) TestCleanUpStaleTasks(c *check.C) { defer cancel() addr := "127.0.0.1:12034" ctx = util.PutCaptureAddrInCtx(ctx, addr) - capture, err := NewCapture(ctx, []string{s.clientURL.String()}, + capture, err := NewCapture(ctx, []string{s.clientURL.String()}, nil, &security.Credential{}, addr, &processorOpts{}) c.Assert(err, check.IsNil) err = s.client.PutCaptureInfo(ctx, capture.info, capture.session.Lease()) diff --git a/cdc/processor.go b/cdc/processor.go index 11f391f708a..cfcfb4c62c1 100644 --- a/cdc/processor.go +++ b/cdc/processor.go @@ -66,8 +66,6 @@ const ( schemaStorageGCLag = time.Minute * 20 ) -var fNewPDCli = pd.NewClientWithContext - type processor struct { id string captureInfo model.CaptureInfo @@ -156,6 +154,7 @@ func (t *tableInfo) safeStop() (stopped bool, checkpointTs model.Ts) { // newProcessor creates and returns a processor for the specified change feed func newProcessor( ctx context.Context, + pdCli pd.Client, credential *security.Credential, session *concurrency.Session, changefeed model.ChangeFeedInfo, @@ -167,12 +166,6 @@ func newProcessor( flushCheckpointInterval time.Duration, ) (*processor, error) { etcdCli := session.Client() - endpoints := session.Client().Endpoints() - pdCli, err := fNewPDCli(ctx, endpoints, credential.PDSecurityOption()) - if err != nil { - return nil, errors.Annotatef( - cerror.WrapError(cerror.ErrNewProcessorFailed, err), "create pd client failed, addr: %v", endpoints) - } cdcEtcdCli := kv.NewCDCEtcdClient(ctx, etcdCli) limitter := puller.NewBlurResourceLimmter(defaultMemBufferCapacity) @@ -1258,6 +1251,7 @@ func (p *processor) isStopped() bool { // runProcessor creates a new processor then starts it. func runProcessor( ctx context.Context, + pdCli pd.Client, credential *security.Credential, session *concurrency.Session, info model.ChangeFeedInfo, @@ -1284,7 +1278,7 @@ func runProcessor( cancel() return nil, errors.Trace(err) } - processor, err := newProcessor(ctx, credential, session, info, sink, + processor, err := newProcessor(ctx, pdCli, credential, session, info, sink, changefeedID, captureInfo, checkpointTs, errCh, flushCheckpointInterval) if err != nil { cancel() diff --git a/cdc/server.go b/cdc/server.go index 002a3a0cd25..5ab1d25d900 100644 --- a/cdc/server.go +++ b/cdc/server.go @@ -238,6 +238,18 @@ func (s *Server) Run(ctx context.Context) error { if err != nil { return err } + + kvStore, err := kv.CreateTiStore(strings.Join(s.pdEndpoints, ","), s.opts.credential) + if err != nil { + return errors.Trace(err) + } + defer func() { + err := kvStore.Close() + if err != nil { + log.Warn("kv store close failed", zap.Error(err)) + } + }() + ctx = util.PutKVStorageInCtx(ctx, kvStore) // When a capture suicided, restart it for { if err := s.run(ctx); cerror.ErrCaptureSuicide.NotEqual(err) { @@ -306,20 +318,9 @@ func (s *Server) campaignOwnerLoop(ctx context.Context) error { func (s *Server) run(ctx context.Context) (err error) { ctx = util.PutCaptureAddrInCtx(ctx, s.opts.advertiseAddr) ctx = util.PutTimezoneInCtx(ctx, s.opts.timezone) - kvStore, err := kv.CreateTiStore(strings.Join(s.pdEndpoints, ","), s.opts.credential) - if err != nil { - return errors.Trace(err) - } - defer func() { - err := kvStore.Close() - if err != nil { - log.Warn("kv store close failed", zap.Error(err)) - } - }() - ctx = util.PutKVStorageInCtx(ctx, kvStore) procOpts := &processorOpts{flushCheckpointInterval: s.opts.processorFlushInterval} - capture, err := NewCapture(ctx, s.pdEndpoints, s.opts.credential, s.opts.advertiseAddr, procOpts) + capture, err := NewCapture(ctx, s.pdEndpoints, s.pdClient, s.opts.credential, s.opts.advertiseAddr, procOpts) if err != nil { return err }