cherry pick pingcap#1217 to release-4.0
Signed-off-by: ti-srebot <[email protected]>
leoppro authored and ti-srebot committed Dec 16, 2020
1 parent 03f224c commit 53924ba
Showing 4 changed files with 115 additions and 21 deletions.
10 changes: 9 additions & 1 deletion cdc/capture.go
@@ -26,6 +26,11 @@ import (
"github.com/pingcap/ticdc/cdc/model"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/security"
<<<<<<< HEAD
=======
"github.com/pingcap/ticdc/pkg/util"
pd "github.com/tikv/pd/client"
>>>>>>> 2a09a89... *: Using the global singleton for pd client and tikv client, and fix pd client freeze (#1217)
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/concurrency"
"go.etcd.io/etcd/mvcc"
@@ -48,6 +53,7 @@ type processorOpts struct {
// Capture represents a Capture server, it monitors the changefeed information in etcd and schedules Task on it.
type Capture struct {
etcdClient kv.CDCEtcdClient
pdCli pd.Client
credential *security.Credential

processors map[string]*processor
@@ -66,6 +72,7 @@ type Capture struct {
func NewCapture(
ctx context.Context,
pdEndpoints []string,
pdCli pd.Client,
credential *security.Credential,
advertiseAddr string,
opts *processorOpts,
@@ -126,6 +133,7 @@ func NewCapture(
election: elec,
info: info,
opts: opts,
pdCli: pdCli,
}

return
@@ -243,7 +251,7 @@ func (c *Capture) assignTask(ctx context.Context, task *Task) (*processor, error
zap.String("changefeedid", task.ChangeFeedID))

p, err := runProcessor(
ctx, c.credential, c.session, *cf, task.ChangeFeedID, *c.info, task.CheckpointTS, c.opts.flushCheckpointInterval)
ctx, c.pdCli, c.credential, c.session, *cf, task.ChangeFeedID, *c.info, task.CheckpointTS, c.opts.flushCheckpointInterval)
if err != nil {
log.Error("run processor failed",
zap.String("changefeedid", task.ChangeFeedID),
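The shape of the capture.go change is ordinary dependency injection: the CDC server owns a single shared pd.Client and passes it into NewCapture, which stores it on the Capture and later hands it to each processor, instead of every component dialing PD on its own. Below is a minimal, self-contained Go sketch of that pattern; Client, stubClient, and the simplified NewCapture signature are illustrative stand-ins, not the real ticdc or PD client API.

package main

import (
	"context"
	"fmt"
)

// Client stands in for pd.Client; only the single method used below is modeled.
type Client interface {
	GetClusterID(ctx context.Context) uint64
}

// stubClient is a placeholder implementation for this sketch.
type stubClient struct{ id uint64 }

func (s *stubClient) GetClusterID(ctx context.Context) uint64 { return s.id }

// Capture keeps the injected client, mirroring the new pdCli field in the diff.
type Capture struct {
	pdCli Client
}

// NewCapture accepts the shared client instead of creating its own,
// mirroring the new pdCli parameter added above.
func NewCapture(ctx context.Context, pdCli Client) (*Capture, error) {
	return &Capture{pdCli: pdCli}, nil
}

func main() {
	ctx := context.Background()
	shared := &stubClient{id: 42} // created once by the server, reused everywhere
	capture, err := NewCapture(ctx, shared)
	if err != nil {
		panic(err)
	}
	fmt.Println("cluster id:", capture.pdCli.GetClusterID(ctx))
}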
88 changes: 87 additions & 1 deletion cdc/owner_test.go
@@ -503,7 +503,7 @@ func (s *ownerSuite) TestHandleAdmin(c *check.C) {
defer sink.Close() //nolint:errcheck
sampleCF.sink = sink

capture, err := NewCapture(ctx, []string{s.clientURL.String()},
capture, err := NewCapture(ctx, []string{s.clientURL.String()}, nil,
&security.Credential{}, "127.0.0.1:12034", &processorOpts{flushCheckpointInterval: time.Millisecond * 200})
c.Assert(err, check.IsNil)
err = capture.Campaign(ctx)
@@ -801,8 +801,16 @@ func (s *ownerSuite) TestChangefeedApplyDDLJob(c *check.C) {
}

func (s *ownerSuite) TestWatchCampaignKey(c *check.C) {
<<<<<<< HEAD
ctx := context.Background()
capture, err := NewCapture(ctx, []string{s.clientURL.String()},
=======
defer testleak.AfterTest(c)()
defer s.TearDownTest(c)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
capture, err := NewCapture(ctx, []string{s.clientURL.String()}, nil,
>>>>>>> 2a09a89... *: Using the global singleton for pd client and tikv client, and fix pd client freeze (#1217)
&security.Credential{}, "127.0.0.1:12034", &processorOpts{})
c.Assert(err, check.IsNil)
err = capture.Campaign(ctx)
@@ -848,4 +856,82 @@ func (s *ownerSuite) TestWatchCampaignKey(c *check.C) {
time.Sleep(time.Millisecond * 100)
cancel()
wg.Wait()
<<<<<<< HEAD
=======

err = capture.etcdClient.Close()
c.Assert(err, check.IsNil)
}

func (s *ownerSuite) TestCleanUpStaleTasks(c *check.C) {
defer testleak.AfterTest(c)()
defer s.TearDownTest(c)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
addr := "127.0.0.1:12034"
ctx = util.PutCaptureAddrInCtx(ctx, addr)
capture, err := NewCapture(ctx, []string{s.clientURL.String()}, nil,
&security.Credential{}, addr, &processorOpts{})
c.Assert(err, check.IsNil)
err = s.client.PutCaptureInfo(ctx, capture.info, capture.session.Lease())
c.Assert(err, check.IsNil)

changefeed := "changefeed-name"
invalidCapture := uuid.New().String()
for _, captureID := range []string{capture.info.ID, invalidCapture} {
taskStatus := &model.TaskStatus{}
if captureID == invalidCapture {
taskStatus.Tables = map[model.TableID]*model.TableReplicaInfo{
51: {StartTs: 110},
}
}
err = s.client.PutTaskStatus(ctx, changefeed, captureID, taskStatus)
c.Assert(err, check.IsNil)
_, err = s.client.PutTaskPositionOnChange(ctx, changefeed, captureID, &model.TaskPosition{CheckPointTs: 100, ResolvedTs: 120})
c.Assert(err, check.IsNil)
err = s.client.PutTaskWorkload(ctx, changefeed, captureID, &model.TaskWorkload{})
c.Assert(err, check.IsNil)
}
err = s.client.SaveChangeFeedInfo(ctx, &model.ChangeFeedInfo{}, changefeed)
c.Assert(err, check.IsNil)

_, captureList, err := s.client.GetCaptures(ctx)
c.Assert(err, check.IsNil)
captures := make(map[model.CaptureID]*model.CaptureInfo)
for _, c := range captureList {
captures[c.ID] = c
}
owner, err := NewOwner(ctx, nil, &security.Credential{}, capture.session,
DefaultCDCGCSafePointTTL, time.Millisecond*200)
c.Assert(err, check.IsNil)
// It is better to update changefeed information by `loadChangeFeeds`, however
// `loadChangeFeeds` is too overweight, just mock enough information here.
owner.changeFeeds = map[model.ChangeFeedID]*changeFeed{
changefeed: {
id: changefeed,
orphanTables: make(map[model.TableID]model.Ts),
},
}
err = owner.rebuildCaptureEvents(ctx, captures)
c.Assert(err, check.IsNil)
c.Assert(len(owner.captures), check.Equals, 1)
c.Assert(owner.captures, check.HasKey, capture.info.ID)
c.Assert(owner.changeFeeds[changefeed].orphanTables, check.DeepEquals, map[model.TableID]model.Ts{51: 100})
// check stale tasks are cleaned up
statuses, err := s.client.GetAllTaskStatus(ctx, changefeed)
c.Assert(err, check.IsNil)
c.Assert(len(statuses), check.Equals, 1)
c.Assert(statuses, check.HasKey, capture.info.ID)
positions, err := s.client.GetAllTaskPositions(ctx, changefeed)
c.Assert(err, check.IsNil)
c.Assert(len(positions), check.Equals, 1)
c.Assert(positions, check.HasKey, capture.info.ID)
workloads, err := s.client.GetAllTaskWorkloads(ctx, changefeed)
c.Assert(err, check.IsNil)
c.Assert(len(workloads), check.Equals, 1)
c.Assert(workloads, check.HasKey, capture.info.ID)

err = capture.etcdClient.Close()
c.Assert(err, check.IsNil)
>>>>>>> 2a09a89... *: Using the global singleton for pd client and tikv client, and fix pd client freeze (#1217)
}
13 changes: 6 additions & 7 deletions cdc/processor.go
@@ -66,10 +66,13 @@ const (
schemaStorageGCLag = time.Minute * 20
)

<<<<<<< HEAD
var (
fNewPDCli = pd.NewClientWithContext
)

=======
>>>>>>> 2a09a89... *: Using the global singleton for pd client and tikv client, and fix pd client freeze (#1217)
type processor struct {
id string
captureInfo model.CaptureInfo
@@ -158,6 +161,7 @@ func (t *tableInfo) safeStop() (stopped bool, checkpointTs model.Ts) {
// newProcessor creates and returns a processor for the specified change feed
func newProcessor(
ctx context.Context,
pdCli pd.Client,
credential *security.Credential,
session *concurrency.Session,
changefeed model.ChangeFeedInfo,
@@ -169,12 +173,6 @@
flushCheckpointInterval time.Duration,
) (*processor, error) {
etcdCli := session.Client()
endpoints := session.Client().Endpoints()
pdCli, err := fNewPDCli(ctx, endpoints, credential.PDSecurityOption())
if err != nil {
return nil, errors.Annotatef(
cerror.WrapError(cerror.ErrNewProcessorFailed, err), "create pd client failed, addr: %v", endpoints)
}
cdcEtcdCli := kv.NewCDCEtcdClient(ctx, etcdCli)
limitter := puller.NewBlurResourceLimmter(defaultMemBufferCapacity)

@@ -1229,6 +1227,7 @@ func (p *processor) isStopped() bool {
// runProcessor creates a new processor then starts it.
func runProcessor(
ctx context.Context,
pdCli pd.Client,
credential *security.Credential,
session *concurrency.Session,
info model.ChangeFeedInfo,
@@ -1255,7 +1254,7 @@
cancel()
return nil, errors.Trace(err)
}
processor, err := newProcessor(ctx, credential, session, info, sink,
processor, err := newProcessor(ctx, pdCli, credential, session, info, sink,
changefeedID, captureInfo, checkpointTs, errCh, flushCheckpointInterval)
if err != nil {
cancel()
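The processor.go hunks delete the per-processor call to fNewPDCli (pd.NewClientWithContext), so a processor no longer opens its own PD connection; it reuses whatever client the capture passes in. The sketch below contrasts the two schemes under simplified, hypothetical types; newPDClient only counts dials and does not talk to a real PD.

package main

import (
	"context"
	"fmt"
)

// dialCount tracks how many PD connections this sketch opens.
var dialCount int

type pdClient struct{}

// newPDClient stands in for pd.NewClientWithContext (the fNewPDCli hook
// removed above); each call represents a fresh connection to PD.
func newPDClient(ctx context.Context, endpoints []string) (*pdClient, error) {
	dialCount++
	return &pdClient{}, nil
}

type processor struct{ pdCli *pdClient }

// newProcessorOld models the removed code path: one PD client per processor.
func newProcessorOld(ctx context.Context, endpoints []string) (*processor, error) {
	cli, err := newPDClient(ctx, endpoints)
	if err != nil {
		return nil, err
	}
	return &processor{pdCli: cli}, nil
}

// newProcessorNew models the patched path: the shared client is injected.
func newProcessorNew(ctx context.Context, pdCli *pdClient) (*processor, error) {
	return &processor{pdCli: pdCli}, nil
}

func main() {
	ctx := context.Background()
	endpoints := []string{"127.0.0.1:2379"}

	// Old scheme: N processors mean N PD clients.
	for i := 0; i < 3; i++ {
		if _, err := newProcessorOld(ctx, endpoints); err != nil {
			panic(err)
		}
	}
	fmt.Println("clients dialed (old):", dialCount) // 3

	// New scheme: one shared client regardless of processor count.
	dialCount = 0
	shared, _ := newPDClient(ctx, endpoints)
	for i := 0; i < 3; i++ {
		if _, err := newProcessorNew(ctx, shared); err != nil {
			panic(err)
		}
	}
	fmt.Println("clients dialed (new):", dialCount) // 1
}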
25 changes: 13 additions & 12 deletions cdc/server.go
@@ -238,6 +238,18 @@ func (s *Server) Run(ctx context.Context) error {
if err != nil {
return err
}

kvStore, err := kv.CreateTiStore(strings.Join(s.pdEndpoints, ","), s.opts.credential)
if err != nil {
return errors.Trace(err)
}
defer func() {
err := kvStore.Close()
if err != nil {
log.Warn("kv store close failed", zap.Error(err))
}
}()
ctx = util.PutKVStorageInCtx(ctx, kvStore)
// When a capture suicided, restart it
for {
if err := s.run(ctx); cerror.ErrCaptureSuicide.NotEqual(err) {
@@ -306,20 +318,9 @@ func (s *Server) campaignOwnerLoop(ctx context.Context) error {
func (s *Server) run(ctx context.Context) (err error) {
ctx = util.PutCaptureAddrInCtx(ctx, s.opts.advertiseAddr)
ctx = util.PutTimezoneInCtx(ctx, s.opts.timezone)
kvStore, err := kv.CreateTiStore(strings.Join(s.pdEndpoints, ","), s.opts.credential)
if err != nil {
return errors.Trace(err)
}
defer func() {
err := kvStore.Close()
if err != nil {
log.Warn("kv store close failed", zap.Error(err))
}
}()
ctx = util.PutKVStorageInCtx(ctx, kvStore)

procOpts := &processorOpts{flushCheckpointInterval: s.opts.processorFlushInterval}
capture, err := NewCapture(ctx, s.pdEndpoints, s.opts.credential, s.opts.advertiseAddr, procOpts)
capture, err := NewCapture(ctx, s.pdEndpoints, s.pdClient, s.opts.credential, s.opts.advertiseAddr, procOpts)
if err != nil {
return err
}
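In server.go, the kv.CreateTiStore call and its deferred Close move from run(), which sits inside the capture-suicide restart loop, up into Run(), so the TiKV storage is created once per server lifetime rather than once per restart. A rough sketch of that hoisting follows, using stand-in types instead of the real kv and cerror packages.

package main

import (
	"errors"
	"fmt"
)

// errCaptureSuicide stands in for cerror.ErrCaptureSuicide: the only error on
// which the server restarts its capture instead of shutting down.
var errCaptureSuicide = errors.New("capture suicide")

// kvStore stands in for the TiKV storage handle returned by kv.CreateTiStore.
type kvStore struct{}

func createTiStore() (*kvStore, error) { return &kvStore{}, nil }
func (s *kvStore) Close() error        { return nil }

// runOnce models Server.run: in this sketch it "suicides" twice, then succeeds.
func runOnce(store *kvStore, attempt int) error {
	if attempt < 2 {
		return errCaptureSuicide
	}
	return nil
}

// Run mirrors the patched Server.Run: the store is created once, before the
// restart loop, instead of once per iteration as in the old code.
func Run() error {
	store, err := createTiStore()
	if err != nil {
		return err
	}
	defer func() {
		if cerr := store.Close(); cerr != nil {
			fmt.Println("kv store close failed:", cerr)
		}
	}()

	for attempt := 0; ; attempt++ {
		if err := runOnce(store, attempt); !errors.Is(err, errCaptureSuicide) {
			return err // nil or a real failure; either way, stop looping
		}
		fmt.Println("capture suicided, restarting with the same kv store")
	}
}

func main() {
	if err := Run(); err != nil {
		panic(err)
	}
}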
