Skip to content

Commit

Permalink
Merge pull request #4 from C2FO/s3-file-wait
Browse files Browse the repository at this point in the history
EP-2036 -> master | NoSuchKey: The specified key does not exist.
  • Loading branch information
funkyshu authored Jul 23, 2018
2 parents 5471c8a + 4e39ade commit 4d67189
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 20 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ go test $(glide novendor)
* Apply last of bugfixes from old repo
* 1.1.0
* Enable server-side encryption on S3 (matching GCS) as a more sane, secure default for files is at rest
* 1.2.0
* For the S3 implementation of the File interface, ensure the file exists in S3 after it is written before continuing.

## Meta

Expand Down
47 changes: 45 additions & 2 deletions s3/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/aws/aws-sdk-go/service/s3/s3manager"

"github.com/c2fo/vfs"
"github.com/c2fo/vfs/mocks"
)

//File implements vfs.File interface for S3 fs.
Expand Down Expand Up @@ -220,6 +221,10 @@ func (f *File) Close() (rerr error) {
}

f.writeBuffer = nil

if err := waitUntilFileExists(f, 5); err != nil {
return err
}
return nil
}

Expand Down Expand Up @@ -346,8 +351,8 @@ func (f *File) putObject(reader io.ReadSeeker) error {
func (f *File) uploadInput() *s3manager.UploadInput {
sseType := "AES256"
return &s3manager.UploadInput{
Bucket: &f.bucket,
Key: &f.key,
Bucket: &f.bucket,
Key: &f.key,
ServerSideEncryption: &sseType,
}
}
Expand All @@ -364,3 +369,41 @@ func (f *File) getObject() (io.ReadCloser, error) {

return getOutput.Body, nil
}

//WaitUntilFileExists attempts to ensure that a recently written file is available before moving on. This is helpful for
// attempting to overcome race conditions withe S3's "eventual consistency".
// WaitUntilFileExists accepts vfs.File and an int representing the number of times to retry(once a second).
// error is returned if the file is still not available after the specified retries.
// nil is returned once the file is available.
func waitUntilFileExists(file vfs.File, retries int) error {
// Ignore in-memory VFS files
if _, ok := file.(*mocks.ReadWriteFile); ok {
return nil
}

// Return as if file was found when retries is set to -1. Useful mainly for testing.
if retries == -1 {
return nil
}
var retryCount = 0
for {
if retryCount == retries {
return errors.New(fmt.Sprintf("Failed to find file %s after %d", file, retries))
}

//check for existing file
found, err := file.Exists()
if err != nil {
return errors.New("unable to check for file on S3")
}

if found {
break
}

retryCount++
time.Sleep(time.Second * 1)
}

return nil
}
36 changes: 18 additions & 18 deletions s3/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func (ts *fileTestSuite) TestRead() {
s3apiMock.On("GetObject", mock.AnythingOfType("*s3.GetObjectInput")).Return(&s3.GetObjectOutput{
Body: nopCloser{bytes.NewBufferString(contents)},
}, nil)
s3apiMock.On("HeadObject", mock.AnythingOfType("*s3.HeadObjectInput")).Return(&s3.HeadObjectOutput{}, nil)

file, err := fs.NewFile("bucket", "/some/path/file.txt")
if err != nil {
Expand Down Expand Up @@ -91,22 +92,16 @@ func (ts *fileTestSuite) TestSeek() {
s3apiMock.On("GetObject", mock.AnythingOfType("*s3.GetObjectInput")).Return(&s3.GetObjectOutput{
Body: nopCloser{bytes.NewBufferString(contents)},
}, nil)
s3apiMock.On("HeadObject", mock.AnythingOfType("*s3.HeadObjectInput")).Return(&s3.HeadObjectOutput{}, nil)

_, seekErr := file.Seek(6, 0)
assert.NoError(ts.T(), seekErr, "no error expected")

var localFile = bytes.NewBuffer([]byte{})

s3apiMock.AssertExpectations(ts.T())

_, copyErr := io.Copy(localFile, file)
assert.NoError(ts.T(), copyErr, "no error expected")

defer func() {
closeErr := file.Close()
assert.NoError(ts.T(), closeErr, "no error expected")
}()

ts.Equal("world!", localFile.String(), "Seeking should download the file and move the cursor as expected")

localFile = bytes.NewBuffer([]byte{})
Expand All @@ -116,6 +111,10 @@ func (ts *fileTestSuite) TestSeek() {
_, copyErr2 := io.Copy(localFile, file)
assert.NoError(ts.T(), copyErr2, "no error expected")
ts.Equal(contents, localFile.String(), "Subsequent calls to seek work on temp file as expected")

closeErr := file.Close()
assert.NoError(ts.T(), closeErr, "no error expected")
s3apiMock.AssertExpectations(ts.T())
}

func (ts *fileTestSuite) TestGetLocation() {
Expand Down Expand Up @@ -199,6 +198,7 @@ func (ts *fileTestSuite) TestMoveToFile() {

s3apiMock.On("CopyObject", mock.AnythingOfType("*s3.CopyObjectInput")).Return(&s3.CopyObjectOutput{}, nil)
s3apiMock.On("DeleteObject", mock.AnythingOfType("*s3.DeleteObjectInput")).Return(&s3.DeleteObjectOutput{}, nil)
s3apiMock.On("HeadObject", mock.AnythingOfType("*s3.HeadObjectInput")).Return(&s3.HeadObjectOutput{}, nil)

err := testFile.MoveToFile(targetFile)
ts.Nil(err, "Error shouldn't be returned from successful call to CopyToFile")
Expand Down Expand Up @@ -234,6 +234,7 @@ func (ts *fileTestSuite) TestCopyToLocation() {
s3apiMock.On("GetObject", mock.AnythingOfType("*s3.GetObjectInput")).Return(&s3.GetObjectOutput{
Body: nopCloser{bytes.NewBufferString(expectedText)},
}, nil)
s3apiMock.On("HeadObject", mock.AnythingOfType("*s3.HeadObjectInput")).Return(&s3.HeadObjectOutput{}, nil)
file, err := fs.NewFile("bucket", "/hello.txt")
if err != nil {
ts.Fail("Shouldn't return error creating test s3.File instance.")
Expand Down Expand Up @@ -271,23 +272,22 @@ func (ts *fileTestSuite) TestCopyToLocationWithinS3() {
location.On("Volume", mock.Anything).Return("newBucket").Twice()

s3apiMock.On("CopyObject", mock.AnythingOfType("*s3.CopyObjectInput")).Return(&s3.CopyObjectOutput{}, nil)
s3apiMock.On("HeadObject", mock.AnythingOfType("*s3.HeadObjectInput")).Return(&s3.HeadObjectOutput{}, nil)

file, err := fs.NewFile("bucket", "/hello.txt")
if err != nil {
ts.Fail("Shouldn't return error creating test s3.File instance.")
}

defer func() {
closeErr := file.Close()
assert.NoError(ts.T(), closeErr, "no error expected")
}()

otherFs.On("Scheme").Return(Scheme)
otherFs.On("NewFile", "newBucket", "new/file/path/hello.txt").Return(otherFile, nil)

_, err = file.CopyToLocation(location)
assert.NoError(ts.T(), err, "no error expected")

closeErr := file.Close()
assert.NoError(ts.T(), closeErr, "no error expected")

s3apiMock.AssertExpectations(ts.T())
otherFs.AssertExpectations(ts.T())
location.AssertExpectations(ts.T())
Expand All @@ -306,6 +306,7 @@ func (ts *fileTestSuite) TestMoveToLocation() {

s3apiMock.On("CopyObject", mock.AnythingOfType("*s3.CopyObjectInput")).Return(&s3.CopyObjectOutput{}, nil)
s3apiMock.On("DeleteObject", mock.AnythingOfType("*s3.DeleteObjectInput")).Return(&s3.DeleteObjectOutput{}, nil)
s3apiMock.On("HeadObject", mock.AnythingOfType("*s3.HeadObjectInput")).Return(&s3.HeadObjectOutput{}, nil)

file, err := fs.NewFile("bucket", "/hello.txt")
if err != nil {
Expand Down Expand Up @@ -338,20 +339,19 @@ func (ts *fileTestSuite) TestMoveToLocationFail() {
location.On("Volume", mock.Anything).Return("newBucket").Once()

s3apiMock.On("CopyObject", mock.AnythingOfType("*s3.CopyObjectInput")).Return(nil, errors.New("didn't copy, oh noes"))
s3apiMock.On("HeadObject", mock.AnythingOfType("*s3.HeadObjectInput")).Return(&s3.HeadObjectOutput{}, nil)

file, err := fs.NewFile("bucket", "/hello.txt")
if err != nil {
ts.Fail("Shouldn't return error creating test s3.File instance.")
}

defer func() {
closeErr := file.Close()
assert.NoError(ts.T(), closeErr, "no close error expected")
}()

_, merr := file.MoveToLocation(location)
assert.Error(ts.T(), merr, "MoveToLocation error not expected")

closeErr := file.Close()
assert.NoError(ts.T(), closeErr, "no close error expected")

s3apiMock.AssertExpectations(ts.T())
s3apiMock.AssertNotCalled(ts.T(), "DeleteObject", mock.AnythingOfType("*s3.DeleteObjectInput"))
otherFs.AssertExpectations(ts.T())
Expand All @@ -360,7 +360,7 @@ func (ts *fileTestSuite) TestMoveToLocationFail() {

func (ts *fileTestSuite) TestDelete() {
s3apiMock.On("DeleteObject", mock.AnythingOfType("*s3.DeleteObjectInput")).Return(&s3.DeleteObjectOutput{}, nil)

s3apiMock.On("HeadObject", mock.AnythingOfType("*s3.HeadObjectInput")).Return(&s3.HeadObjectOutput{}, nil)
err := testFile.Delete()
ts.Nil(err, "Successful delete should not return an error.")
s3apiMock.AssertExpectations(ts.T())
Expand Down

0 comments on commit 4d67189

Please sign in to comment.