Skip to content

Commit

Permalink
executor: LOAD DATA INFILE support asterisk matching (#42050)
Browse files Browse the repository at this point in the history
ref #40499
  • Loading branch information
lance6716 authored Mar 13, 2023
1 parent 34086a4 commit fd91259
Show file tree
Hide file tree
Showing 11 changed files with 418 additions and 134 deletions.
6 changes: 3 additions & 3 deletions br/pkg/storage/azblob.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,13 +348,13 @@ func (s *AzureBlobStorage) WalkDir(ctx context.Context, opt *WalkOption, fn func
if opt == nil {
opt = &WalkOption{}
}
if len(opt.ObjPrefix) != 0 {
return errors.New("azure storage not support ObjPrefix for now")
}
prefix := path.Join(s.options.Prefix, opt.SubDir)
if len(prefix) > 0 && !strings.HasSuffix(prefix, "/") {
prefix += "/"
}
if len(opt.ObjPrefix) != 0 {
prefix += opt.ObjPrefix
}

listOption := &azblob.ContainerListBlobFlatSegmentOptions{Prefix: &prefix}
for {
Expand Down
7 changes: 4 additions & 3 deletions br/pkg/storage/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,13 +205,14 @@ func (s *GCSStorage) WalkDir(ctx context.Context, opt *WalkOption, fn func(strin
if opt == nil {
opt = &WalkOption{}
}
if len(opt.ObjPrefix) != 0 {
return errors.New("gcs storage not support ObjPrefix for now")
}
prefix := path.Join(s.gcs.Prefix, opt.SubDir)
if len(prefix) > 0 && !strings.HasSuffix(prefix, "/") {
prefix += "/"
}
if len(opt.ObjPrefix) != 0 {
prefix += opt.ObjPrefix
}

query := &storage.Query{Prefix: prefix}
// only need each object's name and size
err := query.SetAttrSelection([]string{"Name", "Size"})
Expand Down
5 changes: 3 additions & 2 deletions br/pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ const (
type WalkOption struct {
// walk on SubDir of specify directory
SubDir string
// ObjPrefix used fo prefix search in storage.
// it can save lots of time when we want find specify prefix objects in storage.
// ObjPrefix used fo prefix search in storage. Note that only part of storage
// support it.
// It can save lots of time when we want find specify prefix objects in storage.
// For example. we have 10000 <Hash>.sst files and 10 backupmeta.(\d+) files.
// we can use ObjPrefix = "backupmeta" to retrieve all meta files quickly.
ObjPrefix string
Expand Down
23 changes: 15 additions & 8 deletions executor/asyncloaddata/show_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,17 @@ func (s *mockGCSSuite) TestInternalStatus() {
s.server.CreateObject(fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{
BucketName: "test-tsv",
Name: "t.tsv",
Name: "t1.tsv",
},
Content: []byte(`1
2`),
Content: []byte(`1`),
})

s.server.CreateObject(fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{
BucketName: "test-tsv",
Name: "t2.tsv",
},
Content: []byte(`2`),
})

ctx := context.Background()
Expand Down Expand Up @@ -118,7 +125,7 @@ func (s *mockGCSSuite) TestInternalStatus() {
expected := &JobInfo{
JobID: id,
User: "test-load@test-host",
DataSource: fmt.Sprintf("gs://test-tsv/t.tsv?endpoint=%s", gcsEndpoint),
DataSource: fmt.Sprintf("gs://test-tsv/t*.tsv?endpoint=%s", gcsEndpoint),
TableSchema: "load_tsv",
TableName: "t",
ImportMode: "logical",
Expand All @@ -141,15 +148,15 @@ func (s *mockGCSSuite) TestInternalStatus() {
// tk2 @ 0:08
info, err = GetJobInfo(ctx, tk2.Session(), id)
require.NoError(s.T(), err)
expected.Progress = `{"SourceFileSize":3,"LoadedFileSize":0,"LoadedRowCnt":1}`
expected.Progress = `{"SourceFileSize":2,"LoadedFileSize":0,"LoadedRowCnt":1}`
require.Equal(s.T(), expected, info)
// tk @ 0:09
// commit one task and sleep 3 seconds
time.Sleep(3 * time.Second)
// tk2 @ 0:11
info, err = GetJobInfo(ctx, tk2.Session(), id)
require.NoError(s.T(), err)
expected.Progress = `{"SourceFileSize":3,"LoadedFileSize":2,"LoadedRowCnt":2}`
expected.Progress = `{"SourceFileSize":2,"LoadedFileSize":1,"LoadedRowCnt":2}`
require.Equal(s.T(), expected, info)
// tk @ 0:12
// finish job
Expand All @@ -159,7 +166,7 @@ func (s *mockGCSSuite) TestInternalStatus() {
require.NoError(s.T(), err)
expected.Status = JobFinished
expected.StatusMessage = "Records: 2 Deleted: 0 Skipped: 0 Warnings: 0"
expected.Progress = `{"SourceFileSize":3,"LoadedFileSize":3,"LoadedRowCnt":2}`
expected.Progress = `{"SourceFileSize":2,"LoadedFileSize":2,"LoadedRowCnt":2}`
require.Equal(s.T(), expected, info)
}()

Expand All @@ -183,7 +190,7 @@ func (s *mockGCSSuite) TestInternalStatus() {
s.enableFailpoint("github.com/pingcap/tidb/executor/AfterStartJob", `sleep(3000)`)
s.enableFailpoint("github.com/pingcap/tidb/executor/AfterCommitOneTask", `sleep(3000)`)
s.tk.MustExec("SET SESSION tidb_dml_batch_size = 1;")
sql := fmt.Sprintf(`LOAD DATA INFILE 'gs://test-tsv/t.tsv?endpoint=%s'
sql := fmt.Sprintf(`LOAD DATA INFILE 'gs://test-tsv/t*.tsv?endpoint=%s'
INTO TABLE load_tsv.t;`, gcsEndpoint)
s.tk.MustExec(sql)
wg.Wait()
Expand Down
Loading

0 comments on commit fd91259

Please sign in to comment.