Framework refactor #590

Merged 8 commits on Aug 31, 2021
10 changes: 4 additions & 6 deletions pkg/rpc/cdnsystem/client/client.go
@@ -101,17 +101,15 @@ func (cc *cdnClient) ObtainSeeds(ctx context.Context, sr *cdnsystem.SeedRequest,
 
 func (cc *cdnClient) GetPieceTasks(ctx context.Context, addr dfnet.NetAddr, req *base.PieceTaskRequest, opts ...grpc.CallOption) (*base.PiecePacket, error) {
 	res, err := rpc.ExecuteWithRetry(func() (interface{}, error) {
-		defer func() {
-			logger.WithTaskID(req.TaskId).Infof("invoke cdn node %s GetPieceTasks", addr.GetEndpoint())
-		}()
 		client, err := cc.getSeederClientWithTarget(addr.GetEndpoint())
 		if err != nil {
 			return nil, err
 		}
 		return client.GetPieceTasks(ctx, req, opts...)
 	}, 0.2, 2.0, 3, nil)
-	if err == nil {
-		return res.(*base.PiecePacket), nil
+	if err != nil {
+		logger.WithTaskID(req.TaskId).Infof("GetPieceTasks: invoke cdn node %s GetPieceTasks failed: %v", addr.GetEndpoint(), err)
+		return nil, err
 	}
-	return nil, err
+	return res.(*base.PiecePacket), nil
 }
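
The reworked error handling above follows a common shape: run the RPC inside a retry helper, log once with the final error, and only touch the interface{} result after the error check. A hedged, standalone sketch of that shape (executeWithRetry, piecePacket, and getPieceTasks are illustrative stand-ins, not the project's API):

// Sketch only: mirrors the retry-then-assert call shape, not Dragonfly's code.
package main

import (
	"errors"
	"fmt"
	"time"
)

// executeWithRetry retries f up to maxAttempts times with a fixed backoff and
// returns the last error as the cause.
func executeWithRetry(f func() (interface{}, error), backoff time.Duration, maxAttempts int) (interface{}, error) {
	var cause error
	for i := 0; i < maxAttempts; i++ {
		res, err := f()
		if err == nil {
			return res, nil
		}
		cause = err
		time.Sleep(backoff)
	}
	return nil, cause
}

type piecePacket struct{ totalPiece int32 }

func getPieceTasks() (*piecePacket, error) {
	res, err := executeWithRetry(func() (interface{}, error) {
		// The real code dials the CDN node and issues GetPieceTasks here.
		return nil, errors.New("cdn unreachable")
	}, 200*time.Millisecond, 3)
	if err != nil {
		fmt.Printf("GetPieceTasks failed: %v\n", err) // log once, with the final error
		return nil, err
	}
	return res.(*piecePacket), nil // assert only when err is nil, so res is valid
}

func main() {
	if _, err := getPieceTasks(); err != nil {
		fmt.Println("error:", err)
	}
}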
69 changes: 36 additions & 33 deletions pkg/rpc/cdnsystem/client/piece_seed_stream.go
@@ -23,7 +23,6 @@ import (
 	logger "d7y.io/dragonfly/v2/internal/dflog"
 	"d7y.io/dragonfly/v2/pkg/rpc"
 	"d7y.io/dragonfly/v2/pkg/rpc/cdnsystem"
-	"github.com/pkg/errors"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
@@ -64,22 +63,23 @@ func newPieceSeedStream(ctx context.Context, sc *cdnClient, hashKey string, sr *
 }
 
 func (pss *PieceSeedStream) initStream() error {
+	var target string
 	stream, err := rpc.ExecuteWithRetry(func() (interface{}, error) {
-		client, cdnServerNode, err := pss.sc.getCdnClient(pss.hashKey, false)
+		var client cdnsystem.SeederClient
+		var err error
+		client, target, err = pss.sc.getCdnClient(pss.hashKey, false)
 		if err != nil {
 			return nil, err
 		}
-		logger.WithTaskID(pss.hashKey).Infof("invoke cdn node %s ObtainSeeds", cdnServerNode)
 		return client.ObtainSeeds(pss.ctx, pss.sr, pss.opts...)
 	}, pss.InitBackoff, pss.MaxBackOff, pss.MaxAttempts, nil)
-	if err == nil {
-		pss.stream = stream.(cdnsystem.Seeder_ObtainSeedsClient)
-		pss.StreamTimes = 1
-	}
 	if err != nil {
-		err = pss.replaceClient(pss.hashKey, err)
+		logger.WithTaskID(pss.hashKey).Infof("initStream: invoke cdn node %s ObtainSeeds failed: %v", target, err)
+		return pss.replaceClient(pss.hashKey, err)
 	}
-	return err
+	pss.stream = stream.(cdnsystem.Seeder_ObtainSeedsClient)
+	pss.StreamTimes = 1
+	return nil
 }
 
@@ -91,58 +91,61 @@ func (pss *PieceSeedStream) Recv() (ps *cdnsystem.PieceSeed, err error) {
 }
 
 func (pss *PieceSeedStream) retryRecv(cause error) (*cdnsystem.PieceSeed, error) {
-	if status.Code(cause) == codes.DeadlineExceeded {
+	if status.Code(cause) == codes.DeadlineExceeded || status.Code(cause) == codes.Canceled {
 		return nil, cause
 	}
-
-	if err := pss.replaceStream(pss.hashKey, cause); err != nil {
-		if err := pss.replaceClient(pss.hashKey, cause); err != nil {
-			return nil, cause
-		}
+	if err := pss.replaceStream(cause); err != nil {
+		return nil, err
 	}
-
 	return pss.Recv()
 }
 
-func (pss *PieceSeedStream) replaceStream(key string, cause error) error {
+func (pss *PieceSeedStream) replaceStream(cause error) error {
 	if pss.StreamTimes >= pss.MaxAttempts {
-		return errors.New("times of replacing stream reaches limit")
+		logger.WithTaskID(pss.hashKey).Info("replace stream reach max attempt")
+		return cause
 	}
-
+	var target string
 	stream, err := rpc.ExecuteWithRetry(func() (interface{}, error) {
-		client, _, err := pss.sc.getCdnClient(key, true)
+		var client cdnsystem.SeederClient
+		var err error
+		client, target, err = pss.sc.getCdnClient(pss.hashKey, true)
 		if err != nil {
 			return nil, err
 		}
 		return client.ObtainSeeds(pss.ctx, pss.sr, pss.opts...)
 	}, pss.InitBackoff, pss.MaxBackOff, pss.MaxAttempts, cause)
-	if err == nil {
-		pss.stream = stream.(cdnsystem.Seeder_ObtainSeedsClient)
-		pss.StreamTimes++
+	if err != nil {
+		logger.WithTaskID(pss.hashKey).Infof("replaceStream: invoke cdn node %s ObtainSeeds failed: %v", target, err)
+		return pss.replaceStream(cause)
 	}
-	return err
+	pss.stream = stream.(cdnsystem.Seeder_ObtainSeedsClient)
+	pss.StreamTimes++
+	return nil
 }
 
 func (pss *PieceSeedStream) replaceClient(key string, cause error) error {
 	preNode, err := pss.sc.TryMigrate(key, cause, pss.failedServers)
 	if err != nil {
-		return err
+		logger.WithTaskID(pss.hashKey).Infof("replaceClient: tryMigrate cdn node failed: %v", err)
+		return cause
 	}
 	pss.failedServers = append(pss.failedServers, preNode)
-
+	var target string
 	stream, err := rpc.ExecuteWithRetry(func() (interface{}, error) {
-		client, _, err := pss.sc.getCdnClient(key, true)
+		var client cdnsystem.SeederClient
+		var err error
+		client, target, err = pss.sc.getCdnClient(key, true)
 		if err != nil {
 			return nil, err
 		}
 		return client.ObtainSeeds(pss.ctx, pss.sr, pss.opts...)
 	}, pss.InitBackoff, pss.MaxBackOff, pss.MaxAttempts, cause)
-	if err == nil {
-		pss.stream = stream.(cdnsystem.Seeder_ObtainSeedsClient)
-		pss.StreamTimes = 1
-	}
 	if err != nil {
-		err = pss.replaceClient(key, cause)
+		logger.WithTaskID(pss.hashKey).Infof("replaceClient: invoke cdn node %s ObtainSeeds failed: %v", target, err)
+		return pss.replaceClient(key, cause)
 	}
-	return err
+	pss.stream = stream.(cdnsystem.Seeder_ObtainSeedsClient)
+	pss.StreamTimes = 1
+	return nil
 }
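
The refactored replaceStream/replaceClient pair above retries by recursing and, once the attempt budget is spent, returns the original cause rather than a fresh error, so the caller sees the failure that started the replacement chain. A standalone sketch of that bounded-recursion shape (stream, dial, and the field names are invented for illustration):

// Sketch only: shows the bounded retry-by-recursion pattern, not Dragonfly's code.
package main

import (
	"errors"
	"fmt"
)

type stream struct {
	attempts    int
	maxAttempts int
}

func (s *stream) replaceStream(cause error) error {
	if s.attempts >= s.maxAttempts {
		// Attempt budget spent: hand the original cause back to the caller.
		return cause
	}
	s.attempts++
	if err := s.dial(); err != nil {
		// Re-establishing failed: recurse, still carrying the original cause.
		return s.replaceStream(cause)
	}
	return nil
}

func (s *stream) dial() error {
	// Stand-in for getCdnClient + ObtainSeeds; always fails in this sketch.
	return errors.New("dial failed")
}

func main() {
	s := &stream{maxAttempts: 3}
	err := s.replaceStream(errors.New("recv broke"))
	fmt.Println(err) // prints "recv broke" after three failed dial attempts
}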
2 changes: 1 addition & 1 deletion pkg/rpc/cdnsystem/server/server.go
@@ -39,7 +39,7 @@ func init() {
 	})
 }
 
-// see cdnsystem.SeederServer
+// SeederServer see cdnsystem.SeederServer
 type SeederServer interface {
 	ObtainSeeds(context.Context, *cdnsystem.SeedRequest, chan<- *cdnsystem.PieceSeed) error
 	GetPieceTasks(context.Context, *base.PieceTaskRequest) (*base.PiecePacket, error)
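
The comment rename above follows the Go doc-comment convention (flagged by golint) that a comment on an exported identifier should begin with that identifier's name. A tiny illustration:

// Package docstyle illustrates the convention: a doc comment starts with the
// name of the identifier it documents, which is what golint checks for.
package docstyle

// SeederServer wraps the seeder RPCs (the comment starts with "SeederServer").
type SeederServer interface {
	// Close releases the server (method comments follow the same rule).
	Close() error
}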
43 changes: 20 additions & 23 deletions pkg/rpc/client.go
@@ -25,7 +25,10 @@ import (
 	"github.com/pkg/errors"
 	"github.com/serialx/hashring"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/keepalive"
+	"google.golang.org/grpc/status"
+	"k8s.io/apimachinery/pkg/util/sets"
 
 	"d7y.io/dragonfly/v2/internal/dfcodes"
 	"d7y.io/dragonfly/v2/internal/dferrors"
@@ -204,16 +207,10 @@ func (conn *Connection) AddServerNodes(addrs []dfnet.NetAddr) error {
 }
 
 // findCandidateClientConn find candidate node client conn other than exclusiveNodes
-func (conn *Connection) findCandidateClientConn(key string, exclusiveNodes ...string) (*candidateClient, error) {
+func (conn *Connection) findCandidateClientConn(key string, exclusiveNodes sets.String) (*candidateClient, error) {
 	if node, ok := conn.key2NodeMap.Load(key); ok {
 		candidateNode := node.(string)
-		selected := true
-		for _, exclusiveNode := range exclusiveNodes {
-			if exclusiveNode == candidateNode {
-				selected = false
-			}
-		}
-		if selected {
+		if !exclusiveNodes.Has(candidateNode) {
 			if client, ok := conn.node2ClientMap.Load(node); ok {
 				return &candidateClient{
 					node: candidateNode,
@@ -232,23 +229,17 @@ func (conn *Connection) findCandidateClientConn(key string, exclusiveNodes ...st
 	}
 	candidateNodes := make([]string, 0)
 	for _, ringNode := range ringNodes {
-		candidate := true
-		for _, exclusiveNode := range exclusiveNodes {
-			if exclusiveNode == ringNode {
-				candidate = false
-			}
-		}
-		if candidate {
+		if !exclusiveNodes.Has(ringNode) {
 			candidateNodes = append(candidateNodes, ringNode)
 		}
 	}
 	logger.With("conn", conn.name).Infof("candidate result for hash key %s: all server node list: %v, exclusiveNodes node list: %v, candidate node list: %v",
-		key, ringNodes, exclusiveNodes, candidateNodes)
+		key, ringNodes, exclusiveNodes.List(), candidateNodes)
 	for _, candidateNode := range candidateNodes {
 		// Check whether there is a corresponding mapping client in the node2ClientMap
 		// TODO the code below could call loadOrCreate directly, but then the logs would not be printed as completely
 		if client, ok := conn.node2ClientMap.Load(candidateNode); ok {
-			logger.With("conn", conn.name).Infof("hit cache candidateNode %s for hash key %s", candidateNode, key)
+			logger.With("conn", conn.name).Debugf("hit cache candidateNode %s for hash key %s", candidateNode, key)
 			return &candidateClient{
 				node: candidateNode,
 				Ref:  client,
@@ -334,6 +325,7 @@ func (conn *Connection) loadOrCreateClientConnByNode(node string) (clientConn *g
 // stick whether hash key need already associated with specify node
 func (conn *Connection) GetClientConn(hashKey string, stick bool) (*grpc.ClientConn, error) {
 	logger.With("conn", conn.name).Debugf("start to get client conn hashKey %s, stick %t", hashKey, stick)
+	defer logger.With("conn", conn.name).Debugf("get client conn done, hashKey %s, stick %t end", hashKey, stick)
 	conn.rwMutex.RLock()
 	node, ok := conn.key2NodeMap.Load(hashKey)
 	if stick && !ok {
@@ -351,12 +343,12 @@ func (conn *Connection) GetClientConn(hashKey string, stick bool) (*grpc.ClientC
 		}
 		return clientConn, nil
 	}
-	logger.With("conn", conn.name).Infof("no server node associated with hash key %s was found, start find candidate", hashKey)
+	logger.With("conn", conn.name).Infof("no server node associated with hash key %s was found, start find candidate server", hashKey)
 	conn.rwMutex.RUnlock()
 	// if absence
 	conn.rwMutex.Lock()
 	defer conn.rwMutex.Unlock()
-	client, err := conn.findCandidateClientConn(hashKey)
+	client, err := conn.findCandidateClientConn(hashKey, sets.NewString())
 	if err != nil {
 		return nil, errors.Wrapf(err, "prob candidate client conn for hash key %s", hashKey)
 	}
@@ -370,6 +362,10 @@ func (conn *Connection) GetClientConn(hashKey string, stick bool) (*grpc.ClientC
 // preNode node before the migration
 func (conn *Connection) TryMigrate(key string, cause error, exclusiveNodes []string) (preNode string, err error) {
 	logger.With("conn", conn.name).Infof("start try migrate server node for key %s, cause err: %v", key, cause)
+	if status.Code(cause) == codes.DeadlineExceeded || status.Code(cause) == codes.Canceled {
+		logger.With("conn", conn.name).Infof("migrate server node for key %s failed, cause err: %v", key, cause)
+		return "", cause
+	}
 	// TODO recover findCandidateClientConn error
 	if e, ok := cause.(*dferrors.DfError); ok {
 		if e.Code != dfcodes.ResourceLacked && e.Code != dfcodes.UnknownError {
@@ -378,16 +374,17 @@ func (conn *Connection) TryMigrate(key string, cause error, exclusiveNodes []str
 	}
 	currentNode := ""
 	conn.rwMutex.RLock()
-	if currentNode, ok := conn.key2NodeMap.Load(key); ok {
-		preNode = currentNode.(string)
-		exclusiveNodes = append(exclusiveNodes, currentNode.(string))
+	if node, ok := conn.key2NodeMap.Load(key); ok {
+		currentNode = node.(string)
+		preNode = currentNode
+		exclusiveNodes = append(exclusiveNodes, preNode)
 	} else {
 		logger.With("conn", conn.name).Warnf("failed to find server node for hash key %s", key)
 	}
 	conn.rwMutex.RUnlock()
 	conn.rwMutex.Lock()
 	defer conn.rwMutex.Unlock()
-	client, err := conn.findCandidateClientConn(key, exclusiveNodes...)
+	client, err := conn.findCandidateClientConn(key, sets.NewString(exclusiveNodes...))
 	if err != nil {
 		return "", errors.Wrapf(err, "find candidate client conn for hash key %s", key)
 	}
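
The findCandidateClientConn/TryMigrate changes above replace the variadic exclusiveNodes slice and its nested membership loops with sets.String from k8s.io/apimachinery. A small standalone sketch of the set operations being relied on (node names are made up; run it with k8s.io/apimachinery in go.mod):

// Sketch only: illustrates NewString/Has/List as used in the refactor.
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	exclusive := sets.NewString("cdn-1:8003") // nodes already tried
	ring := []string{"cdn-1:8003", "cdn-2:8003", "cdn-3:8003"}

	candidates := make([]string, 0, len(ring))
	for _, node := range ring {
		if !exclusive.Has(node) { // replaces the nested for/if/flag loop
			candidates = append(candidates, node)
		}
	}
	// List() returns a sorted copy, which keeps the candidate-result log line deterministic.
	fmt.Println(exclusive.List(), candidates)
}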
4 changes: 4 additions & 0 deletions pkg/rpc/client_util.go
@@ -22,6 +22,7 @@ import (
 	"time"
 
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 
 	"d7y.io/dragonfly/v2/internal/dfcodes"
@@ -190,6 +191,9 @@ func ExecuteWithRetry(f func() (interface{}, error), initBackoff float64, maxBac
 				return res, cause
 			}
 		}
+		if status.Code(cause) == codes.DeadlineExceeded || status.Code(cause) == codes.Canceled {
+			return res, cause
+		}
 		if i > 0 {
 			time.Sleep(mathutils.RandBackoff(initBackoff, maxBackoff, 2.0, i))
 		}
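
The new guard in ExecuteWithRetry above stops retrying once the underlying error is DeadlineExceeded or Canceled, since more attempts cannot succeed after the caller's context is gone. A hedged, standalone sketch of that classification using the status and codes packages from google.golang.org/grpc (retryable is an illustrative helper, not part of the project):

// Sketch only: shows how status.Code maps a gRPC error to a codes value.
package main

import (
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

func retryable(err error) bool {
	switch status.Code(err) {
	case codes.DeadlineExceeded, codes.Canceled:
		// The caller's deadline or context is gone; retrying cannot help.
		return false
	default:
		return true
	}
}

func main() {
	timeout := status.Error(codes.DeadlineExceeded, "context deadline exceeded")
	flaky := status.Error(codes.Unavailable, "connection reset")
	fmt.Println(retryable(timeout), retryable(flaky)) // false true
}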
17 changes: 10 additions & 7 deletions pkg/rpc/dfdaemon/client/client.go
@@ -22,6 +22,7 @@ import (
 	"sync"
 	"time"
 
+	logger "d7y.io/dragonfly/v2/internal/dflog"
 	"d7y.io/dragonfly/v2/internal/idgen"
 	"d7y.io/dragonfly/v2/pkg/basic/dfnet"
 	"d7y.io/dragonfly/v2/pkg/rpc"
@@ -67,7 +68,7 @@ func GetElasticClientByAddrs(addrs []dfnet.NetAddr, opts ...grpc.DialOption) (Da
 	return elasticDaemonClient, nil
 }
 
-// see dfdaemon.DaemonClient
+// DaemonClient see dfdaemon.DaemonClient
 type DaemonClient interface {
 	Download(ctx context.Context, req *dfdaemon.DownRequest, opts ...grpc.CallOption) (*DownResultStream, error)
 
@@ -114,12 +115,11 @@ func (dc *daemonClient) GetPieceTasks(ctx context.Context, target dfnet.NetAddr,
 		}
 		return client.GetPieceTasks(ctx, ptr, opts...)
 	}, 0.2, 2.0, 3, nil)
-
-	if err == nil {
-		return res.(*base.PiecePacket), nil
+	if err != nil {
+		logger.WithTaskID(ptr.TaskId).Infof("GetPieceTasks: invoke daemon node %s GetPieceTasks failed: %v", target, err)
+		return nil, err
 	}
-
-	return nil, err
+	return res.(*base.PiecePacket), nil
 }
 
 func (dc *daemonClient) CheckHealth(ctx context.Context, target dfnet.NetAddr, opts ...grpc.CallOption) (err error) {
@@ -130,6 +130,9 @@ func (dc *daemonClient) CheckHealth(ctx context.Context, target dfnet.NetAddr, o
 		}
 		return client.CheckHealth(ctx, new(empty.Empty), opts...)
 	}, 0.2, 2.0, 3, nil)
-
+	if err != nil {
+		logger.Infof("CheckHealth: invoke daemon node %s CheckHealth failed: %v", target, err)
+		return
+	}
 	return
 }