From 743cece749308101a86dba2d2652a1237d89552f Mon Sep 17 00:00:00 2001 From: sunwp <244372610@qq.com> Date: Tue, 7 Dec 2021 18:07:12 +0800 Subject: [PATCH] task manager (#885) Signed-off-by: sunwp <244372610@qq.com> --- cdn/rpcserver/rpcserver.go | 28 +---- cdn/supervisor/cdn/cache_detector.go | 10 +- cdn/supervisor/cdn/cache_detector_test.go | 39 +++--- .../cdn/storage/mock/mock_storage_mgr.go | 14 --- cdn/supervisor/cdn/storage/storage.go | 11 +- cdn/supervisor/mock/mock_task_mgr.go | 2 +- cdn/supervisor/task/manager.go | 80 ++++++++++++- cdn/supervisor/task/manager_util.go | 112 ++---------------- cdn/supervisor/task_mgr.go | 2 +- cdn/types/seed_task_info.go | 25 ++-- 10 files changed, 132 insertions(+), 191 deletions(-) diff --git a/cdn/rpcserver/rpcserver.go b/cdn/rpcserver/rpcserver.go index c706ef0c2bd..a72d16b7646 100644 --- a/cdn/rpcserver/rpcserver.go +++ b/cdn/rpcserver/rpcserver.go @@ -19,7 +19,6 @@ package rpcserver import ( "context" "fmt" - "strings" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/trace" @@ -36,7 +35,6 @@ import ( "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem" cdnserver "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem/server" "d7y.io/dragonfly/v2/pkg/util/hostutils" - "d7y.io/dragonfly/v2/pkg/util/stringutils" ) var tracer = otel.Tracer("cdn-server") @@ -58,29 +56,6 @@ func New(cfg *config.Config, taskMgr supervisor.SeedTaskMgr, opts ...grpc.Server return svr.Server, nil } -func constructRegisterRequest(req *cdnsystem.SeedRequest) *types.TaskRegisterRequest { - meta := req.UrlMeta - header := make(map[string]string) - if meta != nil { - if !stringutils.IsBlank(meta.Digest) { - header["digest"] = meta.Digest - } - if !stringutils.IsBlank(meta.Range) { - header["range"] = meta.Range - } - for k, v := range meta.Header { - header[k] = v - } - } - return &types.TaskRegisterRequest{ - Header: header, - URL: req.Url, - Digest: header["digest"], - TaskID: req.TaskId, - Filter: strings.Split(req.UrlMeta.Filter, "&"), - } -} - func (css *server) ObtainSeeds(ctx context.Context, req *cdnsystem.SeedRequest, psc chan<- *cdnsystem.PieceSeed) (err error) { var span trace.Span ctx, span = tracer.Start(ctx, config.SpanObtainSeeds, trace.WithSpanKind(trace.SpanKindServer)) @@ -96,9 +71,8 @@ func (css *server) ObtainSeeds(ctx context.Context, req *cdnsystem.SeedRequest, } logger.Infof("seeds task %s result success: %t", req.TaskId, err == nil) }() - registerRequest := constructRegisterRequest(req) // register task - pieceChan, err := css.taskMgr.Register(ctx, registerRequest) + pieceChan, err := css.taskMgr.Register(ctx, types.NewSeedTask(req.TaskId, req.Url, req.UrlMeta)) if err != nil { if cdnerrors.IsResourcesLacked(err) { err = dferrors.Newf(base.Code_ResourceLacked, "resources lacked for task(%s): %v", req.TaskId, err) diff --git a/cdn/supervisor/cdn/cache_detector.go b/cdn/supervisor/cdn/cache_detector.go index 823a318539c..4b750490757 100644 --- a/cdn/supervisor/cdn/cache_detector.go +++ b/cdn/supervisor/cdn/cache_detector.go @@ -121,8 +121,8 @@ func (cd *cacheDetector) doDetect(ctx context.Context, task *types.SeedTask, fil } // not expired if fileMetadata.Finish { - // quickly detect the cache situation through the meta data - return cd.parseByReadMetaFile(task.TaskID, fileMetadata) + // quickly detect the cache situation through the metadata + return cd.detectByReadMetaFile(task.TaskID, fileMetadata) } // check if the resource supports range request. if so, // detect the cache situation by reading piece meta and data file @@ -138,11 +138,11 @@ func (cd *cacheDetector) doDetect(ctx context.Context, task *types.SeedTask, fil if !supportRange { return nil, errors.Errorf("resource %s is not support range request", task.URL) } - return cd.parseByReadFile(task.TaskID, fileMetadata, fileDigest) + return cd.detectByReadFile(task.TaskID, fileMetadata, fileDigest) } // parseByReadMetaFile detect cache by read meta and pieceMeta files of task -func (cd *cacheDetector) parseByReadMetaFile(taskID string, fileMetadata *storage.FileMetadata) (*cacheResult, error) { +func (cd *cacheDetector) detectByReadMetaFile(taskID string, fileMetadata *storage.FileMetadata) (*cacheResult, error) { if !fileMetadata.Success { return nil, fmt.Errorf("success flag of taskID %s is false", taskID) } @@ -174,7 +174,7 @@ func (cd *cacheDetector) parseByReadMetaFile(taskID string, fileMetadata *storag } // parseByReadFile detect cache by read pieceMeta and data files of task -func (cd *cacheDetector) parseByReadFile(taskID string, metadata *storage.FileMetadata, fileDigest hash.Hash) (*cacheResult, error) { +func (cd *cacheDetector) detectByReadFile(taskID string, metadata *storage.FileMetadata, fileDigest hash.Hash) (*cacheResult, error) { reader, err := cd.cacheDataManager.readDownloadFile(taskID) if err != nil { return nil, errors.Wrapf(err, "read download data file") diff --git a/cdn/supervisor/cdn/cache_detector_test.go b/cdn/supervisor/cdn/cache_detector_test.go index fb127a73349..ec292f70727 100644 --- a/cdn/supervisor/cdn/cache_detector_test.go +++ b/cdn/supervisor/cdn/cache_detector_test.go @@ -31,7 +31,6 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/suite" - cdnerrors "d7y.io/dragonfly/v2/cdn/errors" "d7y.io/dragonfly/v2/cdn/storedriver" "d7y.io/dragonfly/v2/cdn/supervisor/cdn/storage" storageMock "d7y.io/dragonfly/v2/cdn/supervisor/cdn/storage/mock" @@ -57,44 +56,44 @@ func (suite *CacheDetectorTestSuite) SetupSuite() { sourceClient := sourceMock.NewMockResourceClient(ctrl) source.UnRegister("http") suite.Require().Nil(source.Register("http", sourceClient, httpprotocol.Adapter)) - storageMgr := storageMock.NewMockManager(ctrl) - cacheDataManager := newCacheDataManager(storageMgr) + storageManager := storageMock.NewMockManager(ctrl) + cacheDataManager := newCacheDataManager(storageManager) suite.detector = newCacheDetector(cacheDataManager) - storageMgr.EXPECT().ReadFileMetadata(fullExpiredCache.taskID).Return(fullExpiredCache.fileMeta, nil).AnyTimes() - storageMgr.EXPECT().ReadFileMetadata(fullNoExpiredCache.taskID).Return(fullNoExpiredCache.fileMeta, nil).AnyTimes() - storageMgr.EXPECT().ReadFileMetadata(partialNotSupportRangeCache.taskID).Return(partialNotSupportRangeCache.fileMeta, nil).AnyTimes() - storageMgr.EXPECT().ReadFileMetadata(partialSupportRangeCache.taskID).Return(partialSupportRangeCache.fileMeta, nil).AnyTimes() - storageMgr.EXPECT().ReadFileMetadata(noCache.taskID).Return(noCache.fileMeta, cdnerrors.ErrFileNotExist{}).AnyTimes() - storageMgr.EXPECT().ReadDownloadFile(fullNoExpiredCache.taskID).DoAndReturn( + storageManager.EXPECT().ReadFileMetadata(fullExpiredCache.taskID).Return(fullExpiredCache.fileMeta, nil).AnyTimes() + storageManager.EXPECT().ReadFileMetadata(fullNoExpiredCache.taskID).Return(fullNoExpiredCache.fileMeta, nil).AnyTimes() + storageManager.EXPECT().ReadFileMetadata(partialNotSupportRangeCache.taskID).Return(partialNotSupportRangeCache.fileMeta, nil).AnyTimes() + storageManager.EXPECT().ReadFileMetadata(partialSupportRangeCache.taskID).Return(partialSupportRangeCache.fileMeta, nil).AnyTimes() + storageManager.EXPECT().ReadFileMetadata(noCache.taskID).Return(noCache.fileMeta, os.ErrNotExist).AnyTimes() + storageManager.EXPECT().ReadDownloadFile(fullNoExpiredCache.taskID).DoAndReturn( func(taskID string) (io.ReadCloser, error) { content, err := ioutil.ReadFile("../../testdata/cdn/go.html") suite.Nil(err) return ioutil.NopCloser(strings.NewReader(string(content))), nil }).AnyTimes() - storageMgr.EXPECT().ReadDownloadFile(partialNotSupportRangeCache.taskID).DoAndReturn( + storageManager.EXPECT().ReadDownloadFile(partialNotSupportRangeCache.taskID).DoAndReturn( func(taskID string) (io.ReadCloser, error) { content, err := ioutil.ReadFile("../../testdata/cdn/go.html") suite.Nil(err) return ioutil.NopCloser(strings.NewReader(string(content))), nil }).AnyTimes() - storageMgr.EXPECT().ReadDownloadFile(partialSupportRangeCache.taskID).DoAndReturn( + storageManager.EXPECT().ReadDownloadFile(partialSupportRangeCache.taskID).DoAndReturn( func(taskID string) (io.ReadCloser, error) { content, err := ioutil.ReadFile("../../testdata/cdn/go.html") suite.Nil(err) return ioutil.NopCloser(strings.NewReader(string(content))), nil }).AnyTimes() - storageMgr.EXPECT().ReadDownloadFile(noCache.taskID).Return(nil, os.ErrNotExist).AnyTimes() - storageMgr.EXPECT().ReadPieceMetaRecords(fullNoExpiredCache.taskID).Return(fullNoExpiredCache.pieces, nil).AnyTimes() - storageMgr.EXPECT().ReadPieceMetaRecords(partialNotSupportRangeCache.taskID).Return(partialNotSupportRangeCache.pieces, nil).AnyTimes() - storageMgr.EXPECT().ReadPieceMetaRecords(partialSupportRangeCache.taskID).Return(partialSupportRangeCache.pieces, nil).AnyTimes() - storageMgr.EXPECT().ReadPieceMetaRecords(noCache.taskID).Return(nil, os.ErrNotExist).AnyTimes() - storageMgr.EXPECT().StatDownloadFile(fullNoExpiredCache.taskID).Return(&storedriver.StorageInfo{ + storageManager.EXPECT().ReadDownloadFile(noCache.taskID).Return(nil, os.ErrNotExist).AnyTimes() + storageManager.EXPECT().ReadPieceMetaRecords(fullNoExpiredCache.taskID).Return(fullNoExpiredCache.pieces, nil).AnyTimes() + storageManager.EXPECT().ReadPieceMetaRecords(partialNotSupportRangeCache.taskID).Return(partialNotSupportRangeCache.pieces, nil).AnyTimes() + storageManager.EXPECT().ReadPieceMetaRecords(partialSupportRangeCache.taskID).Return(partialSupportRangeCache.pieces, nil).AnyTimes() + storageManager.EXPECT().ReadPieceMetaRecords(noCache.taskID).Return(nil, os.ErrNotExist).AnyTimes() + storageManager.EXPECT().StatDownloadFile(fullNoExpiredCache.taskID).Return(&storedriver.StorageInfo{ Path: "", Size: 9789, CreateTime: time.Time{}, ModTime: time.Time{}, }, nil).AnyTimes() - storageMgr.EXPECT().StatDownloadFile(gomock.Not(fullNoExpiredCache.taskID)).Return(&storedriver.StorageInfo{}, nil).AnyTimes() + storageManager.EXPECT().StatDownloadFile(gomock.Not(fullNoExpiredCache.taskID)).Return(&storedriver.StorageInfo{}, nil).AnyTimes() sourceClient.EXPECT().IsExpired(source.RequestEq(expiredAndSupportURL), gomock.Any()).Return(true, nil).AnyTimes() sourceClient.EXPECT().IsSupportRange(source.RequestEq(expiredAndSupportURL)).Return(true, nil).AnyTimes() @@ -380,7 +379,7 @@ func (suite *CacheDetectorTestSuite) TestParseByReadFile() { } for _, tt := range tests { suite.Run(tt.name, func() { - got, err := suite.detector.parseByReadFile(tt.args.taskID, tt.args.metadata, md5.New()) + got, err := suite.detector.detectByReadFile(tt.args.taskID, tt.args.metadata, md5.New()) suite.Equal(tt.want, got) suite.Equal(tt.wantErr, err != nil) }) @@ -414,7 +413,7 @@ func (suite *CacheDetectorTestSuite) TestParseByReadMetaFile() { } for _, tt := range tests { suite.Run(tt.name, func() { - got, err := suite.detector.parseByReadMetaFile(tt.args.taskID, tt.args.fileMetadata) + got, err := suite.detector.detectByReadMetaFile(tt.args.taskID, tt.args.fileMetadata) suite.Equal(tt.wantErr, err != nil) suite.Equal(tt.want, got) }) diff --git a/cdn/supervisor/cdn/storage/mock/mock_storage_mgr.go b/cdn/supervisor/cdn/storage/mock/mock_storage_mgr.go index abe7e95c7f6..0f9d4f8c239 100644 --- a/cdn/supervisor/cdn/storage/mock/mock_storage_mgr.go +++ b/cdn/supervisor/cdn/storage/mock/mock_storage_mgr.go @@ -52,20 +52,6 @@ func (mr *MockManagerMockRecorder) AppendPieceMetadata(arg0, arg1 interface{}) * return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AppendPieceMetadata", reflect.TypeOf((*MockManager)(nil).AppendPieceMetadata), arg0, arg1) } -// CreateUploadLink mocks base method. -func (m *MockManager) CreateUploadLink(arg0 string) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CreateUploadLink", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// CreateUploadLink indicates an expected call of CreateUploadLink. -func (mr *MockManagerMockRecorder) CreateUploadLink(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateUploadLink", reflect.TypeOf((*MockManager)(nil).CreateUploadLink), arg0) -} - // DeleteTask mocks base method. func (m *MockManager) DeleteTask(arg0 string) error { m.ctrl.T.Helper() diff --git a/cdn/supervisor/cdn/storage/storage.go b/cdn/supervisor/cdn/storage/storage.go index bc4dddf7798..1fd9d81cc08 100644 --- a/cdn/supervisor/cdn/storage/storage.go +++ b/cdn/supervisor/cdn/storage/storage.go @@ -52,9 +52,6 @@ type Manager interface { // ReadDownloadFile return reader of download file ReadDownloadFile(taskID string) (io.ReadCloser, error) - // CreateUploadLink create a upload link to download file - CreateUploadLink(taskID string) error - // ReadFileMetadata return meta data of download file ReadFileMetadata(taskID string) (*FileMetadata, error) @@ -79,8 +76,8 @@ type Manager interface { // FileMetadata meta data of task type FileMetadata struct { - TaskID string `json:"taskId"` - TaskURL string `json:"taskUrl"` + TaskID string `json:"taskID"` + TaskURL string `json:"taskURL"` PieceSize int32 `json:"pieceSize"` SourceFileLen int64 `json:"sourceFileLen"` AccessTime int64 `json:"accessTime"` @@ -181,10 +178,6 @@ func (m *managerPlugin) ReadDownloadFile(taskID string) (io.ReadCloser, error) { return m.instance.ReadDownloadFile(taskID) } -func (m *managerPlugin) CreateUploadLink(taskID string) error { - return m.instance.CreateUploadLink(taskID) -} - func (m *managerPlugin) ReadFileMetadata(taskID string) (*FileMetadata, error) { return m.instance.ReadFileMetadata(taskID) } diff --git a/cdn/supervisor/mock/mock_task_mgr.go b/cdn/supervisor/mock/mock_task_mgr.go index fc83c12bb98..a19262a6636 100644 --- a/cdn/supervisor/mock/mock_task_mgr.go +++ b/cdn/supervisor/mock/mock_task_mgr.go @@ -95,7 +95,7 @@ func (mr *MockSeedTaskMgrMockRecorder) GetPieces(arg0, arg1 interface{}) *gomock } // Register mocks base method. -func (m *MockSeedTaskMgr) Register(arg0 context.Context, arg1 *types.TaskRegisterRequest) (<-chan *types.SeedPiece, error) { +func (m *MockSeedTaskMgr) Register(arg0 context.Context, arg1 *types.SeedTask) (<-chan *types.SeedPiece, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Register", arg0, arg1) ret0, _ := ret[0].(<-chan *types.SeedPiece) diff --git a/cdn/supervisor/task/manager.go b/cdn/supervisor/task/manager.go index 90e06b41d43..caa2836c669 100644 --- a/cdn/supervisor/task/manager.go +++ b/cdn/supervisor/task/manager.go @@ -32,8 +32,11 @@ import ( "d7y.io/dragonfly/v2/cdn/types" "d7y.io/dragonfly/v2/internal/dferrors" logger "d7y.io/dragonfly/v2/internal/dflog" + "d7y.io/dragonfly/v2/internal/util" + "d7y.io/dragonfly/v2/pkg/source" "d7y.io/dragonfly/v2/pkg/synclock" "d7y.io/dragonfly/v2/pkg/syncmap" + "d7y.io/dragonfly/v2/pkg/unit" "d7y.io/dragonfly/v2/pkg/util/stringutils" ) @@ -41,6 +44,11 @@ import ( var _ supervisor.SeedTaskMgr = (*Manager)(nil) var _ gc.Executor = (*Manager)(nil) +var ( + errURLUnreachable = errors.New("url is unreachable") + errTaskIDConflict = errors.New("taskID is conflict") +) + var tracer trace.Tracer func init() { @@ -72,14 +80,14 @@ func NewManager(cfg *config.Config, cdnMgr supervisor.CDNMgr, progressMgr superv return taskMgr, nil } -func (tm *Manager) Register(ctx context.Context, req *types.TaskRegisterRequest) (pieceChan <-chan *types.SeedPiece, err error) { +func (tm *Manager) Register(ctx context.Context, registerTask *types.SeedTask) (pieceChan <-chan *types.SeedPiece, err error) { var span trace.Span ctx, span = tracer.Start(ctx, config.SpanTaskRegister) defer span.End() - task, err := tm.addOrUpdateTask(ctx, req) + task, err := tm.AddOrUpdate(registerTask) if err != nil { span.RecordError(err) - logger.WithTaskID(req.TaskID).Infof("failed to add or update task with req: %#v: %v", req, err) + logger.WithTaskID(registerTask.TaskID).Infof("failed to add or update task with req: %#v: %v", registerTask, err) return nil, err } taskBytes, _ := json.Marshal(task) @@ -172,6 +180,72 @@ func (tm *Manager) getTask(taskID string) (*types.SeedTask, error) { return nil, errors.Wrapf(cdnerrors.ErrConvertFailed, "origin object: %#v", v) } +func (tm *Manager) AddOrUpdate(registerTask *types.SeedTask) (seedTask *types.SeedTask, err error) { + defer func() { + if err != nil { + tm.accessTimeMap.Store(registerTask.TaskID, time.Now()) + } + }() + synclock.Lock(registerTask.TaskID, true) + if unreachableTime, ok := tm.getTaskUnreachableTime(registerTask.TaskID); ok { + if time.Since(unreachableTime) < tm.cfg.FailAccessInterval { + synclock.UnLock(registerTask.TaskID, true) + // TODO 校验Header + return nil, errURLUnreachable + } + logger.Debugf("delete taskID: %s from unreachable url list", registerTask.TaskID) + tm.taskURLUnReachableStore.Delete(registerTask.TaskID) + } + actual, loaded := tm.taskStore.LoadOrStore(registerTask.TaskID, registerTask) + seedTask = actual.(*types.SeedTask) + if loaded && !IsSame(seedTask, registerTask) { + synclock.UnLock(registerTask.TaskID, true) + return nil, errors.Wrapf(errTaskIDConflict, "register task %#v is conflict with exist task %#v", registerTask, seedTask) + } + if seedTask.SourceFileLength != source.UnKnownSourceFileLen { + synclock.UnLock(registerTask.TaskID, true) + return seedTask, nil + } + synclock.UnLock(registerTask.TaskID, true) + synclock.Lock(registerTask.TaskID, false) + defer synclock.UnLock(registerTask.TaskID, false) + if seedTask.SourceFileLength != source.UnKnownSourceFileLen { + return seedTask, nil + } + // get sourceContentLength with req.Header + contentLengthRequest, err := source.NewRequestWithHeader(registerTask.URL, registerTask.Header) + if err != nil { + return nil, err + } + // add range info + if !stringutils.IsBlank(registerTask.Range) { + contentLengthRequest.Header.Add(source.Range, registerTask.Range) + } + sourceFileLength, err := source.GetContentLength(contentLengthRequest) + if err != nil { + registerTask.Log().Errorf("get url (%s) content length failed: %v", registerTask.URL, err) + if source.IsResourceNotReachableError(err) { + tm.taskURLUnReachableStore.Store(registerTask, time.Now()) + } + return seedTask, err + } + seedTask.SourceFileLength = sourceFileLength + seedTask.Log().Debugf("success get file content length: %d", sourceFileLength) + + // if success to get the information successfully with the req.Header then update the task.UrlMeta to registerTask.UrlMeta. + if registerTask.Header != nil { + seedTask.Header = registerTask.Header + } + + // calculate piece size and update the PieceSize and PieceTotal + if registerTask.PieceSize <= 0 { + pieceSize := util.ComputePieceSize(registerTask.SourceFileLength) + seedTask.PieceSize = int32(pieceSize) + seedTask.Log().Debugf("piece size calculate result: %s", unit.ToBytes(int64(pieceSize))) + } + return seedTask, nil +} + func (tm Manager) Get(taskID string) (*types.SeedTask, error) { task, err := tm.getTask(taskID) // update accessTime for taskID diff --git a/cdn/supervisor/task/manager_util.go b/cdn/supervisor/task/manager_util.go index ff6b7f21b5a..eeded7f9979 100644 --- a/cdn/supervisor/task/manager_util.go +++ b/cdn/supervisor/task/manager_util.go @@ -17,118 +17,22 @@ package task import ( - "context" - "fmt" - "reflect" "time" "github.com/pkg/errors" - "go.opentelemetry.io/otel/trace" - "d7y.io/dragonfly/v2/cdn/config" cdnerrors "d7y.io/dragonfly/v2/cdn/errors" "d7y.io/dragonfly/v2/cdn/types" - logger "d7y.io/dragonfly/v2/internal/dflog" - "d7y.io/dragonfly/v2/internal/util" - "d7y.io/dragonfly/v2/pkg/source" - "d7y.io/dragonfly/v2/pkg/synclock" - "d7y.io/dragonfly/v2/pkg/util/net/urlutils" "d7y.io/dragonfly/v2/pkg/util/stringutils" ) -// addOrUpdateTask add a new task or update exist task -func (tm *Manager) addOrUpdateTask(ctx context.Context, request *types.TaskRegisterRequest) (*types.SeedTask, error) { - var span trace.Span - ctx, span = tracer.Start(ctx, config.SpanAndOrUpdateTask) - defer span.End() - taskURL := request.URL - if request.Filter != nil { - taskURL = urlutils.FilterURLParam(request.URL, request.Filter) +// getTaskUnreachableTime get unreachable time of task and convert it to time.Time type +func (tm *Manager) getTaskUnreachableTime(taskID string) (time.Time, bool) { + unreachableTime, ok := tm.taskURLUnReachableStore.Load(taskID) + if !ok { + return time.Time{}, false } - span.SetAttributes(config.AttributeTaskURL.String(taskURL)) - taskID := request.TaskID - synclock.Lock(taskID, false) - defer synclock.UnLock(taskID, false) - if key, err := tm.taskURLUnReachableStore.Get(taskID); err == nil { - if unReachableStartTime, ok := key.(time.Time); ok && time.Since(unReachableStartTime) < tm.cfg.FailAccessInterval { - existTask, err := tm.taskStore.Get(taskID) - if err != nil || reflect.DeepEqual(request.Header, existTask.(*types.SeedTask).Header) { - span.AddEvent(config.EventHitUnReachableURL) - return nil, errors.Wrapf(cdnerrors.ErrURLNotReachable{URL: request.URL}, "task hit unReachable cache and interval less than %d, "+ - "url: %s", tm.cfg.FailAccessInterval, request.URL) - } - } - span.AddEvent(config.EventDeleteUnReachableTask) - tm.taskURLUnReachableStore.Delete(taskID) - logger.Debugf("delete taskID: %s from url unReachable store", taskID) - } - - var task *types.SeedTask - newTask := types.NewSeedTask(taskID, request.Header, request.Digest, request.URL, taskURL) - // using the existing task if it already exists corresponding to taskID - if v, err := tm.taskStore.Get(taskID); err == nil { - span.SetAttributes(config.AttributeIfReuseTask.Bool(true)) - existTask := v.(*types.SeedTask) - if !isSameTask(existTask, newTask) { - span.RecordError(fmt.Errorf("newTask: %#v, existTask: %#v", newTask, existTask)) - return nil, cdnerrors.ErrTaskIDDuplicate{TaskID: taskID, Cause: fmt.Errorf("newTask: %#v, existTask: %#v", newTask, existTask)} - } - task = existTask - logger.Debugf("get exist task for taskID: %s", taskID) - } else { - span.SetAttributes(config.AttributeIfReuseTask.Bool(false)) - logger.Debugf("get new task for taskID: %s", taskID) - task = newTask - } - - if task.SourceFileLength != types.IllegalSourceFileLen { - return task, nil - } - - lengthRequest, err := source.NewRequestWithContext(ctx, task.URL, request.Header) - if err != nil { - return nil, err - } - // get sourceContentLength with req.Header - span.AddEvent(config.EventRequestSourceFileLength) - sourceFileLength, err := source.GetContentLength(lengthRequest) - if err != nil { - task.Log().Errorf("failed to get url (%s) content length: %v", task.URL, err) - if cdnerrors.IsURLNotReachable(err) { - if err := tm.taskURLUnReachableStore.Add(taskID, time.Now()); err != nil { - task.Log().Errorf("failed to add url (%s) to unreachable store: %v", task.URL, err) - } - return nil, err - } - } - // if not support file length header request ,return -1 - task.SourceFileLength = sourceFileLength - logger.WithTaskID(taskID).Debugf("get file content length: %d", sourceFileLength) - if task.SourceFileLength > 0 { - ok, err := tm.cdnMgr.TryFreeSpace(task.SourceFileLength) - if err != nil { - logger.Errorf("failed to try free space: %v", err) - } else if !ok { - return nil, cdnerrors.ErrResourcesLacked - } - } - - // if success to get the information successfully with the req.Header then update the task.Header to req.Header. - if request.Header != nil { - task.Header = request.Header - } - - // calculate piece size and update the PieceSize and PieceTotal - if task.PieceSize <= 0 { - pieceSize := util.ComputePieceSize(task.SourceFileLength) - task.PieceSize = int32(pieceSize) - } - if err := tm.taskStore.Add(task.TaskID, task); err != nil { - return nil, err - } - - logger.Debugf("success add task: %#v into taskStore", task) - return task, nil + return unreachableTime.(time.Time), true } // updateTask @@ -187,8 +91,8 @@ func (tm *Manager) updateTask(taskID string, updateTaskInfo *types.SeedTask) (*t return task, nil } -// isSameTask check whether the two task provided are the same -func isSameTask(task1, task2 *types.SeedTask) bool { +// IsSame check whether the two task provided are the same +func IsSame(task1, task2 *types.SeedTask) bool { if task1 == task2 { return true } diff --git a/cdn/supervisor/task_mgr.go b/cdn/supervisor/task_mgr.go index 49c1d5468e3..0a43ab3b51a 100644 --- a/cdn/supervisor/task_mgr.go +++ b/cdn/supervisor/task_mgr.go @@ -29,7 +29,7 @@ import ( type SeedTaskMgr interface { // Register register seed task - Register(context.Context, *types.TaskRegisterRequest) (pieceCh <-chan *types.SeedPiece, err error) + Register(context.Context, *types.SeedTask) (pieceCh <-chan *types.SeedPiece, err error) // Get get task Info with specified taskId. Get(string) (*types.SeedTask, error) diff --git a/cdn/types/seed_task_info.go b/cdn/types/seed_task_info.go index 4a13168363f..7718ef8919f 100644 --- a/cdn/types/seed_task_info.go +++ b/cdn/types/seed_task_info.go @@ -17,7 +17,12 @@ package types import ( + "strings" + logger "d7y.io/dragonfly/v2/internal/dflog" + "d7y.io/dragonfly/v2/pkg/rpc/base" + "d7y.io/dragonfly/v2/pkg/source" + "d7y.io/dragonfly/v2/pkg/util/net/urlutils" ) type SeedTask struct { @@ -31,8 +36,8 @@ type SeedTask struct { CdnStatus string `json:"cdnStatus,omitempty"` PieceTotal int32 `json:"pieceTotal,omitempty"` RequestDigest string `json:"requestDigest,omitempty"` - Range string `json:"range"` SourceRealDigest string `json:"sourceRealDigest,omitempty"` + Range string `json:"range,omitempty"` PieceMd5Sign string `json:"pieceMd5Sign,omitempty"` logger *logger.SugaredLoggerOnWith } @@ -41,15 +46,21 @@ const ( IllegalSourceFileLen = -100 ) -func NewSeedTask(taskID string, header map[string]string, digest string, url string, taskURL string) *SeedTask { +func NewSeedTask(taskID string, rawURL string, urlMeta *base.UrlMeta) *SeedTask { + if urlMeta == nil { + urlMeta = &base.UrlMeta{} + } return &SeedTask{ TaskID: taskID, - Header: header, - RequestDigest: digest, - URL: url, - TaskURL: taskURL, + Header: urlMeta.Header, + RequestDigest: urlMeta.Digest, + URL: rawURL, + TaskURL: urlutils.FilterURLParam(rawURL, strings.Split(urlMeta.Filter, "&")), + SourceFileLength: source.UnKnownSourceFileLen, + CdnFileLength: 0, + PieceSize: 0, + Range: urlMeta.Range, CdnStatus: TaskInfoCdnStatusWaiting, - SourceFileLength: IllegalSourceFileLen, logger: logger.WithTaskID(taskID), } }