Skip to content

Commit

Permalink
pre check consistent config
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed Jan 3, 2023
1 parent 2d4cffd commit ea86abc
Show file tree
Hide file tree
Showing 25 changed files with 626 additions and 556 deletions.
4 changes: 2 additions & 2 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/puller"
"github.com/pingcap/tiflow/cdc/redo"
redoCommon "github.com/pingcap/tiflow/cdc/redo/common"
"github.com/pingcap/tiflow/cdc/scheduler"
"github.com/pingcap/tiflow/cdc/sink"
"github.com/pingcap/tiflow/pkg/config"
cdcContext "github.com/pingcap/tiflow/pkg/context"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/pdutil"
redoCfg "github.com/pingcap/tiflow/pkg/redo"
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -675,7 +675,7 @@ func (c *changefeed) cleanupRedoManager(ctx context.Context) {
log.Warn("changefeed is removed, but state is not complete", zap.Any("state", c.state))
return
}
if !redoCommon.IsConsistentEnabled(c.state.Info.Config.Consistent.Level) {
if !redoCfg.IsConsistentEnabled(c.state.Info.Config.Consistent.Level) {
return
}
// when removing a paused changefeed, the redo manager is nil, create a new one
Expand Down
136 changes: 0 additions & 136 deletions cdc/redo/common/model.go

This file was deleted.

85 changes: 0 additions & 85 deletions cdc/redo/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,97 +14,12 @@
package common

import (
"context"
"fmt"
"net/url"
"path/filepath"
"strings"

"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/util"
)

const (
// RedoLogFileFormatV1 was used before v6.1.0, which doesn't contain namespace information
// layout: captureID_changefeedID_fileType_maxEventCommitTs_uuid.fileExtName
RedoLogFileFormatV1 = "%s_%s_%s_%d_%s%s"
// RedoLogFileFormatV2 is available since v6.1.0, which contains namespace information
// layout: captureID_namespace_changefeedID_fileType_maxEventCommitTs_uuid.fileExtName
RedoLogFileFormatV2 = "%s_%s_%s_%s_%d_%s%s"
)

// InitExternalStorage init an external storage.
var InitExternalStorage = func(ctx context.Context, uri url.URL) (storage.ExternalStorage, error) {
if ConsistentStorage(uri.Scheme) == ConsistentStorageS3 && len(uri.Host) == 0 {
// TODO: this branch is compatible with previous s3 logic and will be removed
// in the future.
return nil, errors.WrapChangefeedUnretryableErr(errors.ErrS3StorageInitialize,
errors.Errorf("please specify the bucket for %+v", uri))
}
s, err := util.GetExternalStorage(ctx, uri.String(), nil)
if err != nil {
return nil, errors.WrapChangefeedUnretryableErr(errors.ErrS3StorageInitialize, err)
}
return s, nil
}

// logFormat2ParseFormat converts redo log file name format to the space separated
// format, which can be read and parsed by sscanf. Besides remove the suffix `%s`
// which is used as file name extension, since we will parse extension first.
func logFormat2ParseFormat(fmtStr string) string {
return strings.TrimSuffix(strings.ReplaceAll(fmtStr, "_", " "), "%s")
}

// ParseLogFileName extract the commitTs, fileType from log fileName
func ParseLogFileName(name string) (uint64, string, error) {
ext := filepath.Ext(name)
if ext == MetaEXT {
return 0, RedoMetaFileType, nil
}

// if .sort, the name should be like
// fmt.Sprintf("%s_%s_%s_%d_%s_%d%s", w.cfg.captureID,
// w.cfg.changeFeedID.Namespace,w.cfg.changeFeedID.ID,
// w.cfg.fileType, w.commitTS.Load(), uuid, LogEXT)+SortLogEXT
if ext == SortLogEXT {
name = strings.TrimSuffix(name, SortLogEXT)
ext = filepath.Ext(name)
}
if ext != LogEXT && ext != TmpEXT {
return 0, "", nil
}

var commitTs uint64
var captureID, namespace, changefeedID, fileType, uid string
// if the namespace is not default, the log looks like:
// fmt.Sprintf("%s_%s_%s_%s_%d_%s%s", w.cfg.captureID,
// w.cfg.changeFeedID.Namespace,w.cfg.changeFeedID.ID,
// w.cfg.fileType, w.commitTS.Load(), uuid, redo.LogEXT)
// otherwise it looks like:
// fmt.Sprintf("%s_%s_%s_%d_%s%s", w.cfg.captureID,
// w.cfg.changeFeedID.ID,
// w.cfg.fileType, w.commitTS.Load(), uuid, redo.LogEXT)
var (
vars []any
formatStr string
)
if len(strings.Split(name, "_")) == 6 {
formatStr = logFormat2ParseFormat(RedoLogFileFormatV2)
vars = []any{&captureID, &namespace, &changefeedID, &fileType, &commitTs, &uid}
} else {
formatStr = logFormat2ParseFormat(RedoLogFileFormatV1)
vars = []any{&captureID, &changefeedID, &fileType, &commitTs, &uid}
}
name = strings.ReplaceAll(name, "_", " ")
_, err := fmt.Sscanf(name, formatStr, vars...)
if err != nil {
return 0, "", errors.Annotatef(err, "bad log name: %s", name)
}
return commitTs, fileType, nil
}

// FilterChangefeedFiles return the files that match to the changefeed.
func FilterChangefeedFiles(files []string, changefeedID model.ChangeFeedID) []string {
var (
Expand Down
Loading

0 comments on commit ea86abc

Please sign in to comment.