chore: optimize defer and test (#1010)
* chore: optimize defer and test

Signed-off-by: Jim Ma <[email protected]>

* fix: random test failed

Signed-off-by: Jim Ma <[email protected]>
jim3ma authored Jan 20, 2022
1 parent 2889ea3 commit 432a58f
Showing 6 changed files with 73 additions and 29 deletions.
26 changes: 19 additions & 7 deletions client/daemon/peer/peertask_conductor.go
@@ -296,7 +296,7 @@ func (ptm *peerTaskManager) newPeerTaskConductor(
return ptc, nil
}

func (pt *peerTaskConductor) run() {
func (pt *peerTaskConductor) startPullAndBroadcastPieces() {
go pt.broker.Start()
go pt.pullPieces()
}
@@ -869,7 +869,15 @@ func (pt *peerTaskConductor) waitAvailablePeerPacket() (int32, bool) {
}

func (pt *peerTaskConductor) dispatchPieceRequest(pieceRequestCh chan *DownloadPieceRequest, piecePacket *base.PiecePacket) {
pt.Debugf("dispatch piece request, piece count: %d", len(piecePacket.PieceInfos))
pieceCount := len(piecePacket.PieceInfos)
pt.Debugf("dispatch piece request, piece count: %d", pieceCount)
// fix: cdn may return zero piece infos, but with total piece count and content length set
if pieceCount == 0 {
finished := pt.isCompleted()
if finished {
pt.Done()
}
}
for _, piece := range piecePacket.PieceInfos {
pt.Infof("get piece %d from %s/%s, digest: %s, start: %d, size: %d",
piece.PieceNum, piecePacket.DstAddr, piecePacket.DstPid, piece.PieceMd5, piece.RangeStart, piece.RangeSize)
@@ -1132,8 +1140,10 @@ func (pt *peerTaskConductor) Done() {
}

func (pt *peerTaskConductor) done() {
defer pt.span.End()
defer pt.broker.Stop()
defer func() {
pt.broker.Stop()
pt.span.End()
}()
var (
cost = time.Now().Sub(pt.start).Milliseconds()
success = true
@@ -1213,9 +1223,11 @@ func (pt *peerTaskConductor) Fail() {

func (pt *peerTaskConductor) fail() {
metrics.PeerTaskFailedCount.Add(1)
defer pt.span.End()
defer pt.broker.Stop()
defer close(pt.failCh)
defer func() {
close(pt.failCh)
pt.broker.Stop()
pt.span.End()
}()
pt.peerTaskManager.PeerTaskDone(pt.taskID)
var end = time.Now()
pt.Log().Errorf("stream peer task failed, code: %d, reason: %s", pt.failedCode, pt.failedReason)
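The done() and fail() changes above fold stacked defer statements into a single deferred closure, so the cleanup order (stop the broker, then end the span) is spelled out in one place instead of relying on defer's LIFO unwinding across several statements. A minimal sketch of why the two forms behave the same, using hypothetical span and broker stand-ins rather than the repository's types:

package main

import "fmt"

type span struct{}

func (span) End() { fmt.Println("span.End") }

type broker struct{}

func (broker) Stop() { fmt.Println("broker.Stop") }

// Before: two stacked defers. Deferred calls run last-in-first-out,
// so broker.Stop runs first, then span.End.
func doneBefore(s span, b broker) {
	defer s.End()
	defer b.Stop()
	// ... task bookkeeping ...
}

// After: one deferred closure spells out the same order in a single place.
func doneAfter(s span, b broker) {
	defer func() {
		b.Stop()
		s.End()
	}()
	// ... task bookkeeping ...
}

func main() {
	doneBefore(span{}, broker{}) // prints broker.Stop, then span.End
	doneAfter(span{}, broker{})  // identical output
}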
2 changes: 1 addition & 1 deletion client/daemon/peer/peertask_file.go
@@ -84,7 +84,7 @@ func (ptm *peerTaskManager) newFileTask(
request *FileTaskRequest,
limit rate.Limit) (context.Context, *fileTask, error) {
metrics.FileTaskCount.Add(1)
ptc, err := ptm.getOrCreatePeerTaskConductor(ctx, idgen.TaskID(request.Url, request.UrlMeta), &request.PeerTaskRequest, limit)
ptc, err := ptm.getPeerTaskConductor(ctx, idgen.TaskID(request.Url, request.UrlMeta), &request.PeerTaskRequest, limit)
if err != nil {
return nil, nil, err
}
29 changes: 22 additions & 7 deletions client/daemon/peer/peertask_manager.go
@@ -158,18 +158,35 @@ func (ptm *peerTaskManager) findPeerTaskConductor(taskID string) (*peerTaskCondu
return pt.(*peerTaskConductor), true
}

func (ptm *peerTaskManager) getPeerTaskConductor(ctx context.Context,
taskID string,
request *scheduler.PeerTaskRequest,
limit rate.Limit) (*peerTaskConductor, error) {
ptc, created, err := ptm.getOrCreatePeerTaskConductor(ctx, taskID, request, limit)
if err != nil {
return nil, err
}
if created {
ptc.startPullAndBroadcastPieces()
}
return ptc, err
}

// getOrCreatePeerTaskConductor will get or create a peerTaskConductor,
// if created, return (ptc, true, nil), otherwise return (ptc, false, nil)
func (ptm *peerTaskManager) getOrCreatePeerTaskConductor(
ctx context.Context,
taskID string,
request *scheduler.PeerTaskRequest,
limit rate.Limit) (*peerTaskConductor, error) {
limit rate.Limit) (*peerTaskConductor, bool, error) {
if ptc, ok := ptm.findPeerTaskConductor(taskID); ok {
logger.Debugf("peer task found: %s/%s", ptc.taskID, ptc.peerID)
return ptc, nil
return ptc, false, nil
}
// FIXME merge register peer tasks
ptc, err := ptm.newPeerTaskConductor(ctx, request, limit)
if err != nil {
return nil, err
return nil, false, err
}

ptm.conductorLock.Lock()
@@ -180,13 +197,11 @@ func (ptm *peerTaskManager) getOrCreatePeerTaskConductor(
p.taskID, p.peerID, ptc.taskID, ptc.peerID)
// cancel duplicate peer task
ptc.cancel(base.Code_ClientContextCanceled, reasonContextCanceled)
return p, nil
return p, false, nil
}
ptm.runningPeerTasks.Store(taskID, ptc)
ptm.conductorLock.Unlock()

ptc.run()
return ptc, nil
return ptc, true, nil
}

func (ptm *peerTaskManager) StartFileTask(ctx context.Context, req *FileTaskRequest) (chan *FileTaskProgress, *TinyData, error) {
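The manager change above splits lookup from startup: getOrCreatePeerTaskConductor now reports whether this call actually created the conductor, and the new getPeerTaskConductor wrapper starts the pull-and-broadcast work only when it did. That avoids starting the background work twice for duplicate requests and lets tests obtain a conductor without starting it. A minimal get-or-create sketch of that pattern, with hypothetical types that are not the repository's API:

package main

import (
	"fmt"
	"sync"
)

type conductor struct {
	id    string
	start sync.Once
}

// startPulling begins background work; sync.Once guards against double starts.
func (c *conductor) startPulling() {
	c.start.Do(func() { fmt.Printf("conductor %s: pulling pieces\n", c.id) })
}

type manager struct {
	mu      sync.Mutex
	running map[string]*conductor
}

// getOrCreate returns the conductor and whether this call created it.
// It never starts the conductor itself.
func (m *manager) getOrCreate(id string) (*conductor, bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if c, ok := m.running[id]; ok {
		return c, false
	}
	c := &conductor{id: id}
	m.running[id] = c
	return c, true
}

// get is the wrapper most callers would use: start only on first creation,
// mirroring the getPeerTaskConductor / getOrCreatePeerTaskConductor split.
func (m *manager) get(id string) *conductor {
	c, created := m.getOrCreate(id)
	if created {
		c.startPulling()
	}
	return c
}

func main() {
	m := &manager{running: map[string]*conductor{}}
	_ = m.get("task-1") // creates and starts the conductor
	_ = m.get("task-1") // reuses it without starting again
}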
41 changes: 29 additions & 12 deletions client/daemon/peer/peertask_manager_test.go
@@ -25,6 +25,7 @@ import (
"net/http"
"net/http/httptest"
"os"
"runtime"
"sync"
"testing"
"time"
@@ -43,6 +44,7 @@ import (
mock_daemon "d7y.io/dragonfly/v2/client/daemon/test/mock/daemon"
mock_scheduler "d7y.io/dragonfly/v2/client/daemon/test/mock/scheduler"
"d7y.io/dragonfly/v2/internal/dferrors"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/internal/dfnet"
"d7y.io/dragonfly/v2/pkg/idgen"
"d7y.io/dragonfly/v2/pkg/rpc"
@@ -327,7 +329,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
taskData: testBytes,
pieceParallelCount: 4,
pieceSize: 1024,
peerID: "peer-0",
peerID: "normal-size-peer",
url: "http://localhost/test/data",
sizeScope: base.SizeScope_NORMAL,
mockPieceDownloader: commonPieceDownloader,
@@ -338,7 +340,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
taskData: testBytes,
pieceParallelCount: 4,
pieceSize: 16384,
peerID: "peer-0",
peerID: "small-size-peer",
url: "http://localhost/test/data",
sizeScope: base.SizeScope_SMALL,
mockPieceDownloader: commonPieceDownloader,
@@ -349,7 +351,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
taskData: testBytes[:64],
pieceParallelCount: 4,
pieceSize: 1024,
peerID: "peer-0",
peerID: "tiny-size-peer",
url: "http://localhost/test/data",
sizeScope: base.SizeScope_TINY,
mockPieceDownloader: nil,
@@ -360,7 +362,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
taskData: testBytes,
pieceParallelCount: 4,
pieceSize: 1024,
peerID: "peer-0",
peerID: "normal-size-peer-back-source",
backSource: true,
url: "http://localhost/test/data",
sizeScope: base.SizeScope_NORMAL,
@@ -383,7 +385,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
taskData: testBytes,
pieceParallelCount: 4,
pieceSize: 1024,
peerID: "peer-0",
peerID: "normal-size-peer-back-source-no-length",
backSource: true,
url: "http://localhost/test/data",
sizeScope: base.SizeScope_NORMAL,
@@ -402,11 +404,11 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
},
},
{
name: "normal size scope - back source - content length - aligning",
name: "normal size scope - back source - no content length - aligning",
taskData: testBytes[:8192],
pieceParallelCount: 4,
pieceSize: 1024,
peerID: "peer-0",
peerID: "normal-size-peer-back-source-aligning-no-length",
backSource: true,
url: "http://localhost/test/data",
sizeScope: base.SizeScope_NORMAL,
@@ -429,7 +431,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
taskData: testBytes,
pieceParallelCount: 4,
pieceSize: 1024,
peerID: "peer-0",
peerID: "normal-size-peer-schedule-timeout",
peerPacketDelay: []time.Duration{time.Second},
scheduleTimeout: time.Nanosecond,
urlGenerator: func(ts *testSpec) string {
@@ -456,6 +458,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
require := testifyrequire.New(t)
for _, typ := range taskTypes {
// dup a new test case with the task type
logger.Infof("-------------------- test %s - type %d, started --------------------", _tc.name, typ)
tc := _tc
tc.taskType = typ
func() {
@@ -514,6 +517,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {

tc.run(assert, require, mm, urlMeta)
}()
logger.Infof("-------------------- test %s - type %d, finished --------------------", _tc.name, typ)
}
})
}
@@ -599,8 +603,9 @@ func (ts *testSpec) runConductorTest(assert *testifyassert.Assertions, require *
PeerHost: &scheduler.PeerHost{},
}

ptc, err := ptm.getOrCreatePeerTaskConductor(context.Background(), taskID, request, rate.Limit(pieceSize*4))
ptc, created, err := ptm.getOrCreatePeerTaskConductor(context.Background(), taskID, request, rate.Limit(pieceSize*4))
assert.Nil(err, "load first peerTaskConductor")
assert.True(created, "should create a new peerTaskConductor")

switch ts.sizeScope {
case base.SizeScope_TINY:
@@ -609,7 +614,7 @@
require.NotNil(ptc.singlePiece)
}

var ptcCount = 10
var ptcCount = 100
var wg = &sync.WaitGroup{}
wg.Add(ptcCount + 1)

@@ -643,12 +648,20 @@ func (ts *testSpec) runConductorTest(assert *testifyassert.Assertions, require *
}

for i := 0; i < ptcCount; i++ {
p, err := ptm.getOrCreatePeerTaskConductor(context.Background(), taskID, request, rate.Limit(pieceSize*3))
request := &scheduler.PeerTaskRequest{
Url: ts.url,
UrlMeta: urlMeta,
PeerId: fmt.Sprintf("should-not-use-peer-%d", i),
PeerHost: &scheduler.PeerHost{},
}
p, created, err := ptm.getOrCreatePeerTaskConductor(context.Background(), taskID, request, rate.Limit(pieceSize*3))
assert.Nil(err, fmt.Sprintf("load peerTaskConductor %d", i))
assert.Equal(ptc.peerID, p.GetPeerID(), fmt.Sprintf("ptc %d should be same with ptc", i))
assert.False(created, "should not create a new peerTaskConductor")
go syncFunc(i, p)
}

ptc.startPullAndBroadcastPieces()
wg.Wait()

for i, r := range result {
@@ -663,7 +676,10 @@ func (ts *testSpec) runConductorTest(assert *testifyassert.Assertions, require *
case <-ptc.successCh:
success = true
case <-ptc.failCh:
case <-time.After(10 * time.Minute):
case <-time.After(5 * time.Minute):
buf := make([]byte, 16384)
buf = buf[:runtime.Stack(buf, true)]
fmt.Printf("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", buf)
}
assert.True(success, "task should success")

@@ -675,6 +691,7 @@ func (ts *testSpec) runConductorTest(assert *testifyassert.Assertions, require *
if noRunningTask {
break
}
noRunningTask = true
time.Sleep(100 * time.Millisecond)
}
assert.True(noRunningTask, "no running tasks")
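The test changes above give each duplicate request its own distinct peer ID, raise the duplicate-conductor count to 100, start the conductor only after all waiters are registered, and replace the bare 10-minute wait with a 5-minute timeout that dumps every goroutine's stack via runtime.Stack, so a hung run leaves a trace. A small, self-contained sketch of that timeout-plus-stack-dump pattern, using a hypothetical helper that is not part of the repository:

package main

import (
	"fmt"
	"runtime"
	"time"
)

// waitOrDump waits for done; on timeout it dumps all goroutine stacks so a
// hung wait shows where things are stuck instead of failing silently.
func waitOrDump(done <-chan struct{}, timeout time.Duration) bool {
	select {
	case <-done:
		return true
	case <-time.After(timeout):
		buf := make([]byte, 16384)
		buf = buf[:runtime.Stack(buf, true)] // true = include all goroutines
		fmt.Printf("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===\n", buf)
		return false
	}
}

func main() {
	done := make(chan struct{})
	go func() {
		time.Sleep(10 * time.Millisecond)
		close(done)
	}()
	fmt.Println("finished:", waitOrDump(done, time.Second))
}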
2 changes: 1 addition & 1 deletion client/daemon/peer/peertask_stream.go
@@ -67,7 +67,7 @@ func (ptm *peerTaskManager) newStreamTask(
if ptm.perPeerRateLimit > 0 {
limit = ptm.perPeerRateLimit
}
ptc, err := ptm.getOrCreatePeerTaskConductor(ctx, idgen.TaskID(request.Url, request.UrlMeta), request, limit)
ptc, err := ptm.getPeerTaskConductor(ctx, idgen.TaskID(request.Url, request.UrlMeta), request, limit)
if err != nil {
return nil, err
}
2 changes: 1 addition & 1 deletion client/daemon/storage/storage_manager.go
@@ -95,7 +95,7 @@ var (
ErrTaskNotFound = errors.New("task not found")
ErrPieceNotFound = errors.New("piece not found")
ErrPieceCountNotSet = errors.New("total piece count not set")
ErrDigestNotSet = errors.New("piece digest not set")
ErrDigestNotSet = errors.New("digest not set")
ErrInvalidDigest = errors.New("invalid digest")
)
