Skip to content

Commit

Permalink
unify back source interface (#877)
Browse files Browse the repository at this point in the history
* unify back source interface

Signed-off-by: sunwp <[email protected]>

* metaData Spelling correction

Signed-off-by: sunwp <[email protected]>

* hdfs compile

Signed-off-by: sunwp <[email protected]>

* hdfs compile

Signed-off-by: sunwp <[email protected]>

* hdfs unit test

Signed-off-by: sunwp <[email protected]>

* reset fqdn

Signed-off-by: sunwp <[email protected]>

* new seed task with header

Signed-off-by: sunwp <[email protected]>

* unify back source interface

Signed-off-by: sunwp <[email protected]>

* add unit tests & rename CheckRespCode -> CheckResponseCode & rename GetLastModifiedMillis -> GetLastModified

Signed-off-by: sunwp <[email protected]>

* add request test

Signed-off-by: sunwp <[email protected]>

* golang lint

Signed-off-by: sunwp <[email protected]>
  • Loading branch information
244372610 authored Dec 7, 2021
1 parent 2269a9e commit 935f861
Show file tree
Hide file tree
Showing 40 changed files with 1,755 additions and 805 deletions.
40 changes: 24 additions & 16 deletions cdn/supervisor/cdn/cache_detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"io"
"io/ioutil"
"sort"
"time"

"github.com/pkg/errors"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -103,16 +102,22 @@ func (cd *cacheDetector) doDetect(ctx context.Context, task *types.SeedTask, fil
if err := checkSameFile(task, fileMetaData); err != nil {
return nil, errors.Wrapf(err, "check same file")
}
ctx, expireCancel := context.WithTimeout(context.Background(), 4*time.Second)
defer expireCancel()
expired, err := source.IsExpired(ctx, task.URL, task.Header, fileMetaData.ExpireInfo)
checkExpiredRequest, err := source.NewRequestWithContext(ctx, task.URL, task.Header)
if err != nil {
// 如果获取失败,则认为没有过期,防止打爆源
task.Log().Errorf("failed to check if the task expired: %v", err)
return nil, errors.Wrapf(err, "create request")
}
task.Log().Debugf("task expired result: %t", expired)
expired, err := source.IsExpired(checkExpiredRequest, &source.ExpireInfo{
LastModified: fileMetaData.ExpireInfo[source.LastModified],
ETag: fileMetaData.ExpireInfo[source.ETag],
})
if err != nil {
// If the check fails, the resource is regarded as not expired to prevent the source from being knocked down
task.Log().Warnf("failed to check whether the source is expired. To prevent the source from being suspended, "+
"assume that the source is not expired: %v", err)
}
task.Log().Debugf("task resource expired result: %t", expired)
if expired {
return nil, cdnerrors.ErrResourceExpired{URL: task.URL}
return nil, errors.Errorf("resource %s has expired", task.TaskURL)
}
// not expired
if fileMetaData.Finish {
Expand All @@ -121,14 +126,17 @@ func (cd *cacheDetector) doDetect(ctx context.Context, task *types.SeedTask, fil
}
// check if the resource supports range request. if so,
// detect the cache situation by reading piece meta and data file
ctx, rangeCancel := context.WithTimeout(context.Background(), 4*time.Second)
defer rangeCancel()
supportRange, err := source.IsSupportRange(ctx, task.URL, task.Header)
checkSupportRangeRequest, err := source.NewRequestWithContext(ctx, task.URL, task.Header)
if err != nil {
return nil, errors.Wrapf(err, "create check support range request")
}
checkSupportRangeRequest.Header.Add(source.Range, "0-0")
supportRange, err := source.IsSupportRange(checkSupportRangeRequest)
if err != nil {
return nil, errors.Wrapf(err, "check if url(%s) supports range request", task.URL)
return nil, errors.Wrap(err, "check if support range")
}
if !supportRange {
return nil, cdnerrors.ErrResourceNotSupportRangeRequest{URL: task.URL}
return nil, errors.Errorf("resource %s is not support range request", task.URL)
}
return cd.parseByReadFile(task.TaskID, fileMetaData, fileDigest)
}
Expand Down Expand Up @@ -169,12 +177,12 @@ func (cd *cacheDetector) parseByReadMetaFile(taskID string, fileMetaData *storag
func (cd *cacheDetector) parseByReadFile(taskID string, metaData *storage.FileMetaData, fileDigest hash.Hash) (*cacheResult, error) {
reader, err := cd.cacheDataManager.readDownloadFile(taskID)
if err != nil {
return nil, errors.Wrapf(err, "read data file")
return nil, errors.Wrapf(err, "read download data file")
}
defer reader.Close()
tempRecords, err := cd.cacheDataManager.readPieceMetaRecords(taskID)
if err != nil {
return nil, errors.Wrapf(err, "read piece meta file")
return nil, errors.Wrapf(err, "read piece meta records")
}

// sort piece meta records by pieceNum
Expand All @@ -188,7 +196,7 @@ func (cd *cacheDetector) parseByReadFile(taskID string, metaData *storage.FileMe
if uint32(index) != tempRecords[index].PieceNum {
break
}
// read content
// read content TODO concurrent by multi-goroutine
if err := checkPieceContent(reader, tempRecords[index], fileDigest); err != nil {
logger.WithTaskID(taskID).Errorf("read content of pieceNum %d failed: %v", tempRecords[index].PieceNum, err)
break
Expand Down
26 changes: 15 additions & 11 deletions cdn/supervisor/cdn/cache_detector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"hash"
"io"
"io/ioutil"
"os"
"sort"
"strings"
"testing"
Expand All @@ -36,6 +37,7 @@ import (
storageMock "d7y.io/dragonfly/v2/cdn/supervisor/cdn/storage/mock"
"d7y.io/dragonfly/v2/cdn/types"
"d7y.io/dragonfly/v2/pkg/source"
"d7y.io/dragonfly/v2/pkg/source/httpprotocol"
sourceMock "d7y.io/dragonfly/v2/pkg/source/mock"
"d7y.io/dragonfly/v2/pkg/util/digestutils"
"d7y.io/dragonfly/v2/pkg/util/rangeutils"
Expand All @@ -53,7 +55,8 @@ type CacheDetectorTestSuite struct {
func (suite *CacheDetectorTestSuite) SetupSuite() {
ctrl := gomock.NewController(suite.T())
sourceClient := sourceMock.NewMockResourceClient(ctrl)
source.Register("http", sourceClient)
source.UnRegister("http")
suite.Require().Nil(source.Register("http", sourceClient, httpprotocol.Adapter))
storageMgr := storageMock.NewMockManager(ctrl)
cacheDataManager := newCacheDataManager(storageMgr)
suite.detector = newCacheDetector(cacheDataManager)
Expand All @@ -80,29 +83,30 @@ func (suite *CacheDetectorTestSuite) SetupSuite() {
suite.Nil(err)
return ioutil.NopCloser(strings.NewReader(string(content))), nil
}).AnyTimes()
storageMgr.EXPECT().ReadDownloadFile(noCache.taskID).Return(nil, cdnerrors.ErrFileNotExist{}).AnyTimes()
storageMgr.EXPECT().ReadDownloadFile(noCache.taskID).Return(nil, os.ErrNotExist).AnyTimes()
storageMgr.EXPECT().ReadPieceMetaRecords(fullNoExpiredCache.taskID).Return(fullNoExpiredCache.pieces, nil).AnyTimes()
storageMgr.EXPECT().ReadPieceMetaRecords(partialNotSupportRangeCache.taskID).Return(partialNotSupportRangeCache.pieces, nil).AnyTimes()
storageMgr.EXPECT().ReadPieceMetaRecords(partialSupportRangeCache.taskID).Return(partialSupportRangeCache.pieces, nil).AnyTimes()
storageMgr.EXPECT().ReadPieceMetaRecords(noCache.taskID).Return(nil, cdnerrors.ErrFileNotExist{}).AnyTimes()
storageMgr.EXPECT().ReadPieceMetaRecords(noCache.taskID).Return(nil, os.ErrNotExist).AnyTimes()
storageMgr.EXPECT().StatDownloadFile(fullNoExpiredCache.taskID).Return(&storedriver.StorageInfo{
Path: "",
Size: 9789,
CreateTime: time.Time{},
ModTime: time.Time{},
}, nil).AnyTimes()
storageMgr.EXPECT().StatDownloadFile(gomock.Not(fullNoExpiredCache.taskID)).Return(&storedriver.StorageInfo{}, nil).AnyTimes()

sourceClient.EXPECT().IsExpired(gomock.Any(), expiredAndSupportURL, gomock.Any(), gomock.Any()).Return(true, nil).AnyTimes()
sourceClient.EXPECT().IsSupportRange(gomock.Any(), expiredAndSupportURL, gomock.Any()).Return(true, nil).AnyTimes()
sourceClient.EXPECT().IsExpired(source.RequestEq(expiredAndSupportURL), gomock.Any()).Return(true, nil).AnyTimes()
sourceClient.EXPECT().IsSupportRange(source.RequestEq(expiredAndSupportURL)).Return(true, nil).AnyTimes()

sourceClient.EXPECT().IsExpired(gomock.Any(), expiredAndNotSupportURL, gomock.Any(), gomock.Any()).Return(true, nil).AnyTimes()
sourceClient.EXPECT().IsSupportRange(gomock.Any(), expiredAndNotSupportURL, gomock.Any()).Return(false, nil).AnyTimes()
sourceClient.EXPECT().IsExpired(source.RequestEq(noExpiredAndSupportURL), gomock.Any()).Return(false, nil).AnyTimes()
sourceClient.EXPECT().IsSupportRange(source.RequestEq(noExpiredAndSupportURL)).Return(true, nil).AnyTimes()

sourceClient.EXPECT().IsExpired(gomock.Any(), noExpiredAndSupportURL, gomock.Any(), gomock.Any()).Return(false, nil).AnyTimes()
sourceClient.EXPECT().IsSupportRange(gomock.Any(), noExpiredAndSupportURL, gomock.Any()).Return(true, nil).AnyTimes()
sourceClient.EXPECT().IsExpired(source.RequestEq(expiredAndNotSupportURL), gomock.Any()).Return(true, nil).AnyTimes()
sourceClient.EXPECT().IsSupportRange(source.RequestEq(expiredAndNotSupportURL)).Return(false, nil).AnyTimes()

sourceClient.EXPECT().IsExpired(gomock.Any(), noExpiredAndNotSupportURL, gomock.Any(), gomock.Any()).Return(false, nil).AnyTimes()
sourceClient.EXPECT().IsSupportRange(gomock.Any(), noExpiredAndNotSupportURL, gomock.Any()).Return(false, nil).AnyTimes()
sourceClient.EXPECT().IsExpired(source.RequestEq(noExpiredAndNotSupportURL), gomock.Any()).Return(false, nil).AnyTimes()
sourceClient.EXPECT().IsSupportRange(source.RequestEq(noExpiredAndNotSupportURL)).Return(false, nil).AnyTimes()
}

var noCacheTask, partialAndSupportCacheTask, partialAndNotSupportCacheTask, fullCacheExpiredTask, fullCacheNotExpiredTask = "noCache", "partialSupportCache",
Expand Down
63 changes: 42 additions & 21 deletions cdn/supervisor/cdn/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,34 +25,55 @@ import (

"d7y.io/dragonfly/v2/cdn/types"
"d7y.io/dragonfly/v2/pkg/source"
"d7y.io/dragonfly/v2/pkg/util/maputils"
"d7y.io/dragonfly/v2/pkg/util/rangeutils"
"d7y.io/dragonfly/v2/pkg/util/stringutils"
)

const RangeHeaderName = "Range"

func (cm *Manager) download(ctx context.Context, task *types.SeedTask, detectResult *cacheResult) (io.ReadCloser, error) {
headers := maputils.DeepCopy(task.Header)
if detectResult.breakPoint > 0 {
breakRange, err := rangeutils.GetBreakRange(detectResult.breakPoint, task.SourceFileLength)
func (cm *Manager) download(ctx context.Context, task *types.SeedTask, breakPoint int64) (io.ReadCloser, error) {
var err error
breakRange := task.Range
if breakPoint > 0 {
// todo replace task.SourceFileLength with totalSourceFileLength to get BreakRange
breakRange, err = getBreakRange(breakPoint, task.Range, task.SourceFileLength)
if err != nil {
return nil, errors.Wrapf(err, "failed to calculate the breakRange")
}
// check if Range in header? if Range already in Header, priority use this range
if _, ok := headers[RangeHeaderName]; !ok {
headers[RangeHeaderName] = fmt.Sprintf("bytes=%s", breakRange)
return nil, errors.Wrapf(err, "calculate the breakRange")
}
}
task.Log().Infof("start download url %s at range: %d-%d: with header: %#v", task.URL, detectResult.breakPoint,
task.SourceFileLength, task.Header)
reader, responseHeader, err := source.DownloadWithResponseHeader(ctx, task.URL, headers)
task.Log().Infof("start downloading URL %s at range %s with header %s", task.URL, breakRange, task.Header)
downloadRequest, err := source.NewRequestWithContext(ctx, task.URL, task.Header)
if err != nil {
return nil, errors.Wrap(err, "create download request")
}
if !stringutils.IsBlank(breakRange) {
downloadRequest.Header.Add(source.Range, breakRange)
}
body, expireInfo, err := source.DownloadWithExpireInfo(downloadRequest)
// update Expire info
if err == nil {
expireInfo := map[string]string{
source.LastModified: responseHeader.Get(source.LastModified),
source.ETag: responseHeader.Get(source.ETag),
}
cm.updateExpireInfo(task.TaskID, expireInfo)
cm.updateExpireInfo(task.TaskID, map[string]string{
source.LastModified: expireInfo.LastModified,
source.ETag: expireInfo.ETag,
})
}
return body, err
}

func getBreakRange(breakPoint int64, taskRange string, fileTotalLength int64) (string, error) {
if breakPoint <= 0 {
return "", errors.Errorf("breakPoint is non-positive: %d", breakPoint)
}
if fileTotalLength <= 0 {
return "", errors.Errorf("file length is non-positive: %d", fileTotalLength)
}
if stringutils.IsBlank(taskRange) {
return fmt.Sprintf("%d-%d", breakPoint, fileTotalLength-1), nil
}
requestRange, err := rangeutils.ParseRange(taskRange, uint64(fileTotalLength))
if err != nil {
return "", errors.Errorf("parse range failed, taskRange: %s, fileTotalLength: %d: %v", taskRange, fileTotalLength, err)
}
if breakPoint >= int64(requestRange.EndIndex-requestRange.StartIndex+1) {
return "", errors.Errorf("breakPoint %d is larger than or equal with length of download required %s", breakPoint, requestRange)
}
return reader, err
return fmt.Sprintf("%d-%d", requestRange.StartIndex+uint64(breakPoint), requestRange.EndIndex), nil
}
101 changes: 101 additions & 0 deletions cdn/supervisor/cdn/downloader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright 2020 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cdn

import "testing"

func TestGetBreakRange(t *testing.T) {
type args struct {
breakPoint int64
sourceFileLength int64
taskRange string
}
tests := []struct {
name string
args args
want string
wantErr bool
}{
{
name: "breakPoint with range",
args: args{
breakPoint: 5,
sourceFileLength: 200,
taskRange: "3-103",
},
want: "8-103",
wantErr: false,
}, {
name: "range with breakPoint 1",
args: args{
breakPoint: 1,
sourceFileLength: 100,
taskRange: "0-100",
},
want: "1-99",
wantErr: false,
}, {
name: "breakpoint larger than length of download required",
args: args{
breakPoint: 101,
sourceFileLength: 200,
taskRange: "100-300",
},
want: "",
wantErr: true,
}, {
name: "breakpoint is equal with length of download required",
args: args{
breakPoint: 100,
sourceFileLength: 200,
taskRange: "100-300",
},
want: "",
wantErr: true,
}, {
name: "breakpoint is smaller than length of download required",
args: args{
breakPoint: 99,
sourceFileLength: 200,
taskRange: "100-300",
},
want: "199-199",
wantErr: false,
}, {
name: "test2",
args: args{
breakPoint: 102760448,
sourceFileLength: 552562021,
taskRange: "",
},
want: "102760448-552562020",
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := getBreakRange(tt.args.breakPoint, tt.args.taskRange, tt.args.sourceFileLength)
if (err != nil) != tt.wantErr {
t.Errorf("getBreakRange() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got != tt.want {
t.Errorf("getBreakRange() got = %v, want %v", got, tt.want)
}
})
}
}
2 changes: 1 addition & 1 deletion cdn/supervisor/cdn/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (cm *Manager) TriggerCDN(ctx context.Context, task *types.SeedTask) (seedTa
var downloadSpan trace.Span
ctx, downloadSpan = tracer.Start(ctx, config.SpanDownloadSource)
downloadSpan.End()
body, err := cm.download(ctx, task, detectResult)
body, err := cm.download(ctx, task, detectResult.breakPoint)
// download fail
if err != nil {
downloadSpan.RecordError(err)
Expand Down
Loading

0 comments on commit 935f861

Please sign in to comment.