redo(ticdc): set default timeout for accessing external storage to 15 minutes (pingcap#8181)

close pingcap#8089
CharlesCheung96 committed Feb 13, 2023
1 parent 1ce0a34 commit aa5cae9
Showing 7 changed files with 228 additions and 153 deletions.
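The commit removes the old 15-second S3 timeout constant (defaultS3Timeout) and has call sites derive their own bounded contexts using a much longer default for external-storage access (15 minutes per the title). Below is a minimal, self-contained sketch of that call-site pattern; the constant name and value here only approximate the real redo.DefaultTimeout, which is defined outside this diff.

package main

import (
	"context"
	"fmt"
	"time"
)

// defaultTimeout mirrors the 15-minute default for external-storage access
// introduced by this commit; the real constant lives in pkg/redo
// (redo.DefaultTimeout) and is only approximated here.
const defaultTimeout = 15 * time.Minute

// withStorageTimeout shows the call-site pattern used throughout the diff:
// derive a bounded context and pass it down to the storage operation.
func withStorageTimeout(parent context.Context, op func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(parent, defaultTimeout)
	defer cancel()
	return op(ctx)
}

func main() {
	err := withStorageTimeout(context.Background(), func(ctx context.Context) error {
		// Stand-in for an external-storage call such as WriteFile.
		select {
		case <-time.After(10 * time.Millisecond):
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	})
	fmt.Println("storage op finished:", err)
}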
16 changes: 8 additions & 8 deletions cdc/redo/writer/file.go
@@ -45,8 +45,7 @@ const (
// pageBytes is the alignment for flushing records to the backing Writer.
// It should be a multiple of the minimum sector size so that log can safely
// distinguish between torn writes and ordinary data corruption.
pageBytes = 8 * redo.MinSectorSize
defaultS3Timeout = 15 * time.Second
pageBytes = 8 * redo.MinSectorSize
)

var (
@@ -298,7 +297,9 @@ func (w *Writer) Close() error {
common.RedoWriteBytesGauge.
DeleteLabelValues(w.cfg.ChangeFeedID.Namespace, w.cfg.ChangeFeedID.ID)

return w.close()
ctx, cancel := context.WithTimeout(context.Background(), redo.CloseTimeout)
defer cancel()
return w.close(ctx)
}

// IsRunning implement IsRunning interface
@@ -310,7 +311,7 @@ func (w *Writer) isGCRunning() bool {
return w.gcRunning.Load()
}

func (w *Writer) close() error {
func (w *Writer) close(ctx context.Context) error {
if w.file == nil {
return nil
}
@@ -358,9 +359,6 @@ func (w *Writer) close() error {
// We only write content to S3 before closing the local file.
// By this way, we no longer need renaming object in S3.
if w.cfg.UseExternalStorage {
ctx, cancel := context.WithTimeout(context.Background(), defaultS3Timeout)
defer cancel()

err = w.writeToS3(ctx, w.ongoingFilePath)
if err != nil {
w.file.Close()
@@ -448,7 +446,9 @@ func (w *Writer) newPageWriter() error {
}

func (w *Writer) rotate() error {
if err := w.close(); err != nil {
ctx, cancel := context.WithTimeout(context.Background(), redo.DefaultTimeout)
defer cancel()
if err := w.close(ctx); err != nil {
return err
}
return w.openNew()
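In file.go, close() now accepts a context from its caller instead of building a short S3-specific one internally; Close derives its context from redo.CloseTimeout while rotate uses redo.DefaultTimeout. The sketch below shows that shape only; the timeout values are illustrative stand-ins, since the real constants are defined in pkg/redo and not shown in this diff.

package main

import (
	"context"
	"fmt"
	"time"
)

// Stand-ins for redo.CloseTimeout and redo.DefaultTimeout; the actual values
// are defined in pkg/redo and are assumptions here.
const (
	closeTimeout   = 15 * time.Second
	defaultTimeout = 15 * time.Minute
)

type fileWriter struct{}

// close accepts the caller's context so each path can pick its own deadline.
func (w *fileWriter) close(ctx context.Context) error {
	// The real writer flushes and uploads the finished file here, honoring ctx.
	return ctx.Err()
}

// Close bounds shutdown with its own deadline (redo.CloseTimeout in the diff).
func (w *fileWriter) Close() error {
	ctx, cancel := context.WithTimeout(context.Background(), closeTimeout)
	defer cancel()
	return w.close(ctx)
}

// rotate uses the longer default, since it may upload a whole log file.
func (w *fileWriter) rotate() error {
	ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
	defer cancel()
	return w.close(ctx)
}

func main() {
	w := &fileWriter{}
	fmt.Println(w.Close(), w.rotate())
}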
93 changes: 41 additions & 52 deletions cdc/redo/writer/writer.go
@@ -21,17 +21,15 @@ import (
"path/filepath"
"time"

"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/redo/common"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/redo"
"github.com/pingcap/tiflow/pkg/util"
"github.com/pingcap/tiflow/pkg/uuid"
"go.uber.org/multierr"
"go.uber.org/zap"
@@ -77,7 +75,7 @@ func NewRedoLogWriter(

scheme := uri.Scheme
if !redo.IsValidConsistentStorage(scheme) {
return nil, cerror.ErrConsistentStorage.GenWithStackByArgs(scheme)
return nil, errors.ErrConsistentStorage.GenWithStackByArgs(scheme)
}
if redo.IsBlackholeStorage(scheme) {
return NewBlackHoleWriter(), nil
@@ -142,7 +140,8 @@ func newLogWriter(
ctx context.Context, cfg *logWriterConfig, opts ...Option,
) (lw *logWriter, err error) {
if cfg == nil {
return nil, cerror.WrapError(cerror.ErrRedoConfigInvalid, errors.New("LogWriterConfig can not be nil"))
err := errors.New("LogWriterConfig can not be nil")
return nil, errors.WrapError(errors.ErrRedoConfigInvalid, err)
}

lw = &logWriter{cfg: cfg}
@@ -215,7 +214,7 @@ func newLogWriter(
func (l *logWriter) preCleanUpS3(ctx context.Context) error {
ret, err := l.extStorage.FileExists(ctx, l.getDeletedChangefeedMarker())
if err != nil {
return cerror.WrapError(cerror.ErrExternalStorageAPI, err)
return errors.WrapError(errors.ErrExternalStorageAPI, err)
}
if !ret {
return nil
@@ -237,8 +236,8 @@ func (l *logWriter) preCleanUpS3(ctx context.Context) error {
return err
}
err = l.extStorage.DeleteFile(ctx, l.getDeletedChangefeedMarker())
if !isNotExistInS3(err) {
return cerror.WrapError(cerror.ErrExternalStorageAPI, err)
if !util.IsNotExistInExtStorage(err) {
return errors.WrapError(errors.ErrExternalStorageAPI, err)
}

return nil
@@ -258,12 +257,13 @@ func (l *logWriter) initMeta(ctx context.Context) error {
if os.IsNotExist(err) {
return nil
}
return cerror.WrapError(cerror.ErrRedoMetaInitialize, errors.Annotate(err, "read meta file fail"))
err = errors.Annotate(err, "read meta file fail")
return errors.WrapError(errors.ErrRedoMetaInitialize, err)
}

_, err = l.meta.UnmarshalMsg(data)
if err != nil {
return cerror.WrapError(cerror.ErrRedoMetaInitialize, err)
return errors.WrapError(errors.ErrRedoMetaInitialize, err)
}

return nil
@@ -290,7 +290,7 @@ func (l *logWriter) WriteLog(ctx context.Context, rows []*model.RedoRowChangedEv
}

if l.isStopped() {
return cerror.ErrRedoWriterStopped.GenWithStackByArgs()
return errors.ErrRedoWriterStopped.GenWithStackByArgs()
}
if len(rows) == 0 {
return nil
@@ -304,7 +304,7 @@
rl := &model.RedoLog{RedoRow: r, Type: model.RedoLogTypeRow}
data, err := rl.MarshalMsg(nil)
if err != nil {
return cerror.WrapError(cerror.ErrMarshalFailed, err)
return errors.WrapError(errors.ErrMarshalFailed, err)
}

l.rowWriter.AdvanceTs(r.Row.CommitTs)
@@ -325,7 +325,7 @@ func (l *logWriter) SendDDL(ctx context.Context, ddl *model.RedoDDLEvent) error
}

if l.isStopped() {
return cerror.ErrRedoWriterStopped.GenWithStackByArgs()
return errors.ErrRedoWriterStopped.GenWithStackByArgs()
}
if ddl == nil || ddl.DDL == nil {
return nil
@@ -334,7 +334,7 @@
rl := &model.RedoLog{RedoDDL: ddl, Type: model.RedoLogTypeDDL}
data, err := rl.MarshalMsg(nil)
if err != nil {
return cerror.WrapError(cerror.ErrMarshalFailed, err)
return errors.WrapError(errors.ErrMarshalFailed, err)
}

l.ddlWriter.AdvanceTs(ddl.DDL.CommitTs)
@@ -351,10 +351,10 @@ func (l *logWriter) FlushLog(ctx context.Context, checkpointTs, resolvedTs model
}

if l.isStopped() {
return cerror.ErrRedoWriterStopped.GenWithStackByArgs()
return errors.ErrRedoWriterStopped.GenWithStackByArgs()
}

return l.flush(checkpointTs, resolvedTs)
return l.flush(ctx, checkpointTs, resolvedTs)
}

// GetMeta implement GetMeta api
Expand All @@ -377,7 +377,7 @@ func (l *logWriter) DeleteAllLogs(ctx context.Context) (err error) {
log.Warn("read removed log dir fail", zap.Error(err))
return nil
}
return cerror.WrapError(cerror.ErrRedoFileOp,
return errors.WrapError(errors.ErrRedoFileOp,
errors.Annotatef(err, "can't read log file directory: %s", l.cfg.Dir))
}

@@ -393,7 +393,7 @@ func (l *logWriter) DeleteAllLogs(ctx context.Context) (err error) {
log.Warn("removed log dir fail", zap.Error(err))
return nil
}
return cerror.WrapError(cerror.ErrRedoFileOp, err)
return errors.WrapError(errors.ErrRedoFileOp, err)
}
} else {
for _, file := range filteredFiles {
@@ -402,7 +402,7 @@
log.Warn("removed log dir fail", zap.Error(err))
return nil
}
return cerror.WrapError(cerror.ErrRedoFileOp, err)
return errors.WrapError(errors.ErrRedoFileOp, err)
}
}
}
@@ -439,7 +439,7 @@ func (l *logWriter) getDeletedChangefeedMarker() string {
}

func (l *logWriter) writeDeletedMarkerToS3(ctx context.Context) error {
return cerror.WrapError(cerror.ErrExternalStorageAPI,
return errors.WrapError(errors.ErrExternalStorageAPI,
l.extStorage.WriteFile(ctx, l.getDeletedChangefeedMarker(), []byte("D")))
}

@@ -451,8 +451,8 @@ func (l *logWriter) deleteFilesInS3(ctx context.Context, files []string) error {
err := l.extStorage.DeleteFile(eCtx, name)
if err != nil {
// if fail then retry, may end up with notExit err, ignore the error
if !isNotExistInS3(err) {
return cerror.WrapError(cerror.ErrExternalStorageAPI, err)
if !util.IsNotExistInExtStorage(err) {
return errors.WrapError(errors.ErrExternalStorageAPI, err)
}
}
return nil
@@ -461,27 +461,14 @@
return eg.Wait()
}

func isNotExistInS3(err error) bool {
// TODO: support other storage
if err != nil {
if aerr, ok := errors.Cause(err).(awserr.Error); ok { // nolint:errorlint
switch aerr.Code() {
case s3.ErrCodeNoSuchKey:
return true
}
}
}
return false
}

var getAllFilesInS3 = func(ctx context.Context, l *logWriter) ([]string, error) {
files := []string{}
err := l.extStorage.WalkDir(ctx, &storage.WalkOption{}, func(path string, _ int64) error {
files = append(files, path)
return nil
})
if err != nil {
return nil, cerror.WrapError(cerror.ErrExternalStorageAPI, err)
return nil, errors.WrapError(errors.ErrExternalStorageAPI, err)
}

return files, nil
@@ -502,15 +489,15 @@ func (l *logWriter) Close() (err error) {
}

// flush flushes all the buffered data to the disk.
func (l *logWriter) flush(checkpointTs, resolvedTs model.Ts) (err error) {
func (l *logWriter) flush(ctx context.Context, checkpointTs, resolvedTs model.Ts) (err error) {
if l.cfg.EmitDDLEvents {
err = multierr.Append(err, l.ddlWriter.Flush())
}
if l.cfg.EmitRowEvents {
err = multierr.Append(err, l.rowWriter.Flush())
}
if l.cfg.EmitMeta {
err = multierr.Append(err, l.flushLogMeta(checkpointTs, resolvedTs))
err = multierr.Append(err, l.flushLogMeta(ctx, checkpointTs, resolvedTs))
}
return
}
@@ -556,12 +543,12 @@ func (l *logWriter) maybeUpdateMeta(checkpointTs, resolvedTs uint64) ([]byte, er

data, err := l.meta.MarshalMsg(nil)
if err != nil {
err = cerror.WrapError(cerror.ErrMarshalFailed, err)
err = errors.WrapError(errors.ErrMarshalFailed, err)
}
return data, err
}

func (l *logWriter) flushLogMeta(checkpointTs, resolvedTs uint64) error {
func (l *logWriter) flushLogMeta(ctx context.Context, checkpointTs, resolvedTs uint64) error {
data, err := l.maybeUpdateMeta(checkpointTs, resolvedTs)
if err != nil {
return err
@@ -573,34 +560,31 @@ func (l *logWriter) flushLogMeta(checkpointTs, resolvedTs uint64) error {
if !l.cfg.UseExternalStorage {
return l.flushMetaToLocal(data)
}

ctx, cancel := context.WithTimeout(context.Background(), defaultS3Timeout)
defer cancel()
return l.flushMetaToS3(ctx, data)
}

func (l *logWriter) flushMetaToLocal(data []byte) error {
if err := os.MkdirAll(l.cfg.Dir, redo.DefaultDirMode); err != nil {
e := errors.Annotate(err, "can't make dir for new redo logfile")
return cerror.WrapError(cerror.ErrRedoFileOp, e)
return errors.WrapError(errors.ErrRedoFileOp, e)
}

metaFile, err := openTruncFile(l.filePath())
if err != nil {
return cerror.WrapError(cerror.ErrRedoFileOp, err)
return errors.WrapError(errors.ErrRedoFileOp, err)
}
_, err = metaFile.Write(data)
if err != nil {
return cerror.WrapError(cerror.ErrRedoFileOp, err)
return errors.WrapError(errors.ErrRedoFileOp, err)
}
err = metaFile.Sync()
if err != nil {
return cerror.WrapError(cerror.ErrRedoFileOp, err)
return errors.WrapError(errors.ErrRedoFileOp, err)
}

if l.preMetaFile != "" {
if err := os.Remove(l.preMetaFile); err != nil && !os.IsNotExist(err) {
return cerror.WrapError(cerror.ErrRedoFileOp, err)
return errors.WrapError(errors.ErrRedoFileOp, err)
}
}
l.preMetaFile = metaFile.Name()
@@ -612,12 +596,17 @@ func (l *logWriter) flushMetaToS3(ctx context.Context, data []byte) error {
start := time.Now()
metaFile := l.getMetafileName()
if err := l.extStorage.WriteFile(ctx, metaFile, data); err != nil {
return cerror.WrapError(cerror.ErrExternalStorageAPI, err)
return errors.WrapError(errors.ErrExternalStorageAPI, err)
}

if l.preMetaFile != "" {
if err := l.extStorage.DeleteFile(ctx, l.preMetaFile); err != nil && !isNotExistInS3(err) {
return cerror.WrapError(cerror.ErrExternalStorageAPI, err)
if l.preMetaFile == metaFile {
// This should only happen when use a constant uuid generator in test.
return nil
}
err := l.extStorage.DeleteFile(ctx, l.preMetaFile)
if err != nil && !util.IsNotExistInExtStorage(err) {
return errors.WrapError(errors.ErrExternalStorageAPI, err)
}
}
l.preMetaFile = metaFile
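In writer.go, FlushLog now threads its context through flush and flushLogMeta down to flushMetaToS3, rather than minting a fresh context from the old 15-second defaultS3Timeout just before the upload, and flushMetaToS3 skips deleting the previous meta file when its name equals the newly written one (which the diff notes should only happen with a constant UUID generator in tests). A rough sketch of that flow, with the storage calls stubbed out and field/helper names assumed:

package main

import (
	"context"
	"fmt"
	"time"
)

// logWriterSketch models only the pieces of the meta-flush path touched by
// the diff; field names and the stubbed storage calls are assumptions.
type logWriterSketch struct {
	useExternalStorage bool
	preMetaFile        string
}

// flushLogMeta receives the caller's context instead of creating its own.
func (l *logWriterSketch) flushLogMeta(ctx context.Context, metaFile string, data []byte) error {
	if !l.useExternalStorage {
		return nil // local write-and-fsync path elided
	}
	return l.flushMetaToS3(ctx, metaFile, data)
}

func (l *logWriterSketch) flushMetaToS3(ctx context.Context, metaFile string, data []byte) error {
	if err := writeFile(ctx, metaFile, data); err != nil {
		return err
	}
	if l.preMetaFile != "" {
		if l.preMetaFile == metaFile {
			// Same name as before (e.g. a constant UUID generator in tests):
			// deleting it would remove the file just written, so stop here.
			return nil
		}
		if err := deleteFile(ctx, l.preMetaFile); err != nil {
			return err
		}
	}
	l.preMetaFile = metaFile
	return nil
}

// writeFile and deleteFile stand in for external-storage WriteFile/DeleteFile.
func writeFile(ctx context.Context, name string, data []byte) error { return ctx.Err() }
func deleteFile(ctx context.Context, name string) error             { return ctx.Err() }

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Minute)
	defer cancel()
	l := &logWriterSketch{useExternalStorage: true, preMetaFile: "meta-old"}
	fmt.Println(l.flushLogMeta(ctx, "meta-new", []byte("meta")))
}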
2 changes: 1 addition & 1 deletion engine/pkg/externalresource/internal/s3/storage_factory.go
@@ -91,7 +91,7 @@ func GetExternalStorageFromURI(
}

// Note that we may have network I/O here.
ret, err := util.GetExternalStorage(ctx, uri, opts)
ret, err := util.GetExternalStorage(ctx, uri, opts, util.DefaultS3Retryer())
if err != nil {
retErr := errors.ErrFailToCreateExternalStorage.Wrap(errors.Trace(err))
return nil, retErr.GenWithStackByArgs("creating ExternalStorage for s3")
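The engine side now passes util.DefaultS3Retryer() when creating the external storage, so transient failures during creation and subsequent calls can be retried instead of surfacing immediately. The retry policy itself is not shown in this diff; the sketch below only illustrates the general idea, with an assumed attempt count and backoff that are not DefaultS3Retryer's real settings.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryWithBackoff illustrates what a retryer contributes: transient errors
// are retried a bounded number of times with a pause between attempts.
func retryWithBackoff(ctx context.Context, attempts int, backoff time.Duration, op func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = op(); err == nil {
			return nil
		}
		select {
		case <-time.After(backoff):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return err
}

func main() {
	calls := 0
	err := retryWithBackoff(context.Background(), 3, 10*time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errors.New("transient storage error")
		}
		return nil
	})
	fmt.Printf("succeeded after %d calls, err=%v\n", calls, err)
}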