From 9a0d057bcb13122d5385f72faeb1ab2efc07125e Mon Sep 17 00:00:00 2001 From: zzy987 <67889264+zzy987@users.noreply.github.com> Date: Fri, 6 Aug 2021 16:15:31 +0800 Subject: [PATCH] let cdn support sha256 (#517) cdn support sha256 digest --- cdnsystem/daemon/cdn/cache_data_mgr.go | 4 +- cdnsystem/daemon/cdn/cache_detector.go | 24 ++-- cdnsystem/daemon/cdn/cache_detector_test.go | 52 ++++----- cdnsystem/daemon/cdn/manager.go | 47 +++++--- cdnsystem/daemon/cdn/manager_test.go | 121 ++++++++++++++------ cdnsystem/daemon/cdn/storage/storage_mgr.go | 26 ++--- cdnsystem/daemon/task/manager_test.go | 22 +++- cdnsystem/daemon/task/manager_util.go | 14 +-- cdnsystem/server/service/cdn_seed_server.go | 9 +- cdnsystem/types/seed_task_info.go | 8 +- cdnsystem/types/task_register_request.go | 2 +- client/dfget/dfget.go | 4 +- client/dfget/dfget_test.go | 3 +- internal/constants/constants.go | 3 - pkg/ratelimiter/limitreader/limit_reader.go | 64 +++++------ pkg/util/digestutils/digest.go | 38 +++++- pkg/util/digestutils/digest_test.go | 3 +- scheduler/tasks/tasks.go | 7 +- 18 files changed, 269 insertions(+), 182 deletions(-) diff --git a/cdnsystem/daemon/cdn/cache_data_mgr.go b/cdnsystem/daemon/cdn/cache_data_mgr.go index c73ebc1cd1c..50457754df0 100644 --- a/cdnsystem/daemon/cdn/cache_data_mgr.go +++ b/cdnsystem/daemon/cdn/cache_data_mgr.go @@ -119,8 +119,8 @@ func (mm *cacheDataManager) updateStatusAndResult(taskID string, metaData *stora if metaData.TotalPieceCount > 0 { originMetaData.TotalPieceCount = metaData.TotalPieceCount } - if !stringutils.IsBlank(metaData.SourceRealMd5) { - originMetaData.SourceRealMd5 = metaData.SourceRealMd5 + if !stringutils.IsBlank(metaData.SourceRealDigest) { + originMetaData.SourceRealDigest = metaData.SourceRealDigest } if !stringutils.IsBlank(metaData.PieceMd5Sign) { originMetaData.PieceMd5Sign = metaData.PieceMd5Sign diff --git a/cdnsystem/daemon/cdn/cache_detector.go b/cdnsystem/daemon/cdn/cache_detector.go index dd1beaa5127..860c713511d 100644 --- a/cdnsystem/daemon/cdn/cache_detector.go +++ b/cdnsystem/daemon/cdn/cache_detector.go @@ -59,12 +59,12 @@ func newCacheDetector(cacheDataManager *cacheDataManager) *cacheDetector { } } -func (cd *cacheDetector) detectCache(task *types.SeedTask, fileMd5 hash.Hash) (*cacheResult, error) { +func (cd *cacheDetector) detectCache(task *types.SeedTask, fileDigest hash.Hash) (*cacheResult, error) { //err := cd.cacheStore.CreateUploadLink(ctx, task.TaskId) //if err != nil { // return nil, errors.Wrapf(err, "failed to create upload symbolic link") //} - result, err := cd.doDetect(task, fileMd5) + result, err := cd.doDetect(task, fileDigest) if err != nil { logger.WithTaskID(task.TaskID).Infof("failed to detect cache, reset cache: %v", err) metaData, err := cd.resetCache(task) @@ -83,7 +83,7 @@ func (cd *cacheDetector) detectCache(task *types.SeedTask, fileMd5 hash.Hash) (* } // doDetect the actual detect action which detects file metaData and pieces metaData of specific task -func (cd *cacheDetector) doDetect(task *types.SeedTask, fileMd5 hash.Hash) (result *cacheResult, err error) { +func (cd *cacheDetector) doDetect(task *types.SeedTask, fileDigest hash.Hash) (result *cacheResult, err error) { fileMetaData, err := cd.cacheDataManager.readFileMetaData(task.TaskID) if err != nil { return nil, errors.Wrapf(err, "read file meta data of task %s", task.TaskID) @@ -118,7 +118,7 @@ func (cd *cacheDetector) doDetect(task *types.SeedTask, fileMd5 hash.Hash) (resu if !supportRange { return nil, cdnerrors.ErrResourceNotSupportRangeRequest{URL: task.URL} } - return cd.parseByReadFile(task.TaskID, fileMetaData, fileMd5) + return cd.parseByReadFile(task.TaskID, fileMetaData, fileDigest) } // parseByReadMetaFile detect cache by read meta and pieceMeta files of task @@ -154,7 +154,7 @@ func (cd *cacheDetector) parseByReadMetaFile(taskID string, fileMetaData *storag } // parseByReadFile detect cache by read pieceMeta and data files of task -func (cd *cacheDetector) parseByReadFile(taskID string, metaData *storage.FileMetaData, fileMd5 hash.Hash) (*cacheResult, error) { +func (cd *cacheDetector) parseByReadFile(taskID string, metaData *storage.FileMetaData, fileDigest hash.Hash) (*cacheResult, error) { reader, err := cd.cacheDataManager.readDownloadFile(taskID) if err != nil { return nil, errors.Wrapf(err, "read data file") @@ -177,7 +177,7 @@ func (cd *cacheDetector) parseByReadFile(taskID string, metaData *storage.FileMe break } // read content - if err := checkPieceContent(reader, tempRecords[index], fileMd5); err != nil { + if err := checkPieceContent(reader, tempRecords[index], fileDigest); err != nil { logger.WithTaskID(taskID).Errorf("read content of pieceNum %d failed: %v", tempRecords[index].PieceNum, err) break } @@ -237,19 +237,19 @@ func checkSameFile(task *types.SeedTask, metaData *storage.FileMetaData) error { if metaData.TaskURL != task.TaskURL { return errors.Errorf("meta task taskUrl(%s) is not equals with task taskUrl(%s)", metaData.TaskURL, task.URL) } - if !stringutils.IsBlank(metaData.SourceRealMd5) && !stringutils.IsBlank(task.RequestMd5) && - metaData.SourceRealMd5 != task.RequestMd5 { - return errors.Errorf("meta task source md5(%s) is not equals with task request md5(%s)", - metaData.SourceRealMd5, task.RequestMd5) + if !stringutils.IsBlank(metaData.SourceRealDigest) && !stringutils.IsBlank(task.RequestDigest) && + metaData.SourceRealDigest != task.RequestDigest { + return errors.Errorf("meta task source digest(%s) is not equals with task request digest(%s)", + metaData.SourceRealDigest, task.RequestDigest) } return nil } //checkPieceContent read piece content from reader and check data integrity by pieceMetaRecord -func checkPieceContent(reader io.Reader, pieceRecord *storage.PieceMetaRecord, fileMd5 hash.Hash) error { +func checkPieceContent(reader io.Reader, pieceRecord *storage.PieceMetaRecord, fileDigest hash.Hash) error { // TODO Analyze the original data for the slice format to calculate fileMd5 pieceMd5 := md5.New() - tee := io.TeeReader(io.TeeReader(io.LimitReader(reader, int64(pieceRecord.PieceLen)), pieceMd5), fileMd5) + tee := io.TeeReader(io.TeeReader(io.LimitReader(reader, int64(pieceRecord.PieceLen)), pieceMd5), fileDigest) if n, err := io.Copy(ioutil.Discard, tee); n != int64(pieceRecord.PieceLen) || err != nil { return errors.Wrap(err, "read piece content") } diff --git a/cdnsystem/daemon/cdn/cache_detector_test.go b/cdnsystem/daemon/cdn/cache_detector_test.go index 24ef6bcf851..0258930b8f3 100644 --- a/cdnsystem/daemon/cdn/cache_detector_test.go +++ b/cdnsystem/daemon/cdn/cache_detector_test.go @@ -218,37 +218,37 @@ var fullPieceMetaRecords = append(partialPieceMetaRecords, &storage.PieceMetaRec func newCompletedFileMeta(taskID string, URL string, success bool) *storage.FileMetaData { return &storage.FileMetaData{ - TaskID: taskID, - TaskURL: URL, - PieceSize: 2000, - SourceFileLen: 9789, - AccessTime: 1624126443284, - Interval: 0, - CdnFileLength: 9789, - SourceRealMd5: "", - PieceMd5Sign: "98166bdfebb7b71dd5c6d47492d844f4421d90199641ca11fd8ce3111894115a", - ExpireInfo: nil, - Finish: true, - Success: success, - TotalPieceCount: 5, + TaskID: taskID, + TaskURL: URL, + PieceSize: 2000, + SourceFileLen: 9789, + AccessTime: 1624126443284, + Interval: 0, + CdnFileLength: 9789, + SourceRealDigest: "", + PieceMd5Sign: "98166bdfebb7b71dd5c6d47492d844f4421d90199641ca11fd8ce3111894115a", + ExpireInfo: nil, + Finish: true, + Success: success, + TotalPieceCount: 5, } } func newPartialFileMeta(taskID string, URL string) *storage.FileMetaData { return &storage.FileMetaData{ - TaskID: taskID, - TaskURL: URL, - PieceSize: 2000, - SourceFileLen: 9789, - AccessTime: 1624126443284, - Interval: 0, - CdnFileLength: 0, - SourceRealMd5: "", - PieceMd5Sign: "", - ExpireInfo: nil, - Finish: false, - Success: false, - TotalPieceCount: 0, + TaskID: taskID, + TaskURL: URL, + PieceSize: 2000, + SourceFileLen: 9789, + AccessTime: 1624126443284, + Interval: 0, + CdnFileLength: 0, + SourceRealDigest: "", + PieceMd5Sign: "", + ExpireInfo: nil, + Finish: false, + Success: false, + TotalPieceCount: 0, } } diff --git a/cdnsystem/daemon/cdn/manager.go b/cdnsystem/daemon/cdn/manager.go index 55f6f006385..d3e21687dc8 100644 --- a/cdnsystem/daemon/cdn/manager.go +++ b/cdnsystem/daemon/cdn/manager.go @@ -17,10 +17,12 @@ package cdn import ( + "crypto/md5" "time" + "d7y.io/dragonfly/v2/pkg/util/digestutils" + "context" - "crypto/md5" "fmt" "d7y.io/dragonfly/v2/cdnsystem/daemon" @@ -85,9 +87,16 @@ func (cm *Manager) TriggerCDN(ctx context.Context, task *types.SeedTask) (seedTa // obtain taskId write lock cm.cdnLocker.Lock(task.TaskID, false) defer cm.cdnLocker.UnLock(task.TaskID, false) + + var fileDigest = md5.New() + var digestType = digestutils.Md5Hash.String() + if !stringutils.IsBlank(task.RequestDigest) { + requestDigest := digestutils.Parse(task.RequestDigest) + digestType = requestDigest[0] + fileDigest = digestutils.CreateHash(digestType) + } // first: detect Cache - fileMd5 := md5.New() - detectResult, err := cm.detector.detectCache(task, fileMd5) + detectResult, err := cm.detector.detectCache(task, fileDigest) if err != nil { seedTask.UpdateStatus(types.TaskInfoCdnStatusFailed) return seedTask, errors.Wrapf(err, "failed to detect cache") @@ -101,7 +110,7 @@ func (cm *Manager) TriggerCDN(ctx context.Context, task *types.SeedTask) (seedTa // full cache if detectResult.breakPoint == -1 { logger.WithTaskID(task.TaskID).Infof("cache full hit on local") - seedTask.UpdateTaskInfo(types.TaskInfoCdnStatusSuccess, detectResult.fileMetaData.SourceRealMd5, detectResult.fileMetaData.PieceMd5Sign, + seedTask.UpdateTaskInfo(types.TaskInfoCdnStatusSuccess, detectResult.fileMetaData.SourceRealDigest, detectResult.fileMetaData.PieceMd5Sign, detectResult.fileMetaData.SourceFileLen, detectResult.fileMetaData.CdnFileLength) return seedTask, nil } @@ -116,8 +125,8 @@ func (cm *Manager) TriggerCDN(ctx context.Context, task *types.SeedTask) (seedTa return seedTask, err } defer body.Close() + reader := limitreader.NewLimitReaderWithLimiterAndDigest(body, cm.limiter, fileDigest, digestutils.Algorithms[digestType]) - reader := limitreader.NewLimitReaderWithLimiterAndMD5Sum(body, cm.limiter, fileMd5) // forth: write to storage downloadMetadata, err := cm.writer.startWriter(reader, task, detectResult) if err != nil { @@ -129,14 +138,14 @@ func (cm *Manager) TriggerCDN(ctx context.Context, task *types.SeedTask) (seedTa } server.StatSeedFinish(task.TaskID, task.URL, true, nil, start.Nanosecond(), time.Now().Nanosecond(), downloadMetadata.backSourceLength, downloadMetadata.realSourceFileLength) - sourceMD5 := reader.Md5() + sourceDigest := reader.Digest() // fifth: handle CDN result - success, err := cm.handleCDNResult(task, sourceMD5, downloadMetadata) + success, err := cm.handleCDNResult(task, sourceDigest, downloadMetadata) if err != nil || !success { seedTask.UpdateStatus(types.TaskInfoCdnStatusFailed) return seedTask, err } - seedTask.UpdateTaskInfo(types.TaskInfoCdnStatusSuccess, sourceMD5, downloadMetadata.pieceMd5Sign, + seedTask.UpdateTaskInfo(types.TaskInfoCdnStatusSuccess, sourceDigest, downloadMetadata.pieceMd5Sign, downloadMetadata.realSourceFileLength, downloadMetadata.realCdnFileLength) return seedTask, nil } @@ -149,13 +158,13 @@ func (cm *Manager) Delete(taskID string) error { return nil } -func (cm *Manager) handleCDNResult(task *types.SeedTask, sourceMd5 string, downloadMetadata *downloadMetadata) (bool, error) { +func (cm *Manager) handleCDNResult(task *types.SeedTask, sourceDigest string, downloadMetadata *downloadMetadata) (bool, error) { logger.WithTaskID(task.TaskID).Debugf("handle cdn result, downloadMetaData: %+v", downloadMetadata) var isSuccess = true var errorMsg string // check md5 - if !stringutils.IsBlank(task.RequestMd5) && task.RequestMd5 != sourceMd5 { - errorMsg = fmt.Sprintf("file md5 not match expected: %s real: %s", task.RequestMd5, sourceMd5) + if !stringutils.IsBlank(task.RequestDigest) && task.RequestDigest != sourceDigest { + errorMsg = fmt.Sprintf("file digest not match expected: %s real: %s", task.RequestDigest, sourceDigest) isSuccess = false } // check source length @@ -178,13 +187,13 @@ func (cm *Manager) handleCDNResult(task *types.SeedTask, sourceMd5 string, downl cdnFileLength = 0 } if err := cm.cacheDataManager.updateStatusAndResult(task.TaskID, &storage.FileMetaData{ - Finish: true, - Success: isSuccess, - SourceRealMd5: sourceMd5, - PieceMd5Sign: pieceMd5Sign, - CdnFileLength: cdnFileLength, - SourceFileLen: sourceFileLen, - TotalPieceCount: downloadMetadata.pieceTotalCount, + Finish: true, + Success: isSuccess, + SourceRealDigest: sourceDigest, + PieceMd5Sign: pieceMd5Sign, + CdnFileLength: cdnFileLength, + SourceFileLen: sourceFileLen, + TotalPieceCount: downloadMetadata.pieceTotalCount, }); err != nil { return false, errors.Wrap(err, "failed to update task status and result") } @@ -193,7 +202,7 @@ func (cm *Manager) handleCDNResult(task *types.SeedTask, sourceMd5 string, downl return false, errors.New(errorMsg) } - logger.WithTaskID(task.TaskID).Infof("success to get task, downloadMetadata: %+v realMd5: %s", downloadMetadata, sourceMd5) + logger.WithTaskID(task.TaskID).Infof("success to get task, downloadMetadata: %+v realDigest: %s", downloadMetadata, sourceDigest) return true, nil } diff --git a/cdnsystem/daemon/cdn/manager_test.go b/cdnsystem/daemon/cdn/manager_test.go index 4bddb1f55a2..58e09a81bcc 100644 --- a/cdnsystem/daemon/cdn/manager_test.go +++ b/cdnsystem/daemon/cdn/manager_test.go @@ -61,13 +61,16 @@ func (suite *CDNManagerTestSuite) SetupSuite() { } ctrl := gomock.NewController(suite.T()) progressMgr := mock.NewMockSeedProgressMgr(ctrl) - progressMgr.EXPECT().PublishPiece(taskID, gomock.Any()).Return(nil).Times(98 * 2) + progressMgr.EXPECT().PublishPiece(md5TaskID, gomock.Any()).Return(nil).Times(98 * 2) + progressMgr.EXPECT().PublishPiece(sha256TaskID, gomock.Any()).Return(nil).Times(98 * 2) suite.cm, _ = newManager(config.New(), storeMgr, progressMgr) } -var dragonflyURL = "http://dragonfly.io.com?a=a&b=b&c=c" - -var taskID = idgen.TaskID(dragonflyURL, &base.UrlMeta{Digest: "f1e2488bba4d1267948d9e2f7008571c", Tag: "dragonfly", Filter: "a&b"}) +var ( + dragonflyURL = "http://dragonfly.io.com?a=a&b=b&c=c" + md5TaskID = idgen.TaskID(dragonflyURL, &base.UrlMeta{Digest: "md5:f1e2488bba4d1267948d9e2f7008571c", Tag: "dragonfly", Filter: "a&b"}) + sha256TaskID = idgen.TaskID(dragonflyURL, &base.UrlMeta{Digest: "sha256:b9907b9a5ba2b0223868c201b9addfe2ec1da1b90325d57c34f192966b0a68c5", Tag: "dragonfly", Filter: "a&b"}) +) func (suite *CDNManagerTestSuite) TearDownSuite() { if suite.workHome != "" { @@ -120,40 +123,86 @@ func (suite *CDNManagerTestSuite) TestTriggerCDN() { return fileInfo.Size(), nil }, ).AnyTimes() - sourceTask := &types.SeedTask{ - TaskID: taskID, - URL: dragonflyURL, - TaskURL: urlutils.FilterURLParam(dragonflyURL, []string{"a", "b"}), - SourceFileLength: 9789, - CdnFileLength: 0, - PieceSize: 100, - Header: map[string]string{"md5": "f1e2488bba4d1267948d9e2f7008571c"}, - CdnStatus: types.TaskInfoCdnStatusRunning, - PieceTotal: 0, - RequestMd5: "f1e2488bba4d1267948d9e2f7008571c", - SourceRealMd5: "", - PieceMd5Sign: "", + + tests := []struct { + name string + sourceTask *types.SeedTask + targetTask *types.SeedTask + }{ + { + name: "trigger_md5", + sourceTask: &types.SeedTask{ + TaskID: md5TaskID, + URL: dragonflyURL, + TaskURL: urlutils.FilterURLParam(dragonflyURL, []string{"a", "b"}), + SourceFileLength: 9789, + CdnFileLength: 0, + PieceSize: 100, + Header: map[string]string{"md5": "f1e2488bba4d1267948d9e2f7008571c"}, + CdnStatus: types.TaskInfoCdnStatusRunning, + PieceTotal: 0, + RequestDigest: "md5:f1e2488bba4d1267948d9e2f7008571c", + SourceRealDigest: "", + PieceMd5Sign: "", + }, + targetTask: &types.SeedTask{ + TaskID: md5TaskID, + URL: dragonflyURL, + TaskURL: urlutils.FilterURLParam(dragonflyURL, []string{"a", "b"}), + SourceFileLength: 9789, + CdnFileLength: 9789, + PieceSize: 100, + Header: map[string]string{"md5": "f1e2488bba4d1267948d9e2f7008571c"}, + CdnStatus: types.TaskInfoCdnStatusSuccess, + PieceTotal: 0, + RequestDigest: "md5:f1e2488bba4d1267948d9e2f7008571c", + SourceRealDigest: "md5:f1e2488bba4d1267948d9e2f7008571c", + PieceMd5Sign: "bb138842f338fff90af737e4a6b2c6f8e2a7031ca9d5900bc9b646f6406d890f", + }, + }, + { + name: "trigger_sha256", + sourceTask: &types.SeedTask{ + TaskID: sha256TaskID, + URL: dragonflyURL, + TaskURL: urlutils.FilterURLParam(dragonflyURL, []string{"a", "b"}), + SourceFileLength: 9789, + CdnFileLength: 0, + PieceSize: 100, + Header: map[string]string{"sha256": "b9907b9a5ba2b0223868c201b9addfe2ec1da1b90325d57c34f192966b0a68c5"}, + CdnStatus: types.TaskInfoCdnStatusRunning, + PieceTotal: 0, + RequestDigest: "sha256:b9907b9a5ba2b0223868c201b9addfe2ec1da1b90325d57c34f192966b0a68c5", + SourceRealDigest: "", + PieceMd5Sign: "", + }, + targetTask: &types.SeedTask{ + TaskID: sha256TaskID, + URL: dragonflyURL, + TaskURL: urlutils.FilterURLParam(dragonflyURL, []string{"a", "b"}), + SourceFileLength: 9789, + CdnFileLength: 9789, + PieceSize: 100, + Header: map[string]string{"sha256": "b9907b9a5ba2b0223868c201b9addfe2ec1da1b90325d57c34f192966b0a68c5"}, + CdnStatus: types.TaskInfoCdnStatusSuccess, + PieceTotal: 0, + RequestDigest: "sha256:b9907b9a5ba2b0223868c201b9addfe2ec1da1b90325d57c34f192966b0a68c5", + SourceRealDigest: "sha256:b9907b9a5ba2b0223868c201b9addfe2ec1da1b90325d57c34f192966b0a68c5", + PieceMd5Sign: "bb138842f338fff90af737e4a6b2c6f8e2a7031ca9d5900bc9b646f6406d890f", + }, + }, } - targetTask := &types.SeedTask{ - TaskID: taskID, - URL: dragonflyURL, - TaskURL: urlutils.FilterURLParam(dragonflyURL, []string{"a", "b"}), - SourceFileLength: 9789, - CdnFileLength: 9789, - PieceSize: 100, - Header: map[string]string{"md5": "f1e2488bba4d1267948d9e2f7008571c"}, - CdnStatus: types.TaskInfoCdnStatusSuccess, - PieceTotal: 0, - RequestMd5: "f1e2488bba4d1267948d9e2f7008571c", - SourceRealMd5: "f1e2488bba4d1267948d9e2f7008571c", - PieceMd5Sign: "bb138842f338fff90af737e4a6b2c6f8e2a7031ca9d5900bc9b646f6406d890f", + for _, tt := range tests { + suite.Run(tt.name, func() { + gotSeedTask, err := suite.cm.TriggerCDN(context.Background(), tt.sourceTask) + suite.Nil(err) + suite.Equal(tt.targetTask, gotSeedTask) + cacheSeedTask, err := suite.cm.TriggerCDN(context.Background(), gotSeedTask) + suite.Nil(err) + suite.Equal(tt.targetTask, cacheSeedTask) + }) } - gotSeedTask, err := suite.cm.TriggerCDN(context.Background(), sourceTask) - suite.Nil(err) - suite.Equal(targetTask, gotSeedTask) - cacheSeedTask, err := suite.cm.TriggerCDN(context.Background(), gotSeedTask) - suite.Nil(err) - suite.Equal(targetTask, cacheSeedTask) + // TODO test range download } diff --git a/cdnsystem/daemon/cdn/storage/storage_mgr.go b/cdnsystem/daemon/cdn/storage/storage_mgr.go index bd9024d273a..50a1acf80c5 100644 --- a/cdnsystem/daemon/cdn/storage/storage_mgr.go +++ b/cdnsystem/daemon/cdn/storage/storage_mgr.go @@ -75,19 +75,19 @@ type Manager interface { // FileMetaData type FileMetaData struct { - TaskID string `json:"taskId"` - TaskURL string `json:"taskUrl"` - PieceSize int32 `json:"pieceSize"` - SourceFileLen int64 `json:"sourceFileLen"` - AccessTime int64 `json:"accessTime"` - Interval int64 `json:"interval"` - CdnFileLength int64 `json:"cdnFileLength"` - SourceRealMd5 string `json:"sourceRealMd5"` - PieceMd5Sign string `json:"pieceMd5Sign"` - ExpireInfo map[string]string `json:"expireInfo"` - Finish bool `json:"finish"` - Success bool `json:"success"` - TotalPieceCount int32 `json:"totalPieceCount"` + TaskID string `json:"taskId"` + TaskURL string `json:"taskUrl"` + PieceSize int32 `json:"pieceSize"` + SourceFileLen int64 `json:"sourceFileLen"` + AccessTime int64 `json:"accessTime"` + Interval int64 `json:"interval"` + CdnFileLength int64 `json:"cdnFileLength"` + SourceRealDigest string `json:"sourceRealDigest"` + PieceMd5Sign string `json:"pieceMd5Sign"` + ExpireInfo map[string]string `json:"expireInfo"` + Finish bool `json:"finish"` + Success bool `json:"success"` + TotalPieceCount int32 `json:"totalPieceCount"` //PieceMetaDataSign string `json:"pieceMetaDataSign"` } diff --git a/cdnsystem/daemon/task/manager_test.go b/cdnsystem/daemon/task/manager_test.go index 7530047f1ed..7f19f20e502 100644 --- a/cdnsystem/daemon/task/manager_test.go +++ b/cdnsystem/daemon/task/manager_test.go @@ -40,7 +40,6 @@ type TaskManagerTestSuite struct { func (suite *TaskManagerTestSuite) TestRegister() { dragonflyURL := "http://dragonfly.io.com?a=a&b=b&c=c" - taskID := idgen.TaskID(dragonflyURL, &base.UrlMeta{Filter: "a&b", Tag: "dragonfly", Digest: "f1e2488bba4d1267948d9e2f7008571c"}) ctrl := gomock.NewController(suite.T()) cdnMgr := mock.NewMockCDNMgr(ctrl) progressMgr := mock.NewMockSeedProgressMgr(ctrl) @@ -59,13 +58,28 @@ func (suite *TaskManagerTestSuite) TestRegister() { wantErr bool }{ { - name: "register", + name: "register_md5", args: args{ ctx: context.Background(), req: &types.TaskRegisterRequest{ URL: dragonflyURL, - TaskID: taskID, - Md5: "f1e2488bba4d1267948d9e2f7008571c", + TaskID: idgen.TaskID(dragonflyURL, &base.UrlMeta{Filter: "a&b", Tag: "dragonfly", Digest: "md5:f1e2488bba4d1267948d9e2f7008571c"}), + Digest: "md5:f1e2488bba4d1267948d9e2f7008571c", + Filter: []string{"a", "b"}, + Header: nil, + }, + }, + wantPieceChan: nil, + wantErr: false, + }, + { + name: "register_sha256", + args: args{ + ctx: context.Background(), + req: &types.TaskRegisterRequest{ + URL: dragonflyURL, + TaskID: idgen.TaskID(dragonflyURL, &base.UrlMeta{Filter: "a&b", Tag: "dragonfly", Digest: "sha256:b9907b9a5ba2b0223868c201b9addfe2ec1da1b90325d57c34f192966b0a68c5"}), + Digest: "sha256:b9907b9a5ba2b0223868c201b9addfe2ec1da1b90325d57c34f192966b0a68c5", Filter: []string{"a", "b"}, Header: nil, }, diff --git a/cdnsystem/daemon/task/manager_util.go b/cdnsystem/daemon/task/manager_util.go index 3caa56f34a9..374137af78d 100644 --- a/cdnsystem/daemon/task/manager_util.go +++ b/cdnsystem/daemon/task/manager_util.go @@ -62,7 +62,7 @@ func (tm *Manager) addOrUpdateTask(ctx context.Context, request *types.TaskRegis newTask := &types.SeedTask{ TaskID: taskID, Header: request.Header, - RequestMd5: request.Md5, + RequestDigest: request.Digest, URL: request.URL, TaskURL: taskURL, CdnStatus: types.TaskInfoCdnStatusWaiting, @@ -153,8 +153,8 @@ func (tm *Manager) updateTask(taskID string, updateTaskInfo *types.SeedTask) (*t task.CdnFileLength = updateTaskInfo.CdnFileLength } - if !stringutils.IsBlank(updateTaskInfo.SourceRealMd5) { - task.SourceRealMd5 = updateTaskInfo.SourceRealMd5 + if !stringutils.IsBlank(updateTaskInfo.SourceRealDigest) { + task.SourceRealDigest = updateTaskInfo.SourceRealDigest } if !stringutils.IsBlank(updateTaskInfo.PieceMd5Sign) { @@ -181,14 +181,14 @@ func isSameTask(task1, task2 *types.SeedTask) bool { return false } - if !stringutils.IsBlank(task1.RequestMd5) && !stringutils.IsBlank(task2.RequestMd5) { - if task1.RequestMd5 != task2.RequestMd5 { + if !stringutils.IsBlank(task1.RequestDigest) && !stringutils.IsBlank(task2.RequestDigest) { + if task1.RequestDigest != task2.RequestDigest { return false } } - if !stringutils.IsBlank(task1.RequestMd5) && !stringutils.IsBlank(task2.SourceRealMd5) { - return task1.SourceRealMd5 == task2.RequestMd5 + if !stringutils.IsBlank(task1.RequestDigest) && !stringutils.IsBlank(task2.SourceRealDigest) { + return task1.SourceRealDigest == task2.RequestDigest } return true diff --git a/cdnsystem/server/service/cdn_seed_server.go b/cdnsystem/server/service/cdn_seed_server.go index d59b3f6a93e..21c86289091 100644 --- a/cdnsystem/server/service/cdn_seed_server.go +++ b/cdnsystem/server/service/cdn_seed_server.go @@ -32,6 +32,7 @@ import ( "d7y.io/dragonfly/v2/internal/idgen" "d7y.io/dragonfly/v2/pkg/rpc/base" "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem" + "d7y.io/dragonfly/v2/pkg/util/digestutils" "d7y.io/dragonfly/v2/pkg/util/net/iputils" "d7y.io/dragonfly/v2/pkg/util/net/urlutils" "d7y.io/dragonfly/v2/pkg/util/stringutils" @@ -60,7 +61,11 @@ func constructRegisterRequest(req *cdnsystem.SeedRequest) (*types.TaskRegisterRe header := make(map[string]string) if meta != nil { if !stringutils.IsBlank(meta.Digest) { - header["md5"] = meta.Digest + digest := digestutils.Parse(meta.Digest) + if _, ok := digestutils.Algorithms[digest[0]]; !ok { + return nil, errors.Errorf("unsupported digest algorithm") + } + header["digest"] = meta.Digest } if !stringutils.IsBlank(meta.Range) { header["range"] = meta.Range @@ -72,7 +77,7 @@ func constructRegisterRequest(req *cdnsystem.SeedRequest) (*types.TaskRegisterRe return &types.TaskRegisterRequest{ Header: header, URL: req.Url, - Md5: header["md5"], + Digest: header["digest"], TaskID: req.TaskId, Filter: strings.Split(req.UrlMeta.Filter, "&"), }, nil diff --git a/cdnsystem/types/seed_task_info.go b/cdnsystem/types/seed_task_info.go index 73b865db158..f5509fbca16 100644 --- a/cdnsystem/types/seed_task_info.go +++ b/cdnsystem/types/seed_task_info.go @@ -26,8 +26,8 @@ type SeedTask struct { Header map[string]string `json:"header,omitempty"` CdnStatus string `json:"cdnStatus,omitempty"` PieceTotal int32 `json:"pieceTotal,omitempty"` - RequestMd5 string `json:"requestMd5,omitempty"` - SourceRealMd5 string `json:"sourceRealMd5,omitempty"` + RequestDigest string `json:"requestDigest,omitempty"` + SourceRealDigest string `json:"sourceRealDigest,omitempty"` PieceMd5Sign string `json:"pieceMd5Sign,omitempty"` } @@ -59,10 +59,10 @@ func (task *SeedTask) UpdateStatus(cdnStatus string) { task.CdnStatus = cdnStatus } -func (task *SeedTask) UpdateTaskInfo(cdnStatus, realMD5, pieceMd5Sign string, sourceFileLength, cdnFileLength int64) { +func (task *SeedTask) UpdateTaskInfo(cdnStatus, realDigest, pieceMd5Sign string, sourceFileLength, cdnFileLength int64) { task.CdnStatus = cdnStatus task.PieceMd5Sign = pieceMd5Sign - task.SourceRealMd5 = realMD5 + task.SourceRealDigest = realDigest task.SourceFileLength = sourceFileLength task.CdnFileLength = cdnFileLength } diff --git a/cdnsystem/types/task_register_request.go b/cdnsystem/types/task_register_request.go index 5cdd31c217b..970549f70ca 100644 --- a/cdnsystem/types/task_register_request.go +++ b/cdnsystem/types/task_register_request.go @@ -20,7 +20,7 @@ package types type TaskRegisterRequest struct { URL string `json:"rawURL,omitempty"` TaskID string `json:"taskId,omitempty"` - Md5 string `json:"md5,omitempty"` + Digest string `json:"digest,omitempty"` Filter []string `json:"filter,omitempty"` Header map[string]string `json:"header,omitempty"` } diff --git a/client/dfget/dfget.go b/client/dfget/dfget.go index 241c29d3a39..01fd112ae4d 100644 --- a/client/dfget/dfget.go +++ b/client/dfget/dfget.go @@ -158,9 +158,9 @@ func downloadFromSource(ctx context.Context, cfg *config.DfgetConfig, hdr map[st if !stringutils.IsBlank(cfg.Digest) { parsedHash := digestutils.Parse(cfg.Digest) - realHash := digestutils.HashFile(target.Name(), parsedHash[0]) + realHash := digestutils.HashFile(target.Name(), digestutils.Algorithms[parsedHash[0]]) - if realHash != parsedHash[1] { + if realHash != "" && realHash != parsedHash[1] { return errors.Errorf("%s digest is not matched: real[%s] expected[%s]", parsedHash[0], realHash, parsedHash[1]) } } diff --git a/client/dfget/dfget_test.go b/client/dfget/dfget_test.go index 326e1e0f59c..fd08fb37110 100644 --- a/client/dfget/dfget_test.go +++ b/client/dfget/dfget_test.go @@ -25,7 +25,6 @@ import ( "testing" "d7y.io/dragonfly/v2/client/config" - "d7y.io/dragonfly/v2/internal/constants" "d7y.io/dragonfly/v2/internal/idgen" "d7y.io/dragonfly/v2/pkg/source" sourcemock "d7y.io/dragonfly/v2/pkg/source/mock" @@ -49,7 +48,7 @@ func Test_downloadFromSource(t *testing.T) { cfg := &config.DfgetConfig{ URL: "http://a.b.c/xx", Output: output, - Digest: strings.Join([]string{constants.Sha256Hash, digestutils.Sha256(content)}, ":"), + Digest: strings.Join([]string{digestutils.Sha256Hash.String(), digestutils.Sha256(content)}, ":"), } sourceClient.EXPECT().Download(context.Background(), cfg.URL, nil, nil).Return(ioutil.NopCloser(strings.NewReader(content)), nil) diff --git a/internal/constants/constants.go b/internal/constants/constants.go index 3c647a6894f..cb63bb3ab37 100644 --- a/internal/constants/constants.go +++ b/internal/constants/constants.go @@ -21,7 +21,4 @@ const ( SourcePattern = "source" CDNPattern = "cdn" P2PPattern = "p2p" - - Sha256Hash = "sha256" - Md5Hash = "md5" ) diff --git a/pkg/ratelimiter/limitreader/limit_reader.go b/pkg/ratelimiter/limitreader/limit_reader.go index 44e9490a6a1..4dabf3625cb 100644 --- a/pkg/ratelimiter/limitreader/limit_reader.go +++ b/pkg/ratelimiter/limitreader/limit_reader.go @@ -17,10 +17,12 @@ package limitreader import ( - "crypto/md5" + "fmt" "hash" "io" + "github.com/opencontainers/go-digest" + "d7y.io/dragonfly/v2/pkg/ratelimiter/ratelimiter" "d7y.io/dragonfly/v2/pkg/util/digestutils" ) @@ -28,40 +30,17 @@ import ( // NewLimitReader creates a LimitReader. // src: reader // rate: bytes/second -func NewLimitReader(src io.Reader, rate int64, calculateMd5 bool) *LimitReader { - return NewLimitReaderWithLimiter(newRateLimiterWithDefaultWindow(rate), src, calculateMd5) +func NewLimitReader(src io.Reader, rate int64) *LimitReader { + return NewLimitReaderWithLimiter(newRateLimiterWithDefaultWindow(rate), src) } // NewLimitReaderWithLimiter creates LimitReader with a rateLimiter. // src: reader // rate: bytes/second -func NewLimitReaderWithLimiter(rl *ratelimiter.RateLimiter, src io.Reader, calculateMd5 bool) *LimitReader { - var md5sum hash.Hash - if calculateMd5 { - md5sum = md5.New() - } - return &LimitReader{ - Src: src, - Limiter: rl, - md5sum: md5sum, - } -} - -// NewLimitReaderWithMD5Sum creates LimitReader with a md5 sum. -// src: reader -// rate: bytes/second -func NewLimitReaderWithMD5Sum(src io.Reader, rate int64, md5sum hash.Hash) *LimitReader { - return NewLimitReaderWithLimiterAndMD5Sum(src, newRateLimiterWithDefaultWindow(rate), md5sum) -} - -// NewLimitReaderWithLimiterAndMD5Sum creates LimitReader with rateLimiter and md5 sum. -// src: reader -// rate: bytes/second -func NewLimitReaderWithLimiterAndMD5Sum(src io.Reader, rl *ratelimiter.RateLimiter, md5sum hash.Hash) *LimitReader { +func NewLimitReaderWithLimiter(rl *ratelimiter.RateLimiter, src io.Reader) *LimitReader { return &LimitReader{ Src: src, Limiter: rl, - md5sum: md5sum, } } @@ -71,9 +50,10 @@ func newRateLimiterWithDefaultWindow(rate int64) *ratelimiter.RateLimiter { // LimitReader reads stream with RateLimiter. type LimitReader struct { - Src io.Reader - Limiter *ratelimiter.RateLimiter - md5sum hash.Hash + Src io.Reader + Limiter *ratelimiter.RateLimiter + digest hash.Hash + digestType string } func (lr *LimitReader) Read(p []byte) (n int, err error) { @@ -82,18 +62,30 @@ func (lr *LimitReader) Read(p []byte) (n int, err error) { return n, e } if n > 0 { - if lr.md5sum != nil { - lr.md5sum.Write(p[:n]) + if lr.digest != nil { + lr.digest.Write(p[:n]) } lr.Limiter.AcquireBlocking(int64(n)) } return n, e } -// Md5 calculates the md5 of all contents read. -func (lr *LimitReader) Md5() string { - if lr.md5sum != nil { - return digestutils.ToHashString(lr.md5sum) +// NewLimitReaderWithLimiterAndDigest creates LimitReader with rateLimiter and digest. +// src: reader +// rate: bytes/second +func NewLimitReaderWithLimiterAndDigest(src io.Reader, rl *ratelimiter.RateLimiter, digest hash.Hash, digestType digest.Algorithm) *LimitReader { + return &LimitReader{ + Src: src, + Limiter: rl, + digest: digest, + digestType: digestType.String(), + } +} + +// Digest calculates the digest of all contents read, return value is like : +func (lr *LimitReader) Digest() string { + if lr.digest != nil { + return fmt.Sprintf("%s:%s", lr.digestType, digestutils.ToHashString(lr.digest)) } return "" } diff --git a/pkg/util/digestutils/digest.go b/pkg/util/digestutils/digest.go index dd6b6f309c3..90cdd6da3c5 100644 --- a/pkg/util/digestutils/digest.go +++ b/pkg/util/digestutils/digest.go @@ -26,11 +26,27 @@ import ( "os" "strings" - "d7y.io/dragonfly/v2/internal/constants" + "github.com/opencontainers/go-digest" + "d7y.io/dragonfly/v2/pkg/unit" "d7y.io/dragonfly/v2/pkg/util/fileutils" ) +const ( + Sha256Hash digest.Algorithm = "sha256" + Md5Hash digest.Algorithm = "md5" +) + +var ( + // Algorithms is used to check if an algorithm is supported. + // If algo is not supported, Algorithms[algo] will return empty string. + // Please don't use digest.Algorithm() to convert a string to digest.Algorithm. + Algorithms = map[string]digest.Algorithm{ + Sha256Hash.String(): Sha256Hash, + Md5Hash.String(): Md5Hash, + } +) + func Sha256(values ...string) string { if len(values) == 0 { return "" @@ -62,8 +78,8 @@ func Md5Bytes(bytes []byte) string { } // HashFile computes hash value corresponding to hashType, -// hashType is from constants.Md5Hash and constants.Sha256Hash. -func HashFile(file string, hashType string) string { +// hashType is from digestutils.Md5Hash and digestutils.Sha256Hash. +func HashFile(file string, hashType digest.Algorithm) string { if !fileutils.IsRegular(file) { return "" } @@ -76,9 +92,9 @@ func HashFile(file string, hashType string) string { defer f.Close() var h hash.Hash - if hashType == constants.Md5Hash { + if hashType == Md5Hash { h = md5.New() - } else if hashType == constants.Sha256Hash { + } else if hashType == Sha256Hash { h = sha256.New() } else { return "" @@ -102,3 +118,15 @@ func Parse(digest string) []string { digest = strings.Trim(digest, " ") return strings.Split(digest, ":") } + +func CreateHash(hashType string) hash.Hash { + algo := Algorithms[hashType] + switch algo { + case Sha256Hash: + return sha256.New() + case Md5Hash: + return md5.New() + default: + return nil + } +} diff --git a/pkg/util/digestutils/digest_test.go b/pkg/util/digestutils/digest_test.go index aae00a1b7a5..ae131ce90a0 100644 --- a/pkg/util/digestutils/digest_test.go +++ b/pkg/util/digestutils/digest_test.go @@ -22,7 +22,6 @@ import ( "syscall" "testing" - "d7y.io/dragonfly/v2/internal/constants" "d7y.io/dragonfly/v2/pkg/basic" "d7y.io/dragonfly/v2/pkg/util/fileutils" "github.com/google/uuid" @@ -61,5 +60,5 @@ func TestHashFile(t *testing.T) { f.Write([]byte("hello")) f.Close() - assert.Equal(t, expected, HashFile(path, constants.Md5Hash)) + assert.Equal(t, expected, HashFile(path, Md5Hash)) } diff --git a/scheduler/tasks/tasks.go b/scheduler/tasks/tasks.go index f7766f81a35..4182875f55b 100644 --- a/scheduler/tasks/tasks.go +++ b/scheduler/tasks/tasks.go @@ -2,7 +2,6 @@ package tasks import ( "context" - "strings" "time" "github.com/go-playground/validator/v10" @@ -139,11 +138,7 @@ func (t *tasks) preheat(req string) error { Header: request.Headers, Tag: request.Tag, Filter: request.Filter, - } - - //TODO(@zzy987) CDN don't support sha256 - if strings.HasPrefix(request.Digest, "md5") { - meta.Digest = request.Digest + Digest: request.Digest, } // Generate range