diff --git a/pkg/rpc/cdnsystem/client/client.go b/pkg/rpc/cdnsystem/client/client.go
index 8e9bb6befad..b26f1a598e1 100644
--- a/pkg/rpc/cdnsystem/client/client.go
+++ b/pkg/rpc/cdnsystem/client/client.go
@@ -101,17 +101,15 @@ func (cc *cdnClient) ObtainSeeds(ctx context.Context, sr *cdnsystem.SeedRequest,
 func (cc *cdnClient) GetPieceTasks(ctx context.Context, addr dfnet.NetAddr, req *base.PieceTaskRequest, opts ...grpc.CallOption) (*base.PiecePacket, error) {
 	res, err := rpc.ExecuteWithRetry(func() (interface{}, error) {
-		defer func() {
-			logger.WithTaskID(req.TaskId).Infof("invoke cdn node %s GetPieceTasks", addr.GetEndpoint())
-		}()
 		client, err := cc.getSeederClientWithTarget(addr.GetEndpoint())
 		if err != nil {
 			return nil, err
 		}
 		return client.GetPieceTasks(ctx, req, opts...)
 	}, 0.2, 2.0, 3, nil)
-	if err == nil {
-		return res.(*base.PiecePacket), nil
+	if err != nil {
+		logger.WithTaskID(req.TaskId).Infof("GetPieceTasks: invoke cdn node %s GetPieceTasks failed: %v", addr.GetEndpoint(), err)
+		return nil, err
 	}
-	return nil, err
+	return res.(*base.PiecePacket), nil
 }
diff --git a/pkg/rpc/cdnsystem/client/piece_seed_stream.go b/pkg/rpc/cdnsystem/client/piece_seed_stream.go
index 21fca81c0ea..781426fa1e6 100644
--- a/pkg/rpc/cdnsystem/client/piece_seed_stream.go
+++ b/pkg/rpc/cdnsystem/client/piece_seed_stream.go
@@ -23,7 +23,6 @@ import (
 	logger "d7y.io/dragonfly/v2/internal/dflog"
 	"d7y.io/dragonfly/v2/pkg/rpc"
 	"d7y.io/dragonfly/v2/pkg/rpc/cdnsystem"
-	"github.com/pkg/errors"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
@@ -64,22 +63,23 @@ func newPieceSeedStream(ctx context.Context, sc *cdnClient, hashKey string, sr *
 }
 
 func (pss *PieceSeedStream) initStream() error {
+	var target string
 	stream, err := rpc.ExecuteWithRetry(func() (interface{}, error) {
-		client, cdnServerNode, err := pss.sc.getCdnClient(pss.hashKey, false)
+		var client cdnsystem.SeederClient
+		var err error
+		client, target, err = pss.sc.getCdnClient(pss.hashKey, false)
 		if err != nil {
 			return nil, err
 		}
-		logger.WithTaskID(pss.hashKey).Infof("invoke cdn node %s ObtainSeeds", cdnServerNode)
 		return client.ObtainSeeds(pss.ctx, pss.sr, pss.opts...)
 	}, pss.InitBackoff, pss.MaxBackOff, pss.MaxAttempts, nil)
-	if err == nil {
-		pss.stream = stream.(cdnsystem.Seeder_ObtainSeedsClient)
-		pss.StreamTimes = 1
-	}
 	if err != nil {
-		err = pss.replaceClient(pss.hashKey, err)
+		logger.WithTaskID(pss.hashKey).Infof("initStream: invoke cdn node %s ObtainSeeds failed: %v", target, err)
+		return pss.replaceClient(pss.hashKey, err)
 	}
-	return err
+	pss.stream = stream.(cdnsystem.Seeder_ObtainSeedsClient)
+	pss.StreamTimes = 1
+	return nil
 }
 
 func (pss *PieceSeedStream) Recv() (ps *cdnsystem.PieceSeed, err error) {
@@ -91,58 +91,61 @@ func (pss *PieceSeedStream) Recv() (ps *cdnsystem.PieceSeed, err error) {
 }
 
 func (pss *PieceSeedStream) retryRecv(cause error) (*cdnsystem.PieceSeed, error) {
-	if status.Code(cause) == codes.DeadlineExceeded {
+	if status.Code(cause) == codes.DeadlineExceeded || status.Code(cause) == codes.Canceled {
 		return nil, cause
 	}
-
-	if err := pss.replaceStream(pss.hashKey, cause); err != nil {
-		if err := pss.replaceClient(pss.hashKey, cause); err != nil {
-			return nil, cause
-		}
+	if err := pss.replaceStream(cause); err != nil {
+		return nil, err
 	}
-
 	return pss.Recv()
 }
 
-func (pss *PieceSeedStream) replaceStream(key string, cause error) error {
+func (pss *PieceSeedStream) replaceStream(cause error) error {
 	if pss.StreamTimes >= pss.MaxAttempts {
-		return errors.New("times of replacing stream reaches limit")
+		logger.WithTaskID(pss.hashKey).Info("replace stream reach max attempt")
+		return cause
 	}
-
+	var target string
 	stream, err := rpc.ExecuteWithRetry(func() (interface{}, error) {
-		client, _, err := pss.sc.getCdnClient(key, true)
+		var client cdnsystem.SeederClient
+		var err error
+		client, target, err = pss.sc.getCdnClient(pss.hashKey, true)
 		if err != nil {
 			return nil, err
 		}
 		return client.ObtainSeeds(pss.ctx, pss.sr, pss.opts...)
 	}, pss.InitBackoff, pss.MaxBackOff, pss.MaxAttempts, cause)
-	if err == nil {
-		pss.stream = stream.(cdnsystem.Seeder_ObtainSeedsClient)
-		pss.StreamTimes++
+	if err != nil {
+		logger.WithTaskID(pss.hashKey).Infof("replaceStream: invoke cdn node %s ObtainSeeds failed: %v", target, err)
+		return pss.replaceStream(cause)
 	}
-	return err
+	pss.stream = stream.(cdnsystem.Seeder_ObtainSeedsClient)
+	pss.StreamTimes++
+	return nil
 }
 
 func (pss *PieceSeedStream) replaceClient(key string, cause error) error {
 	preNode, err := pss.sc.TryMigrate(key, cause, pss.failedServers)
 	if err != nil {
-		return err
+		logger.WithTaskID(pss.hashKey).Infof("replaceClient: tryMigrate cdn node failed: %v", err)
+		return cause
 	}
 	pss.failedServers = append(pss.failedServers, preNode)
-
+	var target string
 	stream, err := rpc.ExecuteWithRetry(func() (interface{}, error) {
-		client, _, err := pss.sc.getCdnClient(key, true)
+		var client cdnsystem.SeederClient
+		var err error
+		client, target, err = pss.sc.getCdnClient(key, true)
 		if err != nil {
 			return nil, err
 		}
 		return client.ObtainSeeds(pss.ctx, pss.sr, pss.opts...)
 	}, pss.InitBackoff, pss.MaxBackOff, pss.MaxAttempts, cause)
-	if err == nil {
-		pss.stream = stream.(cdnsystem.Seeder_ObtainSeedsClient)
-		pss.StreamTimes = 1
-	}
 	if err != nil {
-		err = pss.replaceClient(key, cause)
+		logger.WithTaskID(pss.hashKey).Infof("replaceClient: invoke cdn node %s ObtainSeeds failed: %v", target, err)
+		return pss.replaceClient(key, cause)
 	}
-	return err
+	pss.stream = stream.(cdnsystem.Seeder_ObtainSeedsClient)
+	pss.StreamTimes = 1
+	return nil
 }
diff --git a/pkg/rpc/cdnsystem/server/server.go b/pkg/rpc/cdnsystem/server/server.go
index dba2f0b7c90..ff01f88f9b4 100644
--- a/pkg/rpc/cdnsystem/server/server.go
+++ b/pkg/rpc/cdnsystem/server/server.go
@@ -39,7 +39,7 @@ func init() {
 	})
 }
 
-// see cdnsystem.SeederServer
+// SeederServer see cdnsystem.SeederServer
 type SeederServer interface {
 	ObtainSeeds(context.Context, *cdnsystem.SeedRequest, chan<- *cdnsystem.PieceSeed) error
 	GetPieceTasks(context.Context, *base.PieceTaskRequest) (*base.PiecePacket, error)
diff --git a/pkg/rpc/client.go b/pkg/rpc/client.go
index 438f1bce09d..bdf2bbbd318 100644
--- a/pkg/rpc/client.go
+++ b/pkg/rpc/client.go
@@ -25,7 +25,10 @@ import (
 	"github.com/pkg/errors"
 	"github.com/serialx/hashring"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/keepalive"
+	"google.golang.org/grpc/status"
+	"k8s.io/apimachinery/pkg/util/sets"
 
 	"d7y.io/dragonfly/v2/internal/dfcodes"
 	"d7y.io/dragonfly/v2/internal/dferrors"
@@ -204,16 +207,10 @@ func (conn *Connection) AddServerNodes(addrs []dfnet.NetAddr) error {
 }
 
 // findCandidateClientConn find candidate node client conn other than exclusiveNodes
-func (conn *Connection) findCandidateClientConn(key string, exclusiveNodes ...string) (*candidateClient, error) {
+func (conn *Connection) findCandidateClientConn(key string, exclusiveNodes sets.String) (*candidateClient, error) {
 	if node, ok := conn.key2NodeMap.Load(key); ok {
 		candidateNode := node.(string)
-		selected := true
-		for _, exclusiveNode := range exclusiveNodes {
-			if exclusiveNode == candidateNode {
-				selected = false
-			}
-		}
-		if selected {
+		if !exclusiveNodes.Has(candidateNode) {
 			if client, ok := conn.node2ClientMap.Load(node); ok {
 				return &candidateClient{
 					node: candidateNode,
@@ -232,23 +229,17 @@ func (conn *Connection) findCandidateClientConn(key string, exclusiveNodes ...st
 	}
 	candidateNodes := make([]string, 0)
 	for _, ringNode := range ringNodes {
-		candidate := true
-		for _, exclusiveNode := range exclusiveNodes {
-			if exclusiveNode == ringNode {
-				candidate = false
-			}
-		}
-		if candidate {
+		if !exclusiveNodes.Has(ringNode) {
 			candidateNodes = append(candidateNodes, ringNode)
 		}
 	}
 	logger.With("conn", conn.name).Infof("candidate result for hash key %s: all server node list: %v, exclusiveNodes node list: %v, candidate node list: %v",
-		key, ringNodes, exclusiveNodes, candidateNodes)
+		key, ringNodes, exclusiveNodes.List(), candidateNodes)
 	for _, candidateNode := range candidateNodes {
 		// Check whether there is a corresponding mapping client in the node2ClientMap
 		// TODO 下面部分可以直接调用loadOrCreate方法,但是日志没有这么调用打印全
 		if client, ok := conn.node2ClientMap.Load(candidateNode); ok {
-			logger.With("conn", conn.name).Infof("hit cache candidateNode %s for hash key %s", candidateNode, key)
+			logger.With("conn", conn.name).Debugf("hit cache candidateNode %s for hash key %s", candidateNode, key)
 			return &candidateClient{
 				node: candidateNode,
 				Ref:  client,
@@ -334,6 +325,7 @@ func (conn *Connection) loadOrCreateClientConnByNode(node string) (clientConn *g
 // stick whether hash key need already associated with specify node
 func (conn *Connection) GetClientConn(hashKey string, stick bool) (*grpc.ClientConn, error) {
 	logger.With("conn", conn.name).Debugf("start to get client conn hashKey %s, stick %t", hashKey, stick)
+	defer logger.With("conn", conn.name).Debugf("get client conn done, hashKey %s, stick %t end", hashKey, stick)
 	conn.rwMutex.RLock()
 	node, ok := conn.key2NodeMap.Load(hashKey)
 	if stick && !ok {
@@ -351,12 +343,12 @@ func (conn *Connection) GetClientConn(hashKey string, stick bool) (*grpc.ClientC
 		}
 		return clientConn, nil
 	}
-	logger.With("conn", conn.name).Infof("no server node associated with hash key %s was found, start find candidate", hashKey)
+	logger.With("conn", conn.name).Infof("no server node associated with hash key %s was found, start find candidate server", hashKey)
 	conn.rwMutex.RUnlock()
 	// if absence
 	conn.rwMutex.Lock()
 	defer conn.rwMutex.Unlock()
-	client, err := conn.findCandidateClientConn(hashKey)
+	client, err := conn.findCandidateClientConn(hashKey, sets.NewString())
 	if err != nil {
 		return nil, errors.Wrapf(err, "prob candidate client conn for hash key %s", hashKey)
 	}
@@ -370,6 +362,10 @@ func (conn *Connection) loadOrCreateClientConnByNode(node string) (clientConn *g
 // preNode node before the migration
 func (conn *Connection) TryMigrate(key string, cause error, exclusiveNodes []string) (preNode string, err error) {
 	logger.With("conn", conn.name).Infof("start try migrate server node for key %s, cause err: %v", key, cause)
+	if status.Code(cause) == codes.DeadlineExceeded || status.Code(cause) == codes.Canceled {
+		logger.With("conn", conn.name).Infof("migrate server node for key %s failed, cause err: %v", key, cause)
+		return "", cause
+	}
 	// TODO recover findCandidateClientConn error
 	if e, ok := cause.(*dferrors.DfError); ok {
 		if e.Code != dfcodes.ResourceLacked && e.Code != dfcodes.UnknownError {
@@ -378,16 +374,17 @@ func (conn *Connection) TryMigrate(key string, cause error, exclusiveNodes []str
 	}
 	currentNode := ""
 	conn.rwMutex.RLock()
-	if currentNode, ok := conn.key2NodeMap.Load(key); ok {
-		preNode = currentNode.(string)
-		exclusiveNodes = append(exclusiveNodes, currentNode.(string))
+	if node, ok := conn.key2NodeMap.Load(key); ok {
+		currentNode = node.(string)
+		preNode = currentNode
+		exclusiveNodes = append(exclusiveNodes, preNode)
 	} else {
 		logger.With("conn", conn.name).Warnf("failed to find server node for hash key %s", key)
 	}
 	conn.rwMutex.RUnlock()
 	conn.rwMutex.Lock()
 	defer conn.rwMutex.Unlock()
-	client, err := conn.findCandidateClientConn(key, exclusiveNodes...)
+	client, err := conn.findCandidateClientConn(key, sets.NewString(exclusiveNodes...))
 	if err != nil {
 		return "", errors.Wrapf(err, "find candidate client conn for hash key %s", key)
 	}
diff --git a/pkg/rpc/client_util.go b/pkg/rpc/client_util.go
index d6ddf92e4b8..2d3a47a1e21 100644
--- a/pkg/rpc/client_util.go
+++ b/pkg/rpc/client_util.go
@@ -22,6 +22,7 @@ import (
 	"time"
 
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 
 	"d7y.io/dragonfly/v2/internal/dfcodes"
@@ -190,6 +191,9 @@ func ExecuteWithRetry(f func() (interface{}, error), initBackoff float64, maxBac
 				return res, cause
 			}
 		}
+		if status.Code(cause) == codes.DeadlineExceeded || status.Code(cause) == codes.Canceled {
+			return res, cause
+		}
 		if i > 0 {
 			time.Sleep(mathutils.RandBackoff(initBackoff, maxBackoff, 2.0, i))
 		}
diff --git a/pkg/rpc/dfdaemon/client/client.go b/pkg/rpc/dfdaemon/client/client.go
index a23f2a0aa65..bfd842b3b07 100644
--- a/pkg/rpc/dfdaemon/client/client.go
+++ b/pkg/rpc/dfdaemon/client/client.go
@@ -22,6 +22,7 @@ import (
 	"sync"
 	"time"
 
+	logger "d7y.io/dragonfly/v2/internal/dflog"
 	"d7y.io/dragonfly/v2/internal/idgen"
 	"d7y.io/dragonfly/v2/pkg/basic/dfnet"
 	"d7y.io/dragonfly/v2/pkg/rpc"
@@ -67,7 +68,7 @@ func GetElasticClientByAddrs(addrs []dfnet.NetAddr, opts ...grpc.DialOption) (Da
 	return elasticDaemonClient, nil
 }
 
-// see dfdaemon.DaemonClient
+// DaemonClient see dfdaemon.DaemonClient
 type DaemonClient interface {
 	Download(ctx context.Context, req *dfdaemon.DownRequest, opts ...grpc.CallOption) (*DownResultStream, error)
 
@@ -114,12 +115,11 @@ func (dc *daemonClient) GetPieceTasks(ctx context.Context, target dfnet.NetAddr,
 		}
 		return client.GetPieceTasks(ctx, ptr, opts...)
 	}, 0.2, 2.0, 3, nil)
-
-	if err == nil {
-		return res.(*base.PiecePacket), nil
+	if err != nil {
+		logger.WithTaskID(ptr.TaskId).Infof("GetPieceTasks: invoke daemon node %s GetPieceTasks failed: %v", target, err)
+		return nil, err
 	}
-
-	return nil, err
+	return res.(*base.PiecePacket), nil
 }
 
 func (dc *daemonClient) CheckHealth(ctx context.Context, target dfnet.NetAddr, opts ...grpc.CallOption) (err error) {
@@ -130,6 +130,9 @@ func (dc *daemonClient) CheckHealth(ctx context.Context, target dfnet.NetAddr, o
 		}
 		return client.CheckHealth(ctx, new(empty.Empty), opts...)
 	}, 0.2, 2.0, 3, nil)
-
+	if err != nil {
+		logger.Infof("CheckHealth: invoke daemon node %s CheckHealth failed: %v", target, err)
+		return
+	}
 	return
 }
diff --git a/pkg/rpc/dfdaemon/client/down_result_stream.go b/pkg/rpc/dfdaemon/client/down_result_stream.go
index 66d9eafc912..91a12fd8f86 100644
--- a/pkg/rpc/dfdaemon/client/down_result_stream.go
+++ b/pkg/rpc/dfdaemon/client/down_result_stream.go
@@ -23,7 +23,6 @@ import (
 	logger "d7y.io/dragonfly/v2/internal/dflog"
 	"d7y.io/dragonfly/v2/pkg/rpc"
 	"d7y.io/dragonfly/v2/pkg/rpc/dfdaemon"
-	"github.com/pkg/errors"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
@@ -63,22 +62,23 @@ func newDownResultStream(ctx context.Context, dc *daemonClient, hashKey string,
 }
 
 func (drs *DownResultStream) initStream() error {
+	var target string
 	stream, err := rpc.ExecuteWithRetry(func() (interface{}, error) {
-		client, _, err := drs.dc.getDaemonClient(drs.hashKey, false)
+		var client dfdaemon.DaemonClient
+		var err error
+		client, target, err = drs.dc.getDaemonClient(drs.hashKey, false)
 		if err != nil {
 			return nil, err
 		}
 		return client.Download(drs.ctx, drs.req, drs.opts...)
 	}, drs.InitBackoff, drs.MaxBackOff, drs.MaxAttempts, nil)
-	if err == nil {
-		drs.stream = stream.(dfdaemon.Daemon_DownloadClient)
-		drs.StreamTimes = 1
-	}
 	if err != nil {
-		err = drs.replaceClient(err)
+		logger.WithTaskID(drs.hashKey).Infof("initStream: invoke daemon node %s Download failed: %v", target, err)
+		return drs.replaceClient(err)
 	}
-
-	return err
+	drs.stream = stream.(dfdaemon.Daemon_DownloadClient)
+	drs.StreamTimes = 1
+	return nil
 }
 
 func (drs *DownResultStream) Recv() (dr *dfdaemon.DownResult, err error) {
@@ -100,14 +100,12 @@ func (drs *DownResultStream) Recv() (dr *dfdaemon.DownResult, err error) {
 }
 
 func (drs *DownResultStream) retryRecv(cause error) (*dfdaemon.DownResult, error) {
-	if status.Code(cause) == codes.DeadlineExceeded {
+	if status.Code(cause) == codes.DeadlineExceeded || status.Code(cause) == codes.Canceled {
 		return nil, cause
 	}
 
 	if err := drs.replaceStream(cause); err != nil {
-		if err := drs.replaceClient(cause); err != nil {
-			return nil, cause
-		}
+		return nil, err
 	}
 
 	return drs.Recv()
@@ -115,43 +113,51 @@ func (drs *DownResultStream) retryRecv(cause error) (*dfdaemon.DownResult, error
 
 func (drs *DownResultStream) replaceStream(cause error) error {
 	if drs.StreamTimes >= drs.MaxAttempts {
-		return errors.New("times of replacing stream reaches limit")
+		logger.WithTaskID(drs.hashKey).Info("replace stream reach max attempt")
+		return cause
 	}
-
+	var target string
 	stream, err := rpc.ExecuteWithRetry(func() (interface{}, error) {
-		client, _, err := drs.dc.getDaemonClient(drs.hashKey, true)
+		var client dfdaemon.DaemonClient
+		var err error
+		client, target, err = drs.dc.getDaemonClient(drs.hashKey, true)
 		if err != nil {
 			return nil, err
 		}
 		return client.Download(drs.ctx, drs.req, drs.opts...)
 	}, drs.InitBackoff, drs.MaxBackOff, drs.MaxAttempts, cause)
-	if err == nil {
-		drs.stream = stream.(dfdaemon.Daemon_DownloadClient)
-		drs.StreamTimes++
+	if err != nil {
+		logger.WithTaskID(drs.hashKey).Infof("replaceStream: invoke daemon node %s Download failed: %v", target, err)
+		return drs.replaceClient(cause)
 	}
-	return err
+	drs.stream = stream.(dfdaemon.Daemon_DownloadClient)
+	drs.StreamTimes++
+	return nil
 }
 
 func (drs *DownResultStream) replaceClient(cause error) error {
 	preNode, err := drs.dc.TryMigrate(drs.hashKey, cause, drs.failedServers)
 	if err != nil {
-		return err
+		logger.WithTaskID(drs.hashKey).Infof("replaceClient: tryMigrate daemon node failed: %v", err)
+		return cause
 	}
 	drs.failedServers = append(drs.failedServers, preNode)
 
+	var target string
 	stream, err := rpc.ExecuteWithRetry(func() (interface{}, error) {
-		client, _, err := drs.dc.getDaemonClient(drs.hashKey, true)
+		var client dfdaemon.DaemonClient
+		var err error
+		client, target, err = drs.dc.getDaemonClient(drs.hashKey, true)
 		if err != nil {
 			return nil, err
 		}
 		return client.Download(drs.ctx, drs.req, drs.opts...)
 	}, drs.InitBackoff, drs.MaxBackOff, drs.MaxAttempts, cause)
-	if err == nil {
-		drs.stream = stream.(dfdaemon.Daemon_DownloadClient)
-		drs.StreamTimes = 1
-	}
 	if err != nil {
-		err = drs.replaceClient(cause)
+		logger.WithTaskID(drs.hashKey).Infof("replaceClient: invoke daemon node %s Download failed: %v", target, err)
+		return drs.replaceClient(cause)
 	}
-	return err
+	drs.stream = stream.(dfdaemon.Daemon_DownloadClient)
+	drs.StreamTimes = 1
+	return nil
 }
diff --git a/pkg/rpc/scheduler/client/client.go b/pkg/rpc/scheduler/client/client.go
index 58c792f27de..f600b55b270 100644
--- a/pkg/rpc/scheduler/client/client.go
+++ b/pkg/rpc/scheduler/client/client.go
@@ -20,17 +20,17 @@ import (
 	"context"
 	"time"
 
-	"github.com/pkg/errors"
-	"google.golang.org/grpc"
 
-	"d7y.io/dragonfly/v2/internal/dfcodes"
-	"d7y.io/dragonfly/v2/internal/dferrors"
 	logger "d7y.io/dragonfly/v2/internal/dflog"
 	"d7y.io/dragonfly/v2/internal/idgen"
 	"d7y.io/dragonfly/v2/pkg/basic/dfnet"
 	"d7y.io/dragonfly/v2/pkg/rpc"
 	"d7y.io/dragonfly/v2/pkg/rpc/base"
 	"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
+	"github.com/pkg/errors"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 )
 
 func GetClientByAddr(addrs []dfnet.NetAddr, opts ...grpc.DialOption) (SchedulerClient, error) {
@@ -47,10 +47,11 @@ func GetClientByAddr(addrs []dfnet.NetAddr, opts ...grpc.DialOption) (SchedulerC
 	return sc, nil
 }
 
-// see scheduler.SchedulerClient
+// SchedulerClient see scheduler.SchedulerClient
 type SchedulerClient interface {
+	// RegisterPeerTask register peer task to scheduler
 	RegisterPeerTask(context.Context, *scheduler.PeerTaskRequest, ...grpc.CallOption) (*scheduler.RegisterResult, error)
-	// IsMigrating of ptr will be set to true
+	// ReportPieceResult IsMigrating of ptr will be set to true
 	ReportPieceResult(context.Context, string, *scheduler.PeerTaskRequest, ...grpc.CallOption) (PeerPacketStream, error)
 
 	ReportPeerResult(context.Context, *scheduler.PeerResult, ...grpc.CallOption) error
@@ -72,16 +73,9 @@ func (sc *schedulerClient) getSchedulerClient(key string, stick bool) (scheduler
 	return scheduler.NewSchedulerClient(clientConn), clientConn.Target(), nil
 }
 
-func (sc *schedulerClient) RegisterPeerTask(ctx context.Context, ptr *scheduler.PeerTaskRequest, opts ...grpc.CallOption) (rr *scheduler.RegisterResult,
-	err error) {
-	return sc.doRegisterPeerTask(ctx, ptr, []string{}, opts)
-}
-
-func (sc *schedulerClient) doRegisterPeerTask(ctx context.Context, ptr *scheduler.PeerTaskRequest, exclusiveNodes []string,
-	opts []grpc.CallOption) (rr *scheduler.RegisterResult, err error) {
+func (sc *schedulerClient) RegisterPeerTask(ctx context.Context, ptr *scheduler.PeerTaskRequest, opts ...grpc.CallOption) (*scheduler.RegisterResult, error) {
 	var (
 		taskID        string
-		code          base.Code
 		schedulerNode string
 		res           interface{}
 	)
@@ -90,65 +84,66 @@ func (sc *schedulerClient) doRegisterPeerTask(ctx context.Context, ptr *schedule
 		ptr.Url)
 	reg := func() (interface{}, error) {
 		var client scheduler.SchedulerClient
+		var err error
 		client, schedulerNode, err = sc.getSchedulerClient(key, false)
 		if err != nil {
-			code = dfcodes.ServerUnavailable
 			return nil, err
 		}
 		return client.RegisterPeerTask(ctx, ptr, opts...)
 	}
-	res, err = rpc.ExecuteWithRetry(reg, 0.5, 5.0, 5, nil)
-	if err == nil {
-		rr = res.(*scheduler.RegisterResult)
-		taskID = rr.TaskId
-		code = dfcodes.Success
-		if taskID != key {
-			logger.WithTaskAndPeerID(taskID, ptr.PeerId).Warnf("register peer task correct taskId from %s to %s", key, taskID)
-			sc.Connection.CorrectKey2NodeRelation(key, taskID)
-		}
-		logger.With("peerId", ptr.PeerId).
-			Infof("register peer task result success for taskId: %s, url: %s, scheduler: %s",
-				taskID, ptr.Url, schedulerNode)
-		return
+	res, err := rpc.ExecuteWithRetry(reg, 0.2, 2.0, 3, nil)
+	if err != nil {
+		logger.WithTaskAndPeerID(key, ptr.PeerId).Errorf("RegisterPeerTask: register peer task to scheduler %s failed: %v", schedulerNode, err)
+		return sc.retryRegisterPeerTask(ctx, key, ptr, []string{schedulerNode}, err, opts)
 	}
-
-	if de, ok := err.(*dferrors.DfError); ok {
-		code = de.Code
+	rr := res.(*scheduler.RegisterResult)
+	taskID = rr.TaskId
+	if taskID != key {
+		logger.WithTaskAndPeerID(taskID, ptr.PeerId).Warnf("register peer task correct taskId from %s to %s", key, taskID)
+		sc.Connection.CorrectKey2NodeRelation(key, taskID)
 	}
-	logger.With("peerId", ptr.PeerId, "errMsg", err).
-		Errorf("register peer task result failed, code: [%d] for taskId: %s, url: %s, scheduler: %s",
-			int32(code), taskID, ptr.Url, schedulerNode)
+	logger.WithTaskAndPeerID(taskID, ptr.PeerId).
+		Infof("register peer task result success url: %s, scheduler: %s", ptr.Url, schedulerNode)
+	return rr, err
+}
 
-	// previous schedule failed, report peer task to free load and other resources
-	var client scheduler.SchedulerClient
-	client, schedulerNode, err = sc.getSchedulerClient(key, false)
+func (sc *schedulerClient) retryRegisterPeerTask(ctx context.Context, hashKey string, ptr *scheduler.PeerTaskRequest, exclusiveNodes []string, cause error,
+	opts []grpc.CallOption) (*scheduler.RegisterResult, error) {
+	if status.Code(cause) == codes.Canceled || status.Code(cause) == codes.DeadlineExceeded {
+		return nil, cause
+	}
+	var (
+		taskID        string
+		schedulerNode string
+	)
+	preNode, err := sc.TryMigrate(hashKey, cause, exclusiveNodes)
 	if err != nil {
-		logger.With("peerId", ptr.PeerId, "errMsg", err).Errorf("get scheduler client failed")
-	} else {
-		_, e := client.ReportPeerResult(
-			context.Background(),
-			&scheduler.PeerResult{
-				TaskId:         taskID,
-				PeerId:         ptr.PeerId,
-				SrcIp:          ptr.PeerHost.Ip,
-				SecurityDomain: ptr.PeerHost.SecurityDomain,
-				Idc:            ptr.PeerHost.Idc,
-				Url:            ptr.Url,
-				ContentLength:  -1,
-				Traffic:        -1,
-				Cost:           0,
-				Success:        false,
-				Code:           dfcodes.UnknownError,
-			})
-		logger.With("peerId", ptr.PeerId, "errMsg", e).Warnf("report failed peer result")
-	}
-
-	var preNode string
-	if preNode, err = sc.TryMigrate(key, err, exclusiveNodes); err == nil {
-		exclusiveNodes = append(exclusiveNodes, preNode)
-		return sc.doRegisterPeerTask(ctx, ptr, exclusiveNodes, opts)
+		return nil, cause
 	}
-	return
+	exclusiveNodes = append(exclusiveNodes, preNode)
+	res, err := rpc.ExecuteWithRetry(func() (interface{}, error) {
+		var client scheduler.SchedulerClient
+		var err error
+		client, schedulerNode, err = sc.getSchedulerClient(hashKey, true)
+		if err != nil {
+			return nil, err
+		}
+		return client.RegisterPeerTask(ctx, ptr, opts...)
+	}, 0.2, 2.0, 3, cause)
+	if err != nil {
+		logger.WithTaskAndPeerID(hashKey, ptr.PeerId).Errorf("retryRegisterPeerTask: register peer task to scheduler %s failed: %v", schedulerNode, err)
+		return sc.retryRegisterPeerTask(ctx, hashKey, ptr, exclusiveNodes, err, opts)
+	}
+	rr := res.(*scheduler.RegisterResult)
+	taskID = rr.TaskId
+	if taskID != hashKey {
+		logger.WithTaskAndPeerID(taskID, ptr.PeerId).Warnf("register peer task correct taskId from %s to %s", hashKey, taskID)
+		sc.Connection.CorrectKey2NodeRelation(hashKey, taskID)
+	}
+	logger.WithTaskAndPeerID(taskID, ptr.PeerId).
+		Infof("register peer task result success url: %s, scheduler: %s", ptr.Url, schedulerNode)
+	return rr, nil
+
 }
 
 func (sc *schedulerClient) ReportPieceResult(ctx context.Context, taskID string, ptr *scheduler.PeerTaskRequest, opts ...grpc.CallOption) (PeerPacketStream, error) {
@@ -162,16 +157,40 @@ func (sc *schedulerClient) ReportPieceResult(ctx context.Context, taskID string,
 }
 
 func (sc *schedulerClient) ReportPeerResult(ctx context.Context, pr *scheduler.PeerResult, opts ...grpc.CallOption) error {
-	return sc.doReportPeerResult(ctx, pr, []string{}, opts)
+	var (
+		schedulerNode string
+	)
+	_, err := rpc.ExecuteWithRetry(func() (interface{}, error) {
+		var client scheduler.SchedulerClient
+		var err error
+		client, schedulerNode, err = sc.getSchedulerClient(pr.TaskId, true)
+		if err != nil {
+			return nil, err
+		}
+		return client.ReportPeerResult(ctx, pr, opts...)
+	}, 0.2, 2.0, 3, nil)
+	if err != nil {
+		logger.WithTaskAndPeerID(pr.TaskId, pr.PeerId).Errorf("ReportPeerResult: report peer result to scheduler %s failed: %v", schedulerNode, err)
+		return sc.retryReportPeerResult(ctx, pr, []string{schedulerNode}, err, opts)
+	}
+	return nil
 }
 
-func (sc *schedulerClient) doReportPeerResult(ctx context.Context, pr *scheduler.PeerResult, exclusiveNodes []string, opts []grpc.CallOption) (err error) {
+func (sc *schedulerClient) retryReportPeerResult(ctx context.Context, pr *scheduler.PeerResult, exclusiveNodes []string,
+	cause error, opts []grpc.CallOption) (err error) {
+	if status.Code(cause) == codes.Canceled || status.Code(cause) == codes.DeadlineExceeded {
+		return cause
+	}
 	var (
 		schedulerNode string
 		suc           bool
 		code          base.Code
 	)
-
+	preNode, err := sc.TryMigrate(pr.TaskId, err, exclusiveNodes)
+	if err != nil {
+		return cause
+	}
+	exclusiveNodes = append(exclusiveNodes, preNode)
 	_, err = rpc.ExecuteWithRetry(func() (interface{}, error) {
 		var client scheduler.SchedulerClient
 		client, schedulerNode, err = sc.getSchedulerClient(pr.TaskId, true)
@@ -180,24 +199,16 @@ func (sc *schedulerClient) doReportPeerResult(ctx context.Context, pr *scheduler
 			return nil, err
 		}
 		return client.ReportPeerResult(ctx, pr, opts...)
-	}, 0.5, 5.0, 5, nil)
-	if err == nil {
-		suc = true
-		code = dfcodes.Success
+	}, 0.2, 2.0, 3, nil)
+	if err != nil {
+		logger.WithTaskAndPeerID(pr.TaskId, pr.PeerId).Errorf("retryReportPeerResult: report peer result to scheduler %s failed: %v", schedulerNode, err)
+		return sc.retryReportPeerResult(ctx, pr, exclusiveNodes, cause, opts)
 	}
 	logger.With("peerId", pr.PeerId, "errMsg", err).
 		Infof("report peer result: %t[%d], peer task down result: %t[%d] for taskId: %s, url: %s, scheduler: %s, length: %d, traffic: %d, cost: %d",
 			suc, int32(code), pr.Success, int32(pr.Code), pr.TaskId, pr.Url, schedulerNode, pr.ContentLength, pr.Traffic, pr.Cost)
-	if err != nil {
-		var preNode string
-		if preNode, err = sc.TryMigrate(pr.TaskId, err, exclusiveNodes); err == nil {
-			exclusiveNodes = append(exclusiveNodes, preNode)
-			return sc.doReportPeerResult(ctx, pr, exclusiveNodes, opts)
-		}
-	}
-
 	return
 }
@@ -207,21 +218,22 @@ func (sc *schedulerClient) LeaveTask(ctx context.Context, pt *scheduler.PeerTarg
 		suc           bool
 	)
 	defer func() {
-		logger.With("peerId", pt.PeerId, "errMsg", err).Infof("leave from task result: %t for taskId: %s, scheduler: %s", suc, pt.TaskId, schedulerNode)
+		logger.With("peerId", pt.PeerId, "errMsg", err).Infof("leave from task result: %t for taskId: %s, scheduler server node: %s, err:%v", suc, pt.TaskId,
+			schedulerNode, err)
 	}()
-	_, err = rpc.ExecuteWithRetry(func() (interface{}, error) {
+	leaveFun := func() (interface{}, error) {
 		var client scheduler.SchedulerClient
-		client, schedulerNode, err = sc.getSchedulerClient(pt.TaskId, true)
+		client, schedulerNode, err = sc.getSchedulerClient(pt.TaskId, false)
 		if err != nil {
 			return nil, err
 		}
 		return client.LeaveTask(ctx, pt, opts...)
-	}, 0.5, 5.0, 3, nil)
+	}
+	_, err = rpc.ExecuteWithRetry(leaveFun, 0.2, 2.0, 3, nil)
 	if err == nil {
 		suc = true
 	}
-
 	return
 }
diff --git a/pkg/rpc/scheduler/client/peer_packet_stream.go b/pkg/rpc/scheduler/client/peer_packet_stream.go
index 8c88a2c1734..dc15efdfae9 100644
--- a/pkg/rpc/scheduler/client/peer_packet_stream.go
+++ b/pkg/rpc/scheduler/client/peer_packet_stream.go
@@ -20,7 +20,7 @@ import (
 	"context"
 	"io"
 
-	"github.com/pkg/errors"
+	logger "d7y.io/dragonfly/v2/internal/dflog"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
@@ -103,21 +103,19 @@ func (pps *peerPacketStream) Recv() (pp *scheduler.PeerPacket, err error) {
 }
 
 func (pps *peerPacketStream) retrySend(pr *scheduler.PieceResult, cause error) error {
-	if status.Code(cause) == codes.DeadlineExceeded {
+	if status.Code(cause) == codes.DeadlineExceeded || status.Code(cause) == codes.Canceled {
 		return cause
 	}
 
 	if err := pps.replaceStream(cause); err != nil {
-		if err := pps.replaceClient(cause); err != nil {
-			return cause
-		}
+		return err
 	}
 
 	return pps.Send(pr)
 }
 
 func (pps *peerPacketStream) retryRecv(cause error) (*scheduler.PeerPacket, error) {
-	if status.Code(cause) == codes.DeadlineExceeded {
+	if status.Code(cause) == codes.DeadlineExceeded || status.Code(cause) == codes.Canceled {
 		return nil, cause
 	}
 	_, err := rpc.ExecuteWithRetry(func() (interface{}, error) {
@@ -150,67 +148,74 @@ func (pps *peerPacketStream) retryRecv(cause error) (*scheduler.PeerPacket, erro
 }
 
 func (pps *peerPacketStream) initStream() error {
+	var target string
 	stream, err := rpc.ExecuteWithRetry(func() (interface{}, error) {
-		client, _, err := pps.sc.getSchedulerClient(pps.hashKey, true)
+		var client scheduler.SchedulerClient
+		var err error
+		client, target, err = pps.sc.getSchedulerClient(pps.hashKey, true)
 		if err != nil {
 			return nil, err
 		}
 		return client.ReportPieceResult(pps.ctx, pps.opts...)
 	}, pps.retryMeta.InitBackoff, pps.retryMeta.MaxBackOff, pps.retryMeta.MaxAttempts, nil)
-	if err == nil {
-		pps.stream = stream.(scheduler.Scheduler_ReportPieceResultClient)
-		pps.retryMeta.StreamTimes = 1
-	}
 	if err != nil {
-		err = pps.replaceClient(err)
+		logger.WithTaskID(pps.hashKey).Infof("initStream: invoke scheduler node %s ReportPieceResult failed: %v", target, err)
+		return pps.replaceClient(err)
 	}
-	return err
+	pps.stream = stream.(scheduler.Scheduler_ReportPieceResultClient)
+	pps.retryMeta.StreamTimes = 1
+	return nil
 }
 
 func (pps *peerPacketStream) replaceStream(cause error) error {
 	if pps.retryMeta.StreamTimes >= pps.retryMeta.MaxAttempts {
-		return errors.New("times of replacing stream reaches limit")
+		return cause
 	}
 
+	var target string
 	res, err := rpc.ExecuteWithRetry(func() (interface{}, error) {
-		client, _, err := pps.sc.getSchedulerClient(pps.hashKey, true)
+		var client scheduler.SchedulerClient
+		var err error
+		client, target, err = pps.sc.getSchedulerClient(pps.hashKey, true)
 		if err != nil {
 			return nil, err
 		}
 		return client.ReportPieceResult(pps.ctx, pps.opts...)
 	}, pps.retryMeta.InitBackoff, pps.retryMeta.MaxBackOff, pps.retryMeta.MaxAttempts, cause)
-	if err == nil {
-		pps.stream = res.(scheduler.Scheduler_ReportPieceResultClient)
-		pps.retryMeta.StreamTimes++
+	if err != nil {
+		logger.WithTaskID(pps.hashKey).Infof("replaceStream: invoke scheduler node %s ReportPieceResult failed: %v", target, err)
+		return pps.replaceStream(cause)
 	}
-	return err
+	pps.stream = res.(scheduler.Scheduler_ReportPieceResultClient)
+	pps.retryMeta.StreamTimes++
+	return nil
 }
 
 func (pps *peerPacketStream) replaceClient(cause error) error {
 	preNode, err := pps.sc.TryMigrate(pps.hashKey, cause, pps.failedServers)
 	if err != nil {
-		return err
+		logger.WithTaskID(pps.hashKey).Infof("replaceClient: tryMigrate scheduler node failed: %v", err)
+		return cause
 	}
 	pps.failedServers = append(pps.failedServers, preNode)
-
+	var target string
 	stream, err := rpc.ExecuteWithRetry(func() (interface{}, error) {
-		client, _, err := pps.sc.getSchedulerClient(pps.hashKey, true)
+		var client scheduler.SchedulerClient
+		var err error
+		client, target, err = pps.sc.getSchedulerClient(pps.hashKey, true)
 		if err != nil {
 			return nil, err
 		}
-		//timeCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
-		//defer cancel()
 		_, err = client.RegisterPeerTask(pps.ctx, pps.ptr)
 		if err != nil {
 			return nil, err
 		}
 		return client.ReportPieceResult(pps.ctx, pps.opts...)
 	}, pps.retryMeta.InitBackoff, pps.retryMeta.MaxBackOff, pps.retryMeta.MaxAttempts, cause)
-	if err == nil {
-		pps.stream = stream.(scheduler.Scheduler_ReportPieceResultClient)
-		pps.retryMeta.StreamTimes = 1
-	}
 	if err != nil {
-		err = pps.replaceClient(cause)
+		logger.WithTaskID(pps.hashKey).Infof("replaceClient: invoke scheduler node %s ReportPieceResult failed: %v", target, err)
+		return pps.replaceClient(cause)
 	}
-	return err
+	pps.stream = stream.(scheduler.Scheduler_ReportPieceResultClient)
+	pps.retryMeta.StreamTimes = 1
+	return nil
 }
diff --git a/scheduler/core/scheduler/basic/basic_scheduler.go b/scheduler/core/scheduler/basic/basic_scheduler.go
index 289649ec5dc..0c1f4589f1f 100644
--- a/scheduler/core/scheduler/basic/basic_scheduler.go
+++ b/scheduler/core/scheduler/basic/basic_scheduler.go
@@ -146,6 +146,7 @@ func (s *Scheduler) selectCandidateChildren(peer *supervisor.Peer, limit int) (l
 			logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("******candidate child peer is not selected because it is nil")
 			return false
 		}
+		// TODO IsWaiting
 		if candidateNode.IsDone() {
 			logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("******candidate child peer %s is not selected because it has done",
 				candidateNode.PeerID)
diff --git a/scheduler/core/service.go b/scheduler/core/service.go
index 04597461203..5cf659cccf2 100644
--- a/scheduler/core/service.go
+++ b/scheduler/core/service.go
@@ -179,14 +179,12 @@ func (s *SchedulerService) GetPeerTask(peerTaskID string) (peerTask *supervisor.
 }
 
 func (s *SchedulerService) RegisterPeerTask(req *schedulerRPC.PeerTaskRequest, task *supervisor.Task) (*supervisor.Peer, error) {
-	// get or create host
-	reqPeerHost := req.PeerHost
 	var (
 		peer     *supervisor.Peer
 		ok       bool
 		peerHost *supervisor.PeerHost
 	)
-
+	reqPeerHost := req.PeerHost
 	if peerHost, ok = s.hostManager.Get(reqPeerHost.Uuid); !ok {
 		peerHost = supervisor.NewClientPeerHost(reqPeerHost.Uuid, reqPeerHost.Ip, reqPeerHost.HostName, reqPeerHost.RpcPort, reqPeerHost.DownPort,
 			reqPeerHost.SecurityDomain, reqPeerHost.Location, reqPeerHost.Idc, reqPeerHost.NetTopology, s.config.ClientLoad)
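Note: the sketch below is illustrative only and not part of the patch. It restates, in a minimal self-contained form, the retry behavior the changes above converge on: back off between attempts, but treat Canceled and DeadlineExceeded as terminal so the caller can decide whether to migrate to another server node. The function name and the backoff formula are assumptions, not the project's actual rpc.ExecuteWithRetry.

package main

import (
	"fmt"
	"math/rand"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// executeWithRetry retries f with jittered, capped exponential backoff, but
// returns immediately when the error is terminal (context canceled or
// deadline exceeded), mirroring the short-circuit added in this patch.
func executeWithRetry(f func() (interface{}, error), initBackoff, maxBackoff float64, maxAttempts int) (interface{}, error) {
	var cause error
	for i := 0; i < maxAttempts; i++ {
		if i > 0 {
			// exponential backoff in seconds, capped at maxBackoff, with jitter
			backoff := initBackoff * float64(int64(1)<<uint(i-1))
			if backoff > maxBackoff {
				backoff = maxBackoff
			}
			time.Sleep(time.Duration((0.5 + 0.5*rand.Float64()) * backoff * float64(time.Second)))
		}
		res, err := f()
		if err == nil {
			return res, nil
		}
		cause = err
		// terminal errors: retrying cannot help, surface them so the caller
		// can decide whether to migrate to another server node
		if code := status.Code(err); code == codes.DeadlineExceeded || code == codes.Canceled {
			return nil, err
		}
	}
	return nil, cause
}

func main() {
	_, err := executeWithRetry(func() (interface{}, error) {
		return nil, status.Error(codes.Unavailable, "server unavailable")
	}, 0.2, 2.0, 3)
	fmt.Println(err) // retried three times, then the last cause is returned
}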