filter(ticdc): Move filter from sink to mounter and owner/schema. #5936

Merged
merged 21 commits on Jun 28, 2022
Changes from 6 commits

Commits (21)
070a53f  filter (ticdc): move filter from sink to mounter (asddongmen, Jun 20, 2022)
0ee0775  Merge branch 'master' into move_filter_to_mounter (asddongmen, Jun 20, 2022)
1439196  fix fmt error (asddongmen, Jun 20, 2022)
73e067c  add unit test (asddongmen, Jun 20, 2022)
b16ab2f  fix unit test error (asddongmen, Jun 20, 2022)
8b00edf  Merge branch 'master' into move_filter_to_mounter (asddongmen, Jun 20, 2022)
80470e9  address comments (asddongmen, Jun 23, 2022)
49413c4  Merge branch 'move_filter_to_mounter' of github.com:asddongmen/ticdc … (asddongmen, Jun 23, 2022)
c46ec5f  Merge branch 'master' into move_filter_to_mounter (asddongmen, Jun 23, 2022)
e3ebe94  address comment (asddongmen, Jun 23, 2022)
a0ad511  Merge branch 'move_filter_to_mounter' of github.com:asddongmen/ticdc … (asddongmen, Jun 23, 2022)
ed6e677  Merge branch 'master' into move_filter_to_mounter (asddongmen, Jun 26, 2022)
a23b4a9  add unit test (asddongmen, Jun 27, 2022)
54c3289  address comment (asddongmen, Jun 27, 2022)
0417ce3  clear debug log (asddongmen, Jun 27, 2022)
7e5106c  Merge branch 'master' into move_filter_to_mounter (asddongmen, Jun 27, 2022)
d220544  address comments (asddongmen, Jun 28, 2022)
28a0579  Merge branch 'move_filter_to_mounter' of github.com:asddongmen/ticdc … (asddongmen, Jun 28, 2022)
09da8d4  Merge branch 'master' into move_filter_to_mounter (ti-chi-bot, Jun 28, 2022)
b26c797  fix lint error (asddongmen, Jun 28, 2022)
02915cc  Merge branch 'move_filter_to_mounter' of github.com:asddongmen/ticdc … (asddongmen, Jun 28, 2022)
21 changes: 12 additions & 9 deletions cdc/entry/mounter.go
@@ -32,6 +32,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
pfilter "github.com/pingcap/tiflow/pkg/filter"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
@@ -66,15 +67,11 @@ type Mounter interface {
}

type mounterImpl struct {
schemaStorage SchemaStorage
tz *time.Location
workerNum int
enableOldValue bool
changefeedID model.ChangeFeedID

// index is an atomic variable to dispatch input events to workers.
index int64

schemaStorage SchemaStorage
tz *time.Location
enableOldValue bool
changefeedID model.ChangeFeedID
filter *pfilter.Filter
metricMountDuration prometheus.Observer
metricTotalRows prometheus.Gauge
}
@@ -83,12 +80,14 @@ type mounterImpl struct {
func NewMounter(schemaStorage SchemaStorage,
changefeedID model.ChangeFeedID,
tz *time.Location,
filter *pfilter.Filter,
enableOldValue bool,
) Mounter {
return &mounterImpl{
schemaStorage: schemaStorage,
changefeedID: changefeedID,
enableOldValue: enableOldValue,
filter: filter,
metricMountDuration: mountDuration.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricTotalRows: totalRowsCountGauge.
@@ -112,6 +111,10 @@ func (m *mounterImpl) DecodeEvent(ctx context.Context, pEvent *model.Polymorphic
pEvent.Row = rowEvent
pEvent.RawKV.Value = nil
pEvent.RawKV.OldValue = nil
pEvent.Row.Filtered = m.filter.ShouldIgnoreDMLEvent(
pEvent.Row.StartTs,
pEvent.Row.Table.Schema,
pEvent.Row.Table.Table)
duration := time.Since(start)
if duration > time.Second {
m.metricMountDuration.Observe(duration.Seconds())
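With this hunk the mounter, rather than the sink, evaluates the changefeed's filter rules at decode time and records the verdict in Row.Filtered instead of dropping the event outright. A minimal, self-contained sketch of how a downstream stage could honor that flag is below; RowEvent and keepUnfiltered are hypothetical stand-ins, not tiflow APIs.

```go
package main

import "fmt"

// RowEvent is a minimal stand-in for model.RowChangedEvent; only the fields
// this sketch needs are included.
type RowEvent struct {
	Schema, Table string
	Filtered      bool
}

// keepUnfiltered sketches the intended downstream behavior after this change:
// rows the mounter has already marked as Filtered are skipped, instead of
// every sink re-running the filter rules on its own.
func keepUnfiltered(rows []*RowEvent) []*RowEvent {
	out := make([]*RowEvent, 0, len(rows))
	for _, r := range rows {
		if r.Filtered {
			continue // dropped before it ever reaches a sink
		}
		out = append(out, r)
	}
	return out
}

func main() {
	rows := []*RowEvent{
		{Schema: "test", Table: "t1"},
		{Schema: "test", Table: "t2", Filtered: true},
	}
	fmt.Println(len(keepUnfiltered(rows))) // prints 1
}
```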
7 changes: 6 additions & 1 deletion cdc/entry/mounter_test.go
@@ -29,6 +29,8 @@ import (
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
pfilter "github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/regionspan"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
@@ -289,9 +291,12 @@ func testMounterDisableOldValue(t *testing.T, tc struct {
ver, err := store.CurrentVersion(oracle.GlobalTxnScope)
require.Nil(t, err)
scheamStorage.AdvanceResolvedTs(ver.Ver)
config := config.GetDefaultReplicaConfig()
filter, err := pfilter.NewFilter(config)
require.Nil(t, err)
mounter := NewMounter(scheamStorage,
model.DefaultChangeFeedID("c1"),
time.UTC, false).(*mounterImpl)
time.UTC, filter, false).(*mounterImpl)
mounter.tz = time.Local
ctx := context.Background()

2 changes: 2 additions & 0 deletions cdc/model/sink.go
@@ -270,6 +270,8 @@ type RowChangedEvent struct {

// SplitTxn marks this RowChangedEvent as the first line of a new txn.
SplitTxn bool `json:"-" msg:"-"`
// Filtered marks whether this RowChangedEvent was filtered out by the filter in the mounter.
Filtered bool `json:"-" msg:"-"`
}

// IsDelete returns true if the row is a delete event
42 changes: 39 additions & 3 deletions cdc/owner/changefeed.go
@@ -31,6 +31,7 @@ import (
"github.com/pingcap/tiflow/cdc/scheduler"
cdcContext "github.com/pingcap/tiflow/pkg/context"
cerror "github.com/pingcap/tiflow/pkg/errors"
pfilter "github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/pingcap/tiflow/pkg/upstream"
@@ -73,6 +74,7 @@ type changefeed struct {
schema *schemaWrap4Owner
sink DDLSink
ddlPuller DDLPuller
filter *pfilter.Filter
initialized bool
// isRemoved is true if the changefeed is removed
isRemoved bool
@@ -351,6 +353,10 @@ LOOP:
if err != nil {
return errors.Trace(err)
}
c.filter, err = pfilter.NewFilter(c.state.Info.Config)
if err != nil {
return errors.Trace(err)
}
cancelCtx, cancel := cdcContext.WithCancel(ctx)
c.cancel = cancel

@@ -557,6 +563,11 @@ func (c *changefeed) handleBarrier(ctx cdcContext.Context) (uint64, error) {
return barrierTs, nil
}

// asyncExecDDLJob executes a DDL job asynchronously; it returns true if the job is done.
// 0. Build DDL events from the job.
// 1. Apply the DDL job to c.schema.
// 2. Emit the DDL events to the redo manager.
// 3. Emit the DDL events to the DDL sink.
func (c *changefeed) asyncExecDDLJob(ctx cdcContext.Context,
job *timodel.Job,
) (bool, error) {
@@ -567,6 +578,7 @@ func (c *changefeed) asyncExecDDLJob(ctx cdcContext.Context,
}

if c.ddlEventCache == nil {
// We must build DDL events from the job before calling c.schema.HandleDDL(job).
ddlEvents, err := c.schema.BuildDDLEvents(job)
if err != nil {
log.Error("build DDL event fail", zap.String("changefeed", c.id.ID),
@@ -580,13 +592,14 @@ func (c *changefeed) asyncExecDDLJob(ctx cdcContext.Context,
checkpointTs := c.state.Info.GetCheckpointTs(c.state.Status)
// refresh checkpointTs and currentTableNames when a ddl job is received
c.sink.emitCheckpointTs(checkpointTs, c.currentTableNames)
// We apply the DDL job here to update the changefeed schema.
err = c.schema.HandleDDL(job)
if err != nil {
return false, errors.Trace(err)
}
c.ddlEventCache = ddlEvents
for _, ddlEvent := range ddlEvents {
if c.redoManager.Enabled() {
c.ddlEventCache = c.filterDDLEvent(ddlEvents)
if c.redoManager.Enabled() {
for _, ddlEvent := range c.ddlEventCache {
err = c.redoManager.EmitDDLEvent(ctx, ddlEvent)
if err != nil {
return false, err
@@ -618,6 +631,9 @@ func (c *changefeed) asyncExecDDLEvent(ctx cdcContext.Context,
ddlEvent *model.DDLEvent,
) (done bool, err error) {
ddlEvent.Query, err = addSpecialComment(ddlEvent.Query)
if err != nil {
return false, err
}
if ddlEvent.TableInfo != nil &&
c.schema.IsIneligibleTableID(ddlEvent.TableInfo.TableID) {
log.Warn("ignore the DDL event of ineligible table",
@@ -632,6 +648,26 @@ func (c *changefeed) asyncExecDDLEvent(ctx cdcContext.Context,
return done, nil
}

func (c *changefeed) filterDDLEvent(ddlEvents []*model.DDLEvent) []*model.DDLEvent {
res := make([]*model.DDLEvent, 0)
// filter ddl event here
for _, ddlEvent := range ddlEvents {
if c.filter.ShouldIgnoreDDLEvent(ddlEvent.StartTs, ddlEvent.Type, ddlEvent.TableInfo.Schema, ddlEvent.TableInfo.Table) {
log.Info(
"DDL event ignored",
zap.String("query", ddlEvent.Query),
zap.Uint64("startTs", ddlEvent.StartTs),
zap.Uint64("commitTs", ddlEvent.CommitTs),
zap.String("namespace", c.id.Namespace),
zap.String("changefeed", c.id.ID),
)
continue
}
res = append(res, ddlEvent)
}
return res
}

func (c *changefeed) updateMetrics(currentTs int64, checkpointTs, resolvedTs model.Ts) {
phyCkpTs := oracle.ExtractPhysical(checkpointTs)
c.metricsChangefeedCheckpointTsGauge.Set(float64(phyCkpTs))
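With filterDDLEvent, the owner now drops ignored DDL events once, before they reach either the redo manager or the DDL sink (which previously did its own filtering and signalled ErrDDLEventIgnored). The self-contained sketch below mirrors that logic with a plain predicate standing in for pfilter.Filter.ShouldIgnoreDDLEvent; all type and function names here are illustrative, not tiflow APIs.

```go
package main

import "fmt"

// Minimal stand-ins for the tiflow model types used by this sketch.
type SimpleTableInfo struct{ Schema, Table string }

type DDLEvent struct {
	TableInfo *SimpleTableInfo
	Query     string
}

// filterDDLEvents keeps only the events whose table passes the changefeed's
// filter rules; ignored events are logged and dropped, as in changefeed.go.
func filterDDLEvents(events []*DDLEvent, shouldIgnore func(schema, table string) bool) []*DDLEvent {
	res := make([]*DDLEvent, 0, len(events))
	for _, e := range events {
		if shouldIgnore(e.TableInfo.Schema, e.TableInfo.Table) {
			fmt.Println("DDL event ignored:", e.Query)
			continue
		}
		res = append(res, e)
	}
	return res
}

func main() {
	events := []*DDLEvent{
		{TableInfo: &SimpleTableInfo{"test", "t1"}, Query: "ALTER TABLE test.t1 ADD COLUMN a int"},
		{TableInfo: &SimpleTableInfo{"test", "t2"}, Query: "ALTER TABLE test.t2 ADD COLUMN a int"},
	}
	// Mirrors rules = ["test.t1"] from TestFilterDDLEvent: only test.t1 is kept.
	kept := filterDDLEvents(events, func(schema, table string) bool {
		return !(schema == "test" && table == "t1")
	})
	fmt.Println("kept:", len(kept)) // kept: 1
}
```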
72 changes: 60 additions & 12 deletions cdc/owner/changefeed_test.go
@@ -30,6 +30,7 @@ import (
"github.com/pingcap/tiflow/cdc/scheduler"
"github.com/pingcap/tiflow/pkg/config"
cdcContext "github.com/pingcap/tiflow/pkg/context"
pfilter "github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/pingcap/tiflow/pkg/upstream"
@@ -165,7 +166,7 @@ func (m *mockScheduler) Rebalance() {}
// Close closes the scheduler and releases resources.
func (m *mockScheduler) Close(ctx context.Context) {}

func createChangefeed4Test(ctx cdcContext.Context, t *testing.T) (
func createChangefeed4Test(ctx cdcContext.Context, t *testing.T, config *config.ReplicaConfig) (
*changefeed, *orchestrator.ChangefeedReactorState,
map[model.CaptureID]*model.CaptureInfo, *orchestrator.ReactorStateTester,
) {
@@ -189,6 +190,9 @@ func createChangefeed4Test(ctx cdcContext.Context, t *testing.T) (
return &mockScheduler{}, nil
}
cf.upstream = up
f, err := pfilter.NewFilter(config)
require.Nil(t, err)
cf.filter = f
state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID)
tester := orchestrator.NewReactorStateTester(t, state, nil)
state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
@@ -204,7 +208,7 @@ func createChangefeed4Test(ctx cdcContext.Context, t *testing.T) (

func TestPreCheck(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
cf, state, captures, tester := createChangefeed4Test(ctx, t)
cf, state, captures, tester := createChangefeed4Test(ctx, t, config.GetDefaultReplicaConfig())
cf.Tick(ctx, state, captures)
tester.MustApplyPatches()
require.NotNil(t, state.Status)
@@ -224,7 +228,7 @@ func TestPreCheck(t *testing.T) {

func TestInitialize(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
cf, state, captures, tester := createChangefeed4Test(ctx, t)
cf, state, captures, tester := createChangefeed4Test(ctx, t, config.GetDefaultReplicaConfig())
defer cf.Close(ctx)
// pre check
cf.Tick(ctx, state, captures)
@@ -238,7 +242,7 @@ func TestInitialize(t *testing.T) {

func TestChangefeedHandleError(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
cf, state, captures, tester := createChangefeed4Test(ctx, t)
cf, state, captures, tester := createChangefeed4Test(ctx, t, config.GetDefaultReplicaConfig())
defer cf.Close(ctx)
// pre check
cf.Tick(ctx, state, captures)
@@ -280,7 +284,7 @@ func TestExecDDL(t *testing.T) {
},
})

cf, state, captures, tester := createChangefeed4Test(ctx, t)
cf, state, captures, tester := createChangefeed4Test(ctx, t, config.GetDefaultReplicaConfig())
cf.upstream.KVStorage = helper.Storage()
defer cf.Close(ctx)
tickThreeTime := func() {
@@ -370,7 +374,7 @@ func TestEmitCheckpointTs(t *testing.T) {
},
})

cf, state, captures, tester := createChangefeed4Test(ctx, t)
cf, state, captures, tester := createChangefeed4Test(ctx, t, config.GetDefaultReplicaConfig())
cf.upstream.KVStorage = helper.Storage()

defer cf.Close(ctx)
@@ -421,7 +425,7 @@ func TestSyncPoint(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
ctx.ChangefeedVars().Info.SyncPointEnabled = true
ctx.ChangefeedVars().Info.SyncPointInterval = 1 * time.Second
cf, state, captures, tester := createChangefeed4Test(ctx, t)
cf, state, captures, tester := createChangefeed4Test(ctx, t, config.GetDefaultReplicaConfig())
defer cf.Close(ctx)

// pre check
@@ -451,7 +455,7 @@ func TestSyncPoint(t *testing.T) {
func TestFinished(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
ctx.ChangefeedVars().Info.TargetTs = ctx.ChangefeedVars().Info.StartTs + 1000
cf, state, captures, tester := createChangefeed4Test(ctx, t)
cf, state, captures, tester := createChangefeed4Test(ctx, t, config.GetDefaultReplicaConfig())
defer cf.Close(ctx)

// pre check
@@ -514,7 +518,7 @@ func testChangefeedReleaseResource(
redoLogDir string,
expectedInitialized bool,
) {
cf, state, captures, tester := createChangefeed4Test(ctx, t)
cf, state, captures, tester := createChangefeed4Test(ctx, t, config.GetDefaultReplicaConfig())

// pre check
cf.Tick(ctx, state, captures)
@@ -780,7 +784,7 @@ func TestExecRenameTablesDDL(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()
ctx := cdcContext.NewBackendContext4Test(true)
cf, state, captures, tester := createChangefeed4Test(ctx, t)
cf, state, captures, tester := createChangefeed4Test(ctx, t, config.GetDefaultReplicaConfig())
defer cf.Close(ctx)
// pre check
cf.Tick(ctx, state, captures)
@@ -869,7 +873,7 @@ func TestExecDropTablesDDL(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()
ctx := cdcContext.NewBackendContext4Test(true)
cf, state, captures, tester := createChangefeed4Test(ctx, t)
cf, state, captures, tester := createChangefeed4Test(ctx, t, config.GetDefaultReplicaConfig())
defer cf.Close(ctx)

// pre check
@@ -923,7 +927,7 @@ func TestExecDropViewsDDL(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()
ctx := cdcContext.NewBackendContext4Test(true)
cf, state, captures, tester := createChangefeed4Test(ctx, t)
cf, state, captures, tester := createChangefeed4Test(ctx, t, config.GetDefaultReplicaConfig())
defer cf.Close(ctx)

// pre check
@@ -979,3 +983,47 @@ func TestExecDropViewsDDL(t *testing.T) {
execDropStmt(jobs[0], "DROP VIEW `test1`.`view2`")
execDropStmt(jobs[1], "DROP VIEW `test1`.`view1`")
}

func TestFilterDDLEvent(t *testing.T) {
ddlEvents := []*model.DDLEvent{
{
StartTs: 1000,
CommitTs: 1010,
TableInfo: &model.SimpleTableInfo{
Schema: "test",
Table: "t1",
},
Type: timodel.ActionAddColumn,
Query: "ALTER TABLE test.t1 ADD COLUMN a int",
},
{
StartTs: 1020,
CommitTs: 1030,
TableInfo: &model.SimpleTableInfo{
Schema: "test",
Table: "t2",
},
Type: timodel.ActionAddColumn,
Query: "ALTER TABLE test.t2 ADD COLUMN a int",
},
{
StartTs: 1020,
CommitTs: 1030,
TableInfo: &model.SimpleTableInfo{
Schema: "test",
Table: "t3",
},
Type: timodel.ActionAddColumn,
Query: "ALTER TABLE test.t3 ADD COLUMN a int",
},
}
rc := config.GetDefaultReplicaConfig()
rc.Filter = &config.FilterConfig{
Rules: []string{"test.t1"},
}
ctx := cdcContext.NewBackendContext4Test(true)
cf, _, _, _ := createChangefeed4Test(ctx, t, rc)
defer cf.Close(ctx)
res := cf.filterDDLEvent(ddlEvents)
require.Equal(t, ddlEvents[:1], res)
}
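TestFilterDDLEvent exercises the new path through a whole changefeed. For the rule semantics alone, a smaller sketch against pkg/filter follows; it assumes the NewFilter and ShouldIgnoreDMLEvent signatures visible in this diff, which may differ in other tiflow versions.

```go
package main

import (
	"fmt"

	"github.com/pingcap/tiflow/pkg/config"
	pfilter "github.com/pingcap/tiflow/pkg/filter"
)

func main() {
	// Build a filter from a replica config, mirroring the setup in TestFilterDDLEvent.
	rc := config.GetDefaultReplicaConfig()
	rc.Filter = &config.FilterConfig{Rules: []string{"test.t1"}}

	f, err := pfilter.NewFilter(rc)
	if err != nil {
		panic(err)
	}

	// DML on test.t1 matches the rules and is kept.
	fmt.Println(f.ShouldIgnoreDMLEvent(1000, "test", "t1")) // expected: false
	// DML on test.t2 does not match and would be marked Filtered by the mounter.
	fmt.Println(f.ShouldIgnoreDMLEvent(1000, "test", "t2")) // expected: true
}
```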
10 changes: 2 additions & 8 deletions cdc/owner/ddl_sink.go
@@ -27,7 +27,6 @@ import (
"github.com/pingcap/tiflow/cdc/sink/mysql"
cdcContext "github.com/pingcap/tiflow/pkg/context"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/util"
"go.uber.org/zap"
)
@@ -94,14 +93,9 @@ func newDDLSink() DDLSink {
type ddlSinkInitHandler func(ctx cdcContext.Context, a *ddlSinkImpl, id model.ChangeFeedID, info *model.ChangeFeedInfo) error

func ddlSinkInitializer(ctx cdcContext.Context, a *ddlSinkImpl, id model.ChangeFeedID, info *model.ChangeFeedInfo) error {
filter, err := filter.NewFilter(info.Config)
if err != nil {
return errors.Trace(err)
}

stdCtx := contextutil.PutChangefeedIDInCtx(ctx, id)
stdCtx = contextutil.PutRoleInCtx(stdCtx, util.RoleOwner)
s, err := sink.New(stdCtx, id, info.SinkURI, filter, info.Config, a.errCh)
s, err := sink.New(stdCtx, id, info.SinkURI, info.Config, a.errCh)
if err != nil {
return errors.Trace(err)
}
@@ -189,7 +183,7 @@ func (s *ddlSinkImpl) run(ctx cdcContext.Context, id model.ChangeFeedID, info *m
failpoint.Inject("InjectChangefeedDDLError", func() {
err = cerror.ErrExecDDLFailed.GenWithStackByArgs()
})
if err == nil || cerror.ErrDDLEventIgnored.Equal(errors.Cause(err)) {
if err == nil {
log.Info("Execute DDL succeeded",
zap.String("namespace", ctx.ChangefeedVars().ID.Namespace),
zap.String("changefeed", ctx.ChangefeedVars().ID.ID),