Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#7993
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
CharlesCheung96 authored and ti-chi-bot committed Jan 4, 2023
1 parent 48ecd66 commit 772140d
Show file tree
Hide file tree
Showing 56 changed files with 1,371 additions and 1,360 deletions.
2 changes: 1 addition & 1 deletion cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ func GetDefaultReplicaConfig() *ReplicaConfig {
Consistent: &ConsistentConfig{
Level: "none",
MaxLogSize: 64,
FlushIntervalInMs: 1000,
FlushIntervalInMs: config.MinFlushIntervalInMs,
Storage: "",
},
}
Expand Down
3 changes: 2 additions & 1 deletion cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/pdutil"
redoCfg "github.com/pingcap/tiflow/pkg/redo"
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -662,7 +663,7 @@ func (c *changefeed) cleanupRedoManager(ctx context.Context) {
log.Warn("changefeed is removed, but state is not complete", zap.Any("state", c.state))
return
}
if !redo.IsConsistentEnabled(c.state.Info.Config.Consistent.Level) {
if !redoCfg.IsConsistentEnabled(c.state.Info.Config.Consistent.Level) {
return
}
// when removing a paused changefeed, the redo manager is nil, create a new one
Expand Down
5 changes: 3 additions & 2 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,8 +485,9 @@ func TestRemoveChangefeed(t *testing.T) {
info := ctx.ChangefeedVars().Info
dir := t.TempDir()
info.Config.Consistent = &config.ConsistentConfig{
Level: "eventual",
Storage: filepath.Join("nfs://", dir),
Level: "eventual",
Storage: filepath.Join("nfs://", dir),
FlushIntervalInMs: config.MinFlushIntervalInMs,
}
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: ctx.ChangefeedVars().ID,
Expand Down
34 changes: 0 additions & 34 deletions cdc/redo/applier.go

This file was deleted.

2 changes: 1 addition & 1 deletion cdc/redo/convert.go → cdc/redo/common/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package redo
package common

import (
"bytes"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package redo
package common

import (
"testing"
Expand Down
75 changes: 0 additions & 75 deletions cdc/redo/common/redo.go

This file was deleted.

40 changes: 40 additions & 0 deletions cdc/redo/common/redo_meta.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

//go:generate msgp

package common

import (
"github.com/pingcap/tiflow/cdc/model"
)

// LogMeta is used for store meta info.
type LogMeta struct {
CheckpointTs uint64 `msg:"checkpointTs"`
ResolvedTs uint64 `msg:"resolvedTs"`
}

// ParseMeta parses meta.
func ParseMeta(metas []*LogMeta, checkpointTs, resolvedTs *model.Ts) {
*checkpointTs = 0
*resolvedTs = 0
for _, meta := range metas {
if *checkpointTs < meta.CheckpointTs {
*checkpointTs = meta.CheckpointTs
}
if *resolvedTs < meta.ResolvedTs {
*resolvedTs = meta.ResolvedTs
}
}
}
File renamed without changes.
File renamed without changes.
144 changes: 0 additions & 144 deletions cdc/redo/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,156 +14,12 @@
package common

import (
"context"
"fmt"
"net/url"
"path/filepath"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws/client"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
"go.uber.org/zap"
)

const (
// RedoLogFileFormatV1 was used before v6.1.0, which doesn't contain namespace information
// layout: captureID_changefeedID_fileType_maxEventCommitTs_uuid.fileExtName
RedoLogFileFormatV1 = "%s_%s_%s_%d_%s%s"
// RedoLogFileFormatV2 is available since v6.1.0, which contains namespace information
// layout: captureID_namespace_changefeedID_fileType_maxEventCommitTs_uuid.fileExtName
RedoLogFileFormatV2 = "%s_%s_%s_%s_%d_%s%s"
)

// InitS3storage init a storage used for s3,
// s3URI should be like s3URI="s3://logbucket/test-changefeed?endpoint=http://$S3_ENDPOINT/"
var InitS3storage = func(ctx context.Context, uri url.URL) (storage.ExternalStorage, error) {
if len(uri.Host) == 0 {
return nil, cerror.WrapChangefeedUnretryableErr(cerror.ErrS3StorageInitialize, errors.Errorf("please specify the bucket for s3 in %v", uri))
}

prefix := strings.Trim(uri.Path, "/")
s3 := &backuppb.S3{Bucket: uri.Host, Prefix: prefix}
options := &storage.BackendOptions{}
storage.ExtractQueryParameters(&uri, &options.S3)
if err := options.S3.Apply(s3); err != nil {
return nil, cerror.WrapChangefeedUnretryableErr(cerror.ErrS3StorageInitialize, err)
}

// we should set this to true, since br set it by default in parseBackend
s3.ForcePathStyle = true
backend := &backuppb.StorageBackend{
Backend: &backuppb.StorageBackend_S3{S3: s3},
}
s3storage, err := storage.New(ctx, backend, &storage.ExternalStorageOptions{
SendCredentials: false,
HTTPClient: nil,
S3Retryer: DefaultS3Retryer(),
})
if err != nil {
return nil, cerror.WrapChangefeedUnretryableErr(cerror.ErrS3StorageInitialize, err)
}

return s3storage, nil
}

// logFormat2ParseFormat converts redo log file name format to the space separated
// format, which can be read and parsed by sscanf. Besides remove the suffix `%s`
// which is used as file name extension, since we will parse extension first.
func logFormat2ParseFormat(fmtStr string) string {
return strings.TrimSuffix(strings.ReplaceAll(fmtStr, "_", " "), "%s")
}

// ParseLogFileName extract the commitTs, fileType from log fileName
func ParseLogFileName(name string) (uint64, string, error) {
ext := filepath.Ext(name)
if ext == MetaEXT {
return 0, DefaultMetaFileType, nil
}

// if .sort, the name should be like
// fmt.Sprintf("%s_%s_%s_%d_%s_%d%s", w.cfg.captureID,
// w.cfg.changeFeedID.Namespace,w.cfg.changeFeedID.ID,
// w.cfg.fileType, w.commitTS.Load(), uuid, LogEXT)+SortLogEXT
if ext == SortLogEXT {
name = strings.TrimSuffix(name, SortLogEXT)
ext = filepath.Ext(name)
}
if ext != LogEXT && ext != TmpEXT {
return 0, "", nil
}

var commitTs uint64
var captureID, namespace, changefeedID, fileType, uid string
// if the namespace is not default, the log looks like:
// fmt.Sprintf("%s_%s_%s_%s_%d_%s%s", w.cfg.captureID,
// w.cfg.changeFeedID.Namespace,w.cfg.changeFeedID.ID,
// w.cfg.fileType, w.commitTS.Load(), uuid, redo.LogEXT)
// otherwise it looks like:
// fmt.Sprintf("%s_%s_%s_%d_%s%s", w.cfg.captureID,
// w.cfg.changeFeedID.ID,
// w.cfg.fileType, w.commitTS.Load(), uuid, redo.LogEXT)
var (
vars []any
formatStr string
)
if len(strings.Split(name, "_")) == 6 {
formatStr = logFormat2ParseFormat(RedoLogFileFormatV2)
vars = []any{&captureID, &namespace, &changefeedID, &fileType, &commitTs, &uid}
} else {
formatStr = logFormat2ParseFormat(RedoLogFileFormatV1)
vars = []any{&captureID, &changefeedID, &fileType, &commitTs, &uid}
}
name = strings.ReplaceAll(name, "_", " ")
_, err := fmt.Sscanf(name, formatStr, vars...)
if err != nil {
return 0, "", errors.Annotatef(err, "bad log name: %s", name)
}
return commitTs, fileType, nil
}

// retryerWithLog wraps the client.DefaultRetryer, and logs when retrying.
type retryerWithLog struct {
client.DefaultRetryer
}

func isDeadlineExceedError(err error) bool {
return strings.Contains(err.Error(), "context deadline exceeded")
}

func (rl retryerWithLog) ShouldRetry(r *request.Request) bool {
if isDeadlineExceedError(r.Error) {
return false
}
return rl.DefaultRetryer.ShouldRetry(r)
}

func (rl retryerWithLog) RetryRules(r *request.Request) time.Duration {
backoffTime := rl.DefaultRetryer.RetryRules(r)
if backoffTime > 0 {
log.Warn("failed to request s3, retrying", zap.Error(r.Error), zap.Duration("backoff", backoffTime))
}
return backoffTime
}

// DefaultS3Retryer is the default s3 retryer, maybe this function
// should be extracted to another place.
func DefaultS3Retryer() request.Retryer {
return retryerWithLog{
DefaultRetryer: client.DefaultRetryer{
NumMaxRetries: 3,
MinRetryDelay: 1 * time.Second,
MinThrottleDelay: 2 * time.Second,
},
}
}

// FilterChangefeedFiles return the files that match to the changefeed.
func FilterChangefeedFiles(files []string, changefeedID model.ChangeFeedID) []string {
var (
Expand Down
Loading

0 comments on commit 772140d

Please sign in to comment.