From d4718fb802669fb35d4fa17915291889709ef267 Mon Sep 17 00:00:00 2001 From: sunwp <244372610@qq.com> Date: Tue, 7 Dec 2021 17:11:28 +0800 Subject: [PATCH] correct metadata spell Signed-off-by: sunwp <244372610@qq.com> --- cdn/supervisor/cdn/cache_data_mgr.go | 68 ++++++++--------- cdn/supervisor/cdn/cache_detector.go | 76 +++++++++---------- cdn/supervisor/cdn/cache_detector_test.go | 40 +++++----- cdn/supervisor/cdn/cache_writer.go | 2 +- cdn/supervisor/cdn/cache_writer_test.go | 4 +- cdn/supervisor/cdn/manager.go | 8 +- cdn/supervisor/cdn/storage/disk/disk.go | 30 ++++---- cdn/supervisor/cdn/storage/hybrid/hybrid.go | 28 +++---- .../cdn/storage/mock/mock_storage_mgr.go | 38 +++++----- cdn/supervisor/cdn/storage/path_util.go | 16 ++-- cdn/supervisor/cdn/storage/storage.go | 30 ++++---- cdn/supervisor/cdn/storage/storage_gc.go | 22 +++--- cdn/supervisor/progress/manager.go | 4 +- client/daemon/peer/peertask_file_callback.go | 4 +- client/daemon/peer/peertask_manager.go | 4 +- client/daemon/peer/peertask_reuse.go | 2 +- client/daemon/peer/peertask_stream.go | 4 +- .../daemon/peer/peertask_stream_callback.go | 4 +- client/daemon/peer/piece_manager.go | 14 ++-- client/daemon/storage/const.go | 2 +- client/daemon/storage/local_storage.go | 14 ++-- client/daemon/storage/local_storage_test.go | 20 ++--- client/daemon/storage/metadata.go | 16 ++-- client/daemon/storage/storage_manager.go | 64 ++++++++-------- client/daemon/test/mock/storage/manager.go | 12 +-- client/daemon/upload/upload_manager.go | 4 +- 26 files changed, 265 insertions(+), 265 deletions(-) diff --git a/cdn/supervisor/cdn/cache_data_mgr.go b/cdn/supervisor/cdn/cache_data_mgr.go index 6c0dd149c5e..60286717268 100644 --- a/cdn/supervisor/cdn/cache_data_mgr.go +++ b/cdn/supervisor/cdn/cache_data_mgr.go @@ -46,11 +46,11 @@ func newCacheDataManager(storeMgr storage.Manager) *cacheDataManager { } } -// writeFileMetaDataByTask stores the metadata of task by task to storage. -func (mm *cacheDataManager) writeFileMetaDataByTask(task *types.SeedTask) (*storage.FileMetaData, error) { +// writeFileMetadataByTask stores the metadata of task by task to storage. +func (mm *cacheDataManager) writeFileMetadataByTask(task *types.SeedTask) (*storage.FileMetadata, error) { mm.cacheLocker.Lock(task.TaskID, false) defer mm.cacheLocker.UnLock(task.TaskID, false) - metaData := &storage.FileMetaData{ + metadata := &storage.FileMetadata{ TaskID: task.TaskID, TaskURL: task.TaskURL, PieceSize: task.PieceSize, @@ -60,11 +60,11 @@ func (mm *cacheDataManager) writeFileMetaDataByTask(task *types.SeedTask) (*stor TotalPieceCount: task.PieceTotal, } - if err := mm.storage.WriteFileMetaData(task.TaskID, metaData); err != nil { + if err := mm.storage.WriteFileMetadata(task.TaskID, metadata); err != nil { return nil, errors.Wrapf(err, "write task %s metadata file", task.TaskID) } - return metaData, nil + return metadata, nil } // updateAccessTime update access and interval @@ -72,73 +72,73 @@ func (mm *cacheDataManager) updateAccessTime(taskID string, accessTime int64) er mm.cacheLocker.Lock(taskID, false) defer mm.cacheLocker.UnLock(taskID, false) - originMetaData, err := mm.readFileMetaData(taskID) + originMetadata, err := mm.readFileMetadata(taskID) if err != nil { return err } // access interval - interval := accessTime - originMetaData.AccessTime - originMetaData.Interval = interval + interval := accessTime - originMetadata.AccessTime + originMetadata.Interval = interval if interval <= 0 { logger.WithTaskID(taskID).Warnf("file hit interval: %d, accessTime: %s", interval, timeutils.MillisUnixTime(accessTime)) - originMetaData.Interval = 0 + originMetadata.Interval = 0 } - originMetaData.AccessTime = accessTime + originMetadata.AccessTime = accessTime - return mm.storage.WriteFileMetaData(taskID, originMetaData) + return mm.storage.WriteFileMetadata(taskID, originMetadata) } func (mm *cacheDataManager) updateExpireInfo(taskID string, expireInfo map[string]string) error { mm.cacheLocker.Lock(taskID, false) defer mm.cacheLocker.UnLock(taskID, false) - originMetaData, err := mm.readFileMetaData(taskID) + originMetadata, err := mm.readFileMetadata(taskID) if err != nil { return err } - originMetaData.ExpireInfo = expireInfo + originMetadata.ExpireInfo = expireInfo - return mm.storage.WriteFileMetaData(taskID, originMetaData) + return mm.storage.WriteFileMetadata(taskID, originMetadata) } -func (mm *cacheDataManager) updateStatusAndResult(taskID string, metaData *storage.FileMetaData) error { +func (mm *cacheDataManager) updateStatusAndResult(taskID string, metadata *storage.FileMetadata) error { mm.cacheLocker.Lock(taskID, false) defer mm.cacheLocker.UnLock(taskID, false) - originMetaData, err := mm.readFileMetaData(taskID) + originMetadata, err := mm.readFileMetadata(taskID) if err != nil { return err } - originMetaData.Finish = metaData.Finish - originMetaData.Success = metaData.Success - if originMetaData.Success { - originMetaData.CdnFileLength = metaData.CdnFileLength - originMetaData.SourceFileLen = metaData.SourceFileLen - if metaData.TotalPieceCount > 0 { - originMetaData.TotalPieceCount = metaData.TotalPieceCount + originMetadata.Finish = metadata.Finish + originMetadata.Success = metadata.Success + if originMetadata.Success { + originMetadata.CdnFileLength = metadata.CdnFileLength + originMetadata.SourceFileLen = metadata.SourceFileLen + if metadata.TotalPieceCount > 0 { + originMetadata.TotalPieceCount = metadata.TotalPieceCount } - if !stringutils.IsBlank(metaData.SourceRealDigest) { - originMetaData.SourceRealDigest = metaData.SourceRealDigest + if !stringutils.IsBlank(metadata.SourceRealDigest) { + originMetadata.SourceRealDigest = metadata.SourceRealDigest } - if !stringutils.IsBlank(metaData.PieceMd5Sign) { - originMetaData.PieceMd5Sign = metaData.PieceMd5Sign + if !stringutils.IsBlank(metadata.PieceMd5Sign) { + originMetadata.PieceMd5Sign = metadata.PieceMd5Sign } } - return mm.storage.WriteFileMetaData(taskID, originMetaData) + return mm.storage.WriteFileMetadata(taskID, originMetadata) } -// appendPieceMetaData append piece meta info to storage -func (mm *cacheDataManager) appendPieceMetaData(taskID string, record *storage.PieceMetaRecord) error { +// appendPieceMetadata append piece meta info to storage +func (mm *cacheDataManager) appendPieceMetadata(taskID string, record *storage.PieceMetaRecord) error { mm.cacheLocker.Lock(taskID, false) defer mm.cacheLocker.UnLock(taskID, false) // write to the storage - return mm.storage.AppendPieceMetaData(taskID, record) + return mm.storage.AppendPieceMetadata(taskID, record) } -// appendPieceMetaData append piece meta info to storage +// appendPieceMetadata append piece meta info to storage func (mm *cacheDataManager) writePieceMetaRecords(taskID string, records []*storage.PieceMetaRecord) error { mm.cacheLocker.Lock(taskID, false) defer mm.cacheLocker.UnLock(taskID, false) @@ -183,8 +183,8 @@ func (mm *cacheDataManager) getPieceMd5Sign(taskID string) (string, []*storage.P return digestutils.Sha256(pieceMd5...), pieceMetaRecords, nil } -func (mm *cacheDataManager) readFileMetaData(taskID string) (*storage.FileMetaData, error) { - fileMeta, err := mm.storage.ReadFileMetaData(taskID) +func (mm *cacheDataManager) readFileMetadata(taskID string) (*storage.FileMetadata, error) { + fileMeta, err := mm.storage.ReadFileMetadata(taskID) if err != nil { return nil, errors.Wrapf(err, "read file metadata of task %s from storage", taskID) } diff --git a/cdn/supervisor/cdn/cache_detector.go b/cdn/supervisor/cdn/cache_detector.go index fd41309ee1e..823a318539c 100644 --- a/cdn/supervisor/cdn/cache_detector.go +++ b/cdn/supervisor/cdn/cache_detector.go @@ -47,11 +47,11 @@ type cacheDetector struct { type cacheResult struct { breakPoint int64 // break-point of task file pieceMetaRecords []*storage.PieceMetaRecord // piece meta data records of task - fileMetaData *storage.FileMetaData // file meta data of task + fileMetadata *storage.FileMetadata // file meta data of task } func (s *cacheResult) String() string { - return fmt.Sprintf("{breakNum: %d, pieceMetaRecords: %#v, fileMetaData: %#v}", s.breakPoint, s.pieceMetaRecords, s.fileMetaData) + return fmt.Sprintf("{breakNum: %d, pieceMetaRecords: %#v, fileMetadata: %#v}", s.breakPoint, s.pieceMetaRecords, s.fileMetadata) } // newCacheDetector create a new cache detector @@ -75,10 +75,10 @@ func (cd *cacheDetector) detectCache(ctx context.Context, task *types.SeedTask, result, err = cd.doDetect(ctx, task, fileDigest) if err != nil { task.Log().Infof("failed to detect cache, reset cache: %v", err) - metaData, err := cd.resetCache(task) + metadata, err := cd.resetCache(task) if err == nil { result = &cacheResult{ - fileMetaData: metaData, + fileMetadata: metadata, } return result, nil } @@ -90,16 +90,16 @@ func (cd *cacheDetector) detectCache(ctx context.Context, task *types.SeedTask, return result, nil } -// doDetect the actual detect action which detects file metaData and pieces metaData of specific task +// doDetect the actual detect action which detects file metadata and pieces metadata of specific task func (cd *cacheDetector) doDetect(ctx context.Context, task *types.SeedTask, fileDigest hash.Hash) (result *cacheResult, err error) { span := trace.SpanFromContext(ctx) - fileMetaData, err := cd.cacheDataManager.readFileMetaData(task.TaskID) + fileMetadata, err := cd.cacheDataManager.readFileMetadata(task.TaskID) if err != nil { span.RecordError(err) return nil, errors.Wrapf(err, "read file meta data of task %s", task.TaskID) } span.SetAttributes() - if err := checkSameFile(task, fileMetaData); err != nil { + if err := checkSameFile(task, fileMetadata); err != nil { return nil, errors.Wrapf(err, "check same file") } checkExpiredRequest, err := source.NewRequestWithContext(ctx, task.URL, task.Header) @@ -107,8 +107,8 @@ func (cd *cacheDetector) doDetect(ctx context.Context, task *types.SeedTask, fil return nil, errors.Wrapf(err, "create request") } expired, err := source.IsExpired(checkExpiredRequest, &source.ExpireInfo{ - LastModified: fileMetaData.ExpireInfo[source.LastModified], - ETag: fileMetaData.ExpireInfo[source.ETag], + LastModified: fileMetadata.ExpireInfo[source.LastModified], + ETag: fileMetadata.ExpireInfo[source.ETag], }) if err != nil { // If the check fails, the resource is regarded as not expired to prevent the source from being knocked down @@ -120,9 +120,9 @@ func (cd *cacheDetector) doDetect(ctx context.Context, task *types.SeedTask, fil return nil, errors.Errorf("resource %s has expired", task.TaskURL) } // not expired - if fileMetaData.Finish { + if fileMetadata.Finish { // quickly detect the cache situation through the meta data - return cd.parseByReadMetaFile(task.TaskID, fileMetaData) + return cd.parseByReadMetaFile(task.TaskID, fileMetadata) } // check if the resource supports range request. if so, // detect the cache situation by reading piece meta and data file @@ -138,20 +138,20 @@ func (cd *cacheDetector) doDetect(ctx context.Context, task *types.SeedTask, fil if !supportRange { return nil, errors.Errorf("resource %s is not support range request", task.URL) } - return cd.parseByReadFile(task.TaskID, fileMetaData, fileDigest) + return cd.parseByReadFile(task.TaskID, fileMetadata, fileDigest) } // parseByReadMetaFile detect cache by read meta and pieceMeta files of task -func (cd *cacheDetector) parseByReadMetaFile(taskID string, fileMetaData *storage.FileMetaData) (*cacheResult, error) { - if !fileMetaData.Success { +func (cd *cacheDetector) parseByReadMetaFile(taskID string, fileMetadata *storage.FileMetadata) (*cacheResult, error) { + if !fileMetadata.Success { return nil, fmt.Errorf("success flag of taskID %s is false", taskID) } - pieceMetaRecords, err := cd.cacheDataManager.readAndCheckPieceMetaRecords(taskID, fileMetaData.PieceMd5Sign) + pieceMetaRecords, err := cd.cacheDataManager.readAndCheckPieceMetaRecords(taskID, fileMetadata.PieceMd5Sign) if err != nil { return nil, errors.Wrapf(err, "check piece meta integrity") } - if fileMetaData.TotalPieceCount > 0 && len(pieceMetaRecords) != int(fileMetaData.TotalPieceCount) { - err := cdnerrors.ErrInconsistentValues{Expected: fileMetaData.TotalPieceCount, Actual: len(pieceMetaRecords)} + if fileMetadata.TotalPieceCount > 0 && len(pieceMetaRecords) != int(fileMetadata.TotalPieceCount) { + err := cdnerrors.ErrInconsistentValues{Expected: fileMetadata.TotalPieceCount, Actual: len(pieceMetaRecords)} return nil, errors.Wrapf(err, "compare file piece count") } storageInfo, err := cd.cacheDataManager.statDownloadFile(taskID) @@ -159,9 +159,9 @@ func (cd *cacheDetector) parseByReadMetaFile(taskID string, fileMetaData *storag return nil, errors.Wrapf(err, "get cdn file length") } // check file data integrity by file size - if fileMetaData.CdnFileLength != storageInfo.Size { + if fileMetadata.CdnFileLength != storageInfo.Size { err := cdnerrors.ErrInconsistentValues{ - Expected: fileMetaData.CdnFileLength, + Expected: fileMetadata.CdnFileLength, Actual: storageInfo.Size, } return nil, errors.Wrapf(err, "compare file cdn file length") @@ -169,12 +169,12 @@ func (cd *cacheDetector) parseByReadMetaFile(taskID string, fileMetaData *storag return &cacheResult{ breakPoint: -1, pieceMetaRecords: pieceMetaRecords, - fileMetaData: fileMetaData, + fileMetadata: fileMetadata, }, nil } // parseByReadFile detect cache by read pieceMeta and data files of task -func (cd *cacheDetector) parseByReadFile(taskID string, metaData *storage.FileMetaData, fileDigest hash.Hash) (*cacheResult, error) { +func (cd *cacheDetector) parseByReadFile(taskID string, metadata *storage.FileMetadata, fileDigest hash.Hash) (*cacheResult, error) { reader, err := cd.cacheDataManager.readDownloadFile(taskID) if err != nil { return nil, errors.Wrapf(err, "read download data file") @@ -210,11 +210,11 @@ func (cd *cacheDetector) parseByReadFile(taskID string, metaData *storage.FileMe } } // TODO already download done, piece 信息已经写完但是meta信息还没有完成更新 - //if metaData.SourceFileLen >=0 && int64(breakPoint) == metaData.SourceFileLen { + //if metadata.SourceFileLen >=0 && int64(breakPoint) == metadata.SourceFileLen { // return &cacheResult{ // breakPoint: -1, // pieceMetaRecords: pieceMetaRecords, - // fileMetaData: metaData, + // fileMetadata: metadata, // fileMd5: fileMd5, // }, nil //} @@ -222,45 +222,45 @@ func (cd *cacheDetector) parseByReadFile(taskID string, metaData *storage.FileMe return &cacheResult{ breakPoint: int64(breakPoint), pieceMetaRecords: pieceMetaRecords, - fileMetaData: metaData, + fileMetadata: metadata, }, nil } // resetCache -func (cd *cacheDetector) resetCache(task *types.SeedTask) (*storage.FileMetaData, error) { +func (cd *cacheDetector) resetCache(task *types.SeedTask) (*storage.FileMetadata, error) { err := cd.cacheDataManager.resetRepo(task) if err != nil { return nil, err } // initialize meta data file - return cd.cacheDataManager.writeFileMetaDataByTask(task) + return cd.cacheDataManager.writeFileMetadataByTask(task) } /* helper functions */ // checkSameFile check whether meta file is modified -func checkSameFile(task *types.SeedTask, metaData *storage.FileMetaData) error { - if task == nil || metaData == nil { - return errors.Errorf("task or metaData is nil, task: %v, metaData: %v", task, metaData) +func checkSameFile(task *types.SeedTask, metadata *storage.FileMetadata) error { + if task == nil || metadata == nil { + return errors.Errorf("task or metadata is nil, task: %v, metadata: %v", task, metadata) } - if metaData.PieceSize != task.PieceSize { - return errors.Errorf("meta piece size(%d) is not equals with task piece size(%d)", metaData.PieceSize, + if metadata.PieceSize != task.PieceSize { + return errors.Errorf("meta piece size(%d) is not equals with task piece size(%d)", metadata.PieceSize, task.PieceSize) } - if metaData.TaskID != task.TaskID { - return errors.Errorf("meta task TaskId(%s) is not equals with task TaskId(%s)", metaData.TaskID, task.TaskID) + if metadata.TaskID != task.TaskID { + return errors.Errorf("meta task TaskId(%s) is not equals with task TaskId(%s)", metadata.TaskID, task.TaskID) } - if metaData.TaskURL != task.TaskURL { - return errors.Errorf("meta task taskUrl(%s) is not equals with task taskUrl(%s)", metaData.TaskURL, task.URL) + if metadata.TaskURL != task.TaskURL { + return errors.Errorf("meta task taskUrl(%s) is not equals with task taskUrl(%s)", metadata.TaskURL, task.URL) } - if !stringutils.IsBlank(metaData.SourceRealDigest) && !stringutils.IsBlank(task.RequestDigest) && - metaData.SourceRealDigest != task.RequestDigest { + if !stringutils.IsBlank(metadata.SourceRealDigest) && !stringutils.IsBlank(task.RequestDigest) && + metadata.SourceRealDigest != task.RequestDigest { return errors.Errorf("meta task source digest(%s) is not equals with task request digest(%s)", - metaData.SourceRealDigest, task.RequestDigest) + metadata.SourceRealDigest, task.RequestDigest) } return nil } diff --git a/cdn/supervisor/cdn/cache_detector_test.go b/cdn/supervisor/cdn/cache_detector_test.go index 6d8b2e5b8a2..fb127a73349 100644 --- a/cdn/supervisor/cdn/cache_detector_test.go +++ b/cdn/supervisor/cdn/cache_detector_test.go @@ -60,11 +60,11 @@ func (suite *CacheDetectorTestSuite) SetupSuite() { storageMgr := storageMock.NewMockManager(ctrl) cacheDataManager := newCacheDataManager(storageMgr) suite.detector = newCacheDetector(cacheDataManager) - storageMgr.EXPECT().ReadFileMetaData(fullExpiredCache.taskID).Return(fullExpiredCache.fileMeta, nil).AnyTimes() - storageMgr.EXPECT().ReadFileMetaData(fullNoExpiredCache.taskID).Return(fullNoExpiredCache.fileMeta, nil).AnyTimes() - storageMgr.EXPECT().ReadFileMetaData(partialNotSupportRangeCache.taskID).Return(partialNotSupportRangeCache.fileMeta, nil).AnyTimes() - storageMgr.EXPECT().ReadFileMetaData(partialSupportRangeCache.taskID).Return(partialSupportRangeCache.fileMeta, nil).AnyTimes() - storageMgr.EXPECT().ReadFileMetaData(noCache.taskID).Return(noCache.fileMeta, cdnerrors.ErrFileNotExist{}).AnyTimes() + storageMgr.EXPECT().ReadFileMetadata(fullExpiredCache.taskID).Return(fullExpiredCache.fileMeta, nil).AnyTimes() + storageMgr.EXPECT().ReadFileMetadata(fullNoExpiredCache.taskID).Return(fullNoExpiredCache.fileMeta, nil).AnyTimes() + storageMgr.EXPECT().ReadFileMetadata(partialNotSupportRangeCache.taskID).Return(partialNotSupportRangeCache.fileMeta, nil).AnyTimes() + storageMgr.EXPECT().ReadFileMetadata(partialSupportRangeCache.taskID).Return(partialSupportRangeCache.fileMeta, nil).AnyTimes() + storageMgr.EXPECT().ReadFileMetadata(noCache.taskID).Return(noCache.fileMeta, cdnerrors.ErrFileNotExist{}).AnyTimes() storageMgr.EXPECT().ReadDownloadFile(fullNoExpiredCache.taskID).DoAndReturn( func(taskID string) (io.ReadCloser, error) { content, err := ioutil.ReadFile("../../testdata/cdn/go.html") @@ -118,7 +118,7 @@ var expiredAndSupportURL, expiredAndNotSupportURL, noExpiredAndSupportURL, noExp type mockData struct { taskID string pieces []*storage.PieceMetaRecord - fileMeta *storage.FileMetaData + fileMeta *storage.FileMetadata reader io.ReadCloser } @@ -222,8 +222,8 @@ var fullPieceMetaRecords = append(partialPieceMetaRecords, &storage.PieceMetaRec PieceStyle: 1, }) -func newCompletedFileMeta(taskID string, URL string, success bool) *storage.FileMetaData { - return &storage.FileMetaData{ +func newCompletedFileMeta(taskID string, URL string, success bool) *storage.FileMetadata { + return &storage.FileMetadata{ TaskID: taskID, TaskURL: URL, PieceSize: 2000, @@ -240,8 +240,8 @@ func newCompletedFileMeta(taskID string, URL string, success bool) *storage.File } } -func newPartialFileMeta(taskID string, URL string) *storage.FileMetaData { - return &storage.FileMetaData{ +func newPartialFileMeta(taskID string, URL string) *storage.FileMetadata { + return &storage.FileMetadata{ TaskID: taskID, TaskURL: URL, PieceSize: 2000, @@ -294,7 +294,7 @@ func (suite *CacheDetectorTestSuite) TestDetectCache() { want: &cacheResult{ breakPoint: 4000, pieceMetaRecords: partialPieceMetaRecords, - fileMetaData: newPartialFileMeta(partialAndSupportCacheTask, noExpiredAndSupportURL), + fileMetadata: newPartialFileMeta(partialAndSupportCacheTask, noExpiredAndSupportURL), }, wantErr: false, }, @@ -326,7 +326,7 @@ func (suite *CacheDetectorTestSuite) TestDetectCache() { want: &cacheResult{ breakPoint: -1, pieceMetaRecords: fullPieceMetaRecords, - fileMetaData: newCompletedFileMeta(fullCacheNotExpiredTask, noExpiredAndNotSupportURL, true), + fileMetadata: newCompletedFileMeta(fullCacheNotExpiredTask, noExpiredAndNotSupportURL, true), }, wantErr: false, }, @@ -356,7 +356,7 @@ func (suite *CacheDetectorTestSuite) TestDetectCache() { func (suite *CacheDetectorTestSuite) TestParseByReadFile() { type args struct { taskID string - metaData *storage.FileMetaData + metadata *storage.FileMetadata } tests := []struct { name string @@ -368,19 +368,19 @@ func (suite *CacheDetectorTestSuite) TestParseByReadFile() { name: "partial And SupportCacheTask", args: args{ taskID: partialSupportRangeCache.taskID, - metaData: partialSupportRangeCache.fileMeta, + metadata: partialSupportRangeCache.fileMeta, }, want: &cacheResult{ breakPoint: 4000, pieceMetaRecords: partialSupportRangeCache.pieces, - fileMetaData: partialSupportRangeCache.fileMeta, + fileMetadata: partialSupportRangeCache.fileMeta, }, wantErr: false, }, } for _, tt := range tests { suite.Run(tt.name, func() { - got, err := suite.detector.parseByReadFile(tt.args.taskID, tt.args.metaData, md5.New()) + got, err := suite.detector.parseByReadFile(tt.args.taskID, tt.args.metadata, md5.New()) suite.Equal(tt.want, got) suite.Equal(tt.wantErr, err != nil) }) @@ -390,7 +390,7 @@ func (suite *CacheDetectorTestSuite) TestParseByReadFile() { func (suite *CacheDetectorTestSuite) TestParseByReadMetaFile() { type args struct { taskID string - fileMetaData *storage.FileMetaData + fileMetadata *storage.FileMetadata } tests := []struct { name string @@ -402,19 +402,19 @@ func (suite *CacheDetectorTestSuite) TestParseByReadMetaFile() { name: "parse full cache file meta", args: args{ taskID: fullNoExpiredCache.taskID, - fileMetaData: fullNoExpiredCache.fileMeta, + fileMetadata: fullNoExpiredCache.fileMeta, }, want: &cacheResult{ breakPoint: -1, pieceMetaRecords: fullNoExpiredCache.pieces, - fileMetaData: fullNoExpiredCache.fileMeta, + fileMetadata: fullNoExpiredCache.fileMeta, }, wantErr: false, }, } for _, tt := range tests { suite.Run(tt.name, func() { - got, err := suite.detector.parseByReadMetaFile(tt.args.taskID, tt.args.fileMetaData) + got, err := suite.detector.parseByReadMetaFile(tt.args.taskID, tt.args.fileMetadata) suite.Equal(tt.wantErr, err != nil) suite.Equal(tt.want, got) }) diff --git a/cdn/supervisor/cdn/cache_writer.go b/cdn/supervisor/cdn/cache_writer.go index 398be77cd14..e4f223fe756 100644 --- a/cdn/supervisor/cdn/cache_writer.go +++ b/cdn/supervisor/cdn/cache_writer.go @@ -181,7 +181,7 @@ func (cw *cacheWriter) writerPool(ctx context.Context, wg *sync.WaitGroup, routi PieceStyle: pieceStyle, } // write piece meta to storage - if err = cw.cacheDataManager.appendPieceMetaData(p.taskID, pieceRecord); err != nil { + if err = cw.cacheDataManager.appendPieceMetadata(p.taskID, pieceRecord); err != nil { logger.Errorf("write piece meta file: %v", err) continue } diff --git a/cdn/supervisor/cdn/cache_writer_test.go b/cdn/supervisor/cdn/cache_writer_test.go index 0d41b409b02..e8ad1e3c5c5 100644 --- a/cdn/supervisor/cdn/cache_writer_test.go +++ b/cdn/supervisor/cdn/cache_writer_test.go @@ -147,7 +147,7 @@ func (suite *CacheWriterTestSuite) TestStartWriter() { detectResult: &cacheResult{ breakPoint: 0, pieceMetaRecords: nil, - fileMetaData: nil, + fileMetadata: nil, }, }, result: &downloadMetadata{ @@ -169,7 +169,7 @@ func (suite *CacheWriterTestSuite) TestStartWriter() { detectResult: &cacheResult{ breakPoint: 0, pieceMetaRecords: nil, - fileMetaData: nil, + fileMetadata: nil, }, }, result: &downloadMetadata{ diff --git a/cdn/supervisor/cdn/manager.go b/cdn/supervisor/cdn/manager.go index 272122db6ce..dba0a323752 100644 --- a/cdn/supervisor/cdn/manager.go +++ b/cdn/supervisor/cdn/manager.go @@ -119,8 +119,8 @@ func (cm *Manager) TriggerCDN(ctx context.Context, task *types.SeedTask) (seedTa // full cache if detectResult.breakPoint == -1 { task.Log().Infof("cache full hit on local") - seedTask.UpdateTaskInfo(types.TaskInfoCdnStatusSuccess, detectResult.fileMetaData.SourceRealDigest, detectResult.fileMetaData.PieceMd5Sign, - detectResult.fileMetaData.SourceFileLen, detectResult.fileMetaData.CdnFileLength) + seedTask.UpdateTaskInfo(types.TaskInfoCdnStatusSuccess, detectResult.fileMetadata.SourceRealDigest, detectResult.fileMetadata.PieceMd5Sign, + detectResult.fileMetadata.SourceFileLen, detectResult.fileMetadata.CdnFileLength) return seedTask, nil } server.StatSeedStart(task.TaskID, task.URL) @@ -176,7 +176,7 @@ func (cm *Manager) TryFreeSpace(fileLength int64) (bool, error) { } func (cm *Manager) handleCDNResult(task *types.SeedTask, sourceDigest string, downloadMetadata *downloadMetadata) (bool, error) { - task.Log().Debugf("handle cdn result, downloadMetaData: %#v", downloadMetadata) + task.Log().Debugf("handle cdn result, downloadMetadata: %#v", downloadMetadata) var isSuccess = true var errorMsg string // check md5 @@ -203,7 +203,7 @@ func (cm *Manager) handleCDNResult(task *types.SeedTask, sourceDigest string, do if !isSuccess { cdnFileLength = 0 } - if err := cm.cacheDataManager.updateStatusAndResult(task.TaskID, &storage.FileMetaData{ + if err := cm.cacheDataManager.updateStatusAndResult(task.TaskID, &storage.FileMetadata{ Finish: true, Success: isSuccess, SourceRealDigest: sourceDigest, diff --git a/cdn/supervisor/cdn/storage/disk/disk.go b/cdn/supervisor/cdn/storage/disk/disk.go index 4e52bd87c05..550eaf2df1c 100644 --- a/cdn/supervisor/cdn/storage/disk/disk.go +++ b/cdn/supervisor/cdn/storage/disk/disk.go @@ -106,12 +106,12 @@ func (s *diskStorageMgr) Initialize(taskMgr supervisor.SeedTaskMgr) { s.cleaner, _ = storage.NewStorageCleaner(diskGcConfig, s.diskDriver, s, taskMgr) } -func (s *diskStorageMgr) AppendPieceMetaData(taskID string, pieceRecord *storage.PieceMetaRecord) error { - return s.diskDriver.PutBytes(storage.GetAppendPieceMetaDataRaw(taskID), []byte(pieceRecord.String()+"\n")) +func (s *diskStorageMgr) AppendPieceMetadata(taskID string, pieceRecord *storage.PieceMetaRecord) error { + return s.diskDriver.PutBytes(storage.GetAppendPieceMetadataRaw(taskID), []byte(pieceRecord.String()+"\n")) } func (s *diskStorageMgr) ReadPieceMetaRecords(taskID string) ([]*storage.PieceMetaRecord, error) { - readBytes, err := s.diskDriver.GetBytes(storage.GetPieceMetaDataRaw(taskID)) + readBytes, err := s.diskDriver.GetBytes(storage.GetPieceMetadataRaw(taskID)) if err != nil { return nil, err } @@ -160,25 +160,25 @@ func (s *diskStorageMgr) WriteDownloadFile(taskID string, offset int64, len int6 return s.diskDriver.Put(raw, data) } -func (s *diskStorageMgr) ReadFileMetaData(taskID string) (*storage.FileMetaData, error) { - bytes, err := s.diskDriver.GetBytes(storage.GetTaskMetaDataRaw(taskID)) +func (s *diskStorageMgr) ReadFileMetadata(taskID string) (*storage.FileMetadata, error) { + bytes, err := s.diskDriver.GetBytes(storage.GetTaskMetadataRaw(taskID)) if err != nil { return nil, errors.Wrapf(err, "get metadata bytes") } - metaData := &storage.FileMetaData{} - if err := json.Unmarshal(bytes, metaData); err != nil { + metadata := &storage.FileMetadata{} + if err := json.Unmarshal(bytes, metadata); err != nil { return nil, errors.Wrapf(err, "unmarshal metadata bytes") } - return metaData, nil + return metadata, nil } -func (s *diskStorageMgr) WriteFileMetaData(taskID string, metaData *storage.FileMetaData) error { - data, err := json.Marshal(metaData) +func (s *diskStorageMgr) WriteFileMetadata(taskID string, metadata *storage.FileMetadata) error { + data, err := json.Marshal(metadata) if err != nil { return errors.Wrapf(err, "marshal metadata") } - return s.diskDriver.PutBytes(storage.GetTaskMetaDataRaw(taskID), data) + return s.diskDriver.PutBytes(storage.GetTaskMetadataRaw(taskID), data) } func (s *diskStorageMgr) WritePieceMetaRecords(taskID string, records []*storage.PieceMetaRecord) error { @@ -186,14 +186,14 @@ func (s *diskStorageMgr) WritePieceMetaRecords(taskID string, records []*storage for i := range records { recordStrs = append(recordStrs, records[i].String()) } - pieceRaw := storage.GetPieceMetaDataRaw(taskID) + pieceRaw := storage.GetPieceMetadataRaw(taskID) pieceRaw.Trunc = true pieceRaw.TruncSize = 0 return s.diskDriver.PutBytes(pieceRaw, []byte(strings.Join(recordStrs, "\n"))) } func (s *diskStorageMgr) ReadPieceMetaBytes(taskID string) ([]byte, error) { - return s.diskDriver.GetBytes(storage.GetPieceMetaDataRaw(taskID)) + return s.diskDriver.GetBytes(storage.GetPieceMetadataRaw(taskID)) } func (s *diskStorageMgr) ReadDownloadFile(taskID string) (io.ReadCloser, error) { @@ -214,10 +214,10 @@ func (s *diskStorageMgr) CreateUploadLink(taskID string) error { } func (s *diskStorageMgr) DeleteTask(taskID string) error { - if err := s.diskDriver.Remove(storage.GetTaskMetaDataRaw(taskID)); err != nil && !cdnerrors.IsFileNotExist(err) { + if err := s.diskDriver.Remove(storage.GetTaskMetadataRaw(taskID)); err != nil && !cdnerrors.IsFileNotExist(err) { return err } - if err := s.diskDriver.Remove(storage.GetPieceMetaDataRaw(taskID)); err != nil && !cdnerrors.IsFileNotExist(err) { + if err := s.diskDriver.Remove(storage.GetPieceMetadataRaw(taskID)); err != nil && !cdnerrors.IsFileNotExist(err) { return err } if err := s.diskDriver.Remove(storage.GetDownloadRaw(taskID)); err != nil && !cdnerrors.IsFileNotExist(err) { diff --git a/cdn/supervisor/cdn/storage/hybrid/hybrid.go b/cdn/supervisor/cdn/storage/hybrid/hybrid.go index c0eca378488..b65c4416360 100644 --- a/cdn/supervisor/cdn/storage/hybrid/hybrid.go +++ b/cdn/supervisor/cdn/storage/hybrid/hybrid.go @@ -219,7 +219,7 @@ func (h *hybridStorageMgr) ReadDownloadFile(taskID string) (io.ReadCloser, error } func (h *hybridStorageMgr) ReadPieceMetaRecords(taskID string) ([]*storage.PieceMetaRecord, error) { - readBytes, err := h.diskDriver.GetBytes(storage.GetPieceMetaDataRaw(taskID)) + readBytes, err := h.diskDriver.GetBytes(storage.GetPieceMetadataRaw(taskID)) if err != nil { return nil, err } @@ -235,29 +235,29 @@ func (h *hybridStorageMgr) ReadPieceMetaRecords(taskID string) ([]*storage.Piece return result, nil } -func (h *hybridStorageMgr) ReadFileMetaData(taskID string) (*storage.FileMetaData, error) { - readBytes, err := h.diskDriver.GetBytes(storage.GetTaskMetaDataRaw(taskID)) +func (h *hybridStorageMgr) ReadFileMetadata(taskID string) (*storage.FileMetadata, error) { + readBytes, err := h.diskDriver.GetBytes(storage.GetTaskMetadataRaw(taskID)) if err != nil { return nil, errors.Wrapf(err, "get metadata bytes") } - metaData := &storage.FileMetaData{} - if err := json.Unmarshal(readBytes, metaData); err != nil { + metadata := &storage.FileMetadata{} + if err := json.Unmarshal(readBytes, metadata); err != nil { return nil, errors.Wrapf(err, "unmarshal metadata bytes") } - return metaData, nil + return metadata, nil } -func (h *hybridStorageMgr) AppendPieceMetaData(taskID string, record *storage.PieceMetaRecord) error { - return h.diskDriver.PutBytes(storage.GetAppendPieceMetaDataRaw(taskID), []byte(record.String()+"\n")) +func (h *hybridStorageMgr) AppendPieceMetadata(taskID string, record *storage.PieceMetaRecord) error { + return h.diskDriver.PutBytes(storage.GetAppendPieceMetadataRaw(taskID), []byte(record.String()+"\n")) } -func (h *hybridStorageMgr) WriteFileMetaData(taskID string, metaData *storage.FileMetaData) error { - data, err := json.Marshal(metaData) +func (h *hybridStorageMgr) WriteFileMetadata(taskID string, metadata *storage.FileMetadata) error { + data, err := json.Marshal(metadata) if err != nil { return errors.Wrapf(err, "marshal metadata") } - return h.diskDriver.PutBytes(storage.GetTaskMetaDataRaw(taskID), data) + return h.diskDriver.PutBytes(storage.GetTaskMetadataRaw(taskID), data) } func (h *hybridStorageMgr) WritePieceMetaRecords(taskID string, records []*storage.PieceMetaRecord) error { @@ -265,7 +265,7 @@ func (h *hybridStorageMgr) WritePieceMetaRecords(taskID string, records []*stora for i := range records { recordStrings = append(recordStrings, records[i].String()) } - return h.diskDriver.PutBytes(storage.GetPieceMetaDataRaw(taskID), []byte(strings.Join(recordStrings, "\n"))) + return h.diskDriver.PutBytes(storage.GetPieceMetadataRaw(taskID), []byte(strings.Join(recordStrings, "\n"))) } func (h *hybridStorageMgr) CreateUploadLink(taskID string) error { @@ -387,11 +387,11 @@ func (h *hybridStorageMgr) deleteTaskFiles(taskID string, deleteUploadPath bool, return err } // deleteTaskFiles delete files associated with taskID - if err := h.diskDriver.Remove(storage.GetTaskMetaDataRaw(taskID)); err != nil && !cdnerrors.IsFileNotExist(err) { + if err := h.diskDriver.Remove(storage.GetTaskMetadataRaw(taskID)); err != nil && !cdnerrors.IsFileNotExist(err) { return err } // delete piece meta data - if err := h.diskDriver.Remove(storage.GetPieceMetaDataRaw(taskID)); err != nil && !cdnerrors.IsFileNotExist(err) { + if err := h.diskDriver.Remove(storage.GetPieceMetadataRaw(taskID)); err != nil && !cdnerrors.IsFileNotExist(err) { return err } } diff --git a/cdn/supervisor/cdn/storage/mock/mock_storage_mgr.go b/cdn/supervisor/cdn/storage/mock/mock_storage_mgr.go index 0eb044ab209..abe7e95c7f6 100644 --- a/cdn/supervisor/cdn/storage/mock/mock_storage_mgr.go +++ b/cdn/supervisor/cdn/storage/mock/mock_storage_mgr.go @@ -38,18 +38,18 @@ func (m *MockManager) EXPECT() *MockManagerMockRecorder { return m.recorder } -// AppendPieceMetaData mocks base method. -func (m *MockManager) AppendPieceMetaData(arg0 string, arg1 *storage.PieceMetaRecord) error { +// AppendPieceMetadata mocks base method. +func (m *MockManager) AppendPieceMetadata(arg0 string, arg1 *storage.PieceMetaRecord) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AppendPieceMetaData", arg0, arg1) + ret := m.ctrl.Call(m, "AppendPieceMetadata", arg0, arg1) ret0, _ := ret[0].(error) return ret0 } -// AppendPieceMetaData indicates an expected call of AppendPieceMetaData. -func (mr *MockManagerMockRecorder) AppendPieceMetaData(arg0, arg1 interface{}) *gomock.Call { +// AppendPieceMetadata indicates an expected call of AppendPieceMetadata. +func (mr *MockManagerMockRecorder) AppendPieceMetadata(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AppendPieceMetaData", reflect.TypeOf((*MockManager)(nil).AppendPieceMetaData), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AppendPieceMetadata", reflect.TypeOf((*MockManager)(nil).AppendPieceMetadata), arg0, arg1) } // CreateUploadLink mocks base method. @@ -107,19 +107,19 @@ func (mr *MockManagerMockRecorder) ReadDownloadFile(arg0 interface{}) *gomock.Ca return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadDownloadFile", reflect.TypeOf((*MockManager)(nil).ReadDownloadFile), arg0) } -// ReadFileMetaData mocks base method. -func (m *MockManager) ReadFileMetaData(arg0 string) (*storage.FileMetaData, error) { +// ReadFileMetadata mocks base method. +func (m *MockManager) ReadFileMetadata(arg0 string) (*storage.FileMetadata, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ReadFileMetaData", arg0) - ret0, _ := ret[0].(*storage.FileMetaData) + ret := m.ctrl.Call(m, "ReadFileMetadata", arg0) + ret0, _ := ret[0].(*storage.FileMetadata) ret1, _ := ret[1].(error) return ret0, ret1 } -// ReadFileMetaData indicates an expected call of ReadFileMetaData. -func (mr *MockManagerMockRecorder) ReadFileMetaData(arg0 interface{}) *gomock.Call { +// ReadFileMetadata indicates an expected call of ReadFileMetadata. +func (mr *MockManagerMockRecorder) ReadFileMetadata(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadFileMetaData", reflect.TypeOf((*MockManager)(nil).ReadFileMetaData), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadFileMetadata", reflect.TypeOf((*MockManager)(nil).ReadFileMetadata), arg0) } // ReadPieceMetaRecords mocks base method. @@ -195,18 +195,18 @@ func (mr *MockManagerMockRecorder) WriteDownloadFile(arg0, arg1, arg2, arg3 inte return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteDownloadFile", reflect.TypeOf((*MockManager)(nil).WriteDownloadFile), arg0, arg1, arg2, arg3) } -// WriteFileMetaData mocks base method. -func (m *MockManager) WriteFileMetaData(arg0 string, arg1 *storage.FileMetaData) error { +// WriteFileMetadata mocks base method. +func (m *MockManager) WriteFileMetadata(arg0 string, arg1 *storage.FileMetadata) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "WriteFileMetaData", arg0, arg1) + ret := m.ctrl.Call(m, "WriteFileMetadata", arg0, arg1) ret0, _ := ret[0].(error) return ret0 } -// WriteFileMetaData indicates an expected call of WriteFileMetaData. -func (mr *MockManagerMockRecorder) WriteFileMetaData(arg0, arg1 interface{}) *gomock.Call { +// WriteFileMetadata indicates an expected call of WriteFileMetadata. +func (mr *MockManagerMockRecorder) WriteFileMetadata(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteFileMetaData", reflect.TypeOf((*MockManager)(nil).WriteFileMetaData), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteFileMetadata", reflect.TypeOf((*MockManager)(nil).WriteFileMetadata), arg0, arg1) } // WritePieceMetaRecords mocks base method. diff --git a/cdn/supervisor/cdn/storage/path_util.go b/cdn/supervisor/cdn/storage/path_util.go index 09e5016605d..d87cf06ba0c 100644 --- a/cdn/supervisor/cdn/storage/path_util.go +++ b/cdn/supervisor/cdn/storage/path_util.go @@ -35,11 +35,11 @@ func getDownloadKey(taskID string) string { return path.Join(getParentKey(taskID), taskID) } -func getTaskMetaDataKey(taskID string) string { +func getTaskMetadataKey(taskID string) string { return path.Join(getParentKey(taskID), taskID+".meta") } -func getPieceMetaDataKey(taskID string) string { +func getPieceMetadataKey(taskID string) string { return path.Join(getParentKey(taskID), taskID+".piece") } @@ -61,25 +61,25 @@ func GetUploadRaw(taskID string) *storedriver.Raw { } } -func GetTaskMetaDataRaw(taskID string) *storedriver.Raw { +func GetTaskMetadataRaw(taskID string) *storedriver.Raw { return &storedriver.Raw{ Bucket: DownloadHome, - Key: getTaskMetaDataKey(taskID), + Key: getTaskMetadataKey(taskID), Trunc: true, } } -func GetPieceMetaDataRaw(taskID string) *storedriver.Raw { +func GetPieceMetadataRaw(taskID string) *storedriver.Raw { return &storedriver.Raw{ Bucket: DownloadHome, - Key: getPieceMetaDataKey(taskID), + Key: getPieceMetadataKey(taskID), } } -func GetAppendPieceMetaDataRaw(taskID string) *storedriver.Raw { +func GetAppendPieceMetadataRaw(taskID string) *storedriver.Raw { return &storedriver.Raw{ Bucket: DownloadHome, - Key: getPieceMetaDataKey(taskID), + Key: getPieceMetadataKey(taskID), Append: true, } } diff --git a/cdn/supervisor/cdn/storage/storage.go b/cdn/supervisor/cdn/storage/storage.go index ddd9271fff3..bc4dddf7798 100644 --- a/cdn/supervisor/cdn/storage/storage.go +++ b/cdn/supervisor/cdn/storage/storage.go @@ -55,17 +55,17 @@ type Manager interface { // CreateUploadLink create a upload link to download file CreateUploadLink(taskID string) error - // ReadFileMetaData return meta data of download file - ReadFileMetaData(taskID string) (*FileMetaData, error) + // ReadFileMetadata return meta data of download file + ReadFileMetadata(taskID string) (*FileMetadata, error) - // WriteFileMetaData write file meta to storage - WriteFileMetaData(taskID string, meta *FileMetaData) error + // WriteFileMetadata write file meta to storage + WriteFileMetadata(taskID string, meta *FileMetadata) error // WritePieceMetaRecords write piece meta records to storage WritePieceMetaRecords(taskID string, metaRecords []*PieceMetaRecord) error - // AppendPieceMetaData append piece meta data to storage - AppendPieceMetaData(taskID string, metaRecord *PieceMetaRecord) error + // AppendPieceMetadata append piece meta data to storage + AppendPieceMetadata(taskID string, metaRecord *PieceMetaRecord) error // ReadPieceMetaRecords read piece meta records from storage ReadPieceMetaRecords(taskID string) ([]*PieceMetaRecord, error) @@ -77,8 +77,8 @@ type Manager interface { TryFreeSpace(fileLength int64) (bool, error) } -// FileMetaData meta data of task -type FileMetaData struct { +// FileMetadata meta data of task +type FileMetadata struct { TaskID string `json:"taskId"` TaskURL string `json:"taskUrl"` PieceSize int32 `json:"pieceSize"` @@ -92,7 +92,7 @@ type FileMetaData struct { Finish bool `json:"finish"` Success bool `json:"success"` TotalPieceCount int32 `json:"totalPieceCount"` - //PieceMetaDataSign string `json:"pieceMetaDataSign"` + //PieceMetadataSign string `json:"pieceMetadataSign"` } // PieceMetaRecord meta data of piece @@ -185,20 +185,20 @@ func (m *managerPlugin) CreateUploadLink(taskID string) error { return m.instance.CreateUploadLink(taskID) } -func (m *managerPlugin) ReadFileMetaData(taskID string) (*FileMetaData, error) { - return m.instance.ReadFileMetaData(taskID) +func (m *managerPlugin) ReadFileMetadata(taskID string) (*FileMetadata, error) { + return m.instance.ReadFileMetadata(taskID) } -func (m *managerPlugin) WriteFileMetaData(taskID string, data *FileMetaData) error { - return m.instance.WriteFileMetaData(taskID, data) +func (m *managerPlugin) WriteFileMetadata(taskID string, data *FileMetadata) error { + return m.instance.WriteFileMetadata(taskID, data) } func (m *managerPlugin) WritePieceMetaRecords(taskID string, records []*PieceMetaRecord) error { return m.instance.WritePieceMetaRecords(taskID, records) } -func (m *managerPlugin) AppendPieceMetaData(taskID string, record *PieceMetaRecord) error { - return m.instance.AppendPieceMetaData(taskID, record) +func (m *managerPlugin) AppendPieceMetadata(taskID string, record *PieceMetaRecord) error { + return m.instance.AppendPieceMetadata(taskID, record) } func (m *managerPlugin) ReadPieceMetaRecords(taskID string) ([]*PieceMetaRecord, error) { diff --git a/cdn/supervisor/cdn/storage/storage_gc.go b/cdn/supervisor/cdn/storage/storage_gc.go index 6e36eeaac97..80c9e736750 100644 --- a/cdn/supervisor/cdn/storage/storage_gc.go +++ b/cdn/supervisor/cdn/storage/storage_gc.go @@ -111,15 +111,15 @@ func (cleaner *Cleaner) GC(storagePattern string, force bool) ([]string, error) return nil } - metaData, err := cleaner.storageMgr.ReadFileMetaData(taskID) - if err != nil || metaData == nil { + metadata, err := cleaner.storageMgr.ReadFileMetadata(taskID) + if err != nil || metadata == nil { logger.GcLogger.With("type", storagePattern).Debugf("taskID: %s, failed to get metadata: %v", taskID, err) gcTaskIDs = append(gcTaskIDs, taskID) return nil } // put taskId into gapTasks or intervalTasks which will sort by some rules - if err := cleaner.sortInert(gapTasks, intervalTasks, metaData); err != nil { - logger.GcLogger.With("type", storagePattern).Errorf("failed to parse inert metaData(%#v): %v", metaData, err) + if err := cleaner.sortInert(gapTasks, intervalTasks, metadata); err != nil { + logger.GcLogger.With("type", storagePattern).Errorf("failed to parse inert metadata(%#v): %v", metadata, err) } return nil @@ -138,12 +138,12 @@ func (cleaner *Cleaner) GC(storagePattern string, force bool) ([]string, error) return gcTaskIDs, nil } -func (cleaner *Cleaner) sortInert(gapTasks, intervalTasks *treemap.Map, metaData *FileMetaData) error { - gap := timeutils.CurrentTimeMillis() - metaData.AccessTime +func (cleaner *Cleaner) sortInert(gapTasks, intervalTasks *treemap.Map, metadata *FileMetadata) error { + gap := timeutils.CurrentTimeMillis() - metadata.AccessTime - if metaData.Interval > 0 && - gap <= metaData.Interval+(int64(cleaner.cfg.IntervalThreshold.Seconds())*int64(time.Millisecond)) { - info, err := cleaner.storageMgr.StatDownloadFile(metaData.TaskID) + if metadata.Interval > 0 && + gap <= metadata.Interval+(int64(cleaner.cfg.IntervalThreshold.Seconds())*int64(time.Millisecond)) { + info, err := cleaner.storageMgr.StatDownloadFile(metadata.TaskID) if err != nil { return err } @@ -153,7 +153,7 @@ func (cleaner *Cleaner) sortInert(gapTasks, intervalTasks *treemap.Map, metaData v = make([]string, 0) } tasks := v.([]string) - tasks = append(tasks, metaData.TaskID) + tasks = append(tasks, metadata.TaskID) intervalTasks.Put(info.Size, tasks) return nil } @@ -163,7 +163,7 @@ func (cleaner *Cleaner) sortInert(gapTasks, intervalTasks *treemap.Map, metaData v = make([]string, 0) } tasks := v.([]string) - tasks = append(tasks, metaData.TaskID) + tasks = append(tasks, metadata.TaskID) gapTasks.Put(gap, tasks) return nil } diff --git a/cdn/supervisor/progress/manager.go b/cdn/supervisor/progress/manager.go index dc4eabe4f4c..34777d2e5c0 100644 --- a/cdn/supervisor/progress/manager.go +++ b/cdn/supervisor/progress/manager.go @@ -96,14 +96,14 @@ func (pm *Manager) WatchSeedProgress(ctx context.Context, taskID string) (<-chan if err != nil { return nil, fmt.Errorf("get seed subscribers: %v", err) } - pieceMetaDataRecords, err := pm.getPieceMetaRecordsByTaskID(taskID) + pieceMetadataRecords, err := pm.getPieceMetaRecordsByTaskID(taskID) if err != nil { return nil, fmt.Errorf("get piece meta records by taskID: %v", err) } ch := make(chan *types.SeedPiece, pm.buffer) ele := chanList.PushBack(ch) go func(seedCh chan *types.SeedPiece, ele *list.Element) { - for _, pieceMetaRecord := range pieceMetaDataRecords { + for _, pieceMetaRecord := range pieceMetadataRecords { logger.Debugf("seed piece meta record %#v", pieceMetaRecord) select { case seedCh <- pieceMetaRecord: diff --git a/client/daemon/peer/peertask_file_callback.go b/client/daemon/peer/peertask_file_callback.go index 54494a7517b..369846a5da1 100644 --- a/client/daemon/peer/peertask_file_callback.go +++ b/client/daemon/peer/peertask_file_callback.go @@ -64,7 +64,7 @@ func (p *filePeerTaskCallback) Update(pt Task) error { // update storage err := p.ptm.storageManager.UpdateTask(p.pt.ctx, &storage.UpdateTaskRequest{ - PeerTaskMetaData: storage.PeerTaskMetaData{ + PeerTaskMetadata: storage.PeerTaskMetadata{ PeerID: pt.GetPeerID(), TaskID: pt.GetTaskID(), }, @@ -157,7 +157,7 @@ func (p *filePeerTaskCallback) ValidateDigest(pt Task) error { return nil } err := p.ptm.storageManager.ValidateDigest( - &storage.PeerTaskMetaData{ + &storage.PeerTaskMetadata{ PeerID: pt.GetPeerID(), TaskID: pt.GetTaskID(), }) diff --git a/client/daemon/peer/peertask_manager.go b/client/daemon/peer/peertask_manager.go index 62cb28a10da..c26b03d0f50 100644 --- a/client/daemon/peer/peertask_manager.go +++ b/client/daemon/peer/peertask_manager.go @@ -281,11 +281,11 @@ func (ptm *peerTaskManager) storeTinyPeerTask(ctx context.Context, tiny *TinyDat } n, err := ptm.storageManager.WritePiece(ctx, &storage.WritePieceRequest{ - PeerTaskMetaData: storage.PeerTaskMetaData{ + PeerTaskMetadata: storage.PeerTaskMetadata{ PeerID: tiny.PeerID, TaskID: tiny.TaskID, }, - PieceMetaData: storage.PieceMetaData{ + PieceMetadata: storage.PieceMetadata{ Num: 0, Md5: "", Offset: 0, diff --git a/client/daemon/peer/peertask_reuse.go b/client/daemon/peer/peertask_reuse.go index de800af5c6f..43ced0afc77 100644 --- a/client/daemon/peer/peertask_reuse.go +++ b/client/daemon/peer/peertask_reuse.go @@ -123,7 +123,7 @@ func (ptm *peerTaskManager) tryReuseStreamPeerTask(ctx context.Context, span.SetAttributes(semconv.HTTPURLKey.String(request.Url)) defer span.End() - rc, err := ptm.storageManager.ReadAllPieces(ctx, &reuse.PeerTaskMetaData) + rc, err := ptm.storageManager.ReadAllPieces(ctx, &reuse.PeerTaskMetadata) if err != nil { log.Errorf("read all pieces error when reuse peer task: %s", err) span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false)) diff --git a/client/daemon/peer/peertask_stream.go b/client/daemon/peer/peertask_stream.go index 16839db5890..fcb22f7ecc3 100644 --- a/client/daemon/peer/peertask_stream.go +++ b/client/daemon/peer/peertask_stream.go @@ -355,11 +355,11 @@ func (s *streamPeerTask) SetTotalPieces(i int32) { func (s *streamPeerTask) writeOnePiece(w io.Writer, pieceNum int32) (int64, error) { pr, pc, err := s.pieceManager.ReadPiece(s.ctx, &storage.ReadPieceRequest{ - PeerTaskMetaData: storage.PeerTaskMetaData{ + PeerTaskMetadata: storage.PeerTaskMetadata{ PeerID: s.peerID, TaskID: s.taskID, }, - PieceMetaData: storage.PieceMetaData{ + PieceMetadata: storage.PieceMetadata{ Num: pieceNum, }, }) diff --git a/client/daemon/peer/peertask_stream_callback.go b/client/daemon/peer/peertask_stream_callback.go index 2b7b4e35b3e..4170dce40ae 100644 --- a/client/daemon/peer/peertask_stream_callback.go +++ b/client/daemon/peer/peertask_stream_callback.go @@ -63,7 +63,7 @@ func (p *streamPeerTaskCallback) Update(pt Task) error { // update storage err := p.ptm.storageManager.UpdateTask(p.pt.ctx, &storage.UpdateTaskRequest{ - PeerTaskMetaData: storage.PeerTaskMetaData{ + PeerTaskMetadata: storage.PeerTaskMetadata{ PeerID: pt.GetPeerID(), TaskID: pt.GetTaskID(), }, @@ -155,7 +155,7 @@ func (p *streamPeerTaskCallback) ValidateDigest(pt Task) error { return nil } err := p.ptm.storageManager.ValidateDigest( - &storage.PeerTaskMetaData{ + &storage.PeerTaskMetadata{ PeerID: pt.GetPeerID(), TaskID: pt.GetTaskID(), }) diff --git a/client/daemon/peer/piece_manager.go b/client/daemon/peer/piece_manager.go index 06c1a475c70..fb891844e84 100644 --- a/client/daemon/peer/piece_manager.go +++ b/client/daemon/peer/piece_manager.go @@ -164,11 +164,11 @@ func (pm *pieceManager) DownloadPiece(ctx context.Context, pt Task, request *Dow // 2. save to storage var n int64 n, err = pm.storageManager.WritePiece(ctx, &storage.WritePieceRequest{ - PeerTaskMetaData: storage.PeerTaskMetaData{ + PeerTaskMetadata: storage.PeerTaskMetadata{ PeerID: pt.GetPeerID(), TaskID: pt.GetTaskID(), }, - PieceMetaData: storage.PieceMetaData{ + PieceMetadata: storage.PieceMetadata{ Num: request.piece.PieceNum, Md5: request.piece.PieceMd5, Offset: request.piece.PieceOffset, @@ -300,11 +300,11 @@ func (pm *pieceManager) processPieceFromSource(pt Task, pt.Context(), &storage.WritePieceRequest{ UnknownLength: unknownLength, - PeerTaskMetaData: storage.PeerTaskMetaData{ + PeerTaskMetadata: storage.PeerTaskMetadata{ PeerID: pt.GetPeerID(), TaskID: pt.GetTaskID(), }, - PieceMetaData: storage.PieceMetaData{ + PieceMetadata: storage.PieceMetadata{ Num: pieceNum, // storage manager will get digest from DigestReader, keep empty here is ok Md5: "", @@ -360,7 +360,7 @@ func (pm *pieceManager) DownloadSource(ctx context.Context, pt Task, request *sc } else { err = pm.storageManager.UpdateTask(ctx, &storage.UpdateTaskRequest{ - PeerTaskMetaData: storage.PeerTaskMetaData{ + PeerTaskMetadata: storage.PeerTaskMetadata{ PeerID: pt.GetPeerID(), TaskID: pt.GetTaskID(), }, @@ -407,7 +407,7 @@ func (pm *pieceManager) DownloadSource(ctx context.Context, pt Task, request *sc contentLength = int64(pieceNum)*int64(pieceSize) + n if err := pm.storageManager.UpdateTask(ctx, &storage.UpdateTaskRequest{ - PeerTaskMetaData: storage.PeerTaskMetaData{ + PeerTaskMetadata: storage.PeerTaskMetadata{ PeerID: pt.GetPeerID(), TaskID: pt.GetTaskID(), }, @@ -446,7 +446,7 @@ func (pm *pieceManager) DownloadSource(ctx context.Context, pt Task, request *sc if err := pm.storageManager.UpdateTask(ctx, &storage.UpdateTaskRequest{ - PeerTaskMetaData: storage.PeerTaskMetaData{ + PeerTaskMetadata: storage.PeerTaskMetadata{ PeerID: pt.GetPeerID(), TaskID: pt.GetTaskID(), }, diff --git a/client/daemon/storage/const.go b/client/daemon/storage/const.go index 668dbb2d348..7e0ff3318ee 100644 --- a/client/daemon/storage/const.go +++ b/client/daemon/storage/const.go @@ -24,7 +24,7 @@ import ( const ( taskData = "data" - taskMetaData = "metadata" + taskMetadata = "metadata" defaultFileMode = os.FileMode(0644) defaultDirectoryMode = os.FileMode(0755) diff --git a/client/daemon/storage/local_storage.go b/client/daemon/storage/local_storage.go index d86e00d5aae..4e29f19fbea 100644 --- a/client/daemon/storage/local_storage.go +++ b/client/daemon/storage/local_storage.go @@ -126,11 +126,11 @@ func (t *localTaskStore) WritePiece(ctx context.Context, req *WritePieceRequest) } } // when Md5 is empty, try to get md5 from reader, it's useful for back source - if req.PieceMetaData.Md5 == "" { + if req.PieceMetadata.Md5 == "" { t.Warnf("piece md5 not found in metadata, read from reader") if get, ok := req.Reader.(digestutils.DigestReader); ok { - req.PieceMetaData.Md5 = get.Digest() - t.Infof("read md5 from reader, value: %s", req.PieceMetaData.Md5) + req.PieceMetadata.Md5 = get.Digest() + t.Infof("read md5 from reader, value: %s", req.PieceMetadata.Md5) } else { t.Warnf("reader is not a DigestReader") } @@ -143,7 +143,7 @@ func (t *localTaskStore) WritePiece(ctx context.Context, req *WritePieceRequest) if _, ok := t.Pieces[req.Num]; ok { return n, nil } - t.Pieces[req.Num] = req.PieceMetaData + t.Pieces[req.Num] = req.PieceMetadata return n, nil } @@ -171,7 +171,7 @@ func (t *localTaskStore) UpdateTask(ctx context.Context, req *UpdateTaskRequest) return nil } -func (t *localTaskStore) ValidateDigest(*PeerTaskMetaData) error { +func (t *localTaskStore) ValidateDigest(*PeerTaskMetadata) error { t.Lock() defer t.Unlock() if t.persistentMetadata.PieceMd5Sign == "" { @@ -196,7 +196,7 @@ func (t *localTaskStore) ValidateDigest(*PeerTaskMetaData) error { return nil } -func (t *localTaskStore) IsInvalid(*PeerTaskMetaData) (bool, error) { +func (t *localTaskStore) IsInvalid(*PeerTaskMetadata) (bool, error) { return t.invalid.Load(), nil } @@ -236,7 +236,7 @@ func (t *localTaskStore) ReadPiece(ctx context.Context, req *ReadPieceRequest) ( return io.LimitReader(file, req.Range.Length), file, nil } -func (t *localTaskStore) ReadAllPieces(ctx context.Context, req *PeerTaskMetaData) (io.ReadCloser, error) { +func (t *localTaskStore) ReadAllPieces(ctx context.Context, req *PeerTaskMetadata) (io.ReadCloser, error) { if t.invalid.Load() { t.Errorf("invalid digest, refuse to read all pieces") return nil, ErrInvalidDigest diff --git a/client/daemon/storage/local_storage_test.go b/client/daemon/storage/local_storage_test.go index 1b4b44ec4f1..c539b1db930 100644 --- a/client/daemon/storage/local_storage_test.go +++ b/client/daemon/storage/local_storage_test.go @@ -81,7 +81,7 @@ func TestLocalTaskStore_PutAndGetPiece_Simple(t *testing.T) { ContentLength: int64(len(testBytes)), }) assert.Nil(err, "create task storage") - ts, ok := s.LoadTask(PeerTaskMetaData{ + ts, ok := s.LoadTask(PeerTaskMetadata{ PeerID: peerID, TaskID: taskID, }) @@ -114,10 +114,10 @@ func TestLocalTaskStore_PutAndGetPiece_Simple(t *testing.T) { // random put all pieces for _, p := range pieces { _, err = ts.WritePiece(context.Background(), &WritePieceRequest{ - PeerTaskMetaData: PeerTaskMetaData{ + PeerTaskMetadata: PeerTaskMetadata{ TaskID: taskID, }, - PieceMetaData: PieceMetaData{ + PieceMetadata: PieceMetadata{ Num: int32(p.index), Md5: "", Offset: uint64(p.start), @@ -140,10 +140,10 @@ func TestLocalTaskStore_PutAndGetPiece_Simple(t *testing.T) { rand.Shuffle(len(pieces), func(i, j int) { pieces[i], pieces[j] = pieces[j], pieces[i] }) for _, p := range pieces { rd, cl, err := ts.ReadPiece(context.Background(), &ReadPieceRequest{ - PeerTaskMetaData: PeerTaskMetaData{ + PeerTaskMetadata: PeerTaskMetadata{ TaskID: taskID, }, - PieceMetaData: PieceMetaData{ + PieceMetadata: PieceMetadata{ Num: int32(p.index), Md5: "", Offset: uint64(p.start), @@ -254,7 +254,7 @@ func TestLocalTaskStore_PutAndGetPiece_Advance(t *testing.T) { ContentLength: int64(len(testBytes)), }) assert.Nil(err, "create task storage") - ts, ok := s.LoadTask(PeerTaskMetaData{ + ts, ok := s.LoadTask(PeerTaskMetadata{ PeerID: peerID, TaskID: taskID, }) @@ -287,10 +287,10 @@ func TestLocalTaskStore_PutAndGetPiece_Advance(t *testing.T) { // random put all pieces for _, p := range pieces { _, err = ts.WritePiece(context.Background(), &WritePieceRequest{ - PeerTaskMetaData: PeerTaskMetaData{ + PeerTaskMetadata: PeerTaskMetadata{ TaskID: taskID, }, - PieceMetaData: PieceMetaData{ + PieceMetadata: PieceMetadata{ Num: int32(p.index), Md5: "", Offset: uint64(p.start), @@ -313,10 +313,10 @@ func TestLocalTaskStore_PutAndGetPiece_Advance(t *testing.T) { rand.Shuffle(len(pieces), func(i, j int) { pieces[i], pieces[j] = pieces[j], pieces[i] }) for _, p := range pieces { rd, cl, err := ts.ReadPiece(context.Background(), &ReadPieceRequest{ - PeerTaskMetaData: PeerTaskMetaData{ + PeerTaskMetadata: PeerTaskMetadata{ TaskID: taskID, }, - PieceMetaData: PieceMetaData{ + PieceMetadata: PieceMetadata{ Num: int32(p.index), Md5: "", Offset: uint64(p.start), diff --git a/client/daemon/storage/metadata.go b/client/daemon/storage/metadata.go index 9e594c2d4ae..38a9ea14179 100644 --- a/client/daemon/storage/metadata.go +++ b/client/daemon/storage/metadata.go @@ -30,18 +30,18 @@ type persistentMetadata struct { ContentLength int64 `json:"contentLength"` TotalPieces int32 `json:"totalPieces"` PeerID string `json:"peerID"` - Pieces map[int32]PieceMetaData `json:"pieces"` + Pieces map[int32]PieceMetadata `json:"pieces"` PieceMd5Sign string `json:"pieceMd5Sign"` DataFilePath string `json:"dataFilePath"` Done bool `json:"done"` } -type PeerTaskMetaData struct { +type PeerTaskMetadata struct { PeerID string `json:"peerID,omitempty"` TaskID string `json:"taskID,omitempty"` } -type PieceMetaData struct { +type PieceMetadata struct { Num int32 `json:"num,omitempty"` Md5 string `json:"md5,omitempty"` Offset uint64 `json:"offset,omitempty"` @@ -63,8 +63,8 @@ type RegisterTaskRequest struct { } type WritePieceRequest struct { - PeerTaskMetaData - PieceMetaData + PeerTaskMetadata + PieceMetadata UnknownLength bool Reader io.Reader } @@ -77,12 +77,12 @@ type StoreRequest struct { } type ReadPieceRequest struct { - PeerTaskMetaData - PieceMetaData + PeerTaskMetadata + PieceMetadata } type UpdateTaskRequest struct { - PeerTaskMetaData + PeerTaskMetadata ContentLength int64 TotalPieces int32 PieceMd5Sign string diff --git a/client/daemon/storage/storage_manager.go b/client/daemon/storage/storage_manager.go index b6b96262d55..2f8926e9ea4 100644 --- a/client/daemon/storage/storage_manager.go +++ b/client/daemon/storage/storage_manager.go @@ -52,7 +52,7 @@ type TaskStorageDriver interface { // If req.Num is equal to -1, range has a fixed value. ReadPiece(ctx context.Context, req *ReadPieceRequest) (io.Reader, io.Closer, error) - ReadAllPieces(ctx context.Context, req *PeerTaskMetaData) (io.ReadCloser, error) + ReadAllPieces(ctx context.Context, req *PeerTaskMetadata) (io.ReadCloser, error) GetPieces(ctx context.Context, req *base.PieceTaskRequest) (*base.PiecePacket, error) @@ -61,9 +61,9 @@ type TaskStorageDriver interface { // Store stores task data to the target path Store(ctx context.Context, req *StoreRequest) error - ValidateDigest(req *PeerTaskMetaData) error + ValidateDigest(req *PeerTaskMetadata) error - IsInvalid(req *PeerTaskMetaData) (bool, error) + IsInvalid(req *PeerTaskMetadata) (bool, error) } // Reclaimer stands storage reclaimer @@ -114,7 +114,7 @@ type storageManager struct { storeStrategy config.StoreStrategy storeOption *config.StorageOption tasks sync.Map - markedReclaimTasks []PeerTaskMetaData + markedReclaimTasks []PeerTaskMetadata dataPathStat *syscall.Stat_t gcCallback func(CommonTaskRequest) @@ -186,7 +186,7 @@ func WithStorageOption(opt *config.StorageOption) func(*storageManager) error { func (s *storageManager) RegisterTask(ctx context.Context, req RegisterTaskRequest) error { if _, ok := s.LoadTask( - PeerTaskMetaData{ + PeerTaskMetadata{ PeerID: req.PeerID, TaskID: req.TaskID, }); !ok { @@ -195,7 +195,7 @@ func (s *storageManager) RegisterTask(ctx context.Context, req RegisterTaskReque s.Lock() defer s.Unlock() if _, ok := s.LoadTask( - PeerTaskMetaData{ + PeerTaskMetadata{ PeerID: req.PeerID, TaskID: req.TaskID, }); ok { @@ -209,7 +209,7 @@ func (s *storageManager) RegisterTask(ctx context.Context, req RegisterTaskReque func (s *storageManager) WritePiece(ctx context.Context, req *WritePieceRequest) (int64, error) { t, ok := s.LoadTask( - PeerTaskMetaData{ + PeerTaskMetadata{ PeerID: req.PeerID, TaskID: req.TaskID, }) @@ -221,7 +221,7 @@ func (s *storageManager) WritePiece(ctx context.Context, req *WritePieceRequest) func (s *storageManager) ReadPiece(ctx context.Context, req *ReadPieceRequest) (io.Reader, io.Closer, error) { t, ok := s.LoadTask( - PeerTaskMetaData{ + PeerTaskMetadata{ PeerID: req.PeerID, TaskID: req.TaskID, }) @@ -232,9 +232,9 @@ func (s *storageManager) ReadPiece(ctx context.Context, req *ReadPieceRequest) ( return t.(TaskStorageDriver).ReadPiece(ctx, req) } -func (s *storageManager) ReadAllPieces(ctx context.Context, req *PeerTaskMetaData) (io.ReadCloser, error) { +func (s *storageManager) ReadAllPieces(ctx context.Context, req *PeerTaskMetadata) (io.ReadCloser, error) { t, ok := s.LoadTask( - PeerTaskMetaData{ + PeerTaskMetadata{ PeerID: req.PeerID, TaskID: req.TaskID, }) @@ -247,7 +247,7 @@ func (s *storageManager) ReadAllPieces(ctx context.Context, req *PeerTaskMetaDat func (s *storageManager) Store(ctx context.Context, req *StoreRequest) error { t, ok := s.LoadTask( - PeerTaskMetaData{ + PeerTaskMetadata{ PeerID: req.PeerID, TaskID: req.TaskID, }) @@ -260,7 +260,7 @@ func (s *storageManager) Store(ctx context.Context, req *StoreRequest) error { func (s *storageManager) GetPieces(ctx context.Context, req *base.PieceTaskRequest) (*base.PiecePacket, error) { t, ok := s.LoadTask( - PeerTaskMetaData{ + PeerTaskMetadata{ TaskID: req.TaskId, PeerID: req.DstPid, }) @@ -270,7 +270,7 @@ func (s *storageManager) GetPieces(ctx context.Context, req *base.PieceTaskReque return t.(TaskStorageDriver).GetPieces(ctx, req) } -func (s *storageManager) LoadTask(meta PeerTaskMetaData) (TaskStorageDriver, bool) { +func (s *storageManager) LoadTask(meta PeerTaskMetadata) (TaskStorageDriver, bool) { s.Keep() d, ok := s.tasks.Load(meta) if !ok { @@ -281,7 +281,7 @@ func (s *storageManager) LoadTask(meta PeerTaskMetaData) (TaskStorageDriver, boo func (s *storageManager) UpdateTask(ctx context.Context, req *UpdateTaskRequest) error { t, ok := s.LoadTask( - PeerTaskMetaData{ + PeerTaskMetadata{ TaskID: req.TaskID, PeerID: req.PeerID, }) @@ -305,11 +305,11 @@ func (s *storageManager) CreateTask(req RegisterTaskRequest) error { TotalPieces: req.TotalPieces, PieceMd5Sign: req.PieceMd5Sign, PeerID: req.PeerID, - Pieces: map[int32]PieceMetaData{}, + Pieces: map[int32]PieceMetadata{}, }, gcCallback: s.gcCallback, dataDir: dataDir, - metadataFilePath: path.Join(dataDir, taskMetaData), + metadataFilePath: path.Join(dataDir, taskMetadata), expireTime: s.storeOption.TaskExpireTime.Duration, SugaredLoggerOnWith: logger.With("task", req.TaskID, "peer", req.PeerID, "component", "localTaskStore"), @@ -373,7 +373,7 @@ func (s *storageManager) CreateTask(req RegisterTaskRequest) error { } } s.tasks.Store( - PeerTaskMetaData{ + PeerTaskMetadata{ PeerID: req.PeerID, TaskID: req.TaskID, }, t) @@ -411,7 +411,7 @@ func (s *storageManager) FindCompletedTask(taskID string) *ReusePeerTask { continue } return &ReusePeerTask{ - PeerTaskMetaData: PeerTaskMetaData{ + PeerTaskMetadata: PeerTaskMetadata{ PeerID: t.PeerID, TaskID: taskID, }, @@ -441,9 +441,9 @@ func (s *storageManager) cleanIndex(taskID, peerID string) { s.indexTask2PeerTask[taskID] = remain } -func (s *storageManager) ValidateDigest(req *PeerTaskMetaData) error { +func (s *storageManager) ValidateDigest(req *PeerTaskMetadata) error { t, ok := s.LoadTask( - PeerTaskMetaData{ + PeerTaskMetadata{ TaskID: req.TaskID, PeerID: req.PeerID, }) @@ -453,9 +453,9 @@ func (s *storageManager) ValidateDigest(req *PeerTaskMetaData) error { return t.(TaskStorageDriver).ValidateDigest(req) } -func (s *storageManager) IsInvalid(req *PeerTaskMetaData) (bool, error) { +func (s *storageManager) IsInvalid(req *PeerTaskMetadata) (bool, error) { t, ok := s.LoadTask( - PeerTaskMetaData{ + PeerTaskMetadata{ TaskID: req.TaskID, PeerID: req.PeerID, }) @@ -488,7 +488,7 @@ func (s *storageManager) ReloadPersistentTask(gcCallback GCCallback) error { dataDir := path.Join(s.storeOption.DataPath, taskID, peerID) t := &localTaskStore{ dataDir: dataDir, - metadataFilePath: path.Join(dataDir, taskMetaData), + metadataFilePath: path.Join(dataDir, taskMetadata), expireTime: s.storeOption.TaskExpireTime.Duration, gcCallback: gcCallback, SugaredLoggerOnWith: logger.With("task", taskID, "peer", peerID, "component", s.storeStrategy), @@ -520,7 +520,7 @@ func (s *storageManager) ReloadPersistentTask(gcCallback GCCallback) error { } logger.Debugf("load task %s/%s from disk, metadata %s, last access: %s, expire time: %s", t.persistentMetadata.TaskID, t.persistentMetadata.PeerID, t.metadataFilePath, t.lastAccess, t.expireTime) - s.tasks.Store(PeerTaskMetaData{ + s.tasks.Store(PeerTaskMetadata{ PeerID: peerID, TaskID: taskID, }, t) @@ -537,10 +537,10 @@ func (s *storageManager) ReloadPersistentTask(gcCallback GCCallback) error { // remove load error peer tasks for _, dir := range loadErrDirs { // remove metadata - if err = os.Remove(path.Join(dir, taskMetaData)); err != nil { - logger.Warnf("remove load error file %s error: %s", path.Join(dir, taskMetaData), err) + if err = os.Remove(path.Join(dir, taskMetadata)); err != nil { + logger.Warnf("remove load error file %s error: %s", path.Join(dir, taskMetadata), err) } else { - logger.Warnf("remove load error file %s ok", path.Join(dir, taskMetaData)) + logger.Warnf("remove load error file %s ok", path.Join(dir, taskMetadata)) } // remove data @@ -579,17 +579,17 @@ func (s *storageManager) ReloadPersistentTask(gcCallback GCCallback) error { } func (s *storageManager) TryGC() (bool, error) { - var markedTasks []PeerTaskMetaData + var markedTasks []PeerTaskMetadata var totalNotMarkedSize int64 s.tasks.Range(func(key, task interface{}) bool { if task.(*localTaskStore).CanReclaim() { task.(*localTaskStore).MarkReclaim() - markedTasks = append(markedTasks, key.(PeerTaskMetaData)) + markedTasks = append(markedTasks, key.(PeerTaskMetadata)) } else { // just calculate not reclaimed task totalNotMarkedSize += task.(*localTaskStore).ContentLength logger.Debugf("task %s/%s not reach gc time", - key.(PeerTaskMetaData).TaskID, key.(PeerTaskMetaData).PeerID) + key.(PeerTaskMetadata).TaskID, key.(PeerTaskMetadata).PeerID) } return true }) @@ -610,7 +610,7 @@ func (s *storageManager) TryGC() (bool, error) { }) for _, task := range tasks { task.MarkReclaim() - markedTasks = append(markedTasks, PeerTaskMetaData{task.PeerID, task.TaskID}) + markedTasks = append(markedTasks, PeerTaskMetadata{task.PeerID, task.TaskID}) logger.Infof("quota threshold reached, mark task %s/%s reclaimed, last access: %s, size: %s", task.TaskID, task.PeerID, time.Unix(0, task.lastAccess.Load()).Format(time.RFC3339Nano), units.BytesSize(float64(task.ContentLength))) @@ -661,7 +661,7 @@ func (s *storageManager) CleanUp() { func (s *storageManager) forceGC() (bool, error) { s.tasks.Range(func(key, task interface{}) bool { - meta := key.(PeerTaskMetaData) + meta := key.(PeerTaskMetadata) s.tasks.Delete(meta) s.cleanIndex(meta.TaskID, meta.PeerID) task.(*localTaskStore).MarkReclaim() diff --git a/client/daemon/test/mock/storage/manager.go b/client/daemon/test/mock/storage/manager.go index 7c03c8c494b..7075a756e8a 100644 --- a/client/daemon/test/mock/storage/manager.go +++ b/client/daemon/test/mock/storage/manager.go @@ -54,7 +54,7 @@ func (mr *MockTaskStorageDriverMockRecorder) GetPieces(ctx, req interface{}) *go } // IsInvalid mocks base method. -func (m *MockTaskStorageDriver) IsInvalid(req *storage.PeerTaskMetaData) (bool, error) { +func (m *MockTaskStorageDriver) IsInvalid(req *storage.PeerTaskMetadata) (bool, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "IsInvalid", req) ret0, _ := ret[0].(bool) @@ -69,7 +69,7 @@ func (mr *MockTaskStorageDriverMockRecorder) IsInvalid(req interface{}) *gomock. } // ReadAllPieces mocks base method. -func (m *MockTaskStorageDriver) ReadAllPieces(ctx context.Context, req *storage.PeerTaskMetaData) (io.ReadCloser, error) { +func (m *MockTaskStorageDriver) ReadAllPieces(ctx context.Context, req *storage.PeerTaskMetadata) (io.ReadCloser, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ReadAllPieces", ctx, req) ret0, _ := ret[0].(io.ReadCloser) @@ -128,7 +128,7 @@ func (mr *MockTaskStorageDriverMockRecorder) UpdateTask(ctx, req interface{}) *g } // ValidateDigest mocks base method. -func (m *MockTaskStorageDriver) ValidateDigest(req *storage.PeerTaskMetaData) error { +func (m *MockTaskStorageDriver) ValidateDigest(req *storage.PeerTaskMetadata) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ValidateDigest", req) ret0, _ := ret[0].(error) @@ -298,7 +298,7 @@ func (mr *MockManagerMockRecorder) GetPieces(ctx, req interface{}) *gomock.Call } // IsInvalid mocks base method. -func (m *MockManager) IsInvalid(req *storage.PeerTaskMetaData) (bool, error) { +func (m *MockManager) IsInvalid(req *storage.PeerTaskMetadata) (bool, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "IsInvalid", req) ret0, _ := ret[0].(bool) @@ -325,7 +325,7 @@ func (mr *MockManagerMockRecorder) Keep() *gomock.Call { } // ReadAllPieces mocks base method. -func (m *MockManager) ReadAllPieces(ctx context.Context, req *storage.PeerTaskMetaData) (io.ReadCloser, error) { +func (m *MockManager) ReadAllPieces(ctx context.Context, req *storage.PeerTaskMetadata) (io.ReadCloser, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ReadAllPieces", ctx, req) ret0, _ := ret[0].(io.ReadCloser) @@ -398,7 +398,7 @@ func (mr *MockManagerMockRecorder) UpdateTask(ctx, req interface{}) *gomock.Call } // ValidateDigest mocks base method. -func (m *MockManager) ValidateDigest(req *storage.PeerTaskMetaData) error { +func (m *MockManager) ValidateDigest(req *storage.PeerTaskMetadata) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ValidateDigest", req) ret0, _ := ret[0].(error) diff --git a/client/daemon/upload/upload_manager.go b/client/daemon/upload/upload_manager.go index 41e64987dc7..a3d035cf030 100644 --- a/client/daemon/upload/upload_manager.go +++ b/client/daemon/upload/upload_manager.go @@ -111,11 +111,11 @@ func (um *uploadManager) handleUpload(w http.ResponseWriter, r *http.Request) { w.Header().Add(headers.ContentLength, fmt.Sprintf("%d", rg[0].Length)) reader, closer, err := um.StorageManager.ReadPiece(r.Context(), &storage.ReadPieceRequest{ - PeerTaskMetaData: storage.PeerTaskMetaData{ + PeerTaskMetadata: storage.PeerTaskMetadata{ TaskID: task, PeerID: peer, }, - PieceMetaData: storage.PieceMetaData{ + PieceMetadata: storage.PieceMetadata{ Num: -1, Range: rg[0], },