Skip to content

Commit

Permalink
executor: fix that traffic replay from S3 always report an error (#59813
Browse files Browse the repository at this point in the history
)

close #59811
  • Loading branch information
djshow832 authored Feb 27, 2025
1 parent b61e0e1 commit 67edd7d
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 39 deletions.
1 change: 1 addition & 0 deletions pkg/executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ go_test(
flaky = True,
shard_count = 50,
deps = [
"//br/pkg/storage",
"//pkg/config",
"//pkg/ddl",
"//pkg/ddl/util",
Expand Down
46 changes: 25 additions & 21 deletions pkg/executor/traffic.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ import (

// The keys for the mocked data that stored in context. They are only used for test.
type tiproxyAddrKeyType struct{}
type trafficPathKeyType struct{}
type trafficStoreKeyType struct{}

var tiproxyAddrKey tiproxyAddrKeyType
var trafficPathKey trafficPathKeyType
var trafficStoreKey trafficStoreKeyType

type trafficJob struct {
Instance string `json:"-"` // not passed from TiProxy
Expand Down Expand Up @@ -344,11 +344,11 @@ func formReader4Replay(ctx context.Context, args map[string]string, tiproxyNum i
if !ok || len(input) == 0 {
return nil, errors.New("the input path for replay must be specified")
}
u, err := storage.ParseRawURL(input)
backend, err := storage.ParseBackend(input, nil)
if err != nil {
return nil, errors.Wrapf(err, "parse input path failed")
}
if storage.IsLocal(u) {
if backend.GetLocal() != nil {
readers := make([]io.Reader, tiproxyNum)
form := getForm(args)
for i := 0; i < tiproxyNum; i++ {
Expand All @@ -357,34 +357,38 @@ func formReader4Replay(ctx context.Context, args map[string]string, tiproxyNum i
return readers, nil
}

names := make([]string, 0, tiproxyNum)
if mockNames := ctx.Value(trafficPathKey); mockNames != nil {
names = mockNames.([]string)
var store storage.ExternalStorage
if mockStore := ctx.Value(trafficStoreKey); mockStore != nil {
store = mockStore.(storage.ExternalStorage)
} else {
backend, err := storage.ParseBackendFromURL(u, nil)
if err != nil {
return nil, errors.Wrapf(err, "parse backend from the input path failed")
}
store, err := storage.NewWithDefaultOpt(ctx, backend)
store, err = storage.NewWithDefaultOpt(ctx, backend)
if err != nil {
return nil, errors.Wrapf(err, "create storage for input failed")
}
defer store.Close()
err = store.WalkDir(ctx, &storage.WalkOption{
ObjPrefix: filePrefix,
}, func(name string, _ int64) error {
names = append(names, name)
return nil
})
if err != nil {
return nil, errors.Wrapf(err, "walk input path failed")
}
names := make(map[string]struct{}, tiproxyNum)
err = store.WalkDir(ctx, &storage.WalkOption{
ObjPrefix: filePrefix,
}, func(name string, _ int64) error {
if idx := strings.Index(name, "/"); idx >= 0 {
names[name[:idx]] = struct{}{}
}
return nil
})
if err != nil {
return nil, errors.Wrapf(err, "walk input path failed")
}
if len(names) == 0 {
return nil, errors.New("no replay files found in the input path")
}
readers := make([]io.Reader, 0, len(names))
for _, name := range names {
// ParseBackendFromURL clears URL.RawQuery, so no need to reuse the *url.URL.
u, err := storage.ParseRawURL(input)
if err != nil {
return nil, errors.Wrapf(err, "parse input path failed")
}
for name := range names {
m := maps.Clone(args)
m[inputKey] = u.JoinPath(name).String()
form := getForm(m)
Expand Down
63 changes: 45 additions & 18 deletions pkg/executor/traffic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"sync"
"testing"

"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/infoschema"
Expand Down Expand Up @@ -190,16 +191,18 @@ func TestCapturePath(t *testing.T) {
ctx := context.TODO()
tempCtx := fillCtxWithTiProxyAddr(ctx, ports)
suite := newTrafficTestSuite(t, 10)
exec := suite.build(ctx, "traffic capture to 's3://bucket/tmp' duration='1s'")
prefix, suffix := "s3://bucket/tmp", "access-key=minioadmin&secret-access-key=minioadmin&endpoint=http://minio:8000&force-path-style=true"
exec := suite.build(ctx, fmt.Sprintf("traffic capture to '%s?%s' duration='1s'", prefix, suffix))
require.NoError(t, exec.Next(tempCtx, nil))

paths := make([]string, 0, tiproxyNum)
expectedPaths := make([]string, 0, tiproxyNum)
for i := 0; i < tiproxyNum; i++ {
httpHandler := handlers[i]
output := httpHandler.getForm().Get("output")
require.True(t, strings.HasPrefix(output, "s3://bucket/tmp/"), output)
paths = append(paths, output[len("s3://bucket/tmp/"):])
require.True(t, strings.HasPrefix(output, prefix), output)
require.True(t, strings.HasSuffix(output, suffix), output)
paths = append(paths, output[len(prefix)+1:len(output)-len(suffix)-1])
expectedPaths = append(expectedPaths, fmt.Sprintf("tiproxy-%d", i))
}
sort.Strings(paths)
Expand Down Expand Up @@ -236,40 +239,47 @@ func TestReplayPath(t *testing.T) {
formPaths: []string{},
},
{
paths: []string{"tiproxy-0"},
paths: []string{"tiproxy-0/meta", "tiproxy-0/traffic-1.log", "tiproxy-0/traffic-2.log"},
formPaths: []string{"tiproxy-0"},
warn: "tiproxy instances number (2) is greater than input paths number (1)",
},
{
paths: []string{"tiproxy-0", "tiproxy-1"},
paths: []string{"tiproxy-0/meta", "tiproxy-1/meta", "tiproxy-2"},
formPaths: []string{"tiproxy-0", "tiproxy-1"},
},
{
paths: []string{"tiproxy-0", "tiproxy-1", "tiproxy-2"},
paths: []string{"tiproxy-0/meta", "tiproxy-0/traffic-1.log", "tiproxy-1/meta", "tiproxy-1/traffic-1.log"},
formPaths: []string{"tiproxy-0", "tiproxy-1"},
},
{
paths: []string{"tiproxy-0/meta", "tiproxy-1/meta", "tiproxy-2/meta"},
formPaths: []string{},
err: "tiproxy instances number (2) is less than input paths number (3)",
},
}
ctx := context.TODO()
store := &mockExternalStorage{}
ctx = fillCtxWithTiProxyAddr(ctx, ports)
ctx = context.WithValue(ctx, trafficStoreKey, store)
prefix, suffix := "s3://bucket/tmp", "access-key=minioadmin&secret-access-key=minioadmin&endpoint=http://minio:8000&force-path-style=true"
for i, test := range tests {
tempCtx := context.WithValue(ctx, trafficPathKey, test.paths)
store.paths = test.paths
suite := newTrafficTestSuite(t, 10)
exec := suite.build(ctx, "traffic replay from 's3://bucket/tmp' user='root'")
exec := suite.build(ctx, fmt.Sprintf("traffic replay from '%s?%s' user='root'", prefix, suffix))
for j := 0; j < tiproxyNum; j++ {
handlers[j].reset()
}
err := exec.Next(tempCtx, nil)
err := exec.Next(ctx, nil)
if test.err != "" {
require.ErrorContains(t, err, test.err)
require.ErrorContains(t, err, test.err, "case %d", i)
} else {
require.NoError(t, err)
require.NoError(t, err, "case %d", i)
warnings := suite.stmtCtx().GetWarnings()
if test.warn != "" {
require.Len(t, warnings, 1)
require.ErrorContains(t, warnings[0].Err, test.warn)
require.Len(t, warnings, 1, "case %d", i)
require.ErrorContains(t, warnings[0].Err, test.warn, "case %d", i)
} else {
require.Len(t, warnings, 0)
require.Len(t, warnings, 0, "case %d", i)
}
}

Expand All @@ -278,14 +288,15 @@ func TestReplayPath(t *testing.T) {
httpHandler := handlers[j]
if httpHandler.getMethod() != "" {
form := httpHandler.getForm()
require.NotEmpty(t, form)
require.NotEmpty(t, form, "case %d", i)
input := form.Get("input")
require.True(t, strings.HasPrefix(input, "s3://bucket/tmp/"), input)
formPaths = append(formPaths, input[len("s3://bucket/tmp/"):])
require.True(t, strings.HasPrefix(input, prefix), input)
require.True(t, strings.HasSuffix(input, suffix), input)
formPaths = append(formPaths, input[len(prefix)+1:len(input)-len(suffix)-1])
}
}
sort.Strings(formPaths)
require.Equal(t, test.formPaths, formPaths, "case %d", i)
require.Equal(t, test.formPaths, formPaths, "case %d", i, "case %d", i)
}
}

Expand Down Expand Up @@ -579,3 +590,19 @@ type mockPrivManager struct {
func (m *mockPrivManager) RequestDynamicVerification(activeRoles []*auth.RoleIdentity, privName string, grantable bool) bool {
return m.Called(activeRoles, privName, grantable).Bool(0)
}

var _ storage.ExternalStorage = (*mockExternalStorage)(nil)

type mockExternalStorage struct {
storage.ExternalStorage
paths []string
}

func (s *mockExternalStorage) WalkDir(ctx context.Context, _ *storage.WalkOption, fn func(string, int64) error) error {
for _, path := range s.paths {
if err := fn(path, 0); err != nil {
return err
}
}
return nil
}

0 comments on commit 67edd7d

Please sign in to comment.