From 8f3fd7a7f30fa6eefc9bc95879eee2653ab6c937 Mon Sep 17 00:00:00 2001 From: dongmen <20351731+asddongmen@users.noreply.github.com> Date: Thu, 21 Jul 2022 07:25:08 +0800 Subject: [PATCH] filter(ticdc): Add row filter. (#6095) close pingcap/tiflow#6160 --- cdc/api/v1/validator.go | 21 +- cdc/api/v2/api_helpers.go | 49 +++- cdc/api/v2/api_helpers_mock.go | 8 +- cdc/api/v2/api_helpers_test.go | 25 +- cdc/api/v2/changefeed.go | 29 +- cdc/api/v2/changefeed_test.go | 13 +- cdc/api/v2/model.go | 78 ++++- cdc/api/v2/model_test.go | 42 +++ cdc/entry/metrics.go | 8 + cdc/entry/mounter.go | 86 ++++-- cdc/entry/mounter_test.go | 7 +- cdc/entry/schema_storage.go | 4 +- cdc/entry/validator.go | 23 +- cdc/model/errors.go | 6 +- cdc/model/errors_test.go | 2 +- cdc/model/schema_storage.go | 5 +- cdc/owner/feed_state_manager.go | 20 +- cdc/owner/metrics.go | 8 + cdc/owner/schema.go | 29 +- cdc/processor/pipeline/sorter.go | 2 + cdc/processor/processor.go | 5 +- cdc/puller/ddl_puller.go | 12 +- cdc/puller/metrics.go | 8 + cdc/sink/mq/dispatcher/event_router.go | 2 +- cdc/verification/checker.go | 8 +- cdc/verification/checker_test.go | 9 +- cdc/verification/mock_checkSumChecker.go | 6 +- cmd/kafka-consumer/main.go | 2 +- dm/dm/config/task_test.go | 2 +- errors.toml | 2 +- go.mod | 2 +- go.sum | 4 +- pkg/cmd/cli/cli_changefeed_create.go | 23 +- pkg/errors/cdc_errors.go | 2 +- pkg/errors/helper.go | 8 +- pkg/errors/helper_test.go | 2 +- pkg/filter/expr_filter.go | 32 ++- pkg/filter/expr_filter_test.go | 8 +- pkg/filter/filter.go | 223 +++++++-------- pkg/filter/filter_test.go | 165 ++++++++--- pkg/filter/sql_event_filter.go | 216 ++++++++++++++ pkg/filter/sql_event_filter_test.go | 347 +++++++++++++++++++++++ pkg/filter/utils.go | 156 ++++++++++ pkg/filter/utils_test.go | 159 +++++++++++ pkg/util/tz.go | 8 + pkg/util/tz_test.go | 7 + 46 files changed, 1598 insertions(+), 285 deletions(-) create mode 100644 pkg/filter/sql_event_filter.go create mode 100644 pkg/filter/sql_event_filter_test.go create mode 100644 pkg/filter/utils.go create mode 100644 pkg/filter/utils_test.go diff --git a/cdc/api/v1/validator.go b/cdc/api/v1/validator.go index adaff3ce5c4..852605e891c 100644 --- a/cdc/api/v1/validator.go +++ b/cdc/api/v1/validator.go @@ -144,13 +144,20 @@ func verifyCreateChangefeedConfig( SyncPointInterval: 10 * time.Minute, CreatorVersion: version.ReleaseVersion, } - + f, err := filter.NewFilter(replicaConfig, "") + if err != nil { + return nil, err + } + tableInfos, ineligibleTables, _, err := entry.VerifyTables(f, + up.KVStorage, changefeedConfig.StartTS) + if err != nil { + return nil, err + } + err = f.Verify(tableInfos) + if err != nil { + return nil, err + } if !replicaConfig.ForceReplicate && !changefeedConfig.IgnoreIneligibleTable { - ineligibleTables, _, err := entry.VerifyTables(replicaConfig, - up.KVStorage, changefeedConfig.StartTS) - if err != nil { - return nil, err - } if len(ineligibleTables) != 0 { return nil, cerror.ErrTableIneligible.GenWithStackByArgs(ineligibleTables) } @@ -188,7 +195,7 @@ func VerifyUpdateChangefeedConfig(ctx context.Context, // verify rules if len(changefeedConfig.FilterRules) != 0 { newInfo.Config.Filter.Rules = changefeedConfig.FilterRules - _, err = filter.VerifyRules(newInfo.Config.Filter) + _, err = filter.VerifyTableRules(newInfo.Config.Filter) if err != nil { return nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByArgs(err.Error()) } diff --git a/cdc/api/v2/api_helpers.go b/cdc/api/v2/api_helpers.go index 8d3a2d5006a..36cc8b413bf 100644 --- 
a/cdc/api/v2/api_helpers.go +++ b/cdc/api/v2/api_helpers.go @@ -63,6 +63,8 @@ type APIV2Helpers interface { cfg *ChangefeedConfig, oldInfo *model.ChangeFeedInfo, oldUpInfo *model.UpstreamInfo, + kvStorage tidbkv.Storage, + checkpointTs uint64, ) (*model.ChangeFeedInfo, *model.UpstreamInfo, error) // verifyUpstream verifies the upstreamConfig @@ -212,13 +214,19 @@ func (APIV2HelpersImpl) verifyCreateChangefeedConfig( "if use force replicate, old value feature must be enabled") } } - _, err = filter.VerifyRules(replicaCfg.Filter) + f, err := filter.NewFilter(replicaCfg, "") if err != nil { - return nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByArgs(err.Error()) + return nil, errors.Cause(err) + } + tableInfos, ineligibleTables, _, err := entry.VerifyTables(f, kvStorage, cfg.StartTs) + if err != nil { + return nil, errors.Cause(err) + } + err = f.Verify(tableInfos) + if err != nil { + return nil, errors.Cause(err) } - if !replicaCfg.ForceReplicate && !cfg.ReplicaConfig.IgnoreIneligibleTable { - ineligibleTables, _, err := entry.VerifyTables(replicaCfg, kvStorage, cfg.StartTs) if err != nil { return nil, err } @@ -279,9 +287,13 @@ func (h APIV2HelpersImpl) verifyUpstream(ctx context.Context, // verifyUpdateChangefeedConfig verifies config to update // a changefeed and returns a changefeedInfo -func (APIV2HelpersImpl) verifyUpdateChangefeedConfig(ctx context.Context, - cfg *ChangefeedConfig, oldInfo *model.ChangeFeedInfo, +func (APIV2HelpersImpl) verifyUpdateChangefeedConfig( + ctx context.Context, + cfg *ChangefeedConfig, + oldInfo *model.ChangeFeedInfo, oldUpInfo *model.UpstreamInfo, + kvStorage tidbkv.Storage, + checkpointTs uint64, ) (*model.ChangeFeedInfo, *model.UpstreamInfo, error) { newInfo, err := oldInfo.Clone() if err != nil { @@ -308,6 +320,23 @@ func (APIV2HelpersImpl) verifyUpdateChangefeedConfig(ctx context.Context, newInfo.Config = cfg.ReplicaConfig.ToInternalReplicaConfig() } + f, err := filter.NewFilter(newInfo.Config, "") + if err != nil { + return nil, nil, cerror.ErrChangefeedUpdateRefused. + GenWithStackByArgs(errors.Cause(err).Error()) + } + + tableInfos, _, _, err := entry.VerifyTables(f, kvStorage, checkpointTs) + if err != nil { + return nil, nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err) + } + + err = f.Verify(tableInfos) + if err != nil { + return nil, nil, cerror.ErrChangefeedUpdateRefused. + GenWithStackByArgs(errors.Cause(err).Error()) + } + // verify SinkURI if cfg.SinkURI != "" { newInfo.SinkURI = cfg.SinkURI @@ -417,5 +446,11 @@ func (h APIV2HelpersImpl) getVerfiedTables(replicaConfig *config.ReplicaConfig, storage tidbkv.Storage, startTs uint64) (ineligibleTables, eligibleTables []model.TableName, err error, ) { - return entry.VerifyTables(replicaConfig, storage, startTs) + f, err := filter.NewFilter(replicaConfig, "") + if err != nil { + return + } + _, ineligibleTables, eligibleTables, err = entry. + VerifyTables(f, storage, startTs) + return } diff --git a/cdc/api/v2/api_helpers_mock.go b/cdc/api/v2/api_helpers_mock.go index 8a2ba6ed02d..bd598b129cc 100644 --- a/cdc/api/v2/api_helpers_mock.go +++ b/cdc/api/v2/api_helpers_mock.go @@ -116,9 +116,9 @@ func (mr *MockAPIV2HelpersMockRecorder) verifyResumeChangefeedConfig(ctx, pdClie } // verifyUpdateChangefeedConfig mocks base method. 
-func (m *MockAPIV2Helpers) verifyUpdateChangefeedConfig(ctx context.Context, cfg *ChangefeedConfig, oldInfo *model.ChangeFeedInfo, oldUpInfo *model.UpstreamInfo) (*model.ChangeFeedInfo, *model.UpstreamInfo, error) { +func (m *MockAPIV2Helpers) verifyUpdateChangefeedConfig(ctx context.Context, cfg *ChangefeedConfig, oldInfo *model.ChangeFeedInfo, oldUpInfo *model.UpstreamInfo, kvStorage kv.Storage, checkpointTs uint64) (*model.ChangeFeedInfo, *model.UpstreamInfo, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "verifyUpdateChangefeedConfig", ctx, cfg, oldInfo, oldUpInfo) + ret := m.ctrl.Call(m, "verifyUpdateChangefeedConfig", ctx, cfg, oldInfo, oldUpInfo, kvStorage, checkpointTs) ret0, _ := ret[0].(*model.ChangeFeedInfo) ret1, _ := ret[1].(*model.UpstreamInfo) ret2, _ := ret[2].(error) @@ -126,9 +126,9 @@ func (m *MockAPIV2Helpers) verifyUpdateChangefeedConfig(ctx context.Context, cfg } // verifyUpdateChangefeedConfig indicates an expected call of verifyUpdateChangefeedConfig. -func (mr *MockAPIV2HelpersMockRecorder) verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo interface{}) *gomock.Call { +func (mr *MockAPIV2HelpersMockRecorder) verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo, kvStorage, checkpointTs interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "verifyUpdateChangefeedConfig", reflect.TypeOf((*MockAPIV2Helpers)(nil).verifyUpdateChangefeedConfig), ctx, cfg, oldInfo, oldUpInfo) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "verifyUpdateChangefeedConfig", reflect.TypeOf((*MockAPIV2Helpers)(nil).verifyUpdateChangefeedConfig), ctx, cfg, oldInfo, oldUpInfo, kvStorage, checkpointTs) } // verifyUpstream mocks base method. diff --git a/cdc/api/v2/api_helpers_test.go b/cdc/api/v2/api_helpers_test.go index 503cdd0d98d..613d01cb02d 100644 --- a/cdc/api/v2/api_helpers_test.go +++ b/cdc/api/v2/api_helpers_test.go @@ -18,21 +18,19 @@ import ( "testing" "time" - tidbkv "github.com/pingcap/tidb/kv" + "github.com/pingcap/tiflow/cdc/entry" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/stretchr/testify/require" ) -type mockStorage struct { - tidbkv.Storage -} - func TestVerifyCreateChangefeedConfig(t *testing.T) { ctx := context.Background() pdClient := &mockPDClient{} - storage := &mockStorage{} + helper := entry.NewSchemaTestHelper(t) + helper.Tk().MustExec("use test;") + storage := helper.Storage() provider := &mockStatusProvider{} cfg := &ChangefeedConfig{} h := &APIV2HelpersImpl{} @@ -94,17 +92,22 @@ func TestVerifyCreateChangefeedConfig(t *testing.T) { func TestVerifyUpdateChangefeedConfig(t *testing.T) { ctx := context.Background() cfg := &ChangefeedConfig{} - oldInfo := &model.ChangeFeedInfo{} + oldInfo := &model.ChangeFeedInfo{ + Config: config.GetDefaultReplicaConfig(), + } oldUpInfo := &model.UpstreamInfo{} + helper := entry.NewSchemaTestHelper(t) + helper.Tk().MustExec("use test;") + storage := helper.Storage() h := &APIV2HelpersImpl{} - newCfInfo, newUpInfo, err := h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo) + newCfInfo, newUpInfo, err := h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo, storage, 0) require.NotNil(t, err) require.Nil(t, newCfInfo) require.Nil(t, newUpInfo) // namespace and id can not be updated cfg.Namespace = "abc" cfg.ID = "1234" - newCfInfo, newUpInfo, err = h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo) + newCfInfo, newUpInfo, err = 
h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo, storage, 0) require.NotNil(t, err) require.Nil(t, newCfInfo) require.Nil(t, newUpInfo) @@ -120,7 +123,7 @@ func TestVerifyUpdateChangefeedConfig(t *testing.T) { cfg.KeyPath = "p3" cfg.SinkURI = "blackhole://" cfg.CertAllowedCN = []string{"c", "d"} - newCfInfo, newUpInfo, err = h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo) + newCfInfo, newUpInfo, err = h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo, storage, 0) require.Nil(t, err) // startTs can not be updated require.Equal(t, "table", string(newCfInfo.Config.Sink.TxnAtomicity)) @@ -139,6 +142,6 @@ func TestVerifyUpdateChangefeedConfig(t *testing.T) { require.Equal(t, "blackhole://", newCfInfo.SinkURI) oldInfo.StartTs = 10 cfg.TargetTs = 9 - newCfInfo, newUpInfo, err = h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo) + newCfInfo, newUpInfo, err = h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo, storage, 0) require.NotNil(t, err) } diff --git a/cdc/api/v2/changefeed.go b/cdc/api/v2/changefeed.go index 8c335695a6a..ec13d00c402 100644 --- a/cdc/api/v2/changefeed.go +++ b/cdc/api/v2/changefeed.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tiflow/cdc/capture" "github.com/pingcap/tiflow/cdc/model" cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/security" "github.com/pingcap/tiflow/pkg/txnutil/gc" "github.com/pingcap/tiflow/pkg/upstream" "go.uber.org/zap" @@ -202,6 +203,12 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) { GenWithStackByArgs("can only update changefeed config when it is stopped")) return } + cfStatus, err := h.capture.StatusProvider().GetChangeFeedStatus(ctx, changefeedID) + if err != nil { + _ = c.Error(err) + return + } + upInfo, err := h.capture.GetEtcdClient(). GetUpstreamInfo(ctx, cfInfo.UpstreamID, cfInfo.Namespace) if err != nil { @@ -226,8 +233,28 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) { zap.String("changefeedInfo", cfInfo.String()), zap.Any("upstreamInfo", upInfo)) + var pdAddrs []string + var credentials *security.Credential + if upInfo != nil { + pdAddrs = strings.Split(upInfo.PDEndpoints, ",") + credentials = &security.Credential{ + CAPath: upInfo.CAPath, + CertPath: upInfo.CertPath, + KeyPath: upInfo.KeyPath, + CertAllowedCN: upInfo.CertAllowedCN, + } + } + if len(updateCfConfig.PDAddrs) != 0 { + pdAddrs = updateCfConfig.PDAddrs + credentials = updateCfConfig.PDConfig.toCredential() + } + + storage, err := h.helpers.createTiStore(pdAddrs, credentials) + if err != nil { + _ = c.Error(errors.Trace(err)) + } newCfInfo, newUpInfo, err := h.helpers. - verifyUpdateChangefeedConfig(ctx, updateCfConfig, cfInfo, upInfo) + verifyUpdateChangefeedConfig(ctx, updateCfConfig, cfInfo, upInfo, storage, cfStatus.CheckpointTs) if err != nil { _ = c.Error(errors.Trace(err)) return diff --git a/cdc/api/v2/changefeed_test.go b/cdc/api/v2/changefeed_test.go index b39773cf3f7..55119b7d104 100644 --- a/cdc/api/v2/changefeed_test.go +++ b/cdc/api/v2/changefeed_test.go @@ -331,10 +331,17 @@ func TestUpdateChangefeed(t *testing.T) { verifyUpstream(gomock.Any(), gomock.Any(), gomock.Any()). Return(nil).AnyTimes() helpers.EXPECT(). - verifyUpdateChangefeedConfig(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + createTiStore(gomock.Any(), gomock.Any()). + Return(nil, nil). + AnyTimes() + helpers.EXPECT(). + verifyUpdateChangefeedConfig(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). 
Return(&model.ChangeFeedInfo{}, &model.UpstreamInfo{}, cerrors.ErrChangefeedUpdateRefused). Times(1) + statusProvider.changefeedStatus = &model.ChangeFeedStatus{ + CheckpointTs: 1, + } w = httptest.NewRecorder() req, _ = http.NewRequestWithContext(context.Background(), update.method, fmt.Sprintf(update.url, validID), bytes.NewReader(body)) @@ -347,7 +354,7 @@ func TestUpdateChangefeed(t *testing.T) { // case 7: update transaction failed helpers.EXPECT(). - verifyUpdateChangefeedConfig(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + verifyUpdateChangefeedConfig(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Return(&model.ChangeFeedInfo{}, &model.UpstreamInfo{}, nil). Times(1) etcdClient.EXPECT(). @@ -366,7 +373,7 @@ func TestUpdateChangefeed(t *testing.T) { // case 8: success helpers.EXPECT(). - verifyUpdateChangefeedConfig(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + verifyUpdateChangefeedConfig(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Return(oldCfInfo, &model.UpstreamInfo{}, nil). Times(1) etcdClient.EXPECT(). diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index c77d8e731f8..c15f9fe2777 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -18,6 +18,7 @@ import ( "time" "github.com/pingcap/errors" + bf "github.com/pingcap/tidb-tools/pkg/binlog-filter" tidbModel "github.com/pingcap/tidb/parser/model" filter "github.com/pingcap/tidb/util/table-filter" "github.com/pingcap/tiflow/cdc/model" @@ -134,14 +135,21 @@ func (c *ReplicaConfig) ToInternalReplicaConfig() *config.ReplicaConfig { } } } + var efs []*config.EventFilterRule + if len(c.Filter.EventFilters) != 0 { + efs = make([]*config.EventFilterRule, len(c.Filter.EventFilters)) + for i, ef := range c.Filter.EventFilters { + efs[i] = ef.ToInternalEventFilterRule() + } + } res.Filter = &config.FilterConfig{ Rules: c.Filter.Rules, MySQLReplicationRules: mySQLReplicationRules, IgnoreTxnStartTs: c.Filter.IgnoreTxnStartTs, DDLAllowlist: c.Filter.DDLAllowlist, + EventFilters: efs, } } - if c.Consistent != nil { res.Consistent = &config.ConsistentConfig{ Level: c.Consistent.Level, @@ -214,11 +222,21 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { } } } + + var efs []EventFilterRule + if len(c.Filter.EventFilters) != 0 { + efs = make([]EventFilterRule, len(c.Filter.EventFilters)) + for i, ef := range c.Filter.EventFilters { + efs[i] = ToAPIEventFilterRule(ef) + } + } + res.Filter = &FilterConfig{ MySQLReplicationRules: mySQLReplicationRules, Rules: cloned.Filter.Rules, IgnoreTxnStartTs: cloned.Filter.IgnoreTxnStartTs, DDLAllowlist: cloned.Filter.DDLAllowlist, + EventFilters: efs, } } if cloned.Sink != nil { @@ -282,6 +300,64 @@ type FilterConfig struct { Rules []string `json:"rules,omitempty"` IgnoreTxnStartTs []uint64 `json:"ignore_txn_start_ts,omitempty"` DDLAllowlist []tidbModel.ActionType `json:"ddl_allow_list,omitempty"` + EventFilters []EventFilterRule `json:"event_filters"` +} + +// EventFilterRule is used by sql event filter and expression filter +type EventFilterRule struct { + Matcher []string `json:"matcher"` + IgnoreEvent []string `json:"ignore_event"` + // regular expression + IgnoreSQL []string `toml:"ignore_sql" json:"ignore_sql"` + // sql expression + IgnoreInsertValueExpr string `json:"ignore_insert_value_expr"` + IgnoreUpdateNewValueExpr string `json:"ignore_update_new_value_expr"` + IgnoreUpdateOldValueExpr string `json:"ignore_update_old_value_expr"` + IgnoreDeleteValueExpr string 
`json:"ignore_delete_value_expr"` +} + +// ToInternalEventFilterRule converts EventFilterRule to *config.EventFilterRule +func (e EventFilterRule) ToInternalEventFilterRule() *config.EventFilterRule { + res := &config.EventFilterRule{ + Matcher: e.Matcher, + IgnoreSQL: e.IgnoreSQL, + IgnoreInsertValueExpr: e.IgnoreInsertValueExpr, + IgnoreUpdateNewValueExpr: e.IgnoreUpdateNewValueExpr, + IgnoreUpdateOldValueExpr: e.IgnoreUpdateOldValueExpr, + IgnoreDeleteValueExpr: e.IgnoreDeleteValueExpr, + } + if len(e.IgnoreEvent) != 0 { + res.IgnoreEvent = make([]bf.EventType, len(e.IgnoreEvent)) + for i, et := range e.IgnoreEvent { + res.IgnoreEvent[i] = bf.EventType(et) + } + } + return res +} + +// ToAPIEventFilterRule converts *config.EventFilterRule to API EventFilterRule +func ToAPIEventFilterRule(er *config.EventFilterRule) EventFilterRule { + res := EventFilterRule{ + IgnoreInsertValueExpr: er.IgnoreInsertValueExpr, + IgnoreUpdateNewValueExpr: er.IgnoreUpdateNewValueExpr, + IgnoreUpdateOldValueExpr: er.IgnoreUpdateOldValueExpr, + IgnoreDeleteValueExpr: er.IgnoreDeleteValueExpr, + } + if len(er.Matcher) != 0 { + res.Matcher = make([]string, len(er.Matcher)) + copy(res.Matcher, er.Matcher) + } + if len(er.IgnoreSQL) != 0 { + res.IgnoreSQL = make([]string, len(er.IgnoreSQL)) + copy(res.IgnoreSQL, er.IgnoreSQL) + } + if len(er.IgnoreEvent) != 0 { + res.IgnoreEvent = make([]string, len(er.IgnoreEvent)) + for i, et := range er.IgnoreEvent { + res.IgnoreEvent[i] = string(et) + } + } + return res } // MySQLReplicationRules is a set of rules based on MySQL's replication tableFilter. diff --git a/cdc/api/v2/model_test.go b/cdc/api/v2/model_test.go index be4e7a6a81c..dca903f3b8b 100644 --- a/cdc/api/v2/model_test.go +++ b/cdc/api/v2/model_test.go @@ -18,6 +18,7 @@ import ( "strings" "testing" + bf "github.com/pingcap/tidb-tools/pkg/binlog-filter" parserModel "github.com/pingcap/tidb/parser/model" filter "github.com/pingcap/tidb/util/table-filter" "github.com/pingcap/tiflow/pkg/config" @@ -73,6 +74,15 @@ func TestToAPIReplicaConfig(t *testing.T) { DDLAllowlist: []parserModel.ActionType{ parserModel.ActionType(2), }, + EventFilters: []*config.EventFilterRule{{ + Matcher: []string{"test.t1", "test.t2"}, + IgnoreEvent: []bf.EventType{bf.AllDML, bf.AllDDL, bf.AlterTable}, + IgnoreSQL: []string{"^DROP TABLE", "ADD COLUMN"}, + IgnoreInsertValueExpr: "c >= 0", + IgnoreUpdateNewValueExpr: "age <= 55", + IgnoreUpdateOldValueExpr: "age >= 84", + IgnoreDeleteValueExpr: "age > 20", + }}, } cfg2 := ToAPIReplicaConfig(cfg).ToInternalReplicaConfig() require.Equal(t, "", cfg2.Sink.DispatchRules[0].DispatcherRule) @@ -120,3 +130,35 @@ func TestToCredential(t *testing.T) { require.Equal(t, len(credential.CertAllowedCN), 1) require.Equal(t, credential.CertAllowedCN[0], pdCfg.CertAllowedCN[0]) } + +func TestEventFilterRuleConvert(t *testing.T) { + cases := []struct { + inRule *config.EventFilterRule + apiRule EventFilterRule + }{ + { + inRule: &config.EventFilterRule{ + Matcher: []string{"test.t1", "test.t2"}, + IgnoreEvent: []bf.EventType{bf.AllDML, bf.AllDDL, bf.AlterTable}, + IgnoreSQL: []string{"^DROP TABLE", "ADD COLUMN"}, + IgnoreInsertValueExpr: "c >= 0", + IgnoreUpdateNewValueExpr: "age <= 55", + IgnoreUpdateOldValueExpr: "age >= 84", + IgnoreDeleteValueExpr: "age > 20", + }, + apiRule: EventFilterRule{ + Matcher: []string{"test.t1", "test.t2"}, + IgnoreEvent: []string{"all dml", "all ddl", "alter table"}, + IgnoreSQL: []string{"^DROP TABLE", "ADD COLUMN"}, + IgnoreInsertValueExpr: "c >= 0", + 
IgnoreUpdateNewValueExpr: "age <= 55", + IgnoreUpdateOldValueExpr: "age >= 84", + IgnoreDeleteValueExpr: "age > 20", + }, + }, + } + for _, c := range cases { + require.Equal(t, c.apiRule, ToAPIEventFilterRule(c.inRule)) + require.Equal(t, c.inRule, c.apiRule.ToInternalEventFilterRule()) + } +} diff --git a/cdc/entry/metrics.go b/cdc/entry/metrics.go index 2a6e904fb74..212927c71d4 100644 --- a/cdc/entry/metrics.go +++ b/cdc/entry/metrics.go @@ -33,10 +33,18 @@ var ( Name: "total_rows_count", Help: "The total count of rows that are processed by mounter", }, []string{"namespace", "changefeed"}) + ignoredDMLEventCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "ticdc", + Subsystem: "mounter", + Name: "ignored_dml_event_count", + Help: "The total count of dml events that are ignored in mounter.", + }, []string{"namespace", "changefeed"}) ) // InitMetrics registers all metrics in this file func InitMetrics(registry *prometheus.Registry) { registry.MustRegister(mountDuration) registry.MustRegister(totalRowsCountGauge) + registry.MustRegister(ignoredDMLEventCounter) } diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go index db4e6defc60..2c5b8e6c6be 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -68,20 +68,21 @@ type Mounter interface { } type mounterImpl struct { - schemaStorage SchemaStorage - tz *time.Location - enableOldValue bool - changefeedID model.ChangeFeedID - filter *pfilter.Filter - metricMountDuration prometheus.Observer - metricTotalRows prometheus.Gauge + schemaStorage SchemaStorage + tz *time.Location + enableOldValue bool + changefeedID model.ChangeFeedID + filter pfilter.Filter + metricMountDuration prometheus.Observer + metricTotalRows prometheus.Gauge + metricIgnoredDMLEventCounter prometheus.Counter } // NewMounter creates a mounter func NewMounter(schemaStorage SchemaStorage, changefeedID model.ChangeFeedID, tz *time.Location, - filter *pfilter.Filter, + filter pfilter.Filter, enableOldValue bool, ) Mounter { return &mounterImpl{ @@ -93,6 +94,8 @@ func NewMounter(schemaStorage SchemaStorage, WithLabelValues(changefeedID.Namespace, changefeedID.ID), metricTotalRows: totalRowsCountGauge. WithLabelValues(changefeedID.Namespace, changefeedID.ID), + metricIgnoredDMLEventCounter: ignoredDMLEventCounter. 
+ WithLabelValues(changefeedID.Namespace, changefeedID.ID), tz: tz, } } @@ -110,6 +113,10 @@ func (m *mounterImpl) DecodeEvent(ctx context.Context, pEvent *model.Polymorphic if err != nil { return false, errors.Trace(err) } + if row == nil { + log.Debug("message's row changed event is nil, it should be ignored", zap.Uint64("startTs", pEvent.StartTs)) + return true, nil + } pEvent.Row = row pEvent.RawKV.Value = nil @@ -118,12 +125,7 @@ func (m *mounterImpl) DecodeEvent(ctx context.Context, pEvent *model.Polymorphic if duration > time.Second { m.metricMountDuration.Observe(duration.Seconds()) } - - ignored := m.filter.ShouldIgnoreDMLEvent(row.StartTs, row.Table.Schema, row.Table.Table) - if ignored { - log.Debug("message's row changed event is nil, it should be ignored", zap.Uint64("startTs", row.StartTs)) - } - return ignored, nil + return false, nil } func (m *mounterImpl) unmarshalAndMountRowChanged(ctx context.Context, raw *model.RawKVEntry) (*model.RowChangedEvent, error) { @@ -173,11 +175,26 @@ func (m *mounterImpl) unmarshalAndMountRowChanged(ctx context.Context, raw *mode if rowKV == nil { return nil, nil } - return m.mountRowKVEntry(tableInfo, rowKV, raw.ApproximateDataSize()) + row, rawRow, err := m.mountRowKVEntry(tableInfo, rowKV, raw.ApproximateDataSize()) + if err != nil { + return nil, err + } + // We need to filter a row here because we need its tableInfo. + ignore, err := m.filter.ShouldIgnoreDMLEvent(row, rawRow, tableInfo) + if err != nil { + return nil, err + } + // TODO(dongmen): try to find better way to indicate this row has been filtered. + // Return a nil RowChangedEvent if this row should be ignored. + if ignore { + m.metricIgnoredDMLEventCounter.Inc() + return nil, nil + } + return row, nil } return nil, nil }() - if err != nil { + if err != nil && !cerror.IsChangefeedUnRetryableError(err) { log.Error("failed to mount and unmarshals entry, start to print debug info", zap.Error(err)) snap.PrintStatus(log.Error) } @@ -256,8 +273,9 @@ func UnmarshalDDL(raw *model.RawKVEntry) (*timodel.Job, error) { return job, nil } -func datum2Column(tableInfo *model.TableInfo, datums map[int64]types.Datum, fillWithDefaultValue bool) ([]*model.Column, error) { +func datum2Column(tableInfo *model.TableInfo, datums map[int64]types.Datum, fillWithDefaultValue bool) ([]*model.Column, []types.Datum, error) { cols := make([]*model.Column, len(tableInfo.RowColumnsOffset)) + rawCols := make([]types.Datum, len(tableInfo.RowColumnsOffset)) for _, colInfo := range tableInfo.Columns { colSize := 0 if !model.IsColCDCVisible(colInfo) { @@ -275,16 +293,17 @@ func datum2Column(tableInfo *model.TableInfo, datums map[int64]types.Datum, fill if exist { colValue, size, warn, err = formatColVal(colDatums, colInfo) } else if fillWithDefaultValue { - colValue, size, warn, err = getDefaultOrZeroValue(colInfo) + colDatums, colValue, size, warn, err = getDefaultOrZeroValue(colInfo) } if err != nil { - return nil, errors.Trace(err) + return nil, nil, errors.Trace(err) } if warn != "" { log.Warn(warn, zap.String("table", tableInfo.TableName.String()), zap.String("column", colInfo.Name.String())) } defaultValue := getDDLDefaultDefinition(colInfo) colSize += size + rawCols[tableInfo.RowColumnsOffset[colInfo.ID]] = colDatums cols[tableInfo.RowColumnsOffset[colInfo.ID]] = &model.Column{ Name: colName, Type: colInfo.GetType(), @@ -296,13 +315,15 @@ func datum2Column(tableInfo *model.TableInfo, datums map[int64]types.Datum, fill ApproximateBytes: colSize + sizeOfEmptyColumn, } } - return cols, nil + return cols, 
rawCols, nil } -func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, dataSize int64) (*model.RowChangedEvent, error) { +func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, dataSize int64) (*model.RowChangedEvent, model.RowChangedDatums, error) { var err error // Decode previous columns. var preCols []*model.Column + var preRawCols []types.Datum + var rawRow model.RowChangedDatums // Since we now always use old value internally, // we need to control the output(sink will use the PreColumns field to determine whether to output old value). // Normally old value is output when only enableOldValue is on, @@ -311,9 +332,9 @@ func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntr if row.PreRowExist { // FIXME(leoppro): using pre table info to mounter pre column datum // the pre column and current column in one event may using different table info - preCols, err = datum2Column(tableInfo, row.PreRow, m.enableOldValue) + preCols, preRawCols, err = datum2Column(tableInfo, row.PreRow, m.enableOldValue) if err != nil { - return nil, errors.Trace(err) + return nil, rawRow, errors.Trace(err) } // NOTICE: When the old Value feature is off, @@ -329,10 +350,11 @@ func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntr } var cols []*model.Column + var rawCols []types.Datum if row.RowExist { - cols, err = datum2Column(tableInfo, row.Row, true) + cols, rawCols, err = datum2Column(tableInfo, row.Row, true) if err != nil { - return nil, errors.Trace(err) + return nil, rawRow, errors.Trace(err) } } @@ -353,6 +375,8 @@ func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntr _, _, colInfos := tableInfo.GetRowColInfos() + rawRow.PreRowDatums = preRawCols + rawRow.RowDatums = rawCols return &model.RowChangedEvent{ StartTs: row.StartTs, CommitTs: row.CRTs, @@ -369,7 +393,7 @@ func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntr PreColumns: preCols, IndexColumns: tableInfo.IndexColumnsOffset, ApproximateDataSize: dataSize, - }, nil + }, rawRow, nil } var emptyBytes = make([]byte, 0) @@ -466,7 +490,7 @@ func formatColVal(datum types.Datum, col *timodel.ColumnInfo) ( // https://github.com/golang/go/blob/go1.17.4/src/database/sql/driver/types.go#L236 // Supported type is: nil, basic type(Int, Int8,..., Float32, Float64, String), Slice(uint8), other types not support // TODO: Check default expr support -func getDefaultOrZeroValue(col *timodel.ColumnInfo) (interface{}, int, string, error) { +func getDefaultOrZeroValue(col *timodel.ColumnInfo) (types.Datum, any, int, string, error) { var d types.Datum // NOTICE: SHOULD use OriginDefaultValue here, more info pls ref to // https://github.com/pingcap/tiflow/issues/4048 @@ -476,7 +500,7 @@ func getDefaultOrZeroValue(col *timodel.ColumnInfo) (interface{}, int, string, e // Ref: https://github.com/pingcap/tidb/blob/d2c352980a43bb593db81fd1db996f47af596d91/table/column.go#L489 if col.GetOriginDefaultValue() != nil { d = types.NewDatum(col.GetOriginDefaultValue()) - return d.GetValue(), sizeOfDatum(d), "", nil + return d, d.GetValue(), sizeOfDatum(d), "", nil } if !mysql.HasNotNullFlag(col.GetFlag()) { @@ -492,7 +516,7 @@ func getDefaultOrZeroValue(col *timodel.ColumnInfo) (interface{}, int, string, e // the default value is the first element of the enum list d = types.NewDatum(col.FieldType.GetElem(0)) case mysql.TypeString, mysql.TypeVarString, mysql.TypeVarchar: - return emptyBytes, sizeOfEmptyBytes, "", nil + return 
d, emptyBytes, sizeOfEmptyBytes, "", nil default: d = table.GetZeroValue(col) if d.IsNull() { @@ -500,8 +524,8 @@ func getDefaultOrZeroValue(col *timodel.ColumnInfo) (interface{}, int, string, e } } } - - return formatColVal(d, col) + v, size, warn, err := formatColVal(d, col) + return d, v, size, warn, err } func getDDLDefaultDefinition(col *timodel.ColumnInfo) interface{} { diff --git a/cdc/entry/mounter_test.go b/cdc/entry/mounter_test.go index 684a7611ada..444c386e1ea 100644 --- a/cdc/entry/mounter_test.go +++ b/cdc/entry/mounter_test.go @@ -292,7 +292,7 @@ func testMounterDisableOldValue(t *testing.T, tc struct { require.Nil(t, err) scheamStorage.AdvanceResolvedTs(ver.Ver) config := config.GetDefaultReplicaConfig() - filter, err := pfilter.NewFilter(config) + filter, err := pfilter.NewFilter(config, "") require.Nil(t, err) mounter := NewMounter(scheamStorage, model.DefaultChangeFeedID("c1"), @@ -959,7 +959,7 @@ func TestGetDefaultZeroValue(t *testing.T) { } for _, tc := range testCases { - val, _, _, _ := getDefaultOrZeroValue(&tc.ColInfo) + _, val, _, _, _ := getDefaultOrZeroValue(&tc.ColInfo) require.Equal(t, tc.Res, val, tc.Name) val = getDDLDefaultDefinition(&tc.ColInfo) require.Equal(t, tc.Default, val, tc.Name) @@ -983,14 +983,13 @@ func TestDecodeEventIgnoreRow(t *testing.T) { cfg := config.GetDefaultReplicaConfig() cfg.Filter.Rules = []string{"test.student", "test.computer"} - filter, err := pfilter.NewFilter(cfg) + filter, err := pfilter.NewFilter(cfg, "") require.Nil(t, err) ver, err := helper.Storage().CurrentVersion(oracle.GlobalTxnScope) require.Nil(t, err) schemaStorage, err := NewSchemaStorage(helper.GetCurrentMeta(), ver.Ver, filter, false, cfID) require.Nil(t, err) - // apply ddl to schemaStorage for _, ddl := range ddls { job := helper.DDL2Job(ddl) diff --git a/cdc/entry/schema_storage.go b/cdc/entry/schema_storage.go index 3718d359f35..53557e4122d 100644 --- a/cdc/entry/schema_storage.go +++ b/cdc/entry/schema_storage.go @@ -57,7 +57,7 @@ type schemaStorageImpl struct { gcTs uint64 resolvedTs uint64 - filter *filter.Filter + filter filter.Filter forceReplicate bool id model.ChangeFeedID @@ -65,7 +65,7 @@ type schemaStorageImpl struct { // NewSchemaStorage creates a new schema storage func NewSchemaStorage( - meta *timeta.Meta, startTs uint64, filter *filter.Filter, + meta *timeta.Meta, startTs uint64, filter filter.Filter, forceReplicate bool, id model.ChangeFeedID, ) (SchemaStorage, error) { var snap *schema.Snapshot diff --git a/cdc/entry/validator.go b/cdc/entry/validator.go index ca7ab7a1a3e..720a93e3afc 100644 --- a/cdc/entry/validator.go +++ b/cdc/entry/validator.go @@ -19,31 +19,31 @@ import ( "github.com/pingcap/tiflow/cdc/entry/schema" "github.com/pingcap/tiflow/cdc/kv" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/filter" ) // VerifyTables catalog tables specified by ReplicaConfig into // eligible (has an unique index or primary key) and ineligible tables. 
-func VerifyTables(replicaConfig *config.ReplicaConfig,
-	storage tidbkv.Storage, startTs uint64) (ineligibleTables,
-	eligibleTables []model.TableName, err error,
+func VerifyTables(
+	f filter.Filter,
+	storage tidbkv.Storage,
+	startTs uint64) (
+	tableInfos []*model.TableInfo,
+	ineligibleTables,
+	eligibleTables []model.TableName,
+	err error,
 ) {
-	filter, err := filter.NewFilter(replicaConfig)
-	if err != nil {
-		return nil, nil, errors.Trace(err)
-	}
 	meta, err := kv.GetSnapshotMeta(storage, startTs)
 	if err != nil {
-		return nil, nil, errors.Trace(err)
+		return nil, nil, nil, errors.Trace(err)
 	}
 	snap, err := schema.NewSingleSnapshotFromMeta(meta, startTs, false /* explicitTables */)
 	if err != nil {
-		return nil, nil, errors.Trace(err)
+		return nil, nil, nil, errors.Trace(err)
 	}
 	snap.IterTables(true, func(tableInfo *model.TableInfo) {
-		if filter.ShouldIgnoreTable(tableInfo.TableName.Schema, tableInfo.TableName.Table) {
+		if f.ShouldIgnoreTable(tableInfo.TableName.Schema, tableInfo.TableName.Table) {
 			return
 		}
 		// Sequence is not supported yet, TiCDC needs to filter all sequence tables.
@@ -51,6 +51,7 @@ func VerifyTables(replicaConfig *config.ReplicaConfig,
 		if tableInfo.IsSequence() {
 			return
 		}
+		tableInfos = append(tableInfos, tableInfo)
 		if !tableInfo.IsEligible(false /* forceReplicate */) {
 			ineligibleTables = append(ineligibleTables, tableInfo.TableName)
 		} else {
diff --git a/cdc/model/errors.go b/cdc/model/errors.go
index c9aa07a3718..e25bf41ab1d 100644
--- a/cdc/model/errors.go
+++ b/cdc/model/errors.go
@@ -26,7 +26,7 @@ type RunningError struct {
 	Message string `json:"message"`
 }
 
-// IsChangefeedNotRetryError return true if a running error contains a changefeed not retry error.
-func (r RunningError) IsChangefeedNotRetryError() bool {
-	return cerror.IsChangefeedNotRetryError(errors.New(r.Message + r.Code))
+// IsChangefeedUnRetryableError returns true if a running error contains an unretryable changefeed error.
+func (r RunningError) IsChangefeedUnRetryableError() bool {
+	return cerror.IsChangefeedUnRetryableError(errors.New(r.Message + r.Code))
 }
diff --git a/cdc/model/errors_test.go b/cdc/model/errors_test.go
index af1225ed835..0b4a1f1ed6c 100644
--- a/cdc/model/errors_test.go
+++ b/cdc/model/errors_test.go
@@ -52,6 +52,6 @@ func TestIsChangefeedNotRetryError(t *testing.T) {
 	}
 
 	for _, c := range cases {
-		require.Equal(t, c.result, c.err.IsChangefeedNotRetryError())
+		require.Equal(t, c.result, c.err.IsChangefeedUnRetryableError())
 	}
 }
diff --git a/cdc/model/schema_storage.go b/cdc/model/schema_storage.go
index c589268b34b..111adaa214d 100644
--- a/cdc/model/schema_storage.go
+++ b/cdc/model/schema_storage.go
@@ -35,8 +35,9 @@ const (
 // TableInfo provides meta data describing a DB table.
 type TableInfo struct {
 	*model.TableInfo
-	SchemaID  int64
-	TableName TableName
+	SchemaID  int64
+	TableName TableName
+	// TableInfoVersion records the tso when the table info was created.
TableInfoVersion uint64
 	columnsOffset map[int64]int
 	indicesOffset map[int64]int
diff --git a/cdc/owner/feed_state_manager.go b/cdc/owner/feed_state_manager.go
index d3b5c1f7980..58aff28393f 100644
--- a/cdc/owner/feed_state_manager.go
+++ b/cdc/owner/feed_state_manager.go
@@ -141,7 +141,7 @@ func (m *feedStateManager) Tick(state *orchestrator.ChangefeedReactorState) (adm
 		m.shouldBeRunning = false
 		return
 	case model.StateError:
-		if m.state.Info.Error.IsChangefeedNotRetryError() {
+		if m.state.Info.Error.IsChangefeedUnRetryableError() {
 			m.shouldBeRunning = false
 			return
 		}
@@ -423,6 +423,24 @@ func (m *feedStateManager) handleError(errs ...*model.RunningError) {
 		}
 	}
 
+	// We need to patch an unretryable changefeed error into the changefeed info,
+	// so we have to iterate over all errs here to check whether any of them
+	// is an unretryable error.
+	for _, err := range errs {
+		if err.IsChangefeedUnRetryableError() {
+			m.state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
+				if info == nil {
+					return nil, false, nil
+				}
+				info.Error = err
+				return info, true, nil
+			})
+			m.shouldBeRunning = false
+			m.patchState(model.StateError)
+			return
+		}
+	}
+
 	m.state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
 		if info == nil {
 			return nil, false, nil
diff --git a/cdc/owner/metrics.go b/cdc/owner/metrics.go
index a3b853ea708..dc1dce54fce 100644
--- a/cdc/owner/metrics.go
+++ b/cdc/owner/metrics.go
@@ -93,6 +93,13 @@ var (
 			Help:    "Bucketed histogram of owner close changefeed reactor time (s).",
 			Buckets: prometheus.ExponentialBuckets(0.01 /* 10 ms */, 2, 18),
 		})
+	changefeedIgnoredDDLEventCounter = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: "ticdc",
+			Subsystem: "owner",
+			Name:      "ignored_ddl_event_count",
+			Help:      "The total count of ddl events that are ignored in changefeed.",
+		}, []string{"namespace", "changefeed"})
 )
 
 const (
@@ -119,4 +126,5 @@ func InitMetrics(registry *prometheus.Registry) {
 	registry.MustRegister(changefeedStatusGauge)
 	registry.MustRegister(changefeedTickDuration)
 	registry.MustRegister(changefeedCloseDuration)
+	registry.MustRegister(changefeedIgnoredDDLEventCounter)
 }
diff --git a/cdc/owner/schema.go b/cdc/owner/schema.go
index 452ecdd6523..4330df06335 100644
--- a/cdc/owner/schema.go
+++ b/cdc/owner/schema.go
@@ -26,16 +26,18 @@ import (
 	cerror "github.com/pingcap/tiflow/pkg/errors"
 	"github.com/pingcap/tiflow/pkg/filter"
 	"github.com/pingcap/tiflow/pkg/util"
+	"github.com/prometheus/client_golang/prometheus"
 	"go.uber.org/zap"
 )
 
 type schemaWrap4Owner struct {
-	schemaSnapshot         *schema.Snapshot
-	filter                 *filter.Filter
-	config                 *config.ReplicaConfig
-	allPhysicalTablesCache []model.TableID
-	ddlHandledTs           model.Ts
-	id                     model.ChangeFeedID
+	schemaSnapshot              *schema.Snapshot
+	filter                      filter.Filter
+	config                      *config.ReplicaConfig
+	allPhysicalTablesCache      []model.TableID
+	ddlHandledTs                model.Ts
+	id                          model.ChangeFeedID
+	metricIgnoreDDLEventCounter prometheus.Counter
 }
 
 func newSchemaWrap4Owner(
@@ -54,7 +56,9 @@ func newSchemaWrap4Owner(
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
-	f, err := filter.NewFilter(config)
+	// It does not matter to use an empty timezone here because schemaWrap4Owner
+	// doesn't use the expression filter's methods.
+	f, err := filter.NewFilter(config, "")
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
@@ -64,6 +68,8 @@ func newSchemaWrap4Owner(
 		config:       config,
 		ddlHandledTs: startTs,
 		id:           id,
+		metricIgnoreDDLEventCounter: changefeedIgnoredDDLEventCounter.
+			WithLabelValues(id.Namespace, id.ID),
 	}, nil
 }
 
@@ -189,8 +195,8 @@ func (s *schemaWrap4Owner) parseRenameTables(
 func (s *schemaWrap4Owner) BuildDDLEvents(
 	job *timodel.Job,
 ) ([]*model.DDLEvent, error) {
-	var preTableInfo *model.TableInfo
 	var err error
+	var preTableInfo *model.TableInfo
 	ddlEvents := make([]*model.DDLEvent, 0)
 	switch job.Type {
 	case timodel.ActionRenameTables:
@@ -216,7 +222,12 @@ func (s *schemaWrap4Owner) BuildDDLEvents(
 	// filter out ddl here
 	res := make([]*model.DDLEvent, 0, len(ddlEvents))
 	for _, event := range ddlEvents {
-		if s.filter.ShouldIgnoreDDLEvent(event) {
+		ignored, err := s.filter.ShouldIgnoreDDLEvent(event)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+		if ignored {
+			s.metricIgnoreDDLEventCounter.Inc()
 			log.Info(
 				"DDL event ignored",
 				zap.String("query", event.Query),
diff --git a/cdc/processor/pipeline/sorter.go b/cdc/processor/pipeline/sorter.go
index b3b35f4dcf8..5ad0685228d 100644
--- a/cdc/processor/pipeline/sorter.go
+++ b/cdc/processor/pipeline/sorter.go
@@ -226,6 +226,8 @@ func (n *sorterNode) start(
 					if msg.RawKV.OpType != model.OpTypeResolved {
 						ignored, err := n.mounter.DecodeEvent(ctx, msg)
 						if err != nil {
+							log.Error("Got an error from mounter, sorter will stop.", zap.Error(err))
+							ctx.Throw(err)
 							return errors.Trace(err)
 						}
 						if ignored {
diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go
index 85549bb52e0..187c6405aa7 100644
--- a/cdc/processor/processor.go
+++ b/cdc/processor/processor.go
@@ -67,7 +67,7 @@ type processor struct {
 	schemaStorage entry.SchemaStorage
 	lastSchemaTs  model.Ts
 
-	filter      *filter.Filter
+	filter      filter.Filter
 	mounter     entry.Mounter
 	sink        sink.Sink
 	redoManager redo.LogManager
@@ -639,7 +639,8 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error {
 	}()
 
 	var err error
-	p.filter, err = filter.NewFilter(p.changefeed.Info.Config)
+	p.filter, err = filter.NewFilter(p.changefeed.Info.Config,
+		util.GetTimeZoneName(contextutil.TimezoneFromCtx(ctx)))
 	if err != nil {
 		return errors.Trace(err)
 	}
diff --git a/cdc/puller/ddl_puller.go b/cdc/puller/ddl_puller.go
index dd1ec597df7..8ef5bf1d19f 100644
--- a/cdc/puller/ddl_puller.go
+++ b/cdc/puller/ddl_puller.go
@@ -32,6 +32,7 @@ import (
 	"github.com/pingcap/tiflow/pkg/regionspan"
 	"github.com/pingcap/tiflow/pkg/upstream"
 	"github.com/pingcap/tiflow/pkg/util"
+	"github.com/prometheus/client_golang/prometheus"
 	"go.uber.org/zap"
 	"golang.org/x/sync/errgroup"
 )
@@ -56,7 +57,7 @@ type DDLPuller interface {
 
 type ddlPullerImpl struct {
 	puller Puller
-	filter *filter.Filter
+	filter filter.Filter
 
 	mu         sync.Mutex
 	resolvedTS uint64
@@ -68,6 +69,7 @@ type ddlPullerImpl struct {
 	clock                      clock.Clock
 	lastResolvedTsAdvancedTime time.Time
+	metricDiscardedDDLCounter  prometheus.Counter
 }
 
 // NewDDLPuller return a puller for DDL Event
@@ -77,11 +79,12 @@ func NewDDLPuller(ctx context.Context,
 	startTs uint64,
 	changefeed model.ChangeFeedID,
 ) (DDLPuller, error) {
-	f, err := filter.NewFilter(replicaConfig)
+	// It does not matter to use an empty timezone here because DDLPuller
+	// doesn't use the expression filter's methods.
+	f, err := filter.NewFilter(replicaConfig, "")
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
-
 	// add "_ddl_puller" to make it different from table pullers.
 	changefeed.ID += "_ddl_puller"
 
@@ -110,6 +113,8 @@ func NewDDLPuller(ctx context.Context,
 		cancel:       func() {},
 		clock:        clock.New(),
 		changefeedID: changefeed,
+		metricDiscardedDDLCounter: discardedDDLCounter.
+ WithLabelValues(changefeed.Namespace, changefeed.ID), }, nil } @@ -132,6 +137,7 @@ func (h *ddlPullerImpl) handleRawDDL(rawDDL *model.RawKVEntry) error { return nil } if h.filter.ShouldDiscardDDL(job.Type) { + h.metricDiscardedDDLCounter.Inc() log.Info("discard the ddl job", zap.String("namespace", h.changefeedID.Namespace), zap.String("changefeed", h.changefeedID.ID), diff --git a/cdc/puller/metrics.go b/cdc/puller/metrics.go index c195ab8d92f..ae38104b6bf 100644 --- a/cdc/puller/metrics.go +++ b/cdc/puller/metrics.go @@ -62,6 +62,13 @@ var ( Help: "Puller event channel size", Buckets: prometheus.ExponentialBuckets(1, 2, 8), }, []string{"namespace", "changefeed"}) + discardedDDLCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "ticdc", + Subsystem: "puller", + Name: "discarded_ddl_count", + Help: "The total count of ddl job that are discarded in ddl puller.", + }, []string{"namespace", "changefeed"}) ) // InitMetrics registers all metrics in this file @@ -72,4 +79,5 @@ func InitMetrics(registry *prometheus.Registry) { registry.MustRegister(memBufferSizeGauge) registry.MustRegister(outputChanSizeHistogram) registry.MustRegister(eventChanSizeHistogram) + registry.MustRegister(discardedDDLCounter) } diff --git a/cdc/sink/mq/dispatcher/event_router.go b/cdc/sink/mq/dispatcher/event_router.go index 520a8dabb42..c72e3c78091 100644 --- a/cdc/sink/mq/dispatcher/event_router.go +++ b/cdc/sink/mq/dispatcher/event_router.go @@ -96,7 +96,7 @@ func NewEventRouter(cfg *config.ReplicaConfig, defaultTopic string) (*EventRoute for _, ruleConfig := range ruleConfigs { f, err := filter.Parse(ruleConfig.Matcher) if err != nil { - return nil, cerror.WrapError(cerror.ErrFilterRuleInvalid, err) + return nil, cerror.WrapError(cerror.ErrFilterRuleInvalid, err, ruleConfig.Matcher) } if !cfg.CaseSensitive { f = filter.CaseInsensitive(f) diff --git a/cdc/verification/checker.go b/cdc/verification/checker.go index 894e287ad3a..dcbdab46835 100644 --- a/cdc/verification/checker.go +++ b/cdc/verification/checker.go @@ -27,7 +27,7 @@ import ( //go:generate mockery --name=checkSumChecker --inpackage type checkSumChecker interface { - getCheckSum(ctx context.Context, db string, f *filter.Filter) (map[string]string, error) + getCheckSum(ctx context.Context, db string, f filter.Filter) (map[string]string, error) getAllDBs(ctx context.Context) ([]string, error) } @@ -42,7 +42,7 @@ func newChecker(db *sql.DB) *checker { } } -func (c *checker) getCheckSum(ctx context.Context, db string, f *filter.Filter) (map[string]string, error) { +func (c *checker) getCheckSum(ctx context.Context, db string, f filter.Filter) (map[string]string, error) { _, err := c.db.ExecContext(ctx, fmt.Sprintf("USE %s", db)) if err != nil { return nil, cerror.WrapError(cerror.ErrMySQLQueryError, err) @@ -93,7 +93,7 @@ func (c *checker) getAllDBs(ctx context.Context) ([]string, error) { return dbs, nil } -func (c *checker) getAllTables(ctx context.Context, db string, f *filter.Filter) ([]string, error) { +func (c *checker) getAllTables(ctx context.Context, db string, f filter.Filter) ([]string, error) { rows, err := c.db.QueryContext(ctx, "SHOW TABLES") if err != nil { return nil, cerror.WrapError(cerror.ErrMySQLQueryError, err) @@ -179,7 +179,7 @@ func (c *checker) doChecksum(ctx context.Context, columns []columnInfo, database } // TODO: use ADMIN CHECKSUM TABLE for tidb if needed -var compareCheckSum = func(ctx context.Context, upstreamChecker, downstreamChecker checkSumChecker, f *filter.Filter) (bool, error) { +var 
compareCheckSum = func(ctx context.Context, upstreamChecker, downstreamChecker checkSumChecker, f filter.Filter) (bool, error) { dbs, err := upstreamChecker.getAllDBs(ctx) if err != nil { return false, err diff --git a/cdc/verification/checker_test.go b/cdc/verification/checker_test.go index 4f84fde52e2..35ace1f3e5c 100644 --- a/cdc/verification/checker_test.go +++ b/cdc/verification/checker_test.go @@ -72,8 +72,9 @@ func TestCompareCheckSum(t *testing.T) { mockUpChecker.On("getCheckSum", mock.Anything, mock.Anything, mock.Anything).Return(tt.wantSource, tt.wantSourceErr) mockDownChecker := &mockCheckSumChecker{} mockDownChecker.On("getCheckSum", mock.Anything, mock.Anything, mock.Anything).Return(tt.wantSink, tt.wantSinkErr) - - ret, err := compareCheckSum(context.Background(), mockUpChecker, mockDownChecker, &filter.Filter{}) + f, err := filter.NewFilter(config.GetDefaultReplicaConfig(), "") + require.Nil(t, err) + ret, err := compareCheckSum(context.Background(), mockUpChecker, mockDownChecker, f) require.Equal(t, tt.wantRet, ret, tt.name) if tt.wantDBErr != nil { require.True(t, errors.ErrorEqual(err, tt.wantDBErr), tt.name) @@ -234,7 +235,7 @@ func TestGetAllTables(t *testing.T) { c := &checker{ db: db, } - f, err := filter.NewFilter(tt.args.filterConfig) + f, err := filter.NewFilter(tt.args.filterConfig, "") require.Nil(t, err, tt.name) ret, err := c.getAllTables(context.Background(), "d", f) require.Equal(t, tt.wanRet, ret, tt.name) @@ -306,7 +307,7 @@ func TestGetCheckSum(t *testing.T) { c := &checker{ db: db, } - f, err := filter.NewFilter(config.GetDefaultReplicaConfig()) + f, err := filter.NewFilter(config.GetDefaultReplicaConfig(), "") require.Nil(t, err) ret, err := c.getCheckSum(context.Background(), "db", f) require.Equal(t, tt.wantRet, ret, tt.name) diff --git a/cdc/verification/mock_checkSumChecker.go b/cdc/verification/mock_checkSumChecker.go index 91850541b7b..512fe02ec32 100644 --- a/cdc/verification/mock_checkSumChecker.go +++ b/cdc/verification/mock_checkSumChecker.go @@ -51,11 +51,11 @@ func (_m *mockCheckSumChecker) getAllDBs(ctx context.Context) ([]string, error) } // getCheckSum provides a mock function with given fields: ctx, db, f -func (_m *mockCheckSumChecker) getCheckSum(ctx context.Context, db string, f *filter.Filter) (map[string]string, error) { +func (_m *mockCheckSumChecker) getCheckSum(ctx context.Context, db string, f filter.Filter) (map[string]string, error) { ret := _m.Called(ctx, db, f) var r0 map[string]string - if rf, ok := ret.Get(0).(func(context.Context, string, *filter.Filter) map[string]string); ok { + if rf, ok := ret.Get(0).(func(context.Context, string, filter.Filter) map[string]string); ok { r0 = rf(ctx, db, f) } else { if ret.Get(0) != nil { @@ -64,7 +64,7 @@ func (_m *mockCheckSumChecker) getCheckSum(ctx context.Context, db string, f *fi } var r1 error - if rf, ok := ret.Get(1).(func(context.Context, string, *filter.Filter) error); ok { + if rf, ok := ret.Get(1).(func(context.Context, string, filter.Filter) error); ok { r1 = rf(ctx, db, f) } else { r1 = ret.Error(1) diff --git a/cmd/kafka-consumer/main.go b/cmd/kafka-consumer/main.go index 9384cc47265..34d7bb58206 100644 --- a/cmd/kafka-consumer/main.go +++ b/cmd/kafka-consumer/main.go @@ -194,7 +194,7 @@ func init() { zap.Error(err), zap.String("config", configFile)) } - if _, err := filter.VerifyRules(eventRouterReplicaConfig.Filter); err != nil { + if _, err := filter.VerifyTableRules(eventRouterReplicaConfig.Filter); err != nil { log.Panic("verify rule failed", zap.Error(err)) } } 
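The kafka-consumer change above tracks the rename of filter.VerifyRules to filter.VerifyTableRules: the function now validates only the table-matching rules, while the new SQL event filter and expression filter are verified by their own constructors. A minimal sketch of that up-front validation, assuming only the VerifyTableRules call shown in this patch (the rule strings are illustrative):

	package main

	import (
		"log"

		"github.com/pingcap/tiflow/pkg/config"
		"github.com/pingcap/tiflow/pkg/filter"
	)

	func main() {
		// Illustrative table rules; a real changefeed reads these from
		// its config file or the open API.
		cfg := config.GetDefaultReplicaConfig()
		cfg.Filter.Rules = []string{"test.student", "test.computer"}

		// VerifyTableRules parses and validates the table rules and
		// returns the parsed table filter on success.
		if _, err := filter.VerifyTableRules(cfg.Filter); err != nil {
			log.Panic("verify rule failed: ", err)
		}
	}
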
diff --git a/dm/dm/config/task_test.go b/dm/dm/config/task_test.go index 8cae420cff4..b6a49354282 100644 --- a/dm/dm/config/task_test.go +++ b/dm/dm/config/task_test.go @@ -595,7 +595,7 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) { filterRule1 = bf.BinlogEventRule{ SchemaPattern: "db*", TablePattern: "tbl1*", - Events: []bf.EventType{bf.CreateIndex, bf.AlertTable}, + Events: []bf.EventType{bf.CreateIndex, bf.AlterTable}, Action: bf.Do, } filterRule2 = bf.BinlogEventRule{ diff --git a/errors.toml b/errors.toml index 95502a1845d..3cdef912537 100755 --- a/errors.toml +++ b/errors.toml @@ -308,7 +308,7 @@ exec DDL failed ["CDC:ErrExpressionColumnNotFound"] error = ''' -invalid filter expressions. Can not found column '%s' from table '%s' in: %s +invalid filter expression(s). Cannot find column '%s' from table '%s' in: %s ''' ["CDC:ErrFailedToFilterDDL"] diff --git a/go.mod b/go.mod index 7e8b08a482d..36765d7086c 100644 --- a/go.mod +++ b/go.mod @@ -54,7 +54,7 @@ require ( github.com/pingcap/kvproto v0.0.0-20220705090230-a5d4ffd2ba33 github.com/pingcap/log v1.1.0 github.com/pingcap/tidb v1.1.0-beta.0.20220713062705-50437e1d4087 - github.com/pingcap/tidb-tools v6.0.1-0.20220516050036-b3ea358e374a+incompatible + github.com/pingcap/tidb-tools v6.1.1-0.20220715000306-1d2f00da8c3e+incompatible github.com/pingcap/tidb/parser v0.0.0-20220713065705-0e13d5d00990 github.com/prometheus/client_golang v1.12.2 github.com/prometheus/client_model v0.2.0 diff --git a/go.sum b/go.sum index 8a946d3d1c9..528600dc367 100644 --- a/go.sum +++ b/go.sum @@ -984,8 +984,8 @@ github.com/pingcap/tidb v1.1.0-beta.0.20220511160835-98c31070d958/go.mod h1:luW4 github.com/pingcap/tidb v1.1.0-beta.0.20220713062705-50437e1d4087 h1:dtYlIZvf7racGM63VaFMkbptRAvSYQ6o2FFgTcJGRNg= github.com/pingcap/tidb v1.1.0-beta.0.20220713062705-50437e1d4087/go.mod h1:tteDbokLuRBJUhSl9Pxcf42wGMCuJWlObpOr+zQOHeY= github.com/pingcap/tidb-dashboard v0.0.0-20220117082709-e8076b5c79ba/go.mod h1:4hk/3owVGWdvI9Kx6yCqqvM1T5PVgwyQNyMQxD3rwfc= -github.com/pingcap/tidb-tools v6.0.1-0.20220516050036-b3ea358e374a+incompatible h1:dAfZKHR4Wu3bdFDNDsH2c4fWrkqbkgVlwY5pC4Nmp6Q= -github.com/pingcap/tidb-tools v6.0.1-0.20220516050036-b3ea358e374a+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= +github.com/pingcap/tidb-tools v6.1.1-0.20220715000306-1d2f00da8c3e+incompatible h1:ftmrSd7avCEdTOkWx3O0UkS4yTBrlKQweRF8uqz9+No= +github.com/pingcap/tidb-tools v6.1.1-0.20220715000306-1d2f00da8c3e+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e/go.mod h1:e1MGCA9Sg3T8jid8PKAEq5eYVuMMCq4n8gJ+Kqp4Plg= github.com/pingcap/tidb/parser v0.0.0-20220511160835-98c31070d958/go.mod h1:ElJiub4lRy6UZDb+0JHDkGEdr6aOli+ykhyej7VCLoI= github.com/pingcap/tidb/parser v0.0.0-20220713065705-0e13d5d00990 h1:+hpuMOWRk8zGw1Oyz9jBg63dyey/iOM/obwZSZZMheo= diff --git a/pkg/cmd/cli/cli_changefeed_create.go b/pkg/cmd/cli/cli_changefeed_create.go index 49e010469ed..3d9dac14ba6 100644 --- a/pkg/cmd/cli/cli_changefeed_create.go +++ b/pkg/cmd/cli/cli_changefeed_create.go @@ -15,6 +15,7 @@ package cli import ( "context" + "fmt" "net/url" "strings" "time" @@ -97,7 +98,7 @@ func (o *changefeedCommonOptions) strictDecodeConfig(component string, cfg *conf return err } - _, err = filter.VerifyRules(cfg.Filter) + _, err = filter.VerifyTableRules(cfg.Filter) return err } @@ -291,7 +292,7 @@ func (o *createChangefeedOptions) run(ctx context.Context, cmd *cobra.Command) e } if 
!o.commonChangefeedOptions.noConfirm {
-		if err := confirmLargeDataGap(cmd, tso.Timestamp, o.startTs); err != nil {
+		if err = confirmLargeDataGap(cmd, tso.Timestamp, o.startTs); err != nil {
 			return err
 		}
 	}
@@ -312,6 +313,15 @@ func (o *createChangefeedOptions) run(ctx context.Context, cmd *cobra.Command) e
 
 	tables, err := o.apiClient.Changefeeds().VerifyTable(ctx, verifyTableConfig)
 	if err != nil {
+		if strings.Contains(err.Error(), "ErrInvalidIgnoreEventType") {
+			supportedEventTypes := filter.SupportedEventTypes()
+			eventTypesStr := make([]string, 0, len(supportedEventTypes))
+			for _, eventType := range supportedEventTypes {
+				eventTypesStr = append(eventTypesStr, string(eventType))
+			}
+			cmd.Println(fmt.Sprintf("Invalid input, 'ignore-event' parameters can only accept [%s]",
+				strings.Join(eventTypesStr, ", ")))
+		}
 		return err
 	}
 
@@ -340,6 +350,15 @@ func (o *createChangefeedOptions) run(ctx context.Context, cmd *cobra.Command) e
 
 	info, err := o.apiClient.Changefeeds().Create(ctx, createChangefeedCfg)
 	if err != nil {
+		if strings.Contains(err.Error(), "ErrInvalidIgnoreEventType") {
+			supportedEventTypes := filter.SupportedEventTypes()
+			eventTypesStr := make([]string, 0, len(supportedEventTypes))
+			for _, eventType := range supportedEventTypes {
+				eventTypesStr = append(eventTypesStr, string(eventType))
+			}
+			cmd.Println(fmt.Sprintf("Invalid input, 'ignore-event' parameters can only accept [%s]",
+				strings.Join(eventTypesStr, ", ")))
+		}
 		return err
 	}
 	infoStr, err := info.Marshal()
diff --git a/pkg/errors/cdc_errors.go b/pkg/errors/cdc_errors.go
index 7fff6764600..163094784a1 100644
--- a/pkg/errors/cdc_errors.go
+++ b/pkg/errors/cdc_errors.go
@@ -1081,7 +1081,7 @@ var (
 		errors.RFCCodeText("CDC:ErrInvalidFilterExpression"),
 	)
 	ErrExpressionColumnNotFound = errors.Normalize(
-		"invalid filter expressions. Can not found column '%s' from table '%s' in: %s",
+		"invalid filter expression(s). Cannot find column '%s' from table '%s' in: %s",
 		errors.RFCCodeText("CDC:ErrExpressionColumnNotFound"),
 	)
 	ErrInvalidIgnoreEventType = errors.Normalize(
diff --git a/pkg/errors/helper.go b/pkg/errors/helper.go
index 2fb0c7ab091..3a32d05dc53 100644
--- a/pkg/errors/helper.go
+++ b/pkg/errors/helper.go
@@ -68,13 +68,13 @@ func IsChangefeedFastFailErrorCode(errCode errors.RFCErrorCode) bool {
 	return false
 }
 
-var changefeedNotRetryErrors = []*errors.Error{
+var changefeedUnRetryableErrors = []*errors.Error{
 	ErrExpressionColumnNotFound,
 	ErrExpressionParseFailed,
 }
 
-// IsChangefeedNotRetryError returns true if a error is a changefeed not retry error.
-func IsChangefeedNotRetryError(err error) bool {
-	for _, e := range changefeedNotRetryErrors {
+// IsChangefeedUnRetryableError returns true if an error is an unretryable changefeed error.
+func IsChangefeedUnRetryableError(err error) bool { + for _, e := range changefeedUnRetryableErrors { if e.Equal(err) { return true } diff --git a/pkg/errors/helper_test.go b/pkg/errors/helper_test.go index 5db04432286..d64e907bd9c 100644 --- a/pkg/errors/helper_test.go +++ b/pkg/errors/helper_test.go @@ -171,6 +171,6 @@ func TestChangefeedNotRetryError(t *testing.T) { } for _, c := range cases { - require.Equal(t, c.expected, IsChangefeedNotRetryError(c.err)) + require.Equal(t, c.expected, IsChangefeedUnRetryableError(c.err)) } } diff --git a/pkg/filter/expr_filter.go b/pkg/filter/expr_filter.go index e1fbb4840d8..da5c96d1e85 100644 --- a/pkg/filter/expr_filter.go +++ b/pkg/filter/expr_filter.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tidb/expression" + "github.com/pingcap/tidb/parser" "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/types" @@ -77,6 +78,32 @@ func newExprFilterRule( // verifyAndInitRule will verify and init the rule. // It should only be called in dmlExprFilter's verify method. func (r *dmlExprFilterRule) verify(tableInfos []*model.TableInfo) error { + // verify expression filter rule syntax. + p := parser.New() + _, _, err := p.ParseSQL(completeExpression(r.config.IgnoreInsertValueExpr)) + if err != nil { + log.Error("failed to parse expression", zap.Error(err)) + return cerror.ErrExpressionParseFailed. + FastGenByArgs(r.config.IgnoreInsertValueExpr) + } + _, _, err = p.ParseSQL(completeExpression(r.config.IgnoreUpdateNewValueExpr)) + if err != nil { + log.Error("failed to parse expression", zap.Error(err)) + return cerror.ErrExpressionParseFailed. + FastGenByArgs(r.config.IgnoreUpdateNewValueExpr) + } + _, _, err = p.ParseSQL(completeExpression(r.config.IgnoreUpdateOldValueExpr)) + if err != nil { + log.Error("failed to parse expression", zap.Error(err)) + return cerror.ErrExpressionParseFailed. + FastGenByArgs(r.config.IgnoreUpdateOldValueExpr) + } + _, _, err = p.ParseSQL(completeExpression(r.config.IgnoreDeleteValueExpr)) + if err != nil { + log.Error("failed to parse expression", zap.Error(err)) + return cerror.ErrExpressionParseFailed. + FastGenByArgs(r.config.IgnoreDeleteValueExpr) + } // verify expression filter rule. for _, ti := range tableInfos { tableName := ti.TableName.String() @@ -209,7 +236,7 @@ func (r *dmlExprFilterRule) getSimpleExprOfTable( zap.String("expression", expr), zap.Error(err)) return nil, cerror.ErrExpressionColumnNotFound. 
-			FastGenByArgs(getColumnFromError(err), ti.TableName.Table, expr)
+			FastGenByArgs(getColumnFromError(err), ti.TableName.String(), expr)
 	}
 	log.Error("failed to parse expression", zap.Error(err))
 	return nil, cerror.ErrExpressionParseFailed.FastGenByArgs(err, expr)
@@ -388,6 +403,9 @@ func (f *dmlExprFilter) shouldSkipDML(
 	for _, rule := range rules {
 		ignore, err := rule.shouldSkipDML(row, rawRow, ti)
 		if err != nil {
+			if cerror.IsChangefeedUnRetryableError(err) {
+				return false, err
+			}
 			return false, cerror.WrapError(cerror.ErrFailedToFilterDML, err, row)
 		}
 		if ignore {
diff --git a/pkg/filter/expr_filter_test.go b/pkg/filter/expr_filter_test.go
index e1a3408f27a..372ace442eb 100644
--- a/pkg/filter/expr_filter_test.go
+++ b/pkg/filter/expr_filter_test.go
@@ -403,7 +403,7 @@ func TestShouldSkipDMLError(t *testing.T) {
 			row:    []interface{}{999, "Will", 39, "male"},
 			ignore: false,
 			err:    cerror.ErrExpressionColumnNotFound,
-			errMsg: "Can not found column 'mather' from table 'student' in",
+			errMsg: "Cannot find column 'mather' from table 'test.student' in",
 		},
 		{ // update
 			schema: "test",
@@ -521,7 +521,7 @@ func TestShouldSkipDMLTableUpdated(t *testing.T) {
 			row:    []interface{}{999, "Will", 39, "male"},
 			ignore: false,
 			err:    cerror.ErrExpressionColumnNotFound,
-			errMsg: "Can not found column 'mather' from table 'student' in",
+			errMsg: "Cannot find column 'mather' from table 'test.student' in",
 		},
 		{ // insert
 			schema: "test",
@@ -622,7 +622,7 @@ func TestShouldSkipDMLTableUpdated(t *testing.T) {
 			row:    []interface{}{999, "Will", 39, "male"},
 			ignore: false,
 			err:    cerror.ErrExpressionColumnNotFound,
-			errMsg: "Can not found column 'company' from table 'worker' in",
+			errMsg: "Cannot find column 'company' from table 'test.worker' in",
 		},
 	},
 },
@@ -729,7 +729,7 @@ func TestVerify(t *testing.T) {
 			},
 		},
 		err:    cerror.ErrExpressionColumnNotFound,
-		errMsg: "Can not found column 'company' from table 'child' in",
+		errMsg: "Cannot find column 'company' from table 'test.child' in",
 	},
 	{
 		ddls: []string{
diff --git a/pkg/filter/filter.go b/pkg/filter/filter.go
index 99ae053e99f..63428fca628 100644
--- a/pkg/filter/filter.go
+++ b/pkg/filter/filter.go
@@ -15,86 +15,111 @@ package filter
 
 import (
 	timodel "github.com/pingcap/tidb/parser/model"
-	filterV1 "github.com/pingcap/tidb/util/filter"
-	filterV2 "github.com/pingcap/tidb/util/table-filter"
+	tfilter "github.com/pingcap/tidb/util/table-filter"
 	"github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/pkg/config"
-	cerror "github.com/pingcap/tiflow/pkg/errors"
 )
 
-// Filter is an event filter implementation.
-type Filter struct {
-	// tableFilter is used to filter row event by table name.
-	tableFilter filterV2.Filter
+// Filter is safe for concurrent use.
+// TODO: find a better way to abstract this interface.
+type Filter interface {
+	// ShouldIgnoreDMLEvent returns true and nil if the DML event should be ignored.
+	ShouldIgnoreDMLEvent(dml *model.RowChangedEvent, rawRow model.RowChangedDatums, tableInfo *model.TableInfo) (bool, error)
+	// ShouldIgnoreDDLEvent returns true and nil if the DDL event should be ignored.
+	// If a ddl is ignored, it will still be applied to cdc's schema storage,
+	// but not sent to downstream.
+	ShouldIgnoreDDLEvent(ddl *model.DDLEvent) (bool, error)
+	// ShouldDiscardDDL returns true if this DDL should be discarded.
+	// If a ddl is discarded, it will neither be applied to cdc's schema storage
+	// nor sent to downstream.
+	ShouldDiscardDDL(ddlType timodel.ActionType) bool
+	// ShouldIgnoreTable returns true if the table should be ignored.
+	ShouldIgnoreTable(schema, table string) bool
+	// Verify should only be called by the create changefeed OpenAPI.
+	// Its purpose is to verify the expression filter config.
+	Verify(tableInfos []*model.TableInfo) error
+}
+
+// filter implements Filter.
+type filter struct {
+	// tableFilter is used to filter dml/ddl events by table name.
+	tableFilter tfilter.Filter
+	// dmlExprFilter is used to filter out dml events by their column values.
+	dmlExprFilter *dmlExprFilter
+	// sqlEventFilter is used to filter out dml/ddl events by their type or query.
+	sqlEventFilter *sqlEventFilter
+	// ignoreTxnStartTs is used to filter out dml/ddl events by their startTs.
 	ignoreTxnStartTs []uint64
 	ddlAllowlist     []timodel.ActionType
 }
 
-// VerifyRules checks the filter rules in the configuration
-// and returns an invalid rule error if the verification fails, otherwise it will return the parsed filter.
-func VerifyRules(cfg *config.FilterConfig) (filterV2.Filter, error) {
-	var f filterV2.Filter
-	var err error
-	if len(cfg.Rules) == 0 && cfg.MySQLReplicationRules != nil {
-		f, err = filterV2.ParseMySQLReplicationRules(cfg.MySQLReplicationRules)
-	} else {
-		rules := cfg.Rules
-		if len(rules) == 0 {
-			rules = []string{"*.*"}
-		}
-		f, err = filterV2.Parse(rules)
-	}
+// NewFilter creates a filter.
+func NewFilter(cfg *config.ReplicaConfig, tz string) (Filter, error) {
+	f, err := VerifyTableRules(cfg.Filter)
 	if err != nil {
-		return nil, cerror.WrapError(cerror.ErrFilterRuleInvalid, err)
+		return nil, err
 	}
-	return f, nil
-}
 
+	if !cfg.CaseSensitive {
+		f = tfilter.CaseInsensitive(f)
+	}
 
-// NewFilter creates a filter.
-func NewFilter(cfg *config.ReplicaConfig) (*Filter, error) {
-	f, err := VerifyRules(cfg.Filter)
+	dmlExprFilter, err := newExprFilter(tz, cfg.Filter)
 	if err != nil {
-		return nil, cerror.WrapError(cerror.ErrFilterRuleInvalid, err)
+		return nil, err
 	}
-
-	if !cfg.CaseSensitive {
-		f = filterV2.CaseInsensitive(f)
+	sqlEventFilter, err := newSQLEventFilter(cfg.Filter)
+	if err != nil {
+		return nil, err
 	}
-	return &Filter{
+	return &filter{
 		tableFilter:      f,
+		dmlExprFilter:    dmlExprFilter,
+		sqlEventFilter:   sqlEventFilter,
 		ignoreTxnStartTs: cfg.Filter.IgnoreTxnStartTs,
 		ddlAllowlist:     cfg.Filter.DDLAllowlist,
 	}, nil
 }
 
-func (f *Filter) shouldIgnoreStartTs(ts uint64) bool {
-	for _, ignoreTs := range f.ignoreTxnStartTs {
-		if ignoreTs == ts {
-			return true
-		}
+// ShouldIgnoreDMLEvent checks whether a DML event should be ignored, by the conditions below:
+// 0. By startTs.
+// 1. By table name.
+// 2. By type.
+// 3. By column values.
+func (f *filter) ShouldIgnoreDMLEvent(
+	dml *model.RowChangedEvent,
+	rawRow model.RowChangedDatums,
+	ti *model.TableInfo,
+) (bool, error) {
+	if f.shouldIgnoreStartTs(dml.StartTs) {
+		return true, nil
 	}
-	return false
-}
 
-// ShouldIgnoreTable returns true if the specified table should be ignored by this change feed.
-// NOTICE: Set `tbl` to an empty string to test against the whole database.
-func (f *Filter) ShouldIgnoreTable(db, tbl string) bool {
-	if isSysSchema(db) {
-		return true
+	if f.ShouldIgnoreTable(dml.Table.Schema, dml.Table.Table) {
+		return true, nil
 	}
-	return !f.tableFilter.MatchTable(db, tbl)
-}
 
-// ShouldIgnoreDMLEvent removes DMLs that's not wanted by this changefeed.
-// CDC only supports filtering by database/table now.
-func (f *Filter) ShouldIgnoreDMLEvent(ts uint64, schema, table string) bool {
-	return f.shouldIgnoreStartTs(ts) || f.ShouldIgnoreTable(schema, table)
+	ignoreByEventType, err := f.sqlEventFilter.shouldSkipDML(dml)
+	if err != nil {
+		return false, err
+	}
+	if ignoreByEventType {
+		return true, nil
+	}
+	return f.dmlExprFilter.shouldSkipDML(dml, rawRow, ti)
 }
 
-// ShouldIgnoreDDLEvent removes DDLs that's not wanted by this changefeed.
-// CDC only supports filtering by database/table now.
-func (f *Filter) ShouldIgnoreDDLEvent(ddl *model.DDLEvent) bool {
+// ShouldIgnoreDDLEvent checks whether a DDL event should be ignored, by the conditions below:
+// 0. By startTs.
+// 1. By schema name.
+// 2. By table name.
+// 3. By type.
+// 4. By query.
+func (f *filter) ShouldIgnoreDDLEvent(ddl *model.DDLEvent) (bool, error) {
+	if f.shouldIgnoreStartTs(ddl.StartTs) {
+		return true, nil
+	}
+
+	var shouldIgnoreTableOrSchema bool
 	switch ddl.Type {
 	case timodel.ActionCreateSchema,
 		timodel.ActionDropSchema,
@@ -103,15 +128,21 @@ func (f *Filter) ShouldIgnoreDDLEvent(ddl *model.DDLEvent) bool {
 	default:
 		shouldIgnoreTableOrSchema = f.ShouldIgnoreTable(ddl.TableInfo.Schema, ddl.TableInfo.Table)
 	}
-
-	return f.shouldIgnoreStartTs(ddl.StartTs) || shouldIgnoreTableOrSchema
+	if shouldIgnoreTableOrSchema {
+		return true, nil
+	}
+	return f.sqlEventFilter.shouldSkipDDL(ddl)
 }
 
 // ShouldDiscardDDL returns true if this DDL should be discarded.
-func (f *Filter) ShouldDiscardDDL(ddlType timodel.ActionType) bool {
-	if !f.shouldDiscardByBuiltInDDLAllowlist(ddlType) {
+// If a ddl is discarded, it will neither be applied to cdc's schema storage
+// nor sent to downstream.
+func (f *filter) ShouldDiscardDDL(ddlType timodel.ActionType) bool {
+	if !shouldDiscardByBuiltInDDLAllowlist(ddlType) {
 		return false
 	}
+	// If a ddl is in the built-in DDL allowlist, we still need to check whether
+	// the user has added it to the filter's ddlAllowlist.
 	for _, allowDDLType := range f.ddlAllowlist {
 		if allowDDLType == ddlType {
 			return false
@@ -120,68 +151,25 @@ func (f *Filter) ShouldDiscardDDL(ddlType timodel.ActionType) bool {
 	return true
 }
 
-func (f *Filter) shouldDiscardByBuiltInDDLAllowlist(ddlType timodel.ActionType) bool {
-	/* The following DDL will be filter:
-	ActionAddForeignKey                 ActionType = 9
-	ActionDropForeignKey                ActionType = 10
-	ActionRebaseAutoID                  ActionType = 13
-	ActionShardRowID                    ActionType = 16
-	ActionLockTable                     ActionType = 27
-	ActionUnlockTable                   ActionType = 28
-	ActionRepairTable                   ActionType = 29
-	ActionSetTiFlashReplica             ActionType = 30
-	ActionUpdateTiFlashReplicaStatus    ActionType = 31
-	ActionCreateSequence                ActionType = 34
-	ActionAlterSequence                 ActionType = 35
-	ActionDropSequence                  ActionType = 36
-	ActionModifyTableAutoIdCache        ActionType = 39
-	ActionRebaseAutoRandomBase          ActionType = 40
-	ActionAlterIndexVisibility          ActionType = 41
-	ActionExchangeTablePartition        ActionType = 42
-	ActionAddCheckConstraint            ActionType = 43
-	ActionDropCheckConstraint           ActionType = 44
-	ActionAlterCheckConstraint          ActionType = 45
-	ActionAlterTableAlterPartition      ActionType = 46
-
-	... Any Action which of value is greater than 46 ...
-	*/
-	switch ddlType {
-	case timodel.ActionCreateSchema,
-		timodel.ActionDropSchema,
-		timodel.ActionCreateTable,
-		timodel.ActionDropTable,
-		timodel.ActionAddColumn,
-		timodel.ActionDropColumn,
-		timodel.ActionAddIndex,
-		timodel.ActionDropIndex,
-		timodel.ActionTruncateTable,
-		timodel.ActionModifyColumn,
-		timodel.ActionRenameTable,
-		timodel.ActionRenameTables,
-		timodel.ActionSetDefaultValue,
-		timodel.ActionModifyTableComment,
-		timodel.ActionRenameIndex,
-		timodel.ActionAddTablePartition,
-		timodel.ActionDropTablePartition,
-		timodel.ActionCreateView,
-		timodel.ActionModifyTableCharsetAndCollate,
-		timodel.ActionTruncateTablePartition,
-		timodel.ActionDropView,
-		timodel.ActionRecoverTable,
-		timodel.ActionModifySchemaCharsetAndCollate,
-		timodel.ActionAddPrimaryKey,
-		timodel.ActionDropPrimaryKey,
-		timodel.ActionAddColumns, // Removed in TiDB v6.2.0, see https://github.com/pingcap/tidb/pull/35862.
-		timodel.ActionDropColumns, // Removed in TiDB v6.2.0
-		timodel.ActionRebaseAutoID,
-		timodel.ActionAlterIndexVisibility,
-		timodel.ActionMultiSchemaChange:
-		return false
+// ShouldIgnoreTable returns true if the specified table should be ignored by this change feed.
+// NOTICE: Set `tbl` to an empty string to test against the whole database.
+func (f *filter) ShouldIgnoreTable(db, tbl string) bool {
+	if isSysSchema(db) {
+		return true
 	}
-	return true
+	return !f.tableFilter.MatchTable(db, tbl)
 }
 
-// isSysSchema returns true if the given schema is a system schema
-func isSysSchema(db string) bool {
-	return filterV1.IsSystemSchema(db)
+// Verify checks whether the expression filter rules are valid for the given tables.
+func (f *filter) Verify(tableInfos []*model.TableInfo) error {
+	return f.dmlExprFilter.verify(tableInfos)
+}
+
+func (f *filter) shouldIgnoreStartTs(ts uint64) bool {
+	for _, ignoreTs := range f.ignoreTxnStartTs {
+		if ignoreTs == ts {
+			return true
+		}
+	}
+	return false
 }
diff --git a/pkg/filter/filter_test.go b/pkg/filter/filter_test.go
index 132fe1e95c2..c72604eef5e 100644
--- a/pkg/filter/filter_test.go
+++ b/pkg/filter/filter_test.go
@@ -19,6 +19,7 @@ import (
 	"github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/pkg/config"
+	bf "github.com/pingcap/tidb-tools/pkg/binlog-filter"
 	timodel "github.com/pingcap/tidb/parser/model"
 	"github.com/stretchr/testify/require"
 )
@@ -26,7 +27,7 @@ func TestShouldUseDefaultRules(t *testing.T) {
 	t.Parallel()
 
-	filter, err := NewFilter(config.GetDefaultReplicaConfig())
+	filter, err := NewFilter(config.GetDefaultReplicaConfig(), "")
 	require.Nil(t, err)
 	require.True(t, filter.ShouldIgnoreTable("information_schema", ""))
 	require.True(t, filter.ShouldIgnoreTable("information_schema", "statistics"))
@@ -43,7 +44,7 @@ func TestShouldUseCustomRules(t *testing.T) {
 		Filter: &config.FilterConfig{
 			Rules: []string{"sns.*", "ecom.*", "!sns.log", "!ecom.test"},
 		},
-	})
+	}, "")
 	require.Nil(t, err)
 	require.True(t, filter.ShouldIgnoreTable("other", ""))
 	require.True(t, filter.ShouldIgnoreTable("other", "what"))
@@ -53,9 +54,24 @@
 	require.True(t, filter.ShouldIgnoreTable("ecom", "test"))
 	require.True(t, filter.ShouldIgnoreTable("sns", "log"))
 	require.True(t, filter.ShouldIgnoreTable("information_schema", ""))
+
+	filter, err = NewFilter(&config.ReplicaConfig{
+		Filter: &config.FilterConfig{
+			// 1. match all schemas and tables
+			// 2. do not match test.season
+			// 3. match all tables of schema school
+			// 4. do not match table school.teacher
+			Rules: []string{"*.*", "!test.season", "school.*", "!school.teacher"},
+		},
+	}, "")
+	require.Nil(t, err)
+	require.True(t, filter.ShouldIgnoreTable("test", "season"))
+	require.False(t, filter.ShouldIgnoreTable("other", ""))
+	require.False(t, filter.ShouldIgnoreTable("school", "student"))
+	require.True(t, filter.ShouldIgnoreTable("school", "teacher"))
 }
 
-func TestShouldIgnoreTxn(t *testing.T) {
+func TestShouldIgnoreDMLEvent(t *testing.T) {
 	t.Parallel()
 
 	testCases := []struct {
@@ -109,15 +125,16 @@
 				IgnoreTxnStartTs: ftc.ignoreTxnStartTs,
 				Rules:            ftc.rules,
 			},
-		})
+		}, "")
 		require.Nil(t, err)
 		for _, tc := range ftc.cases {
-			require.Equal(t, tc.ignore, filter.ShouldIgnoreDMLEvent(tc.ts, tc.schema, tc.table))
-			ddl := &model.DDLEvent{
-				StartTs: tc.ts, Type: timodel.ActionCreateTable,
-				TableInfo: &model.SimpleTableInfo{Schema: tc.schema, Table: tc.table},
+			dml := &model.RowChangedEvent{
+				Table:   &model.TableName{Table: tc.table, Schema: tc.schema},
+				StartTs: tc.ts,
 			}
-			require.Equal(t, tc.ignore, filter.ShouldIgnoreDDLEvent(ddl), "%#v", tc)
+			ignoreDML, err := filter.ShouldIgnoreDMLEvent(dml, model.RowChangedDatums{}, nil)
+			require.Nil(t, err)
+			require.Equal(t, tc.ignore, ignoreDML)
 		}
 	}
 }
@@ -130,7 +147,7 @@ func TestShouldDiscardDDL(t *testing.T) {
 			DDLAllowlist: []timodel.ActionType{timodel.ActionAddForeignKey},
 		},
 	}
-	filter, err := NewFilter(config)
+	filter, err := NewFilter(config, "")
 	require.Nil(t, err)
 
 	require.False(t, filter.ShouldDiscardDDL(timodel.ActionAddForeignKey))
@@ -172,61 +189,143 @@ func TestShouldIgnoreDDL(t *testing.T) {
 	t.Parallel()
-
 	testCases := []struct {
 		cases []struct {
+			startTs uint64
 			schema  string
 			table   string
+			query   string
 			ddlType timodel.ActionType
 			ignore  bool
 		}
-		rules []string
+		rules        []string
+		ignoredTs    []uint64
+		eventFilters []*config.EventFilterRule
 	}{{
+		// Ignore by table name cases.
 		cases: []struct {
+			startTs uint64
 			schema  string
 			table   string
+			query   string
 			ddlType timodel.ActionType
 			ignore  bool
 		}{
-			{"sns", "", timodel.ActionCreateSchema, false},
-			{"sns", "", timodel.ActionDropSchema, false},
-			{"sns", "", timodel.ActionModifySchemaCharsetAndCollate, false},
-			{"ecom", "", timodel.ActionCreateSchema, false},
-			{"ecom", "aa", timodel.ActionCreateTable, false},
-			{"ecom", "", timodel.ActionCreateSchema, false},
-			{"test", "", timodel.ActionCreateSchema, true},
+			{1, "sns", "", "create database test", timodel.ActionCreateSchema, false},
+			{1, "sns", "", "drop database test", timodel.ActionDropSchema, false},
+			{1, "sns", "", "ALTER DATABASE dbname CHARACTER SET utf8 COLLATE utf8_general_ci", timodel.ActionModifySchemaCharsetAndCollate, false},
+			{1, "ecom", "", "create database test", timodel.ActionCreateSchema, false},
+			{1, "ecom", "aa", "create table test.t1(a int primary key)", timodel.ActionCreateTable, false},
+			{1, "ecom", "", "create database test", timodel.ActionCreateSchema, false},
+			{1, "test", "", "create database test", timodel.ActionCreateSchema, true},
 		},
 		rules: []string{"sns.*", "ecom.*", "!sns.log", "!ecom.test"},
+		// Ignore by schema name cases.
}, { cases: []struct { + startTs uint64 schema string table string + query string ddlType timodel.ActionType ignore bool }{ - {"sns", "", timodel.ActionCreateSchema, false}, - {"sns", "", timodel.ActionDropSchema, false}, - {"sns", "", timodel.ActionModifySchemaCharsetAndCollate, false}, - {"sns", "aa", timodel.ActionCreateTable, true}, - {"sns", "C1", timodel.ActionCreateTable, false}, - {"sns", "", timodel.ActionCreateTable, true}, + {1, "schema", "C1", "create database test", timodel.ActionCreateSchema, false}, + {1, "schema", "", "drop database test", timodel.ActionDropSchema, true}, + {1, "schema", "", "ALTER DATABASE dbname CHARACTER SET utf8 COLLATE utf8_general_ci", timodel.ActionModifySchemaCharsetAndCollate, true}, + {1, "schema", "aa", "create table test.t1(a int primary key)", timodel.ActionCreateTable, true}, + {1, "schema", "C1", "create table test.t1(a int primary key)", timodel.ActionCreateTable, false}, + {1, "schema", "", "create table test.t1(a int primary key)", timodel.ActionCreateTable, true}, + }, + rules: []string{"schema.C1"}, + }, { // cases ignore by startTs + cases: []struct { + startTs uint64 + schema string + table string + query string + ddlType timodel.ActionType + ignore bool + }{ + {1, "ts", "", "create database test", timodel.ActionCreateSchema, true}, + {2, "ts", "student", "drop database test", timodel.ActionDropSchema, true}, + {3, "ts", "teacher", "ALTER DATABASE dbname CHARACTER SET utf8 COLLATE utf8_general_ci", timodel.ActionModifySchemaCharsetAndCollate, true}, + {4, "ts", "man", "create table test.t1(a int primary key)", timodel.ActionCreateTable, false}, + {5, "ts", "fruit", "create table test.t1(a int primary key)", timodel.ActionCreateTable, false}, + {6, "ts", "insect", "create table test.t1(a int primary key)", timodel.ActionCreateTable, false}, + }, + rules: []string{"*.*"}, + ignoredTs: []uint64{1, 2, 3}, + }, { // cases ignore by ddl type. 
+ cases: []struct { + startTs uint64 + schema string + table string + query string + ddlType timodel.ActionType + ignore bool + }{ + {1, "event", "", "drop table t1", timodel.ActionDropTable, true}, + {1, "event", "January", "drop index i on t1", timodel.ActionDropIndex, true}, + {1, "event", "February", "drop index x2 on t2", timodel.ActionDropIndex, true}, + {1, "event", "March", "create table t2(age int)", timodel.ActionCreateTable, false}, + {1, "event", "April", "create table t2(age int)", timodel.ActionCreateTable, false}, + {1, "event", "May", "create table t2(age int)", timodel.ActionCreateTable, false}, + }, + rules: []string{"*.*"}, + eventFilters: []*config.EventFilterRule{ + { + Matcher: []string{"event.*"}, + IgnoreEvent: []bf.EventType{ + bf.AlterTable, bf.DropTable, + }, + }, + }, + }, { // cases ignore by ddl query + cases: []struct { + startTs uint64 + schema string + table string + query string + ddlType timodel.ActionType + ignore bool + }{ + {1, "sql_pattern", "t1", "CREATE DATABASE sql_pattern", timodel.ActionCreateSchema, false}, + {1, "sql_pattern", "t1", "DROP DATABASE sql_pattern", timodel.ActionDropSchema, true}, + { + 1, "sql_pattern", "t1", + "ALTER DATABASE `test_db` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci'", + timodel.ActionModifySchemaCharsetAndCollate, + false, + }, + {1, "sql_pattern", "t1", "CREATE TABLE t1(id int primary key)", timodel.ActionCreateTable, false}, + {1, "sql_pattern", "t1", "DROP TABLE t1", timodel.ActionDropTable, true}, + {1, "sql_pattern", "t1", "CREATE VIEW test.v AS SELECT * FROM t", timodel.ActionCreateView, true}, + }, + rules: []string{"*.*"}, + eventFilters: []*config.EventFilterRule{ + { + Matcher: []string{"sql_pattern.*"}, + IgnoreSQL: []string{"^DROP TABLE", "^CREATE VIEW", "^DROP DATABASE"}, + }, }, - rules: []string{"sns.C1"}, }} + for _, ftc := range testCases { filter, err := NewFilter(&config.ReplicaConfig{ Filter: &config.FilterConfig{ - IgnoreTxnStartTs: []uint64{}, Rules: ftc.rules, + IgnoreTxnStartTs: ftc.ignoredTs, + EventFilters: ftc.eventFilters, }, - }) + }, "") require.Nil(t, err) for _, tc := range ftc.cases { - ddl := &model.DDLEvent{ - StartTs: 1, Type: tc.ddlType, - TableInfo: &model.SimpleTableInfo{Schema: tc.schema, Table: tc.table}, - } - require.Equal(t, filter.ShouldIgnoreDDLEvent(ddl), tc.ignore, "%#v", tc) + tableInfo := &model.SimpleTableInfo{Schema: tc.schema, Table: tc.table} + ddl := &model.DDLEvent{StartTs: tc.startTs, TableInfo: tableInfo, Query: tc.query} + ignore, err := filter.ShouldIgnoreDDLEvent(ddl) + require.Nil(t, err, "%#v", tc) + require.Equal(t, tc.ignore, ignore, "%#v", tc) } } } diff --git a/pkg/filter/sql_event_filter.go b/pkg/filter/sql_event_filter.go new file mode 100644 index 00000000000..8518377e1e9 --- /dev/null +++ b/pkg/filter/sql_event_filter.go @@ -0,0 +1,216 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
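Before reading the new file below, a hedged sketch of how the rule shape it consumes is assembled. The field values are illustrative assumptions, not code from this patch; the types `config.EventFilterRule` and `bf.EventType` and the constructor `newSQLEventFilter` are the ones exercised by the tests above:

	// Hypothetical example, within package filter: ignore all DELETE events
	// and any DDL whose query matches ^DROP TABLE, on tables matching test.*.
	func buildExampleFilter() (*sqlEventFilter, error) {
		rule := &config.EventFilterRule{
			Matcher:     []string{"test.*"},
			IgnoreEvent: []bf.EventType{bf.DeleteEvent, bf.DropTable},
			IgnoreSQL:   []string{"^DROP TABLE"},
		}
		return newSQLEventFilter(&config.FilterConfig{
			EventFilters: []*config.EventFilterRule{rule},
		})
	}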
+
+package filter
+
+import (
+	"github.com/pingcap/errors"
+	"github.com/pingcap/log"
+	bf "github.com/pingcap/tidb-tools/pkg/binlog-filter"
+	"github.com/pingcap/tidb/parser"
+	tfilter "github.com/pingcap/tidb/util/table-filter"
+	"github.com/pingcap/tiflow/cdc/model"
+	"github.com/pingcap/tiflow/pkg/config"
+	cerror "github.com/pingcap/tiflow/pkg/errors"
+	"go.uber.org/zap"
+)
+
+const (
+	// binlogFilterSchemaPlaceholder is a placeholder for the schema name in the binlog filter.
+	// Since we use a table filter in each rule to match a dml/ddl event's schema and table,
+	// the binlog filter itself never needs a real schema name; we simply pass this
+	// placeholder whenever one of its methods requires a schema.
+	binlogFilterSchemaPlaceholder = "binlogFilterSchema"
+	// binlogFilterTablePlaceholder is a placeholder for the table name in the binlog filter.
+	// The reason we need it is the same as binlogFilterSchemaPlaceholder.
+	binlogFilterTablePlaceholder = "binlogFilterTable"
+	// dmlQuery is a placeholder query used when calling the binlog filter for dml events.
+	dmlQuery = ""
+	// caseSensitive is used to create bf.BinlogEvent.
+	caseSensitive = false
+)
+
+// sqlEventRule is only used by sqlEventFilter.
+type sqlEventRule struct {
+	// tf is a table filter used to match a dml/ddl event's schema and table,
+	// because the binlog filter does not support syntax like `!test.t1`
+	// (which means "do not match test.t1").
+	tf tfilter.Filter
+	bf *bf.BinlogEvent
+}
+
+func newSQLEventFilterRule(cfg *config.EventFilterRule) (*sqlEventRule, error) {
+	tf, err := tfilter.Parse(cfg.Matcher)
+	if err != nil {
+		return nil, cerror.WrapError(cerror.ErrFilterRuleInvalid, err, cfg.Matcher)
+	}
+
+	res := &sqlEventRule{
+		tf: tf,
+	}
+
+	if err := verifyIgnoreEvents(cfg.IgnoreEvent); err != nil {
+		return nil, err
+	}
+
+	bfRule := &bf.BinlogEventRule{
+		SchemaPattern: binlogFilterSchemaPlaceholder,
+		TablePattern:  binlogFilterTablePlaceholder,
+		Events:        cfg.IgnoreEvent,
+		SQLPattern:    cfg.IgnoreSQL,
+		Action:        bf.Ignore,
+	}
+
+	res.bf, err = bf.NewBinlogEvent(caseSensitive, []*bf.BinlogEventRule{bfRule})
+	if err != nil {
+		return nil, cerror.WrapError(cerror.ErrFilterRuleInvalid, err, "failed to create binlog event filter")
+	}
+
+	return res, nil
+}
+
+func verifyIgnoreEvents(types []bf.EventType) error {
+	typesMap := make(map[bf.EventType]struct{}, len(supportedEventTypes))
+	for _, et := range supportedEventTypes {
+		typesMap[et] = struct{}{}
+	}
+	for _, et := range types {
+		if _, ok := typesMap[et]; !ok {
+			return cerror.ErrInvalidIgnoreEventType.GenWithStackByArgs(string(et))
+		}
+	}
+	return nil
+}
+
+// sqlEventFilter is a filter that filters ddl/dml events by their type or query.
+type sqlEventFilter struct {
+	p     *parser.Parser
+	rules []*sqlEventRule
+}
+
+func newSQLEventFilter(cfg *config.FilterConfig) (*sqlEventFilter, error) {
+	res := &sqlEventFilter{
+		p: parser.New(),
+	}
+	for _, rule := range cfg.EventFilters {
+		if err := res.addRule(rule); err != nil {
+			return nil, errors.Trace(err)
+		}
+	}
+	return res, nil
+}
+
+func (f *sqlEventFilter) addRule(cfg *config.EventFilterRule) error {
+	rule, err := newSQLEventFilterRule(cfg)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	f.rules = append(f.rules, rule)
+	return nil
+}
+
+func (f *sqlEventFilter) getRules(schema, table string) []*sqlEventRule {
+	res := make([]*sqlEventRule, 0)
+	for _, rule := range f.rules {
+		if len(table) == 0 {
+			if rule.tf.MatchSchema(schema) {
+				res = append(res, rule)
+			}
+		} else {
+			if rule.tf.MatchTable(schema, table) {
+				res = append(res, rule)
+			}
+		}
+	}
+	return res
+}
+
+// shouldSkipDDL skips a ddl event by its type and query.
+func (f *sqlEventFilter) shouldSkipDDL(ddl *model.DDLEvent) (bool, error) {
+	eventType, err := ddlToEventType(f.p, ddl.Query, ddl.Type)
+	if err != nil {
+		return false, err
+	}
+	if eventType == bf.NullEvent {
+		log.Warn("sql event filter unsupported ddl type, do nothing",
+			zap.String("type", ddl.Type.String()),
+			zap.String("query", ddl.Query))
+		return false, nil
+	}
+
+	rules := f.getRules(ddl.TableInfo.Schema, ddl.TableInfo.Table)
+	for _, rule := range rules {
+		action, err := rule.bf.Filter(binlogFilterSchemaPlaceholder, binlogFilterTablePlaceholder, eventType, ddl.Query)
+		if err != nil {
+			return false, errors.Trace(err)
+		}
+		if action == bf.Ignore {
+			return true, nil
+		}
+	}
+	return false, nil
+}
+
+// shouldSkipDML skips a dml event by its type.
+func (f *sqlEventFilter) shouldSkipDML(event *model.RowChangedEvent) (bool, error) {
+	var et bf.EventType
+	switch {
+	case event.IsInsert():
+		et = bf.InsertEvent
+	case event.IsUpdate():
+		et = bf.UpdateEvent
+	case event.IsDelete():
+		et = bf.DeleteEvent
+	default:
+		// It should never happen.
+		log.Warn("unknown row changed event type")
+		return false, nil
+	}
+	rules := f.getRules(event.Table.Schema, event.Table.Table)
+	for _, rule := range rules {
+		action, err := rule.bf.Filter(binlogFilterSchemaPlaceholder, binlogFilterTablePlaceholder, et, dmlQuery)
+		if err != nil {
+			return false, cerror.WrapError(cerror.ErrFailedToFilterDML, err, event)
+		}
+		if action == bf.Ignore {
+			return true, nil
+		}
+	}
+	return false, nil
+}
+
+var supportedEventTypes = []bf.EventType{
+	bf.AllDML,
+	bf.AllDDL,
+
+	// dml events
+	bf.InsertEvent,
+	bf.UpdateEvent,
+	bf.DeleteEvent,
+
+	// ddl events
+	bf.CreateSchema,
+	bf.CreateDatabase,
+	bf.DropSchema,
+	bf.DropDatabase,
+	bf.CreateTable,
+	bf.DropTable,
+	bf.RenameTable,
+	bf.TruncateTable,
+	bf.AlterTable,
+	bf.CreateView,
+	bf.DropView,
+	bf.AddTablePartition,
+	bf.DropTablePartition,
+	bf.TruncateTablePartition,
+}
diff --git a/pkg/filter/sql_event_filter_test.go b/pkg/filter/sql_event_filter_test.go
new file mode 100644
index 00000000000..be409d5d9a3
--- /dev/null
+++ b/pkg/filter/sql_event_filter_test.go
@@ -0,0 +1,347 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package filter
+
+import (
+	"testing"
+
+	"github.com/pingcap/errors"
+	bf "github.com/pingcap/tidb-tools/pkg/binlog-filter"
+	"github.com/pingcap/tiflow/cdc/model"
+	"github.com/pingcap/tiflow/pkg/config"
+	cerror "github.com/pingcap/tiflow/pkg/errors"
+	"github.com/stretchr/testify/require"
+)
+
+func TestShouldSkipDDL(t *testing.T) {
+	t.Parallel()
+	type innerCase struct {
+		schema string
+		table  string
+		query  string
+		skip   bool
+	}
+
+	type testCase struct {
+		cfg   *config.FilterConfig
+		cases []innerCase
+		err   error
+	}
+
+	testCases := []testCase{
+		{
+			cfg: &config.FilterConfig{
+				EventFilters: []*config.EventFilterRule{
+					{
+						Matcher:     []string{"test.t1"},
+						IgnoreEvent: []bf.EventType{bf.AllDDL},
+					},
+				},
+			},
+			cases: []innerCase{
+				{
+					schema: "test",
+					table:  "t1",
+					query:  "alter table t1 modify column age int",
+					skip:   true,
+				},
+				{
+					schema: "test",
+					table:  "t1",
+					query:  "create table t1(id int primary key)",
+					skip:   true,
+				},
+				{
+					schema: "test",
+					table:  "t2", // table name not match
+					query:  "alter table t2 modify column age int",
+					skip:   false,
+				},
+				{
+					schema: "test2", // schema name not match
+					table:  "t1",
+					query:  "alter table t1 modify column age int",
+					skip:   false,
+				},
+			},
+		},
+		{
+			cfg: &config.FilterConfig{
+				EventFilters: []*config.EventFilterRule{
+					{
+						Matcher:     []string{"*.t1"},
+						IgnoreEvent: []bf.EventType{bf.DropDatabase, bf.DropSchema},
+					},
+				},
+			},
+			cases: []innerCase{
+				{
+					schema: "test",
+					table:  "t1",
+					query:  "alter table t1 modify column age int",
+					skip:   false,
+				},
+				{
+					schema: "test",
+					table:  "t1",
+					query:  "alter table t1 drop column age",
+					skip:   false,
+				},
+				{
+					schema: "test2",
+					table:  "t1",
+					query:  "drop database test2",
+					skip:   true,
+				},
+				{
+					schema: "test3",
+					table:  "t1",
+					query:  "drop index i3 on t1",
+					skip:   false,
+				},
+			},
+		},
+		{
+			cfg: &config.FilterConfig{
+				EventFilters: []*config.EventFilterRule{
+					{
+						Matcher:   []string{"*.t1"},
+						IgnoreSQL: []string{"MODIFY COLUMN", "DROP COLUMN", "^DROP DATABASE"},
+					},
+				},
+			},
+			cases: []innerCase{
+				{
+					schema: "test",
+					table:  "t1",
+					query:  "ALTER TABLE t1 MODIFY COLUMN age int(11) NOT NULL",
+					skip:   true,
+				},
+				{
+					schema: "test",
+					table:  "t1",
+					query:  "ALTER TABLE t1 DROP COLUMN age",
+					skip:   true,
+				},
+				{ // no table name
+					schema: "test2",
+					query:  "DROP DATABASE test",
+					skip:   true,
+				},
+				{
+					schema: "test3",
+					table:  "t1",
+					query:  "Drop Index i1 on test3.t1",
+					skip:   false,
+				},
+			},
+		},
+		{ // config error
+			cfg: &config.FilterConfig{
+				EventFilters: []*config.EventFilterRule{
+					{
+						Matcher:     []string{"*.t1"},
+						IgnoreEvent: []bf.EventType{bf.EventType("aa")},
+					},
+				},
+			},
+			err: cerror.ErrInvalidIgnoreEventType,
+		},
+		{
+			cfg: &config.FilterConfig{
+				EventFilters: []*config.EventFilterRule{
+					{
+						Matcher:   []string{"*.t1"},
+						IgnoreSQL: []string{"--6"}, // this is a valid regexp
+					},
+				},
+			},
+		},
+	}
+
+	for _, tc := range testCases {
+		f, err := newSQLEventFilter(tc.cfg)
+		require.True(t, errors.ErrorEqual(err, tc.err), "case: %+s", err)
+		for _, c := range tc.cases {
+			ddl := &model.DDLEvent{
+				TableInfo: &model.SimpleTableInfo{
+					Schema: c.schema,
+					Table:  c.table,
+				},
+				Query: c.query,
+			}
+			skip,
err := f.shouldSkipDDL(ddl) + require.NoError(t, err) + require.Equal(t, c.skip, skip, "case: %+v", c) + } + } +} + +func TestShouldSkipDML(t *testing.T) { + t.Parallel() + type innerCase struct { + schema string + table string + preColumns string + columns string + skip bool + } + + type testCase struct { + name string + cfg *config.FilterConfig + cases []innerCase + } + + testCases := []testCase{ + { + name: "dml-filter-test", + cfg: &config.FilterConfig{ + EventFilters: []*config.EventFilterRule{ + { + Matcher: []string{"test1.allDml"}, + IgnoreEvent: []bf.EventType{bf.AllDML}, + }, + { + Matcher: []string{"test2.insert"}, + IgnoreEvent: []bf.EventType{bf.InsertEvent}, + }, + { + Matcher: []string{"*.delete"}, + IgnoreEvent: []bf.EventType{bf.DeleteEvent}, + }, + }, + }, + cases: []innerCase{ + { // match test1.allDML + schema: "test1", + table: "allDml", + columns: "insert", + skip: true, + }, + { + schema: "test1", + table: "allDml", + preColumns: "delete", + skip: true, + }, + { + schema: "test1", + table: "allDml", + preColumns: "update", + columns: "update", + skip: true, + }, + { // not match + schema: "test", + table: "t1", + columns: "insert", + skip: false, + }, + { // match test2.insert + schema: "test2", + table: "insert", + columns: "insert", + skip: true, + }, + { + schema: "test2", + table: "insert", + preColumns: "delete", + skip: false, + }, + { + schema: "test2", + table: "insert", + preColumns: "update", + columns: "update", + skip: false, + }, + { + schema: "noMatter", + table: "delete", + columns: "insert", + skip: false, + }, + { + schema: "noMatter", + table: "delete", + preColumns: "update", + columns: "update", + skip: false, + }, + { + schema: "noMatter", + table: "delete", + preColumns: "delete", + skip: true, + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + f, err := newSQLEventFilter(tc.cfg) + require.NoError(t, err) + for _, c := range tc.cases { + event := &model.RowChangedEvent{ + Table: &model.TableName{ + Schema: c.schema, + Table: c.table, + }, + } + if c.columns != "" { + event.Columns = []*model.Column{{Value: c.columns}} + } + if c.preColumns != "" { + event.PreColumns = []*model.Column{{Value: c.preColumns}} + } + skip, err := f.shouldSkipDML(event) + require.NoError(t, err) + require.Equal(t, c.skip, skip, "case: %+v", c) + } + }) + } +} + +func TestVerifyIgnoreEvents(t *testing.T) { + t.Parallel() + type testCase struct { + ignoreEvent []bf.EventType + err error + } + + cases := make([]testCase, len(supportedEventTypes)) + for i, eventType := range supportedEventTypes { + cases[i] = testCase{ + ignoreEvent: []bf.EventType{eventType}, + err: nil, + } + } + + cases = append(cases, testCase{ + ignoreEvent: []bf.EventType{bf.EventType("unknown")}, + err: cerror.ErrInvalidIgnoreEventType, + }) + + cases = append(cases, testCase{ + ignoreEvent: []bf.EventType{bf.AlterTable}, + err: nil, + }) + + for _, tc := range cases { + require.True(t, errors.ErrorEqual(tc.err, verifyIgnoreEvents(tc.ignoreEvent))) + } +} diff --git a/pkg/filter/utils.go b/pkg/filter/utils.go new file mode 100644 index 00000000000..5809d31a72c --- /dev/null +++ b/pkg/filter/utils.go @@ -0,0 +1,156 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package filter
+
+import (
+	"fmt"
+
+	bf "github.com/pingcap/tidb-tools/pkg/binlog-filter"
+	"github.com/pingcap/tidb/parser"
+	timodel "github.com/pingcap/tidb/parser/model"
+	tifilter "github.com/pingcap/tidb/util/filter"
+	tfilter "github.com/pingcap/tidb/util/table-filter"
+	"github.com/pingcap/tiflow/pkg/config"
+	cerror "github.com/pingcap/tiflow/pkg/errors"
+)
+
+// isSysSchema returns true if the given schema is a system schema.
+func isSysSchema(db string) bool {
+	return tifilter.IsSystemSchema(db)
+}
+
+// VerifyTableRules checks the table filter rules in the configuration
+// and returns an invalid rule error if the verification fails,
+// otherwise it will return a table filter.
+func VerifyTableRules(cfg *config.FilterConfig) (tfilter.Filter, error) {
+	var f tfilter.Filter
+	var err error
+	if len(cfg.Rules) == 0 && cfg.MySQLReplicationRules != nil {
+		f, err = tfilter.ParseMySQLReplicationRules(cfg.MySQLReplicationRules)
+	} else {
+		rules := cfg.Rules
+		if len(rules) == 0 {
+			rules = []string{"*.*"}
+		}
+		f, err = tfilter.Parse(rules)
+	}
+	if err != nil {
+		return nil, cerror.WrapError(cerror.ErrFilterRuleInvalid, err, cfg)
+	}
+
+	return f, nil
+}
+
+// ddlToEventType gets the event type of a ddl query.
+func ddlToEventType(p *parser.Parser, query string, jobType timodel.ActionType) (bf.EventType, error) {
+	// Since `Parser` will return an AlterTable type `ast.StmtNode` for table partition related DDLs,
+	// we need to check the ActionType of a ddl first.
+	switch jobType {
+	case timodel.ActionAddTablePartition:
+		return bf.AddTablePartition, nil
+	case timodel.ActionDropTablePartition:
+		return bf.DropTablePartition, nil
+	case timodel.ActionTruncateTablePartition:
+		return bf.TruncateTablePartition, nil
+	}
+	stmt, err := p.ParseOneStmt(query, "", "")
+	if err != nil {
+		return bf.NullEvent, cerror.WrapError(cerror.ErrConvertDDLToEventTypeFailed, err, query)
+	}
+	et := bf.AstToDDLEvent(stmt)
+	// `Parser` will return an `AlterTable` type `ast.StmtNode` for a query like:
+	// `alter table t1 add index (xxx)`, and a `CreateIndex` type
+	// `ast.StmtNode` for a query like: `create index i on t1 (xxx)`.
+	// So we cast index-related DDLs to the `AlterTable` event type for the sake of simplicity.
+	switch et {
+	case bf.DropIndex:
+		return bf.AlterTable, nil
+	case bf.CreateIndex:
+		return bf.AlterTable, nil
+	}
+	return et, nil
+}
+
+// SupportedEventTypes returns the supported event types.
+func SupportedEventTypes() []bf.EventType {
+	return supportedEventTypes
+}
+
+func shouldDiscardByBuiltInDDLAllowlist(ddlType timodel.ActionType) bool {
+	/* The following DDLs will be filtered out:
+	ActionAddForeignKey                 ActionType = 9
+	ActionDropForeignKey                ActionType = 10
+	ActionRebaseAutoID                  ActionType = 13
+	ActionShardRowID                    ActionType = 16
+	ActionLockTable                     ActionType = 27
+	ActionUnlockTable                   ActionType = 28
+	ActionRepairTable                   ActionType = 29
+	ActionSetTiFlashReplica             ActionType = 30
+	ActionUpdateTiFlashReplicaStatus    ActionType = 31
+	ActionCreateSequence                ActionType = 34
+	ActionAlterSequence                 ActionType = 35
+	ActionDropSequence                  ActionType = 36
+	ActionModifyTableAutoIdCache        ActionType = 39
+	ActionRebaseAutoRandomBase          ActionType = 40
+	ActionAlterIndexVisibility          ActionType = 41
+	ActionExchangeTablePartition        ActionType = 42
+	ActionAddCheckConstraint            ActionType = 43
+	ActionDropCheckConstraint           ActionType = 44
+	ActionAlterCheckConstraint          ActionType = 45
+	ActionAlterTableAlterPartition      ActionType = 46
+
+	... and any Action whose value is greater than 46 ...
+	*/
+	switch ddlType {
+	case timodel.ActionCreateSchema,
+		timodel.ActionDropSchema,
+		timodel.ActionCreateTable,
+		timodel.ActionDropTable,
+		timodel.ActionAddColumn,
+		timodel.ActionDropColumn,
+		timodel.ActionAddIndex,
+		timodel.ActionDropIndex,
+		timodel.ActionTruncateTable,
+		timodel.ActionModifyColumn,
+		timodel.ActionRenameTable,
+		timodel.ActionRenameTables,
+		timodel.ActionSetDefaultValue,
+		timodel.ActionModifyTableComment,
+		timodel.ActionRenameIndex,
+		timodel.ActionAddTablePartition,
+		timodel.ActionDropTablePartition,
+		timodel.ActionCreateView,
+		timodel.ActionModifyTableCharsetAndCollate,
+		timodel.ActionTruncateTablePartition,
+		timodel.ActionDropView,
+		timodel.ActionRecoverTable,
+		timodel.ActionModifySchemaCharsetAndCollate,
+		timodel.ActionAddPrimaryKey,
+		timodel.ActionDropPrimaryKey,
+		timodel.ActionAddColumns, // Removed in TiDB v6.2.0, see https://github.com/pingcap/tidb/pull/35862.
+		timodel.ActionDropColumns, // Removed in TiDB v6.2.0
+		timodel.ActionRebaseAutoID,
+		timodel.ActionAlterIndexVisibility,
+		timodel.ActionMultiSchemaChange:
+		return false
+	}
+	return true
+}
+
+func completeExpression(suffix string) string {
+	if suffix == "" {
+		return suffix
+	}
+	return fmt.Sprintf("select * from t where %s", suffix)
+}
diff --git a/pkg/filter/utils_test.go b/pkg/filter/utils_test.go
new file mode 100644
index 00000000000..0c0c61001f5
--- /dev/null
+++ b/pkg/filter/utils_test.go
@@ -0,0 +1,159 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
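Before the tests: a hedged sketch of what `completeExpression` above produces for the expression verification path. The predicate string is an arbitrary example; `parser.New` and `ParseSQL` are the same TiDB parser calls used in `dmlExprFilterRule.verify`:

	// completeExpression("age >= 18") yields
	// "select * from t where age >= 18", which the TiDB parser can then check:
	p := parser.New()
	_, _, err := p.ParseSQL(completeExpression("age >= 18"))
	// err == nil here means the bare predicate is syntactically valid.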
+
+package filter
+
+import (
+	"testing"
+
+	bf "github.com/pingcap/tidb-tools/pkg/binlog-filter"
+	"github.com/pingcap/tidb/parser"
+	timodel "github.com/pingcap/tidb/parser/model"
+	tifilter "github.com/pingcap/tidb/util/filter"
+	"github.com/pingcap/tiflow/pkg/config"
+	cerror "github.com/pingcap/tiflow/pkg/errors"
+	"github.com/stretchr/testify/require"
+)
+
+func TestIsSysSchema(t *testing.T) {
+	t.Parallel()
+	cases := []struct {
+		schema string
+		result bool
+	}{
+		{"", false},
+		{"test", false},
+		{"SYS", true},
+		{"MYSQL", true},
+		{tifilter.InformationSchemaName, true},
+		{tifilter.InspectionSchemaName, true},
+		{tifilter.PerformanceSchemaName, true},
+		{tifilter.MetricSchemaName, true},
+	}
+	for _, c := range cases {
+		require.Equal(t, c.result, isSysSchema(c.schema))
+	}
+}
+
+func TestSupportedEventTypeString(t *testing.T) {
+	t.Parallel()
+	require.Equal(t, supportedEventTypes, SupportedEventTypes())
+}
+
+func TestVerifyTableRules(t *testing.T) {
+	t.Parallel()
+	cases := []struct {
+		cfg      *config.FilterConfig
+		hasError bool
+	}{
+		{&config.FilterConfig{Rules: []string{""}}, false},
+		{&config.FilterConfig{Rules: []string{"*.*"}}, false},
+		{&config.FilterConfig{Rules: []string{"test.*ms"}}, false},
+		{&config.FilterConfig{Rules: []string{"*.889"}}, false},
+		{&config.FilterConfig{Rules: []string{"test-a.*", "*.*.*"}}, true},
+		{&config.FilterConfig{Rules: []string{"*.*", "*.*.*", "*.*.*.*"}}, true},
+	}
+	for _, c := range cases {
+		_, err := VerifyTableRules(c.cfg)
+		require.Equal(t, c.hasError, err != nil, "case: %s", c.cfg.Rules)
+	}
+}
+
+func TestDDLToEventType(t *testing.T) {
+	t.Parallel()
+	cases := []struct {
+		ddl       string
+		jobType   timodel.ActionType
+		eventType bf.EventType
+		err       error
+	}{
+		{"CREATE DATABASE test", timodel.ActionCreateSchema, bf.CreateDatabase, nil},
+		{"DROP DATABASE test", timodel.ActionDropSchema, bf.DropDatabase, nil},
+		{"CREATE TABLE test.t1(id int primary key)", timodel.ActionCreateTable, bf.CreateTable, nil},
+		{"DROP TABLE test.t1", timodel.ActionDropTable, bf.DropTable, nil},
+		{"TRUNCATE TABLE test.t1", timodel.ActionTruncateTable, bf.TruncateTable, nil},
+		{"rename table s1.t1 to s2.t2", timodel.ActionRenameTable, bf.RenameTable, nil},
+		{"rename table s1.t1 to s2.t2, test.t1 to test.t2", timodel.ActionRenameTables, bf.RenameTable, nil},
+		{"create index i1 on test.t1 (age)", timodel.ActionAddIndex, bf.AlterTable, nil},
+		{"drop index i1 on test.t1", timodel.ActionDropIndex, bf.AlterTable, nil},
+		{"CREATE VIEW test.v AS SELECT * FROM t", timodel.ActionCreateView, bf.CreateView, nil},
+		{"DROP view if exists test.v", timodel.ActionDropView, bf.DropView, nil},
+
+		{"alter table test.t1 add column name varchar(50)", timodel.ActionAddColumn, bf.AlterTable, nil},
+		{"alter table test.t1 drop column name", timodel.ActionDropColumn, bf.AlterTable, nil},
+		{"alter table test.t1 modify column name varchar(100)", timodel.ActionModifyColumn, bf.AlterTable, nil},
+		{"ALTER TABLE test.t1 CONVERT TO CHARACTER SET gbk", timodel.ActionModifyTableCharsetAndCollate, bf.AlterTable, nil},
+		{"alter table test add primary key(b)", timodel.ActionAddIndex, bf.AlterTable, nil},
+		{"ALTER DATABASE dbname CHARACTER SET utf8 COLLATE utf8_general_ci;", timodel.ActionModifySchemaCharsetAndCollate, bf.AlterDatabase, nil},
+		{"Alter table test.t1 drop partition t11", timodel.ActionDropTablePartition, bf.DropTablePartition, nil},
+		{"Alter table test.t1 add partition (partition p3 values less than (2002))", timodel.ActionAddTablePartition, bf.AddTablePartition,
nil},
+		{"Alter table test.t1 truncate partition t11", timodel.ActionTruncateTablePartition, bf.TruncateTablePartition, nil},
+		{"alter table add i", timodel.ActionAddIndex, bf.NullEvent, cerror.ErrConvertDDLToEventTypeFailed},
+	}
+	p := parser.New()
+	for _, c := range cases {
+		et, err := ddlToEventType(p, c.ddl, c.jobType)
+		if c.err != nil {
+			errRFC, ok := cerror.RFCCode(err)
+			require.True(t, ok)
+			caseErrRFC, ok := cerror.RFCCode(c.err)
+			require.True(t, ok)
+			require.Equal(t, caseErrRFC, errRFC)
+		} else {
+			require.NoError(t, err)
+		}
+		require.Equal(t, c.eventType, et, "case: %v", c.ddl)
+	}
+}
+
+func TestShouldDiscardByBuiltInDDLAllowlist(t *testing.T) {
+	t.Parallel()
+	cases := []struct {
+		jobType timodel.ActionType
+		discard bool
+	}{
+		{timodel.ActionCreateSchema, false},
+		{timodel.ActionDropSchema, false},
+		{timodel.ActionCreateTable, false},
+		{timodel.ActionDropTable, false},
+		{timodel.ActionAddColumn, false},
+		{timodel.ActionDropColumn, false},
+		{timodel.ActionAddIndex, false},
+		{timodel.ActionDropIndex, false},
+		{timodel.ActionTruncateTable, false},
+		{timodel.ActionModifyColumn, false},
+		{timodel.ActionRenameTable, false},
+		{timodel.ActionRenameTables, false},
+		{timodel.ActionSetDefaultValue, false},
+		{timodel.ActionModifyTableComment, false},
+		{timodel.ActionRenameIndex, false},
+		{timodel.ActionAddTablePartition, false},
+		{timodel.ActionDropTablePartition, false},
+		{timodel.ActionCreateView, false},
+		{timodel.ActionModifyTableCharsetAndCollate, false},
+		{timodel.ActionTruncateTablePartition, false},
+		{timodel.ActionDropView, false},
+		{timodel.ActionRecoverTable, false},
+		{timodel.ActionModifySchemaCharsetAndCollate, false},
+		{timodel.ActionAddPrimaryKey, false},
+		{timodel.ActionDropPrimaryKey, false},
+		{timodel.ActionAddColumns, false},
+		{timodel.ActionDropColumns, false},
+		{timodel.ActionAddForeignKey, true},  // not in built-in DDL allowlist
+		{timodel.ActionDropForeignKey, true}, // not in built-in DDL allowlist
+	}
+
+	for _, c := range cases {
+		require.Equal(t, c.discard, shouldDiscardByBuiltInDDLAllowlist(c.jobType))
+	}
+}
diff --git a/pkg/util/tz.go b/pkg/util/tz.go
index fc3a1ec02a4..fbdc2a1bd0e 100644
--- a/pkg/util/tz.go
+++ b/pkg/util/tz.go
@@ -61,3 +61,11 @@ func GetLocalTimezone() (*time.Location, error) {
 	str := timeutil.InferSystemTZ()
 	return GetTimezoneFromZonefile(str)
 }
+
+// GetTimeZoneName returns the name of a time.Location.
+func GetTimeZoneName(tz *time.Location) string {
+	if tz == nil {
+		return ""
+	}
+	return tz.String()
+}
diff --git a/pkg/util/tz_test.go b/pkg/util/tz_test.go
index cf0a5cd413d..9ed91c83f5a 100644
--- a/pkg/util/tz_test.go
+++ b/pkg/util/tz_test.go
@@ -41,3 +41,10 @@ func TestGetTimezoneFromZonefile(t *testing.T) {
 		}
 	}
 }
+
+func TestGetTimezoneName(t *testing.T) {
+	tz, err := GetTimezone("")
+	require.NoError(t, err)
+	require.True(t, len(GetTimeZoneName(tz)) != 0)
+	require.True(t, len(GetTimeZoneName(nil)) == 0)
+}
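Taken together, a minimal end-to-end sketch of the filter API this patch introduces. The wiring below is illustrative, not code from the patch: the rule values and the `ddl` event are hypothetical, while `NewFilter`, `config.FilterConfig`, `config.EventFilterRule`, and `ShouldIgnoreDDLEvent` are the APIs added or changed above:

	cfg := config.GetDefaultReplicaConfig()
	cfg.Filter = &config.FilterConfig{
		Rules: []string{"test.*"},
		EventFilters: []*config.EventFilterRule{{
			Matcher:   []string{"test.t1"},
			IgnoreSQL: []string{"^DROP TABLE"},
		}},
	}
	// The empty string mirrors how the tests above pass a timezone to NewFilter.
	f, err := filter.NewFilter(cfg, "")
	if err != nil {
		return err
	}
	ddl := &model.DDLEvent{
		StartTs:   1,
		Type:      timodel.ActionDropTable,
		TableInfo: &model.SimpleTableInfo{Schema: "test", Table: "t1"},
		Query:     "DROP TABLE t1",
	}
	// ignore should be true here: the query matches the IgnoreSQL rule.
	ignore, err := f.ShouldIgnoreDDLEvent(ddl)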