Skip to content

Commit

Permalink
task manager (#885)
Browse files Browse the repository at this point in the history
Signed-off-by: sunwp <[email protected]>
  • Loading branch information
244372610 authored Dec 7, 2021
1 parent 361a6bb commit 743cece
Show file tree
Hide file tree
Showing 10 changed files with 132 additions and 191 deletions.
28 changes: 1 addition & 27 deletions cdn/rpcserver/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package rpcserver
import (
"context"
"fmt"
"strings"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
Expand All @@ -36,7 +35,6 @@ import (
"d7y.io/dragonfly/v2/pkg/rpc/cdnsystem"
cdnserver "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem/server"
"d7y.io/dragonfly/v2/pkg/util/hostutils"
"d7y.io/dragonfly/v2/pkg/util/stringutils"
)

var tracer = otel.Tracer("cdn-server")
Expand All @@ -58,29 +56,6 @@ func New(cfg *config.Config, taskMgr supervisor.SeedTaskMgr, opts ...grpc.Server
return svr.Server, nil
}

func constructRegisterRequest(req *cdnsystem.SeedRequest) *types.TaskRegisterRequest {
meta := req.UrlMeta
header := make(map[string]string)
if meta != nil {
if !stringutils.IsBlank(meta.Digest) {
header["digest"] = meta.Digest
}
if !stringutils.IsBlank(meta.Range) {
header["range"] = meta.Range
}
for k, v := range meta.Header {
header[k] = v
}
}
return &types.TaskRegisterRequest{
Header: header,
URL: req.Url,
Digest: header["digest"],
TaskID: req.TaskId,
Filter: strings.Split(req.UrlMeta.Filter, "&"),
}
}

func (css *server) ObtainSeeds(ctx context.Context, req *cdnsystem.SeedRequest, psc chan<- *cdnsystem.PieceSeed) (err error) {
var span trace.Span
ctx, span = tracer.Start(ctx, config.SpanObtainSeeds, trace.WithSpanKind(trace.SpanKindServer))
Expand All @@ -96,9 +71,8 @@ func (css *server) ObtainSeeds(ctx context.Context, req *cdnsystem.SeedRequest,
}
logger.Infof("seeds task %s result success: %t", req.TaskId, err == nil)
}()
registerRequest := constructRegisterRequest(req)
// register task
pieceChan, err := css.taskMgr.Register(ctx, registerRequest)
pieceChan, err := css.taskMgr.Register(ctx, types.NewSeedTask(req.TaskId, req.Url, req.UrlMeta))
if err != nil {
if cdnerrors.IsResourcesLacked(err) {
err = dferrors.Newf(base.Code_ResourceLacked, "resources lacked for task(%s): %v", req.TaskId, err)
Expand Down
10 changes: 5 additions & 5 deletions cdn/supervisor/cdn/cache_detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ func (cd *cacheDetector) doDetect(ctx context.Context, task *types.SeedTask, fil
}
// not expired
if fileMetadata.Finish {
// quickly detect the cache situation through the meta data
return cd.parseByReadMetaFile(task.TaskID, fileMetadata)
// quickly detect the cache situation through the metadata
return cd.detectByReadMetaFile(task.TaskID, fileMetadata)
}
// check if the resource supports range request. if so,
// detect the cache situation by reading piece meta and data file
Expand All @@ -138,11 +138,11 @@ func (cd *cacheDetector) doDetect(ctx context.Context, task *types.SeedTask, fil
if !supportRange {
return nil, errors.Errorf("resource %s is not support range request", task.URL)
}
return cd.parseByReadFile(task.TaskID, fileMetadata, fileDigest)
return cd.detectByReadFile(task.TaskID, fileMetadata, fileDigest)
}

// parseByReadMetaFile detect cache by read meta and pieceMeta files of task
func (cd *cacheDetector) parseByReadMetaFile(taskID string, fileMetadata *storage.FileMetadata) (*cacheResult, error) {
func (cd *cacheDetector) detectByReadMetaFile(taskID string, fileMetadata *storage.FileMetadata) (*cacheResult, error) {
if !fileMetadata.Success {
return nil, fmt.Errorf("success flag of taskID %s is false", taskID)
}
Expand Down Expand Up @@ -174,7 +174,7 @@ func (cd *cacheDetector) parseByReadMetaFile(taskID string, fileMetadata *storag
}

// parseByReadFile detect cache by read pieceMeta and data files of task
func (cd *cacheDetector) parseByReadFile(taskID string, metadata *storage.FileMetadata, fileDigest hash.Hash) (*cacheResult, error) {
func (cd *cacheDetector) detectByReadFile(taskID string, metadata *storage.FileMetadata, fileDigest hash.Hash) (*cacheResult, error) {
reader, err := cd.cacheDataManager.readDownloadFile(taskID)
if err != nil {
return nil, errors.Wrapf(err, "read download data file")
Expand Down
39 changes: 19 additions & 20 deletions cdn/supervisor/cdn/cache_detector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/suite"

cdnerrors "d7y.io/dragonfly/v2/cdn/errors"
"d7y.io/dragonfly/v2/cdn/storedriver"
"d7y.io/dragonfly/v2/cdn/supervisor/cdn/storage"
storageMock "d7y.io/dragonfly/v2/cdn/supervisor/cdn/storage/mock"
Expand All @@ -57,44 +56,44 @@ func (suite *CacheDetectorTestSuite) SetupSuite() {
sourceClient := sourceMock.NewMockResourceClient(ctrl)
source.UnRegister("http")
suite.Require().Nil(source.Register("http", sourceClient, httpprotocol.Adapter))
storageMgr := storageMock.NewMockManager(ctrl)
cacheDataManager := newCacheDataManager(storageMgr)
storageManager := storageMock.NewMockManager(ctrl)
cacheDataManager := newCacheDataManager(storageManager)
suite.detector = newCacheDetector(cacheDataManager)
storageMgr.EXPECT().ReadFileMetadata(fullExpiredCache.taskID).Return(fullExpiredCache.fileMeta, nil).AnyTimes()
storageMgr.EXPECT().ReadFileMetadata(fullNoExpiredCache.taskID).Return(fullNoExpiredCache.fileMeta, nil).AnyTimes()
storageMgr.EXPECT().ReadFileMetadata(partialNotSupportRangeCache.taskID).Return(partialNotSupportRangeCache.fileMeta, nil).AnyTimes()
storageMgr.EXPECT().ReadFileMetadata(partialSupportRangeCache.taskID).Return(partialSupportRangeCache.fileMeta, nil).AnyTimes()
storageMgr.EXPECT().ReadFileMetadata(noCache.taskID).Return(noCache.fileMeta, cdnerrors.ErrFileNotExist{}).AnyTimes()
storageMgr.EXPECT().ReadDownloadFile(fullNoExpiredCache.taskID).DoAndReturn(
storageManager.EXPECT().ReadFileMetadata(fullExpiredCache.taskID).Return(fullExpiredCache.fileMeta, nil).AnyTimes()
storageManager.EXPECT().ReadFileMetadata(fullNoExpiredCache.taskID).Return(fullNoExpiredCache.fileMeta, nil).AnyTimes()
storageManager.EXPECT().ReadFileMetadata(partialNotSupportRangeCache.taskID).Return(partialNotSupportRangeCache.fileMeta, nil).AnyTimes()
storageManager.EXPECT().ReadFileMetadata(partialSupportRangeCache.taskID).Return(partialSupportRangeCache.fileMeta, nil).AnyTimes()
storageManager.EXPECT().ReadFileMetadata(noCache.taskID).Return(noCache.fileMeta, os.ErrNotExist).AnyTimes()
storageManager.EXPECT().ReadDownloadFile(fullNoExpiredCache.taskID).DoAndReturn(
func(taskID string) (io.ReadCloser, error) {
content, err := ioutil.ReadFile("../../testdata/cdn/go.html")
suite.Nil(err)
return ioutil.NopCloser(strings.NewReader(string(content))), nil
}).AnyTimes()
storageMgr.EXPECT().ReadDownloadFile(partialNotSupportRangeCache.taskID).DoAndReturn(
storageManager.EXPECT().ReadDownloadFile(partialNotSupportRangeCache.taskID).DoAndReturn(
func(taskID string) (io.ReadCloser, error) {
content, err := ioutil.ReadFile("../../testdata/cdn/go.html")
suite.Nil(err)
return ioutil.NopCloser(strings.NewReader(string(content))), nil
}).AnyTimes()
storageMgr.EXPECT().ReadDownloadFile(partialSupportRangeCache.taskID).DoAndReturn(
storageManager.EXPECT().ReadDownloadFile(partialSupportRangeCache.taskID).DoAndReturn(
func(taskID string) (io.ReadCloser, error) {
content, err := ioutil.ReadFile("../../testdata/cdn/go.html")
suite.Nil(err)
return ioutil.NopCloser(strings.NewReader(string(content))), nil
}).AnyTimes()
storageMgr.EXPECT().ReadDownloadFile(noCache.taskID).Return(nil, os.ErrNotExist).AnyTimes()
storageMgr.EXPECT().ReadPieceMetaRecords(fullNoExpiredCache.taskID).Return(fullNoExpiredCache.pieces, nil).AnyTimes()
storageMgr.EXPECT().ReadPieceMetaRecords(partialNotSupportRangeCache.taskID).Return(partialNotSupportRangeCache.pieces, nil).AnyTimes()
storageMgr.EXPECT().ReadPieceMetaRecords(partialSupportRangeCache.taskID).Return(partialSupportRangeCache.pieces, nil).AnyTimes()
storageMgr.EXPECT().ReadPieceMetaRecords(noCache.taskID).Return(nil, os.ErrNotExist).AnyTimes()
storageMgr.EXPECT().StatDownloadFile(fullNoExpiredCache.taskID).Return(&storedriver.StorageInfo{
storageManager.EXPECT().ReadDownloadFile(noCache.taskID).Return(nil, os.ErrNotExist).AnyTimes()
storageManager.EXPECT().ReadPieceMetaRecords(fullNoExpiredCache.taskID).Return(fullNoExpiredCache.pieces, nil).AnyTimes()
storageManager.EXPECT().ReadPieceMetaRecords(partialNotSupportRangeCache.taskID).Return(partialNotSupportRangeCache.pieces, nil).AnyTimes()
storageManager.EXPECT().ReadPieceMetaRecords(partialSupportRangeCache.taskID).Return(partialSupportRangeCache.pieces, nil).AnyTimes()
storageManager.EXPECT().ReadPieceMetaRecords(noCache.taskID).Return(nil, os.ErrNotExist).AnyTimes()
storageManager.EXPECT().StatDownloadFile(fullNoExpiredCache.taskID).Return(&storedriver.StorageInfo{
Path: "",
Size: 9789,
CreateTime: time.Time{},
ModTime: time.Time{},
}, nil).AnyTimes()
storageMgr.EXPECT().StatDownloadFile(gomock.Not(fullNoExpiredCache.taskID)).Return(&storedriver.StorageInfo{}, nil).AnyTimes()
storageManager.EXPECT().StatDownloadFile(gomock.Not(fullNoExpiredCache.taskID)).Return(&storedriver.StorageInfo{}, nil).AnyTimes()

sourceClient.EXPECT().IsExpired(source.RequestEq(expiredAndSupportURL), gomock.Any()).Return(true, nil).AnyTimes()
sourceClient.EXPECT().IsSupportRange(source.RequestEq(expiredAndSupportURL)).Return(true, nil).AnyTimes()
Expand Down Expand Up @@ -380,7 +379,7 @@ func (suite *CacheDetectorTestSuite) TestParseByReadFile() {
}
for _, tt := range tests {
suite.Run(tt.name, func() {
got, err := suite.detector.parseByReadFile(tt.args.taskID, tt.args.metadata, md5.New())
got, err := suite.detector.detectByReadFile(tt.args.taskID, tt.args.metadata, md5.New())
suite.Equal(tt.want, got)
suite.Equal(tt.wantErr, err != nil)
})
Expand Down Expand Up @@ -414,7 +413,7 @@ func (suite *CacheDetectorTestSuite) TestParseByReadMetaFile() {
}
for _, tt := range tests {
suite.Run(tt.name, func() {
got, err := suite.detector.parseByReadMetaFile(tt.args.taskID, tt.args.fileMetadata)
got, err := suite.detector.detectByReadMetaFile(tt.args.taskID, tt.args.fileMetadata)
suite.Equal(tt.wantErr, err != nil)
suite.Equal(tt.want, got)
})
Expand Down
14 changes: 0 additions & 14 deletions cdn/supervisor/cdn/storage/mock/mock_storage_mgr.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 2 additions & 9 deletions cdn/supervisor/cdn/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,6 @@ type Manager interface {
// ReadDownloadFile return reader of download file
ReadDownloadFile(taskID string) (io.ReadCloser, error)

// CreateUploadLink create a upload link to download file
CreateUploadLink(taskID string) error

// ReadFileMetadata return meta data of download file
ReadFileMetadata(taskID string) (*FileMetadata, error)

Expand All @@ -79,8 +76,8 @@ type Manager interface {

// FileMetadata meta data of task
type FileMetadata struct {
TaskID string `json:"taskId"`
TaskURL string `json:"taskUrl"`
TaskID string `json:"taskID"`
TaskURL string `json:"taskURL"`
PieceSize int32 `json:"pieceSize"`
SourceFileLen int64 `json:"sourceFileLen"`
AccessTime int64 `json:"accessTime"`
Expand Down Expand Up @@ -181,10 +178,6 @@ func (m *managerPlugin) ReadDownloadFile(taskID string) (io.ReadCloser, error) {
return m.instance.ReadDownloadFile(taskID)
}

func (m *managerPlugin) CreateUploadLink(taskID string) error {
return m.instance.CreateUploadLink(taskID)
}

func (m *managerPlugin) ReadFileMetadata(taskID string) (*FileMetadata, error) {
return m.instance.ReadFileMetadata(taskID)
}
Expand Down
2 changes: 1 addition & 1 deletion cdn/supervisor/mock/mock_task_mgr.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

80 changes: 77 additions & 3 deletions cdn/supervisor/task/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,23 @@ import (
"d7y.io/dragonfly/v2/cdn/types"
"d7y.io/dragonfly/v2/internal/dferrors"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/internal/util"
"d7y.io/dragonfly/v2/pkg/source"
"d7y.io/dragonfly/v2/pkg/synclock"
"d7y.io/dragonfly/v2/pkg/syncmap"
"d7y.io/dragonfly/v2/pkg/unit"
"d7y.io/dragonfly/v2/pkg/util/stringutils"
)

// Ensure that Manager implements the SeedTaskMgr and gcExecutor interfaces
var _ supervisor.SeedTaskMgr = (*Manager)(nil)
var _ gc.Executor = (*Manager)(nil)

var (
errURLUnreachable = errors.New("url is unreachable")
errTaskIDConflict = errors.New("taskID is conflict")
)

var tracer trace.Tracer

func init() {
Expand Down Expand Up @@ -72,14 +80,14 @@ func NewManager(cfg *config.Config, cdnMgr supervisor.CDNMgr, progressMgr superv
return taskMgr, nil
}

func (tm *Manager) Register(ctx context.Context, req *types.TaskRegisterRequest) (pieceChan <-chan *types.SeedPiece, err error) {
func (tm *Manager) Register(ctx context.Context, registerTask *types.SeedTask) (pieceChan <-chan *types.SeedPiece, err error) {
var span trace.Span
ctx, span = tracer.Start(ctx, config.SpanTaskRegister)
defer span.End()
task, err := tm.addOrUpdateTask(ctx, req)
task, err := tm.AddOrUpdate(registerTask)
if err != nil {
span.RecordError(err)
logger.WithTaskID(req.TaskID).Infof("failed to add or update task with req: %#v: %v", req, err)
logger.WithTaskID(registerTask.TaskID).Infof("failed to add or update task with req: %#v: %v", registerTask, err)
return nil, err
}
taskBytes, _ := json.Marshal(task)
Expand Down Expand Up @@ -172,6 +180,72 @@ func (tm *Manager) getTask(taskID string) (*types.SeedTask, error) {
return nil, errors.Wrapf(cdnerrors.ErrConvertFailed, "origin object: %#v", v)
}

func (tm *Manager) AddOrUpdate(registerTask *types.SeedTask) (seedTask *types.SeedTask, err error) {
defer func() {
if err != nil {
tm.accessTimeMap.Store(registerTask.TaskID, time.Now())
}
}()
synclock.Lock(registerTask.TaskID, true)
if unreachableTime, ok := tm.getTaskUnreachableTime(registerTask.TaskID); ok {
if time.Since(unreachableTime) < tm.cfg.FailAccessInterval {
synclock.UnLock(registerTask.TaskID, true)
// TODO 校验Header
return nil, errURLUnreachable
}
logger.Debugf("delete taskID: %s from unreachable url list", registerTask.TaskID)
tm.taskURLUnReachableStore.Delete(registerTask.TaskID)
}
actual, loaded := tm.taskStore.LoadOrStore(registerTask.TaskID, registerTask)
seedTask = actual.(*types.SeedTask)
if loaded && !IsSame(seedTask, registerTask) {
synclock.UnLock(registerTask.TaskID, true)
return nil, errors.Wrapf(errTaskIDConflict, "register task %#v is conflict with exist task %#v", registerTask, seedTask)
}
if seedTask.SourceFileLength != source.UnKnownSourceFileLen {
synclock.UnLock(registerTask.TaskID, true)
return seedTask, nil
}
synclock.UnLock(registerTask.TaskID, true)
synclock.Lock(registerTask.TaskID, false)
defer synclock.UnLock(registerTask.TaskID, false)
if seedTask.SourceFileLength != source.UnKnownSourceFileLen {
return seedTask, nil
}
// get sourceContentLength with req.Header
contentLengthRequest, err := source.NewRequestWithHeader(registerTask.URL, registerTask.Header)
if err != nil {
return nil, err
}
// add range info
if !stringutils.IsBlank(registerTask.Range) {
contentLengthRequest.Header.Add(source.Range, registerTask.Range)
}
sourceFileLength, err := source.GetContentLength(contentLengthRequest)
if err != nil {
registerTask.Log().Errorf("get url (%s) content length failed: %v", registerTask.URL, err)
if source.IsResourceNotReachableError(err) {
tm.taskURLUnReachableStore.Store(registerTask, time.Now())
}
return seedTask, err
}
seedTask.SourceFileLength = sourceFileLength
seedTask.Log().Debugf("success get file content length: %d", sourceFileLength)

// if success to get the information successfully with the req.Header then update the task.UrlMeta to registerTask.UrlMeta.
if registerTask.Header != nil {
seedTask.Header = registerTask.Header
}

// calculate piece size and update the PieceSize and PieceTotal
if registerTask.PieceSize <= 0 {
pieceSize := util.ComputePieceSize(registerTask.SourceFileLength)
seedTask.PieceSize = int32(pieceSize)
seedTask.Log().Debugf("piece size calculate result: %s", unit.ToBytes(int64(pieceSize)))
}
return seedTask, nil
}

func (tm Manager) Get(taskID string) (*types.SeedTask, error) {
task, err := tm.getTask(taskID)
// update accessTime for taskID
Expand Down
Loading

0 comments on commit 743cece

Please sign in to comment.