redo (ticdc): only delete changefeed related file when changefeed was removed (#7279) (#7305)

close #6413
ti-chi-bot authored Oct 10, 2022
1 parent be54b50 commit 4af5e75
Showing 8 changed files with 160 additions and 36 deletions.
31 changes: 26 additions & 5 deletions cdc/redo/common/util.go
@@ -23,6 +23,7 @@ import (
"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
)

@@ -97,13 +98,13 @@ func ParseLogFileName(name string) (uint64, string, error) {
}

var commitTs uint64
- var s1, namespace, s2, fileType, uid string
+ var captureID, namespace, changefeedID, fileType, uid string
// if the namespace is not default, the log looks like:
// fmt.Sprintf("%s_%s_%s_%d_%s_%d%s", w.cfg.captureID,
// fmt.Sprintf("%s_%s_%s_%s_%d_%s%s", w.cfg.captureID,
// w.cfg.changeFeedID.Namespace,w.cfg.changeFeedID.ID,
// w.cfg.fileType, w.commitTS.Load(), uuid, redo.LogEXT)
// otherwise it looks like:
// fmt.Sprintf("%s_%s_%d_%s_%d%s", w.cfg.captureID,
// fmt.Sprintf("%s_%s_%s_%d_%s%s", w.cfg.captureID,
// w.cfg.changeFeedID.ID,
// w.cfg.fileType, w.commitTS.Load(), uuid, redo.LogEXT)
var (
@@ -112,10 +113,10 @@ func ParseLogFileName(name string) (uint64, string, error) {
)
if len(strings.Split(name, "_")) == 6 {
formatStr = logFormat2ParseFormat(RedoLogFileFormatV2)
vars = []any{&s1, &namespace, &s2, &fileType, &commitTs, &uid}
vars = []any{&captureID, &namespace, &changefeedID, &fileType, &commitTs, &uid}
} else {
formatStr = logFormat2ParseFormat(RedoLogFileFormatV1)
vars = []any{&s1, &s2, &fileType, &commitTs, &uid}
vars = []any{&captureID, &changefeedID, &fileType, &commitTs, &uid}
}
name = strings.ReplaceAll(name, "_", " ")
_, err := fmt.Sscanf(name, formatStr, vars...)
@@ -124,3 +125,23 @@ func ParseLogFileName(name string) (uint64, string, error) {
}
return commitTs, fileType, nil
}

// FilterChangefeedFiles returns the files that match the changefeed.
func FilterChangefeedFiles(files []string, changefeedID model.ChangeFeedID) []string {
var (
matcher string
res []string
)

if changefeedID.Namespace == "default" {
matcher = fmt.Sprintf("_%s_", changefeedID.ID)
} else {
matcher = fmt.Sprintf("_%s_%s_", changefeedID.Namespace, changefeedID.ID)
}
for _, file := range files {
if strings.Contains(file, matcher) {
res = append(res, file)
}
}
return res
}
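The file-name formats documented in the ParseLogFileName comments, and the substring matcher used by FilterChangefeedFiles, can be exercised with a small standalone sketch. This is only an illustration under assumed values: the example file names, the ".log" extension handling, and the parse format string are restated here from the comments in this diff, not imported from tiflow.

package main

import (
	"fmt"
	"strings"
)

// filterChangefeedFiles mirrors the matcher added in this commit:
// "_<changefeed-id>_" for the default namespace, "_<namespace>_<changefeed-id>_" otherwise.
func filterChangefeedFiles(files []string, namespace, id string) []string {
	matcher := fmt.Sprintf("_%s_", id)
	if namespace != "default" {
		matcher = fmt.Sprintf("_%s_%s_", namespace, id)
	}
	var res []string
	for _, f := range files {
		if strings.Contains(f, matcher) {
			res = append(res, f)
		}
	}
	return res
}

func main() {
	// A V2-style name: captureID_namespace_changefeedID_fileType_commitTs_uuid.log
	name := "capture-1_ns1_cf-1_row_439725183000000000_0ab1.log"
	fields := strings.ReplaceAll(strings.TrimSuffix(name, ".log"), "_", " ")

	var captureID, namespace, changefeedID, fileType, uid string
	var commitTs uint64
	// Underscores were replaced by spaces so fmt.Sscanf can split the six fields,
	// the same trick ParseLogFileName uses above.
	if _, err := fmt.Sscanf(fields, "%s %s %s %s %d %s",
		&captureID, &namespace, &changefeedID, &fileType, &commitTs, &uid); err != nil {
		panic(err)
	}
	fmt.Println(changefeedID, fileType, commitTs) // cf-1 row 439725183000000000

	files := []string{
		"capture-1_ns1_cf-1_row_439725183000000000_0ab1.log",
		"capture-1_ns1_cf-2_row_439725183000000001_0ab2.log",
		"capture-1_cf-1_row_439725183000000002_0ab3.log", // V1 name, default namespace
	}
	// Only the first file belongs to changefeed ns1/cf-1.
	fmt.Println(filterChangefeedFiles(files, "ns1", "cf-1"))
}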
37 changes: 37 additions & 0 deletions cdc/redo/common/util_test.go
@@ -18,6 +18,7 @@ import (
"testing"

"github.com/google/uuid"
"github.com/pingcap/tiflow/cdc/model"
"github.com/stretchr/testify/require"
)

@@ -167,3 +168,39 @@ func TestParseLogFileName(t *testing.T) {
}
}
}

func TestGetChangefeedFiles(t *testing.T) {
cases := []struct {
fileNames []string
changefeed model.ChangeFeedID
want []string
}{
{
fileNames: []string{
"captureID_test-2_uuid1.log",
"captureID_test-3_uuid2.log",
"captureID_test-1_uuid3.log",
},
changefeed: model.DefaultChangeFeedID("test-1"),
want: []string{
"captureID_test-1_uuid3.log",
},
},
{
fileNames: []string{
"captureID_n1_test-2_uuid4.log",
"captureID_n2_test-2_uuid5.log",
"captureID_n1_test-1_uuid6.log",
},
changefeed: model.ChangeFeedID{Namespace: "n1", ID: "test-2"},
want: []string{
"captureID_n1_test-2_uuid4.log",
},
},
}

for _, c := range cases {
got := FilterChangefeedFiles(c.fileNames, c.changefeed)
require.Equal(t, c.want, got)
}
}
12 changes: 8 additions & 4 deletions cdc/redo/manager.go
@@ -36,6 +36,7 @@ import (
)

var (
// flushIntervalInMs is the minimum value of flush interval
flushIntervalInMs int64 = 2000 // 2 seconds
flushTimeout = time.Second * 20

@@ -229,7 +230,8 @@ func NewManager(ctx context.Context, cfg *config.ConsistentConfig, opts *Manager
m.writer = writer.NewBlackHoleWriter()
case consistentStorageLocal, consistentStorageNFS, consistentStorageS3:
globalConf := config.GetGlobalServerConfig()
- // We use a temporary dir to storage redo logs before flushing to other backends, such as S3
+ // When an external storage such as S3 is used, we use redoDir as a temporary dir to store redo logs
+ // before we flush them to S3.
var redoDir string
if changeFeedID.Namespace == model.DefaultNamespace {
redoDir = filepath.Join(globalConf.DataDir,
@@ -239,8 +241,9 @@ func NewManager(ctx context.Context, cfg *config.ConsistentConfig, opts *Manager
config.DefaultRedoDir,
changeFeedID.Namespace, changeFeedID.ID)
}

+ // When local storage or NFS is used, we use redoDir as the final storage path.
if m.storageType == consistentStorageLocal || m.storageType == consistentStorageNFS {
- // When using local or nfs as backend, store redo logs to redoDir directly.
redoDir = uri.Path
}

@@ -456,6 +459,7 @@ func (m *ManagerImpl) RemoveTable(tableID model.TableID) {
}

// Cleanup removes all redo logs of this manager, it is called when changefeed is removed
// Only the owner should call this method.
func (m *ManagerImpl) Cleanup(ctx context.Context) error {
common.RedoWriteLogDurationHistogram.
DeleteLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID)
@@ -567,7 +571,7 @@ func (m *ManagerImpl) onResolvedTsMsg(tableID model.TableID, resolvedTs model.Ts

func (m *ManagerImpl) bgUpdateLog(ctx context.Context, errCh chan<- error) {
// logErrCh is used to retrieve errors from log flushing goroutines.
- // if the channel is full, it's better to block subsequent flushings.
+ // if the channel is full, it's better to block subsequent flushing goroutines.
logErrCh := make(chan error, 1)
handleErr := func(err error) { logErrCh <- err }

@@ -635,7 +639,7 @@ func (m *ManagerImpl) bgUpdateLog(ctx context.Context, errCh chan<- error) {
}
}

- // NOTE: the goroutine should never exit until the err is put into errCh successfully
+ // NOTE: the goroutine should never exit until the error is put into errCh successfully
// or the context is canceled.
select {
case <-ctx.Done():
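The redoDir selection that these two hunks document can be summarized in one place. The sketch below is a simplified restatement, not the NewManager code: the storage-type strings, the "redo" subdirectory name, and the function signature are placeholders standing in for config.GetGlobalServerConfig(), config.DefaultRedoDir, and the consistent-storage constants.

package main

import (
	"fmt"
	"path/filepath"
)

// resolveRedoDir sketches the decision made in NewManager: S3-backed changefeeds
// stage redo logs in a local directory under the server data dir before upload,
// while local and NFS backends write to the configured URI path directly.
func resolveRedoDir(storageType, dataDir, uriPath, namespace, changefeedID string) string {
	redoDir := filepath.Join(dataDir, "redo", namespace, changefeedID)
	if namespace == "default" {
		redoDir = filepath.Join(dataDir, "redo", changefeedID)
	}
	if storageType == "local" || storageType == "nfs" {
		// Local storage or NFS: redoDir is the final storage path.
		redoDir = uriPath
	}
	return redoDir
}

func main() {
	fmt.Println(resolveRedoDir("s3", "/var/lib/ticdc", "", "default", "cf-1"))            // staging dir under the data dir
	fmt.Println(resolveRedoDir("nfs", "/var/lib/ticdc", "/mnt/redo", "default", "cf-1")) // /mnt/redo
}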
2 changes: 1 addition & 1 deletion cdc/redo/writer/file.go
@@ -345,7 +345,7 @@ func (w *Writer) close() error {
return cerror.WrapError(cerror.ErrRedoFileOp, err)
}
defer dirFile.Close()
- // sync the dir so as to guarantee the renamed file is persisted to disk.
+ // sync the dir to guarantee the renamed file is persisted to disk.
err = dirFile.Sync()
if err != nil {
return cerror.WrapError(cerror.ErrRedoFileOp, err)
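The comment touched above points at a durability detail that is easy to miss: on POSIX systems a rename only becomes crash-safe once the parent directory has been fsynced, because the rename lives in the directory entry. A minimal standalone sketch of that rename-then-sync pattern (generic Go, not the Writer code; the file names are made up):

package main

import (
	"log"
	"os"
	"path/filepath"
)

// renameAndSyncDir renames tmp to final, then fsyncs the parent directory so the
// new directory entry (the renamed file) is persisted to disk.
func renameAndSyncDir(tmp, final string) error {
	if err := os.Rename(tmp, final); err != nil {
		return err
	}
	dir, err := os.Open(filepath.Dir(final))
	if err != nil {
		return err
	}
	defer dir.Close()
	return dir.Sync()
}

func main() {
	tmp, err := os.CreateTemp(".", "meta-*.tmp")
	if err != nil {
		log.Fatal(err)
	}
	tmp.Close()
	if err := renameAndSyncDir(tmp.Name(), "meta.final"); err != nil {
		log.Fatal(err)
	}
	log.Println("rename persisted")
}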
55 changes: 44 additions & 11 deletions cdc/redo/writer/writer.go
@@ -50,7 +50,7 @@ type RedoLogWriter interface {
// Regressions on them will be ignored.
FlushLog(ctx context.Context, checkpointTs, resolvedTs model.Ts) error

- // Get the current meta.
+ // GetMeta gets current meta.
GetMeta() (checkpointTs, resolvedTs model.Ts)

// DeleteAllLogs delete all log files related to the changefeed, called from owner only.
@@ -59,7 +59,7 @@ type RedoLogWriter interface {
// GC cleans stale files before the given checkpoint.
GC(ctx context.Context, checkpointTs model.Ts) error

- // Close is used to closed the writer.
+ // Close is used to close the writer.
Close() error
}

@@ -92,7 +92,9 @@ type LogWriter struct {
cfg *LogWriterConfig
rowWriter fileWriter
ddlWriter fileWriter
- storage storage.ExternalStorage
+ // storage in LogWriter is used to write meta and clean up
+ // the redo log files when changefeed is created or deleted.
+ storage storage.ExternalStorage

meta *common.LogMeta

@@ -339,27 +341,58 @@ func (l *LogWriter) DeleteAllLogs(ctx context.Context) (err error) {
return
}

- err = os.RemoveAll(l.cfg.Dir)
+ localFiles, err := os.ReadDir(l.cfg.Dir)
if err != nil {
- return cerror.WrapError(cerror.ErrRedoFileOp, err)
+ if os.IsNotExist(err) {
+ log.Warn("read removed log dir fail", zap.Error(err))
+ return nil
+ }
+ return cerror.WrapError(cerror.ErrRedoFileOp,
+ errors.Annotatef(err, "can't read log file directory: %s", l.cfg.Dir))
}

fileNames := make([]string, 0, len(localFiles))
for _, file := range localFiles {
fileNames = append(fileNames, file.Name())
}
filteredFiles := common.FilterChangefeedFiles(fileNames, l.cfg.ChangeFeedID)

if len(filteredFiles) == len(fileNames) {
if err = os.RemoveAll(l.cfg.Dir); err != nil {
if os.IsNotExist(err) {
log.Warn("removed log dir fail", zap.Error(err))
return nil
}
return cerror.WrapError(cerror.ErrRedoFileOp, err)
}
} else {
for _, file := range filteredFiles {
if err = os.RemoveAll(filepath.Join(l.cfg.Dir, file)); err != nil {
if os.IsNotExist(err) {
log.Warn("removed log dir fail", zap.Error(err))
return nil
}
return cerror.WrapError(cerror.ErrRedoFileOp, err)
}
}
}

if !l.cfg.S3Storage {
return
}

- var files []string
- files, err = getAllFilesInS3(ctx, l)
+ var remoteFiles []string
+ remoteFiles, err = getAllFilesInS3(ctx, l)
if err != nil {
return err
}

- err = l.deleteFilesInS3(ctx, files)
+ filteredFiles = common.FilterChangefeedFiles(remoteFiles, l.cfg.ChangeFeedID)
+ err = l.deleteFilesInS3(ctx, filteredFiles)
if err != nil {
return
}

- // Write the delete marker before clean any files.
+ // Write the deleted marker before cleaning any files.
err = l.writeDeletedMarkerToS3(ctx)
log.Info("redo manager write deleted mark",
zap.String("namespace", l.cfg.ChangeFeedID.Namespace),
@@ -547,7 +580,7 @@ func (l *LogWriter) flushLogMeta(checkpointTs, resolvedTs uint64) error {
return cerror.WrapError(cerror.ErrRedoFileOp, err)
}
defer dirFile.Close()
- // sync the dir so as to guarantee the renamed file is persisted to disk.
+ // sync the dir to guarantee the renamed file is persisted to disk.
err = dirFile.Sync()
if err != nil {
return cerror.WrapError(cerror.ErrRedoFileOp, err)
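The local-cleanup branch added to DeleteAllLogs above is the core of this fix: instead of unconditionally removing the whole redo directory, the writer now removes the directory only when every file in it belongs to the changefeed, and otherwise deletes just the matching files. A standalone sketch of that decision, using only the standard library (the matcher string follows the FilterChangefeedFiles convention; the directory path and changefeed ID are made up):

package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"
)

// removeChangefeedFiles removes the whole directory when every file in it belongs
// to the changefeed, and otherwise removes only the matching files, leaving other
// changefeeds' redo logs in the shared directory untouched.
func removeChangefeedFiles(dir, matcher string) error {
	entries, err := os.ReadDir(dir)
	if err != nil {
		if os.IsNotExist(err) {
			return nil // nothing to clean up
		}
		return err
	}

	var matched []string
	for _, e := range entries {
		if strings.Contains(e.Name(), matcher) {
			matched = append(matched, e.Name())
		}
	}

	if len(matched) == len(entries) {
		return os.RemoveAll(dir)
	}
	for _, name := range matched {
		if err := os.RemoveAll(filepath.Join(dir, name)); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	// "_cf-1_" is the default-namespace matcher shape used by FilterChangefeedFiles.
	if err := removeChangefeedFiles("/tmp/redo", "_cf-1_"); err != nil {
		fmt.Println("cleanup failed:", err)
	}
}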