Skip to content

Commit

Permalink
service/s3/s3manager: Add DownloadStrategy for buffering downloaded p…
Browse files Browse the repository at this point in the history
…arts (#2823)

* service/s3/s3manager: Add DownloadStrategy for buffering downloaded parts
* awstesting/integration/performance/s3DownloadManager: S3 Download Manager Benchmark Utility
  • Loading branch information
skmcgrail authored Sep 25, 2019
1 parent 9fd70c2 commit 4cfae4a
Show file tree
Hide file tree
Showing 16 changed files with 1,192 additions and 25 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
### SDK Features
* `service/s3/s3manager`: Add Download Buffer Provider ([#2823](https://github.com/aws/aws-sdk-go/pull/2823))
* Adds a new `BufferProvider` member for specifying how part data can be buffered in memory when copying from the http response body.
* Windows platforms will now default to buffering 1MB per part to reduce contention when downloading files.
* Non-Windows platforms will continue to employ a non-buffering behavior.
* Fixes [#2180](https://github.com/aws/aws-sdk-go/issues/2180)
* Fixes [#2662](https://github.com/aws/aws-sdk-go/issues/2662)

### SDK Enhancements

Expand Down
11 changes: 11 additions & 0 deletions awstesting/discard.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package awstesting

// DiscardAt is an io.WriteAt that discards
// the requested bytes to be written
type DiscardAt struct{}

// WriteAt discards the given []byte slice and returns len(p) bytes
// as having been written at the given offset. It will never return an error.
func (d DiscardAt) WriteAt(p []byte, off int64) (n int, err error) {
return len(p), nil
}
12 changes: 12 additions & 0 deletions awstesting/endless_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package awstesting

// EndlessReader is an io.Reader that will always return
// that bytes have been read.
type EndlessReader struct{}

// Read will report that it has read len(p) bytes in p.
// The content in the []byte will be unmodified.
// This will never return an error.
func (e EndlessReader) Read(p []byte) (int, error) {
return len(p), nil
}
39 changes: 39 additions & 0 deletions awstesting/integration/performance/s3DownloadManager/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
## Performance Utility

Downloads a test file from a S3 bucket using the SDK's S3 download manager. Allows passing
in custom configuration for the HTTP client and SDK's Download Manager behavior.

## Build
### Standalone
```sh
go build -tags "integration perftest" -o s3DownloadManager ./awstesting/integration/performance/s3DownloadManager
```
### Benchmarking
```sh
go test -tags "integration perftest" -c -o s3DownloadManager ./awstesting/integration/performance/s3DownloadManager
```

## Usage Example:
### Standalone
```sh
AWS_REGION=us-west-2 AWS_PROFILE=aws-go-sdk-team-test ./s3DownloadManager \
-bucket aws-sdk-go-data \
-size 10485760 \
-client.idle-conns 1000 \
-client.idle-conns-host 300 \
-client.timeout.connect=1s \
-client.timeout.response-header=1s
```

### Benchmarking
```sh
AWS_REGION=us-west-2 AWS_PROFILE=aws-go-sdk-team-test ./s3DownloadManager \
-test.bench=. \
-test.benchmem \
-test.benchtime 1x \
-bucket aws-sdk-go-data \
-client.idle-conns 1000 \
-client.idle-conns-host 300 \
-client.timeout.connect=1s \
-client.timeout.response-header=1s
```
32 changes: 32 additions & 0 deletions awstesting/integration/performance/s3DownloadManager/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// +build integration,perftest

package main

import (
"net"
"net/http"
"time"
)

func NewClient(cfg ClientConfig) *http.Client {
tr := &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: cfg.Timeouts.Connect,
KeepAlive: 30 * time.Second,
DualStack: true,
}).DialContext,
MaxIdleConns: cfg.MaxIdleConns,
MaxIdleConnsPerHost: cfg.MaxIdleConnsPerHost,
IdleConnTimeout: 90 * time.Second,

DisableKeepAlives: !cfg.KeepAlive,
TLSHandshakeTimeout: cfg.Timeouts.TLSHandshake,
ExpectContinueTimeout: cfg.Timeouts.ExpectContinue,
ResponseHeaderTimeout: cfg.Timeouts.ResponseHeader,
}

return &http.Client{
Transport: tr,
}
}
152 changes: 152 additions & 0 deletions awstesting/integration/performance/s3DownloadManager/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// +build integration,perftest

package main

import (
"flag"
"fmt"
"net/http"
"strings"
"time"

"github.com/aws/aws-sdk-go/service/s3/s3manager"
)

type Config struct {
Bucket string
Size int64
LogVerbose bool

SDK SDKConfig
Client ClientConfig
}

func (c *Config) SetupFlags(prefix string, flagset *flag.FlagSet) {
flagset.StringVar(&c.Bucket, "bucket", "",
"The S3 bucket `name` to download the object from.")
flagset.Int64Var(&c.Size, "size", 0,
"The S3 object size in bytes to be first uploaded then downloaded")
flagset.BoolVar(&c.LogVerbose, "verbose", false,
"The output log will include verbose request information")

c.SDK.SetupFlags(prefix, flagset)
c.Client.SetupFlags(prefix, flagset)
}

func (c *Config) Validate() error {
var errs Errors

if len(c.Bucket) == 0 || c.Size <= 0 {
errs = append(errs, fmt.Errorf("bucket and filename/size are required"))
}

if err := c.SDK.Validate(); err != nil {
errs = append(errs, err)
}
if err := c.Client.Validate(); err != nil {
errs = append(errs, err)
}

if len(errs) != 0 {
return errs
}

return nil
}

type SDKConfig struct {
PartSize int64
Concurrency int
BufferProvider s3manager.WriterReadFromProvider
}

func (c *SDKConfig) SetupFlags(prefix string, flagset *flag.FlagSet) {
prefix += "sdk."

flagset.Int64Var(&c.PartSize, prefix+"part-size", s3manager.DefaultDownloadPartSize,
"Specifies the `size` of parts of the object to download.")
flagset.IntVar(&c.Concurrency, prefix+"concurrency", s3manager.DefaultDownloadConcurrency,
"Specifies the number of parts to download `at once`.")
}

func (c *SDKConfig) Validate() error {
return nil
}

type ClientConfig struct {
KeepAlive bool
Timeouts Timeouts

MaxIdleConns int
MaxIdleConnsPerHost int
}

func (c *ClientConfig) SetupFlags(prefix string, flagset *flag.FlagSet) {
prefix += "client."

flagset.BoolVar(&c.KeepAlive, prefix+"http-keep-alive", true,
"Specifies if HTTP keep alive is enabled.")

defTR := http.DefaultTransport.(*http.Transport)

flagset.IntVar(&c.MaxIdleConns, prefix+"idle-conns", defTR.MaxIdleConns,
"Specifies max idle connection pool size.")

flagset.IntVar(&c.MaxIdleConnsPerHost, prefix+"idle-conns-host", http.DefaultMaxIdleConnsPerHost,
"Specifies max idle connection pool per host, will be truncated by idle-conns.")

c.Timeouts.SetupFlags(prefix, flagset)
}

func (c *ClientConfig) Validate() error {
var errs Errors

if err := c.Timeouts.Validate(); err != nil {
errs = append(errs, err)
}

if len(errs) != 0 {
return errs
}
return nil
}

type Timeouts struct {
Connect time.Duration
TLSHandshake time.Duration
ExpectContinue time.Duration
ResponseHeader time.Duration
}

func (c *Timeouts) SetupFlags(prefix string, flagset *flag.FlagSet) {
prefix += "timeout."

flagset.DurationVar(&c.Connect, prefix+"connect", 30*time.Second,
"The `timeout` connecting to the remote host.")

defTR := http.DefaultTransport.(*http.Transport)

flagset.DurationVar(&c.TLSHandshake, prefix+"tls", defTR.TLSHandshakeTimeout,
"The `timeout` waiting for the TLS handshake to complete.")

flagset.DurationVar(&c.ExpectContinue, prefix+"expect-continue", defTR.ExpectContinueTimeout,
"The `timeout` waiting for the TLS handshake to complete.")

flagset.DurationVar(&c.ResponseHeader, prefix+"response-header", defTR.ResponseHeaderTimeout,
"The `timeout` waiting for the TLS handshake to complete.")
}

func (c *Timeouts) Validate() error {
return nil
}

type Errors []error

func (es Errors) Error() string {
var buf strings.Builder
for _, e := range es {
buf.WriteString(e.Error())
}

return buf.String()
}
Loading

0 comments on commit 4cfae4a

Please sign in to comment.