From 5d88adc83aec1d6ae37f0287350b5001b07d2bac Mon Sep 17 00:00:00 2001 From: nabbar Date: Mon, 22 May 2023 16:23:23 +0200 Subject: [PATCH] Package AWS: - rework MultipartUpload process & helper - update test to use lib size - update object multipart to use new helper Package IO Utils : - add truncate & sync to FileProgress --- .golangci.yml | 18 +-- aws/aws_suite_test.go | 5 +- aws/bucket_test.go | 3 +- aws/helper/partSize.go | 121 ------------------ aws/multipart/errors.go | 36 ++++++ aws/multipart/interface.go | 67 ++++++++++ aws/multipart/io.go | 34 +++++ aws/multipart/model.go | 252 +++++++++++++++++++++++++++++++++++++ aws/multipart/part.go | 237 ++++++++++++++++++++++++++++++++++ aws/multipart/start.go | 80 ++++++++++++ aws/multipart/stop.go | 105 ++++++++++++++++ aws/object/interface.go | 3 + aws/object/multipart.go | 158 +++++------------------ aws/object_test.go | 4 +- ioutils/error.go | 6 + ioutils/fileProgess.go | 28 +++++ 16 files changed, 896 insertions(+), 261 deletions(-) delete mode 100644 aws/helper/partSize.go create mode 100644 aws/multipart/errors.go create mode 100644 aws/multipart/interface.go create mode 100644 aws/multipart/io.go create mode 100644 aws/multipart/model.go create mode 100644 aws/multipart/part.go create mode 100644 aws/multipart/start.go create mode 100644 aws/multipart/stop.go diff --git a/.golangci.yml b/.golangci.yml index 27d7942e..0e786ed6 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -82,7 +82,7 @@ linters: disable: # - bodyclose # - contextcheck -# - cyclop + - cyclop - deadcode # - errname # - errorlint @@ -90,12 +90,12 @@ linters: - exhaustivestruct - exhaustruct # - forbidigo -# - funlen + - funlen # - gci # - gochecknoglobals # - gochecknoinits # - gocognit -# - gocritic + - gocritic # - gocyclo # - godot # - godox @@ -106,16 +106,16 @@ linters: - ifshort # - interfacebloat - interfacer -# - ireturn + - ireturn # - lll - maligned # - nakedret -# - nestif + - nestif # - nilerr # - nlreturn # - noctx -# - nolintlint -# - nonamedreturns + - nolintlint + - nonamedreturns - nosnakecase # - revive # - rowserrcheck @@ -129,5 +129,5 @@ linters: - varnamelen # - wastedassign # - whitespace -# - wrapcheck -# - wsl + - wrapcheck + - wsl diff --git a/aws/aws_suite_test.go b/aws/aws_suite_test.go index e4fb575e..b4174e79 100644 --- a/aws/aws_suite_test.go +++ b/aws/aws_suite_test.go @@ -31,6 +31,7 @@ import ( "crypto/rand" "errors" "fmt" + libsiz "github.com/nabbar/golib/size" "io/ioutil" "net" "net/http" @@ -240,8 +241,8 @@ func WaitMinio(host string) bool { return err == nil } -func randContent(size int) *bytes.Buffer { - p := make([]byte, size) +func randContent(size libsiz.Size) *bytes.Buffer { + p := make([]byte, size.Int64()) _, err := rand.Read(p) diff --git a/aws/bucket_test.go b/aws/bucket_test.go index 8ae8ea79..9bf3ee4d 100644 --- a/aws/bucket_test.go +++ b/aws/bucket_test.go @@ -26,6 +26,7 @@ package aws_test import ( + libsiz "github.com/nabbar/golib/size" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" ) @@ -52,7 +53,7 @@ var _ = Describe("Bucket", func() { It("Must succeed", func() { var ( err error - rnd = randContent(64 * 1024) + rnd = randContent(10 * libsiz.SizeMega) ) err = cli.Object().MultipartPut("object", rnd) diff --git a/aws/helper/partSize.go b/aws/helper/partSize.go deleted file mode 100644 index d5ead004..00000000 --- a/aws/helper/partSize.go +++ /dev/null @@ -1,121 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 Nicolas JUHEL - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - * - */ - -package helper - -import ( - "errors" - "io" - "strings" - - libsiz "github.com/nabbar/golib/size" - - sdkaws "github.com/aws/aws-sdk-go-v2/aws" - sdktps "github.com/aws/aws-sdk-go-v2/service/s3/types" -) - -type ReaderPartSize interface { - io.Reader - NextPart(eTag *string) - CurrPart() int32 - CompPart() *sdktps.CompletedMultipartUpload - IeOEF() bool -} - -func NewReaderPartSize(rd io.Reader, p libsiz.Size) ReaderPartSize { - return &readerPartSize{ - b: rd, - p: p.Int64(), - i: 1, - j: 0, - e: false, - c: nil, - } -} - -type readerPartSize struct { - // buffer - b io.Reader - // partsize - p int64 - // partNumber - i int64 - // current part counter - j int64 - // Is EOF - e bool - // complete part slice - c *sdktps.CompletedMultipartUpload -} - -func (r *readerPartSize) NextPart(eTag *string) { - if r.c == nil { - r.c = &sdktps.CompletedMultipartUpload{ - Parts: nil, - } - } - - if r.c.Parts == nil { - r.c.Parts = make([]sdktps.CompletedPart, 0) - } - - r.c.Parts = append(r.c.Parts, sdktps.CompletedPart{ - ETag: sdkaws.String(strings.Replace(*eTag, "\"", "", -1)), - PartNumber: int32(r.i), - }) - - r.i++ - r.j = 0 -} - -func (r readerPartSize) CurrPart() int32 { - return int32(r.i) -} - -func (r readerPartSize) CompPart() *sdktps.CompletedMultipartUpload { - return r.c -} - -func (r readerPartSize) IeOEF() bool { - return r.e -} - -func (r *readerPartSize) Read(p []byte) (n int, err error) { - if r.e || r.j >= r.p { - return 0, io.EOF - } - - if len(p) > int(r.p-r.j) { - p = make([]byte, int(r.p-r.j)) - } - - n, e := r.b.Read(p) - - if errors.Is(e, io.EOF) { - r.e = true - } - - return n, e -} diff --git a/aws/multipart/errors.go b/aws/multipart/errors.go new file mode 100644 index 00000000..30fb2c26 --- /dev/null +++ b/aws/multipart/errors.go @@ -0,0 +1,36 @@ +/* + * MIT License + * + * Copyright (c) 2020 Nicolas JUHEL + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + */ + +package multipart + +import "fmt" + +var ( + ErrInvalidInstance = fmt.Errorf("invalid instance") + ErrInvalidClient = fmt.Errorf("invalid aws S3 client") + ErrInvalidResponse = fmt.Errorf("invalid aws S3 response") + ErrInvalidUploadID = fmt.Errorf("invalid aws s3 MPU Upload ID") + ErrInvalidTMPFile = fmt.Errorf("invalid working or temporary file") +) diff --git a/aws/multipart/interface.go b/aws/multipart/interface.go new file mode 100644 index 00000000..2464d5a7 --- /dev/null +++ b/aws/multipart/interface.go @@ -0,0 +1,67 @@ +/* + * MIT License + * + * Copyright (c) 2020 Nicolas JUHEL + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + */ + +package multipart + +import ( + "io" + "sync" + + sdksss "github.com/aws/aws-sdk-go-v2/service/s3" + libctx "github.com/nabbar/golib/context" + libsiz "github.com/nabbar/golib/size" +) + +const DefaultPartSize = 5 * libsiz.SizeMega + +type FuncClientS3 func() *sdksss.Client + +type MultiPart interface { + io.WriteCloser + + RegisterContext(fct libctx.FuncContext) + RegisterClientS3(fct FuncClientS3) + RegisterMultipartID(id string) + RegisterWorkingFile(file string) error + + StartMPU() error + StopMPU(abort bool) error + + AddPart(r io.Reader) (n int64, e error) + AddToPart(p []byte) (n int, e error) + RegisterPart(etag string) +} + +func New(partSize libsiz.Size, object string, bucket string) MultiPart { + return &mpu{ + m: sync.RWMutex{}, + c: nil, + s: partSize, + i: "", + o: object, + b: bucket, + n: 0, + } +} diff --git a/aws/multipart/io.go b/aws/multipart/io.go new file mode 100644 index 00000000..3ce4b859 --- /dev/null +++ b/aws/multipart/io.go @@ -0,0 +1,34 @@ +/* + * MIT License + * + * Copyright (c) 2020 Nicolas JUHEL + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + */ + +package multipart + +func (m *mpu) Write(p []byte) (n int, err error) { + return m.AddToPart(p) +} + +func (m *mpu) Close() error { + return m.StopMPU(false) +} diff --git a/aws/multipart/model.go b/aws/multipart/model.go new file mode 100644 index 00000000..340b7711 --- /dev/null +++ b/aws/multipart/model.go @@ -0,0 +1,252 @@ +/* + * MIT License + * + * Copyright (c) 2020 Nicolas JUHEL + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + */ + +package multipart + +import ( + "context" + "path/filepath" + "sync" + + sdksss "github.com/aws/aws-sdk-go-v2/service/s3" + sdktyp "github.com/aws/aws-sdk-go-v2/service/s3/types" + libctx "github.com/nabbar/golib/context" + libiot "github.com/nabbar/golib/ioutils" + libsiz "github.com/nabbar/golib/size" +) + +type mpu struct { + m sync.RWMutex + x libctx.FuncContext + c FuncClientS3 + s libsiz.Size + i string + b string + o string + n int32 + l []sdktyp.CompletedPart + w libiot.FileProgress +} + +func (m *mpu) RegisterContext(fct libctx.FuncContext) { + if m == nil { + return + } + + m.m.Lock() + defer m.m.Unlock() + + m.x = fct +} + +func (m *mpu) getContext() context.Context { + if m == nil { + return context.Background() + } + + m.m.RLock() + defer m.m.RUnlock() + + if m.x == nil { + return context.Background() + } else if x := m.x(); x == nil { + return context.Background() + } else { + return x + } +} + +func (m *mpu) RegisterClientS3(fct FuncClientS3) { + if m == nil { + return + } + + m.m.Lock() + defer m.m.Unlock() + + m.c = fct +} + +func (m *mpu) getClient() *sdksss.Client { + if m == nil { + return nil + } + + m.m.RLock() + defer m.m.RUnlock() + + if m.c == nil { + return nil + } else if c := m.c(); c == nil { + return nil + } else { + return c + } +} + +func (m *mpu) RegisterMultipartID(id string) { + if m == nil { + return + } + + m.m.Lock() + defer m.m.Unlock() + + m.i = id +} + +func (m *mpu) getMultipartID() string { + if m == nil { + return "" + } + + m.m.RLock() + defer m.m.RUnlock() + + return m.i +} + +func (m *mpu) RegisterWorkingFile(file string) error { + if m == nil { + return ErrInvalidInstance + } + + m.m.Lock() + defer m.m.Unlock() + + var e error + + if m.w != nil { + m.m.Unlock() + + if e = m.checkAndSendPart(true); e != nil { + return e + } + + m.m.Lock() + _ = m.w.Close() + m.w = nil + } + + m.w, e = libiot.NewFileProgressPathOpen(filepath.Clean(file)) + + if e != nil { + return e + } + + return nil +} + +func (m *mpu) getWorkingFile() (libiot.FileProgress, error) { + if m == nil { + return nil, ErrInvalidInstance + } + + m.m.RLock() + defer m.m.RUnlock() + + if m.w != nil { + return m.w, nil + } + + m.m.RUnlock() + e := m.setTempWorkingFile() + m.m.RLock() + + if e != nil { + return nil, e + } else if m.w == nil { + return nil, ErrInvalidTMPFile + } + + return m.w, nil +} + +func (m *mpu) setTempWorkingFile() error { + if m == nil { + return ErrInvalidInstance + } + + m.m.Lock() + defer m.m.Unlock() + + var e error + m.w, e = libiot.NewFileProgressTemp() + return e +} + +func (m *mpu) closeWorkingFile() error { + if m == nil { + return nil + } + + m.m.Lock() + defer m.m.Unlock() + + if m.w == nil { + return nil + } + + e := m.w.Close() + m.w = nil + return e +} + +func (m *mpu) getPartSize() libsiz.Size { + if m == nil { + return DefaultPartSize + } + + m.m.RLock() + defer m.m.RUnlock() + + if m.s < 1 { + return DefaultPartSize + } + + return m.s +} + +func (m *mpu) getObject() string { + if m == nil { + return "" + } + + m.m.RLock() + defer m.m.RUnlock() + + return m.o +} + +func (m *mpu) getBucket() string { + if m == nil { + return "" + } + + m.m.RLock() + defer m.m.RUnlock() + + return m.b +} diff --git a/aws/multipart/part.go b/aws/multipart/part.go new file mode 100644 index 00000000..26f92f9e --- /dev/null +++ b/aws/multipart/part.go @@ -0,0 +1,237 @@ +/* + * MIT License + * + * Copyright (c) 2020 Nicolas JUHEL + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + */ + +package multipart + +import ( + /* #nosec */ + //nolint #nosec + "crypto/md5" + "encoding/base64" + "fmt" + "io" + "strings" + + sdkaws "github.com/aws/aws-sdk-go-v2/aws" + sdksss "github.com/aws/aws-sdk-go-v2/service/s3" + sdktyp "github.com/aws/aws-sdk-go-v2/service/s3/types" + libiot "github.com/nabbar/golib/ioutils" +) + +func (m *mpu) getPartList() []sdktyp.CompletedPart { + if m == nil { + return make([]sdktyp.CompletedPart, 0) + } + + m.m.RLock() + defer m.m.RUnlock() + + if len(m.l) < 1 { + return make([]sdktyp.CompletedPart, 0) + } + + return m.l +} + +func (m *mpu) getCounter() int32 { + if m == nil { + return 0 + } + + m.m.RLock() + defer m.m.RUnlock() + + return m.n +} + +func (m *mpu) RegisterPart(etag string) { + if m == nil { + return + } + + m.m.Lock() + defer m.m.Unlock() + + if len(m.l) < 1 { + m.l = make([]sdktyp.CompletedPart, 0) + } + + m.n++ + m.l = append(m.l, sdktyp.CompletedPart{ + ETag: sdkaws.String(strings.Replace(etag, "\"", "", -1)), + PartNumber: m.n, + }) +} + +func (m *mpu) AddPart(r io.Reader) (n int64, e error) { + if m == nil { + return 0, ErrInvalidInstance + } + + var ( + cli *sdksss.Client + res *sdksss.UploadPartOutput + tmp libiot.FileProgress + ctx = m.getContext() + obj = m.getObject() + bck = m.getBucket() + mid = m.getMultipartID() + + /* #nosec */ + //nolint #nosec + hsh = md5.New() + ) + + if cli = m.getClient(); cli == nil { + return 0, ErrInvalidClient + } else if tmp, e = libiot.NewFileProgressTemp(); e != nil { + return 0, e + } else if tmp == nil { + return 0, ErrInvalidTMPFile + } else { + defer func() { + if tmp != nil { + _ = tmp.Close() + } + }() + } + + if n, e = io.Copy(tmp, r); e != nil || n < 1 { + return n, e + } else if _, e = tmp.Seek(0, io.SeekStart); e != nil { + return 0, e + } else if _, e = tmp.WriteTo(hsh); e != nil { + return 0, e + } else if _, e = tmp.Seek(0, io.SeekStart); e != nil { + return 0, e + } + + res, e = cli.UploadPart(ctx, &sdksss.UploadPartInput{ + Bucket: sdkaws.String(bck), + Key: sdkaws.String(obj), + UploadId: sdkaws.String(mid), + PartNumber: m.getCounter() + 1, + ContentLength: n, + Body: tmp, + RequestPayer: sdktyp.RequestPayerRequester, + ContentMD5: sdkaws.String(base64.StdEncoding.EncodeToString(hsh.Sum(nil))), + }) + + if e != nil { + return 0, e + } else if res == nil || res.ETag == nil || len(*res.ETag) < 1 { + return 0, ErrInvalidResponse + } else { + m.RegisterPart(*res.ETag) + } + + return n, nil +} + +func (m *mpu) AddToPart(p []byte) (n int, e error) { + var ( + tmp libiot.FileProgress + ) + + if tmp, e = m.getWorkingFile(); e != nil { + return 0, e + } else if tmp == nil { + return 0, ErrInvalidTMPFile + } + + for len(p) > 0 { + var ( + r []byte + i int + s int64 + siz = m.getPartSize().Int64() + ) + + if _, e = tmp.Seek(0, io.SeekStart); e != nil { + return n, e + } else if s, e = tmp.SizeToEOF(); e != nil { + return n, e + } else if _, e = tmp.Seek(0, io.SeekEnd); e != nil { + return n, e + } else if s > 0 && s >= siz { + if e = m.checkAndSendPart(false); e != nil { + return n, e + } + continue + } else if s > 0 && s < siz { + siz -= s + } + + if int64(len(p)) > siz { + r = p[:siz] + p = p[siz:] + } else { + r = p + p = nil + } + + if i, e = tmp.Write(r); e != nil { + return n, e + } else if i != len(r) { + return n, fmt.Errorf("write a wrong number of byte") + } else if e = m.checkAndSendPart(false); e != nil { + return n, e + } else { + n += len(r) + } + } + + return n, nil +} + +func (m *mpu) checkAndSendPart(force bool) error { + var ( + err error + siz int64 + tmp libiot.FileProgress + ) + + if tmp, err = m.getWorkingFile(); err != nil { + return err + } else if tmp == nil { + return ErrInvalidTMPFile + } else if _, err = tmp.Seek(0, io.SeekStart); err != nil { + return err + } else if siz, err = tmp.SizeToEOF(); err != nil { + return err + } else if siz < m.getPartSize().Int64() && !force { + return nil + } else if siz == 0 { + return nil + } else if _, err = m.AddPart(tmp); err != nil { + return err + } else if err = tmp.Truncate(0); err != nil { + return err + } else if err = tmp.Sync(); err != nil { + return err + } else { + return nil + } +} diff --git a/aws/multipart/start.go b/aws/multipart/start.go new file mode 100644 index 00000000..b5c984b6 --- /dev/null +++ b/aws/multipart/start.go @@ -0,0 +1,80 @@ +/* + * MIT License + * + * Copyright (c) 2020 Nicolas JUHEL + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + */ + +package multipart + +import ( + "mime" + "path/filepath" + + sdkaws "github.com/aws/aws-sdk-go-v2/aws" + sdksss "github.com/aws/aws-sdk-go-v2/service/s3" +) + +func (m *mpu) StartMPU() error { + if m == nil { + return ErrInvalidInstance + } + + var ( + cli *sdksss.Client + res *sdksss.CreateMultipartUploadOutput + err error + tpe *string + ctx = m.getContext() + obj = m.getObject() + bck = m.getBucket() + ) + + if cli = m.getClient(); cli == nil { + return ErrInvalidClient + } + + if t := mime.TypeByExtension(filepath.Ext(obj)); t == "" { + tpe = sdkaws.String("application/octet-stream") + } else { + tpe = sdkaws.String(t) + } + + res, err = cli.CreateMultipartUpload(ctx, &sdksss.CreateMultipartUploadInput{ + Key: sdkaws.String(obj), + Bucket: sdkaws.String(bck), + ContentType: tpe, + }) + + if err != nil { + return err + } else if res == nil { + return ErrInvalidResponse + } else if res.UploadId == nil || len(*res.UploadId) < 1 { + return ErrInvalidResponse + } + + m.m.Lock() + defer m.m.Unlock() + m.i = *res.UploadId + + return nil +} diff --git a/aws/multipart/stop.go b/aws/multipart/stop.go new file mode 100644 index 00000000..610c4a92 --- /dev/null +++ b/aws/multipart/stop.go @@ -0,0 +1,105 @@ +/* + * MIT License + * + * Copyright (c) 2020 Nicolas JUHEL + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + */ + +package multipart + +import ( + sdkaws "github.com/aws/aws-sdk-go-v2/aws" + sdksss "github.com/aws/aws-sdk-go-v2/service/s3" + sdktyp "github.com/aws/aws-sdk-go-v2/service/s3/types" +) + +func (m *mpu) StopMPU(abort bool) error { + if m == nil { + return ErrInvalidInstance + } + + var ( + cli *sdksss.Client + res *sdksss.CompleteMultipartUploadOutput + err error + ctx = m.getContext() + obj = m.getObject() + bck = m.getBucket() + mid = m.getMultipartID() + lst = m.getPartList() + ) + + if cli = m.getClient(); cli == nil { + return ErrInvalidClient + } else if len(mid) < 1 { + return ErrInvalidUploadID + } + + if !abort { + if err = m.checkAndSendPart(true); err != nil { + return err + } + } + + _ = m.closeWorkingFile() + + if abort || len(lst) < 1 { + abort = true + _, err = cli.AbortMultipartUpload(ctx, &sdksss.AbortMultipartUploadInput{ + Bucket: sdkaws.String(bck), + Key: sdkaws.String(obj), + UploadId: sdkaws.String(mid), + }) + } else { + res, err = cli.CompleteMultipartUpload(ctx, &sdksss.CompleteMultipartUploadInput{ + Bucket: sdkaws.String(bck), + Key: sdkaws.String(obj), + UploadId: sdkaws.String(mid), + MultipartUpload: &sdktyp.CompletedMultipartUpload{ + Parts: lst, + }, + RequestPayer: sdktyp.RequestPayerRequester, + }) + } + + if err != nil { + return err + } + + m.m.Lock() + defer m.m.Unlock() + + m.i = "" + m.o = "" + m.b = "" + m.l = nil + m.n = 0 + + if abort { + return nil + } else if res == nil { + return ErrInvalidResponse + } else if res.Key == nil || len(*res.Key) < 1 { + return ErrInvalidResponse + } + + return nil +} diff --git a/aws/object/interface.go b/aws/object/interface.go index 9e79985e..59e88569 100644 --- a/aws/object/interface.go +++ b/aws/object/interface.go @@ -30,6 +30,8 @@ import ( "io" "time" + libmpu "github.com/nabbar/golib/aws/multipart" + libsiz "github.com/nabbar/golib/size" sdkiam "github.com/aws/aws-sdk-go-v2/service/iam" @@ -67,6 +69,7 @@ type Object interface { GetAttributes(object, version string) (*sdksss.GetObjectAttributesOutput, liberr.Error) MultipartList(keyMarker, markerId string) (uploads []sdktps.MultipartUpload, nextKeyMarker string, nextIdMarker string, count int64, e liberr.Error) + MultipartNew(partSize libsiz.Size, object string) libmpu.MultiPart MultipartPut(object string, body io.Reader) liberr.Error MultipartPutCustom(partSize libsiz.Size, object string, body io.Reader) liberr.Error MultipartCancel(uploadId, key string) liberr.Error diff --git a/aws/object/multipart.go b/aws/object/multipart.go index ce6df4e8..5747e5b0 100644 --- a/aws/object/multipart.go +++ b/aws/object/multipart.go @@ -26,29 +26,17 @@ package object import ( - //nolint #nosec - /* #nosec */ - "crypto/md5" - "encoding/base64" "io" - "mime" - "path/filepath" - - libsiz "github.com/nabbar/golib/size" - - //nolint #gci - "os" sdkaws "github.com/aws/aws-sdk-go-v2/aws" sdksss "github.com/aws/aws-sdk-go-v2/service/s3" sdktyp "github.com/aws/aws-sdk-go-v2/service/s3/types" libhlp "github.com/nabbar/golib/aws/helper" + libmpu "github.com/nabbar/golib/aws/multipart" liberr "github.com/nabbar/golib/errors" - libiou "github.com/nabbar/golib/ioutils" + libsiz "github.com/nabbar/golib/size" ) -const DefaultPartSize = 5 * libsiz.SizeMega - // MultipartList implement the ListMultipartUploads. // See docs for more infos : https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListMultipartUploads.html func (cli *client) MultipartList(keyMarker, markerId string) (uploads []sdktyp.MultipartUpload, nextKeyMarker string, nextIdMarker string, count int64, e liberr.Error) { @@ -73,141 +61,57 @@ func (cli *client) MultipartList(keyMarker, markerId string) (uploads []sdktyp.M } } +func (cli *client) MultipartNew(partSize libsiz.Size, object string) libmpu.MultiPart { + m := libmpu.New(partSize, object, cli.GetBucketName()) + m.RegisterContext(cli.GetContext) + m.RegisterClientS3(func() *sdksss.Client { + return cli.s3 + }) + + return m +} + func (cli *client) MultipartPut(object string, body io.Reader) liberr.Error { - return cli.MultipartPutCustom(DefaultPartSize, object, body) + return cli.MultipartPutCustom(libmpu.DefaultPartSize, object, body) } func (cli *client) MultipartPutCustom(partSize libsiz.Size, object string, body io.Reader) liberr.Error { var ( - tmp libiou.FileProgress - rio libhlp.ReaderPartSize - upl *sdksss.CreateMultipartUploadOutput - err error - tpe *string + e error + m = cli.MultipartNew(partSize, object) ) defer func() { - if tmp != nil { - _ = tmp.Close() + if m != nil { + _ = m.Close() } }() - if t := mime.TypeByExtension(filepath.Ext(object)); t == "" { - tpe = sdkaws.String("application/octet-stream") + if e = m.StartMPU(); e != nil { + return cli.GetError(e) + } else if _, e = io.Copy(m, body); e != nil { + return cli.GetError(e) + } else if e = m.StopMPU(false); e != nil { + return cli.GetError(e) } else { - tpe = sdkaws.String(t) - } - - upl, err = cli.s3.CreateMultipartUpload(cli.GetContext(), &sdksss.CreateMultipartUploadInput{ - Key: sdkaws.String(object), - Bucket: sdkaws.String(cli.GetBucketName()), - ContentType: tpe, - }) - - if err != nil { - return cli.GetError(err) - } else if upl == nil { - return libhlp.ErrorResponse.Error(nil) - } - - rio = libhlp.NewReaderPartSize(body, partSize) - - for !rio.IeOEF() { - var ( - inf os.FileInfo - prt *sdksss.UploadPartOutput - ) - - tmp, err = libiou.NewFileProgressTemp() - if err != nil { - return cli._MultipartCancel(err, upl.UploadId, object) - } - - _, err = io.Copy(tmp, rio) - if err != nil { - return cli._MultipartCancel(err, upl.UploadId, object) - } - - _, err = tmp.Seek(0, io.SeekStart) - if err != nil { - return cli._MultipartCancel(err, upl.UploadId, object) - } - - inf, err = tmp.FileStat() - if err != nil { - return cli._MultipartCancel(err, upl.UploadId, object) - } - - /* #nosec */ - h := md5.New() - if _, e := tmp.WriteTo(h); e != nil { - return cli._MultipartCancel(e, upl.UploadId, object) - } - - _, err = tmp.Seek(0, io.SeekStart) - if err != nil { - return cli._MultipartCancel(err, upl.UploadId, object) - } - - prt, err = cli.s3.UploadPart(cli.GetContext(), &sdksss.UploadPartInput{ - Bucket: sdkaws.String(cli.GetBucketName()), - Body: tmp, - PartNumber: rio.CurrPart(), - UploadId: upl.UploadId, - Key: sdkaws.String(object), - ContentLength: inf.Size(), - RequestPayer: sdktyp.RequestPayerRequester, - ContentMD5: sdkaws.String(base64.StdEncoding.EncodeToString(h.Sum(nil))), - }) - - _ = tmp.Close() - tmp = nil - - if err != nil { - return cli._MultipartCancel(err, upl.UploadId, object) - } else if prt == nil || prt.ETag == nil || len(*prt.ETag) == 0 { - return cli._MultipartCancel(libhlp.ErrorResponse.Error(nil), upl.UploadId, object) - } - - rio.NextPart(prt.ETag) - } - - var prt *sdksss.CompleteMultipartUploadOutput - prt, err = cli.s3.CompleteMultipartUpload(cli.GetContext(), &sdksss.CompleteMultipartUploadInput{ - Bucket: sdkaws.String(cli.GetBucketName()), - Key: sdkaws.String(object), - UploadId: upl.UploadId, - MultipartUpload: rio.CompPart(), - RequestPayer: sdktyp.RequestPayerRequester, - }) - - if err != nil { - return cli._MultipartCancel(err, upl.UploadId, object) - } else if prt == nil || prt.ETag == nil || len(*prt.ETag) == 0 { - return cli._MultipartCancel(libhlp.ErrorResponse.Error(nil), upl.UploadId, object) + m = nil } return nil } -func (cli *client) _MultipartCancel(err error, updIp *string, object string) liberr.Error { - cnl, e := cli.s3.AbortMultipartUpload(cli.GetContext(), &sdksss.AbortMultipartUploadInput{ +func (cli *client) MultipartCancel(uploadId, key string) liberr.Error { + res, err := cli.s3.AbortMultipartUpload(cli.GetContext(), &sdksss.AbortMultipartUploadInput{ Bucket: sdkaws.String(cli.GetBucketName()), - UploadId: updIp, - Key: sdkaws.String(object), + UploadId: sdkaws.String(uploadId), + Key: sdkaws.String(key), }) - if e != nil { - return cli.GetError(e, err) - } else if cnl == nil { - return libhlp.ErrorResponse.Error(cli.GetError(err)) - } else if err != nil { + if err != nil { return cli.GetError(err) + } else if res == nil { + return libhlp.ErrorResponse.Error(nil) } else { return nil } } - -func (cli *client) MultipartCancel(uploadId, key string) liberr.Error { - return cli._MultipartCancel(nil, sdkaws.String(uploadId), key) -} diff --git a/aws/object_test.go b/aws/object_test.go index a78923ab..6ae4ece2 100644 --- a/aws/object_test.go +++ b/aws/object_test.go @@ -28,6 +28,8 @@ package aws_test import ( "bytes" + libsiz "github.com/nabbar/golib/size" + . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" ) @@ -70,7 +72,7 @@ var _ = Describe("Object", func() { Context("Multipart Put object", func() { It("Must fail as the bucket doesn't exists - 5", func() { - err := cli.Object().MultipartPut("object", randContent(4*1024)) + err := cli.Object().MultipartPut("object", randContent(10*libsiz.SizeMega)) Expect(err).To(HaveOccurred()) }) }) diff --git a/ioutils/error.go b/ioutils/error.go index 184ba2e8..5a49b706 100644 --- a/ioutils/error.go +++ b/ioutils/error.go @@ -38,6 +38,8 @@ const ( ErrorSyscallRLimitSet ErrorIOFileStat ErrorIOFileSeek + ErrorIOFileTruncate + ErrorIOFileSync ErrorIOFileOpen ErrorIOFileTempNew ErrorIOFileTempClose @@ -64,6 +66,10 @@ func getMessage(code liberr.CodeError) (message string) { return "error occur while trying to get stat of file" case ErrorIOFileSeek: return "error occur while trying seek into file" + case ErrorIOFileTruncate: + return "error occur while trying truncate file" + case ErrorIOFileSync: + return "error occur while trying to sync file" case ErrorIOFileOpen: return "error occur while trying to open file" case ErrorIOFileTempNew: diff --git a/ioutils/fileProgess.go b/ioutils/fileProgess.go index dfd536e5..89088051 100644 --- a/ioutils/fileProgess.go +++ b/ioutils/fileProgess.go @@ -67,6 +67,8 @@ type FileProgress interface { FileStat() (os.FileInfo, liberr.Error) SizeToEOF() (size int64, err liberr.Error) + Truncate(size int64) liberr.Error + Sync() liberr.Error NewFilePathMode(filepath string, mode int, perm os.FileMode) (FileProgress, liberr.Error) NewFilePathWrite(filepath string, create, overwrite bool, perm os.FileMode) (FileProgress, liberr.Error) @@ -271,6 +273,32 @@ func (f *fileProgress) SizeToEOF() (size int64, err liberr.Error) { } } +func (f *fileProgress) Truncate(size int64) liberr.Error { + if f == nil { + return ErrorNilPointer.Error(nil) + } + + if e := f.fs.Truncate(size); e != nil { + return ErrorIOFileTruncate.ErrorParent(e) + } + + f.reset(0) + + return nil +} + +func (f *fileProgress) Sync() liberr.Error { + if f == nil { + return ErrorNilPointer.Error(nil) + } + + if e := f.fs.Sync(); e != nil { + return ErrorIOFileSync.ErrorParent(e) + } + + return nil +} + func (f *fileProgress) SetIncrement(increment func(size int64)) { if f != nil { f.fc = increment