From 5b2333ee278649576b6d91fd46f6fce3f6f2172a Mon Sep 17 00:00:00 2001 From: santong <244372610@qq.com> Date: Tue, 31 Aug 2021 00:57:07 +0800 Subject: [PATCH 1/8] feat: optimize grpc Signed-off-by: santong <244372610@qq.com> --- pkg/rpc/cdnsystem/client/piece_seed_stream.go | 33 +++++++---------- pkg/rpc/client.go | 36 +++++++------------ pkg/rpc/client_util.go | 4 +++ pkg/rpc/dfdaemon/client/down_result_stream.go | 21 +++++------ pkg/rpc/scheduler/client/client.go | 5 +-- .../scheduler/client/peer_packet_stream.go | 22 ++++++------ scheduler/core/service.go | 4 +-- 7 files changed, 57 insertions(+), 68 deletions(-) diff --git a/pkg/rpc/cdnsystem/client/piece_seed_stream.go b/pkg/rpc/cdnsystem/client/piece_seed_stream.go index 21fca81c0ea..9a792889e75 100644 --- a/pkg/rpc/cdnsystem/client/piece_seed_stream.go +++ b/pkg/rpc/cdnsystem/client/piece_seed_stream.go @@ -23,10 +23,7 @@ import ( logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/rpc" "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem" - "github.com/pkg/errors" "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) type PieceSeedStream struct { @@ -65,11 +62,11 @@ func newPieceSeedStream(ctx context.Context, sc *cdnClient, hashKey string, sr * func (pss *PieceSeedStream) initStream() error { stream, err := rpc.ExecuteWithRetry(func() (interface{}, error) { - client, cdnServerNode, err := pss.sc.getCdnClient(pss.hashKey, false) + client, target, err := pss.sc.getCdnClient(pss.hashKey, false) if err != nil { return nil, err } - logger.WithTaskID(pss.hashKey).Infof("invoke cdn node %s ObtainSeeds", cdnServerNode) + logger.WithTaskID(pss.hashKey).Infof("initStream: invoke cdn node %s ObtainSeeds", target) return client.ObtainSeeds(pss.ctx, pss.sr, pss.opts...) }, pss.InitBackoff, pss.MaxBackOff, pss.MaxAttempts, nil) if err == nil { @@ -91,34 +88,30 @@ func (pss *PieceSeedStream) Recv() (ps *cdnsystem.PieceSeed, err error) { } func (pss *PieceSeedStream) retryRecv(cause error) (*cdnsystem.PieceSeed, error) { - if status.Code(cause) == codes.DeadlineExceeded { - return nil, cause - } - - if err := pss.replaceStream(pss.hashKey, cause); err != nil { - if err := pss.replaceClient(pss.hashKey, cause); err != nil { - return nil, cause - } + if err := pss.replaceStream(cause); err != nil { + return nil, err } - return pss.Recv() } -func (pss *PieceSeedStream) replaceStream(key string, cause error) error { +func (pss *PieceSeedStream) replaceStream(cause error) error { if pss.StreamTimes >= pss.MaxAttempts { - return errors.New("times of replacing stream reaches limit") + return cause } stream, err := rpc.ExecuteWithRetry(func() (interface{}, error) { - client, _, err := pss.sc.getCdnClient(key, true) + client, target, err := pss.sc.getCdnClient(pss.hashKey, true) if err != nil { return nil, err } + logger.WithTaskID(pss.hashKey).Infof("replaceStream: invoke cdn node %s ObtainSeeds", target) return client.ObtainSeeds(pss.ctx, pss.sr, pss.opts...) 
}, pss.InitBackoff, pss.MaxBackOff, pss.MaxAttempts, cause) if err == nil { pss.stream = stream.(cdnsystem.Seeder_ObtainSeedsClient) pss.StreamTimes++ + } else { + err = pss.replaceStream(cause) } return err } @@ -131,17 +124,17 @@ func (pss *PieceSeedStream) replaceClient(key string, cause error) error { pss.failedServers = append(pss.failedServers, preNode) stream, err := rpc.ExecuteWithRetry(func() (interface{}, error) { - client, _, err := pss.sc.getCdnClient(key, true) + client, target, err := pss.sc.getCdnClient(key, true) if err != nil { return nil, err } + logger.WithTaskID(pss.hashKey).Infof("replaceClient: invoke cdn node %s ObtainSeeds", target) return client.ObtainSeeds(pss.ctx, pss.sr, pss.opts...) }, pss.InitBackoff, pss.MaxBackOff, pss.MaxAttempts, cause) if err == nil { pss.stream = stream.(cdnsystem.Seeder_ObtainSeedsClient) pss.StreamTimes = 1 - } - if err != nil { + } else { err = pss.replaceClient(key, cause) } return err diff --git a/pkg/rpc/client.go b/pkg/rpc/client.go index 438f1bce09d..51d66e26a86 100644 --- a/pkg/rpc/client.go +++ b/pkg/rpc/client.go @@ -26,6 +26,7 @@ import ( "github.com/serialx/hashring" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" + "k8s.io/apimachinery/pkg/util/sets" "d7y.io/dragonfly/v2/internal/dfcodes" "d7y.io/dragonfly/v2/internal/dferrors" @@ -204,16 +205,10 @@ func (conn *Connection) AddServerNodes(addrs []dfnet.NetAddr) error { } // findCandidateClientConn find candidate node client conn other than exclusiveNodes -func (conn *Connection) findCandidateClientConn(key string, exclusiveNodes ...string) (*candidateClient, error) { +func (conn *Connection) findCandidateClientConn(key string, exclusiveNodes sets.String) (*candidateClient, error) { if node, ok := conn.key2NodeMap.Load(key); ok { candidateNode := node.(string) - selected := true - for _, exclusiveNode := range exclusiveNodes { - if exclusiveNode == candidateNode { - selected = false - } - } - if selected { + if !exclusiveNodes.Has(candidateNode) { if client, ok := conn.node2ClientMap.Load(node); ok { return &candidateClient{ node: candidateNode, @@ -232,23 +227,17 @@ func (conn *Connection) findCandidateClientConn(key string, exclusiveNodes ...st } candidateNodes := make([]string, 0) for _, ringNode := range ringNodes { - candidate := true - for _, exclusiveNode := range exclusiveNodes { - if exclusiveNode == ringNode { - candidate = false - } - } - if candidate { + if !exclusiveNodes.Has(ringNode) { candidateNodes = append(candidateNodes, ringNode) } } logger.With("conn", conn.name).Infof("candidate result for hash key %s: all server node list: %v, exclusiveNodes node list: %v, candidate node list: %v", - key, ringNodes, exclusiveNodes, candidateNodes) + key, ringNodes, exclusiveNodes.List(), candidateNodes) for _, candidateNode := range candidateNodes { // Check whether there is a corresponding mapping client in the node2ClientMap // TODO 下面部分可以直接调用loadOrCreate方法,但是日志没有这么调用打印全 if client, ok := conn.node2ClientMap.Load(candidateNode); ok { - logger.With("conn", conn.name).Infof("hit cache candidateNode %s for hash key %s", candidateNode, key) + logger.With("conn", conn.name).Debugf("hit cache candidateNode %s for hash key %s", candidateNode, key) return &candidateClient{ node: candidateNode, Ref: client, @@ -351,12 +340,12 @@ func (conn *Connection) GetClientConn(hashKey string, stick bool) (*grpc.ClientC } return clientConn, nil } - logger.With("conn", conn.name).Infof("no server node associated with hash key %s was found, start find candidate", hashKey) + 
logger.With("conn", conn.name).Infof("no server node associated with hash key %s was found, start find candidate server", hashKey) conn.rwMutex.RUnlock() // if absence conn.rwMutex.Lock() defer conn.rwMutex.Unlock() - client, err := conn.findCandidateClientConn(hashKey) + client, err := conn.findCandidateClientConn(hashKey, sets.NewString()) if err != nil { return nil, errors.Wrapf(err, "prob candidate client conn for hash key %s", hashKey) } @@ -378,16 +367,17 @@ func (conn *Connection) TryMigrate(key string, cause error, exclusiveNodes []str } currentNode := "" conn.rwMutex.RLock() - if currentNode, ok := conn.key2NodeMap.Load(key); ok { - preNode = currentNode.(string) - exclusiveNodes = append(exclusiveNodes, currentNode.(string)) + if node, ok := conn.key2NodeMap.Load(key); ok { + currentNode = node.(string) + preNode = currentNode + exclusiveNodes = append(exclusiveNodes, preNode) } else { logger.With("conn", conn.name).Warnf("failed to find server node for hash key %s", key) } conn.rwMutex.RUnlock() conn.rwMutex.Lock() defer conn.rwMutex.Unlock() - client, err := conn.findCandidateClientConn(key, exclusiveNodes...) + client, err := conn.findCandidateClientConn(key, sets.NewString(exclusiveNodes...)) if err != nil { return "", errors.Wrapf(err, "find candidate client conn for hash key %s", key) } diff --git a/pkg/rpc/client_util.go b/pkg/rpc/client_util.go index d6ddf92e4b8..2d3a47a1e21 100644 --- a/pkg/rpc/client_util.go +++ b/pkg/rpc/client_util.go @@ -22,6 +22,7 @@ import ( "time" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "d7y.io/dragonfly/v2/internal/dfcodes" @@ -190,6 +191,9 @@ func ExecuteWithRetry(f func() (interface{}, error), initBackoff float64, maxBac return res, cause } } + if status.Code(cause) == codes.DeadlineExceeded || status.Code(cause) == codes.Canceled { + return res, cause + } if i > 0 { time.Sleep(mathutils.RandBackoff(initBackoff, maxBackoff, 2.0, i)) } diff --git a/pkg/rpc/dfdaemon/client/down_result_stream.go b/pkg/rpc/dfdaemon/client/down_result_stream.go index 66d9eafc912..4d1848129bb 100644 --- a/pkg/rpc/dfdaemon/client/down_result_stream.go +++ b/pkg/rpc/dfdaemon/client/down_result_stream.go @@ -23,7 +23,6 @@ import ( logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/rpc" "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon" - "github.com/pkg/errors" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -64,17 +63,17 @@ func newDownResultStream(ctx context.Context, dc *daemonClient, hashKey string, func (drs *DownResultStream) initStream() error { stream, err := rpc.ExecuteWithRetry(func() (interface{}, error) { - client, _, err := drs.dc.getDaemonClient(drs.hashKey, false) + client, target, err := drs.dc.getDaemonClient(drs.hashKey, false) if err != nil { return nil, err } + logger.WithTaskID(drs.hashKey).Infof("initStream: invoke daemon node %s Download", target) return client.Download(drs.ctx, drs.req, drs.opts...) 
}, drs.InitBackoff, drs.MaxBackOff, drs.MaxAttempts, nil) if err == nil { drs.stream = stream.(dfdaemon.Daemon_DownloadClient) drs.StreamTimes = 1 - } - if err != nil { + } else { err = drs.replaceClient(err) } @@ -105,9 +104,7 @@ func (drs *DownResultStream) retryRecv(cause error) (*dfdaemon.DownResult, error } if err := drs.replaceStream(cause); err != nil { - if err := drs.replaceClient(cause); err != nil { - return nil, cause - } + return nil, err } return drs.Recv() @@ -115,19 +112,22 @@ func (drs *DownResultStream) retryRecv(cause error) (*dfdaemon.DownResult, error func (drs *DownResultStream) replaceStream(cause error) error { if drs.StreamTimes >= drs.MaxAttempts { - return errors.New("times of replacing stream reaches limit") + return cause } stream, err := rpc.ExecuteWithRetry(func() (interface{}, error) { - client, _, err := drs.dc.getDaemonClient(drs.hashKey, true) + client, target, err := drs.dc.getDaemonClient(drs.hashKey, true) if err != nil { return nil, err } + logger.WithTaskID(drs.hashKey).Infof("replaceStream: invoke daemon node %s Download", target) return client.Download(drs.ctx, drs.req, drs.opts...) }, drs.InitBackoff, drs.MaxBackOff, drs.MaxAttempts, cause) if err == nil { drs.stream = stream.(dfdaemon.Daemon_DownloadClient) drs.StreamTimes++ + } else { + err = drs.replaceClient(cause) } return err } @@ -140,10 +140,11 @@ func (drs *DownResultStream) replaceClient(cause error) error { drs.failedServers = append(drs.failedServers, preNode) stream, err := rpc.ExecuteWithRetry(func() (interface{}, error) { - client, _, err := drs.dc.getDaemonClient(drs.hashKey, true) + client, target, err := drs.dc.getDaemonClient(drs.hashKey, true) if err != nil { return nil, err } + logger.WithTaskID(drs.hashKey).Infof("replaceClient: invoke daemon node %s Download", target) return client.Download(drs.ctx, drs.req, drs.opts...) 
}, drs.InitBackoff, drs.MaxBackOff, drs.MaxAttempts, cause) if err == nil { diff --git a/pkg/rpc/scheduler/client/client.go b/pkg/rpc/scheduler/client/client.go index 58c792f27de..65bbf0f6e42 100644 --- a/pkg/rpc/scheduler/client/client.go +++ b/pkg/rpc/scheduler/client/client.go @@ -47,10 +47,11 @@ func GetClientByAddr(addrs []dfnet.NetAddr, opts ...grpc.DialOption) (SchedulerC return sc, nil } -// see scheduler.SchedulerClient +// SchedulerClient see scheduler.SchedulerClient type SchedulerClient interface { + // RegisterPeerTask register peer task to scheduler RegisterPeerTask(context.Context, *scheduler.PeerTaskRequest, ...grpc.CallOption) (*scheduler.RegisterResult, error) - // IsMigrating of ptr will be set to true + // ReportPieceResult IsMigrating of ptr will be set to true ReportPieceResult(context.Context, string, *scheduler.PeerTaskRequest, ...grpc.CallOption) (PeerPacketStream, error) ReportPeerResult(context.Context, *scheduler.PeerResult, ...grpc.CallOption) error diff --git a/pkg/rpc/scheduler/client/peer_packet_stream.go b/pkg/rpc/scheduler/client/peer_packet_stream.go index 8c88a2c1734..061ca922f87 100644 --- a/pkg/rpc/scheduler/client/peer_packet_stream.go +++ b/pkg/rpc/scheduler/client/peer_packet_stream.go @@ -20,7 +20,7 @@ import ( "context" "io" - "github.com/pkg/errors" + logger "d7y.io/dragonfly/v2/internal/dflog" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -108,9 +108,7 @@ func (pps *peerPacketStream) retrySend(pr *scheduler.PieceResult, cause error) e } if err := pps.replaceStream(cause); err != nil { - if err := pps.replaceClient(cause); err != nil { - return cause - } + return err } return pps.Send(pr) @@ -151,10 +149,11 @@ func (pps *peerPacketStream) retryRecv(cause error) (*scheduler.PeerPacket, erro func (pps *peerPacketStream) initStream() error { stream, err := rpc.ExecuteWithRetry(func() (interface{}, error) { - client, _, err := pps.sc.getSchedulerClient(pps.hashKey, true) + client, target, err := pps.sc.getSchedulerClient(pps.hashKey, true) if err != nil { return nil, err } + logger.WithTaskID(pps.hashKey).Infof("initStream: invoke scheduler node %s ReportPieceResult", target) return client.ReportPieceResult(pps.ctx, pps.opts...) }, pps.retryMeta.InitBackoff, pps.retryMeta.MaxBackOff, pps.retryMeta.MaxAttempts, nil) if err == nil { @@ -169,18 +168,21 @@ func (pps *peerPacketStream) initStream() error { func (pps *peerPacketStream) replaceStream(cause error) error { if pps.retryMeta.StreamTimes >= pps.retryMeta.MaxAttempts { - return errors.New("times of replacing stream reaches limit") + return cause } res, err := rpc.ExecuteWithRetry(func() (interface{}, error) { - client, _, err := pps.sc.getSchedulerClient(pps.hashKey, true) + client, target, err := pps.sc.getSchedulerClient(pps.hashKey, true) if err != nil { return nil, err } + logger.WithTaskID(pps.hashKey).Infof("replaceStream: invoke scheduler node %s ReportPieceResult", target) return client.ReportPieceResult(pps.ctx, pps.opts...) 
}, pps.retryMeta.InitBackoff, pps.retryMeta.MaxBackOff, pps.retryMeta.MaxAttempts, cause) if err == nil { pps.stream = res.(scheduler.Scheduler_ReportPieceResultClient) pps.retryMeta.StreamTimes++ + } else { + err = pps.replaceStream(cause) } return err } @@ -193,7 +195,7 @@ func (pps *peerPacketStream) replaceClient(cause error) error { pps.failedServers = append(pps.failedServers, preNode) stream, err := rpc.ExecuteWithRetry(func() (interface{}, error) { - client, _, err := pps.sc.getSchedulerClient(pps.hashKey, true) + client, target, err := pps.sc.getSchedulerClient(pps.hashKey, true) if err != nil { return nil, err } @@ -203,13 +205,13 @@ func (pps *peerPacketStream) replaceClient(cause error) error { if err != nil { return nil, err } + logger.WithTaskID(pps.hashKey).Infof("replaceClient: invoke scheduler node %s ReportPieceResult", target) return client.ReportPieceResult(pps.ctx, pps.opts...) }, pps.retryMeta.InitBackoff, pps.retryMeta.MaxBackOff, pps.retryMeta.MaxAttempts, cause) if err == nil { pps.stream = stream.(scheduler.Scheduler_ReportPieceResultClient) pps.retryMeta.StreamTimes = 1 - } - if err != nil { + } else { err = pps.replaceClient(cause) } return err diff --git a/scheduler/core/service.go b/scheduler/core/service.go index 04597461203..5cf659cccf2 100644 --- a/scheduler/core/service.go +++ b/scheduler/core/service.go @@ -179,14 +179,12 @@ func (s *SchedulerService) GetPeerTask(peerTaskID string) (peerTask *supervisor. } func (s *SchedulerService) RegisterPeerTask(req *schedulerRPC.PeerTaskRequest, task *supervisor.Task) (*supervisor.Peer, error) { - // get or create host - reqPeerHost := req.PeerHost var ( peer *supervisor.Peer ok bool peerHost *supervisor.PeerHost ) - + reqPeerHost := req.PeerHost if peerHost, ok = s.hostManager.Get(reqPeerHost.Uuid); !ok { peerHost = supervisor.NewClientPeerHost(reqPeerHost.Uuid, reqPeerHost.Ip, reqPeerHost.HostName, reqPeerHost.RpcPort, reqPeerHost.DownPort, reqPeerHost.SecurityDomain, reqPeerHost.Location, reqPeerHost.Idc, reqPeerHost.NetTopology, s.config.ClientLoad) From 79da20ead4a86069df2756b1309c9ec77332464b Mon Sep 17 00:00:00 2001 From: santong <244372610@qq.com> Date: Tue, 31 Aug 2021 01:01:03 +0800 Subject: [PATCH 2/8] feat: optimize grpc Signed-off-by: santong <244372610@qq.com> --- pkg/rpc/cdnsystem/client/piece_seed_stream.go | 5 +++++ pkg/rpc/dfdaemon/client/down_result_stream.go | 2 +- pkg/rpc/scheduler/client/peer_packet_stream.go | 4 ++-- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/pkg/rpc/cdnsystem/client/piece_seed_stream.go b/pkg/rpc/cdnsystem/client/piece_seed_stream.go index 9a792889e75..84f8cb22b20 100644 --- a/pkg/rpc/cdnsystem/client/piece_seed_stream.go +++ b/pkg/rpc/cdnsystem/client/piece_seed_stream.go @@ -24,6 +24,8 @@ import ( "d7y.io/dragonfly/v2/pkg/rpc" "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) type PieceSeedStream struct { @@ -88,6 +90,9 @@ func (pss *PieceSeedStream) Recv() (ps *cdnsystem.PieceSeed, err error) { } func (pss *PieceSeedStream) retryRecv(cause error) (*cdnsystem.PieceSeed, error) { + if status.Code(cause) == codes.DeadlineExceeded || status.Code(cause) == codes.Canceled { + return nil, cause + } if err := pss.replaceStream(cause); err != nil { return nil, err } diff --git a/pkg/rpc/dfdaemon/client/down_result_stream.go b/pkg/rpc/dfdaemon/client/down_result_stream.go index 4d1848129bb..b400c702133 100644 --- a/pkg/rpc/dfdaemon/client/down_result_stream.go +++ 
b/pkg/rpc/dfdaemon/client/down_result_stream.go @@ -99,7 +99,7 @@ func (drs *DownResultStream) Recv() (dr *dfdaemon.DownResult, err error) { } func (drs *DownResultStream) retryRecv(cause error) (*dfdaemon.DownResult, error) { - if status.Code(cause) == codes.DeadlineExceeded { + if status.Code(cause) == codes.DeadlineExceeded || status.Code(cause) == codes.Canceled { return nil, cause } diff --git a/pkg/rpc/scheduler/client/peer_packet_stream.go b/pkg/rpc/scheduler/client/peer_packet_stream.go index 061ca922f87..b0fd68f0513 100644 --- a/pkg/rpc/scheduler/client/peer_packet_stream.go +++ b/pkg/rpc/scheduler/client/peer_packet_stream.go @@ -103,7 +103,7 @@ func (pps *peerPacketStream) Recv() (pp *scheduler.PeerPacket, err error) { } func (pps *peerPacketStream) retrySend(pr *scheduler.PieceResult, cause error) error { - if status.Code(cause) == codes.DeadlineExceeded { + if status.Code(cause) == codes.DeadlineExceeded || status.Code(cause) == codes.Canceled { return cause } @@ -115,7 +115,7 @@ func (pps *peerPacketStream) retrySend(pr *scheduler.PieceResult, cause error) e } func (pps *peerPacketStream) retryRecv(cause error) (*scheduler.PeerPacket, error) { - if status.Code(cause) == codes.DeadlineExceeded { + if status.Code(cause) == codes.DeadlineExceeded || status.Code(cause) == codes.Canceled { return nil, cause } _, err := rpc.ExecuteWithRetry(func() (interface{}, error) { From 29030ded691d7499f5efee6b80ee8eae65de2837 Mon Sep 17 00:00:00 2001 From: santong <244372610@qq.com> Date: Tue, 31 Aug 2021 16:26:09 +0800 Subject: [PATCH 3/8] feat: optimize grpc Signed-off-by: santong <244372610@qq.com> --- pkg/rpc/client.go | 1 + pkg/rpc/scheduler/client/client.go | 172 ++++++++++-------- .../core/scheduler/basic/basic_scheduler.go | 1 + 3 files changed, 95 insertions(+), 79 deletions(-) diff --git a/pkg/rpc/client.go b/pkg/rpc/client.go index 51d66e26a86..e6019a69e50 100644 --- a/pkg/rpc/client.go +++ b/pkg/rpc/client.go @@ -323,6 +323,7 @@ func (conn *Connection) loadOrCreateClientConnByNode(node string) (clientConn *g // stick whether hash key need already associated with specify node func (conn *Connection) GetClientConn(hashKey string, stick bool) (*grpc.ClientConn, error) { logger.With("conn", conn.name).Debugf("start to get client conn hashKey %s, stick %t", hashKey, stick) + defer logger.With("conn", conn.name).Debugf("get client conn hashKey %s, stick %t end", hashKey, stick) conn.rwMutex.RLock() node, ok := conn.key2NodeMap.Load(hashKey) if stick && !ok { diff --git a/pkg/rpc/scheduler/client/client.go b/pkg/rpc/scheduler/client/client.go index 65bbf0f6e42..4da03ff952e 100644 --- a/pkg/rpc/scheduler/client/client.go +++ b/pkg/rpc/scheduler/client/client.go @@ -20,17 +20,17 @@ import ( "context" "time" - "github.com/pkg/errors" - "google.golang.org/grpc" - "d7y.io/dragonfly/v2/internal/dfcodes" - "d7y.io/dragonfly/v2/internal/dferrors" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/internal/idgen" "d7y.io/dragonfly/v2/pkg/basic/dfnet" "d7y.io/dragonfly/v2/pkg/rpc" "d7y.io/dragonfly/v2/pkg/rpc/base" "d7y.io/dragonfly/v2/pkg/rpc/scheduler" + "github.com/pkg/errors" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) func GetClientByAddr(addrs []dfnet.NetAddr, opts ...grpc.DialOption) (SchedulerClient, error) { @@ -73,16 +73,9 @@ func (sc *schedulerClient) getSchedulerClient(key string, stick bool) (scheduler return scheduler.NewSchedulerClient(clientConn), clientConn.Target(), nil } -func (sc *schedulerClient) 
RegisterPeerTask(ctx context.Context, ptr *scheduler.PeerTaskRequest, opts ...grpc.CallOption) (rr *scheduler.RegisterResult, - err error) { - return sc.doRegisterPeerTask(ctx, ptr, []string{}, opts) -} - -func (sc *schedulerClient) doRegisterPeerTask(ctx context.Context, ptr *scheduler.PeerTaskRequest, exclusiveNodes []string, - opts []grpc.CallOption) (rr *scheduler.RegisterResult, err error) { +func (sc *schedulerClient) RegisterPeerTask(ctx context.Context, ptr *scheduler.PeerTaskRequest, opts ...grpc.CallOption) (*scheduler.RegisterResult, error) { var ( taskID string - code base.Code schedulerNode string res interface{} ) @@ -91,65 +84,67 @@ func (sc *schedulerClient) doRegisterPeerTask(ctx context.Context, ptr *schedule ptr.Url) reg := func() (interface{}, error) { var client scheduler.SchedulerClient + var err error client, schedulerNode, err = sc.getSchedulerClient(key, false) if err != nil { - code = dfcodes.ServerUnavailable return nil, err } return client.RegisterPeerTask(ctx, ptr, opts...) } - res, err = rpc.ExecuteWithRetry(reg, 0.5, 5.0, 5, nil) - if err == nil { - rr = res.(*scheduler.RegisterResult) - taskID = rr.TaskId - code = dfcodes.Success - if taskID != key { - logger.WithTaskAndPeerID(taskID, ptr.PeerId).Warnf("register peer task correct taskId from %s to %s", key, taskID) - sc.Connection.CorrectKey2NodeRelation(key, taskID) - } - logger.With("peerId", ptr.PeerId). - Infof("register peer task result success for taskId: %s, url: %s, scheduler: %s", - taskID, ptr.Url, schedulerNode) - return + res, err := rpc.ExecuteWithRetry(reg, 0.2, 2.0, 3, nil) + if err != nil { + logger.WithTaskAndPeerID(key, ptr.PeerId).Errorf("RegisterPeerTask: register peer task to scheduler %s failed: %v", schedulerNode, err) + return sc.retryRegisterPeerTask(ctx, key, ptr, []string{schedulerNode}, err, opts) } - - if de, ok := err.(*dferrors.DfError); ok { - code = de.Code + rr := res.(*scheduler.RegisterResult) + taskID = rr.TaskId + if taskID != key { + logger.WithTaskAndPeerID(taskID, ptr.PeerId).Warnf("register peer task correct taskId from %s to %s", key, taskID) + sc.Connection.CorrectKey2NodeRelation(key, taskID) } - logger.With("peerId", ptr.PeerId, "errMsg", err). - Errorf("register peer task result failed, code: [%d] for taskId: %s, url: %s, scheduler: %s", - int32(code), taskID, ptr.Url, schedulerNode) + logger.WithTaskAndPeerID(taskID, ptr.PeerId). 
+ Infof("register peer task result success url: %s, scheduler: %s", ptr.Url, schedulerNode) + return rr, err +} - // previous schedule failed, report peer task to free load and other resources - var client scheduler.SchedulerClient - client, schedulerNode, err = sc.getSchedulerClient(key, false) - if err != nil { - logger.With("peerId", ptr.PeerId, "errMsg", err).Errorf("get scheduler client failed") +func (sc *schedulerClient) retryRegisterPeerTask(ctx context.Context, hashKey string, ptr *scheduler.PeerTaskRequest, exclusiveNodes []string, cause error, + opts []grpc.CallOption) (*scheduler.RegisterResult, error) { + if status.Code(cause) == codes.Canceled || status.Code(cause) == codes.DeadlineExceeded { + return nil, cause + } + var ( + taskID string + schedulerNode string + res interface{} + ) + if preNode, err := sc.TryMigrate(hashKey, cause, exclusiveNodes); err != nil { + return nil, cause } else { - _, e := client.ReportPeerResult( - context.Background(), - &scheduler.PeerResult{ - TaskId: taskID, - PeerId: ptr.PeerId, - SrcIp: ptr.PeerHost.Ip, - SecurityDomain: ptr.PeerHost.SecurityDomain, - Idc: ptr.PeerHost.Idc, - Url: ptr.Url, - ContentLength: -1, - Traffic: -1, - Cost: 0, - Success: false, - Code: dfcodes.UnknownError, - }) - logger.With("peerId", ptr.PeerId, "errMsg", e).Warnf("report failed peer result") - } - - var preNode string - if preNode, err = sc.TryMigrate(key, err, exclusiveNodes); err == nil { exclusiveNodes = append(exclusiveNodes, preNode) - return sc.doRegisterPeerTask(ctx, ptr, exclusiveNodes, opts) } - return + res, err := rpc.ExecuteWithRetry(func() (interface{}, error) { + var client scheduler.SchedulerClient + var err error + client, schedulerNode, err = sc.getSchedulerClient(hashKey, true) + if err != nil { + return nil, err + } + return client.RegisterPeerTask(ctx, ptr, opts...) + }, 0.2, 2.0, 3, cause) + if err != nil { + logger.WithTaskAndPeerID(hashKey, ptr.PeerId).Errorf("retryRegisterPeerTask: register peer task to scheduler %s failed: %v", schedulerNode, err) + return sc.retryRegisterPeerTask(ctx, hashKey, ptr, exclusiveNodes, err, opts) + } + rr := res.(*scheduler.RegisterResult) + taskID = rr.TaskId + if taskID != hashKey { + logger.WithTaskAndPeerID(taskID, ptr.PeerId).Warnf("register peer task correct taskId from %s to %s", hashKey, taskID) + sc.Connection.CorrectKey2NodeRelation(hashKey, taskID) + } + logger.WithTaskAndPeerID(taskID, ptr.PeerId). + Infof("register peer task result success url: %s, scheduler: %s", ptr.Url, schedulerNode) + return rr, nil + } func (sc *schedulerClient) ReportPieceResult(ctx context.Context, taskID string, ptr *scheduler.PeerTaskRequest, opts ...grpc.CallOption) (PeerPacketStream, error) { @@ -163,16 +158,42 @@ func (sc *schedulerClient) ReportPieceResult(ctx context.Context, taskID string, } func (sc *schedulerClient) ReportPeerResult(ctx context.Context, pr *scheduler.PeerResult, opts ...grpc.CallOption) error { - return sc.doReportPeerResult(ctx, pr, []string{}, opts) + var ( + schedulerNode string + code base.Code + ) + _, err := rpc.ExecuteWithRetry(func() (interface{}, error) { + var client scheduler.SchedulerClient + var err error + client, schedulerNode, err = sc.getSchedulerClient(pr.TaskId, true) + if err != nil { + code = dfcodes.ServerUnavailable + return nil, err + } + return client.ReportPeerResult(ctx, pr, opts...) 
+ }, 0.2, 2.0, 3, nil) + if err != nil { + logger.WithTaskAndPeerID(pr.TaskId, pr.PeerId).Errorf("ReportPeerResult: report peer result to scheduler %s failed: %v", schedulerNode, err) + return sc.retryReportPeerResult(ctx, pr, []string{schedulerNode}, err, opts) + } + return nil } -func (sc *schedulerClient) doReportPeerResult(ctx context.Context, pr *scheduler.PeerResult, exclusiveNodes []string, opts []grpc.CallOption) (err error) { +func (sc *schedulerClient) retryReportPeerResult(ctx context.Context, pr *scheduler.PeerResult, exclusiveNodes []string, + cause error, opts []grpc.CallOption) (err error) { + if status.Code(cause) == codes.Canceled || status.Code(cause) == codes.DeadlineExceeded { + return cause + } var ( schedulerNode string suc bool code base.Code ) - + if preNode, err := sc.TryMigrate(pr.TaskId, err, exclusiveNodes); err != nil { + return cause + } else { + exclusiveNodes = append(exclusiveNodes, preNode) + } _, err = rpc.ExecuteWithRetry(func() (interface{}, error) { var client scheduler.SchedulerClient client, schedulerNode, err = sc.getSchedulerClient(pr.TaskId, true) @@ -181,24 +202,16 @@ func (sc *schedulerClient) doReportPeerResult(ctx context.Context, pr *scheduler return nil, err } return client.ReportPeerResult(ctx, pr, opts...) - }, 0.5, 5.0, 5, nil) - if err == nil { - suc = true - code = dfcodes.Success + }, 0.2, 2.0, 3, nil) + if err != nil { + logger.WithTaskAndPeerID(pr.TaskId, pr.PeerId).Errorf("retryReportPeerResult: report peer result to scheduler %s failed: %v", schedulerNode, err) + return sc.retryReportPeerResult(ctx, pr, exclusiveNodes, cause, opts) } logger.With("peerId", pr.PeerId, "errMsg", err). Infof("report peer result: %t[%d], peer task down result: %t[%d] for taskId: %s, url: %s, scheduler: %s, length: %d, traffic: %d, cost: %d", suc, int32(code), pr.Success, int32(pr.Code), pr.TaskId, pr.Url, schedulerNode, pr.ContentLength, pr.Traffic, pr.Cost) - if err != nil { - var preNode string - if preNode, err = sc.TryMigrate(pr.TaskId, err, exclusiveNodes); err == nil { - exclusiveNodes = append(exclusiveNodes, preNode) - return sc.doReportPeerResult(ctx, pr, exclusiveNodes, opts) - } - } - return } @@ -208,21 +221,22 @@ func (sc *schedulerClient) LeaveTask(ctx context.Context, pt *scheduler.PeerTarg suc bool ) defer func() { - logger.With("peerId", pt.PeerId, "errMsg", err).Infof("leave from task result: %t for taskId: %s, scheduler: %s", suc, pt.TaskId, schedulerNode) + logger.With("peerId", pt.PeerId, "errMsg", err).Infof("leave from task result: %t for taskId: %s, scheduler server node: %s, err:%v", suc, pt.TaskId, + schedulerNode, err) }() - _, err = rpc.ExecuteWithRetry(func() (interface{}, error) { + leaveFun := func() (interface{}, error) { var client scheduler.SchedulerClient - client, schedulerNode, err = sc.getSchedulerClient(pt.TaskId, true) + client, schedulerNode, err = sc.getSchedulerClient(pt.TaskId, false) if err != nil { return nil, err } return client.LeaveTask(ctx, pt, opts...) 
- }, 0.5, 5.0, 3, nil) + } + _, err = rpc.ExecuteWithRetry(leaveFun, 0.2, 2.0, 3, nil) if err == nil { suc = true } - return } diff --git a/scheduler/core/scheduler/basic/basic_scheduler.go b/scheduler/core/scheduler/basic/basic_scheduler.go index 289649ec5dc..0c1f4589f1f 100644 --- a/scheduler/core/scheduler/basic/basic_scheduler.go +++ b/scheduler/core/scheduler/basic/basic_scheduler.go @@ -146,6 +146,7 @@ func (s *Scheduler) selectCandidateChildren(peer *supervisor.Peer, limit int) (l logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("******candidate child peer is not selected because it is nil") return false } + // TODO IsWaiting if candidateNode.IsDone() { logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("******candidate child peer %s is not selected because it has done", candidateNode.PeerID) From 839579c2af2f980cb9976608aca46bcbf4eafea4 Mon Sep 17 00:00:00 2001 From: santong <244372610@qq.com> Date: Tue, 31 Aug 2021 16:28:55 +0800 Subject: [PATCH 4/8] feat: optimize grpc Signed-off-by: santong <244372610@qq.com> --- pkg/rpc/scheduler/client/client.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/rpc/scheduler/client/client.go b/pkg/rpc/scheduler/client/client.go index 4da03ff952e..1baebd5e2a9 100644 --- a/pkg/rpc/scheduler/client/client.go +++ b/pkg/rpc/scheduler/client/client.go @@ -160,14 +160,12 @@ func (sc *schedulerClient) ReportPieceResult(ctx context.Context, taskID string, func (sc *schedulerClient) ReportPeerResult(ctx context.Context, pr *scheduler.PeerResult, opts ...grpc.CallOption) error { var ( schedulerNode string - code base.Code ) _, err := rpc.ExecuteWithRetry(func() (interface{}, error) { var client scheduler.SchedulerClient var err error client, schedulerNode, err = sc.getSchedulerClient(pr.TaskId, true) if err != nil { - code = dfcodes.ServerUnavailable return nil, err } return client.ReportPeerResult(ctx, pr, opts...) From 8c6ec64ecd2b8d493981ed5a574dfa0d7e24f47d Mon Sep 17 00:00:00 2001 From: santong <244372610@qq.com> Date: Tue, 31 Aug 2021 16:59:42 +0800 Subject: [PATCH 5/8] feat: optimize grpc Signed-off-by: santong <244372610@qq.com> --- pkg/rpc/cdnsystem/client/piece_seed_stream.go | 58 ++++++++++--------- pkg/rpc/cdnsystem/server/server.go | 2 +- pkg/rpc/client.go | 6 ++ pkg/rpc/dfdaemon/client/client.go | 2 +- pkg/rpc/dfdaemon/client/down_result_stream.go | 58 ++++++++++--------- .../scheduler/client/peer_packet_stream.go | 57 +++++++++--------- 6 files changed, 100 insertions(+), 83 deletions(-) diff --git a/pkg/rpc/cdnsystem/client/piece_seed_stream.go b/pkg/rpc/cdnsystem/client/piece_seed_stream.go index 84f8cb22b20..4bcbd0dc897 100644 --- a/pkg/rpc/cdnsystem/client/piece_seed_stream.go +++ b/pkg/rpc/cdnsystem/client/piece_seed_stream.go @@ -63,22 +63,23 @@ func newPieceSeedStream(ctx context.Context, sc *cdnClient, hashKey string, sr * } func (pss *PieceSeedStream) initStream() error { + var target string stream, err := rpc.ExecuteWithRetry(func() (interface{}, error) { - client, target, err := pss.sc.getCdnClient(pss.hashKey, false) + var client cdnsystem.SeederClient + var err error + client, target, err = pss.sc.getCdnClient(pss.hashKey, false) if err != nil { return nil, err } - logger.WithTaskID(pss.hashKey).Infof("initStream: invoke cdn node %s ObtainSeeds", target) return client.ObtainSeeds(pss.ctx, pss.sr, pss.opts...) 
}, pss.InitBackoff, pss.MaxBackOff, pss.MaxAttempts, nil) - if err == nil { - pss.stream = stream.(cdnsystem.Seeder_ObtainSeedsClient) - pss.StreamTimes = 1 - } if err != nil { - err = pss.replaceClient(pss.hashKey, err) + logger.WithTaskID(pss.hashKey).Infof("initStream: invoke cdn node %s ObtainSeeds failed: %v", target, err) + return pss.replaceClient(pss.hashKey, err) } - return err + pss.stream = stream.(cdnsystem.Seeder_ObtainSeedsClient) + pss.StreamTimes = 1 + return nil } func (pss *PieceSeedStream) Recv() (ps *cdnsystem.PieceSeed, err error) { @@ -103,44 +104,47 @@ func (pss *PieceSeedStream) replaceStream(cause error) error { if pss.StreamTimes >= pss.MaxAttempts { return cause } - + var target string stream, err := rpc.ExecuteWithRetry(func() (interface{}, error) { - client, target, err := pss.sc.getCdnClient(pss.hashKey, true) + var client cdnsystem.SeederClient + var err error + client, target, err = pss.sc.getCdnClient(pss.hashKey, true) if err != nil { return nil, err } - logger.WithTaskID(pss.hashKey).Infof("replaceStream: invoke cdn node %s ObtainSeeds", target) return client.ObtainSeeds(pss.ctx, pss.sr, pss.opts...) }, pss.InitBackoff, pss.MaxBackOff, pss.MaxAttempts, cause) - if err == nil { - pss.stream = stream.(cdnsystem.Seeder_ObtainSeedsClient) - pss.StreamTimes++ - } else { - err = pss.replaceStream(cause) + if err != nil { + logger.WithTaskID(pss.hashKey).Infof("replaceStream: invoke cdn node %s ObtainSeeds", target) + return pss.replaceStream(cause) } - return err + pss.stream = stream.(cdnsystem.Seeder_ObtainSeedsClient) + pss.StreamTimes++ + return nil } func (pss *PieceSeedStream) replaceClient(key string, cause error) error { preNode, err := pss.sc.TryMigrate(key, cause, pss.failedServers) if err != nil { - return err + logger.WithTaskID(pss.hashKey).Infof("replaceClient: tryMigrate cdn node failed: %v", err) + return cause } pss.failedServers = append(pss.failedServers, preNode) - + var target string stream, err := rpc.ExecuteWithRetry(func() (interface{}, error) { - client, target, err := pss.sc.getCdnClient(key, true) + var client cdnsystem.SeederClient + var err error + client, target, err = pss.sc.getCdnClient(key, true) if err != nil { return nil, err } - logger.WithTaskID(pss.hashKey).Infof("replaceClient: invoke cdn node %s ObtainSeeds", target) return client.ObtainSeeds(pss.ctx, pss.sr, pss.opts...) 
}, pss.InitBackoff, pss.MaxBackOff, pss.MaxAttempts, cause) - if err == nil { - pss.stream = stream.(cdnsystem.Seeder_ObtainSeedsClient) - pss.StreamTimes = 1 - } else { - err = pss.replaceClient(key, cause) + if err != nil { + logger.WithTaskID(pss.hashKey).Infof("replaceClient: invoke cdn node %s ObtainSeeds failed: %v", target, err) + return pss.replaceClient(key, cause) } - return err + pss.stream = stream.(cdnsystem.Seeder_ObtainSeedsClient) + pss.StreamTimes = 1 + return nil } diff --git a/pkg/rpc/cdnsystem/server/server.go b/pkg/rpc/cdnsystem/server/server.go index dba2f0b7c90..ff01f88f9b4 100644 --- a/pkg/rpc/cdnsystem/server/server.go +++ b/pkg/rpc/cdnsystem/server/server.go @@ -39,7 +39,7 @@ func init() { }) } -// see cdnsystem.SeederServer +// SeederServer see cdnsystem.SeederServer type SeederServer interface { ObtainSeeds(context.Context, *cdnsystem.SeedRequest, chan<- *cdnsystem.PieceSeed) error GetPieceTasks(context.Context, *base.PieceTaskRequest) (*base.PiecePacket, error) diff --git a/pkg/rpc/client.go b/pkg/rpc/client.go index e6019a69e50..729cb502506 100644 --- a/pkg/rpc/client.go +++ b/pkg/rpc/client.go @@ -25,7 +25,9 @@ import ( "github.com/pkg/errors" "github.com/serialx/hashring" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/keepalive" + "google.golang.org/grpc/status" "k8s.io/apimachinery/pkg/util/sets" "d7y.io/dragonfly/v2/internal/dfcodes" @@ -360,6 +362,10 @@ func (conn *Connection) GetClientConn(hashKey string, stick bool) (*grpc.ClientC // preNode node before the migration func (conn *Connection) TryMigrate(key string, cause error, exclusiveNodes []string) (preNode string, err error) { logger.With("conn", conn.name).Infof("start try migrate server node for key %s, cause err: %v", key, cause) + if status.Code(cause) == codes.DeadlineExceeded || status.Code(cause) == codes.Canceled { + logger.With("conn", conn.name).Infof("migrate server node for key %s failed, cause err: %v", key, cause) + return "", cause + } // TODO recover findCandidateClientConn error if e, ok := cause.(*dferrors.DfError); ok { if e.Code != dfcodes.ResourceLacked && e.Code != dfcodes.UnknownError { diff --git a/pkg/rpc/dfdaemon/client/client.go b/pkg/rpc/dfdaemon/client/client.go index a23f2a0aa65..9548a3c13c4 100644 --- a/pkg/rpc/dfdaemon/client/client.go +++ b/pkg/rpc/dfdaemon/client/client.go @@ -67,7 +67,7 @@ func GetElasticClientByAddrs(addrs []dfnet.NetAddr, opts ...grpc.DialOption) (Da return elasticDaemonClient, nil } -// see dfdaemon.DaemonClient +// DaemonClient see dfdaemon.DaemonClient type DaemonClient interface { Download(ctx context.Context, req *dfdaemon.DownRequest, opts ...grpc.CallOption) (*DownResultStream, error) diff --git a/pkg/rpc/dfdaemon/client/down_result_stream.go b/pkg/rpc/dfdaemon/client/down_result_stream.go index b400c702133..41a6328b5a0 100644 --- a/pkg/rpc/dfdaemon/client/down_result_stream.go +++ b/pkg/rpc/dfdaemon/client/down_result_stream.go @@ -62,22 +62,23 @@ func newDownResultStream(ctx context.Context, dc *daemonClient, hashKey string, } func (drs *DownResultStream) initStream() error { + var target string stream, err := rpc.ExecuteWithRetry(func() (interface{}, error) { - client, target, err := drs.dc.getDaemonClient(drs.hashKey, false) + var client dfdaemon.DaemonClient + var err error + client, target, err = drs.dc.getDaemonClient(drs.hashKey, false) if err != nil { return nil, err } - logger.WithTaskID(drs.hashKey).Infof("initStream: invoke daemon node %s Download", target) return client.Download(drs.ctx, 
drs.req, drs.opts...) }, drs.InitBackoff, drs.MaxBackOff, drs.MaxAttempts, nil) - if err == nil { - drs.stream = stream.(dfdaemon.Daemon_DownloadClient) - drs.StreamTimes = 1 - } else { - err = drs.replaceClient(err) + if err != nil { + logger.WithTaskID(drs.hashKey).Infof("initStream: invoke daemon node %s Download failed: %v", target, err) + return drs.replaceClient(err) } - - return err + drs.stream = stream.(dfdaemon.Daemon_DownloadClient) + drs.StreamTimes = 1 + return nil } func (drs *DownResultStream) Recv() (dr *dfdaemon.DownResult, err error) { @@ -114,45 +115,48 @@ func (drs *DownResultStream) replaceStream(cause error) error { if drs.StreamTimes >= drs.MaxAttempts { return cause } - + var target string stream, err := rpc.ExecuteWithRetry(func() (interface{}, error) { - client, target, err := drs.dc.getDaemonClient(drs.hashKey, true) + var client dfdaemon.DaemonClient + var err error + client, target, err = drs.dc.getDaemonClient(drs.hashKey, true) if err != nil { return nil, err } - logger.WithTaskID(drs.hashKey).Infof("replaceStream: invoke daemon node %s Download", target) return client.Download(drs.ctx, drs.req, drs.opts...) }, drs.InitBackoff, drs.MaxBackOff, drs.MaxAttempts, cause) - if err == nil { - drs.stream = stream.(dfdaemon.Daemon_DownloadClient) - drs.StreamTimes++ - } else { - err = drs.replaceClient(cause) + if err != nil { + logger.WithTaskID(drs.hashKey).Infof("replaceStream: invoke daemon node %s Download", target) + return drs.replaceClient(cause) } - return err + drs.stream = stream.(dfdaemon.Daemon_DownloadClient) + drs.StreamTimes++ + return nil } func (drs *DownResultStream) replaceClient(cause error) error { preNode, err := drs.dc.TryMigrate(drs.hashKey, cause, drs.failedServers) if err != nil { - return err + logger.WithTaskID(drs.hashKey).Infof("replaceClient: tryMigrate daemon node failed: %v", err) + return cause } drs.failedServers = append(drs.failedServers, preNode) + var target string stream, err := rpc.ExecuteWithRetry(func() (interface{}, error) { - client, target, err := drs.dc.getDaemonClient(drs.hashKey, true) + var client dfdaemon.DaemonClient + var err error + client, target, err = drs.dc.getDaemonClient(drs.hashKey, true) if err != nil { return nil, err } - logger.WithTaskID(drs.hashKey).Infof("replaceClient: invoke daemon node %s Download", target) return client.Download(drs.ctx, drs.req, drs.opts...) 
}, drs.InitBackoff, drs.MaxBackOff, drs.MaxAttempts, cause) - if err == nil { - drs.stream = stream.(dfdaemon.Daemon_DownloadClient) - drs.StreamTimes = 1 - } if err != nil { - err = drs.replaceClient(cause) + logger.WithTaskID(drs.hashKey).Infof("replaceClient: invoke daemon node %s Download", target) + return drs.replaceClient(cause) } - return err + drs.stream = stream.(dfdaemon.Daemon_DownloadClient) + drs.StreamTimes = 1 + return nil } diff --git a/pkg/rpc/scheduler/client/peer_packet_stream.go b/pkg/rpc/scheduler/client/peer_packet_stream.go index b0fd68f0513..281fe903448 100644 --- a/pkg/rpc/scheduler/client/peer_packet_stream.go +++ b/pkg/rpc/scheduler/client/peer_packet_stream.go @@ -148,71 +148,74 @@ func (pps *peerPacketStream) retryRecv(cause error) (*scheduler.PeerPacket, erro } func (pps *peerPacketStream) initStream() error { + var target string stream, err := rpc.ExecuteWithRetry(func() (interface{}, error) { - client, target, err := pps.sc.getSchedulerClient(pps.hashKey, true) + var client scheduler.SchedulerClient + var err error + client, target, err = pps.sc.getSchedulerClient(pps.hashKey, true) if err != nil { return nil, err } - logger.WithTaskID(pps.hashKey).Infof("initStream: invoke scheduler node %s ReportPieceResult", target) return client.ReportPieceResult(pps.ctx, pps.opts...) }, pps.retryMeta.InitBackoff, pps.retryMeta.MaxBackOff, pps.retryMeta.MaxAttempts, nil) - if err == nil { - pps.stream = stream.(scheduler.Scheduler_ReportPieceResultClient) - pps.retryMeta.StreamTimes = 1 - } if err != nil { - err = pps.replaceClient(err) + logger.WithTaskID(pps.hashKey).Infof("initStream: invoke scheduler node %s ReportPieceResult failed: %v", target, err) + return pps.replaceClient(err) } - return err + pps.stream = stream.(scheduler.Scheduler_ReportPieceResultClient) + pps.retryMeta.StreamTimes = 1 + return nil } func (pps *peerPacketStream) replaceStream(cause error) error { if pps.retryMeta.StreamTimes >= pps.retryMeta.MaxAttempts { return cause } + var target string res, err := rpc.ExecuteWithRetry(func() (interface{}, error) { - client, target, err := pps.sc.getSchedulerClient(pps.hashKey, true) + var client scheduler.SchedulerClient + var err error + client, target, err = pps.sc.getSchedulerClient(pps.hashKey, true) if err != nil { return nil, err } - logger.WithTaskID(pps.hashKey).Infof("replaceStream: invoke scheduler node %s ReportPieceResult", target) return client.ReportPieceResult(pps.ctx, pps.opts...) 
}, pps.retryMeta.InitBackoff, pps.retryMeta.MaxBackOff, pps.retryMeta.MaxAttempts, cause) - if err == nil { - pps.stream = res.(scheduler.Scheduler_ReportPieceResultClient) - pps.retryMeta.StreamTimes++ - } else { + if err != nil { + logger.WithTaskID(pps.hashKey).Infof("replaceStream: invoke scheduler node %s ReportPieceResult failed: %v", target, err) err = pps.replaceStream(cause) } - return err + pps.stream = res.(scheduler.Scheduler_ReportPieceResultClient) + pps.retryMeta.StreamTimes++ + return nil } func (pps *peerPacketStream) replaceClient(cause error) error { preNode, err := pps.sc.TryMigrate(pps.hashKey, cause, pps.failedServers) if err != nil { - return err + logger.WithTaskID(pps.hashKey).Infof("replaceClient: tryMigrate scheduler node failed: %v", err) + return cause } pps.failedServers = append(pps.failedServers, preNode) - + var target string stream, err := rpc.ExecuteWithRetry(func() (interface{}, error) { - client, target, err := pps.sc.getSchedulerClient(pps.hashKey, true) + var client scheduler.SchedulerClient + var err error + client, target, err = pps.sc.getSchedulerClient(pps.hashKey, true) if err != nil { return nil, err } - //timeCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - //defer cancel() _, err = client.RegisterPeerTask(pps.ctx, pps.ptr) if err != nil { return nil, err } - logger.WithTaskID(pps.hashKey).Infof("replaceClient: invoke scheduler node %s ReportPieceResult", target) return client.ReportPieceResult(pps.ctx, pps.opts...) }, pps.retryMeta.InitBackoff, pps.retryMeta.MaxBackOff, pps.retryMeta.MaxAttempts, cause) - if err == nil { - pps.stream = stream.(scheduler.Scheduler_ReportPieceResultClient) - pps.retryMeta.StreamTimes = 1 - } else { - err = pps.replaceClient(cause) + if err != nil { + logger.WithTaskID(pps.hashKey).Infof("replaceClient: invoke scheduler node %s ReportPieceResult", target) + return pps.replaceClient(cause) } - return err + pps.stream = stream.(scheduler.Scheduler_ReportPieceResultClient) + pps.retryMeta.StreamTimes = 1 + return nil } From af2fe79872bae7fbffd379c73815233ef8860bc4 Mon Sep 17 00:00:00 2001 From: santong <244372610@qq.com> Date: Tue, 31 Aug 2021 17:06:01 +0800 Subject: [PATCH 6/8] feat: optimize grpc Signed-off-by: santong <244372610@qq.com> --- pkg/rpc/scheduler/client/client.go | 13 ++++++------- pkg/rpc/scheduler/client/peer_packet_stream.go | 2 +- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/pkg/rpc/scheduler/client/client.go b/pkg/rpc/scheduler/client/client.go index 1baebd5e2a9..f600b55b270 100644 --- a/pkg/rpc/scheduler/client/client.go +++ b/pkg/rpc/scheduler/client/client.go @@ -115,13 +115,12 @@ func (sc *schedulerClient) retryRegisterPeerTask(ctx context.Context, hashKey st var ( taskID string schedulerNode string - res interface{} ) - if preNode, err := sc.TryMigrate(hashKey, cause, exclusiveNodes); err != nil { + preNode, err := sc.TryMigrate(hashKey, cause, exclusiveNodes) + if err != nil { return nil, cause - } else { - exclusiveNodes = append(exclusiveNodes, preNode) } + exclusiveNodes = append(exclusiveNodes, preNode) res, err := rpc.ExecuteWithRetry(func() (interface{}, error) { var client scheduler.SchedulerClient var err error @@ -187,11 +186,11 @@ func (sc *schedulerClient) retryReportPeerResult(ctx context.Context, pr *schedu suc bool code base.Code ) - if preNode, err := sc.TryMigrate(pr.TaskId, err, exclusiveNodes); err != nil { + preNode, err := sc.TryMigrate(pr.TaskId, err, exclusiveNodes) + if err != nil { return cause - } else { - exclusiveNodes = 
append(exclusiveNodes, preNode) } + exclusiveNodes = append(exclusiveNodes, preNode) _, err = rpc.ExecuteWithRetry(func() (interface{}, error) { var client scheduler.SchedulerClient client, schedulerNode, err = sc.getSchedulerClient(pr.TaskId, true) diff --git a/pkg/rpc/scheduler/client/peer_packet_stream.go b/pkg/rpc/scheduler/client/peer_packet_stream.go index 281fe903448..711c420dec3 100644 --- a/pkg/rpc/scheduler/client/peer_packet_stream.go +++ b/pkg/rpc/scheduler/client/peer_packet_stream.go @@ -183,7 +183,7 @@ func (pps *peerPacketStream) replaceStream(cause error) error { }, pps.retryMeta.InitBackoff, pps.retryMeta.MaxBackOff, pps.retryMeta.MaxAttempts, cause) if err != nil { logger.WithTaskID(pps.hashKey).Infof("replaceStream: invoke scheduler node %s ReportPieceResult failed: %v", target, err) - err = pps.replaceStream(cause) + return pps.replaceStream(cause) } pps.stream = res.(scheduler.Scheduler_ReportPieceResultClient) pps.retryMeta.StreamTimes++ From 4c7471ccf314c1b02fad328ff1f306ec7b168022 Mon Sep 17 00:00:00 2001 From: santong <244372610@qq.com> Date: Tue, 31 Aug 2021 17:38:33 +0800 Subject: [PATCH 7/8] feat: framework log Signed-off-by: santong <244372610@qq.com> --- pkg/rpc/cdnsystem/client/client.go | 10 ++++------ pkg/rpc/cdnsystem/client/piece_seed_stream.go | 3 ++- pkg/rpc/dfdaemon/client/client.go | 15 +++++++++------ pkg/rpc/dfdaemon/client/down_result_stream.go | 5 +++-- pkg/rpc/scheduler/client/peer_packet_stream.go | 2 +- 5 files changed, 19 insertions(+), 16 deletions(-) diff --git a/pkg/rpc/cdnsystem/client/client.go b/pkg/rpc/cdnsystem/client/client.go index 8e9bb6befad..b26f1a598e1 100644 --- a/pkg/rpc/cdnsystem/client/client.go +++ b/pkg/rpc/cdnsystem/client/client.go @@ -101,17 +101,15 @@ func (cc *cdnClient) ObtainSeeds(ctx context.Context, sr *cdnsystem.SeedRequest, func (cc *cdnClient) GetPieceTasks(ctx context.Context, addr dfnet.NetAddr, req *base.PieceTaskRequest, opts ...grpc.CallOption) (*base.PiecePacket, error) { res, err := rpc.ExecuteWithRetry(func() (interface{}, error) { - defer func() { - logger.WithTaskID(req.TaskId).Infof("invoke cdn node %s GetPieceTasks", addr.GetEndpoint()) - }() client, err := cc.getSeederClientWithTarget(addr.GetEndpoint()) if err != nil { return nil, err } return client.GetPieceTasks(ctx, req, opts...) }, 0.2, 2.0, 3, nil) - if err == nil { - return res.(*base.PiecePacket), nil + if err != nil { + logger.WithTaskID(req.TaskId).Infof("GetPieceTasks: invoke cdn node %s GetPieceTasks failed: %v", addr.GetEndpoint(), err) + return nil, err } - return nil, err + return res.(*base.PiecePacket), nil } diff --git a/pkg/rpc/cdnsystem/client/piece_seed_stream.go b/pkg/rpc/cdnsystem/client/piece_seed_stream.go index 4bcbd0dc897..781426fa1e6 100644 --- a/pkg/rpc/cdnsystem/client/piece_seed_stream.go +++ b/pkg/rpc/cdnsystem/client/piece_seed_stream.go @@ -102,6 +102,7 @@ func (pss *PieceSeedStream) retryRecv(cause error) (*cdnsystem.PieceSeed, error) func (pss *PieceSeedStream) replaceStream(cause error) error { if pss.StreamTimes >= pss.MaxAttempts { + logger.WithTaskID(pss.hashKey).Info("replace stream reach max attempt") return cause } var target string @@ -115,7 +116,7 @@ func (pss *PieceSeedStream) replaceStream(cause error) error { return client.ObtainSeeds(pss.ctx, pss.sr, pss.opts...) 
}, pss.InitBackoff, pss.MaxBackOff, pss.MaxAttempts, cause) if err != nil { - logger.WithTaskID(pss.hashKey).Infof("replaceStream: invoke cdn node %s ObtainSeeds", target) + logger.WithTaskID(pss.hashKey).Infof("replaceStream: invoke cdn node %s ObtainSeeds failed: %v", target, err) return pss.replaceStream(cause) } pss.stream = stream.(cdnsystem.Seeder_ObtainSeedsClient) diff --git a/pkg/rpc/dfdaemon/client/client.go b/pkg/rpc/dfdaemon/client/client.go index 9548a3c13c4..bfd842b3b07 100644 --- a/pkg/rpc/dfdaemon/client/client.go +++ b/pkg/rpc/dfdaemon/client/client.go @@ -22,6 +22,7 @@ import ( "sync" "time" + logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/internal/idgen" "d7y.io/dragonfly/v2/pkg/basic/dfnet" "d7y.io/dragonfly/v2/pkg/rpc" @@ -114,12 +115,11 @@ func (dc *daemonClient) GetPieceTasks(ctx context.Context, target dfnet.NetAddr, } return client.GetPieceTasks(ctx, ptr, opts...) }, 0.2, 2.0, 3, nil) - - if err == nil { - return res.(*base.PiecePacket), nil + if err != nil { + logger.WithTaskID(ptr.TaskId).Infof("GetPieceTasks: invoke daemon node %s GetPieceTasks failed: %v", target, err) + return nil, err } - - return nil, err + return res.(*base.PiecePacket), nil } func (dc *daemonClient) CheckHealth(ctx context.Context, target dfnet.NetAddr, opts ...grpc.CallOption) (err error) { @@ -130,6 +130,9 @@ func (dc *daemonClient) CheckHealth(ctx context.Context, target dfnet.NetAddr, o } return client.CheckHealth(ctx, new(empty.Empty), opts...) }, 0.2, 2.0, 3, nil) - + if err != nil { + logger.Infof("CheckHealth: invoke daemon node %s CheckHealth failed: %v", target, err) + return + } return } diff --git a/pkg/rpc/dfdaemon/client/down_result_stream.go b/pkg/rpc/dfdaemon/client/down_result_stream.go index 41a6328b5a0..91a12fd8f86 100644 --- a/pkg/rpc/dfdaemon/client/down_result_stream.go +++ b/pkg/rpc/dfdaemon/client/down_result_stream.go @@ -113,6 +113,7 @@ func (drs *DownResultStream) retryRecv(cause error) (*dfdaemon.DownResult, error func (drs *DownResultStream) replaceStream(cause error) error { if drs.StreamTimes >= drs.MaxAttempts { + logger.WithTaskID(drs.hashKey).Info("replace stream reach max attempt") return cause } var target string @@ -126,7 +127,7 @@ func (drs *DownResultStream) replaceStream(cause error) error { return client.Download(drs.ctx, drs.req, drs.opts...) }, drs.InitBackoff, drs.MaxBackOff, drs.MaxAttempts, cause) if err != nil { - logger.WithTaskID(drs.hashKey).Infof("replaceStream: invoke daemon node %s Download", target) + logger.WithTaskID(drs.hashKey).Infof("replaceStream: invoke daemon node %s Download failed: %v", target, err) return drs.replaceClient(cause) } drs.stream = stream.(dfdaemon.Daemon_DownloadClient) @@ -153,7 +154,7 @@ func (drs *DownResultStream) replaceClient(cause error) error { return client.Download(drs.ctx, drs.req, drs.opts...) 
}, drs.InitBackoff, drs.MaxBackOff, drs.MaxAttempts, cause) if err != nil { - logger.WithTaskID(drs.hashKey).Infof("replaceClient: invoke daemon node %s Download", target) + logger.WithTaskID(drs.hashKey).Infof("replaceClient: invoke daemon node %s Download failed: %v", target, err) return drs.replaceClient(cause) } drs.stream = stream.(dfdaemon.Daemon_DownloadClient) diff --git a/pkg/rpc/scheduler/client/peer_packet_stream.go b/pkg/rpc/scheduler/client/peer_packet_stream.go index 711c420dec3..dc15efdfae9 100644 --- a/pkg/rpc/scheduler/client/peer_packet_stream.go +++ b/pkg/rpc/scheduler/client/peer_packet_stream.go @@ -212,7 +212,7 @@ func (pps *peerPacketStream) replaceClient(cause error) error { return client.ReportPieceResult(pps.ctx, pps.opts...) }, pps.retryMeta.InitBackoff, pps.retryMeta.MaxBackOff, pps.retryMeta.MaxAttempts, cause) if err != nil { - logger.WithTaskID(pps.hashKey).Infof("replaceClient: invoke scheduler node %s ReportPieceResult", target) + logger.WithTaskID(pps.hashKey).Infof("replaceClient: invoke scheduler node %s ReportPieceResult failed: %v", target, err) return pps.replaceClient(cause) } pps.stream = stream.(scheduler.Scheduler_ReportPieceResultClient) From 81e18377411e6a4a00d38f85145907d4969e4ca3 Mon Sep 17 00:00:00 2001 From: santong <244372610@qq.com> Date: Tue, 31 Aug 2021 17:40:13 +0800 Subject: [PATCH 8/8] feat: framework log Signed-off-by: santong <244372610@qq.com> --- pkg/rpc/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/rpc/client.go b/pkg/rpc/client.go index 729cb502506..bdf2bbbd318 100644 --- a/pkg/rpc/client.go +++ b/pkg/rpc/client.go @@ -325,7 +325,7 @@ func (conn *Connection) loadOrCreateClientConnByNode(node string) (clientConn *g // stick whether hash key need already associated with specify node func (conn *Connection) GetClientConn(hashKey string, stick bool) (*grpc.ClientConn, error) { logger.With("conn", conn.name).Debugf("start to get client conn hashKey %s, stick %t", hashKey, stick) - defer logger.With("conn", conn.name).Debugf("get client conn hashKey %s, stick %t end", hashKey, stick) + defer logger.With("conn", conn.name).Debugf("get client conn done, hashKey %s, stick %t end", hashKey, stick) conn.rwMutex.RLock() node, ok := conn.key2NodeMap.Load(hashKey) if stick && !ok {
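The common thread across these patches is a single retry idiom: wrap the RPC call in rpc.ExecuteWithRetry with a small randomized exponential backoff, stop retrying as soon as the error is codes.Canceled or codes.DeadlineExceeded (the caller's context is already dead), and only then fall back to TryMigrate, which picks another server node while excluding the ones that already failed. The sketch below is a minimal, self-contained approximation of that backoff loop; the name retryWithBackoff and the jitter formula are illustrative assumptions, not the project's actual rpc.ExecuteWithRetry / mathutils.RandBackoff implementations.

package main

import (
	"errors"
	"fmt"
	"math/rand"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// retryWithBackoff mirrors the shape of rpc.ExecuteWithRetry: run f up to
// maxAttempts times, sleep a randomized, exponentially growing interval
// (in seconds) between attempts, and give up immediately once the error is
// Canceled or DeadlineExceeded, since retrying a dead context is pointless.
func retryWithBackoff(f func() (interface{}, error), initBackoff, maxBackoff float64, maxAttempts int) (interface{}, error) {
	var lastErr error
	for i := 0; i < maxAttempts; i++ {
		if i > 0 {
			// exponential growth capped at maxBackoff
			backoff := initBackoff * float64(int(1)<<uint(i-1))
			if backoff > maxBackoff {
				backoff = maxBackoff
			}
			// add jitter so many clients do not retry in lockstep
			time.Sleep(time.Duration((0.5 + 0.5*rand.Float64()) * backoff * float64(time.Second)))
		}
		res, err := f()
		if err == nil {
			return res, nil
		}
		// do not keep retrying once the caller's context is already dead
		if code := status.Code(err); code == codes.Canceled || code == codes.DeadlineExceeded {
			return nil, err
		}
		lastErr = err
	}
	return nil, lastErr
}

func main() {
	attempts := 0
	res, err := retryWithBackoff(func() (interface{}, error) {
		attempts++
		if attempts < 3 {
			return nil, errors.New("transient failure")
		}
		return "ok", nil
	}, 0.2, 2.0, 3)
	fmt.Println(attempts, res, err) // succeeds on the third attempt
}

Only after this bounded loop fails do the stream wrappers (PieceSeedStream, DownResultStream, peerPacketStream) escalate from replaceStream to replaceClient, recording the failed server and asking TryMigrate for a different node.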