From 22fb318db675acf7478f4dd92cc34a18e8f621fd Mon Sep 17 00:00:00 2001 From: Umputun Date: Sat, 25 Jan 2020 02:13:56 -0600 Subject: [PATCH 1/8] fix potential race on close --- backend/app/store/image/image.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/backend/app/store/image/image.go b/backend/app/store/image/image.go index 94a2e62950..095c5ad659 100644 --- a/backend/app/store/image/image.go +++ b/backend/app/store/image/image.go @@ -75,10 +75,11 @@ func (s *Service) Submit(idsFn func() []string) { time.Sleep(time.Millisecond * 10) // small sleep to relive busy wait but keep reactive for term (close) } for _, id := range req.idsFn() { - if err := s.Commit(id); err != nil { + if err := s.commit(id); err != nil { log.Printf("[WARN] failed to commit image %s", id) } } + atomic.StoreInt32(&s.term, 0) // indicates completion of ids commits } log.Printf("[INFO] image submitter terminated") }() @@ -130,7 +131,14 @@ func (s *Service) Cleanup(ctx context.Context) { // Close flushes all in-progress submits and enforces waiting commits func (s *Service) Close() { log.Printf("[INFO] close image service ") - atomic.AddInt32(&s.term, 1) // enforce non-delayed commits for all ids left in submitCh + atomic.StoreInt32(&s.term, 1) // enforce non-delayed commits for all ids left in submitCh + for { + // set to 0 by commit goroutine after everything waited on TTL sent + if atomic.LoadInt32(&s.term) == 0 { + break + } + time.Sleep(10 * time.Millisecond) + } if s.submitCh != nil { close(s.submitCh) } From cea92e1a87d88dda48e55a3afcda89f2fc7736df Mon Sep 17 00:00:00 2001 From: Umputun Date: Sat, 25 Jan 2020 02:15:34 -0600 Subject: [PATCH 2/8] move validate inside --- backend/app/store/image/image.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/backend/app/store/image/image.go b/backend/app/store/image/image.go index 095c5ad659..275828f04c 100644 --- a/backend/app/store/image/image.go +++ b/backend/app/store/image/image.go @@ -206,6 +206,12 @@ func isValidImage(b []byte) bool { } func readAndValidateImage(r io.Reader, maxSize int) ([]byte, error) { + + isValidImage := func(b []byte) bool { + ct := http.DetectContentType(b) + return ct == "image/gif" || ct == "image/png" || ct == "image/jpeg" || ct == "image/webp" + } + lr := io.LimitReader(r, int64(maxSize)+1) data, err := ioutil.ReadAll(lr) if err != nil { From de35c78e381e2c5aed02ba904c9b9ed9a303f6f5 Mon Sep 17 00:00:00 2001 From: Umputun Date: Sat, 25 Jan 2020 02:18:06 -0600 Subject: [PATCH 3/8] demote commit and cleanup in image.Store to non-exposed functions. --- backend/app/store/image/bolt_store.go | 10 ++-- backend/app/store/image/bolt_store_test.go | 8 ++-- backend/app/store/image/fs_store.go | 12 ++--- backend/app/store/image/fs_store_test.go | 8 ++-- backend/app/store/image/image.go | 30 ++++++------ backend/app/store/image/image_mock.go | 56 +++++++++++----------- backend/app/store/image/image_test.go | 31 ++++++------ backend/app/store/service/service.go | 15 +++--- backend/app/store/service/service_test.go | 4 +- 9 files changed, 85 insertions(+), 89 deletions(-) diff --git a/backend/app/store/image/bolt_store.go b/backend/app/store/image/bolt_store.go index 66ba72a6a1..e958800684 100644 --- a/backend/app/store/image/bolt_store.go +++ b/backend/app/store/image/bolt_store.go @@ -29,7 +29,7 @@ type Bolt struct { MaxWidth int } -// Create Bolt Store. +// NewBoltStorage create bolt image store func NewBoltStorage(fileName string, maxSize int, maxHeight int, maxWidth int, options bolt.Options) (*Bolt, error) { db, err := bolt.Open(fileName, 0600, &options) if err != nil { @@ -60,7 +60,7 @@ func NewBoltStorage(fileName string, maxSize int, maxHeight int, maxWidth int, o }, nil } -// SaveWithID saves data from reader with given id +// SaveWithID saves data from a reader, for given id func (b *Bolt) SaveWithID(id string, r io.Reader) (string, error) { data, err := readAndValidateImage(r, b.MaxSize) if err != nil { @@ -87,14 +87,14 @@ func (b *Bolt) SaveWithID(id string, r io.Reader) (string, error) { } // Save data from reader to staging bucket in DB -func (b *Bolt) Save(fileName string, userID string, r io.Reader) (id string, err error) { +func (b *Bolt) Save(_ string, userID string, r io.Reader) (id string, err error) { id = path.Join(userID, guid()) return b.SaveWithID(id, r) } // Commit file stored in staging bucket by copying it to permanent bucket // Data from staging bucket not removed immediately, but would be removed on cleanup -func (b *Bolt) Commit(id string) error { +func (b *Bolt) commit(id string) error { err := b.db.Update(func(tx *bolt.Tx) error { data := tx.Bucket([]byte(imagesStagedBktName)).Get([]byte(id)) if data == nil { @@ -127,7 +127,7 @@ func (b *Bolt) Load(id string) (io.ReadCloser, int64, error) { } // Cleanup runs scan of staging and removes old data based on ttl -func (b *Bolt) Cleanup(ctx context.Context, ttl time.Duration) error { +func (b *Bolt) cleanup(_ context.Context, ttl time.Duration) error { err := b.db.Update(func(tx *bolt.Tx) error { c := tx.Bucket([]byte(insertTimeBktName)).Cursor() diff --git a/backend/app/store/image/bolt_store_test.go b/backend/app/store/image/bolt_store_test.go index 5ae27ecf6a..d206c49a84 100644 --- a/backend/app/store/image/bolt_store_test.go +++ b/backend/app/store/image/bolt_store_test.go @@ -31,7 +31,7 @@ func TestBoltStore_SaveCommit(t *testing.T) { }) assert.NoError(t, err) - err = svc.Commit(id) + err = svc.commit(id) require.NoError(t, err) err = svc.db.View(func(tx *bolt.Tx) error { @@ -90,7 +90,7 @@ func TestBoltStore_Cleanup(t *testing.T) { time.Sleep(100 * time.Millisecond) img3 := save("blah_ff3.png", "user2") - err := svc.Cleanup(context.Background(), time.Since(img1ts)) // clean first images + err := svc.cleanup(context.Background(), time.Since(img1ts)) // clean first images assert.NoError(t, err) assertBoltImgNil(t, svc.db, imagesStagedBktName, img1) @@ -98,10 +98,10 @@ func TestBoltStore_Cleanup(t *testing.T) { assertBoltImgNotNil(t, svc.db, imagesStagedBktName, img2) assertBoltImgNotNil(t, svc.db, imagesStagedBktName, img3) - err = svc.Commit(img3) + err = svc.commit(img3) require.NoError(t, err) - err = svc.Cleanup(context.Background(), time.Millisecond*10) + err = svc.cleanup(context.Background(), time.Millisecond*10) assert.NoError(t, err) assertBoltImgNil(t, svc.db, imagesStagedBktName, img2) diff --git a/backend/app/store/image/fs_store.go b/backend/app/store/image/fs_store.go index b800812b27..9c1bbc90c2 100644 --- a/backend/app/store/image/fs_store.go +++ b/backend/app/store/image/fs_store.go @@ -36,7 +36,7 @@ type FileSystem struct { } } -// SaveWithID saves data from reader with given id +// SaveWithID saves data from a reader, with given id func (f *FileSystem) SaveWithID(id string, r io.Reader) (string, error) { data, err := readAndValidateImage(r, f.MaxSize) if err != nil { @@ -51,15 +51,15 @@ func (f *FileSystem) SaveWithID(id string, r io.Reader) (string, error) { } if err = ioutil.WriteFile(dst, data, 0600); err != nil { - return "", errors.Wrapf(err, "can't write file") + return "", errors.Wrapf(err, "can't write image file") } log.Printf("[DEBUG] file %s saved for image %s, size=%d", dst, id, len(data)) return id, nil } -// Save data from reader for given file name to local FS, staging directory. Returns id as user/uuid -// Files partitioned across multiple subdirectories and the final path includes part, i.e. /location/user1/03/123-4567 +// Save data from a reader for given file name to local FS, staging directory. Returns id as user/uuid +// Files partitioned across multiple subdirectories, and the final path includes part, i.e. /location/user1/03/123-4567 func (f *FileSystem) Save(fileName string, userID string, r io.Reader) (id string, err error) { id = path.Join(userID, guid()) // make id as user/uuid finalID, err := f.SaveWithID(id, r) @@ -70,7 +70,7 @@ func (f *FileSystem) Save(fileName string, userID string, r io.Reader) (id strin } // Commit file stored in staging location by moving it to permanent location -func (f *FileSystem) Commit(id string) error { +func (f *FileSystem) commit(id string) error { log.Printf("[DEBUG] commit image %s", id) stagingImage, permImage := f.location(f.Staging, id), f.location(f.Location, id) @@ -110,7 +110,7 @@ func (f *FileSystem) Load(id string) (io.ReadCloser, int64, error) { } // Cleanup runs scan of staging and removes old files based on ttl -func (f *FileSystem) Cleanup(ctx context.Context, ttl time.Duration) error { +func (f *FileSystem) cleanup(_ context.Context, ttl time.Duration) error { if _, err := os.Stat(f.Staging); os.IsNotExist(err) { return nil diff --git a/backend/app/store/image/fs_store_test.go b/backend/app/store/image/fs_store_test.go index 5af64a9bd5..1a3897ff5c 100644 --- a/backend/app/store/image/fs_store_test.go +++ b/backend/app/store/image/fs_store_test.go @@ -129,7 +129,7 @@ func TestFsStore_SaveAndCommit(t *testing.T) { id, err := svc.Save("file1.png", "user1", gopherPNG()) require.NoError(t, err) - err = svc.Commit(id) + err = svc.commit(id) require.NoError(t, err) imgStaging := svc.location(svc.Staging, id) @@ -180,7 +180,7 @@ func TestFsStore_LoadAfterCommit(t *testing.T) { id, err := svc.Save("blah_ff1.png", "user1", gopherPNG()) assert.NoError(t, err) t.Log(id) - err = svc.Commit(id) + err = svc.commit(id) require.NoError(t, err) r, sz, err := svc.Load(id) @@ -260,7 +260,7 @@ func TestFsStore_Cleanup(t *testing.T) { img3 := save("blah_ff3.png", "user2") time.Sleep(100 * time.Millisecond) // make first image expired - err := svc.Cleanup(context.Background(), time.Millisecond*300) + err := svc.cleanup(context.Background(), time.Millisecond*300) assert.NoError(t, err) _, err = os.Stat(img1) @@ -280,7 +280,7 @@ func TestFsStore_Cleanup(t *testing.T) { assert.NoError(t, err, "file on staging") time.Sleep(200 * time.Millisecond) // make all images expired - err = svc.Cleanup(context.Background(), time.Millisecond*300) + err = svc.cleanup(context.Background(), time.Millisecond*300) assert.NoError(t, err) _, err = os.Stat(img2) diff --git a/backend/app/store/image/image.go b/backend/app/store/image/image.go index 275828f04c..0675abeac7 100644 --- a/backend/app/store/image/image.go +++ b/backend/app/store/image/image.go @@ -1,6 +1,6 @@ // Package image handles storing, resizing and retrieval of images -// Provides Store with Save and Load and one implementation on top of local file system. -// Service object encloses Store and add common methods, this is the one consumer should use +// Provides Store with Save and Load and implementations on top of local file system and bolt db. +// Service object encloses Store and add common methods, this is the one consumer should use. package image //go:generate sh -c "mockery -inpkg -name Store -print > /tmp/mock.tmp && mv /tmp/mock.tmp image_mock.go" @@ -28,17 +28,20 @@ import ( ) // Store defines interface for saving and loading pictures. -// Declares two-stage save with commit +// Declares two-stage save with commit. Save stores to staging area and Commit moves to the final location type Store interface { - SaveWithID(id string, r io.Reader) (string, error) - Save(fileName string, userID string, r io.Reader) (id string, err error) // get name and reader and returns ID of stored image - Commit(id string) error // move image from staging to permanent + Save(fileName string, userID string, r io.Reader) (id string, err error) // get name and reader and returns ID of stored (staging) image + SaveWithID(id string, r io.Reader) (string, error) // store image for passed id to staging Load(id string) (io.ReadCloser, int64, error) // load image by ID. Caller has to close the reader. - Cleanup(ctx context.Context, ttl time.Duration) error // run removal loop for old images on staging SizeLimit() int // max image size + + commit(id string) error // move image from staging to permanent + cleanup(ctx context.Context, ttl time.Duration) error // run removal loop for old images on staging } -// Service extends Store with common functions needed for any store implementation +// Service wrap Store with common functions needed for any store implementation +// It also provides async Submit with func param retrieving all submitting ids. +// Submitted ids committed (i.e. moved from staging to final) on TTL expiration. type Service struct { Store TTL time.Duration // for how long file allowed on staging @@ -121,7 +124,7 @@ func (s *Service) Cleanup(ctx context.Context) { log.Printf("[INFO] cleanup terminated, %v", ctx.Err()) return case <-time.After(s.TTL / 2): - if err := s.Store.Cleanup(ctx, s.TTL); err != nil { + if err := s.Store.cleanup(ctx, s.TTL); err != nil { log.Printf("[WARN] failed to cleanup, %v", err) } } @@ -199,12 +202,7 @@ func getProportionalSizes(srcW, srcH int, limitW, limitH int) (resW, resH int) { return limitW, int(propH) } -// check if file f is a valid image format, i.e. gif, png, jpeg or webp -func isValidImage(b []byte) bool { - ct := http.DetectContentType(b) - return ct == "image/gif" || ct == "image/png" || ct == "image/jpeg" || ct == "image/webp" -} - +// check if file f is a valid image format, i.e. gif, png, jpeg or webp and reads up to maxSize. func readAndValidateImage(r io.Reader, maxSize int) ([]byte, error) { isValidImage := func(b []byte) bool { @@ -224,7 +222,7 @@ func readAndValidateImage(r io.Reader, maxSize int) ([]byte, error) { // read header first, needs it to check if data is valid png/gif/jpeg if !isValidImage(data[:512]) { - return nil, errors.Errorf("file format is not allowed") + return nil, errors.Errorf("file format not allowed") } return data, nil diff --git a/backend/app/store/image/image_mock.go b/backend/app/store/image/image_mock.go index 47cda23dda..3ab8c2b50b 100644 --- a/backend/app/store/image/image_mock.go +++ b/backend/app/store/image/image_mock.go @@ -16,34 +16,6 @@ type MockStore struct { mock.Mock } -// Cleanup provides a mock function with given fields: ctx, ttl -func (_m *MockStore) Cleanup(ctx context.Context, ttl time.Duration) error { - ret := _m.Called(ctx, ttl) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, time.Duration) error); ok { - r0 = rf(ctx, ttl) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// Commit provides a mock function with given fields: id -func (_m *MockStore) Commit(id string) error { - ret := _m.Called(id) - - var r0 error - if rf, ok := ret.Get(0).(func(string) error); ok { - r0 = rf(id) - } else { - r0 = ret.Error(0) - } - - return r0 -} - // Load provides a mock function with given fields: id func (_m *MockStore) Load(id string) (io.ReadCloser, int64, error) { ret := _m.Called(id) @@ -129,3 +101,31 @@ func (_m *MockStore) SizeLimit() int { return r0 } + +// cleanup provides a mock function with given fields: ctx, ttl +func (_m *MockStore) cleanup(ctx context.Context, ttl time.Duration) error { + ret := _m.Called(ctx, ttl) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, time.Duration) error); ok { + r0 = rf(ctx, ttl) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// commit provides a mock function with given fields: id +func (_m *MockStore) commit(id string) error { + ret := _m.Called(id) + + var r0 error + if rf, ok := ret.Get(0).(func(string) error); ok { + r0 = rf(id) + } else { + r0 = ret.Error(0) + } + + return r0 +} diff --git a/backend/app/store/image/image_test.go b/backend/app/store/image/image_test.go index 67b70fbb43..5a7a7c4dd6 100644 --- a/backend/app/store/image/image_test.go +++ b/backend/app/store/image/image_test.go @@ -37,63 +37,63 @@ func TestService_ExtractPictures2(t *testing.T) { func TestService_Cleanup(t *testing.T) { store := MockStore{} - store.On("Cleanup", mock.Anything, mock.Anything).Times(10).Return(nil) + store.On("cleanup", mock.Anything, mock.Anything).Times(10).Return(nil) svc := Service{Store: &store, TTL: 100 * time.Millisecond} ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*549) defer cancel() svc.Cleanup(ctx) - store.AssertNumberOfCalls(t, "Cleanup", 10) + store.AssertNumberOfCalls(t, "cleanup", 10) } func TestService_Submit(t *testing.T) { store := MockStore{} - store.On("Commit", mock.Anything, mock.Anything).Times(5).Return(nil) + store.On("commit", mock.Anything, mock.Anything).Times(5).Return(nil) svc := Service{Store: &store, ImageAPI: "/blah/", TTL: time.Millisecond * 100} svc.Submit(func() []string { return []string{"id1", "id2", "id3"} }) svc.Submit(func() []string { return []string{"id4", "id5"} }) svc.Submit(nil) - store.AssertNumberOfCalls(t, "Commit", 0) + store.AssertNumberOfCalls(t, "commit", 0) time.Sleep(time.Millisecond * 150) - store.AssertNumberOfCalls(t, "Commit", 5) + store.AssertNumberOfCalls(t, "commit", 5) } func TestService_Close(t *testing.T) { store := MockStore{} - store.On("Commit", mock.Anything, mock.Anything).Times(5).Return(nil) + store.On("commit", mock.Anything, mock.Anything).Times(5).Return(nil) svc := Service{Store: &store, ImageAPI: "/blah/", TTL: time.Millisecond * 500} svc.Submit(func() []string { return []string{"id1", "id2", "id3"} }) svc.Submit(func() []string { return []string{"id4", "id5"} }) svc.Submit(nil) svc.Close() - store.AssertNumberOfCalls(t, "Commit", 5) + store.AssertNumberOfCalls(t, "commit", 5) } func TestService_SubmitDelay(t *testing.T) { store := MockStore{} - store.On("Commit", mock.Anything, mock.Anything).Times(5).Return(nil) + store.On("commit", mock.Anything, mock.Anything).Times(5).Return(nil) svc := Service{Store: &store, ImageAPI: "/blah/", TTL: time.Millisecond * 100} svc.Submit(func() []string { return []string{"id1", "id2", "id3"} }) time.Sleep(150 * time.Millisecond) // let first batch to pass TTL svc.Submit(func() []string { return []string{"id4", "id5"} }) svc.Submit(nil) - store.AssertNumberOfCalls(t, "Commit", 3) + store.AssertNumberOfCalls(t, "commit", 3) svc.Close() - store.AssertNumberOfCalls(t, "Commit", 5) + store.AssertNumberOfCalls(t, "commit", 5) } func TestService_resize(t *testing.T) { - // Reader is nil. + // reader is nil resized := resize(nil, 100, 100) assert.Nil(t, resized) - // Negative limit error. + // negative limit error resized = resize([]byte("some picture bin data"), -1, -1) require.NotNil(t, resized) assert.Equal(t, resized, []byte("some picture bin data")) - // Decode error. + // decode error resized = resize([]byte("invalid image content"), 100, 100) assert.NotNil(t, resized) assert.Equal(t, resized, []byte("invalid image content")) @@ -110,15 +110,14 @@ func TestService_resize(t *testing.T) { img, err := ioutil.ReadFile(c.file) require.NoError(t, err, "can't open test file %s", c.file) - // No need for resize, image dimensions are smaller than resize limit. + // no need for resize, image dimensions are smaller than resize limit resized = resize(img, 800, 800) assert.NotNil(t, resized, "file %s", c.file) assert.Equal(t, resized, img) - // Resizing to half of width. Check resized image format PNG. + // resizing to half of width resized = resize(img, 400, 400) assert.NotNil(t, resized, "file %s", c.file) - imgRz, format, err := image.Decode(bytes.NewBuffer(resized)) assert.NoError(t, err, "file %s", c.file) assert.Equal(t, "png", format, "file %s", c.file) diff --git a/backend/app/store/service/service.go b/backend/app/store/service/service.go index 7b90c4afa8..e168c18dac 100644 --- a/backend/app/store/service/service.go +++ b/backend/app/store/service/service.go @@ -102,7 +102,7 @@ func (s *DataStore) Create(comment store.Comment) (commentID string, err error) comment.PostTitle = title }() - s.submitImages(comment) + s.submitImages(comment.Locator, comment.ID) if e := s.AdminStore.OnEvent(comment.Locator.SiteID, admin.EvCreate); e != nil { log.Printf("[WARN] failed to send create event, %s", e) } @@ -202,23 +202,22 @@ func (s *DataStore) DeleteUserDetail(siteID string, userID string, detail engine } // submitImages initiated delayed commit of all images from the comment uploaded to remark42 -func (s *DataStore) submitImages(comment store.Comment) { +func (s *DataStore) submitImages(locator store.Locator, commentID string) { - s.ImageService.Submit(func() []string { - c := comment + s.ImageService.Submit(func() []string { // get all ids from comment's text // this can be called after last edit, we have to retrieve fresh comment - cc, err := s.Engine.Get(engine.GetRequest{Locator: c.Locator, CommentID: c.ID}) + cc, err := s.Engine.Get(engine.GetRequest{Locator: locator, CommentID: commentID}) if err != nil { - log.Printf("[WARN] can't get comment's %s text for image extraction, %v", c.ID, err) + log.Printf("[WARN] can't get comment's %s text for image extraction, %v", commentID, err) return nil } imgIds, err := s.ImageService.ExtractPictures(cc.Text) if err != nil { - log.Printf("[WARN] can't get extract pictures from %s, %v", c.ID, err) + log.Printf("[WARN] can't get extract pictures from %s, %v", commentID, err) return nil } if len(imgIds) > 0 { - log.Printf("[DEBUG] image ids extracted from %s - %+v", c.ID, imgIds) + log.Printf("[DEBUG] image ids extracted from %s - %+v", commentID, imgIds) } return imgIds }) diff --git a/backend/app/store/service/service_test.go b/backend/app/store/service/service_test.go index 3aa4e5af5c..d4ba370087 100644 --- a/backend/app/store/service/service_test.go +++ b/backend/app/store/service/service_test.go @@ -1277,7 +1277,7 @@ func TestService_submitImages(t *testing.T) { lgr.Setup(lgr.Debug, lgr.CallerFile, lgr.CallerFunc) mockStore := image.MockStore{} - mockStore.On("Commit", mock.Anything, mock.Anything).Times(2).Return(nil) + mockStore.On("commit", mock.Anything, mock.Anything).Times(2).Return(nil) imgSvc := &image.Service{Store: &mockStore, TTL: time.Millisecond * 50} // two comments for https://radio-t.com @@ -1296,7 +1296,7 @@ func TestService_submitImages(t *testing.T) { _, err := b.Engine.Create(c) // create directly with engine, doesn't call submitImages assert.NoError(t, err) - b.submitImages(c) + b.submitImages(c.Locator, c.ID) time.Sleep(250 * time.Millisecond) } From 266791b615db55daa3a2c140eaeabf8bd7ab2c27 Mon Sep 17 00:00:00 2001 From: Umputun Date: Sat, 25 Jan 2020 02:19:05 -0600 Subject: [PATCH 4/8] replace immediate image commit with delayed via Submit --- backend/app/rest/proxy/image.go | 5 +---- backend/app/rest/proxy/image_test.go | 4 ++-- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/backend/app/rest/proxy/image.go b/backend/app/rest/proxy/image.go index e9c0a25323..98901c639c 100644 --- a/backend/app/rest/proxy/image.go +++ b/backend/app/rest/proxy/image.go @@ -159,10 +159,7 @@ func (p Image) cacheImage(r io.Reader, imgID string) { if err != nil { log.Printf("[WARN] unable to save image to the storage: %+v", err) } - // In the future we can do something smarter than just committing everything (eg, some kind of LFU/LRU) - if err := p.ImageService.Commit(id); err != nil { - log.Printf("[WARN] unable to commit image %s", imgID) - } + p.ImageService.Submit(func() []string { return []string{id} }) } // download an image. Returns a Reader which has to be closed by a caller diff --git a/backend/app/rest/proxy/image_test.go b/backend/app/rest/proxy/image_test.go index 88351ffd01..0e7dc523fd 100644 --- a/backend/app/rest/proxy/image_test.go +++ b/backend/app/rest/proxy/image_test.go @@ -114,7 +114,7 @@ func TestImage_Routes_CachingImage(t *testing.T) { imageStore.On("Load", mock.Anything).Once().Return(nil, int64(0), nil) imageStore.On("SaveWithID", mock.Anything, mock.Anything).Once().Run(func(args mock.Arguments) { _, _ = ioutil.ReadAll(args.Get(1).(io.Reader)) }).Return("", nil) - imageStore.On("Commit", mock.Anything).Once().Return(nil) + imageStore.On("commit", mock.Anything).Once().Return(nil) resp, err := http.Get(ts.URL + "/?src=" + encodedImgURL) require.Nil(t, err) @@ -124,7 +124,7 @@ func TestImage_Routes_CachingImage(t *testing.T) { imageStore.AssertCalled(t, "Load", mock.Anything) imageStore.AssertCalled(t, "SaveWithID", "cached_images/4b84b15bff6ee5796152495a230e45e3d7e947d9-"+sha1Str(imgURL), mock.Anything) - imageStore.AssertCalled(t, "Commit", mock.Anything) + imageStore.AssertCalled(t, "commit", mock.Anything) } func TestImage_Routes_Using_Cachded_Image(t *testing.T) { From 07af608d4a36cc993dff2f99f4b1e4523994af89 Mon Sep 17 00:00:00 2001 From: Umputun Date: Sat, 25 Jan 2020 12:17:15 -0600 Subject: [PATCH 5/8] minor: remove error logging, rename tests --- backend/app/rest/proxy/image.go | 3 +-- backend/app/rest/proxy/image_test.go | 26 +++++++++++++------------- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/backend/app/rest/proxy/image.go b/backend/app/rest/proxy/image.go index 98901c639c..c53c488071 100644 --- a/backend/app/rest/proxy/image.go +++ b/backend/app/rest/proxy/image.go @@ -186,8 +186,7 @@ func (p Image) downloadImage(ctx context.Context, imgURL string) (io.ReadCloser, return e }) if err != nil { - log.Print(err.Error()) - return nil, err + return nil, errors.Wrapf(err, "can't download image %s", imgURL) } if resp.StatusCode != http.StatusOK { diff --git a/backend/app/rest/proxy/image_test.go b/backend/app/rest/proxy/image_test.go index 0e7dc523fd..0122bd254d 100644 --- a/backend/app/rest/proxy/image_test.go +++ b/backend/app/rest/proxy/image_test.go @@ -19,7 +19,7 @@ import ( "github.com/umputun/remark/backend/app/store/image" ) -func TestPicture_Extract(t *testing.T) { +func TestImage_Extract(t *testing.T) { tbl := []struct { inp string @@ -61,7 +61,7 @@ func TestPicture_Extract(t *testing.T) { } } -func TestPicture_Replace(t *testing.T) { +func TestImage_Replace(t *testing.T) { img := Image{HTTP2HTTPS: true, RoutePath: "/img"} r := img.replace(` xyz `, []string{"http://radio-t.com/img3.png", "http://images.pexels.com/67636/img4.jpeg"}) @@ -73,7 +73,7 @@ func TestImage_Routes(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(img.Handler)) defer ts.Close() - httpSrv := imgHTTPServer(t) + httpSrv := imgHTTPTestsServer(t) defer httpSrv.Close() encodedImgURL := base64.URLEncoding.EncodeToString([]byte(httpSrv.URL + "/image/img1.png")) @@ -95,7 +95,7 @@ func TestImage_Routes(t *testing.T) { assert.Equal(t, 400, resp.StatusCode) } -func TestImage_Routes_CachingImage(t *testing.T) { +func TestImage_RoutesCachingImage(t *testing.T) { imageStore := image.MockStore{} img := Image{ CacheExternal: true, @@ -106,7 +106,7 @@ func TestImage_Routes_CachingImage(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(img.Handler)) defer ts.Close() - httpSrv := imgHTTPServer(t) + httpSrv := imgHTTPTestsServer(t) defer httpSrv.Close() imgURL := httpSrv.URL + "/image/img1.png" @@ -127,7 +127,7 @@ func TestImage_Routes_CachingImage(t *testing.T) { imageStore.AssertCalled(t, "commit", mock.Anything) } -func TestImage_Routes_Using_Cachded_Image(t *testing.T) { +func TestImage_RoutesUsingCachedImage(t *testing.T) { imageStore := image.MockStore{} img := Image{ CacheExternal: true, @@ -138,12 +138,12 @@ func TestImage_Routes_Using_Cachded_Image(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(img.Handler)) defer ts.Close() - httpSrv := imgHTTPServer(t) + httpSrv := imgHTTPTestsServer(t) defer httpSrv.Close() encodedImgURL := base64.URLEncoding.EncodeToString([]byte(httpSrv.URL + "/image/img1.png")) - // In order to validate that cached data is used cache "will return" some other data from what http server would + // In order to validate that cached data used cache "will return" some other data from what http server would imageReader := ioutil.NopCloser(bytes.NewReader([]byte(fmt.Sprintf("%256s", "X")))) imageStore.On("Load", mock.Anything).Once().Return(imageReader, int64(256), nil) @@ -161,7 +161,7 @@ func TestImage_RoutesTimedOut(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(img.Handler)) defer ts.Close() - httpSrv := imgHTTPServer(t) + httpSrv := imgHTTPTestsServer(t) defer httpSrv.Close() encodedImgURL := base64.URLEncoding.EncodeToString([]byte(httpSrv.URL + "/image/img-slow.png")) @@ -174,7 +174,7 @@ func TestImage_RoutesTimedOut(t *testing.T) { assert.True(t, strings.Contains(string(b), "deadline exceeded")) } -func TestPicture_Convert_ProxyMode(t *testing.T) { +func TestImage_ConvertProxyMode(t *testing.T) { img := Image{HTTP2HTTPS: true, RoutePath: "/img"} r := img.Convert(` xyz `) assert.Equal(t, ` xyz `, r) @@ -191,7 +191,7 @@ func TestPicture_Convert_ProxyMode(t *testing.T) { assert.Equal(t, ` xyz`, r, "disabled, no proxy") } -func TestPicture_Convert_CachingMode(t *testing.T) { +func TestImage_ConvertCachingMode(t *testing.T) { img := Image{CacheExternal: true, RoutePath: "/img", RemarkURL: "https://remark42.com"} r := img.Convert(` xyz `) assert.Equal(t, ` xyz `, r) @@ -206,13 +206,13 @@ func TestPicture_Convert_CachingMode(t *testing.T) { r = img.Convert(``) assert.Equal(t, ``, r) - // both Caching and Proxy are enabled + // both Caching and Proxy enabled img = Image{CacheExternal: true, HTTP2HTTPS: true, RoutePath: "/img", RemarkURL: "https://remark42.com"} r = img.Convert(` xyz `) assert.Equal(t, ` xyz `, r) } -func imgHTTPServer(t *testing.T) *httptest.Server { +func imgHTTPTestsServer(t *testing.T) *httptest.Server { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.URL.Path == "/image/img1.png" { t.Log("http img request", r.URL) From 8a42c10ac6d1c20e34486c142adf707a9ed01062 Mon Sep 17 00:00:00 2001 From: Umputun Date: Sat, 25 Jan 2020 12:33:12 -0600 Subject: [PATCH 6/8] minor: err wrapping, comments wording --- backend/app/store/image/fs_store.go | 5 +++-- backend/app/store/image/image.go | 34 ++++++++++++++--------------- 2 files changed, 20 insertions(+), 19 deletions(-) diff --git a/backend/app/store/image/fs_store.go b/backend/app/store/image/fs_store.go index 9c1bbc90c2..220118296b 100644 --- a/backend/app/store/image/fs_store.go +++ b/backend/app/store/image/fs_store.go @@ -51,7 +51,7 @@ func (f *FileSystem) SaveWithID(id string, r io.Reader) (string, error) { } if err = ioutil.WriteFile(dst, data, 0600); err != nil { - return "", errors.Wrapf(err, "can't write image file") + return "", errors.Wrapf(err, "can't write image file with id %s", id) } log.Printf("[DEBUG] file %s saved for image %s, size=%d", dst, id, len(data)) @@ -64,7 +64,7 @@ func (f *FileSystem) Save(fileName string, userID string, r io.Reader) (id strin id = path.Join(userID, guid()) // make id as user/uuid finalID, err := f.SaveWithID(id, r) if err != nil { - err = errors.Wrapf(err, "can't save file %s", fileName) + err = errors.Wrapf(err, "can't save image file %s", fileName) } return finalID, err } @@ -116,6 +116,7 @@ func (f *FileSystem) cleanup(_ context.Context, ttl time.Duration) error { return nil } + // we can ignore context as on local FS remove is relatively fast operation err := filepath.Walk(f.Staging, func(fpath string, info os.FileInfo, err error) error { if err != nil { return err diff --git a/backend/app/store/image/image.go b/backend/app/store/image/image.go index 0675abeac7..90558690f9 100644 --- a/backend/app/store/image/image.go +++ b/backend/app/store/image/image.go @@ -1,5 +1,5 @@ // Package image handles storing, resizing and retrieval of images -// Provides Store with Save and Load and implementations on top of local file system and bolt db. +// Provides Store with Save and Load implementations on top of local file system and bolt db. // Service object encloses Store and add common methods, this is the one consumer should use. package image @@ -27,19 +27,7 @@ import ( "golang.org/x/image/draw" ) -// Store defines interface for saving and loading pictures. -// Declares two-stage save with commit. Save stores to staging area and Commit moves to the final location -type Store interface { - Save(fileName string, userID string, r io.Reader) (id string, err error) // get name and reader and returns ID of stored (staging) image - SaveWithID(id string, r io.Reader) (string, error) // store image for passed id to staging - Load(id string) (io.ReadCloser, int64, error) // load image by ID. Caller has to close the reader. - SizeLimit() int // max image size - - commit(id string) error // move image from staging to permanent - cleanup(ctx context.Context, ttl time.Duration) error // run removal loop for old images on staging -} - -// Service wrap Store with common functions needed for any store implementation +// Service wraps Store with common functions needed for any store implementation // It also provides async Submit with func param retrieving all submitting ids. // Submitted ids committed (i.e. moved from staging to final) on TTL expiration. type Service struct { @@ -50,7 +38,19 @@ type Service struct { wg sync.WaitGroup submitCh chan submitReq once sync.Once - term int32 + term int32 // term value used atomically to detect emergency termination +} + +// Store defines interface for saving and loading pictures. +// Declares two-stage save with commit. Save stores to staging area and Commit moves to the final location +type Store interface { + Save(fileName string, userID string, r io.Reader) (id string, err error) // get name and reader and returns ID of stored (staging) image + SaveWithID(id string, r io.Reader) (string, error) // store image for passed id to staging + Load(id string) (io.ReadCloser, int64, error) // load image by ID. Caller has to close the reader. + SizeLimit() int // max image size + + commit(id string) error // move image from staging to permanent + cleanup(ctx context.Context, ttl time.Duration) error // run removal loop for old images on staging } const submitQueueSize = 5000 @@ -148,8 +148,8 @@ func (s *Service) Close() { s.wg.Wait() } -// resize an image of supported format (PNG, JPG, GIF) to the size of "limit" px of the -// biggest side (width or height) preserving aspect ratio. +// resize an image of supported format (PNG, JPG, GIF) to the size of "limit" px of +// the biggest side (width or height) preserving aspect ratio. // Returns original data if resizing is not needed or failed. // If resized the result will be for png format func resize(data []byte, limitW, limitH int) []byte { From afe8ed67723ee621ee938cecb4101ee421396d85 Mon Sep 17 00:00:00 2001 From: Dmitry Verkhoturov Date: Sat, 25 Jan 2020 12:50:48 +0100 Subject: [PATCH 7/8] clarify FileSystem.Save code --- backend/app/store/image/fs_store.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/backend/app/store/image/fs_store.go b/backend/app/store/image/fs_store.go index 220118296b..4a74056cef 100644 --- a/backend/app/store/image/fs_store.go +++ b/backend/app/store/image/fs_store.go @@ -61,12 +61,12 @@ func (f *FileSystem) SaveWithID(id string, r io.Reader) (string, error) { // Save data from a reader for given file name to local FS, staging directory. Returns id as user/uuid // Files partitioned across multiple subdirectories, and the final path includes part, i.e. /location/user1/03/123-4567 func (f *FileSystem) Save(fileName string, userID string, r io.Reader) (id string, err error) { - id = path.Join(userID, guid()) // make id as user/uuid - finalID, err := f.SaveWithID(id, r) + tempId := path.Join(userID, guid()) // make id as user/uuid + id, err = f.SaveWithID(tempId, r) if err != nil { err = errors.Wrapf(err, "can't save image file %s", fileName) } - return finalID, err + return id, err } // Commit file stored in staging location by moving it to permanent location From e4e80b927b6902b692dbabbb4a90d53db338f4f5 Mon Sep 17 00:00:00 2001 From: Umputun Date: Mon, 27 Jan 2020 13:17:53 -0600 Subject: [PATCH 8/8] attempt to fix #584 by making submitted image commits on a half of TTL --- backend/app/store/image/fs_store.go | 2 +- backend/app/store/image/fs_store_test.go | 2 +- backend/app/store/image/image.go | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/backend/app/store/image/fs_store.go b/backend/app/store/image/fs_store.go index 4a74056cef..e493e5800c 100644 --- a/backend/app/store/image/fs_store.go +++ b/backend/app/store/image/fs_store.go @@ -125,7 +125,7 @@ func (f *FileSystem) cleanup(_ context.Context, ttl time.Duration) error { return nil } age := time.Since(info.ModTime()) - if age > ttl { + if age > (ttl + 100*time.Millisecond) { // delay cleanup triggering to allow commit log.Printf("[INFO] remove staging image %s, age %v", fpath, age) rmErr := os.Remove(fpath) _ = os.Remove(path.Dir(fpath)) // try to remove directory diff --git a/backend/app/store/image/fs_store_test.go b/backend/app/store/image/fs_store_test.go index 1a3897ff5c..2c5778b7b0 100644 --- a/backend/app/store/image/fs_store_test.go +++ b/backend/app/store/image/fs_store_test.go @@ -259,7 +259,7 @@ func TestFsStore_Cleanup(t *testing.T) { time.Sleep(100 * time.Millisecond) img3 := save("blah_ff3.png", "user2") - time.Sleep(100 * time.Millisecond) // make first image expired + time.Sleep(200 * time.Millisecond) // make first image expired err := svc.cleanup(context.Background(), time.Millisecond*300) assert.NoError(t, err) diff --git a/backend/app/store/image/image.go b/backend/app/store/image/image.go index 90558690f9..8e060c7938 100644 --- a/backend/app/store/image/image.go +++ b/backend/app/store/image/image.go @@ -74,7 +74,7 @@ func (s *Service) Submit(idsFn func() []string) { defer s.wg.Done() for req := range s.submitCh { // wait for TTL expiration with emergency pass on term - for atomic.LoadInt32(&s.term) == 0 && time.Since(req.TS) <= s.TTL { + for atomic.LoadInt32(&s.term) == 0 && time.Since(req.TS) <= s.TTL/2 { // commit on a half of TTL time.Sleep(time.Millisecond * 10) // small sleep to relive busy wait but keep reactive for term (close) } for _, id := range req.idsFn() { @@ -123,7 +123,7 @@ func (s *Service) Cleanup(ctx context.Context) { case <-ctx.Done(): log.Printf("[INFO] cleanup terminated, %v", ctx.Err()) return - case <-time.After(s.TTL / 2): + case <-time.After(s.TTL / 2): // cleanup call on every 1/2 TTL if err := s.Store.cleanup(ctx, s.TTL); err != nil { log.Printf("[WARN] failed to cleanup, %v", err) }