Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: support concurrent write for S3 writer (#45723) #49186

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions br/pkg/mock/storage/storage.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions br/pkg/storage/compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,21 @@ func WithCompression(inner ExternalStorage, compressionType CompressType, cfg De
}
}

<<<<<<< HEAD
func (w *withCompression) Create(ctx context.Context, name string, o *WriterOption) (ExternalFileWriter, error) {
writer, err := w.ExternalStorage.Create(ctx, name, o)
=======
func (w *withCompression) Create(ctx context.Context, name string, _ *WriterOption) (ExternalFileWriter, error) {
var (
writer ExternalFileWriter
err error
)
if s3Storage, ok := w.ExternalStorage.(*S3Storage); ok {
writer, err = s3Storage.CreateUploader(ctx, name)
} else {
writer, err = w.ExternalStorage.Create(ctx, name, nil)
}
>>>>>>> 5309c2ff775 (*: support concurrent write for S3 writer (#45723))
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/storage/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,17 @@ func (f *localFile) GetFileSize() (int64, error) {

// Create implements ExternalStorage interface.
func (l *LocalStorage) Create(_ context.Context, name string, _ *WriterOption) (ExternalFileWriter, error) {
<<<<<<< HEAD
filename := filepath.Join(l.base, name)
dir := filepath.Dir(filename)
err := os.MkdirAll(dir, 0750)
if err != nil {
return nil, errors.Trace(err)
}
file, err := os.Create(filename)
=======
file, err := os.Create(filepath.Join(l.base, name))
>>>>>>> 5309c2ff775 (*: support concurrent write for S3 writer (#45723))
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
40 changes: 40 additions & 0 deletions br/pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -1015,6 +1015,46 @@ func (s *s3ObjectWriter) Close(_ context.Context) error {
err := s.wd.Close()
if err != nil {
return err
<<<<<<< HEAD
=======
}
s.wg.Wait()
return s.err
}

// Create creates multi upload request.
func (rs *S3Storage) Create(ctx context.Context, name string, option *WriterOption) (ExternalFileWriter, error) {
var uploader ExternalFileWriter
var err error
if option == nil || option.Concurrency <= 1 {
uploader, err = rs.CreateUploader(ctx, name)
if err != nil {
return nil, err
}
} else {
up := s3manager.NewUploaderWithClient(rs.svc, func(u *s3manager.Uploader) {
u.Concurrency = option.Concurrency
u.BufferProvider = s3manager.NewBufferedReadSeekerWriteToPool(option.Concurrency * 8 * 1024 * 1024)
})
rd, wd := io.Pipe()
upParams := &s3manager.UploadInput{
Bucket: aws.String(rs.options.Bucket),
Key: aws.String(rs.options.Prefix + name),
Body: rd,
}
s3Writer := &s3ObjectWriter{wd: wd, wg: &sync.WaitGroup{}}
s3Writer.wg.Add(1)
go func() {
_, err := up.UploadWithContext(ctx, upParams)
err1 := rd.Close()
if err != nil {
log.Warn("upload to s3 failed", zap.String("filename", name), zap.Error(err), zap.Error(err1))
}
s3Writer.err = err
s3Writer.wg.Done()
}()
uploader = s3Writer
>>>>>>> 5309c2ff775 (*: support concurrent write for S3 writer (#45723))
}
s.wg.Wait()
return s.err
Expand Down
3 changes: 3 additions & 0 deletions br/pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,16 @@ type WriterOption struct {
Concurrency int
}

<<<<<<< HEAD
type ReaderOption struct {
// StartOffset is inclusive. And it's incompatible with Seek.
StartOffset *int64
// EndOffset is exclusive. And it's incompatible with Seek.
EndOffset *int64
}

=======
>>>>>>> 5309c2ff775 (*: support concurrent write for S3 writer (#45723))
// ExternalStorage represents a kind of file system storage.
type ExternalStorage interface {
// WriteFile writes a complete file to storage, similar to os.WriteFile, but WriteFile should be atomic
Expand Down
8 changes: 8 additions & 0 deletions dumpling/export/writer_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,11 @@ func writeBytes(tctx *tcontext.Context, writer storage.ExternalFileWriter, p []b
func buildFileWriter(tctx *tcontext.Context, s storage.ExternalStorage, fileName string, compressType storage.CompressType) (storage.ExternalFileWriter, func(ctx context.Context) error, error) {
fileName += compressFileSuffix(compressType)
fullPath := s.URI() + "/" + fileName
<<<<<<< HEAD
writer, err := storage.WithCompression(s, compressType, storage.DecompressConfig{}).Create(tctx, fileName, nil)
=======
writer, err := storage.WithCompression(s, compressType).Create(tctx, fileName, nil)
>>>>>>> 5309c2ff775 (*: support concurrent write for S3 writer (#45723))
if err != nil {
tctx.L().Warn("fail to open file",
zap.String("path", fullPath),
Expand Down Expand Up @@ -486,7 +490,11 @@ func buildInterceptFileWriter(pCtx *tcontext.Context, s storage.ExternalStorage,
initRoutine := func() error {
// use separated context pCtx here to make sure context used in ExternalFile won't be canceled before close,
// which will cause a context canceled error when closing gcs's Writer
<<<<<<< HEAD
w, err := storage.WithCompression(s, compressType, storage.DecompressConfig{}).Create(pCtx, fileName, nil)
=======
w, err := storage.WithCompression(s, compressType).Create(pCtx, fileName, nil)
>>>>>>> 5309c2ff775 (*: support concurrent write for S3 writer (#45723))
if err != nil {
pCtx.L().Warn("fail to open file",
zap.String("path", fullPath),
Expand Down