Skip to content

Commit

Permalink
feat: beautify scheduler & CDN log (#618)
Browse files Browse the repository at this point in the history
* feat: beautify scheduler & CDN log

Signed-off-by: santong <[email protected]>
  • Loading branch information
244372610 authored Sep 8, 2021
1 parent 4f44c13 commit 3300b75
Show file tree
Hide file tree
Showing 16 changed files with 146 additions and 126 deletions.
8 changes: 4 additions & 4 deletions cdnsystem/supervisor/cdn/cache_detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (cd *cacheDetector) detectCache(ctx context.Context, task *types.SeedTask,
}()
result, err = cd.doDetect(ctx, task, fileDigest)
if err != nil {
logger.WithTaskID(task.TaskID).Infof("failed to detect cache, reset cache: %v", err)
task.Log().Infof("failed to detect cache, reset cache: %v", err)
metaData, err := cd.resetCache(task)
if err == nil {
result = &cacheResult{
Expand All @@ -85,7 +85,7 @@ func (cd *cacheDetector) detectCache(ctx context.Context, task *types.SeedTask,
return result, err
}
if err := cd.cacheDataManager.updateAccessTime(task.TaskID, getCurrentTimeMillisFunc()); err != nil {
logger.WithTaskID(task.TaskID).Warnf("failed to update task access time ")
task.Log().Warnf("failed to update task access time ")
}
return result, nil
}
Expand All @@ -107,9 +107,9 @@ func (cd *cacheDetector) doDetect(ctx context.Context, task *types.SeedTask, fil
expired, err := source.IsExpired(ctx, task.URL, task.Header, fileMetaData.ExpireInfo)
if err != nil {
// 如果获取失败,则认为没有过期,防止打爆源
logger.WithTaskID(task.TaskID).Errorf("failed to check if the task expired: %v", err)
task.Log().Errorf("failed to check if the task expired: %v", err)
}
logger.WithTaskID(task.TaskID).Debugf("task expired result: %t", expired)
task.Log().Debugf("task expired result: %t", expired)
if expired {
return nil, cdnerrors.ErrResourceExpired{URL: task.URL}
}
Expand Down
3 changes: 1 addition & 2 deletions cdnsystem/supervisor/cdn/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"io"

"d7y.io/dragonfly/v2/cdnsystem/types"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/source"
"d7y.io/dragonfly/v2/pkg/structure/maputils"
"d7y.io/dragonfly/v2/pkg/util/rangeutils"
Expand All @@ -43,7 +42,7 @@ func (cm *Manager) download(ctx context.Context, task *types.SeedTask, detectRes
headers[RangeHeaderName] = fmt.Sprintf("bytes=%s", breakRange)
}
}
logger.WithTaskID(task.TaskID).Infof("start download url %s at range: %d-%d: with header: %+v", task.URL, detectResult.breakPoint,
task.Log().Infof("start download url %s at range: %d-%d: with header: %+v", task.URL, detectResult.breakPoint,
task.SourceFileLength, task.Header)
reader, responseHeader, err := source.DownloadWithResponseHeader(ctx, task.URL, headers)
// update Expire info
Expand Down
12 changes: 6 additions & 6 deletions cdnsystem/supervisor/cdn/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,15 @@ func (cm *Manager) TriggerCDN(ctx context.Context, task *types.SeedTask) (seedTa
return seedTask, errors.Wrapf(err, "failed to detect cache")
}
span.SetAttributes(config.AttributeCacheResult.String(detectResult.String()))
logger.WithTaskID(task.TaskID).Debugf("detects cache result: %+v", detectResult)
task.Log().Debugf("detects cache result: %+v", detectResult)
// second: report detect result
err = cm.cdnReporter.reportCache(ctx, task.TaskID, detectResult)
if err != nil {
logger.WithTaskID(task.TaskID).Errorf("failed to report cache, reset detectResult: %v", err)
task.Log().Errorf("failed to report cache, reset detectResult: %v", err)
}
// full cache
if detectResult.breakPoint == -1 {
logger.WithTaskID(task.TaskID).Infof("cache full hit on local")
task.Log().Infof("cache full hit on local")
seedTask.UpdateTaskInfo(types.TaskInfoCdnStatusSuccess, detectResult.fileMetaData.SourceRealDigest, detectResult.fileMetaData.PieceMd5Sign,
detectResult.fileMetaData.SourceFileLen, detectResult.fileMetaData.CdnFileLength)
return seedTask, nil
Expand All @@ -148,7 +148,7 @@ func (cm *Manager) TriggerCDN(ctx context.Context, task *types.SeedTask) (seedTa
if err != nil {
server.StatSeedFinish(task.TaskID, task.URL, false, err, start.Nanosecond(), time.Now().Nanosecond(), downloadMetadata.backSourceLength,
downloadMetadata.realSourceFileLength)
logger.WithTaskID(task.TaskID).Errorf("failed to write for task: %v", err)
task.Log().Errorf("failed to write for task: %v", err)
seedTask.UpdateStatus(types.TaskInfoCdnStatusFailed)
return seedTask, err
}
Expand Down Expand Up @@ -179,7 +179,7 @@ func (cm *Manager) TryFreeSpace(fileLength int64) (bool, error) {
}

func (cm *Manager) handleCDNResult(task *types.SeedTask, sourceDigest string, downloadMetadata *downloadMetadata) (bool, error) {
logger.WithTaskID(task.TaskID).Debugf("handle cdn result, downloadMetaData: %+v", downloadMetadata)
task.Log().Debugf("handle cdn result, downloadMetaData: %+v", downloadMetadata)
var isSuccess = true
var errorMsg string
// check md5
Expand Down Expand Up @@ -222,7 +222,7 @@ func (cm *Manager) handleCDNResult(task *types.SeedTask, sourceDigest string, do
return false, errors.New(errorMsg)
}

logger.WithTaskID(task.TaskID).Infof("success to get task, downloadMetadata: %+v realDigest: %s", downloadMetadata, sourceDigest)
task.Log().Infof("success to get task, downloadMetadata: %+v realDigest: %s", downloadMetadata, sourceDigest)

return true, nil
}
Expand Down
2 changes: 1 addition & 1 deletion cdnsystem/supervisor/cdn/storage/hybrid/hybrid.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func (h *hybridStorageMgr) CreateUploadLink(taskID string) error {

func (h *hybridStorageMgr) ResetRepo(task *types.SeedTask) error {
if err := h.deleteTaskFiles(task.TaskID, false, true); err != nil {
logger.WithTaskID(task.TaskID).Errorf("reset repo: failed to delete task files: %v", err)
task.Log().Errorf("reset repo: failed to delete task files: %v", err)
}
// 判断是否有足够空间存放
shmPath, err := h.tryShmSpace(task.URL, task.TaskID, task.SourceFileLength)
Expand Down
20 changes: 10 additions & 10 deletions cdnsystem/supervisor/task/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,18 +83,18 @@ func (tm *Manager) Register(ctx context.Context, req *types.TaskRegisterRequest)
}
taskBytes, _ := json.Marshal(task)
span.SetAttributes(config.AttributeTaskInfo.String(string(taskBytes)))
logger.WithTaskID(task.TaskID).Debugf("success get task info: %+v", task)
task.Log().Debugf("success get task info: %+v", task)

// update accessTime for taskId
if err := tm.accessTimeMap.Add(task.TaskID, time.Now()); err != nil {
logger.WithTaskID(task.TaskID).Warnf("failed to update accessTime: %v", err)
task.Log().Warnf("failed to update accessTime: %v", err)
}

// trigger CDN
if err := tm.triggerCdnSyncAction(ctx, task); err != nil {
return nil, errors.Wrapf(err, "trigger cdn")
}
logger.WithTaskID(task.TaskID).Infof("successfully trigger cdn sync action")
task.Log().Infof("successfully trigger cdn sync action")
// watch seed progress
return tm.progressMgr.WatchSeedProgress(ctx, task.TaskID)
}
Expand All @@ -107,7 +107,7 @@ func (tm *Manager) triggerCdnSyncAction(ctx context.Context, task *types.SeedTas
synclock.Lock(task.TaskID, true)
if !task.IsFrozen() {
span.SetAttributes(config.AttributeTaskStatus.String(task.CdnStatus))
logger.WithTaskID(task.TaskID).Infof("seedTask is running or has been downloaded successfully, status: %s", task.CdnStatus)
task.Log().Infof("seedTask is running or has been downloaded successfully, status: %s", task.CdnStatus)
synclock.UnLock(task.TaskID, true)
return nil
}
Expand All @@ -118,12 +118,12 @@ func (tm *Manager) triggerCdnSyncAction(ctx context.Context, task *types.SeedTas
// reconfirm
span.SetAttributes(config.AttributeTaskStatus.String(task.CdnStatus))
if !task.IsFrozen() {
logger.WithTaskID(task.TaskID).Infof("reconfirm find seedTask is running or has been downloaded successfully, status: %s", task.CdnStatus)
task.Log().Infof("reconfirm find seedTask is running or has been downloaded successfully, status: %s", task.CdnStatus)
return nil
}
if task.IsWait() {
tm.progressMgr.InitSeedProgress(ctx, task.TaskID)
logger.WithTaskID(task.TaskID).Infof("successfully init seed progress for task")
task.Log().Infof("successfully init seed progress for task")
}
updatedTask, err := tm.updateTask(task.TaskID, &types.SeedTask{
CdnStatus: types.TaskInfoCdnStatusRunning,
Expand All @@ -135,19 +135,19 @@ func (tm *Manager) triggerCdnSyncAction(ctx context.Context, task *types.SeedTas
go func() {
updateTaskInfo, err := tm.cdnMgr.TriggerCDN(ctx, task)
if err != nil {
logger.WithTaskID(task.TaskID).Errorf("trigger cdn get error: %v", err)
task.Log().Errorf("trigger cdn get error: %v", err)
}
go func() {
if err := tm.progressMgr.PublishTask(ctx, task.TaskID, updateTaskInfo); err != nil {
logger.WithTaskID(task.TaskID).Errorf("failed to publish task: %v", err)
task.Log().Errorf("failed to publish task: %v", err)
}

}()
updatedTask, err = tm.updateTask(task.TaskID, updateTaskInfo)
if err != nil {
logger.WithTaskID(task.TaskID).Errorf("failed to update task: %v", err)
task.Log().Errorf("failed to update task: %v", err)
}
logger.WithTaskID(task.TaskID).Infof("successfully update task cdn updatedTask: %+v", updatedTask)
task.Log().Infof("successfully update task cdn updatedTask: %+v", updatedTask)
}()
return nil
}
Expand Down
18 changes: 3 additions & 15 deletions cdnsystem/supervisor/task/manager_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,6 @@ import (
"go.opentelemetry.io/otel/trace"
)

const (
IllegalSourceFileLen = -100
)

// addOrUpdateTask add a new task or update exist task
func (tm *Manager) addOrUpdateTask(ctx context.Context, request *types.TaskRegisterRequest) (*types.SeedTask, error) {
var span trace.Span
Expand Down Expand Up @@ -67,15 +63,7 @@ func (tm *Manager) addOrUpdateTask(ctx context.Context, request *types.TaskRegis
}

var task *types.SeedTask
newTask := &types.SeedTask{
TaskID: taskID,
Header: request.Header,
RequestDigest: request.Digest,
URL: request.URL,
TaskURL: taskURL,
CdnStatus: types.TaskInfoCdnStatusWaiting,
SourceFileLength: IllegalSourceFileLen,
}
newTask := types.NewSeedTask(taskID, request.Header, request.Digest, request.URL, taskURL)
// using the existing task if it already exists corresponding to taskID
if v, err := tm.taskStore.Get(taskID); err == nil {
span.SetAttributes(config.AttributeIfReuseTask.Bool(true))
Expand All @@ -92,7 +80,7 @@ func (tm *Manager) addOrUpdateTask(ctx context.Context, request *types.TaskRegis
task = newTask
}

if task.SourceFileLength != IllegalSourceFileLen {
if task.SourceFileLength != types.IllegalSourceFileLen {
return task, nil
}

Expand All @@ -102,7 +90,7 @@ func (tm *Manager) addOrUpdateTask(ctx context.Context, request *types.TaskRegis
span.AddEvent(config.EventRequestSourceFileLength)
sourceFileLength, err := source.GetContentLength(ctx, task.URL, request.Header)
if err != nil {
logger.WithTaskID(task.TaskID).Errorf("failed to get url (%s) content length: %v", task.URL, err)
task.Log().Errorf("failed to get url (%s) content length: %v", task.URL, err)
if cdnerrors.IsURLNotReachable(err) {
tm.taskURLUnReachableStore.Add(taskID, time.Now())
return nil, err
Expand Down
29 changes: 29 additions & 0 deletions cdnsystem/types/seed_task_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@

package types

import (
logger "d7y.io/dragonfly/v2/internal/dflog"
)

type SeedTask struct {
TaskID string `json:"taskId,omitempty"`
URL string `json:"url,omitempty"`
Expand All @@ -29,6 +33,24 @@ type SeedTask struct {
RequestDigest string `json:"requestDigest,omitempty"`
SourceRealDigest string `json:"sourceRealDigest,omitempty"`
PieceMd5Sign string `json:"pieceMd5Sign,omitempty"`
logger *logger.SugaredLoggerOnWith
}

const (
IllegalSourceFileLen = -100
)

func NewSeedTask(taskID string, header map[string]string, digest string, url string, taskURL string) *SeedTask {
return &SeedTask{
TaskID: taskID,
Header: header,
RequestDigest: digest,
URL: url,
TaskURL: taskURL,
CdnStatus: TaskInfoCdnStatusWaiting,
SourceFileLength: IllegalSourceFileLen,
logger: logger.WithTaskID(taskID),
}
}

// IsSuccess determines that whether the CDNStatus is success.
Expand Down Expand Up @@ -67,6 +89,13 @@ func (task *SeedTask) UpdateTaskInfo(cdnStatus, realDigest, pieceMd5Sign string,
task.CdnFileLength = cdnFileLength
}

func (task *SeedTask) Log() *logger.SugaredLoggerOnWith {
if task.logger == nil {
task.logger = logger.WithTaskID(task.TaskID)
}
return task.logger
}

const (

// TaskInfoCdnStatusWaiting captures enum value "WAITING"
Expand Down
4 changes: 2 additions & 2 deletions pkg/source/httpprotocol/http_source_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ import (
"os"
"time"

"d7y.io/dragonfly/v2/cdnsystem/types"
"d7y.io/dragonfly/v2/pkg/util/rangeutils"

"github.com/go-http-utils/headers"

"d7y.io/dragonfly/v2/cdnsystem/supervisor/task"
"d7y.io/dragonfly/v2/pkg/source"
"d7y.io/dragonfly/v2/pkg/structure/maputils"
"d7y.io/dragonfly/v2/pkg/util/stringutils"
Expand Down Expand Up @@ -113,7 +113,7 @@ func (client *httpSourceClient) GetContentLength(ctx context.Context, url string
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
// TODO Whether this situation should be distinguished from the err situation,
//similar to proposing another error type to indicate that this error can interact with the URL, but the status code does not meet expectations
return task.IllegalSourceFileLen, fmt.Errorf("get http resource length failed, unexpected code: %d", resp.StatusCode)
return types.IllegalSourceFileLen, fmt.Errorf("get http resource length failed, unexpected code: %d", resp.StatusCode)
}
return resp.ContentLength, nil
}
Expand Down
Loading

0 comments on commit 3300b75

Please sign in to comment.