diff --git a/pkg/rpc/cdnsystem/client/client.go b/pkg/rpc/cdnsystem/client/client.go index 8e9bb6befad..b26f1a598e1 100644 --- a/pkg/rpc/cdnsystem/client/client.go +++ b/pkg/rpc/cdnsystem/client/client.go @@ -101,17 +101,15 @@ func (cc *cdnClient) ObtainSeeds(ctx context.Context, sr *cdnsystem.SeedRequest, func (cc *cdnClient) GetPieceTasks(ctx context.Context, addr dfnet.NetAddr, req *base.PieceTaskRequest, opts ...grpc.CallOption) (*base.PiecePacket, error) { res, err := rpc.ExecuteWithRetry(func() (interface{}, error) { - defer func() { - logger.WithTaskID(req.TaskId).Infof("invoke cdn node %s GetPieceTasks", addr.GetEndpoint()) - }() client, err := cc.getSeederClientWithTarget(addr.GetEndpoint()) if err != nil { return nil, err } return client.GetPieceTasks(ctx, req, opts...) }, 0.2, 2.0, 3, nil) - if err == nil { - return res.(*base.PiecePacket), nil + if err != nil { + logger.WithTaskID(req.TaskId).Infof("GetPieceTasks: invoke cdn node %s GetPieceTasks failed: %v", addr.GetEndpoint(), err) + return nil, err } - return nil, err + return res.(*base.PiecePacket), nil } diff --git a/pkg/rpc/cdnsystem/client/piece_seed_stream.go b/pkg/rpc/cdnsystem/client/piece_seed_stream.go index 4bcbd0dc897..781426fa1e6 100644 --- a/pkg/rpc/cdnsystem/client/piece_seed_stream.go +++ b/pkg/rpc/cdnsystem/client/piece_seed_stream.go @@ -102,6 +102,7 @@ func (pss *PieceSeedStream) retryRecv(cause error) (*cdnsystem.PieceSeed, error) func (pss *PieceSeedStream) replaceStream(cause error) error { if pss.StreamTimes >= pss.MaxAttempts { + logger.WithTaskID(pss.hashKey).Info("replace stream reach max attempt") return cause } var target string @@ -115,7 +116,7 @@ func (pss *PieceSeedStream) replaceStream(cause error) error { return client.ObtainSeeds(pss.ctx, pss.sr, pss.opts...) }, pss.InitBackoff, pss.MaxBackOff, pss.MaxAttempts, cause) if err != nil { - logger.WithTaskID(pss.hashKey).Infof("replaceStream: invoke cdn node %s ObtainSeeds", target) + logger.WithTaskID(pss.hashKey).Infof("replaceStream: invoke cdn node %s ObtainSeeds failed: %v", target, err) return pss.replaceStream(cause) } pss.stream = stream.(cdnsystem.Seeder_ObtainSeedsClient) diff --git a/pkg/rpc/dfdaemon/client/client.go b/pkg/rpc/dfdaemon/client/client.go index 9548a3c13c4..bfd842b3b07 100644 --- a/pkg/rpc/dfdaemon/client/client.go +++ b/pkg/rpc/dfdaemon/client/client.go @@ -22,6 +22,7 @@ import ( "sync" "time" + logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/internal/idgen" "d7y.io/dragonfly/v2/pkg/basic/dfnet" "d7y.io/dragonfly/v2/pkg/rpc" @@ -114,12 +115,11 @@ func (dc *daemonClient) GetPieceTasks(ctx context.Context, target dfnet.NetAddr, } return client.GetPieceTasks(ctx, ptr, opts...) }, 0.2, 2.0, 3, nil) - - if err == nil { - return res.(*base.PiecePacket), nil + if err != nil { + logger.WithTaskID(ptr.TaskId).Infof("GetPieceTasks: invoke daemon node %s GetPieceTasks failed: %v", target, err) + return nil, err } - - return nil, err + return res.(*base.PiecePacket), nil } func (dc *daemonClient) CheckHealth(ctx context.Context, target dfnet.NetAddr, opts ...grpc.CallOption) (err error) { @@ -130,6 +130,9 @@ func (dc *daemonClient) CheckHealth(ctx context.Context, target dfnet.NetAddr, o } return client.CheckHealth(ctx, new(empty.Empty), opts...) }, 0.2, 2.0, 3, nil) - + if err != nil { + logger.Infof("CheckHealth: invoke daemon node %s CheckHealth failed: %v", target, err) + return + } return } diff --git a/pkg/rpc/dfdaemon/client/down_result_stream.go b/pkg/rpc/dfdaemon/client/down_result_stream.go index 41a6328b5a0..91a12fd8f86 100644 --- a/pkg/rpc/dfdaemon/client/down_result_stream.go +++ b/pkg/rpc/dfdaemon/client/down_result_stream.go @@ -113,6 +113,7 @@ func (drs *DownResultStream) retryRecv(cause error) (*dfdaemon.DownResult, error func (drs *DownResultStream) replaceStream(cause error) error { if drs.StreamTimes >= drs.MaxAttempts { + logger.WithTaskID(drs.hashKey).Info("replace stream reach max attempt") return cause } var target string @@ -126,7 +127,7 @@ func (drs *DownResultStream) replaceStream(cause error) error { return client.Download(drs.ctx, drs.req, drs.opts...) }, drs.InitBackoff, drs.MaxBackOff, drs.MaxAttempts, cause) if err != nil { - logger.WithTaskID(drs.hashKey).Infof("replaceStream: invoke daemon node %s Download", target) + logger.WithTaskID(drs.hashKey).Infof("replaceStream: invoke daemon node %s Download failed: %v", target, err) return drs.replaceClient(cause) } drs.stream = stream.(dfdaemon.Daemon_DownloadClient) @@ -153,7 +154,7 @@ func (drs *DownResultStream) replaceClient(cause error) error { return client.Download(drs.ctx, drs.req, drs.opts...) }, drs.InitBackoff, drs.MaxBackOff, drs.MaxAttempts, cause) if err != nil { - logger.WithTaskID(drs.hashKey).Infof("replaceClient: invoke daemon node %s Download", target) + logger.WithTaskID(drs.hashKey).Infof("replaceClient: invoke daemon node %s Download failed: %v", target, err) return drs.replaceClient(cause) } drs.stream = stream.(dfdaemon.Daemon_DownloadClient) diff --git a/pkg/rpc/scheduler/client/peer_packet_stream.go b/pkg/rpc/scheduler/client/peer_packet_stream.go index 711c420dec3..dc15efdfae9 100644 --- a/pkg/rpc/scheduler/client/peer_packet_stream.go +++ b/pkg/rpc/scheduler/client/peer_packet_stream.go @@ -212,7 +212,7 @@ func (pps *peerPacketStream) replaceClient(cause error) error { return client.ReportPieceResult(pps.ctx, pps.opts...) }, pps.retryMeta.InitBackoff, pps.retryMeta.MaxBackOff, pps.retryMeta.MaxAttempts, cause) if err != nil { - logger.WithTaskID(pps.hashKey).Infof("replaceClient: invoke scheduler node %s ReportPieceResult", target) + logger.WithTaskID(pps.hashKey).Infof("replaceClient: invoke scheduler node %s ReportPieceResult failed: %v", target, err) return pps.replaceClient(cause) } pps.stream = stream.(scheduler.Scheduler_ReportPieceResultClient)