diff --git a/client/config/constants_otel.go b/client/config/constants_otel.go index b5803f5488f..876b728b39f 100644 --- a/client/config/constants_otel.go +++ b/client/config/constants_otel.go @@ -44,9 +44,11 @@ const ( AttributeGetPieceCount = attribute.Key("d7y.peer.piece.count") AttributeGetPieceRetry = attribute.Key("d7y.peer.piece.retry") AttributeWritePieceSuccess = attribute.Key("d7y.peer.piece.write.success") + AttributeSeedTaskSuccess = attribute.Key("d7y.seed.task.success") SpanFileTask = "file-task" SpanStreamTask = "stream-task" + SpanSeedTask = "seed-task" SpanPeerTask = "peer-task" SpanReusePeerTask = "reuse-peer-task" SpanRegisterTask = "register" diff --git a/client/daemon/metrics/metrics.go b/client/daemon/metrics/metrics.go index 35b1e190da1..1c0b180917b 100644 --- a/client/daemon/metrics/metrics.go +++ b/client/daemon/metrics/metrics.go @@ -109,6 +109,13 @@ var ( Help: "Counter of the total stream tasks.", }) + SeedTaskCount = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: constants.MetricsNamespace, + Subsystem: constants.DfdaemonMetricsName, + Name: "seed_task_total", + Help: "Counter of the total seed tasks.", + }) + PeerTaskCacheHitCount = promauto.NewCounter(prometheus.CounterOpts{ Namespace: constants.MetricsNamespace, Subsystem: constants.DfdaemonMetricsName, diff --git a/client/daemon/peer/peertask_conductor.go b/client/daemon/peer/peertask_conductor.go index 081c0ed81f8..71ca188e6e7 100644 --- a/client/daemon/peer/peertask_conductor.go +++ b/client/daemon/peer/peertask_conductor.go @@ -77,6 +77,7 @@ type peerTaskConductor struct { // needBackSource indicates downloading resource from instead of other peers needBackSource *atomic.Bool + seed bool // pieceManager will be used for downloading piece pieceManager PieceManager @@ -165,7 +166,8 @@ func (ptm *peerTaskManager) newPeerTaskConductor( request *scheduler.PeerTaskRequest, limit rate.Limit, parent *peerTaskConductor, - rg *clientutil.Range) *peerTaskConductor { + rg *clientutil.Range, + seed bool) *peerTaskConductor { // use a new context with span info ctx = trace.ContextWithSpan(context.Background(), trace.SpanFromContext(ctx)) ctx, span := tracer.Start(ctx, config.SpanPeerTask, trace.WithSpanKind(trace.SpanKindClient)) @@ -226,6 +228,7 @@ func (ptm *peerTaskManager) newPeerTaskConductor( completedLength: atomic.NewInt64(0), usedTraffic: atomic.NewUint64(0), SugaredLoggerOnWith: log, + seed: seed, parent: parent, rg: rg, @@ -329,9 +332,17 @@ func (pt *peerTaskConductor) register() error { } func (pt *peerTaskConductor) start() error { - // register to scheduler - if err := pt.register(); err != nil { - return err + // when is seed task, setup back source + if pt.seed { + pt.peerPacketStream = &dummyPeerPacketStream{} + pt.schedulerClient = &dummySchedulerClient{} + pt.sizeScope = base.SizeScope_NORMAL + pt.needBackSource = atomic.NewBool(true) + } else { + // register to scheduler + if err := pt.register(); err != nil { + return err + } } go pt.broker.Start() diff --git a/client/daemon/peer/peertask_file.go b/client/daemon/peer/peertask_file.go index f7c86320a2e..1a63a24b97e 100644 --- a/client/daemon/peer/peertask_file.go +++ b/client/daemon/peer/peertask_file.go @@ -18,6 +18,7 @@ package peer import ( "context" + "fmt" "go.opentelemetry.io/otel/trace" "golang.org/x/time/rate" @@ -95,7 +96,7 @@ func (ptm *peerTaskManager) newFileTask( } taskID := idgen.TaskID(request.Url, request.UrlMeta) - ptc, err := ptm.getPeerTaskConductor(ctx, taskID, &request.PeerTaskRequest, limit, parent, request.Range, 
request.Output) + ptc, err := ptm.getPeerTaskConductor(ctx, taskID, &request.PeerTaskRequest, limit, parent, request.Range, request.Output, false) if err != nil { return nil, nil, err } @@ -125,12 +126,14 @@ func (f *fileTask) Start(ctx context.Context) (chan *FileTaskProgress, error) { } func (f *fileTask) syncProgress() { + defer f.span.End() for { select { case <-f.peerTaskConductor.successCh: f.storeToOutput() return case <-f.peerTaskConductor.failCh: + f.span.RecordError(fmt.Errorf(f.peerTaskConductor.failedReason)) f.sendFailProgress(f.peerTaskConductor.failedCode, f.peerTaskConductor.failedReason) return case <-f.ctx.Done(): diff --git a/client/daemon/peer/peertask_manager.go b/client/daemon/peer/peertask_manager.go index bc9f99f1747..16020b6237d 100644 --- a/client/daemon/peer/peertask_manager.go +++ b/client/daemon/peer/peertask_manager.go @@ -47,15 +47,17 @@ type TaskManager interface { StartFileTask(ctx context.Context, req *FileTaskRequest) ( progress chan *FileTaskProgress, tiny *TinyData, err error) // StartStreamTask starts a peer task with stream io - // tiny stands task file is tiny and task is done StartStreamTask(ctx context.Context, req *StreamTaskRequest) ( readCloser io.ReadCloser, attribute map[string]string, err error) + // StartSeedTask starts a seed peer task + StartSeedTask(ctx context.Context, req *SeedTaskRequest) ( + seedTaskResult *SeedTaskResponse, err error) - Subscribe(request *base.PieceTaskRequest) (*SubscribeResult, bool) + Subscribe(request *base.PieceTaskRequest) (*SubscribeResponse, bool) IsPeerTaskRunning(id string) bool - // Check if the given task exists in P2P network + // StatTask checks whether the given task exists in P2P network StatTask(ctx context.Context, taskID string) (*scheduler.Task, error) // AnnouncePeerTask announces peer task info to P2P network @@ -179,8 +181,9 @@ func (ptm *peerTaskManager) getPeerTaskConductor(ctx context.Context, limit rate.Limit, parent *peerTaskConductor, rg *clientutil.Range, - desiredLocation string) (*peerTaskConductor, error) { - ptc, created, err := ptm.getOrCreatePeerTaskConductor(ctx, taskID, request, limit, parent, rg, desiredLocation) + desiredLocation string, + seed bool) (*peerTaskConductor, error) { + ptc, created, err := ptm.getOrCreatePeerTaskConductor(ctx, taskID, request, limit, parent, rg, desiredLocation, seed) if err != nil { return nil, err } @@ -201,12 +204,13 @@ func (ptm *peerTaskManager) getOrCreatePeerTaskConductor( limit rate.Limit, parent *peerTaskConductor, rg *clientutil.Range, - desiredLocation string) (*peerTaskConductor, bool, error) { + desiredLocation string, + seed bool) (*peerTaskConductor, bool, error) { if ptc, ok := ptm.findPeerTaskConductor(taskID); ok { logger.Debugf("peer task found: %s/%s", ptc.taskID, ptc.peerID) return ptc, false, nil } - ptc := ptm.newPeerTaskConductor(ctx, request, limit, parent, rg) + ptc := ptm.newPeerTaskConductor(ctx, request, limit, parent, rg, seed) ptm.conductorLock.Lock() // double check @@ -256,7 +260,7 @@ func (ptm *peerTaskManager) prefetchParentTask(request *scheduler.PeerTaskReques } logger.Infof("prefetch peer task %s/%s", taskID, req.PeerId) - prefetch, err := ptm.getPeerTaskConductor(context.Background(), taskID, req, limit, nil, nil, desiredLocation) + prefetch, err := ptm.getPeerTaskConductor(context.Background(), taskID, req, limit, nil, nil, desiredLocation, false) if err != nil { logger.Errorf("prefetch peer task %s/%s error: %s", prefetch.taskID, prefetch.peerID, err) return nil @@ -325,20 +329,38 @@ func (ptm 
*peerTaskManager) StartStreamTask(ctx context.Context, req *StreamTask return readCloser, attribute, err } -type SubscribeResult struct { +func (ptm *peerTaskManager) StartSeedTask(ctx context.Context, req *SeedTaskRequest) (response *SeedTaskResponse, err error) { + response, ok := ptm.tryReuseSeedPeerTask(ctx, req) + if ok { + metrics.PeerTaskCacheHitCount.Add(1) + return response, nil + } + + var limit = rate.Inf + if ptm.perPeerRateLimit > 0 { + limit = ptm.perPeerRateLimit + } + if req.Limit > 0 { + limit = rate.Limit(req.Limit) + } + + return ptm.newSeedTask(ctx, req, limit) +} + +type SubscribeResponse struct { Storage storage.TaskStorageDriver PieceInfoChannel chan *PieceInfo Success chan struct{} Fail chan struct{} } -func (ptm *peerTaskManager) Subscribe(request *base.PieceTaskRequest) (*SubscribeResult, bool) { +func (ptm *peerTaskManager) Subscribe(request *base.PieceTaskRequest) (*SubscribeResponse, bool) { ptc, ok := ptm.findPeerTaskConductor(request.TaskId) if !ok { return nil, false } - result := &SubscribeResult{ + result := &SubscribeResponse{ Storage: ptc.storage, PieceInfoChannel: ptc.broker.Subscribe(), Success: ptc.successCh, diff --git a/client/daemon/peer/peertask_manager_mock_test.go b/client/daemon/peer/peertask_manager_mock_test.go index c311b843257..7d255db6a86 100644 --- a/client/daemon/peer/peertask_manager_mock_test.go +++ b/client/daemon/peer/peertask_manager_mock_test.go @@ -97,6 +97,21 @@ func (mr *MockTaskManagerMockRecorder) StartFileTask(ctx, req interface{}) *gomo return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartFileTask", reflect.TypeOf((*MockTaskManager)(nil).StartFileTask), ctx, req) } +// StartSeedTask mocks base method. +func (m *MockTaskManager) StartSeedTask(ctx context.Context, req *SeedTaskRequest) (*SeedTaskResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StartSeedTask", ctx, req) + ret0, _ := ret[0].(*SeedTaskResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// StartSeedTask indicates an expected call of StartSeedTask. +func (mr *MockTaskManagerMockRecorder) StartSeedTask(ctx, req interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartSeedTask", reflect.TypeOf((*MockTaskManager)(nil).StartSeedTask), ctx, req) +} + // StartStreamTask mocks base method. func (m *MockTaskManager) StartStreamTask(ctx context.Context, req *StreamTaskRequest) (io.ReadCloser, map[string]string, error) { m.ctrl.T.Helper() @@ -143,10 +158,10 @@ func (mr *MockTaskManagerMockRecorder) Stop(ctx interface{}) *gomock.Call { } // Subscribe mocks base method. 
-func (m *MockTaskManager) Subscribe(request *base.PieceTaskRequest) (*SubscribeResult, bool) { +func (m *MockTaskManager) Subscribe(request *base.PieceTaskRequest) (*SubscribeResponse, bool) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Subscribe", request) - ret0, _ := ret[0].(*SubscribeResult) + ret0, _ := ret[0].(*SubscribeResponse) ret1, _ := ret[1].(bool) return ret0, ret1 } diff --git a/client/daemon/peer/peertask_manager_test.go b/client/daemon/peer/peertask_manager_test.go index 142f416274c..66b7bfc0df8 100644 --- a/client/daemon/peer/peertask_manager_test.go +++ b/client/daemon/peer/peertask_manager_test.go @@ -323,9 +323,11 @@ const ( taskTypeFile = iota taskTypeStream taskTypeConductor + taskTypeSeed ) type testSpec struct { + runTaskTypes []int taskType int name string taskData []byte @@ -371,8 +373,8 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) { return downloader } - taskTypes := []int{taskTypeConductor, taskTypeFile, taskTypeStream} - taskTypeNames := []string{"conductor", "file", "stream"} + taskTypes := []int{taskTypeConductor, taskTypeFile, taskTypeStream} // seed task need back source client + taskTypeNames := []string{"conductor", "file", "stream", "seed"} testCases := []testSpec{ { @@ -410,6 +412,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) { }, { name: "normal size scope - back source - content length", + runTaskTypes: []int{taskTypeConductor, taskTypeFile, taskTypeStream, taskTypeSeed}, taskData: testBytes, pieceParallelCount: 4, pieceSize: 1024, @@ -432,18 +435,19 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) { }, }, { - name: "normal size scope - range - back source - content length", - taskData: testBytes[0:4096], + name: "normal size scope - range - back source - content length", + runTaskTypes: []int{taskTypeConductor, taskTypeFile, taskTypeStream, taskTypeSeed}, + taskData: testBytes[0:4096], + pieceParallelCount: 4, + pieceSize: 1024, + peerID: "normal-size-peer-range-back-source", + backSource: true, + url: "http://localhost/test/data", + sizeScope: base.SizeScope_NORMAL, httpRange: &clientutil.Range{ Start: 0, Length: 4096, }, - pieceParallelCount: 4, - pieceSize: 1024, - peerID: "normal-size-peer-range-back-source", - backSource: true, - url: "http://localhost/test/data", - sizeScope: base.SizeScope_NORMAL, mockPieceDownloader: nil, mockHTTPSourceClient: func(t *testing.T, ctrl *gomock.Controller, rg *clientutil.Range, taskData []byte, url string) source.ResourceClient { sourceClient := sourceMock.NewMockResourceClient(ctrl) @@ -474,6 +478,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) { }, { name: "normal size scope - back source - no content length", + runTaskTypes: []int{taskTypeConductor, taskTypeFile, taskTypeStream, taskTypeSeed}, taskData: testBytes, pieceParallelCount: 4, pieceSize: 1024, @@ -497,6 +502,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) { }, { name: "normal size scope - back source - no content length - aligning", + runTaskTypes: []int{taskTypeConductor, taskTypeFile, taskTypeStream, taskTypeSeed}, taskData: testBytes[:8192], pieceParallelCount: 4, pieceSize: 1024, @@ -546,10 +552,14 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) { for _, _tc := range testCases { t.Run(_tc.name, func(t *testing.T) { + var types = _tc.runTaskTypes + if _tc.runTaskTypes == nil { + types = taskTypes + } assert := testifyassert.New(t) require := testifyrequire.New(t) for _, legacy := range []bool{true, false} { - for _, typ := range taskTypes { + for _, typ := range types { // dup a new test case with the task type 
logger.Infof("-------------------- test %s - type %s, legacy feature: %v started --------------------", _tc.name, taskTypeNames[typ], legacy) @@ -632,6 +642,8 @@ func (ts *testSpec) run(assert *testifyassert.Assertions, require *testifyrequir ts.runStreamTaskTest(assert, require, mm, urlMeta) case taskTypeConductor: ts.runConductorTest(assert, require, mm, urlMeta) + case taskTypeSeed: + ts.runSeedTaskTest(assert, require, mm, urlMeta) default: panic("unknown test type") } @@ -686,6 +698,52 @@ func (ts *testSpec) runStreamTaskTest(_ *testifyassert.Assertions, require *test require.Equal(ts.taskData, outputBytes, "output and desired output must match") } +func (ts *testSpec) runSeedTaskTest(_ *testifyassert.Assertions, require *testifyrequire.Assertions, mm *mockManager, urlMeta *base.UrlMeta) { + r, err := mm.peerTaskManager.StartSeedTask( + context.Background(), + &SeedTaskRequest{ + PeerTaskRequest: scheduler.PeerTaskRequest{ + Url: ts.url, + UrlMeta: urlMeta, + PeerId: ts.peerID, + PeerHost: &scheduler.PeerHost{}, + HostLoad: nil, + IsMigrating: false, + }, + Limit: 0, + Callsystem: "", + Range: nil, + }) + + require.Nil(err, "start seed peer task") + + var success bool + +loop: + for { + select { + case <-r.Context.Done(): + break loop + case <-r.Success: + success = true + break loop + case <-r.Fail: + break loop + case p := <-r.PieceInfoChannel: + if p.Finished { + success = true + break loop + } + case <-time.After(5 * time.Minute): + buf := make([]byte, 16384) + buf = buf[:runtime.Stack(buf, true)] + fmt.Printf("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", buf) + } + } + + require.True(success, "seed task should success") +} + func (ts *testSpec) runConductorTest(assert *testifyassert.Assertions, require *testifyrequire.Assertions, mm *mockManager, urlMeta *base.UrlMeta) { var ( ptm = mm.peerTaskManager @@ -705,7 +763,7 @@ func (ts *testSpec) runConductorTest(assert *testifyassert.Assertions, require * } ptc, created, err := ptm.getOrCreatePeerTaskConductor( - context.Background(), taskID, peerTaskRequest, rate.Limit(pieceSize*4), nil, nil, "") + context.Background(), taskID, peerTaskRequest, rate.Limit(pieceSize*4), nil, nil, "", false) assert.Nil(err, "load first peerTaskConductor") assert.True(created, "should create a new peerTaskConductor") @@ -750,7 +808,7 @@ func (ts *testSpec) runConductorTest(assert *testifyassert.Assertions, require * PeerHost: &scheduler.PeerHost{}, } p, created, err := ptm.getOrCreatePeerTaskConductor( - context.Background(), taskID, request, rate.Limit(pieceSize*3), nil, nil, "") + context.Background(), taskID, request, rate.Limit(pieceSize*3), nil, nil, "", false) assert.Nil(err, fmt.Sprintf("load peerTaskConductor %d", i)) assert.Equal(ptc.peerID, p.GetPeerID(), fmt.Sprintf("ptc %d should be same with ptc", i)) assert.False(created, "should not create a new peerTaskConductor") diff --git a/client/daemon/peer/peertask_reuse.go b/client/daemon/peer/peertask_reuse.go index 08da6f11ba2..a74242d39c4 100644 --- a/client/daemon/peer/peertask_reuse.go +++ b/client/daemon/peer/peertask_reuse.go @@ -263,3 +263,70 @@ func (ptm *peerTaskManager) tryReuseStreamPeerTask(ctx context.Context, span.SetAttributes(config.AttributePeerTaskSuccess.Bool(true)) return rc, attr, true } + +func (ptm *peerTaskManager) tryReuseSeedPeerTask(ctx context.Context, + request *SeedTaskRequest) (*SeedTaskResponse, bool) { + taskID := idgen.TaskID(request.Url, request.UrlMeta) + var ( + reuse *storage.ReusePeerTask + reuseRange *clientutil.Range // the 
range of parent peer task data to read + log *logger.SugaredLoggerOnWith + ) + + if ptm.enabledPrefetch(request.Range) { + reuse = ptm.storageManager.FindCompletedSubTask(taskID) + } else { + reuse = ptm.storageManager.FindCompletedTask(taskID) + } + + if reuse == nil { + if request.Range == nil { + return nil, false + } + // TODO, mock SeedTaskResponse for sub task + // for ranged request, check the parent task + //reuseRange = request.Range + //taskID = idgen.ParentTaskID(request.Url, request.UrlMeta) + //reuse = ptm.storageManager.FindPartialCompletedTask(taskID, reuseRange) + //if reuse == nil { + // return nil, false + //} + } + + if reuseRange == nil { + log = logger.With("peer", request.PeerId, "task", taskID, "component", "reuseSeedPeerTask") + log.Infof("reuse from peer task: %s, total size: %d", reuse.PeerID, reuse.ContentLength) + } else { + log = logger.With("peer", request.PeerId, "task", taskID, "range", request.UrlMeta.Range, + "component", "reuseRangeSeedPeerTask") + log.Infof("reuse partial data from peer task: %s, total size: %d, range: %s", + reuse.PeerID, reuse.ContentLength, request.UrlMeta.Range) + } + + ctx, span := tracer.Start(ctx, config.SpanReusePeerTask, trace.WithSpanKind(trace.SpanKindClient)) + span.SetAttributes(config.AttributePeerHost.String(ptm.host.Uuid)) + span.SetAttributes(semconv.NetHostIPKey.String(ptm.host.Ip)) + span.SetAttributes(config.AttributeTaskID.String(taskID)) + span.SetAttributes(config.AttributePeerID.String(request.PeerId)) + span.SetAttributes(config.AttributeReusePeerID.String(reuse.PeerID)) + span.SetAttributes(semconv.HTTPURLKey.String(request.Url)) + if reuseRange != nil { + span.SetAttributes(config.AttributeReuseRange.String(request.UrlMeta.Range)) + } + + successCh := make(chan struct{}, 1) + successCh <- struct{}{} + + span.SetAttributes(config.AttributePeerTaskSuccess.Bool(true)) + return &SeedTaskResponse{ + Context: ctx, + Span: span, + TaskID: taskID, + SubscribeResponse: SubscribeResponse{ + Storage: nil, + PieceInfoChannel: nil, + Success: successCh, + Fail: nil, + }, + }, true +} diff --git a/client/daemon/peer/peertask_seed.go b/client/daemon/peer/peertask_seed.go new file mode 100644 index 00000000000..a095c730093 --- /dev/null +++ b/client/daemon/peer/peertask_seed.go @@ -0,0 +1,85 @@ +/* + * Copyright 2022 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package peer + +import ( + "context" + + "go.opentelemetry.io/otel/trace" + "golang.org/x/time/rate" + + "d7y.io/dragonfly/v2/client/clientutil" + "d7y.io/dragonfly/v2/client/config" + "d7y.io/dragonfly/v2/client/daemon/metrics" + "d7y.io/dragonfly/v2/pkg/idgen" + "d7y.io/dragonfly/v2/pkg/rpc/scheduler" +) + +type SeedTaskRequest struct { + scheduler.PeerTaskRequest + Limit float64 + Callsystem string + Range *clientutil.Range +} + +type SeedTaskResponse struct { + SubscribeResponse + Context context.Context + Span trace.Span + TaskID string +} + +// SeedTask represents a seed peer task +type SeedTask interface { + Start(ctx context.Context) (chan *SeedTaskProgress, error) +} + +type SeedTaskProgress struct { + State *ProgressState + TaskID string + PeerID string + ContentLength int64 + CompletedLength int64 + PeerTaskDone bool +} + +func (ptm *peerTaskManager) newSeedTask( + ctx context.Context, + request *SeedTaskRequest, + limit rate.Limit) (*SeedTaskResponse, error) { + metrics.SeedTaskCount.Add(1) + + taskID := idgen.TaskID(request.Url, request.UrlMeta) + ptc, err := ptm.getPeerTaskConductor(ctx, taskID, &request.PeerTaskRequest, limit, nil, request.Range, "", true) + if err != nil { + return nil, err + } + + ctx, span := tracer.Start(ctx, config.SpanSeedTask, trace.WithSpanKind(trace.SpanKindClient)) + resp := &SeedTaskResponse{ + Context: ctx, + Span: span, + TaskID: taskID, + SubscribeResponse: SubscribeResponse{ + Storage: ptc.storage, + PieceInfoChannel: ptc.broker.Subscribe(), + Success: ptc.successCh, + Fail: ptc.failCh, + }, + } + return resp, nil +} diff --git a/client/daemon/peer/peertask_stream.go b/client/daemon/peer/peertask_stream.go index 0326af61ac8..d78112cfcbb 100644 --- a/client/daemon/peer/peertask_stream.go +++ b/client/daemon/peer/peertask_stream.go @@ -79,7 +79,7 @@ func (ptm *peerTaskManager) newStreamTask( } taskID := idgen.TaskID(request.Url, request.UrlMeta) - ptc, err := ptm.getPeerTaskConductor(ctx, taskID, request, limit, parent, rg, "") + ptc, err := ptm.getPeerTaskConductor(ctx, taskID, request, limit, parent, rg, "", false) if err != nil { return nil, err } diff --git a/client/daemon/proxy/proxy.go b/client/daemon/proxy/proxy.go index 885d3e5b746..71a84c4014d 100644 --- a/client/daemon/proxy/proxy.go +++ b/client/daemon/proxy/proxy.go @@ -363,10 +363,10 @@ func (proxy *Proxy) handleHTTP(span trace.Span, w http.ResponseWriter, req *http span.SetAttributes(semconv.HTTPStatusCodeKey.Int(resp.StatusCode)) if n, err := io.Copy(w, resp.Body); err != nil && err != io.EOF { if peerID := resp.Header.Get(config.HeaderDragonflyPeer); peerID != "" { - logger.Errorf("failed to write http body: %v, peer: %s, task: %s", - err, peerID, resp.Header.Get(config.HeaderDragonflyTask)) + logger.Errorf("failed to write http body: %v, peer: %s, task: %s, written bytes: %d", + err, peerID, resp.Header.Get(config.HeaderDragonflyTask), n) } else { - logger.Errorf("failed to write http body: %v", err) + logger.Errorf("failed to write http body: %v, written bytes: %d", err, n) } span.RecordError(err) } else { diff --git a/client/daemon/rpcserver/rpcserver.go b/client/daemon/rpcserver/rpcserver.go index 089426f07de..7a1579e057f 100644 --- a/client/daemon/rpcserver/rpcserver.go +++ b/client/daemon/rpcserver/rpcserver.go @@ -39,6 +39,7 @@ import ( logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/idgen" "d7y.io/dragonfly/v2/pkg/rpc/base" + "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem" dfdaemongrpc "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon" dfdaemonserver 
"d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/server" "d7y.io/dragonfly/v2/pkg/rpc/scheduler" @@ -66,19 +67,25 @@ type server struct { } func New(peerHost *scheduler.PeerHost, peerTaskManager peer.TaskManager, storageManager storage.Manager, downloadOpts []grpc.ServerOption, peerOpts []grpc.ServerOption) (Server, error) { - svr := &server{ + s := &server{ KeepAlive: clientutil.NewKeepAlive("rpc server"), peerHost: peerHost, peerTaskManager: peerTaskManager, storageManager: storageManager, } - svr.downloadServer = dfdaemonserver.New(svr, downloadOpts...) - healthpb.RegisterHealthServer(svr.downloadServer, health.NewServer()) + sd := &seeder{ + server: s, + } + + s.downloadServer = dfdaemonserver.New(s, downloadOpts...) + healthpb.RegisterHealthServer(s.downloadServer, health.NewServer()) + + s.peerServer = dfdaemonserver.New(s, peerOpts...) + healthpb.RegisterHealthServer(s.peerServer, health.NewServer()) - svr.peerServer = dfdaemonserver.New(svr, peerOpts...) - healthpb.RegisterHealthServer(svr.peerServer, health.NewServer()) - return svr, nil + cdnsystem.RegisterSeederServer(s.peerServer, sd) + return s, nil } func (s *server) ServeDownload(listener net.Listener) error { @@ -210,7 +217,7 @@ func (s *server) SyncPieceTasks(sync dfdaemongrpc.Daemon_SyncPieceTasksServer) e } var sub = &subscriber{ - SubscribeResult: result, + SubscribeResponse: result, sync: sync, request: request, skipPieceCount: skipPieceCount, @@ -232,12 +239,12 @@ func (s *server) CheckHealth(context.Context) error { func (s *server) Download(ctx context.Context, req *dfdaemongrpc.DownRequest, results chan<- *dfdaemongrpc.DownResult) error { + s.Keep() return s.doDownload(ctx, req, results, "") } func (s *server) doDownload(ctx context.Context, req *dfdaemongrpc.DownRequest, results chan<- *dfdaemongrpc.DownResult, peerID string) error { - s.Keep() if req.UrlMeta == nil { req.UrlMeta = &base.UrlMeta{} } @@ -338,6 +345,7 @@ func (s *server) doDownload(ctx context.Context, req *dfdaemongrpc.DownRequest, } func (s *server) StatTask(ctx context.Context, req *dfdaemongrpc.StatTaskRequest) error { + s.Keep() taskID := idgen.TaskID(req.Cid, req.UrlMeta) log := logger.With("function", "StatTask", "Cid", req.Cid, "Tag", req.UrlMeta.Tag, "taskID", taskID, "LocalOnly", req.LocalOnly) @@ -373,6 +381,7 @@ func (s *server) isTaskCompleted(taskID string) bool { } func (s *server) ImportTask(ctx context.Context, req *dfdaemongrpc.ImportTaskRequest) error { + s.Keep() peerID := idgen.PeerID(s.peerHost.Ip) taskID := idgen.TaskID(req.Cid, req.UrlMeta) log := logger.With("function", "ImportTask", "Cid", req.Cid, "Tag", req.UrlMeta.Tag, "taskID", taskID, "file", req.Path) @@ -434,6 +443,7 @@ func (s *server) ImportTask(ctx context.Context, req *dfdaemongrpc.ImportTaskReq } func (s *server) ExportTask(ctx context.Context, req *dfdaemongrpc.ExportTaskRequest) error { + s.Keep() taskID := idgen.TaskID(req.Cid, req.UrlMeta) log := logger.With("function", "ExportTask", "Cid", req.Cid, "Tag", req.UrlMeta.Tag, "taskID", taskID, "destination", req.Output) @@ -545,6 +555,7 @@ func call(ctx context.Context, peerID string, drc chan *dfdaemongrpc.DownResult, } func (s *server) DeleteTask(ctx context.Context, req *dfdaemongrpc.DeleteTaskRequest) error { + s.Keep() taskID := idgen.TaskID(req.Cid, req.UrlMeta) log := logger.With("function", "DeleteTask", "Cid", req.Cid, "Tag", req.UrlMeta.Tag, "taskID", taskID) diff --git a/client/daemon/rpcserver/rpcserver_test.go b/client/daemon/rpcserver/rpcserver_test.go index da6f812c94c..11d00e721e5 100644 --- 
a/client/daemon/rpcserver/rpcserver_test.go +++ b/client/daemon/rpcserver/rpcserver_test.go @@ -423,7 +423,7 @@ func Test_SyncPieceTasks(t *testing.T) { }) mockTaskManager := mock_peer.NewMockTaskManager(ctrl) mockTaskManager.EXPECT().Subscribe(gomock.Any()).AnyTimes().DoAndReturn( - func(request *base.PieceTaskRequest) (*peer.SubscribeResult, bool) { + func(request *base.PieceTaskRequest) (*peer.SubscribeResponse, bool) { ch := make(chan *peer.PieceInfo) success := make(chan struct{}) fail := make(chan struct{}) @@ -464,7 +464,7 @@ func Test_SyncPieceTasks(t *testing.T) { close(success) }(tc.followingPieces) - return &peer.SubscribeResult{ + return &peer.SubscribeResponse{ Storage: mockStorageManger, PieceInfoChannel: ch, Success: success, diff --git a/client/daemon/rpcserver/seeder.go b/client/daemon/rpcserver/seeder.go new file mode 100644 index 00000000000..a2537a6ea75 --- /dev/null +++ b/client/daemon/rpcserver/seeder.go @@ -0,0 +1,266 @@ +/* + * Copyright 2022 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rpcserver + +import ( + "context" + "fmt" + "math" + "time" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "d7y.io/dragonfly/v2/client/clientutil" + "d7y.io/dragonfly/v2/client/config" + "d7y.io/dragonfly/v2/client/daemon/peer" + logger "d7y.io/dragonfly/v2/internal/dflog" + "d7y.io/dragonfly/v2/pkg/idgen" + "d7y.io/dragonfly/v2/pkg/rpc/base" + "d7y.io/dragonfly/v2/pkg/rpc/base/common" + "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem" + "d7y.io/dragonfly/v2/pkg/rpc/scheduler" + "d7y.io/dragonfly/v2/pkg/util/rangeutils" +) + +type seeder struct { + server *server +} + +func (s *seeder) GetPieceTasks(ctx context.Context, request *base.PieceTaskRequest) (*base.PiecePacket, error) { + return s.server.GetPieceTasks(ctx, request) +} + +func (s *seeder) SyncPieceTasks(tasksServer cdnsystem.Seeder_SyncPieceTasksServer) error { + return s.server.SyncPieceTasks(tasksServer) +} + +func (s *seeder) ObtainSeeds(seedRequest *cdnsystem.SeedRequest, seedsServer cdnsystem.Seeder_ObtainSeedsServer) error { + s.server.Keep() + if seedRequest.UrlMeta == nil { + seedRequest.UrlMeta = &base.UrlMeta{} + } + + req := peer.SeedTaskRequest{ + PeerTaskRequest: scheduler.PeerTaskRequest{ + Url: seedRequest.Url, + UrlMeta: seedRequest.UrlMeta, + PeerId: idgen.PeerID(s.server.peerHost.Ip), + PeerHost: s.server.peerHost, + HostLoad: nil, + IsMigrating: false, + }, + Limit: 0, + Callsystem: "", + Range: nil, + } + + log := logger.With("peer", req.PeerId, "task", seedRequest.TaskId, "component", "seedService") + + if len(req.UrlMeta.Range) > 0 { + r, err := rangeutils.ParseRange(req.UrlMeta.Range, math.MaxInt) + if err != nil { + err = fmt.Errorf("parse range %s error: %s", req.UrlMeta.Range, err) + log.Errorf(err.Error()) + return err + } + req.Range = &clientutil.Range{ + Start: int64(r.StartIndex), + Length: int64(r.Length()), + } + } + + resp, err := s.server.peerTaskManager.StartSeedTask(seedsServer.Context(), &req) + if err != nil { + 
log.Errorf("start seed task error: %s", err.Error()) + return err + } + + log.Infof("start seed task") + + err = seedsServer.Send( + &cdnsystem.PieceSeed{ + PeerId: req.PeerId, + HostUuid: req.PeerHost.Uuid, + PieceInfo: &base.PieceInfo{ + PieceNum: common.BeginOfPiece, + }, + Done: false, + }) + if err != nil { + resp.Span.RecordError(err) + log.Errorf("send piece seed error: %s", err.Error()) + return err + } + + sync := seedSynchronizer{ + SeedTaskResponse: resp, + SugaredLoggerOnWith: log, + seedsServer: seedsServer, + seedTaskRequest: &req, + startNanoSecond: time.Now().UnixNano(), + } + defer resp.Span.End() + + return sync.sendPieceSeeds() +} + +type seedSynchronizer struct { + *peer.SeedTaskResponse + *logger.SugaredLoggerOnWith + seedsServer cdnsystem.Seeder_ObtainSeedsServer + seedTaskRequest *peer.SeedTaskRequest + startNanoSecond int64 +} + +func (s *seedSynchronizer) sendPieceSeeds() (err error) { + var ( + ctx = s.Context + desired int32 + ) + for { + select { + case <-ctx.Done(): + err = ctx.Err() + s.Errorf("context done due to %s", err.Error()) + s.Span.RecordError(err) + s.Span.SetAttributes(config.AttributeSeedTaskSuccess.Bool(false)) + return err + case <-s.Success: + s.Infof("seed task success, send reminding piece seeds") + err = s.sendRemindingPieceSeeds(desired) + if err != nil { + s.Span.RecordError(err) + s.Span.SetAttributes(config.AttributeSeedTaskSuccess.Bool(false)) + } else { + s.Span.SetAttributes(config.AttributeSeedTaskSuccess.Bool(true)) + } + return err + case <-s.Fail: + s.Error("seed task failed") + s.Span.RecordError(err) + s.Span.SetAttributes(config.AttributeSeedTaskSuccess.Bool(false)) + return status.Errorf(codes.Internal, "seed task failed") + case p := <-s.PieceInfoChannel: + s.Infof("receive piece info, num: %d, ordered num: %d, finish: %v", p.Num, p.OrderedNum, p.Finished) + desired, err = s.sendOrderedPieceSeeds(desired, p.OrderedNum, p.Finished) + if err != nil { + s.Span.RecordError(err) + s.Span.SetAttributes(config.AttributeSeedTaskSuccess.Bool(false)) + return err + } + if p.Finished { + s.Debugf("send piece seeds finished") + s.Span.SetAttributes(config.AttributeSeedTaskSuccess.Bool(true)) + return nil + } + } + } +} + +func (s *seedSynchronizer) sendRemindingPieceSeeds(desired int32) error { + for { + pp, err := s.Storage.GetPieces(s.Context, + &base.PieceTaskRequest{ + TaskId: s.TaskID, + StartNum: uint32(desired), + Limit: 16, + }) + if err != nil { + s.Errorf("get pieces error %s, desired: %d", err.Error(), desired) + return err + } + + for _, p := range pp.PieceInfos { + if p.PieceNum != desired { + s.Errorf("desired piece %d, not found", desired) + return status.Errorf(codes.Internal, "seed task piece %d not found", desired) + } + ps := s.compositePieceSeed(pp, p) + if p.PieceNum == pp.TotalPiece-1 { + ps.Done, ps.EndTime = true, uint64(time.Now().UnixNano()) + s.Infof("seed tasks start time: %d, end time: %d, cost: %dms", ps.BeginTime, ps.EndTime, (ps.EndTime-ps.BeginTime)/1000000) + } + + err = s.seedsServer.Send(&ps) + if err != nil { + s.Errorf("send reminding piece seeds error: %s", err.Error()) + return err + } + + s.Span.AddEvent(fmt.Sprintf("send piece %d ok", desired)) + desired++ + } + if desired == pp.TotalPiece { + s.Debugf("send reminding piece seeds ok") + return nil + } + } +} + +func (s *seedSynchronizer) sendOrderedPieceSeeds(desired, orderedNum int32, finished bool) (int32, error) { + cur := desired + for ; cur <= orderedNum; cur++ { + pp, err := s.Storage.GetPieces(s.Context, + &base.PieceTaskRequest{ + TaskId: 
s.TaskID, + StartNum: uint32(cur), + Limit: 1, + }) + if err != nil { + s.Errorf("get pieces error %s, desired: %d", err.Error(), cur) + return -1, err + } + if len(pp.PieceInfos) < 1 { + s.Errorf("desired pieces %d not found", cur) + return -1, fmt.Errorf("get seed piece %d info failed", cur) + } + + ps := s.compositePieceSeed(pp, pp.PieceInfos[0]) + if cur == orderedNum && finished { + ps.Done, ps.EndTime = true, uint64(time.Now().UnixNano()) + s.Infof("seed tasks start time: %d, end time: %d, cost: %dms", ps.BeginTime, ps.EndTime, (ps.EndTime-ps.BeginTime)/1000000) + } + err = s.seedsServer.Send(&ps) + if err != nil { + s.Errorf("send ordered piece seeds error: %s", err.Error()) + return -1, err + } + s.Debugf("send piece %d seeds ok", cur) + s.Span.AddEvent(fmt.Sprintf("send piece %d ok", cur)) + } + return cur, nil +} + +func (s *seedSynchronizer) compositePieceSeed(pp *base.PiecePacket, piece *base.PieceInfo) cdnsystem.PieceSeed { + return cdnsystem.PieceSeed{ + PeerId: s.seedTaskRequest.PeerId, + HostUuid: s.seedTaskRequest.PeerHost.Uuid, + PieceInfo: &base.PieceInfo{ + PieceNum: piece.PieceNum, + RangeStart: piece.RangeStart, + RangeSize: piece.RangeSize, + PieceMd5: piece.PieceMd5, + PieceOffset: piece.PieceOffset, + PieceStyle: piece.PieceStyle, + DownloadCost: piece.DownloadCost, + }, + ContentLength: pp.ContentLength, + TotalPieceCount: pp.TotalPiece, + BeginTime: uint64(s.startNanoSecond), + } +} diff --git a/client/daemon/rpcserver/seeder_test.go b/client/daemon/rpcserver/seeder_test.go new file mode 100644 index 00000000000..a0f4d117d9b --- /dev/null +++ b/client/daemon/rpcserver/seeder_test.go @@ -0,0 +1,380 @@ +/* + * Copyright 2022 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package rpcserver + +import ( + "context" + "fmt" + "io" + "net" + "sync" + "testing" + + "github.com/golang/mock/gomock" + "github.com/phayes/freeport" + testifyassert "github.com/stretchr/testify/assert" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/trace" + + "d7y.io/dragonfly/v2/client/clientutil" + "d7y.io/dragonfly/v2/client/config" + "d7y.io/dragonfly/v2/client/daemon/peer" + mock_peer "d7y.io/dragonfly/v2/client/daemon/test/mock/peer" + mock_storage "d7y.io/dragonfly/v2/client/daemon/test/mock/storage" + "d7y.io/dragonfly/v2/pkg/dfnet" + "d7y.io/dragonfly/v2/pkg/rpc/base" + "d7y.io/dragonfly/v2/pkg/rpc/base/common" + "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem" + cdnclient "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem/client" + dfdaemonserver "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/server" + "d7y.io/dragonfly/v2/pkg/rpc/scheduler" +) + +func Test_ObtainSeeds(t *testing.T) { + assert := testifyassert.New(t) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + var ( + pieceSize = uint32(1024) + ) + + type pieceRange struct { + start int + end int + } + var tests = []struct { + name string + existTaskID string // test for non-exists task + existPieces []pieceRange // already exist pieces in storage + followingPieces []pieceRange // following pieces in running task subscribe channel + limit uint32 + totalPieces uint32 + success bool + verify func(t *testing.T, assert *testifyassert.Assertions) + }{ + { + name: "already exists in storage", + existPieces: []pieceRange{ + { + start: 0, + end: 10, + }, + }, + totalPieces: 11, + success: true, + verify: func(t *testing.T, assert *testifyassert.Assertions) { + }, + }, + { + name: "already exists in storage with extra get piece request", + existPieces: []pieceRange{ + { + start: 0, + end: 10, + }, + }, + totalPieces: 11, + success: true, + verify: func(t *testing.T, assert *testifyassert.Assertions) { + }, + }, + { + name: "already exists in storage - large", + existPieces: []pieceRange{ + { + start: 0, + end: 1000, + }, + }, + totalPieces: 1001, + success: true, + verify: func(t *testing.T, assert *testifyassert.Assertions) { + }, + }, + { + name: "already exists in storage - large with extra get piece request", + existPieces: []pieceRange{ + { + start: 0, + end: 1000, + }, + }, + totalPieces: 1001, + success: true, + verify: func(t *testing.T, assert *testifyassert.Assertions) { + }, + }, + { + name: "partial exists in storage", + existPieces: []pieceRange{ + { + start: 0, + end: 10, + }, + }, + followingPieces: []pieceRange{ + { + start: 11, + end: 20, + }, + }, + totalPieces: 21, + success: true, + verify: func(t *testing.T, assert *testifyassert.Assertions) { + }, + }, + { + name: "partial exists in storage - large", + existPieces: []pieceRange{ + { + start: 0, + end: 1000, + }, + }, + followingPieces: []pieceRange{ + { + start: 1001, + end: 2000, + }, + }, + totalPieces: 2001, + success: true, + verify: func(t *testing.T, assert *testifyassert.Assertions) { + }, + }, + { + name: "not exists in storage", + followingPieces: []pieceRange{ + { + start: 0, + end: 20, + }, + }, + totalPieces: 21, + success: true, + verify: func(t *testing.T, assert *testifyassert.Assertions) { + }, + }, + { + name: "not exists in storage - large", + followingPieces: []pieceRange{ + { + start: 0, + end: 2000, + }, + }, + totalPieces: 2001, + success: true, + verify: func(t *testing.T, assert *testifyassert.Assertions) { + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + for _, delay := range []bool{false, true} { + 
delay := delay + mockStorageManger := mock_storage.NewMockManager(ctrl) + + if tc.limit == 0 { + tc.limit = 1024 + } + + var ( + totalPieces []*base.PieceInfo + lock sync.Mutex + ) + + var addedPieces = make(map[uint32]*base.PieceInfo) + for _, p := range tc.existPieces { + if p.end == 0 { + p.end = p.start + } + for i := p.start; i <= p.end; i++ { + if _, ok := addedPieces[uint32(i)]; ok { + continue + } + piece := &base.PieceInfo{ + PieceNum: int32(i), + RangeStart: uint64(i) * uint64(pieceSize), + RangeSize: pieceSize, + PieceOffset: uint64(i) * uint64(pieceSize), + PieceStyle: base.PieceStyle_PLAIN, + } + totalPieces = append(totalPieces, piece) + addedPieces[uint32(i)] = piece + } + } + + mockStorageManger.EXPECT().GetPieces(gomock.Any(), + gomock.Any()).AnyTimes().DoAndReturn( + func(ctx context.Context, req *base.PieceTaskRequest) (*base.PiecePacket, error) { + var pieces []*base.PieceInfo + lock.Lock() + for i := req.StartNum; i < tc.totalPieces; i++ { + if piece, ok := addedPieces[i]; ok { + if piece.PieceNum >= int32(req.StartNum) && len(pieces) < int(req.Limit) { + pieces = append(pieces, piece) + } + } + } + lock.Unlock() + return &base.PiecePacket{ + TaskId: req.TaskId, + DstPid: req.DstPid, + DstAddr: "", + PieceInfos: pieces, + TotalPiece: int32(tc.totalPieces), + ContentLength: int64(tc.totalPieces) * int64(pieceSize), + PieceMd5Sign: "", + }, nil + }) + mockTaskManager := mock_peer.NewMockTaskManager(ctrl) + mockTaskManager.EXPECT().StartSeedTask(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, req *peer.SeedTaskRequest) (*peer.SeedTaskResponse, error) { + ch := make(chan *peer.PieceInfo) + success := make(chan struct{}) + fail := make(chan struct{}) + + go func(followingPieces []pieceRange) { + for i, p := range followingPieces { + if p.end == 0 { + p.end = p.start + } + for j := p.start; j <= p.end; j++ { + lock.Lock() + if _, ok := addedPieces[uint32(j)]; ok { + continue + } + piece := &base.PieceInfo{ + PieceNum: int32(j), + RangeStart: uint64(j) * uint64(pieceSize), + RangeSize: pieceSize, + PieceOffset: uint64(j) * uint64(pieceSize), + PieceStyle: base.PieceStyle_PLAIN, + } + totalPieces = append(totalPieces, piece) + addedPieces[uint32(j)] = piece + lock.Unlock() + + var finished bool + if i == len(followingPieces)-1 && j == p.end { + finished = true + } + if !delay { + ch <- &peer.PieceInfo{ + Num: int32(j), + Finished: finished, + } + } + } + } + close(success) + }(tc.followingPieces) + + tracer := otel.Tracer("test") + ctx, span := tracer.Start(ctx, config.SpanSeedTask, trace.WithSpanKind(trace.SpanKindClient)) + return &peer.SeedTaskResponse{ + SubscribeResponse: peer.SubscribeResponse{ + Storage: mockStorageManger, + PieceInfoChannel: ch, + Success: success, + Fail: fail, + }, + Context: ctx, + Span: span, + TaskID: "fake-task-id", + }, nil + }) + + s := &server{ + KeepAlive: clientutil.NewKeepAlive("test"), + peerHost: &scheduler.PeerHost{}, + storageManager: mockStorageManger, + peerTaskManager: mockTaskManager, + } + sd := &seeder{server: s} + + _, client := setupSeederServerAndClient(t, s, sd, assert, s.ServePeer) + + pps, err := client.ObtainSeeds( + context.Background(), + &cdnsystem.SeedRequest{ + TaskId: "fake-task-id", + Url: "http://localhost/path/to/file", + UrlMeta: nil, + }) + assert.Nil(err, "client obtain seeds grpc call should be ok") + + var ( + total = make(map[int32]bool) + maxNum int32 + ) + + for { + p, err := pps.Recv() + if err == io.EOF { + break + } + if p.PieceInfo.PieceNum == common.BeginOfPiece { + continue + } + 
total[p.PieceInfo.PieceNum] = true + if p.PieceInfo.PieceNum >= maxNum { + maxNum = p.PieceInfo.PieceNum + } + if tc.success { + assert.Nil(err, "receive seed info should be ok") + } + } + if tc.success { + assert.Equal(int(maxNum+1), len(total)) + } + s.peerServer.GracefulStop() + } + + }) + } +} + +func setupSeederServerAndClient(t *testing.T, srv *server, sd *seeder, assert *testifyassert.Assertions, serveFunc func(listener net.Listener) error) (int, cdnclient.CdnClient) { + srv.peerServer = dfdaemonserver.New(srv) + cdnsystem.RegisterSeederServer(srv.peerServer, sd) + + port, err := freeport.GetFreePort() + if err != nil { + t.Fatal(err) + } + + ln, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) + assert.Nil(err, "get free port should be ok") + go func() { + if err := serveFunc(ln); err != nil { + t.Error(err) + } + }() + + client, err := cdnclient.GetClientByAddr([]dfnet.NetAddr{ + { + Type: dfnet.TCP, + Addr: fmt.Sprintf(":%d", port), + }, + }) + assert.Nil(err, "grpc dial should be ok") + return port, client +} diff --git a/client/daemon/rpcserver/subscriber.go b/client/daemon/rpcserver/subscriber.go index fcd10231f71..d1c54e758f4 100644 --- a/client/daemon/rpcserver/subscriber.go +++ b/client/daemon/rpcserver/subscriber.go @@ -34,7 +34,7 @@ import ( type subscriber struct { sync.Mutex // lock for sent map and grpc Send *logger.SugaredLoggerOnWith - *peer.SubscribeResult + *peer.SubscribeResponse sync dfdaemon.Daemon_SyncPieceTasksServer request *base.PieceTaskRequest skipPieceCount uint32 @@ -164,7 +164,7 @@ loop: s.Infof("remote SyncPieceTasks done, exit sending, local task is running") return nil case info := <-s.PieceInfoChannel: - s.Infof("receive piece info, num: %d, %v", info.Num, info.Finished) + s.Infof("receive piece info, num: %d, finished: %v", info.Num, info.Finished) // not desired piece if s.totalPieces > -1 && uint32(info.Num) < nextPieceNum { continue diff --git a/client/daemon/storage/local_storage.go b/client/daemon/storage/local_storage.go index a8040d7d3ac..2566b7535b5 100644 --- a/client/daemon/storage/local_storage.go +++ b/client/daemon/storage/local_storage.go @@ -113,6 +113,7 @@ func (t *localTaskStore) WritePiece(ctx context.Context, req *WritePieceRequest) } t.RUnlock() + start := time.Now().UnixNano() file, err := os.OpenFile(t.DataFilePath, os.O_RDWR, defaultFileMode) if err != nil { return 0, err @@ -170,6 +171,7 @@ func (t *localTaskStore) WritePiece(ctx context.Context, req *WritePieceRequest) if _, ok := t.Pieces[req.Num]; ok { return n, nil } + req.PieceMetadata.Cost = uint64(time.Now().UnixNano() - start) t.Pieces[req.Num] = req.PieceMetadata t.genDigest(n, req) return n, nil @@ -407,14 +409,16 @@ func (t *localTaskStore) GetPieces(ctx context.Context, req *base.PieceTaskReque break } if piece, ok := t.Pieces[num]; ok { - piecePacket.PieceInfos = append(piecePacket.PieceInfos, &base.PieceInfo{ - PieceNum: piece.Num, - RangeStart: uint64(piece.Range.Start), - RangeSize: uint32(piece.Range.Length), - PieceMd5: piece.Md5, - PieceOffset: piece.Offset, - PieceStyle: piece.Style, - }) + piecePacket.PieceInfos = append(piecePacket.PieceInfos, + &base.PieceInfo{ + PieceNum: piece.Num, + RangeStart: uint64(piece.Range.Start), + RangeSize: uint32(piece.Range.Length), + PieceMd5: piece.Md5, + PieceOffset: piece.Offset, + PieceStyle: piece.Style, + DownloadCost: piece.Cost / 1000, + }) } } return piecePacket, nil diff --git a/client/daemon/storage/metadata.go b/client/daemon/storage/metadata.go index 90d8b32e30d..2dac2429cab 100644 --- 
a/client/daemon/storage/metadata.go +++ b/client/daemon/storage/metadata.go @@ -47,6 +47,8 @@ type PieceMetadata struct { Offset uint64 `json:"offset,omitempty"` Range clientutil.Range `json:"range,omitempty"` Style base.PieceStyle `json:"style,omitempty"` + // time(nanosecond) consumed + Cost uint64 `json:"cost,omitempty"` } type CommonTaskRequest struct { diff --git a/client/daemon/test/mock/peer/peertask_manager.go b/client/daemon/test/mock/peer/peertask_manager.go index 94d74f8fd5a..f204bbe96c4 100644 --- a/client/daemon/test/mock/peer/peertask_manager.go +++ b/client/daemon/test/mock/peer/peertask_manager.go @@ -98,6 +98,21 @@ func (mr *MockTaskManagerMockRecorder) StartFileTask(ctx, req interface{}) *gomo return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartFileTask", reflect.TypeOf((*MockTaskManager)(nil).StartFileTask), ctx, req) } +// StartSeedTask mocks base method. +func (m *MockTaskManager) StartSeedTask(ctx context.Context, req *peer.SeedTaskRequest) (*peer.SeedTaskResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StartSeedTask", ctx, req) + ret0, _ := ret[0].(*peer.SeedTaskResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// StartSeedTask indicates an expected call of StartSeedTask. +func (mr *MockTaskManagerMockRecorder) StartSeedTask(ctx, req interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartSeedTask", reflect.TypeOf((*MockTaskManager)(nil).StartSeedTask), ctx, req) +} + // StartStreamTask mocks base method. func (m *MockTaskManager) StartStreamTask(ctx context.Context, req *peer.StreamTaskRequest) (io.ReadCloser, map[string]string, error) { m.ctrl.T.Helper() @@ -144,10 +159,10 @@ func (mr *MockTaskManagerMockRecorder) Stop(ctx interface{}) *gomock.Call { } // Subscribe mocks base method. -func (m *MockTaskManager) Subscribe(request *base.PieceTaskRequest) (*peer.SubscribeResult, bool) { +func (m *MockTaskManager) Subscribe(request *base.PieceTaskRequest) (*peer.SubscribeResponse, bool) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Subscribe", request) - ret0, _ := ret[0].(*peer.SubscribeResult) + ret0, _ := ret[0].(*peer.SubscribeResponse) ret1, _ := ret[1].(bool) return ret0, ret1 } diff --git a/pkg/rpc/cdnsystem/server/server.go b/pkg/rpc/cdnsystem/server/server.go index 11ab451d469..f85bfb6e587 100644 --- a/pkg/rpc/cdnsystem/server/server.go +++ b/pkg/rpc/cdnsystem/server/server.go @@ -37,7 +37,7 @@ import ( "d7y.io/dragonfly/v2/pkg/util/net/iputils" ) -// SeederServer refer to cdnsystem.SeederServer +// SeederServer refer to cdnsystem.SeederServer type SeederServer interface { // ObtainSeeds generate seeds and return to scheduler ObtainSeeds(context.Context, *cdnsystem.SeedRequest, chan<- *cdnsystem.PieceSeed) error
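
Taken together, these changes let a dfdaemon stand in for a CDN seeder: the peer gRPC server now also registers cdnsystem.SeederServer, and ObtainSeeds drives a seed task that skips scheduler registration (a dummy peer-packet stream and scheduler client, with needBackSource forced to true) and downloads straight from the origin. The sketch below shows how a caller might consume that service, mirroring the flow exercised in Test_ObtainSeeds; it is a minimal illustration, and the daemon address, task ID, and URL are placeholders rather than values from this change.

// Minimal sketch of calling the Seeder API that dfdaemon now exposes on its
// peer port. Only calls that appear in this diff (cdnclient.GetClientByAddr,
// ObtainSeeds, Recv) are used; the address, task ID, and URL are placeholders.
package main

import (
	"context"
	"fmt"
	"io"

	"d7y.io/dragonfly/v2/pkg/dfnet"
	"d7y.io/dragonfly/v2/pkg/rpc/base/common"
	"d7y.io/dragonfly/v2/pkg/rpc/cdnsystem"
	cdnclient "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem/client"
)

func main() {
	// Dial the daemon's peer gRPC address, where SeederServer is now registered.
	client, err := cdnclient.GetClientByAddr([]dfnet.NetAddr{
		{Type: dfnet.TCP, Addr: "127.0.0.1:65000"}, // placeholder address
	})
	if err != nil {
		panic(err)
	}

	// Ask the daemon to seed the resource; it back-sources the content itself
	// and streams piece seeds back as pieces land in its local storage.
	stream, err := client.ObtainSeeds(context.Background(), &cdnsystem.SeedRequest{
		TaskId:  "example-task-id", // placeholder
		Url:     "http://localhost/path/to/file",
		UrlMeta: nil,
	})
	if err != nil {
		panic(err)
	}

	for {
		seed, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			panic(err)
		}
		// The first message is a start marker carrying common.BeginOfPiece.
		if seed.PieceInfo.PieceNum == common.BeginOfPiece {
			continue
		}
		fmt.Printf("piece %d received, done: %v\n", seed.PieceInfo.PieceNum, seed.Done)
		if seed.Done {
			break
		}
	}
}

Once the final PieceSeed arrives with Done set, the task data is complete in the daemon's local storage, and the same seeder delegates GetPieceTasks and SyncPieceTasks to the existing peer-serving paths, so other peers can pull the content from this daemon as usual.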