Skip to content

Commit

Permalink
feat: framework log
Browse files Browse the repository at this point in the history
Signed-off-by: santong <[email protected]>
  • Loading branch information
244372610 committed Aug 31, 2021
1 parent af2fe79 commit 4c7471c
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 16 deletions.
10 changes: 4 additions & 6 deletions pkg/rpc/cdnsystem/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,17 +101,15 @@ func (cc *cdnClient) ObtainSeeds(ctx context.Context, sr *cdnsystem.SeedRequest,

func (cc *cdnClient) GetPieceTasks(ctx context.Context, addr dfnet.NetAddr, req *base.PieceTaskRequest, opts ...grpc.CallOption) (*base.PiecePacket, error) {
res, err := rpc.ExecuteWithRetry(func() (interface{}, error) {
defer func() {
logger.WithTaskID(req.TaskId).Infof("invoke cdn node %s GetPieceTasks", addr.GetEndpoint())
}()
client, err := cc.getSeederClientWithTarget(addr.GetEndpoint())
if err != nil {
return nil, err
}
return client.GetPieceTasks(ctx, req, opts...)
}, 0.2, 2.0, 3, nil)
if err == nil {
return res.(*base.PiecePacket), nil
if err != nil {
logger.WithTaskID(req.TaskId).Infof("GetPieceTasks: invoke cdn node %s GetPieceTasks failed: %v", addr.GetEndpoint(), err)
return nil, err
}
return nil, err
return res.(*base.PiecePacket), nil
}
3 changes: 2 additions & 1 deletion pkg/rpc/cdnsystem/client/piece_seed_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func (pss *PieceSeedStream) retryRecv(cause error) (*cdnsystem.PieceSeed, error)

func (pss *PieceSeedStream) replaceStream(cause error) error {
if pss.StreamTimes >= pss.MaxAttempts {
logger.WithTaskID(pss.hashKey).Info("replace stream reach max attempt")
return cause
}
var target string
Expand All @@ -115,7 +116,7 @@ func (pss *PieceSeedStream) replaceStream(cause error) error {
return client.ObtainSeeds(pss.ctx, pss.sr, pss.opts...)
}, pss.InitBackoff, pss.MaxBackOff, pss.MaxAttempts, cause)
if err != nil {
logger.WithTaskID(pss.hashKey).Infof("replaceStream: invoke cdn node %s ObtainSeeds", target)
logger.WithTaskID(pss.hashKey).Infof("replaceStream: invoke cdn node %s ObtainSeeds failed: %v", target, err)
return pss.replaceStream(cause)
}
pss.stream = stream.(cdnsystem.Seeder_ObtainSeedsClient)
Expand Down
15 changes: 9 additions & 6 deletions pkg/rpc/dfdaemon/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sync"
"time"

logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/internal/idgen"
"d7y.io/dragonfly/v2/pkg/basic/dfnet"
"d7y.io/dragonfly/v2/pkg/rpc"
Expand Down Expand Up @@ -114,12 +115,11 @@ func (dc *daemonClient) GetPieceTasks(ctx context.Context, target dfnet.NetAddr,
}
return client.GetPieceTasks(ctx, ptr, opts...)
}, 0.2, 2.0, 3, nil)

if err == nil {
return res.(*base.PiecePacket), nil
if err != nil {
logger.WithTaskID(ptr.TaskId).Infof("GetPieceTasks: invoke daemon node %s GetPieceTasks failed: %v", target, err)
return nil, err
}

return nil, err
return res.(*base.PiecePacket), nil
}

func (dc *daemonClient) CheckHealth(ctx context.Context, target dfnet.NetAddr, opts ...grpc.CallOption) (err error) {
Expand All @@ -130,6 +130,9 @@ func (dc *daemonClient) CheckHealth(ctx context.Context, target dfnet.NetAddr, o
}
return client.CheckHealth(ctx, new(empty.Empty), opts...)
}, 0.2, 2.0, 3, nil)

if err != nil {
logger.Infof("CheckHealth: invoke daemon node %s CheckHealth failed: %v", target, err)
return
}
return
}
5 changes: 3 additions & 2 deletions pkg/rpc/dfdaemon/client/down_result_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func (drs *DownResultStream) retryRecv(cause error) (*dfdaemon.DownResult, error

func (drs *DownResultStream) replaceStream(cause error) error {
if drs.StreamTimes >= drs.MaxAttempts {
logger.WithTaskID(drs.hashKey).Info("replace stream reach max attempt")
return cause
}
var target string
Expand All @@ -126,7 +127,7 @@ func (drs *DownResultStream) replaceStream(cause error) error {
return client.Download(drs.ctx, drs.req, drs.opts...)
}, drs.InitBackoff, drs.MaxBackOff, drs.MaxAttempts, cause)
if err != nil {
logger.WithTaskID(drs.hashKey).Infof("replaceStream: invoke daemon node %s Download", target)
logger.WithTaskID(drs.hashKey).Infof("replaceStream: invoke daemon node %s Download failed: %v", target, err)
return drs.replaceClient(cause)
}
drs.stream = stream.(dfdaemon.Daemon_DownloadClient)
Expand All @@ -153,7 +154,7 @@ func (drs *DownResultStream) replaceClient(cause error) error {
return client.Download(drs.ctx, drs.req, drs.opts...)
}, drs.InitBackoff, drs.MaxBackOff, drs.MaxAttempts, cause)
if err != nil {
logger.WithTaskID(drs.hashKey).Infof("replaceClient: invoke daemon node %s Download", target)
logger.WithTaskID(drs.hashKey).Infof("replaceClient: invoke daemon node %s Download failed: %v", target, err)
return drs.replaceClient(cause)
}
drs.stream = stream.(dfdaemon.Daemon_DownloadClient)
Expand Down
2 changes: 1 addition & 1 deletion pkg/rpc/scheduler/client/peer_packet_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (pps *peerPacketStream) replaceClient(cause error) error {
return client.ReportPieceResult(pps.ctx, pps.opts...)
}, pps.retryMeta.InitBackoff, pps.retryMeta.MaxBackOff, pps.retryMeta.MaxAttempts, cause)
if err != nil {
logger.WithTaskID(pps.hashKey).Infof("replaceClient: invoke scheduler node %s ReportPieceResult", target)
logger.WithTaskID(pps.hashKey).Infof("replaceClient: invoke scheduler node %s ReportPieceResult failed: %v", target, err)
return pps.replaceClient(cause)
}
pps.stream = stream.(scheduler.Scheduler_ReportPieceResultClient)
Expand Down

0 comments on commit 4c7471c

Please sign in to comment.