Skip to content

Commit

Permalink
validate grpc model (#825)
Browse files Browse the repository at this point in the history
* base validate

Signed-off-by: sunwp <[email protected]>
  • Loading branch information
244372610 authored Dec 2, 2021
1 parent 5b4a968 commit b122200
Show file tree
Hide file tree
Showing 55 changed files with 2,715 additions and 808 deletions.
4 changes: 2 additions & 2 deletions cdn/cdnutil/cdn_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
//
// If the fileLength<=0, which means failed to get fileLength
// and then use the DefaultPieceSize.
func ComputePieceSize(length int64) int32 {
func ComputePieceSize(length int64) uint32 {
if length <= 0 || length <= 200*1024*1024 {
return config.DefaultPieceSize
}
Expand All @@ -34,5 +34,5 @@ func ComputePieceSize(length int64) int32 {
if mpSize > config.DefaultPieceSizeLimit {
return config.DefaultPieceSizeLimit
}
return int32(mpSize)
return uint32(mpSize)
}
57 changes: 5 additions & 52 deletions cdn/rpcserver/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"
"strings"

"github.com/pkg/errors"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
Expand All @@ -36,9 +35,7 @@ import (
"d7y.io/dragonfly/v2/pkg/rpc/base"
"d7y.io/dragonfly/v2/pkg/rpc/cdnsystem"
cdnserver "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem/server"
"d7y.io/dragonfly/v2/pkg/util/digestutils"
"d7y.io/dragonfly/v2/pkg/util/hostutils"
"d7y.io/dragonfly/v2/pkg/util/net/urlutils"
"d7y.io/dragonfly/v2/pkg/util/stringutils"
)

Expand All @@ -61,18 +58,11 @@ func New(cfg *config.Config, taskMgr supervisor.SeedTaskMgr, opts ...grpc.Server
return svr.Server, nil
}

func constructRegisterRequest(req *cdnsystem.SeedRequest) (*types.TaskRegisterRequest, error) {
if err := checkSeedRequestParams(req); err != nil {
return nil, err
}
func constructRegisterRequest(req *cdnsystem.SeedRequest) *types.TaskRegisterRequest {
meta := req.UrlMeta
header := make(map[string]string)
if meta != nil {
if !stringutils.IsBlank(meta.Digest) {
digest := digestutils.Parse(meta.Digest)
if _, ok := digestutils.Algorithms[digest[0]]; !ok {
return nil, errors.Errorf("unsupported digest algorithm")
}
header["digest"] = meta.Digest
}
if !stringutils.IsBlank(meta.Range) {
Expand All @@ -88,18 +78,7 @@ func constructRegisterRequest(req *cdnsystem.SeedRequest) (*types.TaskRegisterRe
Digest: header["digest"],
TaskID: req.TaskId,
Filter: strings.Split(req.UrlMeta.Filter, "&"),
}, nil
}

// checkSeedRequestParams check the params of SeedRequest.
func checkSeedRequestParams(req *cdnsystem.SeedRequest) error {
if !urlutils.IsValidURL(req.Url) {
return errors.Errorf("resource url: %s is invalid", req.Url)
}
if stringutils.IsBlank(req.TaskId) {
return errors.New("taskId is empty")
}
return nil
}

func (css *server) ObtainSeeds(ctx context.Context, req *cdnsystem.SeedRequest, psc chan<- *cdnsystem.PieceSeed) (err error) {
Expand All @@ -117,12 +96,7 @@ func (css *server) ObtainSeeds(ctx context.Context, req *cdnsystem.SeedRequest,
}
logger.Infof("seeds task %s result success: %t", req.TaskId, err == nil)
}()
registerRequest, err := constructRegisterRequest(req)
if err != nil {
err = dferrors.Newf(base.Code_BadRequest, "bad seed request for task(%s): %v", req.TaskId, err)
span.RecordError(err)
return err
}
registerRequest := constructRegisterRequest(req)
// register task
pieceChan, err := css.taskMgr.Register(ctx, registerRequest)
if err != nil {
Expand All @@ -141,7 +115,7 @@ func (css *server) ObtainSeeds(ctx context.Context, req *cdnsystem.SeedRequest,
PeerId: peerID,
HostUuid: idgen.CDNHostID(hostutils.FQDNHostname, int32(css.cfg.ListenPort)),
PieceInfo: &base.PieceInfo{
PieceNum: piece.PieceNum,
PieceNum: int32(piece.PieceNum),
RangeStart: piece.PieceRange.StartIndex,
RangeSize: piece.PieceLen,
PieceMd5: piece.PieceMd5,
Expand Down Expand Up @@ -186,11 +160,6 @@ func (css *server) GetPieceTasks(ctx context.Context, req *base.PieceTaskRequest
}
}()
logger.Infof("get piece tasks: %+v", req)
if err := checkPieceTasksRequestParams(req); err != nil {
err = dferrors.Newf(base.Code_BadRequest, "failed to validate seed request for task(%s): %v", req.TaskId, err)
span.RecordError(err)
return nil, err
}
task, err := css.taskMgr.Get(req.TaskId)
if err != nil {
if cdnerrors.IsDataNotFound(err) {
Expand All @@ -214,11 +183,11 @@ func (css *server) GetPieceTasks(ctx context.Context, req *base.PieceTaskRequest
return nil, err
}
pieceInfos := make([]*base.PieceInfo, 0)
var count int32 = 0
var count uint32 = 0
for _, piece := range pieces {
if piece.PieceNum >= req.StartNum && (count < req.Limit || req.Limit == 0) {
p := &base.PieceInfo{
PieceNum: piece.PieceNum,
PieceNum: int32(piece.PieceNum),
RangeStart: piece.PieceRange.StartIndex,
RangeSize: piece.PieceLen,
PieceMd5: piece.PieceMd5,
Expand All @@ -241,19 +210,3 @@ func (css *server) GetPieceTasks(ctx context.Context, req *base.PieceTaskRequest
span.SetAttributes(config.AttributePiecePacketResult.String(pp.String()))
return pp, nil
}

func checkPieceTasksRequestParams(req *base.PieceTaskRequest) error {
if stringutils.IsBlank(req.TaskId) {
return errors.Wrap(cdnerrors.ErrInvalidValue, "taskId is nil")
}
if stringutils.IsBlank(req.SrcPid) {
return errors.Wrapf(cdnerrors.ErrInvalidValue, "src peer id is nil")
}
if req.StartNum < 0 {
return errors.Wrapf(cdnerrors.ErrInvalidValue, "invalid starNum %d", req.StartNum)
}
if req.Limit < 0 {
return errors.Wrapf(cdnerrors.ErrInvalidValue, "invalid limit %d", req.Limit)
}
return nil
}
67 changes: 0 additions & 67 deletions cdn/rpcserver/rpcserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"d7y.io/dragonfly/v2/cdn/config"
"d7y.io/dragonfly/v2/cdn/supervisor"
"d7y.io/dragonfly/v2/cdn/types"
"d7y.io/dragonfly/v2/pkg/rpc/base"
"d7y.io/dragonfly/v2/pkg/rpc/cdnsystem"
)
Expand Down Expand Up @@ -121,69 +120,3 @@ func TestNewCdnSeedServer(t *testing.T) {
})
}
}

func Test_checkPieceTasksRequestParams(t *testing.T) {
type args struct {
req *base.PieceTaskRequest
}
tests := []struct {
name string
args args
wantErr bool
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err := checkPieceTasksRequestParams(tt.args.req); (err != nil) != tt.wantErr {
t.Errorf("checkPieceTasksRequestParams() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}

func Test_checkSeedRequestParams(t *testing.T) {
type args struct {
req *cdnsystem.SeedRequest
}
tests := []struct {
name string
args args
wantErr bool
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err := checkSeedRequestParams(tt.args.req); (err != nil) != tt.wantErr {
t.Errorf("checkSeedRequestParams() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}

func Test_constructRegisterRequest(t *testing.T) {
type args struct {
req *cdnsystem.SeedRequest
}
tests := []struct {
name string
args args
want *types.TaskRegisterRequest
wantErr bool
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := constructRegisterRequest(tt.args.req)
if (err != nil) != tt.wantErr {
t.Errorf("constructRegisterRequest() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("constructRegisterRequest() got = %v, want %v", got, tt.want)
}
})
}
}
2 changes: 1 addition & 1 deletion cdn/supervisor/cdn/cache_detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (cd *cacheDetector) parseByReadFile(taskID string, metaData *storage.FileMe
var breakPoint uint64 = 0
pieceMetaRecords := make([]*storage.PieceMetaRecord, 0, len(tempRecords))
for index := range tempRecords {
if int32(index) != tempRecords[index].PieceNum {
if uint32(index) != tempRecords[index].PieceNum {
break
}
// read content
Expand Down
4 changes: 2 additions & 2 deletions cdn/supervisor/cdn/cache_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ func (cw *cacheWriter) writerPool(ctx context.Context, wg *sync.WaitGroup, routi
start := uint64(p.pieceNum) * uint64(p.pieceSize)
end := start + uint64(pieceLen) - 1
pieceRecord := &storage.PieceMetaRecord{
PieceNum: p.pieceNum,
PieceLen: int32(pieceLen),
PieceNum: uint32(p.pieceNum),
PieceLen: uint32(pieceLen),
Md5: digestutils.ToHashString(pieceMd5),
Range: &rangeutils.Range{
StartIndex: start,
Expand Down
4 changes: 2 additions & 2 deletions cdn/supervisor/cdn/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (re *reporter) reportPieceMetaRecord(ctx context.Context, taskID string, re
from string) error {
// report cache pieces status
logger.DownloaderLogger.Info(taskID,
zap.Int32("pieceNum", record.PieceNum),
zap.Uint32("pieceNum", record.PieceNum),
zap.String("md5", record.Md5),
zap.String("from", from))
return re.progress.PublishPiece(ctx, taskID, convertPieceMeta2SeedPiece(record))
Expand All @@ -74,7 +74,7 @@ func (re *reporter) reportPieceMetaRecord(ctx context.Context, taskID string, re
func convertPieceMeta2SeedPiece(record *storage.PieceMetaRecord) *types.SeedPiece {
return &types.SeedPiece{
PieceStyle: record.PieceStyle,
PieceNum: record.PieceNum,
PieceNum: uint32(record.PieceNum),
PieceMd5: record.Md5,
PieceRange: record.Range,
OriginRange: record.OriginRange,
Expand Down
8 changes: 4 additions & 4 deletions cdn/supervisor/cdn/storage/storage_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ type FileMetaData struct {

// PieceMetaRecord meta data of piece
type PieceMetaRecord struct {
PieceNum int32 `json:"pieceNum"` // piece Num start from 0
PieceLen int32 `json:"pieceLen"` // 存储到存储介质的真实长度
PieceNum uint32 `json:"pieceNum"` // piece Num start from 0
PieceLen uint32 `json:"pieceLen"` // 存储到存储介质的真实长度
Md5 string `json:"md5"` // for transported piece content,不是origin source 的 md5,是真是存储到存储介质后的md5(为了读取数据文件时方便校验完整性)
Range *rangeutils.Range `json:"range"` // 下载存储到磁盘的range,不是origin source的range.提供给客户端发送下载请求,for transported piece content
OriginRange *rangeutils.Range `json:"originRange"` // piece's real offset in the file
Expand Down Expand Up @@ -141,8 +141,8 @@ func ParsePieceMetaRecord(value string) (record *PieceMetaRecord, err error) {
return nil, errors.Wrapf(err, "invalid pieceStyle: %s", fields[5])
}
return &PieceMetaRecord{
PieceNum: int32(pieceNum),
PieceLen: int32(pieceLen),
PieceNum: uint32(pieceNum),
PieceLen: uint32(pieceLen),
Md5: md5,
Range: pieceRange,
OriginRange: originRange,
Expand Down
2 changes: 1 addition & 1 deletion cdn/supervisor/task/manager_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (tm *Manager) addOrUpdateTask(ctx context.Context, request *types.TaskRegis
// calculate piece size and update the PieceSize and PieceTotal
if task.PieceSize <= 0 {
pieceSize := cdnutil.ComputePieceSize(task.SourceFileLength)
task.PieceSize = pieceSize
task.PieceSize = int32(pieceSize)
}
if err := tm.taskStore.Add(task.TaskID, task); err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions cdn/types/seed_piece_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ import "d7y.io/dragonfly/v2/pkg/util/rangeutils"

type SeedPiece struct {
PieceStyle PieceFormat `json:"piece_style"` // 0: PlainUnspecified
PieceNum int32 `json:"piece_num"`
PieceNum uint32 `json:"piece_num"`
PieceMd5 string `json:"piece_md_5"`
PieceRange *rangeutils.Range `json:"piece_range"`
OriginRange *rangeutils.Range `json:"origin_range"`
PieceLen int32 `json:"piece_len"`
PieceLen uint32 `json:"piece_len"`
}

type PieceFormat int8
Expand Down
18 changes: 9 additions & 9 deletions client/daemon/peer/peertask_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ type peerTask struct {
md5 string
contentLength *atomic.Int64
completedLength *atomic.Int64
usedTraffic *atomic.Int64
usedTraffic *atomic.Uint64

//sizeScope base.SizeScope
singlePiece *scheduler.SinglePiece
Expand Down Expand Up @@ -162,11 +162,11 @@ func (pt *peerTask) SetContentLength(i int64) error {
return pt.setContentLengthFunc(i)
}

func (pt *peerTask) AddTraffic(n int64) {
func (pt *peerTask) AddTraffic(n uint64) {
pt.usedTraffic.Add(n)
}

func (pt *peerTask) GetTraffic() int64 {
func (pt *peerTask) GetTraffic() uint64 {
return pt.usedTraffic.Load()
}

Expand Down Expand Up @@ -393,11 +393,11 @@ func (pt *peerTask) pullPiecesFromPeers(cleanUnfinishedFunc func()) {
var (
num int32
ok bool
limit int32
limit uint32
initialized bool
pieceRequestCh chan *DownloadPieceRequest
// keep same size with pt.failedPieceCh for avoiding dead-lock
pieceBufferSize = int32(config.DefaultPieceChanSize)
pieceBufferSize = uint32(config.DefaultPieceChanSize)
)
limit = pieceBufferSize
loop:
Expand Down Expand Up @@ -437,7 +437,7 @@ loop:
&base.PieceTaskRequest{
TaskId: pt.taskID,
SrcPid: pt.peerID,
StartNum: num,
StartNum: uint32(num),
Limit: limit,
})

Expand Down Expand Up @@ -492,7 +492,7 @@ loop:
}
}

func (pt *peerTask) init(piecePacket *base.PiecePacket, pieceBufferSize int32) (chan *DownloadPieceRequest, bool) {
func (pt *peerTask) init(piecePacket *base.PiecePacket, pieceBufferSize uint32) (chan *DownloadPieceRequest, bool) {
pt.contentLength.Store(piecePacket.ContentLength)
if pt.contentLength.Load() > 0 {
pt.span.SetAttributes(config.AttributeTaskContentLength.Int64(pt.contentLength.Load()))
Expand Down Expand Up @@ -796,7 +796,7 @@ retry:
}

if code == base.Code_CDNTaskNotFound && curPeerPacket == pt.peerPacket.Load().(*scheduler.PeerPacket) {
span.AddEvent("retry for CDNTaskNotFound")
span.AddEvent("retry for CdnTaskNotFound")
goto retry
}
return nil, err
Expand Down Expand Up @@ -875,7 +875,7 @@ func (pt *peerTask) getNextPieceNum(cur int32) int32 {
// double check, re-search not success or not requested pieces
for i = int32(0); pt.requestedPieces.IsSet(i); i++ {
}
if pt.totalPiece > 0 && i >= pt.totalPiece {
if pt.totalPiece > 0 && i >= int32(pt.totalPiece) {
return -1
}
}
Expand Down
2 changes: 1 addition & 1 deletion client/daemon/peer/peertask_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func newFilePeerTask(ctx context.Context,
schedulerClient: schedulerClient,
limiter: limiter,
completedLength: atomic.NewInt64(0),
usedTraffic: atomic.NewInt64(0),
usedTraffic: atomic.NewUint64(0),
SugaredLoggerOnWith: logger.With("peer", request.PeerId, "task", result.TaskId, "component", "filePeerTask"),
},
}
Expand Down
Loading

0 comments on commit b122200

Please sign in to comment.