Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check free space when Registering Task #585

Merged
merged 16 commits into from
Sep 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions cdnsystem/cdn.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,6 @@ func New(cfg *config.Config) (*Server, error) {

// Initialize storage manager
storageMgr.Initialize(taskMgr)
if err != nil {
return nil, errors.Wrapf(err, "create storage manager")
}

// Initialize storage manager
cdnSeedServer, err := rpcserver.NewCdnSeedServer(cfg, taskMgr)
Expand Down
7 changes: 7 additions & 0 deletions cdnsystem/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ var (

// ErrConvertFailed represents failed to convert.
ErrConvertFailed = errors.New("convert failed")

// ErrResourcesLacked represents a lack of resources, for example, the disk does not have enough space.
ErrResourcesLacked = errors.New("resources lacked")
)

// IsSystemError checks the error is a system error or not.
Expand Down Expand Up @@ -152,3 +155,7 @@ func IsFileNotExist(err error) bool {
_, ok := err.(ErrFileNotExist)
return ok
}

func IsResourcesLacked(err error) bool {
return errors.Cause(err) == ErrResourcesLacked
}
5 changes: 5 additions & 0 deletions cdnsystem/rpcserver/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ func (css *CdnSeedServer) ObtainSeeds(ctx context.Context, req *cdnsystem.SeedRe
// register task
pieceChan, err := css.taskMgr.Register(ctx, registerRequest)
if err != nil {
if cdnerrors.IsResourcesLacked(err) {
err = dferrors.Newf(dfcodes.ResourceLacked, "resources lacked for task(%s): %v", req.TaskId, err)
span.RecordError(err)
return err
}
err = dferrors.Newf(dfcodes.CdnTaskRegistryFail, "failed to register seed task(%s): %v", req.TaskId, err)
span.RecordError(err)
return err
Expand Down
4 changes: 4 additions & 0 deletions cdnsystem/supervisor/cdn/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ func (cm *Manager) Delete(taskID string) error {
return nil
}

func (cm *Manager) TryFreeSpace(fileLength int64) (bool, error) {
return cm.cacheStore.TryFreeSpace(fileLength)
}

func (cm *Manager) handleCDNResult(task *types.SeedTask, sourceDigest string, downloadMetadata *downloadMetadata) (bool, error) {
logger.WithTaskID(task.TaskID).Debugf("handle cdn result, downloadMetaData: %+v", downloadMetadata)
var isSuccess = true
Expand Down
58 changes: 57 additions & 1 deletion cdnsystem/supervisor/cdn/storage/disk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@ import (
"encoding/json"
"fmt"
"io"
"os"
"path"
"strings"
"time"

"go.uber.org/atomic"

cdnerrors "d7y.io/dragonfly/v2/cdnsystem/errors"
"d7y.io/dragonfly/v2/cdnsystem/storedriver"
"d7y.io/dragonfly/v2/cdnsystem/storedriver/local"
Expand Down Expand Up @@ -131,7 +135,7 @@ func (s *diskStorageMgr) GC() error {
for _, taskID := range gcTaskIDs {
synclock.Lock(taskID, false)
// try to ensure the taskID is not using again
if s.taskMgr.Exist(taskID) {
if _, exist := s.taskMgr.Exist(taskID); exist {
synclock.UnLock(taskID, false)
continue
}
Expand Down Expand Up @@ -230,3 +234,55 @@ func (s *diskStorageMgr) DeleteTask(taskID string) error {
func (s *diskStorageMgr) ResetRepo(task *types.SeedTask) error {
return s.DeleteTask(task.TaskID)
}

func (s *diskStorageMgr) TryFreeSpace(fileLength int64) (bool, error) {
freeSpace, err := s.diskDriver.GetFreeSpace()
if err != nil {
return false, err
}
if freeSpace > 500*unit.GB && freeSpace.ToNumber() > fileLength {
return true, nil
}

remainder := atomic.NewInt64(0)
r := &storedriver.Raw{
WalkFn: func(filePath string, info os.FileInfo, err error) error {
if fileutils.IsRegular(filePath) {
taskID := strings.Split(path.Base(filePath), ".")[0]
task, exist := s.taskMgr.Exist(taskID)
if exist {
var totalLen int64 = 0
if task.CdnFileLength > 0 {
totalLen = task.CdnFileLength
} else {
totalLen = task.SourceFileLength
}
if totalLen > 0 {
remainder.Add(totalLen - info.Size())
}
} else {
logger.Warnf("failed to get task: %s", taskID)
}
}
return nil
},
}
s.diskDriver.Walk(r)

enoughSpace := freeSpace.ToNumber()-remainder.Load() > fileLength
if !enoughSpace {
s.cleaner.GC("disk", true)
remainder.Store(0)
s.diskDriver.Walk(r)
freeSpace, err = s.diskDriver.GetFreeSpace()
if err != nil {
return false, err
}
enoughSpace = freeSpace.ToNumber()-remainder.Load() > fileLength
}
if !enoughSpace {
return false, nil
}

return true, nil
}
115 changes: 115 additions & 0 deletions cdnsystem/supervisor/cdn/storage/disk/disk_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package disk

import (
"fmt"
"testing"

"d7y.io/dragonfly/v2/cdnsystem/storedriver"
"d7y.io/dragonfly/v2/cdnsystem/supervisor/cdn/storage"
"d7y.io/dragonfly/v2/cdnsystem/supervisor/mock"
"github.com/golang/mock/gomock"

"d7y.io/dragonfly/v2/pkg/unit"

"github.com/stretchr/testify/suite"
)

func TestDiskStorageMgrSuite(t *testing.T) {
suite.Run(t, new(DiskStorageMgrSuite))
}

type DiskStorageMgrSuite struct {
m *diskStorageMgr
suite.Suite
}

func (suite *DiskStorageMgrSuite) TestTryFreeSpace() {
ctrl := gomock.NewController(suite.T())
diskDriver := storedriver.NewMockDriver(ctrl)
taskMgr := mock.NewMockSeedTaskMgr(ctrl)
suite.m = &diskStorageMgr{
diskDriver: diskDriver,
taskMgr: taskMgr,
}
diskDriver.EXPECT().GetTotalSpace().Return(100*unit.GB, nil)
cleaner, _ := storage.NewStorageCleaner(suite.m.getDefaultGcConfig(), diskDriver, suite.m, taskMgr)
suite.m.cleaner = cleaner

tests := []struct {
name string
setupSuite func()
fileLength int64
success func(bool, error) bool
}{
{
name: "very large free space",
setupSuite: func() {
// call GetFreeSpace 1 time in TryFreeSpace and return
diskDriver.EXPECT().GetFreeSpace().Return(unit.TB, nil)
},
fileLength: unit.MB.ToNumber(),
success: func(ok bool, err error) bool {
return ok == true && err == nil
},
},
{
name: "try a small file",
setupSuite: func() {
// call GetFreeSpace 1 time in TryFreeSpace
diskDriver.EXPECT().GetFreeSpace().Return(100*unit.GB, nil)
// call Walk 1 time in TryFreeSpace
diskDriver.EXPECT().Walk(gomock.Any())
},
fileLength: unit.KB.ToNumber(),
success: func(ok bool, err error) bool {
return ok == true && err == nil
},
},
{
name: "try a very large file",
setupSuite: func() {
// call GetFreeSpace 2 times in TryFreeSpace, 1 time in GC
diskDriver.EXPECT().GetFreeSpace().Return(100*unit.GB, nil).Times(3)
// call Walk 2 times in TryFreeSpace, 1 time in GC
diskDriver.EXPECT().Walk(gomock.Any()).Times(3)
},
fileLength: unit.TB.ToNumber(),
success: func(ok bool, err error) bool {
return ok == false && err == nil
},
},
{
name: "if get free space meets error",
setupSuite: func() {
// call GetFreeSpace 1 times in TryFreeSpace and return
diskDriver.EXPECT().GetFreeSpace().Return(unit.ToBytes(0), fmt.Errorf("a error for test"))
},
fileLength: unit.MB.ToNumber(),
success: func(ok bool, err error) bool {
return ok == false && err != nil && err.Error() == "a error for test"
},
},
{
name: "ok after gc",
setupSuite: func() {
// first call GetFreeSpace 1 times in TryFreeSpace, 1 time in GC
diskDriver.EXPECT().GetFreeSpace().Return(100*unit.MB, nil).Times(2)
// then call GetFreeSpace 1 times in TryFreeSpace, get another value
diskDriver.EXPECT().GetFreeSpace().Return(100*unit.GB, nil)
// call Walk 2 times in TryFreeSpace, 1 time in GC
diskDriver.EXPECT().Walk(gomock.Any()).Times(3)
},
fileLength: unit.GB.ToNumber(),
success: func(ok bool, err error) bool {
return ok == true && err == nil
},
},
}

for _, tt := range tests {
suite.Run(tt.name, func() {
tt.setupSuite()
suite.True(tt.success(suite.m.TryFreeSpace(tt.fileLength)))
})
}
}
65 changes: 57 additions & 8 deletions cdnsystem/supervisor/cdn/storage/hybrid/hybrid.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,7 @@ func (h *hybridStorageMgr) gcTasks(gcTaskIDs []string, isDisk bool) int {
for _, taskID := range gcTaskIDs {
synclock.Lock(taskID, false)
// try to ensure the taskID is not using again
if _, err := h.taskMgr.Get(taskID); err == nil || !cdnerrors.IsDataNotFound(err) {
if err != nil {
logger.GcLogger.With("type", "hybrid").Errorf("gc disk: failed to get taskID(%s): %v", taskID, err)
}
if _, exist := h.taskMgr.Exist(taskID); exist {
synclock.UnLock(taskID, false)
continue
}
Expand Down Expand Up @@ -297,6 +294,58 @@ func (h *hybridStorageMgr) StatDownloadFile(taskID string) (*storedriver.Storage
return h.diskDriver.Stat(storage.GetDownloadRaw(taskID))
}

func (h *hybridStorageMgr) TryFreeSpace(fileLength int64) (bool, error) {
diskFreeSpace, err := h.diskDriver.GetFreeSpace()
if err != nil {
return false, err
}
if diskFreeSpace > 500*unit.GB && diskFreeSpace.ToNumber() > fileLength {
return true, nil
}

remainder := atomic.NewInt64(0)
r := &storedriver.Raw{
WalkFn: func(filePath string, info os.FileInfo, err error) error {
if fileutils.IsRegular(filePath) {
taskID := strings.Split(path.Base(filePath), ".")[0]
task, exist := h.taskMgr.Exist(taskID)
if exist {
var totalLen int64 = 0
if task.CdnFileLength > 0 {
totalLen = task.CdnFileLength
} else {
totalLen = task.SourceFileLength
}
if totalLen > 0 {
remainder.Add(totalLen - info.Size())
}
} else {
logger.Warnf("failed to get task: %s", taskID)
}
}
return nil
},
}
h.diskDriver.Walk(r)

enoughSpace := diskFreeSpace.ToNumber()-remainder.Load() > fileLength
if !enoughSpace {
h.diskDriverCleaner.GC("hybrid", true)
remainder.Store(0)
h.diskDriver.Walk(r)
diskFreeSpace, err = h.diskDriver.GetFreeSpace()
if err != nil {
return false, err
}
enoughSpace = diskFreeSpace.ToNumber()-remainder.Load() > fileLength
}
if !enoughSpace {
return false, nil
}

return true, nil
}

func (h *hybridStorageMgr) deleteDiskFiles(taskID string) error {
return h.deleteTaskFiles(taskID, true, true)
}
Expand Down Expand Up @@ -352,9 +401,9 @@ func (h *hybridStorageMgr) tryShmSpace(url, taskID string, fileLength int64) (st
h.memoryDriver.Walk(&storedriver.Raw{
WalkFn: func(filePath string, info os.FileInfo, err error) error {
if fileutils.IsRegular(filePath) {
taskID := path.Base(filePath)
task, err := h.taskMgr.Get(taskID)
if err == nil {
taskID := strings.Split(path.Base(filePath), ".")[0]
task, exist := h.taskMgr.Exist(taskID)
if exist {
var totalLen int64 = 0
if task.CdnFileLength > 0 {
totalLen = task.CdnFileLength
Expand All @@ -365,7 +414,7 @@ func (h *hybridStorageMgr) tryShmSpace(url, taskID string, fileLength int64) (st
remainder.Add(totalLen - info.Size())
}
} else {
logger.Warnf("failed to get task: %s: %v", taskID, err)
logger.Warnf("failed to get task: %s", taskID)
}
}
return nil
Expand Down
15 changes: 15 additions & 0 deletions cdnsystem/supervisor/cdn/storage/mock/mock_storage_mgr.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion cdnsystem/supervisor/cdn/storage/storage_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (cleaner *Cleaner) GC(storagePattern string, force bool) ([]string, error)
walkTaskIds[taskID] = true

// we should return directly when we success to get info which means it is being used
if cleaner.taskMgr.Exist(taskID) {
if _, exist := cleaner.taskMgr.Exist(taskID); exist {
return nil
}

Expand Down
3 changes: 3 additions & 0 deletions cdnsystem/supervisor/cdn/storage/storage_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ type Manager interface {

// DeleteTask delete task from storage
DeleteTask(taskID string) error

// TryFreeSpace checks if there is enough space for the file, return true while we are sure that there is enough space.
TryFreeSpace(fileLength int64) (bool, error)
}

// FileMetaData meta data of task
Expand Down
3 changes: 3 additions & 0 deletions cdnsystem/supervisor/cdn_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,7 @@ type CDNMgr interface {
// Delete the cdn meta with specified taskID.
// The file on the disk will be deleted when the force is true.
Delete(string) error

// TryFreeSpace checks if the free space of the storage is larger than the fileLength.
TryFreeSpace(fileLength int64) (bool, error)
}
Loading