Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

endpoint: Add dst support for s3 #89

Merged
merged 2 commits into from
Feb 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions endpoint/s3/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,24 @@ const MaxKeys = 1000

// ErrorCodeNotFound is the error code for key not found.
const ErrorCodeNotFound = "NoSuchKey"

// MaxListObjectsLimit is the max limit for list objects.
const MaxListObjectsLimit = 1000

// Multipart related constants.
// ref: https://docs.qingcloud.com/qingstor/api/object/multipart/index.html
const (
// DefaultMultipartBoundarySize is the default multipart size.
// 64 * 1024 * 1024 = 67108864 B = 64 MB
DefaultMultipartSize = 67108864
// MaxAutoMultipartSize is the max auto multipart size.
// If part size is over MaxAutoMultipartSize, we will not detect it any more.
// 1024 * 1024 * 1024 = 1073741824 B = 1 GB
MaxAutoMultipartSize = 1073741824
// MaxMultipartNumber is the max part that QingStor supported.
MaxMultipartNumber = 10000
// MaxMultipartBoundarySize is the max multipart boundary size.
// Over this, put object will be reset by server.
// 5 * 1024 * 1024 * 1024 = 5368709120 B = 5 GB
MaxMultipartBoundarySize = 5368709120
)
147 changes: 147 additions & 0 deletions endpoint/s3/destination.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package s3

import (
"context"
"io"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/sirupsen/logrus"
"github.com/yunify/qscamel/constants"

"github.com/yunify/qscamel/model"
"github.com/yunify/qscamel/utils"
)

// Deletable implement destination.Deletable
func (c *Client) Deletable() bool {
return true
}

// Fetchable implement destination.Fetchable
func (c *Client) Fetchable() bool {
return false
}

// Writable implement destination.Writable
func (c *Client) Writable() bool {
return true
}

// Delete implement destination.Delete
func (c *Client) Delete(ctx context.Context, p string) (err error) {
cp := utils.Join(c.Path, p)

_, err = c.client.DeleteObject(&s3.DeleteObjectInput{
Bucket: aws.String(c.BucketName),
Key: aws.String(cp),
})
if err != nil {
return
}

logrus.Debugf("s3 delete object %s.", cp)
return
}

// Write implement destination.Write
func (c *Client) Write(ctx context.Context, p string, size int64, r io.Reader) (err error) {
cp := utils.Join(c.Path, p)

_, err = c.client.PutObject(&s3.PutObjectInput{
Bucket: aws.String(c.BucketName),
Key: aws.String(cp),
Body: aws.ReadSeekCloser(r),
ContentLength: aws.Int64(size),
})

if err != nil {
return
}

logrus.Debugf("s3 wrote object %s.", cp)
return
}

// Fetch implement destination.Fetch
func (c *Client) Fetch(ctx context.Context, p, url string) (err error) {
return constants.ErrEndpointFuncNotImplemented
}

// Partable implement destination.Partable
func (c *Client) Partable() bool {
return true
}

// InitPart implement destination.InitPart
func (c *Client) InitPart(ctx context.Context, p string, size int64) (uploadID string, partSize int64, partNumbers int, err error) {
cp := utils.Join(c.Path, p)

resp, err := c.client.CreateMultipartUpload(&s3.CreateMultipartUploadInput{
Bucket: aws.String(c.BucketName),
Key: aws.String(cp),
})

if err != nil {
return
}

uploadID = *resp.UploadId
partSize, err = calculatePartSize(size)
if err != nil {
logrus.Errorf("Object %s is too large", p)
return
}

partNumbers = int(size / partSize)
if size%partSize != 0 {
partNumbers++
}
return
}

// UploadPart implement destination.UploadPart
func (c *Client) UploadPart(ctx context.Context, o *model.PartialObject, r io.Reader) (err error) {
cp := utils.Join(c.Path, o.Key)

_, err = c.client.UploadPart(&s3.UploadPartInput{
Bucket: aws.String(c.BucketName),
Key: aws.String(cp),
UploadId: aws.String(o.UploadID),
ContentLength: aws.Int64(o.Size),
PartNumber: aws.Int64(int64(o.PartNumber)),
Body: aws.ReadSeekCloser(r),
})
if err != nil {
return
}

next, err := model.NextPartialObject(ctx, o.Key, o.PartNumber)
if err != nil {
return
}
if next != nil {
logrus.Debugf("s3 wrote partial object %s at %d.", o.Key, o.Offset)
return nil
}

parts := make([]*s3.CompletedPart, o.TotalNumber)
for i := 0; i < o.TotalNumber; i++ {
parts[i] = &s3.CompletedPart{
PartNumber: aws.Int64(int64(i)),
}
}

_, err = c.client.CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{
Bucket: aws.String(c.BucketName),
Key: aws.String(cp),
UploadId: aws.String(o.UploadID),
MultipartUpload: &s3.CompletedMultipartUpload{
Parts: parts,
},
})
if err != nil {
return err
}
return nil
}
28 changes: 28 additions & 0 deletions endpoint/s3/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package s3

import (
"github.com/yunify/qscamel/constants"
)

// calculatePartSize will calculate the object's part size.
func calculatePartSize(size int64) (partSize int64, err error) {
partSize = DefaultMultipartSize

for size/partSize >= int64(MaxMultipartNumber) {
if partSize < MaxAutoMultipartSize {
partSize = partSize << 1
continue
}
// Try to adjust partSize if it is too small and account for
// integer division truncation.
partSize = size/int64(MaxMultipartNumber) + 1
break
}

if partSize > MaxMultipartBoundarySize {
err = constants.ErrObjectTooLarge
return
}

return
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/yunify/qscamel

go 1.13
go 1.14

require (
cloud.google.com/go v0.53.0 // indirect
Expand Down
5 changes: 5 additions & 0 deletions migrate/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,11 @@ func check(ctx context.Context) (err error) {
if err != nil {
return
}
case constants.EndpointS3:
dst, err = s3.New(ctx, constants.DestinationEndpoint, contexts.Client)
if err != nil {
return
}
default:
logrus.Errorf("Type %s is not supported.", t.Src.Type)
err = constants.ErrEndpointNotSupported
Expand Down