feat: better named bucket functions (#59)
jobstoit authored Jul 21, 2024
1 parent 2dd3f61 commit 8c0b70b
Showing 3 changed files with 258 additions and 3 deletions.
85 changes: 82 additions & 3 deletions bucket.go
@@ -37,15 +37,34 @@ type Bucket struct {
 	writeChunkSize int64
 	concurrency    int
 	logger         *slog.Logger
-	cli            *s3.Client
+	cli            BucketApiClient
 }
 
+// NewRawBucket returns a new bucket instance.
+// For normal operations use OpenBucket instead, as it connects to and verifies the bucket.
+func NewRawBucket(
+	name string,
+	readChunkSize, writeChunkSize int64,
+	concurrency int,
+	logger *slog.Logger,
+	cli BucketApiClient,
+) *Bucket {
+	return &Bucket{
+		name,
+		readChunkSize,
+		writeChunkSize,
+		concurrency,
+		logger,
+		cli,
+	}
+}
+
 // OpenURL opens the bucket with all the connection options in the url.
 // The url is written as: s3://access-key:access-secret@host/bucketname?region=us-east&pathstyle=true
 //
 // The url assumes the host has an https protocol unless the "insecure" query param is set to "true".
 // To create the bucket if it doesn't exist set the "create" query param to "true".
-// To use the pathstyle url set "pathstyle" to "true"
+// To use the pathstyle url set "pathstyle" to "true".
 func OpenURL(ctx context.Context, u string, opts ...BucketOption) (*Bucket, error) {
 	pu, err := url.Parse(u)
 	if err != nil {
@@ -307,6 +326,8 @@ func (b *Bucket) NewWriter(ctx context.Context, key string, opts ...ObjectWriter
 }
 
 // WriteFrom writes all the bytes from the reader into the given object
+//
+// Deprecated: use the better-named ReadFrom instead.
 func (b *Bucket) WriteFrom(ctx context.Context, key string, from io.Reader, opts ...ObjectWriterOption) (int64, error) {
 	wr := b.NewWriter(ctx, key, opts...)
 	defer wr.Close()
@@ -334,10 +355,68 @@ func (b *Bucket) WriteAll(ctx context.Context, key string, p []byte, opts ...Obj
 
 // Client returns the s3 client the Bucket uses
 func (b *Bucket) Client() *s3.Client {
-	return b.cli
+	cli, ok := b.cli.(*s3.Client)
+	if !ok {
+		panic("not an s3 client object")
+	}
+
+	return cli
 }
 
 // Name returns the specified bucket's name
 func (b *Bucket) Name() string {
 	return b.name
 }
+
+// ReadFrom reads the bytes from the given reader into the object
+// and closes the reader if it implements io.Closer.
+func (b *Bucket) ReadFrom(ctx context.Context, key string, rd io.Reader, opts ...ObjectWriterOption) (int64, error) {
+	cl, clOk := rd.(io.Closer)
+	if clOk {
+		defer cl.Close()
+	}
+
+	wr := b.NewWriter(ctx, key, opts...)
+	defer wr.Close()
+
+	if wrTo, ok := rd.(io.WriterTo); ok {
+		return wrTo.WriteTo(wr)
+	}
+
+	n, err := io.Copy(wr, rd)
+	if err != nil {
+		return n, err
+	}
+
+	return n, wr.Close()
+}
+
+// WriteTo writes all the bytes from the object into the given writer
+// and closes the writer if it implements io.Closer.
+func (b *Bucket) WriteTo(ctx context.Context, key string, wr io.Writer, opts ...ObjectReaderOption) (int64, error) {
+	cl, clOk := wr.(io.Closer)
+	if clOk {
+		defer cl.Close()
+	}
+
+	rd := b.NewReader(ctx, key, opts...)
+
+	var n int64
+	var err error
+
+	if rdFr, ok := wr.(io.ReaderFrom); ok {
+		n, err = rdFr.ReadFrom(rd)
+	} else {
+		n, err = io.Copy(wr, rd)
+	}
+
+	if err != nil {
+		return n, err
+	}
+
+	if clOk {
+		err = cl.Close()
+	}
+
+	return n, err
+}
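
The new ReadFrom/WriteTo pair mirrors the io.ReaderFrom/io.WriterTo naming: ReadFrom drains a reader into the object, WriteTo streams the object into a writer, and both take the standard-library fast paths when the caller's type supports them. A minimal usage sketch, not part of this commit — the endpoint URL, credentials, bucket name, and key are placeholders:

package main

import (
	"bytes"
	"context"
	"fmt"
	"log"
	"strings"

	"github.com/jobstoit/s3io/v2"
)

func main() {
	ctx := context.Background()

	// Hypothetical local endpoint; adjust credentials, host, and bucket to taste.
	bucket, err := s3io.OpenURL(ctx, "s3://key:secret@localhost:9000/mybucket?pathstyle=true&insecure=true&create=true")
	if err != nil {
		log.Fatal(err)
	}

	// Upload: strings.Reader implements io.WriterTo, so ReadFrom takes the
	// WriteTo fast path shown in the diff above.
	if _, err := bucket.ReadFrom(ctx, "greeting.txt", strings.NewReader("hello s3io")); err != nil {
		log.Fatal(err)
	}

	// Download: bytes.Buffer implements io.ReaderFrom, so WriteTo hands the
	// object reader straight to buf.ReadFrom.
	var out bytes.Buffer
	if _, err := bucket.WriteTo(ctx, "greeting.txt", &out); err != nil {
		log.Fatal(err)
	}
	fmt.Println(out.String())
}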
167 changes: 167 additions & 0 deletions bucket_test.go
@@ -1,12 +1,20 @@
 package s3io_test
 
 import (
+	"bytes"
 	"context"
+	"crypto/rand"
 	"errors"
 	"fmt"
+	"io"
 	"net/url"
+	"reflect"
+	"strings"
 	"testing"
 
+	"github.com/aws/aws-sdk-go-v2/aws"
+	"github.com/aws/aws-sdk-go-v2/service/s3"
+	"github.com/aws/aws-sdk-go-v2/service/s3/types"
 	"github.com/jobstoit/s3io/v2"
 )
 
@@ -47,3 +55,162 @@ func TestOpenUrl(t *testing.T) {
 		t.Errorf("unexpected error with url '%s': %v", u, err)
 	}
 }
+
+func TestReadFrom(t *testing.T) {
+	ctx := context.Background()
+	s, ops, args := NewUploadLoggingClient(nil)
+
+	cli := &BucketLoggingClient{
+		uploadClient: s,
+	}
+
+	bucket := s3io.NewRawBucket("testing", s3io.DefaultChunkSize, s3io.DefaultChunkSize, 1, noopLogger, cli)
+
+	var amount int64 = 1024 * 1024 * 12
+
+	c, err := bucket.ReadFrom(
+		ctx,
+		"path/to/item",
+		io.LimitReader(rand.Reader, amount),
+		s3io.WithWriterConcurrency(1),
+		s3io.WithWriterChunkSize(1024*1024*7),
+	)
+	if err != nil {
+		t.Fatalf("error reading from: %v", err)
+	}
+
+	if e, a := amount, c; e != a {
+		t.Errorf("expected %d not equal to actual %d", e, a)
+	}
+
+	vals := []string{"CreateMultipartUpload", "UploadPart", "UploadPart", "CompleteMultipartUpload"}
+	if !reflect.DeepEqual(vals, *ops) {
+		t.Errorf("expect %v, got %v", vals, *ops)
+	}
+
+	// Part lengths
+	if e, a := int64(1024*1024*7), getReaderLength((*args)[1].(*s3.UploadPartInput).Body); e != a {
+		t.Errorf("expect %d, got %d", e, a)
+	}
+
+	if e, a := int64(1024*1024*5), getReaderLength((*args)[2].(*s3.UploadPartInput).Body); e != a {
+		t.Errorf("expect %d, got %d", e, a)
+	}
+}
+
+func TestWriteTo(t *testing.T) {
+	ctx := context.Background()
+	dlcli, invocations, ranges := newDownloadRangeClient(buf12MB)
+
+	cli := &BucketLoggingClient{
+		downloadCaptureClient: dlcli,
+	}
+
+	bucket := s3io.NewRawBucket("testing", s3io.DefaultChunkSize, s3io.DefaultChunkSize, 1, noopLogger, cli)
+
+	buf := &bytes.Buffer{}
+	_, err := bucket.WriteTo(
+		ctx,
+		"path/to/item",
+		buf,
+		s3io.WithReaderConcurrency(1),
+	)
+	if err != nil {
+		t.Fatalf("unable to write to: %v", err)
+	}
+
+	if e, a := len(buf12MB), buf.Len(); e != a {
+		t.Errorf("expected %d but got %d", e, a)
+	}
+
+	if e, a := 4, *invocations; e != a {
+		t.Errorf("expect %v API calls, got %v", e, a)
+	}
+
+	expectRngs := []string{"bytes=0-0", "bytes=0-5242880", "bytes=5242881-10485761", "bytes=10485762-12582912"}
+	if e, a := expectRngs, *ranges; !reflect.DeepEqual(e, a) {
+		t.Errorf("expect %v ranges, got %v", e, a)
+	}
+}
+
+// BucketLoggingClient is a test client
+type BucketLoggingClient struct {
+	Invocations           []string
+	paths                 []string
+	uploadClient          *UploadLoggingClient
+	downloadCaptureClient *downloadCaptureClient
+}
+
+// HeadObject is an implementation of s3.HeadObjectAPIClient
+func (b *BucketLoggingClient) HeadObject(
+	_ context.Context,
+	input *s3.HeadObjectInput,
+	optFns ...func(*s3.Options),
+) (*s3.HeadObjectOutput, error) {
+	b.Invocations = append(b.Invocations, "HeadObject")
+
+	var err error
+	if !strings.Contains(strings.Join(b.paths, " "), aws.ToString(input.Key)) {
+		err = &types.NotFound{
+			Message: aws.String("object not found"),
+		}
+	}
+
+	return &s3.HeadObjectOutput{
+		ContentType: aws.String("text/plain"),
+	}, err
+}
+
+// ListObjectsV2 is an implementation of s3.ListObjectsV2APIClient
+func (b *BucketLoggingClient) ListObjectsV2(
+	_ context.Context,
+	input *s3.ListObjectsV2Input,
+	optFns ...func(*s3.Options),
+) (*s3.ListObjectsV2Output, error) {
+	b.Invocations = append(b.Invocations, "ListObjectsV2")
+
+	objs := make([]types.Object, len(b.paths))
+	for i, p := range b.paths {
+		objs[i] = types.Object{
+			Key: aws.String(p),
+		}
+	}
+
+	out := &s3.ListObjectsV2Output{
+		Contents: objs,
+	}
+
+	return out, nil
+}
+
+func (b *BucketLoggingClient) DeleteObjects(_ context.Context, input *s3.DeleteObjectsInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectsOutput, error) {
+	panic("not implemented")
+}
+
+func (b *BucketLoggingClient) DeleteObject(_ context.Context, input *s3.DeleteObjectInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectOutput, error) {
+	panic("not implemented")
+}
+
+func (b *BucketLoggingClient) GetObject(ctx context.Context, input *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) {
+	return b.downloadCaptureClient.GetObject(ctx, input, optFns...)
+}
+
+func (b *BucketLoggingClient) PutObject(ctx context.Context, input *s3.PutObjectInput, optFns ...func(*s3.Options)) (*s3.PutObjectOutput, error) {
+	return b.uploadClient.PutObject(ctx, input, optFns...)
+}
+
+func (b *BucketLoggingClient) UploadPart(ctx context.Context, input *s3.UploadPartInput, optFns ...func(*s3.Options)) (*s3.UploadPartOutput, error) {
+	return b.uploadClient.UploadPart(ctx, input, optFns...)
+}
+
+func (b *BucketLoggingClient) CreateMultipartUpload(ctx context.Context, input *s3.CreateMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error) {
+	return b.uploadClient.CreateMultipartUpload(ctx, input, optFns...)
+}
+
+func (b *BucketLoggingClient) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.CompleteMultipartUploadOutput, error) {
+	return b.uploadClient.CompleteMultipartUpload(ctx, input, optFns...)
+}
+
+func (b *BucketLoggingClient) AbortMultipartUpload(ctx context.Context, input *s3.AbortMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error) {
+	return b.uploadClient.AbortMultipartUpload(ctx, input, optFns...)
+}
9 changes: 9 additions & 0 deletions s3io.go
@@ -35,6 +35,15 @@ type UploadAPIClient interface {
 	AbortMultipartUpload(context.Context, *s3.AbortMultipartUploadInput, ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error)
 }
 
+type BucketApiClient interface {
+	DownloadAPIClient
+	UploadAPIClient
+	s3.HeadObjectAPIClient
+	s3.ListObjectsV2APIClient
+	DeleteObject(ctx context.Context, input *s3.DeleteObjectInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectOutput, error)
+	DeleteObjects(ctx context.Context, input *s3.DeleteObjectsInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectsOutput, error)
+}
+
 type concurrencyLock struct {
 	l chan struct{}
 }
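Anything that implements BucketApiClient can back a Bucket — the test doubles above, or the real AWS client. A short sketch of wiring a *s3.Client in through NewRawBucket, not part of this commit; the bucket name and concurrency value are placeholders, and default AWS config resolution is assumed:

package main

import (
	"context"
	"log"
	"log/slog"

	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/jobstoit/s3io/v2"
)

func main() {
	ctx := context.Background()

	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		log.Fatal(err)
	}

	// *s3.Client satisfies BucketApiClient, so it can back a Bucket directly;
	// a mock like BucketLoggingClient plugs in the same way for tests.
	var cli s3io.BucketApiClient = s3.NewFromConfig(cfg)

	// Chunk sizes and concurrency here are illustrative values.
	bucket := s3io.NewRawBucket("mybucket", s3io.DefaultChunkSize, s3io.DefaultChunkSize, 5, slog.Default(), cli)
	log.Println("bucket:", bucket.Name())
}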
