Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix s3 upload performance regression #98

Merged
merged 5 commits into from
Jan 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 74 additions & 61 deletions objstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,19 +232,6 @@ func NopCloserWithSize(r io.Reader) io.ReadCloser {
return nopCloserWithObjectSize{r}
}

// nopSeekerCloserWithObjectSize wraps an io.Reader by embedding it, so the
// reader's Read method is promoted onto the wrapper. Close, ObjectSize and Seek
// are provided as separate methods on this type.
type nopSeekerCloserWithObjectSize struct {
	io.Reader
}

// Close does nothing and always returns nil.
func (nopSeekerCloserWithObjectSize) Close() error {
	return nil
}
// ObjectSize returns whatever TryToGetSize reports for the wrapped reader,
// passing both the size and the error through unchanged.
func (n nopSeekerCloserWithObjectSize) ObjectSize() (int64, error) {
	size, err := TryToGetSize(n.Reader)
	return size, err
}

// Seek forwards the call to the wrapped reader's Seek method.
//
// The wrapped reader must implement io.Seeker: the type assertion is
// unchecked and panics otherwise.
func (n nopSeekerCloserWithObjectSize) Seek(offset int64, whence int) (int64, error) {
	seeker := n.Reader.(io.Seeker)
	return seeker.Seek(offset, whence)
}

// nopSeekerCloserWithSize wraps r in a nopSeekerCloserWithObjectSize and
// returns it as an io.ReadSeekCloser. It does not verify that r implements
// io.Seeker; calling Seek on the result panics if it does not.
func nopSeekerCloserWithSize(r io.Reader) io.ReadSeekCloser {
	var wrapped io.ReadSeekCloser = nopSeekerCloserWithObjectSize{Reader: r}
	return wrapped
}

// UploadDir uploads all files in srcdir to the bucket into a top-level directory
// named dstdir. It is the caller's responsibility to clean up a partial upload in case of failure.
func UploadDir(ctx context.Context, logger log.Logger, bkt Bucket, srcdir, dstdir string, options ...UploadOption) error {
Expand Down Expand Up @@ -555,8 +542,9 @@ func (b *metricBucket) Get(ctx context.Context, name string) (io.ReadCloser, err
}
return nil, err
}
return newTimingReadCloser(
return newTimingReader(
rc,
true,
op,
b.opsDuration,
b.opsFailures,
Expand All @@ -577,8 +565,9 @@ func (b *metricBucket) GetRange(ctx context.Context, name string, off, length in
}
return nil, err
}
return newTimingReadCloser(
return newTimingReader(
rc,
true,
op,
b.opsDuration,
b.opsFailures,
Expand Down Expand Up @@ -608,16 +597,9 @@ func (b *metricBucket) Upload(ctx context.Context, name string, r io.Reader) err
const op = OpUpload
b.ops.WithLabelValues(op).Inc()

_, ok := r.(io.Seeker)
var nopR io.ReadCloser
if ok {
nopR = nopSeekerCloserWithSize(r)
} else {
nopR = NopCloserWithSize(r)
}

trc := newTimingReadCloser(
nopR,
trc := newTimingReader(
r,
false,
op,
b.opsDuration,
b.opsFailures,
Expand Down Expand Up @@ -670,12 +652,13 @@ func (b *metricBucket) Name() string {
return b.bkt.Name()
}

type timingReadSeekCloser struct {
timingReadCloser
}
type timingReader struct {
io.Reader

// closeReader holds whether the wrapped io.Reader should be closed when
// Close() is called on the timingReader.
closeReader bool

type timingReadCloser struct {
io.ReadCloser
objSize int64
objSizeErr error

Expand All @@ -691,14 +674,15 @@ type timingReadCloser struct {
transferredBytes *prometheus.HistogramVec
}

func newTimingReadCloser(rc io.ReadCloser, op string, dur *prometheus.HistogramVec, failed *prometheus.CounterVec, isFailureExpected IsOpFailureExpectedFunc, fetchedBytes *prometheus.CounterVec, transferredBytes *prometheus.HistogramVec) io.ReadCloser {
func newTimingReader(r io.Reader, closeReader bool, op string, dur *prometheus.HistogramVec, failed *prometheus.CounterVec, isFailureExpected IsOpFailureExpectedFunc, fetchedBytes *prometheus.CounterVec, transferredBytes *prometheus.HistogramVec) io.ReadCloser {
// Initialize the metrics with 0.
dur.WithLabelValues(op)
failed.WithLabelValues(op)
objSize, objSizeErr := TryToGetSize(rc)
objSize, objSizeErr := TryToGetSize(r)

trc := timingReadCloser{
ReadCloser: rc,
trc := timingReader{
Reader: r,
closeReader: closeReader,
objSize: objSize,
objSizeErr: objSizeErr,
start: time.Now(),
Expand All @@ -711,50 +695,79 @@ func newTimingReadCloser(rc io.ReadCloser, op string, dur *prometheus.HistogramV
readBytes: 0,
}

_, ok := rc.(io.Seeker)
if ok {
return &timingReadSeekCloser{
timingReadCloser: trc,
}
_, isSeeker := r.(io.Seeker)
_, isReaderAt := r.(io.ReaderAt)

if isSeeker && isReaderAt {
// The assumption is that in most cases when io.ReaderAt() is implemented then
// io.Seeker is implemented too (e.g. os.File).
return &timingReaderSeekerReaderAt{timingReaderSeeker: timingReaderSeeker{timingReader: trc}}
}
if isSeeker {
return &timingReaderSeeker{timingReader: trc}
}

return &trc
}

func (t *timingReadCloser) ObjectSize() (int64, error) {
return t.objSize, t.objSizeErr
func (r *timingReader) ObjectSize() (int64, error) {
return r.objSize, r.objSizeErr
}

func (rc *timingReadCloser) Close() error {
err := rc.ReadCloser.Close()
if !rc.alreadyGotErr && err != nil {
rc.failed.WithLabelValues(rc.op).Inc()
func (r *timingReader) Close() error {
var closeErr error

// Call the wrapped reader if it implements Close(), only if we've been asked to close it.
if closer, ok := r.Reader.(io.Closer); r.closeReader && ok {
closeErr = closer.Close()

if !r.alreadyGotErr && closeErr != nil {
r.failed.WithLabelValues(r.op).Inc()
r.alreadyGotErr = true
}
}
if !rc.alreadyGotErr && err == nil {
rc.duration.WithLabelValues(rc.op).Observe(time.Since(rc.start).Seconds())
rc.transferredBytes.WithLabelValues(rc.op).Observe(float64(rc.readBytes))
rc.alreadyGotErr = true

// Track duration and transferred bytes only if no error occurred.
if !r.alreadyGotErr {
r.duration.WithLabelValues(r.op).Observe(time.Since(r.start).Seconds())
r.transferredBytes.WithLabelValues(r.op).Observe(float64(r.readBytes))

// Trick to avoid tracking metrics multiple times in case Close() gets called again.
r.alreadyGotErr = true
}
return err

return closeErr
}

func (rc *timingReadCloser) Read(b []byte) (n int, err error) {
n, err = rc.ReadCloser.Read(b)
if rc.fetchedBytes != nil {
rc.fetchedBytes.WithLabelValues(rc.op).Add(float64(n))
func (r *timingReader) Read(b []byte) (n int, err error) {
n, err = r.Reader.Read(b)
if r.fetchedBytes != nil {
r.fetchedBytes.WithLabelValues(r.op).Add(float64(n))
}

rc.readBytes += int64(n)
r.readBytes += int64(n)
// Report metric just once.
if !rc.alreadyGotErr && err != nil && err != io.EOF {
if !rc.isFailureExpected(err) {
rc.failed.WithLabelValues(rc.op).Inc()
if !r.alreadyGotErr && err != nil && err != io.EOF {
if !r.isFailureExpected(err) {
r.failed.WithLabelValues(r.op).Inc()
}
rc.alreadyGotErr = true
r.alreadyGotErr = true
}
return n, err
}

func (rsc *timingReadSeekCloser) Seek(offset int64, whence int) (int64, error) {
return (rsc.ReadCloser).(io.Seeker).Seek(offset, whence)
// timingReaderSeeker embeds timingReader and adds a Seek method that is
// forwarded to the wrapped reader.
type timingReaderSeeker struct {
timingReader
}

// Seek forwards the call to the wrapped reader's Seek method.
//
// The wrapped reader must implement io.Seeker: the type assertion is
// unchecked and panics otherwise.
func (rsc *timingReaderSeeker) Seek(offset int64, whence int) (int64, error) {
	seeker := rsc.Reader.(io.Seeker)
	return seeker.Seek(offset, whence)
}

// timingReaderSeekerReaderAt embeds timingReaderSeeker and adds a ReadAt
// method that is forwarded to the wrapped reader.
type timingReaderSeekerReaderAt struct{ timingReaderSeeker }

// ReadAt forwards the call to the wrapped reader's ReadAt method and returns
// its result unchanged.
//
// The wrapped reader must implement io.ReaderAt: the type assertion is
// unchecked and panics otherwise.
func (rsc *timingReaderSeekerReaderAt) ReadAt(p []byte, off int64) (int, error) {
	readerAt := rsc.Reader.(io.ReaderAt)
	return readerAt.ReadAt(p, off)
}
Loading
Loading