From acef138388bad3b881573dfbc7d8fca92b270f52 Mon Sep 17 00:00:00 2001
From: Xuanwo
Date: Fri, 28 Feb 2020 15:31:56 +0800
Subject: [PATCH 1/2] misc: Upgrade to go 1.14

Signed-off-by: Xuanwo
---
 go.mod | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/go.mod b/go.mod
index edcd12b..c6e1430 100644
--- a/go.mod
+++ b/go.mod
@@ -1,6 +1,6 @@
 module github.com/yunify/qscamel
 
-go 1.13
+go 1.14
 
 require (
 	cloud.google.com/go v0.53.0 // indirect

From e5fcc3789858c9601707f7de95d5b48dc18aeb6c Mon Sep 17 00:00:00 2001
From: Xuanwo
Date: Fri, 28 Feb 2020 16:23:57 +0800
Subject: [PATCH 2/2] endpoint: Add dst support for s3

Signed-off-by: Xuanwo
---
Review notes (ignored by git am):
- constants.go: comments now name DefaultMultipartSize and cite the S3 limits.
- destination.go: requests take ctx; InitPart sizes the parts before creating
  the upload; UploadPart sends 1-based part numbers and completes the upload
  with the ETags reported by ListParts.
- migrate.go: the default branch logs t.Src.Type although the new case sets
  dst; it is context here, so it is left for a follow-up.
- The index blob ids were not regenerated after these edits.

 endpoint/s3/constants.go   |  21 +++++
 endpoint/s3/destination.go | 177 +++++++++++++++++++++++++++++++++++++
 endpoint/s3/utils.go       |  28 ++++++
 migrate/migrate.go         |   5 ++
 4 files changed, 231 insertions(+)
 create mode 100644 endpoint/s3/destination.go
 create mode 100644 endpoint/s3/utils.go

diff --git a/endpoint/s3/constants.go b/endpoint/s3/constants.go
index 6951576..4d217ea 100644
--- a/endpoint/s3/constants.go
+++ b/endpoint/s3/constants.go
@@ -5,3 +5,24 @@ const MaxKeys = 1000
 
 // ErrorCodeNotFound is the error code for key not found.
 const ErrorCodeNotFound = "NoSuchKey"
+
+// MaxListObjectsLimit is the max limit for list objects.
+const MaxListObjectsLimit = 1000
+
+// Multipart related constants.
+// ref: https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
+const (
+	// DefaultMultipartSize is the default multipart size.
+	// 64 * 1024 * 1024 = 67108864 B = 64 MB
+	DefaultMultipartSize = 67108864
+	// MaxAutoMultipartSize is the max auto multipart size.
+	// If part size is over MaxAutoMultipartSize, we will not detect it any more.
+	// 1024 * 1024 * 1024 = 1073741824 B = 1 GB
+	MaxAutoMultipartSize = 1073741824
+	// MaxMultipartNumber is the max number of parts that S3 supports.
+	MaxMultipartNumber = 10000
+	// MaxMultipartBoundarySize is the max multipart boundary size.
+	// Over this, put object will be reset by server.
+	// 5 * 1024 * 1024 * 1024 = 5368709120 B = 5 GB
+	MaxMultipartBoundarySize = 5368709120
+)
diff --git a/endpoint/s3/destination.go b/endpoint/s3/destination.go
new file mode 100644
index 0000000..c78e852
--- /dev/null
+++ b/endpoint/s3/destination.go
@@ -0,0 +1,177 @@
+package s3
+
+import (
+	"context"
+	"io"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/service/s3"
+	"github.com/sirupsen/logrus"
+	"github.com/yunify/qscamel/constants"
+
+	"github.com/yunify/qscamel/model"
+	"github.com/yunify/qscamel/utils"
+)
+
+// Deletable implement destination.Deletable
+func (c *Client) Deletable() bool {
+	return true
+}
+
+// Fetchable implement destination.Fetchable
+func (c *Client) Fetchable() bool {
+	return false
+}
+
+// Writable implement destination.Writable
+func (c *Client) Writable() bool {
+	return true
+}
+
+// Delete implement destination.Delete
+func (c *Client) Delete(ctx context.Context, p string) (err error) {
+	cp := utils.Join(c.Path, p)
+
+	_, err = c.client.DeleteObjectWithContext(ctx, &s3.DeleteObjectInput{
+		Bucket: aws.String(c.BucketName),
+		Key:    aws.String(cp),
+	})
+	if err != nil {
+		return
+	}
+
+	logrus.Debugf("s3 delete object %s.", cp)
+	return
+}
+
+// Write implement destination.Write
+func (c *Client) Write(ctx context.Context, p string, size int64, r io.Reader) (err error) {
+	cp := utils.Join(c.Path, p)
+
+	_, err = c.client.PutObjectWithContext(ctx, &s3.PutObjectInput{
+		Bucket:        aws.String(c.BucketName),
+		Key:           aws.String(cp),
+		Body:          aws.ReadSeekCloser(r),
+		ContentLength: aws.Int64(size),
+	})
+	if err != nil {
+		return
+	}
+
+	logrus.Debugf("s3 wrote object %s.", cp)
+	return
+}
+
+// Fetch implement destination.Fetch
+func (c *Client) Fetch(ctx context.Context, p, url string) (err error) {
+	return constants.ErrEndpointFuncNotImplemented
+}
+
+// Partable implement destination.Partable
+func (c *Client) Partable() bool {
+	return true
+}
+
+// InitPart implement destination.InitPart
+//
+// The part size is calculated before the multipart upload is created, so an
+// object that is too large does not leave an unfinished upload behind.
+func (c *Client) InitPart(ctx context.Context, p string, size int64) (uploadID string, partSize int64, partNumbers int, err error) {
+	cp := utils.Join(c.Path, p)
+
+	partSize, err = calculatePartSize(size)
+	if err != nil {
+		logrus.Errorf("Object %s is too large", p)
+		return
+	}
+
+	partNumbers = int(size / partSize)
+	if size%partSize != 0 {
+		partNumbers++
+	}
+
+	resp, err := c.client.CreateMultipartUploadWithContext(ctx, &s3.CreateMultipartUploadInput{
+		Bucket: aws.String(c.BucketName),
+		Key:    aws.String(cp),
+	})
+	if err != nil {
+		return
+	}
+
+	uploadID = aws.StringValue(resp.UploadId)
+	return
+}
+
+// UploadPart implement destination.UploadPart
+//
+// Once no pending part is left, the multipart upload is completed with the
+// part list reported by S3, so every completed part carries its ETag.
+func (c *Client) UploadPart(ctx context.Context, o *model.PartialObject, r io.Reader) (err error) {
+	cp := utils.Join(c.Path, o.Key)
+
+	// S3 part numbers start at 1.
+	// NOTE(review): assumes o.PartNumber counts from 0 - confirm.
+	partNumber := int64(o.PartNumber) + 1
+
+	_, err = c.client.UploadPartWithContext(ctx, &s3.UploadPartInput{
+		Bucket:        aws.String(c.BucketName),
+		Key:           aws.String(cp),
+		UploadId:      aws.String(o.UploadID),
+		ContentLength: aws.Int64(o.Size),
+		PartNumber:    aws.Int64(partNumber),
+		Body:          aws.ReadSeekCloser(r),
+	})
+	if err != nil {
+		return
+	}
+
+	next, err := model.NextPartialObject(ctx, o.Key, o.PartNumber)
+	if err != nil {
+		return
+	}
+	if next != nil {
+		logrus.Debugf("s3 wrote partial object %s at %d.", o.Key, o.Offset)
+		return nil
+	}
+
+	parts, err := c.listUploadedParts(ctx, cp, o.UploadID, o.TotalNumber)
+	if err != nil {
+		return
+	}
+
+	_, err = c.client.CompleteMultipartUploadWithContext(ctx, &s3.CompleteMultipartUploadInput{
+		Bucket:   aws.String(c.BucketName),
+		Key:      aws.String(cp),
+		UploadId: aws.String(o.UploadID),
+		MultipartUpload: &s3.CompletedMultipartUpload{
+			Parts: parts,
+		},
+	})
+	if err != nil {
+		return
+	}
+
+	logrus.Debugf("s3 completed multipart object %s.", cp)
+	return
+}
+
+// listUploadedParts returns the parts S3 holds for the given upload in the
+// form CompleteMultipartUpload expects. capacity is only an allocation hint.
+func (c *Client) listUploadedParts(ctx context.Context, key, uploadID string, capacity int) (parts []*s3.CompletedPart, err error) {
+	parts = make([]*s3.CompletedPart, 0, capacity)
+
+	err = c.client.ListPartsPagesWithContext(ctx, &s3.ListPartsInput{
+		Bucket:   aws.String(c.BucketName),
+		Key:      aws.String(key),
+		UploadId: aws.String(uploadID),
+	}, func(page *s3.ListPartsOutput, lastPage bool) bool {
+		for _, part := range page.Parts {
+			parts = append(parts, &s3.CompletedPart{
+				ETag:       part.ETag,
+				PartNumber: part.PartNumber,
+			})
+		}
+		return true
+	})
+	return
+}
diff --git a/endpoint/s3/utils.go b/endpoint/s3/utils.go
new file mode 100644
index 0000000..c78c0e1
--- /dev/null
+++ b/endpoint/s3/utils.go
@@ -0,0 +1,28 @@
+package s3
+
+import (
+	"github.com/yunify/qscamel/constants"
+)
+
+// calculatePartSize will calculate the object's part size.
+func calculatePartSize(size int64) (partSize int64, err error) {
+	partSize = DefaultMultipartSize
+
+	for size/partSize >= int64(MaxMultipartNumber) {
+		if partSize < MaxAutoMultipartSize {
+			partSize = partSize << 1
+			continue
+		}
+		// Try to adjust partSize if it is too small and account for
+		// integer division truncation.
+		partSize = size/int64(MaxMultipartNumber) + 1
+		break
+	}
+
+	if partSize > MaxMultipartBoundarySize {
+		err = constants.ErrObjectTooLarge
+		return
+	}
+
+	return
+}
diff --git a/migrate/migrate.go b/migrate/migrate.go
index 65851e2..0f5ac1a 100644
--- a/migrate/migrate.go
+++ b/migrate/migrate.go
@@ -151,6 +151,11 @@ func check(ctx context.Context) (err error) {
 		if err != nil {
 			return
 		}
+	case constants.EndpointS3:
+		dst, err = s3.New(ctx, constants.DestinationEndpoint, contexts.Client)
+		if err != nil {
+			return
+		}
 	default:
 		logrus.Errorf("Type %s is not supported.", t.Src.Type)
 		err = constants.ErrEndpointNotSupported