From ea86abcd0775a0f6c9d16efe171a3e347d9a073e Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Tue, 3 Jan 2023 21:36:49 +0800 Subject: [PATCH] pre check consistent config --- cdc/owner/changefeed.go | 4 +- cdc/redo/common/model.go | 136 --------- cdc/redo/common/util.go | 85 ------ cdc/redo/common/util_test.go | 169 ------------ cdc/redo/manager.go | 5 +- cdc/redo/manager_test.go | 14 +- cdc/redo/options.go | 14 +- cdc/redo/reader/file.go | 26 +- cdc/redo/reader/file_test.go | 46 ++-- cdc/redo/reader/reader.go | 15 +- cdc/redo/reader/reader_test.go | 23 +- cdc/redo/writer/file.go | 25 +- cdc/redo/writer/file_test.go | 59 ++-- cdc/redo/writer/writer.go | 25 +- cdc/redo/writer/writer_test.go | 21 +- errors.toml | 5 - pkg/config/consistent.go | 10 +- pkg/errors/cdc_errors.go | 6 +- pkg/fsutil/file_allocator.go | 4 +- pkg/redo/config.go | 260 ++++++++++++++++++ pkg/redo/config_test.go | 189 +++++++++++++ .../_utils/start_tidb_cluster_impl | 36 +-- .../conf/changefeed.toml | 2 +- .../consistent_replicate_nfs/conf/workload | 2 +- .../consistent_replicate_nfs/run.sh | 1 + 25 files changed, 626 insertions(+), 556 deletions(-) delete mode 100644 cdc/redo/common/model.go create mode 100644 pkg/redo/config.go create mode 100644 pkg/redo/config_test.go diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index d25eeec85d9..1a0c47e68d0 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -26,7 +26,6 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/puller" "github.com/pingcap/tiflow/cdc/redo" - redoCommon "github.com/pingcap/tiflow/cdc/redo/common" "github.com/pingcap/tiflow/cdc/scheduler" "github.com/pingcap/tiflow/cdc/sink" "github.com/pingcap/tiflow/pkg/config" @@ -34,6 +33,7 @@ import ( cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/orchestrator" "github.com/pingcap/tiflow/pkg/pdutil" + redoCfg "github.com/pingcap/tiflow/pkg/redo" "github.com/pingcap/tiflow/pkg/txnutil/gc" 
"github.com/pingcap/tiflow/pkg/upstream" "github.com/prometheus/client_golang/prometheus" @@ -675,7 +675,7 @@ func (c *changefeed) cleanupRedoManager(ctx context.Context) { log.Warn("changefeed is removed, but state is not complete", zap.Any("state", c.state)) return } - if !redoCommon.IsConsistentEnabled(c.state.Info.Config.Consistent.Level) { + if !redoCfg.IsConsistentEnabled(c.state.Info.Config.Consistent.Level) { return } // when removing a paused changefeed, the redo manager is nil, create a new one diff --git a/cdc/redo/common/model.go b/cdc/redo/common/model.go deleted file mode 100644 index d76822c3f9c..00000000000 --- a/cdc/redo/common/model.go +++ /dev/null @@ -1,136 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package common - -const ( - // DefaultFileMode is the default mode when operation files - DefaultFileMode = 0o644 - // DefaultDirMode is the default mode when operation dir - DefaultDirMode = 0o755 - - // TmpEXT is the file ext of log file before safely wrote to disk - TmpEXT = ".tmp" - // LogEXT is the file ext of log file after safely wrote to disk - LogEXT = ".log" - // MetaEXT is the meta file ext of meta file after safely wrote to disk - MetaEXT = ".meta" - // MetaTmpEXT is the meta file ext of meta file before safely wrote to disk - MetaTmpEXT = ".mtmp" - // SortLogEXT is the sorted log file ext of log file after safely wrote to disk - SortLogEXT = ".sort" - - // MinSectorSize is minimum sector size used when flushing log so that log can safely - // distinguish between torn writes and ordinary data corruption. - MinSectorSize = 512 -) - -const ( - // RedoMetaFileType is the default file type of meta file - RedoMetaFileType = "meta" - // RedoRowLogFileType is the default file type of row log file - RedoRowLogFileType = "row" - // RedoDDLLogFileType is the default file type of ddl log file - RedoDDLLogFileType = "ddl" -) - -// FileTypeConfig Specifies redo file type config. -type FileTypeConfig struct { - // Whether emitting redo meta or not. - EmitMeta bool - // Whether emitting row events or not. - EmitRowEvents bool - // Whether emitting DDL events or not. - EmitDDLEvents bool -} - -// ConsistentLevelType is the level of redo log consistent level. -type ConsistentLevelType string - -const ( - // ConsistentLevelNone no consistent guarantee. - ConsistentLevelNone ConsistentLevelType = "none" - // ConsistentLevelEventual eventual consistent. 
- ConsistentLevelEventual ConsistentLevelType = "eventual" -) - -// IsValidConsistentLevel checks whether a given consistent level is valid -func IsValidConsistentLevel(level string) bool { - switch ConsistentLevelType(level) { - case ConsistentLevelNone, ConsistentLevelEventual: - return true - default: - return false - } -} - -// IsConsistentEnabled returns whether the consistent feature is enabled. -func IsConsistentEnabled(level string) bool { - return IsValidConsistentLevel(level) && ConsistentLevelType(level) != ConsistentLevelNone -} - -// ConsistentStorage is the type of consistent storage. -type ConsistentStorage string - -const ( - // ConsistentStorageBlackhole is a blackhole storage, which will discard all data. - ConsistentStorageBlackhole ConsistentStorage = "blackhole" - // ConsistentStorageLocal is a local storage, which will store data in local disk. - ConsistentStorageLocal ConsistentStorage = "local" - // ConsistentStorageNFS is a NFS storage, which will store data in NFS. - ConsistentStorageNFS ConsistentStorage = "nfs" - - // ConsistentStorageS3 is a S3 storage, which will store data in S3. - ConsistentStorageS3 ConsistentStorage = "s3" - // ConsistentStorageGCS is a GCS storage, which will store data in GCS. - ConsistentStorageGCS ConsistentStorage = "gcs" - // ConsistentStorageGS is an alias of GCS storage. - ConsistentStorageGS ConsistentStorage = "gs" - // ConsistentStorageAzblob is a Azure Blob storage, which will store data in Azure Blob. - ConsistentStorageAzblob ConsistentStorage = "azblob" - // ConsistentStorageAzure is an alias of Azure Blob storage. - ConsistentStorageAzure ConsistentStorage = "azure" - // ConsistentStorageFile is an external storage based on local files and - // will only be used for testing. - ConsistentStorageFile ConsistentStorage = "file" - // ConsistentStorageNoop is a noop storage, which simply discard all data. 
- ConsistentStorageNoop ConsistentStorage = "noop" -) - -// IsValidConsistentStorage checks whether a give consistent storage is valid. -func IsValidConsistentStorage(scheme string) bool { - switch ConsistentStorage(scheme) { - case ConsistentStorageBlackhole, ConsistentStorageLocal, ConsistentStorageNFS, - ConsistentStorageS3, ConsistentStorageGCS, ConsistentStorageGS, - ConsistentStorageAzblob, ConsistentStorageAzure, ConsistentStorageFile, - ConsistentStorageNoop: - return true - default: - return false - } -} - -// IsExternalStorage returns whether an external storage is used. -func IsExternalStorage(storage string) bool { - return !IsLocalStorage(storage) -} - -// IsLocalStorage returns whether a local storage is used. -func IsLocalStorage(storage string) bool { - switch ConsistentStorage(storage) { - case ConsistentStorageBlackhole, ConsistentStorageLocal, ConsistentStorageNFS: - return true - default: - return false - } -} diff --git a/cdc/redo/common/util.go b/cdc/redo/common/util.go index e79c72f6650..1d3d374d7b5 100644 --- a/cdc/redo/common/util.go +++ b/cdc/redo/common/util.go @@ -14,97 +14,12 @@ package common import ( - "context" "fmt" - "net/url" - "path/filepath" "strings" - "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/errors" - "github.com/pingcap/tiflow/pkg/util" ) -const ( - // RedoLogFileFormatV1 was used before v6.1.0, which doesn't contain namespace information - // layout: captureID_changefeedID_fileType_maxEventCommitTs_uuid.fileExtName - RedoLogFileFormatV1 = "%s_%s_%s_%d_%s%s" - // RedoLogFileFormatV2 is available since v6.1.0, which contains namespace information - // layout: captureID_namespace_changefeedID_fileType_maxEventCommitTs_uuid.fileExtName - RedoLogFileFormatV2 = "%s_%s_%s_%s_%d_%s%s" -) - -// InitExternalStorage init an external storage. 
-var InitExternalStorage = func(ctx context.Context, uri url.URL) (storage.ExternalStorage, error) { - if ConsistentStorage(uri.Scheme) == ConsistentStorageS3 && len(uri.Host) == 0 { - // TODO: this branch is compatible with previous s3 logic and will be removed - // in the future. - return nil, errors.WrapChangefeedUnretryableErr(errors.ErrS3StorageInitialize, - errors.Errorf("please specify the bucket for %+v", uri)) - } - s, err := util.GetExternalStorage(ctx, uri.String(), nil) - if err != nil { - return nil, errors.WrapChangefeedUnretryableErr(errors.ErrS3StorageInitialize, err) - } - return s, nil -} - -// logFormat2ParseFormat converts redo log file name format to the space separated -// format, which can be read and parsed by sscanf. Besides remove the suffix `%s` -// which is used as file name extension, since we will parse extension first. -func logFormat2ParseFormat(fmtStr string) string { - return strings.TrimSuffix(strings.ReplaceAll(fmtStr, "_", " "), "%s") -} - -// ParseLogFileName extract the commitTs, fileType from log fileName -func ParseLogFileName(name string) (uint64, string, error) { - ext := filepath.Ext(name) - if ext == MetaEXT { - return 0, RedoMetaFileType, nil - } - - // if .sort, the name should be like - // fmt.Sprintf("%s_%s_%s_%d_%s_%d%s", w.cfg.captureID, - // w.cfg.changeFeedID.Namespace,w.cfg.changeFeedID.ID, - // w.cfg.fileType, w.commitTS.Load(), uuid, LogEXT)+SortLogEXT - if ext == SortLogEXT { - name = strings.TrimSuffix(name, SortLogEXT) - ext = filepath.Ext(name) - } - if ext != LogEXT && ext != TmpEXT { - return 0, "", nil - } - - var commitTs uint64 - var captureID, namespace, changefeedID, fileType, uid string - // if the namespace is not default, the log looks like: - // fmt.Sprintf("%s_%s_%s_%s_%d_%s%s", w.cfg.captureID, - // w.cfg.changeFeedID.Namespace,w.cfg.changeFeedID.ID, - // w.cfg.fileType, w.commitTS.Load(), uuid, redo.LogEXT) - // otherwise it looks like: - // fmt.Sprintf("%s_%s_%s_%d_%s%s", w.cfg.captureID, - 
// w.cfg.changeFeedID.ID, - // w.cfg.fileType, w.commitTS.Load(), uuid, redo.LogEXT) - var ( - vars []any - formatStr string - ) - if len(strings.Split(name, "_")) == 6 { - formatStr = logFormat2ParseFormat(RedoLogFileFormatV2) - vars = []any{&captureID, &namespace, &changefeedID, &fileType, &commitTs, &uid} - } else { - formatStr = logFormat2ParseFormat(RedoLogFileFormatV1) - vars = []any{&captureID, &changefeedID, &fileType, &commitTs, &uid} - } - name = strings.ReplaceAll(name, "_", " ") - _, err := fmt.Sscanf(name, formatStr, vars...) - if err != nil { - return 0, "", errors.Annotatef(err, "bad log name: %s", name) - } - return commitTs, fileType, nil -} - // FilterChangefeedFiles return the files that match to the changefeed. func FilterChangefeedFiles(files []string, changefeedID model.ChangeFeedID) []string { var ( diff --git a/cdc/redo/common/util_test.go b/cdc/redo/common/util_test.go index f36429b5aab..c07085829ef 100644 --- a/cdc/redo/common/util_test.go +++ b/cdc/redo/common/util_test.go @@ -14,165 +14,12 @@ package common import ( - "context" - "fmt" "testing" - "github.com/google/uuid" - "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tiflow/cdc/model" "github.com/stretchr/testify/require" ) -func TestParseLogFileName(t *testing.T) { - t.Parallel() - - type arg struct { - name string - } - tests := []struct { - name string - args arg - wantTs uint64 - wantFileType string - wantErr string - }{ - { - name: "happy row .log", - args: arg{ - name: fmt.Sprintf(RedoLogFileFormatV1, "cp", - "test", - RedoRowLogFileType, 1, uuid.NewString(), LogEXT), - }, - wantTs: 1, - wantFileType: RedoRowLogFileType, - }, - { - name: "happy row .log", - args: arg{ - name: fmt.Sprintf(RedoLogFileFormatV2, "cp", - "namespace", "test", - RedoRowLogFileType, 1, uuid.NewString(), LogEXT), - }, - wantTs: 1, - wantFileType: RedoRowLogFileType, - }, - { - name: "happy row .tmp", - args: arg{ - name: fmt.Sprintf(RedoLogFileFormatV1, "cp", - "test", - RedoRowLogFileType, 
1, uuid.NewString(), LogEXT) + TmpEXT, - }, - wantTs: 1, - wantFileType: RedoRowLogFileType, - }, - { - name: "happy row .tmp", - args: arg{ - name: fmt.Sprintf(RedoLogFileFormatV2, "cp", - "namespace", "test", - RedoRowLogFileType, 1, uuid.NewString(), LogEXT) + TmpEXT, - }, - wantTs: 1, - wantFileType: RedoRowLogFileType, - }, - { - name: "happy ddl .log", - args: arg{ - name: fmt.Sprintf(RedoLogFileFormatV1, "cp", - "test", - RedoDDLLogFileType, 1, uuid.NewString(), LogEXT), - }, - wantTs: 1, - wantFileType: RedoDDLLogFileType, - }, - { - name: "happy ddl .log", - args: arg{ - name: fmt.Sprintf(RedoLogFileFormatV2, "cp", - "namespace", "test", - RedoDDLLogFileType, 1, uuid.NewString(), LogEXT), - }, - wantTs: 1, - wantFileType: RedoDDLLogFileType, - }, - { - name: "happy ddl .sort", - args: arg{ - name: fmt.Sprintf(RedoLogFileFormatV2, "cp", - "default", "test", - RedoDDLLogFileType, 1, uuid.NewString(), LogEXT) + SortLogEXT, - }, - wantTs: 1, - wantFileType: RedoDDLLogFileType, - }, - { - name: "happy ddl .sort", - args: arg{ - name: fmt.Sprintf(RedoLogFileFormatV2, "cp", - "namespace", "test", - RedoDDLLogFileType, 1, uuid.NewString(), LogEXT) + SortLogEXT, - }, - wantTs: 1, - wantFileType: RedoDDLLogFileType, - }, - { - name: "happy ddl .tmp", - args: arg{ - name: fmt.Sprintf(RedoLogFileFormatV1, "cp", - "test", - RedoDDLLogFileType, 1, uuid.NewString(), LogEXT) + TmpEXT, - }, - wantTs: 1, - wantFileType: RedoDDLLogFileType, - }, - { - name: "happy ddl .tmp", - args: arg{ - name: fmt.Sprintf(RedoLogFileFormatV2, "cp", - "namespace", "test", - RedoDDLLogFileType, 1, uuid.NewString(), LogEXT) + TmpEXT, - }, - wantTs: 1, - wantFileType: RedoDDLLogFileType, - }, - { - name: "happy .meta", - args: arg{ - name: "sdfsdfsf" + MetaEXT, - }, - wantTs: 0, - wantFileType: RedoMetaFileType, - }, - { - name: "not supported fileType", - args: arg{ - name: "sdfsdfsf.sfsf", - }, - }, - { - name: "err wrong format ddl .tmp", - args: arg{ - name: 
fmt.Sprintf("%s_%s_%s_%s_%d%s%s", /* a wrong format */ - "cp", "default", "test", - RedoDDLLogFileType, 1, uuid.NewString(), LogEXT) + TmpEXT, - }, - wantErr: ".*bad log name*.", - }, - } - for _, tt := range tests { - ts, fileType, err := ParseLogFileName(tt.args.name) - if tt.wantErr != "" { - require.Regexp(t, tt.wantErr, err, tt.name) - } else { - require.Nil(t, err, tt.name) - require.EqualValues(t, tt.wantTs, ts, tt.name) - require.Equal(t, tt.wantFileType, fileType, tt.name) - } - } -} - func TestGetChangefeedFiles(t *testing.T) { t.Parallel() @@ -210,19 +57,3 @@ func TestGetChangefeedFiles(t *testing.T) { require.Equal(t, c.want, got) } } - -func TestInitExternalStorage(t *testing.T) { - t.Parallel() - - dir := t.TempDir() - urls := []string{ - fmt.Sprintf("file://%s/test", dir), - } - - for _, urlStr := range urls { - url, err := storage.ParseRawURL(urlStr) - require.NoError(t, err) - _, err = InitExternalStorage(context.Background(), *url) - require.NoError(t, err) - } -} diff --git a/cdc/redo/manager.go b/cdc/redo/manager.go index a11c6c0fb28..a49ca6d4544 100644 --- a/cdc/redo/manager.go +++ b/cdc/redo/manager.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tiflow/pkg/chann" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/redo" "github.com/pingcap/tiflow/pkg/spanz" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" @@ -149,7 +150,7 @@ type ManagerImpl struct { // NewManager creates a new Manager func NewManager(ctx context.Context, cfg *config.ConsistentConfig, opts *ManagerOptions) (*ManagerImpl, error) { // return a disabled Manager if no consistent config or normal consistent level - if cfg == nil || !common.IsConsistentEnabled(cfg.Level) { + if cfg == nil || !redo.IsConsistentEnabled(cfg.Level) { return &ManagerImpl{enabled: false}, nil } @@ -200,7 +201,7 @@ func NewDisabledManager() *ManagerImpl { // NewMockManager returns a mock redo manager instance, used in 
test only func NewMockManager(ctx context.Context) (*ManagerImpl, error) { cfg := &config.ConsistentConfig{ - Level: string(common.ConsistentLevelEventual), + Level: string(redo.ConsistentLevelEventual), Storage: "blackhole://", FlushIntervalInMs: config.MinFlushIntervalInMs, } diff --git a/cdc/redo/manager_test.go b/cdc/redo/manager_test.go index 8af1e72a140..2205b976465 100644 --- a/cdc/redo/manager_test.go +++ b/cdc/redo/manager_test.go @@ -25,10 +25,10 @@ import ( "github.com/pingcap/tiflow/cdc/contextutil" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/processor/tablepb" - "github.com/pingcap/tiflow/cdc/redo/common" "github.com/pingcap/tiflow/cdc/redo/writer" "github.com/pingcap/tiflow/pkg/chann" "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/redo" "github.com/pingcap/tiflow/pkg/spanz" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -46,7 +46,7 @@ func TestConsistentConfig(t *testing.T) { {"", false}, } for _, lc := range levelCases { - require.Equal(t, lc.valid, common.IsValidConsistentLevel(lc.level)) + require.Equal(t, lc.valid, redo.IsValidConsistentLevel(lc.level)) } levelEnableCases := []struct { @@ -58,7 +58,7 @@ func TestConsistentConfig(t *testing.T) { {"eventual", true}, } for _, lc := range levelEnableCases { - require.Equal(t, lc.consistent, common.IsConsistentEnabled(lc.level)) + require.Equal(t, lc.consistent, redo.IsConsistentEnabled(lc.level)) } storageCases := []struct { @@ -73,7 +73,7 @@ func TestConsistentConfig(t *testing.T) { {"", false}, } for _, sc := range storageCases { - require.Equal(t, sc.valid, common.IsValidConsistentStorage(sc.storage)) + require.Equal(t, sc.valid, redo.IsValidConsistentStorage(sc.storage)) } s3StorageCases := []struct { @@ -86,7 +86,7 @@ func TestConsistentConfig(t *testing.T) { {"blackhole", false}, } for _, sc := range s3StorageCases { - require.Equal(t, sc.s3Enabled, common.IsExternalStorage(sc.storage)) + require.Equal(t, sc.s3Enabled, 
redo.IsExternalStorage(sc.storage)) } } @@ -333,7 +333,7 @@ func TestManagerError(t *testing.T) { defer cancel() cfg := &config.ConsistentConfig{ - Level: string(common.ConsistentLevelEventual), + Level: string(redo.ConsistentLevelEventual), Storage: "blackhole://", FlushIntervalInMs: config.MinFlushIntervalInMs, } @@ -398,7 +398,7 @@ func TestReuseWritter(t *testing.T) { dir := t.TempDir() cfg := &config.ConsistentConfig{ - Level: string(common.ConsistentLevelEventual), + Level: string(redo.ConsistentLevelEventual), Storage: "local://" + dir, FlushIntervalInMs: config.MinFlushIntervalInMs, } diff --git a/cdc/redo/options.go b/cdc/redo/options.go index 5fca5b6f743..3b97295bd41 100644 --- a/cdc/redo/options.go +++ b/cdc/redo/options.go @@ -13,11 +13,13 @@ package redo -import "github.com/pingcap/tiflow/cdc/redo/common" +import ( + "github.com/pingcap/tiflow/pkg/redo" +) // ManagerOptions defines options for redo log manager. type ManagerOptions struct { - common.FileTypeConfig + redo.FileTypeConfig // Whether to run background flush goroutine. EnableBgRunner bool @@ -30,7 +32,7 @@ type ManagerOptions struct { // NewOwnerManagerOptions creates a manager options for owner. func NewOwnerManagerOptions(errCh chan<- error) *ManagerOptions { return &ManagerOptions{ - FileTypeConfig: common.FileTypeConfig{ + FileTypeConfig: redo.FileTypeConfig{ EmitMeta: true, EmitRowEvents: false, EmitDDLEvents: true, @@ -44,7 +46,7 @@ func NewOwnerManagerOptions(errCh chan<- error) *ManagerOptions { // NewProcessorManagerOptions creates a manager options for processor. func NewProcessorManagerOptions(errCh chan<- error) *ManagerOptions { return &ManagerOptions{ - FileTypeConfig: common.FileTypeConfig{ + FileTypeConfig: redo.FileTypeConfig{ EmitMeta: false, EmitRowEvents: true, EmitDDLEvents: false, @@ -58,7 +60,7 @@ func NewProcessorManagerOptions(errCh chan<- error) *ManagerOptions { // NewManagerOptionsForClean creates a manager options for cleaning. 
func NewManagerOptionsForClean() *ManagerOptions { return &ManagerOptions{ - FileTypeConfig: common.FileTypeConfig{ + FileTypeConfig: redo.FileTypeConfig{ EmitMeta: false, EmitRowEvents: false, EmitDDLEvents: false, @@ -71,7 +73,7 @@ func NewManagerOptionsForClean() *ManagerOptions { // newMockManagerOptions creates a manager options for mock tests. func newMockManagerOptions(errCh chan<- error) *ManagerOptions { return &ManagerOptions{ - FileTypeConfig: common.FileTypeConfig{ + FileTypeConfig: redo.FileTypeConfig{ EmitMeta: true, EmitRowEvents: true, EmitDDLEvents: true, diff --git a/cdc/redo/reader/file.go b/cdc/redo/reader/file.go index 21436887599..a1f76575ebf 100644 --- a/cdc/redo/reader/file.go +++ b/cdc/redo/reader/file.go @@ -30,9 +30,9 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/cdc/redo/common" "github.com/pingcap/tiflow/cdc/redo/writer" cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/redo" "go.uber.org/multierr" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -82,7 +82,7 @@ func newReader(ctx context.Context, cfg *readerConfig) ([]fileReader, error) { } if cfg.useExternalStorage { - extStorage, err := common.InitExternalStorage(ctx, cfg.uri) + extStorage, err := redo.InitExternalStorage(ctx, cfg.uri) if err != nil { return nil, err } @@ -119,7 +119,7 @@ func selectDownLoadFile(ctx context.Context, extStorage storage.ExternalStorage, files := []string{} err := extStorage.WalkDir(ctx, &storage.WalkOption{}, func(path string, size int64) error { fileName := filepath.Base(path) - _, fileType, err := common.ParseLogFileName(fileName) + _, fileType, err := redo.ParseLogFileName(fileName) if err != nil { return err } @@ -151,12 +151,12 @@ func downLoadToLocal(ctx context.Context, dir string, extStorage storage.Externa return cerror.WrapError(cerror.ErrS3StorageAPI, err) } - err = os.MkdirAll(dir, common.DefaultDirMode) + err = 
os.MkdirAll(dir, redo.DefaultDirMode) if err != nil { return cerror.WrapError(cerror.ErrRedoFileOp, err) } path := filepath.Join(dir, f) - err = os.WriteFile(path, data, common.DefaultFileMode) + err = os.WriteFile(path, data, redo.DefaultFileMode) return cerror.WrapError(cerror.ErrRedoFileOp, err) }) } @@ -172,7 +172,7 @@ func openSelectedFiles(ctx context.Context, dir, fixedType string, startTs uint6 sortedFileList := map[string]bool{} for _, file := range files { - if filepath.Ext(file.Name()) == common.SortLogEXT { + if filepath.Ext(file.Name()) == redo.SortLogEXT { sortedFileList[file.Name()] = false } } @@ -191,8 +191,8 @@ func openSelectedFiles(ctx context.Context, dir, fixedType string, startTs uint6 if ret { sortedName := name - if filepath.Ext(sortedName) != common.SortLogEXT { - sortedName += common.SortLogEXT + if filepath.Ext(sortedName) != redo.SortLogEXT { + sortedName += redo.SortLogEXT } if opened, ok := sortedFileList[sortedName]; ok { if opened { @@ -221,7 +221,7 @@ func openSelectedFiles(ctx context.Context, dir, fixedType string, startTs uint6 } func openReadFile(name string) (*os.File, error) { - return os.OpenFile(name, os.O_RDONLY, common.DefaultFileMode) + return os.OpenFile(name, os.O_RDONLY, redo.DefaultFileMode) } func readFile(file *os.File) (logHeap, error) { @@ -333,7 +333,7 @@ func createSortedFile(ctx context.Context, dir string, name string, errCh chan e return } - sortFileName := name + common.SortLogEXT + sortFileName := name + redo.SortLogEXT err = writFile(ctx, dir, sortFileName, h) if err != nil { errCh <- err @@ -350,7 +350,7 @@ func createSortedFile(ctx context.Context, dir string, name string, errCh chan e func shouldOpen(startTs uint64, name, fixedType string) (bool, error) { // .sort.tmp will return error - commitTs, fileType, err := common.ParseLogFileName(name) + commitTs, fileType, err := redo.ParseLogFileName(name) if err != nil { return false, err } @@ -358,7 +358,7 @@ func shouldOpen(startTs uint64, name, fixedType 
string) (bool, error) { return false, nil } // always open .tmp - if filepath.Ext(name) == common.TmpEXT { + if filepath.Ext(name) == redo.TmpEXT { return true, nil } // the commitTs=max(ts of log item in the file), if max > startTs then should open, @@ -436,7 +436,7 @@ func (r *reader) isTornEntry(data []byte) bool { chunks := [][]byte{} // split data on sector boundaries for curOff < len(data) { - chunkLen := int(common.MinSectorSize - (fileOff % common.MinSectorSize)) + chunkLen := int(redo.MinSectorSize - (fileOff % redo.MinSectorSize)) if chunkLen > len(data)-curOff { chunkLen = len(data) - curOff } diff --git a/cdc/redo/reader/file_test.go b/cdc/redo/reader/file_test.go index 3baeee9b531..2be067047b6 100644 --- a/cdc/redo/reader/file_test.go +++ b/cdc/redo/reader/file_test.go @@ -23,8 +23,8 @@ import ( "time" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/cdc/redo/common" "github.com/pingcap/tiflow/cdc/redo/writer" + "github.com/pingcap/tiflow/pkg/redo" "github.com/pingcap/tiflow/pkg/uuid" "github.com/stretchr/testify/require" "golang.org/x/net/context" @@ -47,7 +47,7 @@ func TestReaderRead(t *testing.T) { Dir: dir, ChangeFeedID: model.DefaultChangeFeedID("test-cf"), CaptureID: "cp", - FileType: common.RedoRowLogFileType, + FileType: redo.RedoRowLogFileType, } ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -68,9 +68,9 @@ func TestReaderRead(t *testing.T) { err = w.Close() require.Nil(t, err) require.True(t, !w.IsRunning()) - fileName := fmt.Sprintf(common.RedoLogFileFormatV1, cfg.CaptureID, + fileName := fmt.Sprintf(redo.RedoLogFileFormatV1, cfg.CaptureID, cfg.ChangeFeedID.ID, - cfg.FileType, 11, uuidGen.NewString(), common.LogEXT) + cfg.FileType, 11, uuidGen.NewString(), redo.LogEXT) path := filepath.Join(cfg.Dir, fileName) info, err := os.Stat(path) require.Nil(t, err) @@ -80,7 +80,7 @@ func TestReaderRead(t *testing.T) { dir: dir, startTs: 1, endTs: 12, - fileType: common.RedoRowLogFileType, + fileType: 
redo.RedoRowLogFileType, }) require.Nil(t, err) require.Equal(t, 1, len(r)) @@ -102,9 +102,9 @@ func TestReaderOpenSelectedFiles(t *testing.T) { Dir: dir, } uuidGen := uuid.NewGenerator() - fileName := fmt.Sprintf(common.RedoLogFileFormatV2, "cp", - "default", "test-cf", common.RedoDDLLogFileType, 11, - uuidGen.NewString(), common.LogEXT+common.TmpEXT) + fileName := fmt.Sprintf(redo.RedoLogFileFormatV2, "cp", + "default", "test-cf", redo.RedoDDLLogFileType, 11, + uuidGen.NewString(), redo.LogEXT+redo.TmpEXT) w, err := writer.NewWriter(ctx, cfg, writer.WithLogFileName(func() string { return fileName })) @@ -130,24 +130,24 @@ func TestReaderOpenSelectedFiles(t *testing.T) { require.Nil(t, err) // no data, wil not open - fileName = fmt.Sprintf(common.RedoLogFileFormatV2, "cp", - "default", "test-cf11", common.RedoDDLLogFileType, 10, - uuidGen.NewString(), common.LogEXT) + fileName = fmt.Sprintf(redo.RedoLogFileFormatV2, "cp", + "default", "test-cf11", redo.RedoDDLLogFileType, 10, + uuidGen.NewString(), redo.LogEXT) path = filepath.Join(dir, fileName) _, err = os.Create(path) require.Nil(t, err) // SortLogEXT, wil open - fileName = fmt.Sprintf(common.RedoLogFileFormatV2, "cp", "default", - "test-cf111", common.RedoDDLLogFileType, 10, uuidGen.NewString(), - common.LogEXT) + common.SortLogEXT + fileName = fmt.Sprintf(redo.RedoLogFileFormatV2, "cp", "default", + "test-cf111", redo.RedoDDLLogFileType, 10, uuidGen.NewString(), + redo.LogEXT) + redo.SortLogEXT path = filepath.Join(dir, fileName) f1, err := os.Create(path) require.Nil(t, err) dir1 := t.TempDir() - fileName = fmt.Sprintf(common.RedoLogFileFormatV2, "cp", "default", "test-cf", - common.RedoDDLLogFileType, 11, uuidGen.NewString(), common.LogEXT+"test") + fileName = fmt.Sprintf(redo.RedoLogFileFormatV2, "cp", "default", "test-cf", + redo.RedoDDLLogFileType, 11, uuidGen.NewString(), redo.LogEXT+"test") path = filepath.Join(dir1, fileName) _, err = os.Create(path) require.Nil(t, err) @@ -167,7 +167,7 @@ func 
TestReaderOpenSelectedFiles(t *testing.T) { name: "dir not exist", args: arg{ dir: dir + "test", - fixedName: common.RedoDDLLogFileType, + fixedName: redo.RedoDDLLogFileType, startTs: 0, }, wantErr: ".*CDC:ErrRedoFileOp*.", @@ -176,7 +176,7 @@ func TestReaderOpenSelectedFiles(t *testing.T) { name: "happy", args: arg{ dir: dir, - fixedName: common.RedoDDLLogFileType, + fixedName: redo.RedoDDLLogFileType, startTs: 0, }, wantRet: []io.ReadCloser{f, f1}, @@ -185,7 +185,7 @@ func TestReaderOpenSelectedFiles(t *testing.T) { name: "wrong ts", args: arg{ dir: dir, - fixedName: common.RedoDDLLogFileType, + fixedName: redo.RedoDDLLogFileType, startTs: 12, }, wantRet: []io.ReadCloser{f}, @@ -194,7 +194,7 @@ func TestReaderOpenSelectedFiles(t *testing.T) { name: "wrong fixedName", args: arg{ dir: dir, - fixedName: common.RedoDDLLogFileType + "test", + fixedName: redo.RedoDDLLogFileType + "test", startTs: 0, }, }, @@ -202,7 +202,7 @@ func TestReaderOpenSelectedFiles(t *testing.T) { name: "wrong ext", args: arg{ dir: dir1, - fixedName: common.RedoDDLLogFileType, + fixedName: redo.RedoDDLLogFileType, startTs: 0, }, }, @@ -215,8 +215,8 @@ func TestReaderOpenSelectedFiles(t *testing.T) { require.Equal(t, len(tt.wantRet), len(ret), tt.name) for _, closer := range tt.wantRet { name := closer.(*os.File).Name() - if filepath.Ext(name) != common.SortLogEXT { - name += common.SortLogEXT + if filepath.Ext(name) != redo.SortLogEXT { + name += redo.SortLogEXT } contains := false for _, r := range ret { diff --git a/cdc/redo/reader/reader.go b/cdc/redo/reader/reader.go index efd1d714d2f..3faf1079912 100644 --- a/cdc/redo/reader/reader.go +++ b/cdc/redo/reader/reader.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/redo/common" cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/redo" "go.uber.org/multierr" "go.uber.org/zap" ) @@ -53,10 +54,10 @@ type RedoLogReader interface { func NewRedoLogReader( ctx 
context.Context, storageType string, cfg *LogReaderConfig, ) (rd RedoLogReader, err error) { - if !common.IsValidConsistentStorage(storageType) { + if !redo.IsValidConsistentStorage(storageType) { return nil, cerror.ErrConsistentStorage.GenWithStackByArgs(storageType) } - if common.ConsistentStorage(storageType) == common.ConsistentStorageBlackhole { + if redo.IsBlackholeStorage(storageType) { return newBlackHoleReader(), nil } return newLogReader(ctx, cfg) @@ -109,7 +110,7 @@ func newLogReader(ctx context.Context, cfg *LogReaderConfig) (*LogReader, error) cfg: cfg, } if cfg.UseExternalStorage { - extStorage, err := common.InitExternalStorage(ctx, cfg.URI) + extStorage, err := redo.InitExternalStorage(ctx, cfg.URI) if err != nil { return nil, err } @@ -118,7 +119,7 @@ func newLogReader(ctx context.Context, cfg *LogReaderConfig) (*LogReader, error) if err != nil { return nil, cerror.WrapError(cerror.ErrRedoFileOp, err) } - err = downLoadToLocal(ctx, cfg.Dir, extStorage, common.RedoMetaFileType) + err = downLoadToLocal(ctx, cfg.Dir, extStorage, redo.RedoMetaFileType) if err != nil { return nil, cerror.WrapError(cerror.ErrRedoDownloadFailed, err) } @@ -173,7 +174,7 @@ func (l *LogReader) setUpRowReader(ctx context.Context, startTs, endTs uint64) e startTs: startTs, endTs: endTs, dir: l.cfg.Dir, - fileType: common.RedoRowLogFileType, + fileType: redo.RedoRowLogFileType, uri: l.cfg.URI, useExternalStorage: l.cfg.UseExternalStorage, workerNums: l.cfg.WorkerNums, @@ -202,7 +203,7 @@ func (l *LogReader) setUpDDLReader(ctx context.Context, startTs, endTs uint64) e startTs: startTs, endTs: endTs, dir: l.cfg.Dir, - fileType: common.RedoDDLLogFileType, + fileType: redo.RedoDDLLogFileType, uri: l.cfg.URI, useExternalStorage: l.cfg.UseExternalStorage, workerNums: l.cfg.WorkerNums, @@ -366,7 +367,7 @@ func (l *LogReader) ReadMeta(ctx context.Context) (checkpointTs, resolvedTs uint metas := make([]*common.LogMeta, 0, 64) for _, file := range files { - if filepath.Ext(file.Name()) 
== common.MetaEXT { + if filepath.Ext(file.Name()) == redo.MetaEXT { path := filepath.Join(l.cfg.Dir, file.Name()) fileData, err := os.ReadFile(path) if err != nil { diff --git a/cdc/redo/reader/reader_test.go b/cdc/redo/reader/reader_test.go index 4289edc97f5..261eb96219a 100644 --- a/cdc/redo/reader/reader_test.go +++ b/cdc/redo/reader/reader_test.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/redo/common" "github.com/pingcap/tiflow/cdc/redo/writer" + "github.com/pingcap/tiflow/pkg/redo" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.uber.org/multierr" @@ -48,15 +49,15 @@ func TestNewLogReader(t *testing.T) { s3URI, err := url.Parse("s3://logbucket/test-changefeed?endpoint=http://111/") require.Nil(t, err) - origin := common.InitExternalStorage + origin := redo.InitExternalStorage defer func() { - common.InitExternalStorage = origin + redo.InitExternalStorage = origin }() controller := gomock.NewController(t) mockStorage := mockstorage.NewMockExternalStorage(controller) // no file to download mockStorage.EXPECT().WalkDir(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) - common.InitExternalStorage = func(ctx context.Context, uri url.URL) (storage.ExternalStorage, error) { + redo.InitExternalStorage = func(ctx context.Context, uri url.URL) (storage.ExternalStorage, error) { return mockStorage, nil } @@ -80,9 +81,9 @@ func TestLogReaderResetReader(t *testing.T) { MaxLogSize: 100000, Dir: dir, } - fileName := fmt.Sprintf(common.RedoLogFileFormatV2, "cp", + fileName := fmt.Sprintf(redo.RedoLogFileFormatV2, "cp", "default", "test-cf100", - common.RedoDDLLogFileType, 100, uuid.NewString(), common.LogEXT) + redo.RedoDDLLogFileType, 100, uuid.NewString(), redo.LogEXT) w, err := writer.NewWriter(ctx, cfg, writer.WithLogFileName(func() string { return fileName })) @@ -101,9 +102,9 @@ func TestLogReaderResetReader(t *testing.T) { f, err := os.Open(path) require.Nil(t, err) - fileName = 
fmt.Sprintf(common.RedoLogFileFormatV2, "cp", + fileName = fmt.Sprintf(redo.RedoLogFileFormatV2, "cp", "default", "test-cf10", - common.RedoRowLogFileType, 10, uuid.NewString(), common.LogEXT) + redo.RedoRowLogFileType, 10, uuid.NewString(), redo.LogEXT) w, err = writer.NewWriter(ctx, cfg, writer.WithLogFileName(func() string { return fileName })) @@ -224,8 +225,8 @@ func TestLogReaderResetReader(t *testing.T) { } else { require.Nil(t, err, tt.name) mockReader.AssertNumberOfCalls(t, "Close", 2) - require.Equal(t, tt.rowFleName+common.SortLogEXT, r.rowReader[0].(*reader).fileName, tt.name) - require.Equal(t, tt.ddlFleName+common.SortLogEXT, r.ddlReader[0].(*reader).fileName, tt.name) + require.Equal(t, tt.rowFleName+redo.SortLogEXT, r.rowReader[0].(*reader).fileName, tt.name) + require.Equal(t, tt.ddlFleName+redo.SortLogEXT, r.ddlReader[0].(*reader).fileName, tt.name) require.Equal(t, tt.wantStartTs, r.cfg.startTs, tt.name) require.Equal(t, tt.wantEndTs, r.cfg.endTs, tt.name) @@ -239,7 +240,7 @@ func TestLogReaderReadMeta(t *testing.T) { fileName := fmt.Sprintf("%s_%s_%d_%s%s", "cp", "test-changefeed", - time.Now().Unix(), common.RedoMetaFileType, common.MetaEXT) + time.Now().Unix(), redo.RedoMetaFileType, redo.MetaEXT) path := filepath.Join(dir, fileName) f, err := os.Create(path) require.Nil(t, err) @@ -254,7 +255,7 @@ func TestLogReaderReadMeta(t *testing.T) { fileName = fmt.Sprintf("%s_%s_%d_%s%s", "cp1", "test-changefeed", - time.Now().Unix(), common.RedoMetaFileType, common.MetaEXT) + time.Now().Unix(), redo.RedoMetaFileType, redo.MetaEXT) path = filepath.Join(dir, fileName) f, err = os.Create(path) require.Nil(t, err) diff --git a/cdc/redo/writer/file.go b/cdc/redo/writer/file.go index bd4a9b85d0f..c2b74318d48 100644 --- a/cdc/redo/writer/file.go +++ b/cdc/redo/writer/file.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/tiflow/cdc/redo/common" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/fsutil" + 
"github.com/pingcap/tiflow/pkg/redo" "github.com/pingcap/tiflow/pkg/uuid" "github.com/prometheus/client_golang/prometheus" "github.com/uber-go/atomic" @@ -44,7 +45,7 @@ const ( // pageBytes is the alignment for flushing records to the backing Writer. // It should be a multiple of the minimum sector size so that log can safely // distinguish between torn writes and ordinary data corruption. - pageBytes = 8 * common.MinSectorSize + pageBytes = 8 * redo.MinSectorSize defaultS3Timeout = 15 * time.Second ) @@ -152,7 +153,7 @@ func NewWriter(ctx context.Context, cfg *FileWriterConfig, opts ...Option) (*Wri var extStorage storage.ExternalStorage if cfg.UseExternalStorage { var err error - extStorage, err = common.InitExternalStorage(ctx, cfg.URI) + extStorage, err = redo.InitExternalStorage(ctx, cfg.URI) if err != nil { return nil, err } @@ -186,7 +187,7 @@ func NewWriter(ctx context.Context, cfg *FileWriterConfig, opts ...Option) (*Wri return nil, cerror.WrapError(cerror.ErrRedoFileOp, errors.New("invalid redo dir path")) } - err := os.MkdirAll(cfg.Dir, common.DefaultDirMode) + err := os.MkdirAll(cfg.Dir, redo.DefaultDirMode) if err != nil { return nil, cerror.WrapError(cerror.ErrRedoFileOp, errors.Annotatef(err, "can't make dir: %s for redo writing", cfg.Dir)) @@ -379,13 +380,13 @@ func (w *Writer) getLogFileName() string { } uid := w.uuidGenerator.NewString() if model.DefaultNamespace == w.cfg.ChangeFeedID.Namespace { - return fmt.Sprintf(common.RedoLogFileFormatV1, + return fmt.Sprintf(redo.RedoLogFileFormatV1, w.cfg.CaptureID, w.cfg.ChangeFeedID.ID, w.cfg.FileType, - w.commitTS.Load(), uid, common.LogEXT) + w.commitTS.Load(), uid, redo.LogEXT) } - return fmt.Sprintf(common.RedoLogFileFormatV2, + return fmt.Sprintf(redo.RedoLogFileFormatV2, w.cfg.CaptureID, w.cfg.ChangeFeedID.Namespace, w.cfg.ChangeFeedID.ID, - w.cfg.FileType, w.commitTS.Load(), uid, common.LogEXT) + w.cfg.FileType, w.commitTS.Load(), uid, redo.LogEXT) } // filePath always creates a new, unique file 
path, note this function is not @@ -397,11 +398,11 @@ func (w *Writer) filePath() string { } func openTruncFile(name string) (*os.File, error) { - return os.OpenFile(name, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, common.DefaultFileMode) + return os.OpenFile(name, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, redo.DefaultFileMode) } func (w *Writer) openNew() error { - err := os.MkdirAll(w.cfg.Dir, common.DefaultDirMode) + err := os.MkdirAll(w.cfg.Dir, redo.DefaultDirMode) if err != nil { return cerror.WrapError(cerror.ErrRedoFileOp, errors.Annotatef(err, "can't make dir: %s for new redo logfile", w.cfg.Dir)) @@ -412,7 +413,7 @@ func (w *Writer) openNew() error { if w.allocator == nil { w.commitTS.Store(w.eventCommitTS.Load()) w.maxCommitTS.Store(w.eventCommitTS.Load()) - path := w.filePath() + common.TmpEXT + path := w.filePath() + redo.TmpEXT f, err = openTruncFile(path) if err != nil { return cerror.WrapError(cerror.ErrRedoFileOp, @@ -500,11 +501,11 @@ func (w *Writer) GC(checkPointTs uint64) error { // shouldRemoved remove the file which commitTs in file name (max commitTs of all event ts in the file) < checkPointTs, // since all event ts < checkPointTs already sent to sink, the log is not needed any more for recovery func (w *Writer) shouldRemoved(checkPointTs uint64, f os.FileInfo) (bool, error) { - if filepath.Ext(f.Name()) != common.LogEXT { + if filepath.Ext(f.Name()) != redo.LogEXT { return false, nil } - commitTs, fileType, err := common.ParseLogFileName(f.Name()) + commitTs, fileType, err := redo.ParseLogFileName(f.Name()) if err != nil { return false, err } diff --git a/cdc/redo/writer/file_test.go b/cdc/redo/writer/file_test.go index 9ada9c21057..6e5c4b90158 100644 --- a/cdc/redo/writer/file_test.go +++ b/cdc/redo/writer/file_test.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/redo/common" "github.com/pingcap/tiflow/pkg/fsutil" + "github.com/pingcap/tiflow/pkg/redo" "github.com/pingcap/tiflow/pkg/uuid" 
"github.com/stretchr/testify/require" "github.com/uber-go/atomic" @@ -61,7 +62,7 @@ func TestWriterWrite(t *testing.T) { Dir: dir, ChangeFeedID: cf, CaptureID: "cp", - FileType: common.RedoRowLogFileType, + FileType: redo.RedoRowLogFileType, }, uint64buf: make([]byte, 8), running: *atomic.NewBool(true), @@ -80,13 +81,13 @@ func TestWriterWrite(t *testing.T) { var fileName string // create a .tmp file if w.cfg.ChangeFeedID.Namespace == model.DefaultNamespace { - fileName = fmt.Sprintf(common.RedoLogFileFormatV1, w.cfg.CaptureID, + fileName = fmt.Sprintf(redo.RedoLogFileFormatV1, w.cfg.CaptureID, w.cfg.ChangeFeedID.ID, - w.cfg.FileType, 1, uuidGen.NewString(), common.LogEXT) + common.TmpEXT + w.cfg.FileType, 1, uuidGen.NewString(), redo.LogEXT) + redo.TmpEXT } else { - fileName = fmt.Sprintf(common.RedoLogFileFormatV2, w.cfg.CaptureID, + fileName = fmt.Sprintf(redo.RedoLogFileFormatV2, w.cfg.CaptureID, w.cfg.ChangeFeedID.Namespace, w.cfg.ChangeFeedID.ID, - w.cfg.FileType, 1, uuidGen.NewString(), common.LogEXT) + common.TmpEXT + w.cfg.FileType, 1, uuidGen.NewString(), redo.LogEXT) + redo.TmpEXT } path := filepath.Join(w.cfg.Dir, fileName) info, err := os.Stat(path) @@ -102,13 +103,13 @@ func TestWriterWrite(t *testing.T) { // after rotate, rename to .log if w.cfg.ChangeFeedID.Namespace == model.DefaultNamespace { - fileName = fmt.Sprintf(common.RedoLogFileFormatV1, w.cfg.CaptureID, + fileName = fmt.Sprintf(redo.RedoLogFileFormatV1, w.cfg.CaptureID, w.cfg.ChangeFeedID.ID, - w.cfg.FileType, 1, uuidGen.NewString(), common.LogEXT) + w.cfg.FileType, 1, uuidGen.NewString(), redo.LogEXT) } else { - fileName = fmt.Sprintf(common.RedoLogFileFormatV2, w.cfg.CaptureID, + fileName = fmt.Sprintf(redo.RedoLogFileFormatV2, w.cfg.CaptureID, w.cfg.ChangeFeedID.Namespace, w.cfg.ChangeFeedID.ID, - w.cfg.FileType, 1, uuidGen.NewString(), common.LogEXT) + w.cfg.FileType, 1, uuidGen.NewString(), redo.LogEXT) } path = filepath.Join(w.cfg.Dir, fileName) info, err = os.Stat(path) @@ -116,13 
+117,13 @@ func TestWriterWrite(t *testing.T) { require.Equal(t, fileName, info.Name()) // create a .tmp file with first eventCommitTS as name if w.cfg.ChangeFeedID.Namespace == model.DefaultNamespace { - fileName = fmt.Sprintf(common.RedoLogFileFormatV1, w.cfg.CaptureID, + fileName = fmt.Sprintf(redo.RedoLogFileFormatV1, w.cfg.CaptureID, w.cfg.ChangeFeedID.ID, - w.cfg.FileType, 12, uuidGen.NewString(), common.LogEXT) + common.TmpEXT + w.cfg.FileType, 12, uuidGen.NewString(), redo.LogEXT) + redo.TmpEXT } else { - fileName = fmt.Sprintf(common.RedoLogFileFormatV2, w.cfg.CaptureID, + fileName = fmt.Sprintf(redo.RedoLogFileFormatV2, w.cfg.CaptureID, w.cfg.ChangeFeedID.Namespace, w.cfg.ChangeFeedID.ID, - w.cfg.FileType, 12, uuidGen.NewString(), common.LogEXT) + common.TmpEXT + w.cfg.FileType, 12, uuidGen.NewString(), redo.LogEXT) + redo.TmpEXT } path = filepath.Join(w.cfg.Dir, fileName) info, err = os.Stat(path) @@ -133,13 +134,13 @@ func TestWriterWrite(t *testing.T) { require.False(t, w.IsRunning()) // safe close, rename to .log with max eventCommitTS as name if w.cfg.ChangeFeedID.Namespace == model.DefaultNamespace { - fileName = fmt.Sprintf(common.RedoLogFileFormatV1, w.cfg.CaptureID, + fileName = fmt.Sprintf(redo.RedoLogFileFormatV1, w.cfg.CaptureID, w.cfg.ChangeFeedID.ID, - w.cfg.FileType, 22, uuidGen.NewString(), common.LogEXT) + w.cfg.FileType, 22, uuidGen.NewString(), redo.LogEXT) } else { - fileName = fmt.Sprintf(common.RedoLogFileFormatV2, w.cfg.CaptureID, + fileName = fmt.Sprintf(redo.RedoLogFileFormatV2, w.cfg.CaptureID, w.cfg.ChangeFeedID.Namespace, w.cfg.ChangeFeedID.ID, - w.cfg.FileType, 22, uuidGen.NewString(), common.LogEXT) + w.cfg.FileType, 22, uuidGen.NewString(), redo.LogEXT) } path = filepath.Join(w.cfg.Dir, fileName) info, err = os.Stat(path) @@ -152,7 +153,7 @@ func TestWriterWrite(t *testing.T) { Dir: dir, ChangeFeedID: cf11s[idx], CaptureID: "cp", - FileType: common.RedoRowLogFileType, + FileType: redo.RedoRowLogFileType, }, uint64buf: 
make([]byte, 8), running: *atomic.NewBool(true), @@ -170,13 +171,13 @@ func TestWriterWrite(t *testing.T) { require.Nil(t, err) // create a .tmp file if w1.cfg.ChangeFeedID.Namespace == model.DefaultNamespace { - fileName = fmt.Sprintf(common.RedoLogFileFormatV1, w1.cfg.CaptureID, + fileName = fmt.Sprintf(redo.RedoLogFileFormatV1, w1.cfg.CaptureID, w1.cfg.ChangeFeedID.ID, - w1.cfg.FileType, 1, uuidGen.NewString(), common.LogEXT) + common.TmpEXT + w1.cfg.FileType, 1, uuidGen.NewString(), redo.LogEXT) + redo.TmpEXT } else { - fileName = fmt.Sprintf(common.RedoLogFileFormatV2, w1.cfg.CaptureID, + fileName = fmt.Sprintf(redo.RedoLogFileFormatV2, w1.cfg.CaptureID, w1.cfg.ChangeFeedID.Namespace, w1.cfg.ChangeFeedID.ID, - w1.cfg.FileType, 1, uuidGen.NewString(), common.LogEXT) + common.TmpEXT + w1.cfg.FileType, 1, uuidGen.NewString(), redo.LogEXT) + redo.TmpEXT } path = filepath.Join(w1.cfg.Dir, fileName) info, err = os.Stat(path) @@ -216,7 +217,7 @@ func TestWriterGC(t *testing.T) { ChangeFeedID: model.DefaultChangeFeedID("test"), CaptureID: "cp", MaxLogSize: 10, - FileType: common.RedoRowLogFileType, + FileType: redo.RedoRowLogFileType, UseExternalStorage: true, } @@ -257,10 +258,10 @@ func TestWriterGC(t *testing.T) { require.Nil(t, err) require.Equal(t, 1, len(files), "should have 1 log left after GC") - ts, fileType, err := common.ParseLogFileName(files[0].Name()) + ts, fileType, err := redo.ParseLogFileName(files[0].Name()) require.Nil(t, err, files[0].Name()) require.EqualValues(t, 3, ts) - require.Equal(t, common.RedoRowLogFileType, fileType) + require.Equal(t, redo.RedoRowLogFileType, fileType) time.Sleep(time.Duration(100) * time.Millisecond) w1 := &Writer{ @@ -322,7 +323,7 @@ func TestNewWriter(t *testing.T) { Dir: dir, CaptureID: "cp", ChangeFeedID: changefeed, - FileType: common.RedoDDLLogFileType, + FileType: redo.RedoDDLLogFileType, UseExternalStorage: true, MaxLogSize: defaultMaxLogSize, @@ -378,7 +379,7 @@ func TestRotateFileWithFileAllocator(t 
*testing.T) { Dir: dir, CaptureID: "cp", ChangeFeedID: changefeed, - FileType: common.RedoRowLogFileType, + FileType: redo.RedoRowLogFileType, UseExternalStorage: true, MaxLogSize: defaultMaxLogSize, @@ -394,7 +395,7 @@ func TestRotateFileWithFileAllocator(t *testing.T) { uuidGenerator: uuidGen, } w.allocator = fsutil.NewFileAllocator( - w.cfg.Dir, common.RedoRowLogFileType, defaultMaxLogSize) + w.cfg.Dir, redo.RedoRowLogFileType, defaultMaxLogSize) w.running.Store(true) _, err = w.Write([]byte("test")) @@ -443,7 +444,7 @@ func TestRotateFileWithoutFileAllocator(t *testing.T) { Dir: dir, CaptureID: "cp", ChangeFeedID: changefeed, - FileType: common.RedoDDLLogFileType, + FileType: redo.RedoDDLLogFileType, UseExternalStorage: true, MaxLogSize: defaultMaxLogSize, diff --git a/cdc/redo/writer/writer.go b/cdc/redo/writer/writer.go index 908e66795be..bafb094a8f2 100644 --- a/cdc/redo/writer/writer.go +++ b/cdc/redo/writer/writer.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tiflow/cdc/redo/common" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/redo" "github.com/prometheus/client_golang/prometheus" "go.uber.org/multierr" "go.uber.org/zap" @@ -66,7 +67,7 @@ type RedoLogWriter interface { func NewRedoLogWriter( ctx context.Context, cfg *config.ConsistentConfig, - fileTypeConfig common.FileTypeConfig, + fileTypeConfig redo.FileTypeConfig, ) (RedoLogWriter, error) { uri, err := storage.ParseRawURL(cfg.Storage) if err != nil { @@ -74,10 +75,10 @@ func NewRedoLogWriter( } scheme := uri.Scheme - if !common.IsValidConsistentStorage(scheme) { + if !redo.IsValidConsistentStorage(scheme) { return nil, cerror.ErrConsistentStorage.GenWithStackByArgs(scheme) } - if common.ConsistentStorage(scheme) == common.ConsistentStorageBlackhole { + if redo.IsBlackholeStorage(scheme) { return NewBlackHoleWriter(), nil } @@ -86,7 +87,7 @@ func NewRedoLogWriter( CaptureID: contextutil.CaptureAddrFromCtx(ctx), ChangeFeedID: 
contextutil.ChangefeedIDFromCtx(ctx), URI: *uri, - UseExternalStorage: common.IsExternalStorage(scheme), + UseExternalStorage: redo.IsExternalStorage(scheme), MaxLogSize: cfg.MaxLogSize, } @@ -110,7 +111,7 @@ func NewRedoLogWriter( } type logWriterConfig struct { - common.FileTypeConfig + redo.FileTypeConfig CaptureID string ChangeFeedID model.ChangeFeedID @@ -147,7 +148,7 @@ func newLogWriter( if lw.cfg.EmitRowEvents { writerCfg := &FileWriterConfig{ - FileType: common.RedoRowLogFileType, + FileType: redo.RedoRowLogFileType, ChangeFeedID: cfg.ChangeFeedID, CaptureID: cfg.CaptureID, URI: cfg.URI, @@ -162,7 +163,7 @@ func newLogWriter( if lw.cfg.EmitDDLEvents { writerCfg := &FileWriterConfig{ - FileType: common.RedoDDLLogFileType, + FileType: redo.RedoDDLLogFileType, ChangeFeedID: cfg.ChangeFeedID, CaptureID: cfg.CaptureID, URI: cfg.URI, @@ -186,7 +187,7 @@ func newLogWriter( } if cfg.UseExternalStorage { - lw.extStorage, err = common.InitExternalStorage(ctx, cfg.URI) + lw.extStorage, err = redo.InitExternalStorage(ctx, cfg.URI) if err != nil { return nil, err } @@ -519,11 +520,11 @@ func (l *logWriter) isStopped() bool { func (l *logWriter) getMetafileName() string { if model.DefaultNamespace == l.cfg.ChangeFeedID.Namespace { return fmt.Sprintf("%s_%s_%s%s", l.cfg.CaptureID, l.cfg.ChangeFeedID.ID, - common.RedoMetaFileType, common.MetaEXT) + redo.RedoMetaFileType, redo.MetaEXT) } return fmt.Sprintf("%s_%s_%s_%s%s", l.cfg.CaptureID, l.cfg.ChangeFeedID.Namespace, l.cfg.ChangeFeedID.ID, - common.RedoMetaFileType, common.MetaEXT) + redo.RedoMetaFileType, redo.MetaEXT) } func (l *logWriter) maybeUpdateMeta(checkpointTs, resolvedTs uint64) ([]byte, error) { @@ -570,13 +571,13 @@ func (l *logWriter) flushLogMeta(checkpointTs, resolvedTs uint64) error { return nil } - err = os.MkdirAll(l.cfg.Dir, common.DefaultDirMode) + err = os.MkdirAll(l.cfg.Dir, redo.DefaultDirMode) if err != nil { return cerror.WrapError(cerror.ErrRedoFileOp, errors.Annotate(err, "can't make dir for 
new redo logfile")) } // we will create a temp metadata file and then atomically rename it. - tmpFileName := l.filePath() + common.MetaTmpEXT + tmpFileName := l.filePath() + redo.MetaTmpEXT tmpFile, err := openTruncFile(tmpFileName) if err != nil { return cerror.WrapError(cerror.ErrRedoFileOp, err) diff --git a/cdc/redo/writer/writer_test.go b/cdc/redo/writer/writer_test.go index 19c5d629deb..e8d742d5ece 100644 --- a/cdc/redo/writer/writer_test.go +++ b/cdc/redo/writer/writer_test.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/redo/common" cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/redo" "github.com/pingcap/tiflow/pkg/uuid" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -120,7 +121,7 @@ func TestLogWriterWriteLog(t *testing.T) { mockWriter.On("IsRunning").Return(tt.isRunning) mockWriter.On("AdvanceTs", mock.Anything) writer := logWriter{ - cfg: &logWriterConfig{FileTypeConfig: common.FileTypeConfig{ + cfg: &logWriterConfig{FileTypeConfig: redo.FileTypeConfig{ EmitMeta: true, EmitRowEvents: true, EmitDDLEvents: true, @@ -223,7 +224,7 @@ func TestLogWriterSendDDL(t *testing.T) { mockWriter.On("IsRunning").Return(tt.isRunning) mockWriter.On("AdvanceTs", mock.Anything) writer := logWriter{ - cfg: &logWriterConfig{FileTypeConfig: common.FileTypeConfig{ + cfg: &logWriterConfig{FileTypeConfig: redo.FileTypeConfig{ EmitRowEvents: true, EmitDDLEvents: true, }}, @@ -320,7 +321,7 @@ func TestLogWriterFlushLog(t *testing.T) { mockWriter.On("Flush", mock.Anything).Return(tt.flushErr) mockWriter.On("IsRunning").Return(tt.isRunning) cfg := &logWriterConfig{ - FileTypeConfig: common.FileTypeConfig{ + FileTypeConfig: redo.FileTypeConfig{ EmitMeta: true, EmitRowEvents: true, EmitDDLEvents: true, @@ -357,7 +358,7 @@ func TestLogWriterFlushLog(t *testing.T) { func TestLogWriterRegress(t *testing.T) { dir := t.TempDir() writer, err := newLogWriter(context.Background(), 
&logWriterConfig{ - FileTypeConfig: common.FileTypeConfig{ + FileTypeConfig: redo.FileTypeConfig{ EmitMeta: true, EmitRowEvents: true, EmitDDLEvents: true, @@ -382,7 +383,7 @@ func TestNewLogWriter(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() cfg := &logWriterConfig{ - FileTypeConfig: common.FileTypeConfig{ + FileTypeConfig: redo.FileTypeConfig{ EmitMeta: true, EmitRowEvents: true, EmitDDLEvents: true, @@ -410,7 +411,7 @@ func TestNewLogWriter(t *testing.T) { dir := t.TempDir() cfg = &logWriterConfig{ - FileTypeConfig: common.FileTypeConfig{ + FileTypeConfig: redo.FileTypeConfig{ EmitMeta: true, EmitRowEvents: true, EmitDDLEvents: true, @@ -443,15 +444,15 @@ func TestNewLogWriter(t *testing.T) { require.Equal(t, meta.CheckpointTs, l.meta.CheckpointTs) require.Equal(t, meta.ResolvedTs, l.meta.ResolvedTs) - origin := common.InitExternalStorage + origin := redo.InitExternalStorage defer func() { - common.InitExternalStorage = origin + redo.InitExternalStorage = origin }() controller := gomock.NewController(t) mockStorage := mockstorage.NewMockExternalStorage(controller) // skip pre cleanup mockStorage.EXPECT().FileExists(gomock.Any(), gomock.Any()).Return(false, nil) - common.InitExternalStorage = func(ctx context.Context, uri url.URL) (storage.ExternalStorage, error) { + redo.InitExternalStorage = func(ctx context.Context, uri url.URL) (storage.ExternalStorage, error) { return mockStorage, nil } cfg3 := &logWriterConfig{ @@ -566,7 +567,7 @@ func TestDeleteAllLogs(t *testing.T) { mockWriter := &mockFileWriter{} mockWriter.On("Close").Return(tt.closeErr) cfg := &logWriterConfig{ - FileTypeConfig: common.FileTypeConfig{ + FileTypeConfig: redo.FileTypeConfig{ EmitMeta: true, EmitRowEvents: true, EmitDDLEvents: true, diff --git a/errors.toml b/errors.toml index f57a0efd0bc..8596fb9bb97 100755 --- a/errors.toml +++ b/errors.toml @@ -996,11 +996,6 @@ error = ''' s3 storage api ''' -["CDC:ErrS3StorageInitialize"] -error = ''' -new s3 
storage for redo log -''' - ["CDC:ErrScanLockFailed"] error = ''' scan lock failed diff --git a/pkg/config/consistent.go b/pkg/config/consistent.go index e8052ac44a6..5f9ec680f10 100644 --- a/pkg/config/consistent.go +++ b/pkg/config/consistent.go @@ -16,7 +16,9 @@ package config import ( "fmt" + "github.com/pingcap/tidb/br/pkg/storage" cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/redo" ) const ( @@ -41,6 +43,10 @@ func (c *ConsistentConfig) ValidateAndAdjust() error { c.FlushIntervalInMs, MinFlushIntervalInMs)) } - // TODO: validate storage - return nil + uri, err := storage.ParseRawURL(c.Storage) + if err != nil { + return cerror.ErrInvalidReplicaConfig.GenWithStackByArgs( + fmt.Sprintf("invalid storage uri: %s", c.Storage)) + } + return redo.ValidateStorage(uri) } diff --git a/pkg/errors/cdc_errors.go b/pkg/errors/cdc_errors.go index 61dcee24df4..2b8aeb6f128 100644 --- a/pkg/errors/cdc_errors.go +++ b/pkg/errors/cdc_errors.go @@ -303,9 +303,9 @@ var ( "s3 storage api", errors.RFCCodeText("CDC:ErrS3StorageAPI"), ) - ErrS3StorageInitialize = errors.Normalize( - "new s3 storage for redo log", - errors.RFCCodeText("CDC:ErrS3StorageInitialize"), + ErrStorageInitialize = errors.Normalize( + "fail to open storage for redo log", + errors.RFCCodeText("CDC:ErrStorageInitialize"), ) ErrCodecInvalidConfig = errors.Normalize( "Codec invalid config", diff --git a/pkg/fsutil/file_allocator.go b/pkg/fsutil/file_allocator.go index afb71041408..acf4f203d95 100644 --- a/pkg/fsutil/file_allocator.go +++ b/pkg/fsutil/file_allocator.go @@ -19,7 +19,7 @@ import ( "path/filepath" "sync" - "github.com/pingcap/tiflow/cdc/redo/common" + "github.com/pingcap/tiflow/pkg/redo" ) // FileAllocator has two functionalities: @@ -84,7 +84,7 @@ func (fl *FileAllocator) alloc() (f *os.File, err error) { } filePath := filepath.Join(fl.dir, fmt.Sprintf("%s_%d.tmp", fl.prefix, fl.count%2)) - f, err = os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY, common.DefaultFileMode) 
+ f, err = os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY, redo.DefaultFileMode) if err != nil { return nil, err } diff --git a/pkg/redo/config.go b/pkg/redo/config.go new file mode 100644 index 00000000000..8016b12f495 --- /dev/null +++ b/pkg/redo/config.go @@ -0,0 +1,260 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package redo + +import ( + "context" + "fmt" + "net/url" + "os" + "path/filepath" + "strings" + "time" + + "github.com/pingcap/tidb/br/pkg/storage" + "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/util" +) + +const ( + // DefaultFileMode is the default mode when operation files + DefaultFileMode = 0o644 + // DefaultDirMode is the default mode when operation dir + DefaultDirMode = 0o755 + + // TmpEXT is the file ext of log file before safely wrote to disk + TmpEXT = ".tmp" + // LogEXT is the file ext of log file after safely wrote to disk + LogEXT = ".log" + // MetaEXT is the meta file ext of meta file after safely wrote to disk + MetaEXT = ".meta" + // MetaTmpEXT is the meta file ext of meta file before safely wrote to disk + MetaTmpEXT = ".mtmp" + // SortLogEXT is the sorted log file ext of log file after safely wrote to disk + SortLogEXT = ".sort" + + // MinSectorSize is minimum sector size used when flushing log so that log can safely + // distinguish between torn writes and ordinary data corruption. 
+ MinSectorSize = 512 +) + +const ( + // RedoMetaFileType is the default file type of meta file + RedoMetaFileType = "meta" + // RedoRowLogFileType is the default file type of row log file + RedoRowLogFileType = "row" + // RedoDDLLogFileType is the default file type of ddl log file + RedoDDLLogFileType = "ddl" +) + +// FileTypeConfig Specifies redo file type config. +type FileTypeConfig struct { + // Whether emitting redo meta or not. + EmitMeta bool + // Whether emitting row events or not. + EmitRowEvents bool + // Whether emitting DDL events or not. + EmitDDLEvents bool +} + +// ConsistentLevelType is the level of redo log consistent level. +type ConsistentLevelType string + +const ( + // ConsistentLevelNone no consistent guarantee. + ConsistentLevelNone ConsistentLevelType = "none" + // ConsistentLevelEventual eventual consistent. + ConsistentLevelEventual ConsistentLevelType = "eventual" +) + +// IsValidConsistentLevel checks whether a given consistent level is valid +func IsValidConsistentLevel(level string) bool { + switch ConsistentLevelType(level) { + case ConsistentLevelNone, ConsistentLevelEventual: + return true + default: + return false + } +} + +// IsConsistentEnabled returns whether the consistent feature is enabled. +func IsConsistentEnabled(level string) bool { + return IsValidConsistentLevel(level) && ConsistentLevelType(level) != ConsistentLevelNone +} + +// ConsistentStorage is the type of consistent storage. +type ConsistentStorage string + +const ( + // consistentStorageBlackhole is a blackhole storage, which will discard all data. + consistentStorageBlackhole ConsistentStorage = "blackhole" + // consistentStorageLocal is a local storage, which will store data in local disk. + consistentStorageLocal ConsistentStorage = "local" + // consistentStorageNFS is a NFS storage, which will store data in NFS. + consistentStorageNFS ConsistentStorage = "nfs" + + // consistentStorageS3 is a S3 storage, which will store data in S3. 
+ consistentStorageS3 ConsistentStorage = "s3" + // consistentStorageGCS is a GCS storage, which will store data in GCS. + consistentStorageGCS ConsistentStorage = "gcs" + // consistentStorageGS is an alias of GCS storage. + consistentStorageGS ConsistentStorage = "gs" + // consistentStorageAzblob is a Azure Blob storage, which will store data in Azure Blob. + consistentStorageAzblob ConsistentStorage = "azblob" + // consistentStorageAzure is an alias of Azure Blob storage. + consistentStorageAzure ConsistentStorage = "azure" + // consistentStorageFile is an external storage based on local files and + // will only be used for testing. + consistentStorageFile ConsistentStorage = "file" + // consistentStorageNoop is a noop storage, which simply discard all data. + consistentStorageNoop ConsistentStorage = "noop" +) + +// IsValidConsistentStorage checks whether a give consistent storage is valid. +func IsValidConsistentStorage(scheme string) bool { + return IsBlackholeStorage(scheme) || + IsLocalStorage(scheme) || + IsExternalStorage(scheme) +} + +// IsExternalStorage returns whether an external storage is used. +func IsExternalStorage(scheme string) bool { + switch ConsistentStorage(scheme) { + case consistentStorageS3, consistentStorageGCS, consistentStorageGS, + consistentStorageAzblob, consistentStorageAzure, consistentStorageFile, + consistentStorageNoop: + return true + default: + return false + } +} + +// IsLocalStorage returns whether a local storage is used. +func IsLocalStorage(scheme string) bool { + switch ConsistentStorage(scheme) { + case consistentStorageLocal, consistentStorageNFS: + return true + default: + return false + } +} + +// IsBlackholeStorage returns whether a blackhole storage is used. +func IsBlackholeStorage(scheme string) bool { + return ConsistentStorage(scheme) == consistentStorageBlackhole +} + +// InitExternalStorage init an external storage. 
+var InitExternalStorage = func(ctx context.Context, uri url.URL) (storage.ExternalStorage, error) { + if ConsistentStorage(uri.Scheme) == consistentStorageS3 && len(uri.Host) == 0 { + // TODO: this branch is compatible with previous s3 logic and will be removed + // in the future. + return nil, errors.WrapChangefeedUnretryableErr(errors.ErrStorageInitialize, + errors.Errorf("please specify the bucket for %+v", uri)) + } + s, err := util.GetExternalStorage(ctx, uri.String(), nil) + if err != nil { + return nil, errors.WrapChangefeedUnretryableErr(errors.ErrStorageInitialize, err) + } + return s, nil +} + +// ValidateStorage validates the storage used by redo. +func ValidateStorage(uri *url.URL) error { + scheme := uri.Scheme + if !IsValidConsistentStorage(scheme) { + return errors.ErrConsistentStorage.GenWithStackByArgs(scheme) + } + if IsBlackholeStorage(scheme) { + return nil + } + + if IsExternalStorage(scheme) { + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + _, err := InitExternalStorage(ctx, *uri) + return err + } + + err := os.MkdirAll(uri.Path, DefaultDirMode) + if err != nil { + return errors.WrapError(errors.ErrStorageInitialize, errors.Annotate(err, + fmt.Sprintf("can't make dir for new redo log: %+v", uri))) + } + return nil +} + +const ( + // RedoLogFileFormatV1 was used before v6.1.0, which doesn't contain namespace information + // layout: captureID_changefeedID_fileType_maxEventCommitTs_uuid.fileExtName + RedoLogFileFormatV1 = "%s_%s_%s_%d_%s%s" + // RedoLogFileFormatV2 is available since v6.1.0, which contains namespace information + // layout: captureID_namespace_changefeedID_fileType_maxEventCommitTs_uuid.fileExtName + RedoLogFileFormatV2 = "%s_%s_%s_%s_%d_%s%s" +) + +// logFormat2ParseFormat converts redo log file name format to the space separated +// format, which can be read and parsed by sscanf. 
Besides remove the suffix `%s` +// which is used as file name extension, since we will parse extension first. +func logFormat2ParseFormat(fmtStr string) string { + return strings.TrimSuffix(strings.ReplaceAll(fmtStr, "_", " "), "%s") +} + +// ParseLogFileName extract the commitTs, fileType from log fileName +func ParseLogFileName(name string) (uint64, string, error) { + ext := filepath.Ext(name) + if ext == MetaEXT { + return 0, RedoMetaFileType, nil + } + + // if .sort, the name should be like + // fmt.Sprintf("%s_%s_%s_%d_%s_%d%s", w.cfg.captureID, + // w.cfg.changeFeedID.Namespace,w.cfg.changeFeedID.ID, + // w.cfg.fileType, w.commitTS.Load(), uuid, LogEXT)+SortLogEXT + if ext == SortLogEXT { + name = strings.TrimSuffix(name, SortLogEXT) + ext = filepath.Ext(name) + } + if ext != LogEXT && ext != TmpEXT { + return 0, "", nil + } + + var commitTs uint64 + var captureID, namespace, changefeedID, fileType, uid string + // if the namespace is not default, the log looks like: + // fmt.Sprintf("%s_%s_%s_%s_%d_%s%s", w.cfg.captureID, + // w.cfg.changeFeedID.Namespace,w.cfg.changeFeedID.ID, + // w.cfg.fileType, w.commitTS.Load(), uuid, redo.LogEXT) + // otherwise it looks like: + // fmt.Sprintf("%s_%s_%s_%d_%s%s", w.cfg.captureID, + // w.cfg.changeFeedID.ID, + // w.cfg.fileType, w.commitTS.Load(), uuid, redo.LogEXT) + var ( + vars []any + formatStr string + ) + if len(strings.Split(name, "_")) == 6 { + formatStr = logFormat2ParseFormat(RedoLogFileFormatV2) + vars = []any{&captureID, &namespace, &changefeedID, &fileType, &commitTs, &uid} + } else { + formatStr = logFormat2ParseFormat(RedoLogFileFormatV1) + vars = []any{&captureID, &changefeedID, &fileType, &commitTs, &uid} + } + name = strings.ReplaceAll(name, "_", " ") + _, err := fmt.Sscanf(name, formatStr, vars...) 
+	if err != nil {
+		return 0, "", errors.Annotatef(err, "bad log name: %s", name)
+	}
+	return commitTs, fileType, nil
+}
diff --git a/pkg/redo/config_test.go b/pkg/redo/config_test.go
new file mode 100644
index 00000000000..c908d7e79fa
--- /dev/null
+++ b/pkg/redo/config_test.go
@@ -0,0 +1,189 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package redo
+
+import (
+	"context"
+	"fmt"
+	"testing"
+
+	"github.com/google/uuid"
+	"github.com/pingcap/tidb/br/pkg/storage"
+	"github.com/stretchr/testify/require"
+)
+
+func TestParseLogFileName(t *testing.T) { // table-driven: covers V1/V2 layouts and every extension
+	t.Parallel()
+
+	type arg struct {
+		name string
+	}
+	tests := []struct {
+		name         string
+		args         arg
+		wantTs       uint64
+		wantFileType string
+		wantErr      string // regexp matched against the returned error; empty = expect no error
+	}{
+		{
+			name: "happy row .log", // V1 layout, no namespace
+			args: arg{
+				name: fmt.Sprintf(RedoLogFileFormatV1, "cp",
+					"test",
+					RedoRowLogFileType, 1, uuid.NewString(), LogEXT),
+			},
+			wantTs:       1,
+			wantFileType: RedoRowLogFileType,
+		},
+		{
+			name: "happy row .log", // V2 layout, with namespace
+			args: arg{
+				name: fmt.Sprintf(RedoLogFileFormatV2, "cp",
+					"namespace", "test",
+					RedoRowLogFileType, 1, uuid.NewString(), LogEXT),
+			},
+			wantTs:       1,
+			wantFileType: RedoRowLogFileType,
+		},
+		{
+			name: "happy row .tmp", // V1 layout with .tmp suffix
+			args: arg{
+				name: fmt.Sprintf(RedoLogFileFormatV1, "cp",
+					"test",
+					RedoRowLogFileType, 1, uuid.NewString(), LogEXT) + TmpEXT,
+			},
+			wantTs:       1,
+			wantFileType: RedoRowLogFileType,
+		},
+		{
+			name: "happy row .tmp", // V2 layout with .tmp suffix
+			args: arg{
+				name: fmt.Sprintf(RedoLogFileFormatV2, "cp",
+					"namespace", "test",
+					RedoRowLogFileType, 1,
uuid.NewString(), LogEXT) + TmpEXT,
+			},
+			wantTs:       1,
+			wantFileType: RedoRowLogFileType,
+		},
+		{
+			name: "happy ddl .log", // V1 layout
+			args: arg{
+				name: fmt.Sprintf(RedoLogFileFormatV1, "cp",
+					"test",
+					RedoDDLLogFileType, 1, uuid.NewString(), LogEXT),
+			},
+			wantTs:       1,
+			wantFileType: RedoDDLLogFileType,
+		},
+		{
+			name: "happy ddl .log", // V2 layout
+			args: arg{
+				name: fmt.Sprintf(RedoLogFileFormatV2, "cp",
+					"namespace", "test",
+					RedoDDLLogFileType, 1, uuid.NewString(), LogEXT),
+			},
+			wantTs:       1,
+			wantFileType: RedoDDLLogFileType,
+		},
+		{
+			name: "happy ddl .sort", // .sort wraps a V2 name in the default namespace
+			args: arg{
+				name: fmt.Sprintf(RedoLogFileFormatV2, "cp",
+					"default", "test",
+					RedoDDLLogFileType, 1, uuid.NewString(), LogEXT) + SortLogEXT,
+			},
+			wantTs:       1,
+			wantFileType: RedoDDLLogFileType,
+		},
+		{
+			name: "happy ddl .sort", // .sort wraps a V2 name in a custom namespace
+			args: arg{
+				name: fmt.Sprintf(RedoLogFileFormatV2, "cp",
+					"namespace", "test",
+					RedoDDLLogFileType, 1, uuid.NewString(), LogEXT) + SortLogEXT,
+			},
+			wantTs:       1,
+			wantFileType: RedoDDLLogFileType,
+		},
+		{
+			name: "happy ddl .tmp", // V1 layout with .tmp suffix
+			args: arg{
+				name: fmt.Sprintf(RedoLogFileFormatV1, "cp",
+					"test",
+					RedoDDLLogFileType, 1, uuid.NewString(), LogEXT) + TmpEXT,
+			},
+			wantTs:       1,
+			wantFileType: RedoDDLLogFileType,
+		},
+		{
+			name: "happy ddl .tmp", // V2 layout with .tmp suffix
+			args: arg{
+				name: fmt.Sprintf(RedoLogFileFormatV2, "cp",
+					"namespace", "test",
+					RedoDDLLogFileType, 1, uuid.NewString(), LogEXT) + TmpEXT,
+			},
+			wantTs:       1,
+			wantFileType: RedoDDLLogFileType,
+		},
+		{
+			name: "happy .meta", // .meta files carry no commitTs in their name
+			args: arg{
+				name: "sdfsdfsf" + MetaEXT,
+			},
+			wantTs:       0,
+			wantFileType: RedoMetaFileType,
+		},
+		{
+			name: "not supported fileType", // unknown extension: zero values, no error
+			args: arg{
+				name: "sdfsdfsf.sfsf",
+			},
+		},
+		{
+			name: "err wrong format ddl .tmp",
+			args: arg{
+				name: fmt.Sprintf("%s_%s_%s_%s_%d%s%s", /* a wrong format */
+					"cp", "default", "test",
+					RedoDDLLogFileType, 1, uuid.NewString(), LogEXT) + TmpEXT,
+			},
+			wantErr: ".*bad log name*.",
+		},
+	}
+	for _, tt := range tests {
+		ts, fileType, err :=
ParseLogFileName(tt.args.name)
+		if tt.wantErr != "" {
+			require.Regexp(t, tt.wantErr, err, tt.name)
+		} else {
+			require.Nil(t, err, tt.name)
+			require.EqualValues(t, tt.wantTs, ts, tt.name)
+			require.Equal(t, tt.wantFileType, fileType, tt.name)
+		}
+	}
+}
+
+func TestInitExternalStorage(t *testing.T) { // smoke test: a local file:// URI must initialize cleanly
+	t.Parallel()
+
+	dir := t.TempDir()
+	urls := []string{
+		fmt.Sprintf("file://%s/test", dir),
+	}
+
+	for _, urlStr := range urls {
+		url, err := storage.ParseRawURL(urlStr)
+		require.NoError(t, err)
+		_, err = InitExternalStorage(context.Background(), *url)
+		require.NoError(t, err)
+	}
+}
diff --git a/tests/integration_tests/consistent_replicate_nfs/run.sh b/tests/integration_tests/consistent_replicate_nfs/run.sh
index 39284dc32e8..570edf617fe 100644
--- a/tests/integration_tests/consistent_replicate_nfs/run.sh
+++ b/tests/integration_tests/consistent_replicate_nfs/run.sh
@@ -31,6 +31,7 @@ function run() {
 	SINK_URI="mysql://normal:123456@127.0.0.1:3306/"
 	changefeed_id=$(cdc cli changefeed create --sink-uri="$SINK_URI" --config="$CUR/conf/changefeed.toml" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')
+	# The changefeed id captured above is used later to query redo meta.
 
 	run_sql "CREATE DATABASE consistent_replicate_nfs;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
 	go-ycsb load mysql -P
$CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=consistent_replicate_nfs run_sql "CREATE table consistent_replicate_nfs.check1(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}