prefetch: fix underlying unexpected-eof taken as EOF (#59752)
ref #59495
D3Hunter authored Mar 5, 2025
1 parent e52d8ab commit 0f5ba8f
Showing 19 changed files with 160 additions and 102 deletions.
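
The heart of the fix: prefetch.NewReader gains a middle argument carrying the expected size of the byte range it wraps. Without it, the prefetch layer cannot tell a connection that dropped mid-range from a normal end of stream, so a truncated download surfaced as a clean io.EOF. A minimal sketch of the idea (not the actual br/pkg/storage/prefetch implementation; field names and the omitted read-ahead buffering are assumptions):

```go
package prefetch

import (
	"errors"
	"io"
)

// Reader wraps a range reader and tracks how many bytes it has handed out,
// so it can tell a truncated stream from a completed one.
type Reader struct {
	r        io.ReadCloser
	expected int64 // bytes the underlying range should deliver in total
	consumed int64 // bytes returned to the caller so far
}

// NewReader mirrors the call sites in this diff: the new middle argument is
// the expected size of the byte range served by r.
func NewReader(r io.ReadCloser, expected int64, prefetchSize int) io.ReadCloser {
	_ = prefetchSize // would size the read-ahead buffer; elided in this sketch
	return &Reader{r: r, expected: expected}
}

func (p *Reader) Read(buf []byte) (int, error) {
	n, err := p.r.Read(buf)
	p.consumed += int64(n)
	if errors.Is(err, io.EOF) && p.consumed < p.expected {
		// The stream ended before the whole range arrived; report truncation
		// instead of a clean end-of-file.
		return n, io.ErrUnexpectedEOF
	}
	return n, err
}

func (p *Reader) Close() error { return p.r.Close() }
```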
14 changes: 6 additions & 8 deletions br/pkg/storage/gcs.go
@@ -240,7 +240,7 @@ func (s *GCSStorage) Open(ctx context.Context, path string, o *ReaderOption) (Ex

attrs, err := handle.Attrs(ctx)
if err != nil {
-	if goerrors.Is(err, storage.ErrObjectNotExist) { // nolint:errorlint
+	if goerrors.Is(err, storage.ErrObjectNotExist) {
return nil, errors.Annotatef(err,
"the object doesn't exist, file info: input.bucket='%s', input.key='%s'",
s.gcs.Bucket, path)
@@ -257,7 +257,7 @@ func (s *GCSStorage) Open(ctx context.Context, path string, o *ReaderOption) (Ex
pos = *o.StartOffset
}
if o.EndOffset != nil {
-	endPos = *o.EndOffset
+	endPos = min(endPos, *o.EndOffset)
}
prefetchSize = o.PrefetchSize
}
@@ -544,6 +544,7 @@ type gcsObjectReader struct {
name string
objHandle *storage.ObjectHandle
reader io.ReadCloser
+	// [pos, endPos) is the range of the file to read.
pos int64
endPos int64
totalSize int64
@@ -558,10 +559,7 @@ type gcsObjectReader struct {
// Read implement the io.Reader interface.
func (r *gcsObjectReader) Read(p []byte) (n int, err error) {
if r.reader == nil {
-	length := int64(-1)
-	if r.endPos != r.totalSize {
-		length = r.endPos - r.pos
-	}
+	length := r.endPos - r.pos
rc, err := r.objHandle.NewRangeReader(r.ctx, r.pos, length)
if err != nil {
return 0, errors.Annotatef(err,
@@ -570,7 +568,7 @@ func (r *gcsObjectReader) Read(p []byte) (n int, err error) {
}
r.reader = rc
if r.prefetchSize > 0 {
-	r.reader = prefetch.NewReader(r.reader, r.prefetchSize)
+	r.reader = prefetch.NewReader(r.reader, length, r.prefetchSize)
}
}
n, err = r.reader.Read(p)
@@ -630,7 +628,7 @@ func (r *gcsObjectReader) Seek(offset int64, whence int) (int64, error) {
}
r.reader = rc
if r.prefetchSize > 0 {
-	r.reader = prefetch.NewReader(r.reader, r.prefetchSize)
+	r.reader = prefetch.NewReader(r.reader, r.endPos-r.pos, r.prefetchSize)
}

return realOffset, nil
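
The gcs.go hunks above also show the other recurring change in this commit: sentinel comparisons such as errors.Cause(err) == storage.ErrObjectNotExist and err == io.EOF give way to goerrors.Is. A self-contained illustration of why the comparison form breaks once errors are wrapped:

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

func main() {
	// Wrapping (fmt.Errorf with %w, errors.Annotatef, retry decorators)
	// hides the sentinel from a direct == comparison.
	wrapped := fmt.Errorf("read 3 of 10 bytes: %w", io.EOF)

	fmt.Println(wrapped == io.EOF)          // false: the sentinel is buried
	fmt.Println(errors.Is(wrapped, io.EOF)) // true: Is walks the wrap chain
}
```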
8 changes: 4 additions & 4 deletions br/pkg/storage/ks3.go
@@ -443,7 +443,7 @@ func (rs *KS3Storage) Open(ctx context.Context, path string, o *ReaderOption) (E
return nil, errors.Trace(err)
}
if prefetchSize > 0 {
-	reader = prefetch.NewReader(reader, prefetchSize)
+	reader = prefetch.NewReader(reader, r.RangeSize(), prefetchSize)
}
return &ks3ObjectReader{
ctx: ctx,
@@ -564,14 +564,14 @@ func (r *ks3ObjectReader) Read(p []byte) (n int, err error) {
}
_ = r.reader.Close()

-	newReader, _, err1 := r.storage.open(r.ctx, r.name, r.pos, end)
+	newReader, rangeInfo, err1 := r.storage.open(r.ctx, r.name, r.pos, end)
if err1 != nil {
log.Warn("open new s3 reader failed", zap.String("file", r.name), zap.Error(err1))
return
}
r.reader = newReader
if r.prefetchSize > 0 {
-	r.reader = prefetch.NewReader(r.reader, r.prefetchSize)
+	r.reader = prefetch.NewReader(r.reader, rangeInfo.RangeSize(), r.prefetchSize)
}
retryCnt++
n, err = r.reader.Read(p[:maxCnt])
@@ -643,7 +643,7 @@ func (r *ks3ObjectReader) Seek(offset int64, whence int) (int64, error) {
}
r.reader = newReader
if r.prefetchSize > 0 {
-	r.reader = prefetch.NewReader(r.reader, r.prefetchSize)
+	r.reader = prefetch.NewReader(r.reader, info.RangeSize(), r.prefetchSize)
}
r.rangeInfo = info
r.pos = realOffset
18 changes: 12 additions & 6 deletions br/pkg/storage/s3.go
@@ -822,7 +822,7 @@ func (rs *S3Storage) Open(ctx context.Context, path string, o *ReaderOption) (Ex
return nil, errors.Trace(err)
}
if prefetchSize > 0 {
-	reader = prefetch.NewReader(reader, o.PrefetchSize)
+	reader = prefetch.NewReader(reader, r.RangeSize(), o.PrefetchSize)
}
return &s3ObjectReader{
storage: rs,
@@ -835,8 +835,9 @@ func (rs *S3Storage) Open(ctx context.Context, path string, o *ReaderOption) (Ex
}, nil
}

-	// RangeInfo represents the an HTTP Content-Range header value
+	// RangeInfo represents the HTTP Content-Range header value
// of the form `bytes [Start]-[End]/[Size]`.
+	// see https://www.rfc-editor.org/rfc/rfc9110.html#section-14.4.
type RangeInfo struct {
// Start is the absolute position of the first byte of the byte range,
// starting from 0.
@@ -849,6 +850,11 @@ type RangeInfo struct {
Size int64
}

+	// RangeSize returns the size of the range.
+	func (r *RangeInfo) RangeSize() int64 {
+		return r.End + 1 - r.Start
+	}

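A quick worked example of the inclusive-End arithmetic: for `Content-Range: bytes 3-9/10`, Start is 3, End is 9 and Size is 10, so the range holds End + 1 - Start = 7 bytes:

```go
package main

import "fmt"

func main() {
	start, end := int64(3), int64(9)
	fmt.Println(end + 1 - start) // 7: byte offsets 3 through 9 inclusive
}
```
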
// if endOffset > startOffset, should return reader for bytes in [startOffset, endOffset).
func (rs *S3Storage) open(
ctx context.Context,
@@ -977,7 +983,7 @@ func (r *s3ObjectReader) Read(p []byte) (n int, err error) {
n, err = r.reader.Read(p[:maxCnt])
// TODO: maybe we should use !errors.Is(err, io.EOF) here to avoid error lint, but currently, pingcap/errors
// doesn't implement this method yet.
-	for err != nil && errors.Cause(err) != io.EOF && retryCnt < maxErrorRetries { //nolint:errorlint
+	for err != nil && errors.Cause(err) != io.EOF && r.ctx.Err() == nil && retryCnt < maxErrorRetries { //nolint:errorlint
log.L().Warn(
"read s3 object failed, will retry",
zap.String("file", r.name),
@@ -991,14 +997,14 @@ func (r *s3ObjectReader) Read(p []byte) (n int, err error) {
}
_ = r.reader.Close()

-	newReader, _, err1 := r.storage.open(r.ctx, r.name, r.pos, end)
+	newReader, rangeInfo, err1 := r.storage.open(r.ctx, r.name, r.pos, end)
if err1 != nil {
log.Warn("open new s3 reader failed", zap.String("file", r.name), zap.Error(err1))
return
}
r.reader = newReader
if r.prefetchSize > 0 {
-	r.reader = prefetch.NewReader(r.reader, r.prefetchSize)
+	r.reader = prefetch.NewReader(r.reader, rangeInfo.RangeSize(), r.prefetchSize)
}
retryCnt++
n, err = r.reader.Read(p[:maxCnt])
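
The added r.ctx.Err() == nil condition keeps the retry loop from burning all maxErrorRetries attempts on reopen/read calls that are doomed once the caller has cancelled. A reduced sketch of the pattern (illustrative names, not the s3ObjectReader code):

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// readWithRetry retries a failing read, but bails out as soon as the context
// is done, since every subsequent attempt would fail the same way.
func readWithRetry(ctx context.Context, read func() error, maxRetries int) error {
	err := read()
	for retry := 0; err != nil && ctx.Err() == nil && retry < maxRetries; retry++ {
		err = read()
	}
	return err
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	calls := 0
	read := func() error { calls++; return errors.New("mock read error") }

	cancel() // the caller gave up before the first retry
	err := readWithRetry(ctx, read, 3)
	fmt.Println(err, calls) // "mock read error 1": no retries were attempted
}
```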
@@ -1070,7 +1076,7 @@ func (r *s3ObjectReader) Seek(offset int64, whence int) (int64, error) {
}
r.reader = newReader
if r.prefetchSize > 0 {
-	r.reader = prefetch.NewReader(r.reader, r.prefetchSize)
+	r.reader = prefetch.NewReader(r.reader, info.RangeSize(), r.prefetchSize)
}
r.rangeInfo = info
r.pos = realOffset
33 changes: 33 additions & 0 deletions br/pkg/storage/s3_test.go
@@ -903,6 +903,39 @@ func TestS3RangeReaderRetryRead(t *testing.T) {
require.Equal(t, []byte("56"), slice)
}

+	func TestS3RangeReaderShouldNotRetryWhenContextCancelled(t *testing.T) {
+		s := createS3Suite(t)
+		ctx, cancelFunc := context.WithCancel(context.Background())
+		content := []byte("0123456789")
+		var failCount atomic.Int32
+		s.s3.EXPECT().GetObjectWithContext(gomock.Any(), gomock.Any()).
+			DoAndReturn(func(_ context.Context, input *s3.GetObjectInput, opt ...request.Option) (*s3.GetObjectOutput, error) {
+				var start int
+				_, err := fmt.Sscanf(*input.Range, "bytes=%d-", &start)
+				require.NoError(t, err)
+				requestedBytes := content[start:]
+				return &s3.GetObjectOutput{
+					Body:         io.NopCloser(&mockFailReader{r: bytes.NewReader(requestedBytes), failCount: &failCount}),
+					ContentRange: aws.String(fmt.Sprintf("bytes %d-%d/%d", start, len(content)-1, len(content))),
+				}, nil
+			})
+		reader, err := s.storage.Open(ctx, "random", &ReaderOption{StartOffset: aws.Int64(3)})
+		require.NoError(t, err)
+		defer func() {
+			require.NoError(t, reader.Close())
+		}()
+		slice := make([]byte, 2)
+		n, err := reader.Read(slice)
+		require.NoError(t, err)
+		require.Equal(t, 2, n)
+		require.Equal(t, []byte("34"), slice)
+		failCount.Store(1)
+		cancelFunc()
+		n, err = reader.Read(slice)
+		require.ErrorContains(t, err, "mock read error")
+		require.Zero(t, n)
+	}

// TestS3ReaderWithRetryEOF check the Read with retry and end with io.EOF.
func TestS3ReaderWithRetryEOF(t *testing.T) {
s := createS3Suite(t)
1 change: 1 addition & 0 deletions pkg/lightning/backend/external/BUILD.bazel
@@ -37,6 +37,7 @@ go_library(
"//pkg/util/intest",
"//pkg/util/logutil",
"//pkg/util/size",
"@com_github_aws_aws_sdk_go//aws",
"@com_github_cockroachdb_pebble//:pebble",
"@com_github_docker_go_units//:go-units",
"@com_github_google_uuid//:uuid",
5 changes: 3 additions & 2 deletions pkg/lightning/backend/external/bench_test.go
@@ -17,6 +17,7 @@ package external
import (
"context"
"encoding/hex"
goerrors "errors"
"flag"
"fmt"
"io"
@@ -267,7 +268,7 @@ func readFileSequential(t *testing.T, s *readTestSuite) {
break
}
}
-	intest.Assert(err == io.EOF)
+	intest.Assert(goerrors.Is(err, io.EOF))
totalFileSize.Add(int64(sz))
err = reader.Close()
intest.AssertNoError(err)
@@ -311,7 +312,7 @@ func readFileConcurrently(t *testing.T, s *readTestSuite) {
break
}
}
-	intest.Assert(err == io.EOF)
+	intest.Assert(goerrors.Is(err, io.EOF))
totalFileSize.Add(int64(sz))
err = reader.Close()
intest.AssertNoError(err)
35 changes: 16 additions & 19 deletions pkg/lightning/backend/external/byte_reader.go
@@ -16,9 +16,11 @@ package external

import (
"context"
goerrors "errors"
"fmt"
"io"

"github.com/aws/aws-sdk-go/aws"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/storage"
@@ -74,11 +76,10 @@ func openStoreReaderAndSeek(
initFileOffset uint64,
prefetchSize int,
) (storage.ExternalFileReader, error) {
storageReader, err := store.Open(ctx, name, &storage.ReaderOption{PrefetchSize: prefetchSize})
if err != nil {
return nil, err
}
_, err = storageReader.Seek(int64(initFileOffset), io.SeekStart)
storageReader, err := store.Open(ctx, name, &storage.ReaderOption{
StartOffset: aws.Int64(int64(initFileOffset)),
PrefetchSize: prefetchSize,
})
if err != nil {
return nil, err
}
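
Note the shape of this change: instead of opening at offset zero and then seeking (which, for these object-store readers, discards the current reader and issues a second range request), the start offset is passed to Open so the first range request already begins at initFileOffset.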
@@ -157,7 +158,7 @@ func (r *byteReader) switchConcurrentMode(useConcurrent bool) error {
return err
}
err := r.reload()
if err != nil && err == io.EOF {
if goerrors.Is(err, io.EOF) {
// ignore EOF error, let readNBytes handle it
return nil
}
@@ -228,16 +229,12 @@ func (r *byteReader) readNBytes(n int) ([]byte, error) {
hasRead := readLen > 0
for n > 0 {
err := r.reload()
-	switch err {
-	case nil:
-	case io.EOF:
-		// EOF is only allowed when we have not read any data
-		if hasRead {
-			return nil, io.ErrUnexpectedEOF
+	if err != nil {
+		if goerrors.Is(err, io.EOF) && hasRead {
+			// EOF is only allowed when we have not read any data
+			return nil, errors.Annotatef(io.ErrUnexpectedEOF, "file: %s", r.concurrentReader.filename)
		}
-		return nil, err
-	default:
-		return nil, err
+		return nil, errors.Trace(err)
	}
readLen, bs = r.next(n)
hasRead = hasRead || readLen > 0
@@ -301,15 +298,15 @@ func (r *byteReader) reload() error {
// when not using concurrentReader, len(curBuf) == 1
n, err := io.ReadFull(r.storageReader, r.curBuf[0][0:])
if err != nil {
-	switch err {
-	case io.EOF:
+	switch {
+	case goerrors.Is(err, io.EOF):
// move curBufIdx so following read will also find EOF
r.curBufIdx = len(r.curBuf)
return err
-	case io.ErrUnexpectedEOF:
+	case goerrors.Is(err, io.ErrUnexpectedEOF):
// The last batch.
r.curBuf[0] = r.curBuf[0][:n]
-	case context.Canceled:
+	case goerrors.Is(err, context.Canceled):
return err
default:
r.logger.Warn("other error during read", zap.Error(err))
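
reload() leans on io.ReadFull's error contract, which this patch is careful to preserve end to end: a clean io.EOF means zero bytes were read, while a partially filled buffer surfaces as io.ErrUnexpectedEOF. A self-contained demonstration:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

func main() {
	buf := make([]byte, 8)

	// Nothing left to read: ReadFull reports a clean EOF.
	n, err := io.ReadFull(bytes.NewReader(nil), buf)
	fmt.Println(n, err) // 0 EOF

	// The source ends mid-buffer: ReadFull reports unexpected EOF along
	// with the bytes that did arrive.
	n, err = io.ReadFull(bytes.NewReader([]byte("abc")), buf)
	fmt.Println(n, err) // 3 unexpected EOF
}
```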
7 changes: 4 additions & 3 deletions pkg/lightning/backend/external/byte_reader_test.go
@@ -16,6 +16,7 @@ package external

import (
"context"
goerrors "errors"
"io"
"net/http/httptest"
"testing"
@@ -189,7 +190,7 @@ func TestUnexpectedEOF(t *testing.T) {
func TestEmptyContent(t *testing.T) {
ms := &mockExtStore{src: []byte{}}
_, err := newByteReader(context.Background(), ms, 100)
-	require.Equal(t, io.EOF, err)
+	require.ErrorIs(t, err, io.EOF)

st, clean := NewS3WithBucketAndPrefix(t, "test", "testprefix")
defer clean()
@@ -204,7 +205,7 @@ func TestEmptyContent(t *testing.T) {
return rsc
}
_, err = newByteReader(context.Background(), newRsc(), 100)
-	require.Equal(t, io.EOF, err)
+	require.ErrorIs(t, err, io.EOF)
}

func TestSwitchMode(t *testing.T) {
@@ -260,7 +261,7 @@ func TestSwitchMode(t *testing.T) {
}
}
key, val, err := kvReader.nextKV()
-	if err == io.EOF {
+	if goerrors.Is(err, io.EOF) {
break
}
require.NoError(t, err)
11 changes: 2 additions & 9 deletions pkg/lightning/backend/external/concurrent_reader.go
@@ -18,9 +18,8 @@ import (
"context"
"io"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/lightning/log"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

@@ -87,13 +86,7 @@ func (r *concurrentFileReader) read(bufs [][]byte) ([][]byte, error) {
buf,
)
if err != nil {
-	log.FromContext(r.ctx).Error(
-		"concurrent read meet error",
-		zap.Int64("offset", offset),
-		zap.Int("readSize", len(buf)),
-		zap.Error(err),
-	)
-	return err
+	return errors.Annotatef(err, "offset: %d, readSize: %d", offset, len(buf))
}
return nil
})
3 changes: 2 additions & 1 deletion pkg/lightning/backend/external/concurrent_reader_test.go
@@ -16,6 +16,7 @@ package external

import (
"context"
goerrors "errors"
"io"
"testing"
"time"
@@ -70,7 +71,7 @@ func TestConcurrentRead(t *testing.T) {
for {
bs, err := rd.read(bufs)
if err != nil {
-	if err == io.EOF {
+	if goerrors.Is(err, io.EOF) {
break
}
require.NoError(t, err)
2 changes: 1 addition & 1 deletion pkg/lightning/backend/external/file_test.go
@@ -176,7 +176,7 @@ func TestKVReadWrite(t *testing.T) {
require.Equal(t, values[i], value)
}
_, _, err = kvReader.nextKV()
-	require.Equal(t, io.EOF, err)
+	require.ErrorIs(t, err, io.EOF)

require.NoError(t, kvReader.Close())
}