
Commit

Merge branch 'release-5.2' into pr/3311
overvenus committed Dec 10, 2021
2 parents 2db2544 + 036ff34 commit 3d96953
Showing 239 changed files with 779 additions and 247 deletions.
15 changes: 10 additions & 5 deletions Makefile
@@ -145,10 +145,10 @@ integration_test_build: check_failpoint_ctl
integration_test: integration_test_mysql

integration_test_mysql:
tests/run.sh mysql "$(CASE)"
tests/integration_tests/run.sh mysql "$(CASE)"

integration_test_kafka: check_third_party_binary
tests/run.sh kafka "$(CASE)"
tests/integration_tests/run.sh kafka "$(CASE)"

fmt: tools/bin/gofumports tools/bin/shfmt
@echo "gofmt (simplify)"
@@ -186,16 +186,21 @@ tidy:

check: check-copyright fmt lint check-static tidy errdoc check-leaktest-added check-merge-conflicts

coverage:
integration_test_coverage:
GO111MODULE=off go get github.com/wadey/gocovmerge
gocovmerge "$(TEST_DIR)"/cov.* | grep -vE ".*.pb.go|$(CDC_PKG)/testing_utils/.*|$(CDC_PKG)/cdc/kv/testing.go|$(CDC_PKG)/cdc/entry/schema_test_helper.go|$(CDC_PKG)/cdc/sink/simple_mysql_tester.go|.*.__failpoint_binding__.go" > "$(TEST_DIR)/all_cov.out"
grep -vE ".*.pb.go|$(CDC_PKG)/testing_utils/.*|$(CDC_PKG)/cdc/kv/testing.go|$(CDC_PKG)/cdc/sink/simple_mysql_tester.go|.*.__failpoint_binding__.go" "$(TEST_DIR)/cov.unit.out" > "$(TEST_DIR)/unit_cov.out"
ifeq ("$(JenkinsCI)", "1")
GO111MODULE=off go get github.com/mattn/goveralls
@goveralls -coverprofile=$(TEST_DIR)/all_cov.out -service=jenkins-ci -repotoken $(COVERALLS_TOKEN)
@bash <(curl -s https://codecov.io/bash) -f $(TEST_DIR)/unit_cov.out -t $(CODECOV_TOKEN)
else
go tool cover -html "$(TEST_DIR)/all_cov.out" -o "$(TEST_DIR)/all_cov.html"
endif

unit_test_coverage:
grep -vE ".*.pb.go|$(CDC_PKG)/testing_utils/.*|$(CDC_PKG)/cdc/kv/testing.go|$(CDC_PKG)/cdc/sink/simple_mysql_tester.go|.*.__failpoint_binding__.go" "$(TEST_DIR)/cov.unit.out" > "$(TEST_DIR)/unit_cov.out"
ifeq ("$(JenkinsCI)", "1")
@bash <(curl -s https://codecov.io/bash) -f $(TEST_DIR)/unit_cov.out -t $(CODECOV_TOKEN)
else
go tool cover -html "$(TEST_DIR)/unit_cov.out" -o "$(TEST_DIR)/unit_cov.html"
go tool cover -func="$(TEST_DIR)/unit_cov.out"
endif
36 changes: 24 additions & 12 deletions cdc/capture/capture.go
@@ -32,6 +32,7 @@ import (
cdcContext "github.com/pingcap/ticdc/pkg/context"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/orchestrator"
"github.com/pingcap/ticdc/pkg/pdtime"
"github.com/pingcap/ticdc/pkg/version"
tidbkv "github.com/pingcap/tidb/kv"
pd "github.com/tikv/pd/client"
@@ -54,12 +55,12 @@ type Capture struct {
session *concurrency.Session
election *concurrency.Election

pdClient pd.Client
kvStorage tidbkv.Storage
etcdClient *kv.CDCEtcdClient
grpcPool kv.GrpcPool

cancel context.CancelFunc
pdClient pd.Client
kvStorage tidbkv.Storage
etcdClient *kv.CDCEtcdClient
grpcPool kv.GrpcPool
TimeAcquirer pdtime.TimeAcquirer
cancel context.CancelFunc

newProcessorManager func() *processor.Manager
newOwner func(pd.Client) *owner.Owner
@@ -99,6 +100,12 @@ func (c *Capture) reset(ctx context.Context) error {
}
c.session = sess
c.election = concurrency.NewElection(sess, kv.CaptureOwnerKey)

if c.TimeAcquirer != nil {
c.TimeAcquirer.Stop()
}
c.TimeAcquirer = pdtime.NewTimeAcquirer(c.pdClient)

if c.grpcPool != nil {
c.grpcPool.Close()
}
@@ -147,11 +154,12 @@ func (c *Capture) Run(ctx context.Context) error {

func (c *Capture) run(stdCtx context.Context) error {
ctx := cdcContext.NewContext(stdCtx, &cdcContext.GlobalVars{
PDClient: c.pdClient,
KVStorage: c.kvStorage,
CaptureInfo: c.info,
EtcdClient: c.etcdClient,
GrpcPool: c.grpcPool,
PDClient: c.pdClient,
KVStorage: c.kvStorage,
CaptureInfo: c.info,
EtcdClient: c.etcdClient,
GrpcPool: c.grpcPool,
TimeAcquirer: c.TimeAcquirer,
})
err := c.register(ctx)
if err != nil {
@@ -165,7 +173,7 @@ func (c *Capture) run(stdCtx context.Context) error {
cancel()
}()
wg := new(sync.WaitGroup)
wg.Add(3)
wg.Add(4)
var ownerErr, processorErr error
go func() {
defer wg.Done()
@@ -187,6 +195,10 @@ func (c *Capture) run(stdCtx context.Context) error {
processorErr = c.runEtcdWorker(ctx, c.processorManager, model.NewGlobalState(), processorFlushInterval)
log.Info("the processor routine has exited", zap.Error(processorErr))
}()
go func() {
defer wg.Done()
c.TimeAcquirer.Run(ctx)
}()
go func() {
defer wg.Done()
c.grpcPool.RecycleConn(ctx)
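The capture now owns a `TimeAcquirer` from the new `pkg/pdtime` package: `reset` replaces it, and `run` passes it into `GlobalVars` and drives it in a fourth goroutine (hence `wg.Add(3)` becoming `wg.Add(4)`). The package itself is not among the files shown on this page; below is a minimal sketch of the interface implied by these call sites, plus `CurrentTimeFromCached` used later in `cdc/owner/changefeed.go`. The refresh interval, field names, and error behaviour are assumptions, not the actual `pkg/pdtime` code.

```go
package pdtime

import (
	"context"
	"errors"
	"sync"
	"time"

	pd "github.com/tikv/pd/client"
)

// TimeAcquirer keeps an approximate "current time" fetched from PD so that
// callers can read it without paying a PD round trip on every access.
type TimeAcquirer interface {
	// Run drives the background refresh loop until ctx is done or Stop is called.
	Run(ctx context.Context)
	// CurrentTimeFromCached returns the most recently cached PD time.
	CurrentTimeFromCached() (time.Time, error)
	// Stop terminates the refresh loop.
	Stop()
}

type timeAcquirer struct {
	pdClient pd.Client
	mu       sync.RWMutex
	cached   time.Time
	stopCh   chan struct{}
}

// NewTimeAcquirer creates a TimeAcquirer backed by the given PD client.
func NewTimeAcquirer(pdClient pd.Client) TimeAcquirer {
	return &timeAcquirer{pdClient: pdClient, stopCh: make(chan struct{})}
}

func (t *timeAcquirer) Run(ctx context.Context) {
	ticker := time.NewTicker(time.Second) // refresh interval is an assumption
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.stopCh:
			return
		case <-ticker.C:
			physical, _, err := t.pdClient.GetTS(ctx)
			if err != nil {
				continue // keep serving the last cached value on transient errors
			}
			t.mu.Lock()
			t.cached = time.Unix(0, physical*int64(time.Millisecond))
			t.mu.Unlock()
		}
	}
}

func (t *timeAcquirer) CurrentTimeFromCached() (time.Time, error) {
	t.mu.RLock()
	defer t.mu.RUnlock()
	if t.cached.IsZero() {
		return time.Time{}, errors.New("pd time not cached yet")
	}
	return t.cached, nil
}

func (t *timeAcquirer) Stop() {
	close(t.stopCh)
}
```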
24 changes: 16 additions & 8 deletions cdc/entry/schema_storage.go
@@ -652,7 +652,7 @@ func (s *schemaSnapshot) Tables() map[model.TableID]*model.TableInfo {
// SchemaStorage stores the schema information with multi-version
type SchemaStorage interface {
// GetSnapshot returns the snapshot which of ts is specified
GetSnapshot(ctx context.Context, ts uint64) (*schemaSnapshot, error)
GetSnapshot(ctx context.Context, ts uint64) (*SingleSchemaSnapshot, error)
// GetLastSnapshot returns the last snapshot
GetLastSnapshot() *schemaSnapshot
// HandleDDLJob creates a new snapshot in storage and handles the ddl job
@@ -661,8 +661,9 @@ type SchemaStorage interface {
AdvanceResolvedTs(ts uint64)
// ResolvedTs returns the resolved ts of the schema storage
ResolvedTs() uint64
// DoGC removes snaps which of ts less than this specified ts
DoGC(ts uint64)
// DoGC removes snaps that are no longer needed at the specified TS.
// It returns the TS from which the oldest maintained snapshot is valid.
DoGC(ts uint64) (lastSchemaTs uint64)
}

type schemaStorageImpl struct {
@@ -796,7 +797,7 @@ func (s *schemaStorageImpl) ResolvedTs() uint64 {
}

// DoGC removes snaps which of ts less than this specified ts
func (s *schemaStorageImpl) DoGC(ts uint64) {
func (s *schemaStorageImpl) DoGC(ts uint64) (lastSchemaTs uint64) {
s.snapsMu.Lock()
defer s.snapsMu.Unlock()
var startIdx int
@@ -807,17 +808,24 @@ func (s *schemaStorageImpl) DoGC(ts uint64) {
startIdx = i
}
if startIdx == 0 {
return
return s.snaps[0].currentTs
}
if log.GetLevel() == zapcore.DebugLevel {
log.Debug("Do GC in schema storage")
for i := 0; i < startIdx; i++ {
s.snaps[i].PrintStatus(log.Debug)
}
}
s.snaps = s.snaps[startIdx:]
atomic.StoreUint64(&s.gcTs, s.snaps[0].currentTs)
log.Info("finished gc in schema storage", zap.Uint64("gcTs", s.snaps[0].currentTs))

// copy the part of the slice that is needed instead of re-slicing it
// to maximize efficiency of Go runtime GC.
newSnaps := make([]*schemaSnapshot, len(s.snaps)-startIdx)
copy(newSnaps, s.snaps[startIdx:])
s.snaps = newSnaps

lastSchemaTs = s.snaps[0].currentTs
atomic.StoreUint64(&s.gcTs, lastSchemaTs)
return
}

// SkipJob skip the job should not be executed
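The rewritten tail of `DoGC` copies the surviving snapshots into a fresh slice instead of re-slicing, because a re-slice keeps the whole original backing array reachable, and with it the pointers to snapshots that were just garbage-collected. A self-contained illustration of the difference (the `snapshot` type and payload sizes here are invented for the example):

```go
package main

import "fmt"

type snapshot struct {
	currentTs uint64
	payload   []byte // stands in for the multi-version schema data held by a snapshot
}

// gcBySlicing drops the first startIdx entries but keeps the original backing
// array alive; the array slots still hold pointers to the dropped snapshots,
// so the Go GC cannot reclaim them while the re-slice is referenced.
func gcBySlicing(snaps []*snapshot, startIdx int) []*snapshot {
	return snaps[startIdx:]
}

// gcByCopying allocates a right-sized backing array, as DoGC now does, so the
// dropped snapshots become unreachable and can be collected.
func gcByCopying(snaps []*snapshot, startIdx int) []*snapshot {
	kept := make([]*snapshot, len(snaps)-startIdx)
	copy(kept, snaps[startIdx:])
	return kept
}

func main() {
	snaps := []*snapshot{
		{currentTs: 100, payload: make([]byte, 1<<20)},
		{currentTs: 140, payload: make([]byte, 1<<20)},
		{currentTs: 180, payload: make([]byte, 1<<20)},
	}
	sliced := gcBySlicing(snaps, 2)
	copied := gcByCopying(snaps, 2)
	// In DoGC, s.snaps is the only reference: after re-slicing, the two older
	// snapshots would stay reachable through the old backing array; after
	// copying, they can be freed.
	fmt.Println(sliced[0].currentTs, copied[0].currentTs)
}
```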
8 changes: 6 additions & 2 deletions cdc/entry/schema_storage_test.go
@@ -623,7 +623,9 @@ func (t *schemaSuite) TestMultiVersionStorage(c *check.C) {
_, exist = snap.TableByID(3)
c.Assert(exist, check.IsFalse)

storage.DoGC(0)
lastSchemaTs := storage.DoGC(0)
c.Check(lastSchemaTs, check.Equals, uint64(0))

snap, err = storage.GetSnapshot(ctx, 100)
c.Assert(err, check.IsNil)
_, exist = snap.SchemaByID(1)
@@ -644,7 +646,9 @@ func (t *schemaSuite) TestMultiVersionStorage(c *check.C) {
_, exist = snap.TableByID(3)
c.Assert(exist, check.IsFalse)

storage.DoGC(155)
lastSchemaTs = storage.DoGC(155)
c.Check(lastSchemaTs, check.Equals, uint64(140))

storage.AdvanceResolvedTs(185)

snap, err = storage.GetSnapshot(ctx, 180)
13 changes: 7 additions & 6 deletions cdc/kv/region_worker.go
@@ -235,7 +235,7 @@ func (w *regionWorker) checkShouldExit() error {
return nil
}

func (w *regionWorker) handleSingleRegionError(ctx context.Context, err error, state *regionFeedState) error {
func (w *regionWorker) handleSingleRegionError(err error, state *regionFeedState) error {
if state.lastResolvedTs > state.sri.ts {
state.sri.ts = state.lastResolvedTs
}
@@ -273,7 +273,9 @@ func (w *regionWorker) handleSingleRegionError(ctx context.Context, err error, s
}

revokeToken := !state.initialized
err2 := w.session.onRegionFail(ctx, regionErrorInfo{
// since the context used in region worker will be cancelled after region
// worker exits, we must use the parent context to prevent regionErrorInfo loss.
err2 := w.session.onRegionFail(w.parentCtx, regionErrorInfo{
singleRegionInfo: state.sri,
err: err,
}, revokeToken)
@@ -388,26 +390,25 @@ func (w *regionWorker) processEvent(ctx context.Context, event *regionStatefulEv
case *cdcpb.Event_Entries_:
err = w.handleEventEntry(ctx, x, event.state)
if err != nil {
err = w.handleSingleRegionError(ctx, err, event.state)
err = w.handleSingleRegionError(err, event.state)
}
case *cdcpb.Event_Admin_:
log.Info("receive admin event", zap.Stringer("event", event.changeEvent))
case *cdcpb.Event_Error:
err = w.handleSingleRegionError(
ctx,
cerror.WrapError(cerror.ErrEventFeedEventError, &eventError{err: x.Error}),
event.state,
)
case *cdcpb.Event_ResolvedTs:
if err = w.handleResolvedTs(ctx, x.ResolvedTs, event.state); err != nil {
err = w.handleSingleRegionError(ctx, err, event.state)
err = w.handleSingleRegionError(err, event.state)
}
}
}

if event.resolvedTs != nil {
if err = w.handleResolvedTs(ctx, event.resolvedTs.Ts, event.state); err != nil {
err = w.handleSingleRegionError(ctx, err, event.state)
err = w.handleSingleRegionError(err, event.state)
}
}
event.state.lock.Unlock()
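The signature change drops the per-call `ctx`, and the new comment explains why: `onRegionFail` used to receive the worker's own context, which is cancelled as soon as the worker exits, so the `regionErrorInfo` could be lost; the call now goes through `w.parentCtx`. A stripped-down illustration of the pattern (the `worker`, `report`, and `errCh` names are invented for this example; where `parentCtx` is assigned on the real `regionWorker` is not shown in this diff):

```go
package main

import (
	"context"
	"fmt"
)

// worker is a stand-in for regionWorker: it keeps the long-lived context of
// the owning session next to its own, shorter-lived context.
type worker struct {
	parentCtx context.Context
	errCh     chan string // stands in for the queue behind onRegionFail
}

// report forwards a region failure. If it is given a context that is already
// cancelled (as the worker's own context is once the worker exits), the
// failure is dropped instead of being requeued.
func (w *worker) report(ctx context.Context, msg string) error {
	if ctx.Err() != nil {
		return ctx.Err()
	}
	select {
	case w.errCh <- msg:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	parent := context.Background()
	workerCtx, cancel := context.WithCancel(parent)
	w := &worker{parentCtx: parent, errCh: make(chan string, 1)}

	cancel() // the worker is exiting: its own context is already cancelled

	if err := w.report(workerCtx, "region failed"); err != nil {
		fmt.Println("dropped with worker ctx:", err) // the bug being fixed
	}
	if err := w.report(w.parentCtx, "region failed"); err == nil {
		fmt.Println("delivered with parent ctx:", <-w.errCh)
	}
}
```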
13 changes: 6 additions & 7 deletions cdc/owner/changefeed.go
@@ -16,7 +16,6 @@ package owner
import (
"context"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
@@ -178,7 +177,9 @@ func (c *changefeed) tick(ctx cdcContext.Context, state *model.ChangefeedReactor
return errors.Trace(err)
}
if shouldUpdateState {
c.updateStatus(barrierTs)
pdTime, _ := ctx.GlobalVars().TimeAcquirer.CurrentTimeFromCached()
currentTs := oracle.GetPhysical(pdTime)
c.updateStatus(currentTs, barrierTs)
}
return nil
}
@@ -438,7 +439,7 @@ func (c *changefeed) asyncExecDDL(ctx cdcContext.Context, job *timodel.Job) (don
return done, nil
}

func (c *changefeed) updateStatus(barrierTs model.Ts) {
func (c *changefeed) updateStatus(currentTs int64, barrierTs model.Ts) {
resolvedTs := barrierTs
for _, position := range c.state.TaskPositions {
if resolvedTs > position.ResolvedTs {
@@ -470,12 +471,10 @@ func (c *changefeed) updateStatus(barrierTs model.Ts) {
}
return status, changed, nil
})

phyTs := oracle.ExtractPhysical(checkpointTs)

c.metricsChangefeedCheckpointTsGauge.Set(float64(phyTs))
// It is more accurate to get tso from PD, but in most cases since we have
// deployed NTP service, a little bias is acceptable here.
c.metricsChangefeedCheckpointTsLagGauge.Set(float64(oracle.GetPhysical(time.Now())-phyTs) / 1e3)
c.metricsChangefeedCheckpointTsLagGauge.Set(float64(currentTs-phyTs) / 1e3)
}

func (c *changefeed) Close() {
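The lag metric used to compare the checkpoint against `time.Now()`, trusting NTP to keep the local clock close to PD's TSO; `tick` now reads the cached PD time from the shared `TimeAcquirer` and passes its physical value down as `currentTs`. A small sketch of the arithmetic (local helpers stand in for the `oracle` package; the sample values are made up):

```go
package main

import (
	"fmt"
	"time"
)

// extractPhysical and getPhysical mirror oracle.ExtractPhysical and
// oracle.GetPhysical: a TSO stores a physical timestamp in milliseconds in
// the high bits, with an 18-bit logical counter in the low bits.
const physicalShiftBits = 18

func extractPhysical(ts uint64) int64 {
	return int64(ts >> physicalShiftBits)
}

func getPhysical(t time.Time) int64 {
	return t.UnixNano() / int64(time.Millisecond)
}

// checkpointLagSeconds is the quantity updateStatus now feeds into the
// checkpoint lag gauge: cached PD time minus the checkpoint's physical part.
func checkpointLagSeconds(cachedPDTime time.Time, checkpointTs uint64) float64 {
	currentTs := getPhysical(cachedPDTime) // milliseconds, from TimeAcquirer
	phyTs := extractPhysical(checkpointTs) // milliseconds encoded in the TSO
	return float64(currentTs-phyTs) / 1e3
}

func main() {
	// Example values: the cached PD time is 2.5s ahead of the checkpoint.
	checkpoint := uint64(1_639_100_000_000) << physicalShiftBits
	pdTime := time.Unix(0, 1_639_100_002_500*int64(time.Millisecond))
	fmt.Printf("lag: %.1fs\n", checkpointLagSeconds(pdTime, checkpoint)) // lag: 2.5s
}
```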
2 changes: 2 additions & 0 deletions cdc/owner/changefeed_test.go
@@ -26,6 +26,7 @@ import (
"github.com/pingcap/ticdc/pkg/config"
cdcContext "github.com/pingcap/ticdc/pkg/context"
"github.com/pingcap/ticdc/pkg/orchestrator"
"github.com/pingcap/ticdc/pkg/pdtime"
"github.com/pingcap/ticdc/pkg/txnutil/gc"
"github.com/pingcap/ticdc/pkg/util/testleak"
"github.com/pingcap/ticdc/pkg/version"
@@ -217,6 +218,7 @@ func (s *changefeedSuite) TestExecDDL(c *check.C) {
AdvertiseAddr: "127.0.0.1:0000",
Version: version.ReleaseVersion,
},
TimeAcquirer: pdtime.NewTimeAcquirer4Test(),
})
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: "changefeed-id-test",
13 changes: 1 addition & 12 deletions cdc/processor/manager.go
@@ -23,7 +23,6 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/model"
tablepipeline "github.com/pingcap/ticdc/cdc/processor/pipeline"
cdcContext "github.com/pingcap/ticdc/pkg/context"
cerrors "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/orchestrator"
@@ -62,17 +61,6 @@ func NewManager() *Manager {
}
}

// NewManager4Test creates a new processor manager for test
func NewManager4Test(
createTablePipeline func(ctx cdcContext.Context, tableID model.TableID, replicaInfo *model.TableReplicaInfo) (tablepipeline.TablePipeline, error),
) *Manager {
m := NewManager()
m.newProcessor = func(ctx cdcContext.Context) *processor {
return newProcessor4Test(ctx, createTablePipeline)
}
return m
}

// Tick implements the `orchestrator.State` interface
// the `state` parameter is sent by the etcd worker, the `state` must be a snapshot of KVs in etcd
// the Tick function of Manager create or remove processor instances according to the specified `state`, or pass the `state` to processor instances
@@ -82,6 +70,7 @@ func (m *Manager) Tick(stdCtx context.Context, state orchestrator.ReactorState)
if err := m.handleCommand(); err != nil {
return state, err
}

captureID := ctx.GlobalVars().CaptureInfo.ID
var inactiveChangefeedCount int
for changefeedID, changefeedState := range globalState.Changefeeds {
14 changes: 13 additions & 1 deletion cdc/processor/manager_test.go
@@ -38,8 +38,20 @@ type managerSuite struct {

var _ = check.Suite(&managerSuite{})

// NewManager4Test creates a new processor manager for test
func NewManager4Test(
c *check.C,
createTablePipeline func(ctx cdcContext.Context, tableID model.TableID, replicaInfo *model.TableReplicaInfo) (tablepipeline.TablePipeline, error),
) *Manager {
m := NewManager()
m.newProcessor = func(ctx cdcContext.Context) *processor {
return newProcessor4Test(ctx, c, createTablePipeline)
}
return m
}

func (s *managerSuite) resetSuit(ctx cdcContext.Context, c *check.C) {
s.manager = NewManager4Test(func(ctx cdcContext.Context, tableID model.TableID, replicaInfo *model.TableReplicaInfo) (tablepipeline.TablePipeline, error) {
s.manager = NewManager4Test(c, func(ctx cdcContext.Context, tableID model.TableID, replicaInfo *model.TableReplicaInfo) (tablepipeline.TablePipeline, error) {
return &mockTablePipeline{
tableID: tableID,
name: fmt.Sprintf("`test`.`table%d`", tableID),
8 changes: 8 additions & 0 deletions cdc/processor/metrics.go
@@ -60,6 +60,13 @@ var (
Name: "exit_with_error_count",
Help: "counter for processor exits with error",
}, []string{"changefeed", "capture"})
processorSchemaStorageGcTsGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "schema_storage_gc_ts",
Help: "the TS of the currently maintained oldest snapshot in SchemaStorage",
}, []string{"changefeed", "capture"})
)

// InitMetrics registers all metrics used in processor
@@ -70,4 +77,5 @@ func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(checkpointTsLagGauge)
registry.MustRegister(syncTableNumGauge)
registry.MustRegister(processorErrorCounter)
registry.MustRegister(processorSchemaStorageGcTsGauge)
}
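`processorSchemaStorageGcTsGauge` is registered here, but the processor code that actually sets it lives in one of the changed files not rendered on this page. Given the new `DoGC(ts) (lastSchemaTs uint64)` signature above, a plausible (hypothetical) wiring would export the TS of the oldest snapshot still maintained after each GC pass — the helper name, label usage, and the choice to export the physical part are all assumptions:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// schemaStorage is a stand-in for entry.SchemaStorage; only DoGC matters here.
type schemaStorage interface {
	DoGC(ts uint64) (lastSchemaTs uint64)
}

var processorSchemaStorageGcTsGauge = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "ticdc",
		Subsystem: "processor",
		Name:      "schema_storage_gc_ts",
		Help:      "the TS of the currently maintained oldest snapshot in SchemaStorage",
	}, []string{"changefeed", "capture"})

// doGCSchemaStorage is a hypothetical processor helper: run GC up to gcTs and
// publish the TS from which the oldest retained snapshot is valid.
func doGCSchemaStorage(storage schemaStorage, gcTs uint64, changefeedID, captureID string) {
	lastSchemaTs := storage.DoGC(gcTs)
	// Exporting the physical part (milliseconds) keeps the panel readable;
	// whether the real processor does exactly this is an assumption.
	physical := lastSchemaTs >> 18
	processorSchemaStorageGcTsGauge.
		WithLabelValues(changefeedID, captureID).
		Set(float64(physical))
}

// fakeStorage lets the example run without a real schema storage.
type fakeStorage struct{ oldest uint64 }

func (f *fakeStorage) DoGC(ts uint64) uint64 { return f.oldest }

func main() {
	registry := prometheus.NewRegistry()
	registry.MustRegister(processorSchemaStorageGcTsGauge)

	doGCSchemaStorage(&fakeStorage{oldest: uint64(1_639_100_000_000) << 18}, 0, "cf-1", "capture-1")
	fmt.Println("schema_storage_gc_ts updated for changefeed cf-1")
}
```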
3 changes: 2 additions & 1 deletion cdc/processor/pipeline/sink.go
@@ -151,7 +151,7 @@ func (n *sinkNode) flushSink(ctx pipeline.NodeContext, resolvedTs model.Ts) (err

func (n *sinkNode) emitEvent(ctx pipeline.NodeContext, event *model.PolymorphicEvent) error {
if event == nil || event.Row == nil {
log.Warn("skip emit empty rows", zap.Any("event", event))
log.Warn("skip emit nil event", zap.Any("event", event))
return nil
}

@@ -161,6 +161,7 @@ func (n *sinkNode) emitEvent(ctx pipeline.NodeContext, event *model.PolymorphicE
// begin; insert into t (id) values (1); delete from t where id=1; commit;
// Just ignore these row changed events
if colLen == 0 && preColLen == 0 {
log.Warn("skip emit empty row event", zap.Any("event", event))
return nil
}

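The two warnings now tell apart a nil event (no row payload at all) from an event whose row carries no columns, which — per the existing comment — happens when a transaction inserts and then deletes the same row. A tiny stand-alone version of the guard (the `rowEvent` type is invented for the example; the real check inspects `model.PolymorphicEvent.Row`):

```go
package main

import "fmt"

// rowEvent is a minimal stand-in for the row payload of a PolymorphicEvent.
type rowEvent struct {
	Columns    []string
	PreColumns []string
}

// shouldEmit mirrors the guard in sinkNode.emitEvent: skip nil events and
// events whose row has no columns at all (e.g. a row inserted and then
// deleted inside the same transaction).
func shouldEmit(ev *rowEvent) bool {
	if ev == nil {
		fmt.Println("skip emit nil event")
		return false
	}
	if len(ev.Columns) == 0 && len(ev.PreColumns) == 0 {
		fmt.Println("skip emit empty row event")
		return false
	}
	return true
}

func main() {
	fmt.Println(shouldEmit(nil))                                // false
	fmt.Println(shouldEmit(&rowEvent{}))                        // false
	fmt.Println(shouldEmit(&rowEvent{Columns: []string{"id"}})) // true
}
```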
(The remaining changed files in this commit are not shown on this page.)
