Skip to content

Commit

Permalink
Fix golang lint (#249)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>Signed-off-by: hanson.yj <[email protected]>
Signed-off-by: hanson.yj <[email protected]>
  • Loading branch information
gaius-qi authored and yangjun289519474 committed May 24, 2021
1 parent 7346cf6 commit 019b3dc
Show file tree
Hide file tree
Showing 68 changed files with 405 additions and 346 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ jobs:
uses: actions/checkout@v2

- name: Golangci lint
uses: golangci/golangci-lint-action@v2
uses: golangci/golangci-lint-action@v2.5.2
with:
version: latest

Expand Down
29 changes: 29 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,32 @@ mysql
!.vscode/launch.json
!.vscode/extensions.json
*.code-workspace

### macOS ###
# General
.DS_Store
.AppleDouble
.LSOverride

# Icon must end with two \r
Icon


# Thumbnails
._*

# Files that might appear in the root of a volume
.DocumentRevisions-V100
.fseventsd
.Spotlight-V100
.TemporaryItems
.Trashes
.VolumeIcon.icns
.com.apple.timemachine.donotpresent

# Directories potentially created on remote AFP share
.AppleDB
.AppleDesktop
Network Trash Folder
Temporary Items
.apdisk
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ run:

linters-settings:
gocyclo:
min-complexity: 20
min-complexity: 40

linters:
disable-all: true
Expand Down
14 changes: 7 additions & 7 deletions cdnsystem/daemon/mgr/cdn/cache_data_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,19 @@ func newCacheDataManager(storeMgr storage.Manager) *cacheDataManager {

// writeFileMetaDataByTask stores the metadata of task by task to storage.
func (mm *cacheDataManager) writeFileMetaDataByTask(ctx context.Context, task *types.SeedTask) (*storage.FileMetaData, error) {
mm.cacheLocker.Lock(task.TaskId, false)
defer mm.cacheLocker.UnLock(task.TaskId, false)
mm.cacheLocker.Lock(task.TaskID, false)
defer mm.cacheLocker.UnLock(task.TaskID, false)
metaData := &storage.FileMetaData{
TaskId: task.TaskId,
TaskURL: task.TaskUrl,
TaskID: task.TaskID,
TaskURL: task.TaskURL,
PieceSize: task.PieceSize,
SourceFileLen: task.SourceFileLength,
AccessTime: getCurrentTimeMillisFunc(),
CdnFileLength: task.CdnFileLength,
TotalPieceCount: task.PieceTotal,
}

if err := mm.storage.WriteFileMetaData(ctx, task.TaskId, metaData); err != nil {
if err := mm.storage.WriteFileMetaData(ctx, task.TaskID, metaData); err != nil {
return nil, errors.Wrapf(err, "failed to write file metadata to storage")
}

Expand Down Expand Up @@ -197,8 +197,8 @@ func (mm *cacheDataManager) readDownloadFile(ctx context.Context, taskID string)
}

func (mm *cacheDataManager) resetRepo(ctx context.Context, task *types.SeedTask) error {
mm.cacheLocker.Lock(task.TaskId, false)
defer mm.cacheLocker.UnLock(task.TaskId, false)
mm.cacheLocker.Lock(task.TaskID, false)
defer mm.cacheLocker.UnLock(task.TaskID, false)
return mm.storage.ResetRepo(ctx, task)
}

Expand Down
26 changes: 13 additions & 13 deletions cdnsystem/daemon/mgr/cdn/cache_detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (cd *cacheDetector) detectCache(ctx context.Context, task *types.SeedTask)
//}
result, err := cd.doDetect(ctx, task)
if err != nil {
logger.WithTaskID(task.TaskId).Infof("failed to detect cache, reset cache: %v", err)
logger.WithTaskID(task.TaskID).Infof("failed to detect cache, reset cache: %v", err)
metaData, err := cd.resetCache(ctx, task)
if err == nil {
result = &cacheResult{
Expand All @@ -75,46 +75,46 @@ func (cd *cacheDetector) detectCache(ctx context.Context, task *types.SeedTask)
}
return result, err
}
if err := cd.cacheDataManager.updateAccessTime(ctx, task.TaskId, getCurrentTimeMillisFunc()); err != nil {
logger.WithTaskID(task.TaskId).Warnf("failed to update task access time ")
if err := cd.cacheDataManager.updateAccessTime(ctx, task.TaskID, getCurrentTimeMillisFunc()); err != nil {
logger.WithTaskID(task.TaskID).Warnf("failed to update task access time ")
}
return result, nil
}

// detectCache the actual detect action which detects file metaData and pieces metaData of specific task
func (cd *cacheDetector) doDetect(ctx context.Context, task *types.SeedTask) (result *cacheResult, err error) {
fileMetaData, err := cd.cacheDataManager.readFileMetaData(ctx, task.TaskId)
fileMetaData, err := cd.cacheDataManager.readFileMetaData(ctx, task.TaskID)
if err != nil {
return nil, errors.Wrapf(err, "failed to read file meta data")
}
if err := checkSameFile(task, fileMetaData); err != nil {
return nil, errors.Wrapf(err, "task does not match meta information of task file")
}
expired, err := cd.resourceClient.IsExpired(task.Url, task.Header, fileMetaData.ExpireInfo)
expired, err := cd.resourceClient.IsExpired(task.URL, task.Header, fileMetaData.ExpireInfo)
if err != nil {
// 如果获取失败,则认为没有过期,防止打爆源
logger.WithTaskID(task.TaskId).Errorf("failed to check if the task expired: %v", err)
logger.WithTaskID(task.TaskID).Errorf("failed to check if the task expired: %v", err)
}
logger.WithTaskID(task.TaskId).Debugf("task expired result: %t", expired)
logger.WithTaskID(task.TaskID).Debugf("task expired result: %t", expired)
if expired {
return nil, errors.Wrapf(cdnerrors.ErrResourceExpired, "url:%s, expireInfo:%+v", task.Url,
return nil, errors.Wrapf(cdnerrors.ErrResourceExpired, "url:%s, expireInfo:%+v", task.URL,
fileMetaData.ExpireInfo)
}
// not expired
if fileMetaData.Finish {
// quickly detect the cache situation through the meta data
return cd.parseByReadMetaFile(ctx, task.TaskId, fileMetaData)
return cd.parseByReadMetaFile(ctx, task.TaskID, fileMetaData)
}
// check if the resource supports range request. if so,
// detect the cache situation by reading piece meta and data file
supportRange, err := cd.resourceClient.IsSupportRange(task.Url, task.Header)
supportRange, err := cd.resourceClient.IsSupportRange(task.URL, task.Header)
if err != nil {
return nil, errors.Wrapf(err, "failed to check if url(%s) supports range request", task.Url)
return nil, errors.Wrapf(err, "failed to check if url(%s) supports range request", task.URL)
}
if !supportRange {
return nil, errors.Wrapf(cdnerrors.ErrResourceNotSupportRangeRequest, "url:%s", task.Url)
return nil, errors.Wrapf(cdnerrors.ErrResourceNotSupportRangeRequest, "url:%s", task.URL)
}
return cd.parseByReadFile(ctx, task.TaskId, fileMetaData)
return cd.parseByReadFile(ctx, task.TaskID, fileMetaData)
}

// parseByReadMetaFile detect cache by read meta and pieceMeta files of task
Expand Down
8 changes: 4 additions & 4 deletions cdnsystem/daemon/mgr/cdn/cache_detector_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ func checkSameFile(task *types.SeedTask, metaData *storage.FileMetaData) error {
task.PieceSize)
}

if metaData.TaskId != task.TaskId {
return errors.Errorf("meta task TaskId(%s) is not equals with task TaskId(%s)", metaData.TaskId, task.TaskId)
if metaData.TaskID != task.TaskID {
return errors.Errorf("meta task TaskId(%s) is not equals with task TaskId(%s)", metaData.TaskID, task.TaskID)
}

if metaData.TaskURL != task.TaskUrl {
return errors.Errorf("meta task taskUrl(%s) is not equals with task taskUrl(%s)", metaData.TaskURL, task.Url)
if metaData.TaskURL != task.TaskURL {
return errors.Errorf("meta task taskUrl(%s) is not equals with task taskUrl(%s)", metaData.TaskURL, task.URL)
}
if !stringutils.IsBlank(metaData.SourceRealMd5) && !stringutils.IsBlank(task.RequestMd5) &&
metaData.SourceRealMd5 != task.RequestMd5 {
Expand Down
16 changes: 8 additions & 8 deletions cdnsystem/daemon/mgr/cdn/cache_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
)

type protocolContent struct {
TaskId string
TaskID string
pieceNum int32
pieceSize int32
pieceContent *bytes.Buffer
Expand Down Expand Up @@ -80,13 +80,13 @@ func (cw *cacheWriter) startWriter(ctx context.Context, reader io.Reader, task *
if int(pieceContLeft) <= n {
bb.Write(buf[:pieceContLeft])
pc := &protocolContent{
TaskId: task.TaskId,
TaskID: task.TaskID,
pieceNum: curPieceNum,
pieceSize: task.PieceSize,
pieceContent: bb,
}
jobCh <- pc
logger.WithTaskID(task.TaskId).Debugf("send protocolContent to jobCh, pieceNum: %d", curPieceNum)
logger.WithTaskID(task.TaskID).Debugf("send protocolContent to jobCh, pieceNum: %d", curPieceNum)
curPieceNum++

// write the data left to a new buffer
Expand All @@ -105,16 +105,16 @@ func (cw *cacheWriter) startWriter(ctx context.Context, reader io.Reader, task *
if err == io.EOF {
if bb.Len() > 0 {
pc := &protocolContent{
TaskId: task.TaskId,
TaskID: task.TaskID,
pieceNum: curPieceNum,
pieceSize: task.PieceSize,
pieceContent: bb,
}
jobCh <- pc
curPieceNum++
logger.WithTaskID(task.TaskId).Debugf("send the last protocolContent, pieceNum: %d", curPieceNum)
logger.WithTaskID(task.TaskID).Debugf("send the last protocolContent, pieceNum: %d", curPieceNum)
}
logger.WithTaskID(task.TaskId).Info("send all protocolContents and wait for cdnWriter")
logger.WithTaskID(task.TaskID).Info("send all protocolContents and wait for cdnWriter")
break
}
if err != nil {
Expand All @@ -127,12 +127,12 @@ func (cw *cacheWriter) startWriter(ctx context.Context, reader io.Reader, task *
close(jobCh)
wg.Wait()

storageInfo, err := cw.cacheDataManager.statDownloadFile(ctx, task.TaskId)
storageInfo, err := cw.cacheDataManager.statDownloadFile(ctx, task.TaskID)
if err != nil {
return &downloadMetadata{backSourceLength: backSourceFileLength}, errors.Wrapf(err, "failed to get cdn file length")
}

pieceMd5Sign, _, err := cw.cacheDataManager.getPieceMd5Sign(ctx, task.TaskId)
pieceMd5Sign, _, err := cw.cacheDataManager.getPieceMd5Sign(ctx, task.TaskID)
if err != nil {
return &downloadMetadata{backSourceLength: backSourceFileLength}, errors.Wrapf(err, "failed to get piece md5 sign")
}
Expand Down
12 changes: 6 additions & 6 deletions cdnsystem/daemon/mgr/cdn/cache_writer_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ func (cw *cacheWriter) writerPool(ctx context.Context, wg *sync.WaitGroup, write
pieceLen := originPieceLen // 经过处理后写到存储介质的真实长度
pieceStyle := types.PlainUnspecified

if err := cw.writeToFile(ctx, job.TaskId, waitToWriteContent, int64(job.pieceNum)*int64(job.pieceSize), pieceMd5); err != nil {
logger.WithTaskID(job.TaskId).Errorf("failed to write file, pieceNum %d: %v", job.pieceNum, err)
if err := cw.writeToFile(ctx, job.TaskID, waitToWriteContent, int64(job.pieceNum)*int64(job.pieceSize), pieceMd5); err != nil {
logger.WithTaskID(job.TaskID).Errorf("failed to write file, pieceNum %d: %v", job.pieceNum, err)
// todo redo the job?
continue
}
Expand All @@ -95,16 +95,16 @@ func (cw *cacheWriter) writerPool(ctx context.Context, wg *sync.WaitGroup, write
go func(record *storage.PieceMetaRecord) {
defer wg.Done()
// todo 可以先塞入channel,然后启动单独goroutine顺序写文件
if err := cw.cacheDataManager.appendPieceMetaData(ctx, job.TaskId, record); err != nil {
logger.WithTaskID(job.TaskId).Errorf("failed to append piece meta data to file:%v", err)
if err := cw.cacheDataManager.appendPieceMetaData(ctx, job.TaskID, record); err != nil {
logger.WithTaskID(job.TaskID).Errorf("failed to append piece meta data to file:%v", err)
}
}(pieceRecord)

if cw.cdnReporter != nil {
if err := cw.cdnReporter.reportPieceMetaRecord(ctx, job.TaskId, pieceRecord,
if err := cw.cdnReporter.reportPieceMetaRecord(ctx, job.TaskID, pieceRecord,
DownloaderReport); err != nil {
// NOTE: should we do this job again?
logger.WithTaskID(job.TaskId).Errorf("failed to report piece status, pieceNum %d pieceMetaRecord %s: %v", job.pieceNum, pieceRecord, err)
logger.WithTaskID(job.TaskID).Errorf("failed to report piece status, pieceNum %d pieceMetaRecord %s: %v", job.pieceNum, pieceRecord, err)
continue
}
}
Expand Down
4 changes: 2 additions & 2 deletions cdnsystem/daemon/mgr/cdn/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (cm *Manager) download(task *types.SeedTask, detectResult *cacheResult) (io
headers[RangeHeaderName] = fmt.Sprintf("bytes=%s", breakRange)
}
}
logger.WithTaskID(task.TaskId).Infof("start download url %s at range:%d-%d: with header: %+v", task.Url, detectResult.breakPoint,
logger.WithTaskID(task.TaskID).Infof("start download url %s at range:%d-%d: with header: %+v", task.URL, detectResult.breakPoint,
task.SourceFileLength, task.Header)
return cm.resourceClient.Download(task.Url, headers)
return cm.resourceClient.Download(task.URL, headers)
}
32 changes: 16 additions & 16 deletions cdnsystem/daemon/mgr/cdn/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,38 +81,38 @@ func NewManager(cfg *config.Config, cacheStore storage.Manager, progressMgr mgr.

func (cm *Manager) TriggerCDN(ctx context.Context, task *types.SeedTask) (seedTask *types.SeedTask, err error) {
// obtain taskId write lock
cm.cdnLocker.Lock(task.TaskId, false)
defer cm.cdnLocker.UnLock(task.TaskId, false)
cm.cdnLocker.Lock(task.TaskID, false)
defer cm.cdnLocker.UnLock(task.TaskID, false)
// first: detect Cache
detectResult, err := cm.detector.detectCache(ctx, task)
if err != nil {
return getUpdateTaskInfoWithStatusOnly(types.TaskInfoCdnStatusFailed), errors.Wrapf(err, "failed to detect cache")
}
logger.WithTaskID(task.TaskId).Debugf("detects cache result: %+v", detectResult)
logger.WithTaskID(task.TaskID).Debugf("detects cache result: %+v", detectResult)
// second: report detect result
err = cm.cdnReporter.reportCache(ctx, task.TaskId, detectResult)
err = cm.cdnReporter.reportCache(ctx, task.TaskID, detectResult)
if err != nil {
logger.WithTaskID(task.TaskId).Errorf("failed to report cache, reset detectResult:%v", err)
logger.WithTaskID(task.TaskID).Errorf("failed to report cache, reset detectResult:%v", err)
}
// full cache
if detectResult.breakPoint == -1 {
logger.WithTaskID(task.TaskId).Infof("cache full hit on local")
logger.WithTaskID(task.TaskID).Infof("cache full hit on local")
return getUpdateTaskInfo(types.TaskInfoCdnStatusSuccess, detectResult.fileMetaData.SourceRealMd5, detectResult.fileMetaData.PieceMd5Sign,
detectResult.fileMetaData.SourceFileLen, detectResult.fileMetaData.CdnFileLength), nil
}
server.StatSeedStart(task.TaskId, task.Url)
server.StatSeedStart(task.TaskID, task.URL)
start := time.Now()
// third: start to download the source file
body, expireInfo, err := cm.download(task, detectResult)
// download fail
if err != nil {
server.StatSeedFinish(task.TaskId, task.Url, false, err, start.Nanosecond(), time.Now().Nanosecond(), 0, 0)
server.StatSeedFinish(task.TaskID, task.URL, false, err, start.Nanosecond(), time.Now().Nanosecond(), 0, 0)
return getUpdateTaskInfoWithStatusOnly(types.TaskInfoCdnStatusSourceError), err
}
defer body.Close()

//update Expire info
cm.updateExpireInfo(ctx, task.TaskId, expireInfo)
cm.updateExpireInfo(ctx, task.TaskID, expireInfo)
fileMd5 := md5.New()
if detectResult.fileMd5 != nil {
fileMd5 = detectResult.fileMd5
Expand All @@ -121,12 +121,12 @@ func (cm *Manager) TriggerCDN(ctx context.Context, task *types.SeedTask) (seedTa
// forth: write to storage
downloadMetadata, err := cm.writer.startWriter(ctx, reader, task, detectResult)
if err != nil {
server.StatSeedFinish(task.TaskId, task.Url, false, err, start.Nanosecond(), time.Now().Nanosecond(), downloadMetadata.backSourceLength,
server.StatSeedFinish(task.TaskID, task.URL, false, err, start.Nanosecond(), time.Now().Nanosecond(), downloadMetadata.backSourceLength,
downloadMetadata.realSourceFileLength)
logger.WithTaskID(task.TaskId).Errorf("failed to write for task: %v", err)
logger.WithTaskID(task.TaskID).Errorf("failed to write for task: %v", err)
return getUpdateTaskInfoWithStatusOnly(types.TaskInfoCdnStatusFailed), err
}
server.StatSeedFinish(task.TaskId, task.Url, true, nil, start.Nanosecond(), time.Now().Nanosecond(), downloadMetadata.backSourceLength,
server.StatSeedFinish(task.TaskID, task.URL, true, nil, start.Nanosecond(), time.Now().Nanosecond(), downloadMetadata.backSourceLength,
downloadMetadata.realSourceFileLength)
sourceMD5 := reader.Md5()
// fifth: handle CDN result
Expand All @@ -147,7 +147,7 @@ func (cm *Manager) Delete(ctx context.Context, taskID string) error {
}

func (cm *Manager) handleCDNResult(ctx context.Context, task *types.SeedTask, sourceMd5 string, downloadMetadata *downloadMetadata) (bool, error) {
logger.WithTaskID(task.TaskId).Debugf("handle cdn result, downloadMetaData: %+v", downloadMetadata)
logger.WithTaskID(task.TaskID).Debugf("handle cdn result, downloadMetaData: %+v", downloadMetadata)
var isSuccess = true
var errorMsg string
// check md5
Expand All @@ -165,7 +165,7 @@ func (cm *Manager) handleCDNResult(ctx context.Context, task *types.SeedTask, so
isSuccess = false
}
if !stringutils.IsBlank(errorMsg) {
logger.WithTaskID(task.TaskId).Error(errorMsg)
logger.WithTaskID(task.TaskID).Error(errorMsg)
}
sourceFileLen := task.SourceFileLength
if isSuccess && task.SourceFileLength <= 0 {
Expand All @@ -177,7 +177,7 @@ func (cm *Manager) handleCDNResult(ctx context.Context, task *types.SeedTask, so
if !isSuccess {
cdnFileLength = 0
}
if err := cm.cacheDataManager.updateStatusAndResult(ctx, task.TaskId, &storage.FileMetaData{
if err := cm.cacheDataManager.updateStatusAndResult(ctx, task.TaskID, &storage.FileMetaData{
Finish: true,
Success: isSuccess,
SourceRealMd5: sourceMd5,
Expand All @@ -193,7 +193,7 @@ func (cm *Manager) handleCDNResult(ctx context.Context, task *types.SeedTask, so
return false, errors.New(errorMsg)
}

logger.WithTaskID(task.TaskId).Infof("success to get task, downloadMetadata:%+v realMd5: %s", downloadMetadata, sourceMd5)
logger.WithTaskID(task.TaskID).Infof("success to get task, downloadMetadata:%+v realMd5: %s", downloadMetadata, sourceMd5)

return true, nil
}
Expand Down
2 changes: 1 addition & 1 deletion cdnsystem/daemon/mgr/cdn/storage/disk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func (s *diskStorageMgr) DeleteTask(ctx context.Context, taskID string) error {
}

func (s *diskStorageMgr) ResetRepo(ctx context.Context, task *types.SeedTask) error {
return s.DeleteTask(ctx, task.TaskId)
return s.DeleteTask(ctx, task.TaskID)
}

func init() {
Expand Down
Loading

0 comments on commit 019b3dc

Please sign in to comment.