Skip to content

Commit

Permalink
filter(ticdc): Add row filter. (#6095)
Browse files Browse the repository at this point in the history
close #6160
  • Loading branch information
asddongmen authored Jul 20, 2022
1 parent 0a9577a commit 8f3fd7a
Show file tree
Hide file tree
Showing 46 changed files with 1,598 additions and 285 deletions.
21 changes: 14 additions & 7 deletions cdc/api/v1/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,13 +144,20 @@ func verifyCreateChangefeedConfig(
SyncPointInterval: 10 * time.Minute,
CreatorVersion: version.ReleaseVersion,
}

f, err := filter.NewFilter(replicaConfig, "")
if err != nil {
return nil, err
}
tableInfos, ineligibleTables, _, err := entry.VerifyTables(f,
up.KVStorage, changefeedConfig.StartTS)
if err != nil {
return nil, err
}
err = f.Verify(tableInfos)
if err != nil {
return nil, err
}
if !replicaConfig.ForceReplicate && !changefeedConfig.IgnoreIneligibleTable {
ineligibleTables, _, err := entry.VerifyTables(replicaConfig,
up.KVStorage, changefeedConfig.StartTS)
if err != nil {
return nil, err
}
if len(ineligibleTables) != 0 {
return nil, cerror.ErrTableIneligible.GenWithStackByArgs(ineligibleTables)
}
Expand Down Expand Up @@ -188,7 +195,7 @@ func VerifyUpdateChangefeedConfig(ctx context.Context,
// verify rules
if len(changefeedConfig.FilterRules) != 0 {
newInfo.Config.Filter.Rules = changefeedConfig.FilterRules
_, err = filter.VerifyRules(newInfo.Config.Filter)
_, err = filter.VerifyTableRules(newInfo.Config.Filter)
if err != nil {
return nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByArgs(err.Error())
}
Expand Down
49 changes: 42 additions & 7 deletions cdc/api/v2/api_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ type APIV2Helpers interface {
cfg *ChangefeedConfig,
oldInfo *model.ChangeFeedInfo,
oldUpInfo *model.UpstreamInfo,
kvStorage tidbkv.Storage,
checkpointTs uint64,
) (*model.ChangeFeedInfo, *model.UpstreamInfo, error)

// verifyUpstream verifies the upstreamConfig
Expand Down Expand Up @@ -212,13 +214,19 @@ func (APIV2HelpersImpl) verifyCreateChangefeedConfig(
"if use force replicate, old value feature must be enabled")
}
}
_, err = filter.VerifyRules(replicaCfg.Filter)
f, err := filter.NewFilter(replicaCfg, "")
if err != nil {
return nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByArgs(err.Error())
return nil, errors.Cause(err)
}
tableInfos, ineligibleTables, _, err := entry.VerifyTables(f, kvStorage, cfg.StartTs)
if err != nil {
return nil, errors.Cause(err)
}
err = f.Verify(tableInfos)
if err != nil {
return nil, errors.Cause(err)
}

if !replicaCfg.ForceReplicate && !cfg.ReplicaConfig.IgnoreIneligibleTable {
ineligibleTables, _, err := entry.VerifyTables(replicaCfg, kvStorage, cfg.StartTs)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -279,9 +287,13 @@ func (h APIV2HelpersImpl) verifyUpstream(ctx context.Context,

// verifyUpdateChangefeedConfig verifies config to update
// a changefeed and returns a changefeedInfo
func (APIV2HelpersImpl) verifyUpdateChangefeedConfig(ctx context.Context,
cfg *ChangefeedConfig, oldInfo *model.ChangeFeedInfo,
func (APIV2HelpersImpl) verifyUpdateChangefeedConfig(
ctx context.Context,
cfg *ChangefeedConfig,
oldInfo *model.ChangeFeedInfo,
oldUpInfo *model.UpstreamInfo,
kvStorage tidbkv.Storage,
checkpointTs uint64,
) (*model.ChangeFeedInfo, *model.UpstreamInfo, error) {
newInfo, err := oldInfo.Clone()
if err != nil {
Expand All @@ -308,6 +320,23 @@ func (APIV2HelpersImpl) verifyUpdateChangefeedConfig(ctx context.Context,
newInfo.Config = cfg.ReplicaConfig.ToInternalReplicaConfig()
}

f, err := filter.NewFilter(newInfo.Config, "")
if err != nil {
return nil, nil, cerror.ErrChangefeedUpdateRefused.
GenWithStackByArgs(errors.Cause(err).Error())
}

tableInfos, _, _, err := entry.VerifyTables(f, kvStorage, checkpointTs)
if err != nil {
return nil, nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err)
}

err = f.Verify(tableInfos)
if err != nil {
return nil, nil, cerror.ErrChangefeedUpdateRefused.
GenWithStackByArgs(errors.Cause(err).Error())
}

// verify SinkURI
if cfg.SinkURI != "" {
newInfo.SinkURI = cfg.SinkURI
Expand Down Expand Up @@ -417,5 +446,11 @@ func (h APIV2HelpersImpl) getVerfiedTables(replicaConfig *config.ReplicaConfig,
storage tidbkv.Storage, startTs uint64) (ineligibleTables,
eligibleTables []model.TableName, err error,
) {
return entry.VerifyTables(replicaConfig, storage, startTs)
f, err := filter.NewFilter(replicaConfig, "")
if err != nil {
return
}
_, ineligibleTables, eligibleTables, err = entry.
VerifyTables(f, storage, startTs)
return
}
8 changes: 4 additions & 4 deletions cdc/api/v2/api_helpers_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 14 additions & 11 deletions cdc/api/v2/api_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,19 @@ import (
"testing"
"time"

tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/stretchr/testify/require"
)

type mockStorage struct {
tidbkv.Storage
}

func TestVerifyCreateChangefeedConfig(t *testing.T) {
ctx := context.Background()
pdClient := &mockPDClient{}
storage := &mockStorage{}
helper := entry.NewSchemaTestHelper(t)
helper.Tk().MustExec("use test;")
storage := helper.Storage()
provider := &mockStatusProvider{}
cfg := &ChangefeedConfig{}
h := &APIV2HelpersImpl{}
Expand Down Expand Up @@ -94,17 +92,22 @@ func TestVerifyCreateChangefeedConfig(t *testing.T) {
func TestVerifyUpdateChangefeedConfig(t *testing.T) {
ctx := context.Background()
cfg := &ChangefeedConfig{}
oldInfo := &model.ChangeFeedInfo{}
oldInfo := &model.ChangeFeedInfo{
Config: config.GetDefaultReplicaConfig(),
}
oldUpInfo := &model.UpstreamInfo{}
helper := entry.NewSchemaTestHelper(t)
helper.Tk().MustExec("use test;")
storage := helper.Storage()
h := &APIV2HelpersImpl{}
newCfInfo, newUpInfo, err := h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo)
newCfInfo, newUpInfo, err := h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo, storage, 0)
require.NotNil(t, err)
require.Nil(t, newCfInfo)
require.Nil(t, newUpInfo)
// namespace and id can not be updated
cfg.Namespace = "abc"
cfg.ID = "1234"
newCfInfo, newUpInfo, err = h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo)
newCfInfo, newUpInfo, err = h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo, storage, 0)
require.NotNil(t, err)
require.Nil(t, newCfInfo)
require.Nil(t, newUpInfo)
Expand All @@ -120,7 +123,7 @@ func TestVerifyUpdateChangefeedConfig(t *testing.T) {
cfg.KeyPath = "p3"
cfg.SinkURI = "blackhole://"
cfg.CertAllowedCN = []string{"c", "d"}
newCfInfo, newUpInfo, err = h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo)
newCfInfo, newUpInfo, err = h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo, storage, 0)
require.Nil(t, err)
// startTs can not be updated
require.Equal(t, "table", string(newCfInfo.Config.Sink.TxnAtomicity))
Expand All @@ -139,6 +142,6 @@ func TestVerifyUpdateChangefeedConfig(t *testing.T) {
require.Equal(t, "blackhole://", newCfInfo.SinkURI)
oldInfo.StartTs = 10
cfg.TargetTs = 9
newCfInfo, newUpInfo, err = h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo)
newCfInfo, newUpInfo, err = h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo, storage, 0)
require.NotNil(t, err)
}
29 changes: 28 additions & 1 deletion cdc/api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tiflow/cdc/capture"
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/security"
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/pingcap/tiflow/pkg/upstream"
"go.uber.org/zap"
Expand Down Expand Up @@ -202,6 +203,12 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) {
GenWithStackByArgs("can only update changefeed config when it is stopped"))
return
}
cfStatus, err := h.capture.StatusProvider().GetChangeFeedStatus(ctx, changefeedID)
if err != nil {
_ = c.Error(err)
return
}

upInfo, err := h.capture.GetEtcdClient().
GetUpstreamInfo(ctx, cfInfo.UpstreamID, cfInfo.Namespace)
if err != nil {
Expand All @@ -226,8 +233,28 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) {
zap.String("changefeedInfo", cfInfo.String()),
zap.Any("upstreamInfo", upInfo))

var pdAddrs []string
var credentials *security.Credential
if upInfo != nil {
pdAddrs = strings.Split(upInfo.PDEndpoints, ",")
credentials = &security.Credential{
CAPath: upInfo.CAPath,
CertPath: upInfo.CertPath,
KeyPath: upInfo.KeyPath,
CertAllowedCN: upInfo.CertAllowedCN,
}
}
if len(updateCfConfig.PDAddrs) != 0 {
pdAddrs = updateCfConfig.PDAddrs
credentials = updateCfConfig.PDConfig.toCredential()
}

storage, err := h.helpers.createTiStore(pdAddrs, credentials)
if err != nil {
_ = c.Error(errors.Trace(err))
}
newCfInfo, newUpInfo, err := h.helpers.
verifyUpdateChangefeedConfig(ctx, updateCfConfig, cfInfo, upInfo)
verifyUpdateChangefeedConfig(ctx, updateCfConfig, cfInfo, upInfo, storage, cfStatus.CheckpointTs)
if err != nil {
_ = c.Error(errors.Trace(err))
return
Expand Down
13 changes: 10 additions & 3 deletions cdc/api/v2/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,10 +331,17 @@ func TestUpdateChangefeed(t *testing.T) {
verifyUpstream(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil).AnyTimes()
helpers.EXPECT().
verifyUpdateChangefeedConfig(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
createTiStore(gomock.Any(), gomock.Any()).
Return(nil, nil).
AnyTimes()
helpers.EXPECT().
verifyUpdateChangefeedConfig(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(&model.ChangeFeedInfo{}, &model.UpstreamInfo{}, cerrors.ErrChangefeedUpdateRefused).
Times(1)

statusProvider.changefeedStatus = &model.ChangeFeedStatus{
CheckpointTs: 1,
}
w = httptest.NewRecorder()
req, _ = http.NewRequestWithContext(context.Background(), update.method,
fmt.Sprintf(update.url, validID), bytes.NewReader(body))
Expand All @@ -347,7 +354,7 @@ func TestUpdateChangefeed(t *testing.T) {

// case 7: update transaction failed
helpers.EXPECT().
verifyUpdateChangefeedConfig(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
verifyUpdateChangefeedConfig(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(&model.ChangeFeedInfo{}, &model.UpstreamInfo{}, nil).
Times(1)
etcdClient.EXPECT().
Expand All @@ -366,7 +373,7 @@ func TestUpdateChangefeed(t *testing.T) {

// case 8: success
helpers.EXPECT().
verifyUpdateChangefeedConfig(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
verifyUpdateChangefeedConfig(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(oldCfInfo, &model.UpstreamInfo{}, nil).
Times(1)
etcdClient.EXPECT().
Expand Down
Loading

0 comments on commit 8f3fd7a

Please sign in to comment.