Skip to content

Commit

Permalink
Framework refactor (#590)
Browse files Browse the repository at this point in the history
* feat: optimize grpc

Signed-off-by: santong <[email protected]>

* feat: framework log

Signed-off-by: santong <[email protected]>
  • Loading branch information
244372610 authored and gaius-qi committed Jun 28, 2023
1 parent 74696f3 commit 09824e5
Show file tree
Hide file tree
Showing 11 changed files with 240 additions and 213 deletions.
10 changes: 4 additions & 6 deletions pkg/rpc/cdnsystem/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,17 +101,15 @@ func (cc *cdnClient) ObtainSeeds(ctx context.Context, sr *cdnsystem.SeedRequest,

func (cc *cdnClient) GetPieceTasks(ctx context.Context, addr dfnet.NetAddr, req *base.PieceTaskRequest, opts ...grpc.CallOption) (*base.PiecePacket, error) {
res, err := rpc.ExecuteWithRetry(func() (interface{}, error) {
defer func() {
logger.WithTaskID(req.TaskId).Infof("invoke cdn node %s GetPieceTasks", addr.GetEndpoint())
}()
client, err := cc.getSeederClientWithTarget(addr.GetEndpoint())
if err != nil {
return nil, err
}
return client.GetPieceTasks(ctx, req, opts...)
}, 0.2, 2.0, 3, nil)
if err == nil {
return res.(*base.PiecePacket), nil
if err != nil {
logger.WithTaskID(req.TaskId).Infof("GetPieceTasks: invoke cdn node %s GetPieceTasks failed: %v", addr.GetEndpoint(), err)
return nil, err
}
return nil, err
return res.(*base.PiecePacket), nil
}
69 changes: 36 additions & 33 deletions pkg/rpc/cdnsystem/client/piece_seed_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/rpc"
"d7y.io/dragonfly/v2/pkg/rpc/cdnsystem"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -64,22 +63,23 @@ func newPieceSeedStream(ctx context.Context, sc *cdnClient, hashKey string, sr *
}

// initStream opens the initial ObtainSeeds stream for pss.hashKey.
// On success it stores the stream and resets StreamTimes to 1. If every retry
// fails it falls back to replaceClient, which migrates to another CDN node.
//
// target is captured from inside the retried closure so the failure log can
// name the node that was actually dialed on the last attempt.
//
// NOTE(review): the source text interleaved pre- and post-refactor lines of a
// diff; this is the coherent post-refactor form.
func (pss *PieceSeedStream) initStream() error {
	var target string
	stream, err := rpc.ExecuteWithRetry(func() (interface{}, error) {
		var client cdnsystem.SeederClient
		var err error
		// stick=false: allow the connection layer to pick/keep the node
		// associated with this hash key.
		client, target, err = pss.sc.getCdnClient(pss.hashKey, false)
		if err != nil {
			return nil, err
		}
		return client.ObtainSeeds(pss.ctx, pss.sr, pss.opts...)
	}, pss.InitBackoff, pss.MaxBackOff, pss.MaxAttempts, nil)
	if err != nil {
		logger.WithTaskID(pss.hashKey).Infof("initStream: invoke cdn node %s ObtainSeeds failed: %v", target, err)
		return pss.replaceClient(pss.hashKey, err)
	}
	pss.stream = stream.(cdnsystem.Seeder_ObtainSeedsClient)
	pss.StreamTimes = 1
	return nil
}

func (pss *PieceSeedStream) Recv() (ps *cdnsystem.PieceSeed, err error) {
Expand All @@ -91,58 +91,61 @@ func (pss *PieceSeedStream) Recv() (ps *cdnsystem.PieceSeed, err error) {
}

func (pss *PieceSeedStream) retryRecv(cause error) (*cdnsystem.PieceSeed, error) {
if status.Code(cause) == codes.DeadlineExceeded {
if status.Code(cause) == codes.DeadlineExceeded || status.Code(cause) == codes.Canceled {
return nil, cause
}

if err := pss.replaceStream(pss.hashKey, cause); err != nil {
if err := pss.replaceClient(pss.hashKey, cause); err != nil {
return nil, cause
}
if err := pss.replaceStream(cause); err != nil {
return nil, err
}

return pss.Recv()
}

func (pss *PieceSeedStream) replaceStream(key string, cause error) error {
func (pss *PieceSeedStream) replaceStream(cause error) error {
if pss.StreamTimes >= pss.MaxAttempts {
return errors.New("times of replacing stream reaches limit")
logger.WithTaskID(pss.hashKey).Info("replace stream reach max attempt")
return cause
}

var target string
stream, err := rpc.ExecuteWithRetry(func() (interface{}, error) {
client, _, err := pss.sc.getCdnClient(key, true)
var client cdnsystem.SeederClient
var err error
client, target, err = pss.sc.getCdnClient(pss.hashKey, true)
if err != nil {
return nil, err
}
return client.ObtainSeeds(pss.ctx, pss.sr, pss.opts...)
}, pss.InitBackoff, pss.MaxBackOff, pss.MaxAttempts, cause)
if err == nil {
pss.stream = stream.(cdnsystem.Seeder_ObtainSeedsClient)
pss.StreamTimes++
if err != nil {
logger.WithTaskID(pss.hashKey).Infof("replaceStream: invoke cdn node %s ObtainSeeds failed: %v", target, err)
return pss.replaceStream(cause)
}
return err
pss.stream = stream.(cdnsystem.Seeder_ObtainSeedsClient)
pss.StreamTimes++
return nil
}

// replaceClient migrates the stream to a different CDN node after the current
// one failed. TryMigrate picks a candidate excluding failedServers; the node
// being abandoned (preNode) is recorded so it is not chosen again, which also
// bounds the tail recursion below — each level consumes one candidate. When no
// migration target remains, the original cause is returned.
//
// NOTE(review): the source text interleaved pre- and post-refactor lines of a
// diff; this is the coherent post-refactor form.
func (pss *PieceSeedStream) replaceClient(key string, cause error) error {
	preNode, err := pss.sc.TryMigrate(key, cause, pss.failedServers)
	if err != nil {
		logger.WithTaskID(pss.hashKey).Infof("replaceClient: tryMigrate cdn node failed: %v", err)
		return cause
	}
	pss.failedServers = append(pss.failedServers, preNode)

	var target string
	stream, err := rpc.ExecuteWithRetry(func() (interface{}, error) {
		var client cdnsystem.SeederClient
		var err error
		client, target, err = pss.sc.getCdnClient(key, true)
		if err != nil {
			return nil, err
		}
		return client.ObtainSeeds(pss.ctx, pss.sr, pss.opts...)
	}, pss.InitBackoff, pss.MaxBackOff, pss.MaxAttempts, cause)
	if err != nil {
		logger.WithTaskID(pss.hashKey).Infof("replaceClient: invoke cdn node %s ObtainSeeds failed: %v", target, err)
		return pss.replaceClient(key, cause)
	}
	pss.stream = stream.(cdnsystem.Seeder_ObtainSeedsClient)
	// A fresh node gets a fresh stream-replacement budget.
	pss.StreamTimes = 1
	return nil
}
2 changes: 1 addition & 1 deletion pkg/rpc/cdnsystem/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func init() {
})
}

// see cdnsystem.SeederServer
// SeederServer see cdnsystem.SeederServer
type SeederServer interface {
ObtainSeeds(context.Context, *cdnsystem.SeedRequest, chan<- *cdnsystem.PieceSeed) error
GetPieceTasks(context.Context, *base.PieceTaskRequest) (*base.PiecePacket, error)
Expand Down
43 changes: 20 additions & 23 deletions pkg/rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ import (
"github.com/pkg/errors"
"github.com/serialx/hashring"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"
"k8s.io/apimachinery/pkg/util/sets"

"d7y.io/dragonfly/v2/internal/dfcodes"
"d7y.io/dragonfly/v2/internal/dferrors"
Expand Down Expand Up @@ -204,16 +207,10 @@ func (conn *Connection) AddServerNodes(addrs []dfnet.NetAddr) error {
}

// findCandidateClientConn find candidate node client conn other than exclusiveNodes
func (conn *Connection) findCandidateClientConn(key string, exclusiveNodes ...string) (*candidateClient, error) {
func (conn *Connection) findCandidateClientConn(key string, exclusiveNodes sets.String) (*candidateClient, error) {
if node, ok := conn.key2NodeMap.Load(key); ok {
candidateNode := node.(string)
selected := true
for _, exclusiveNode := range exclusiveNodes {
if exclusiveNode == candidateNode {
selected = false
}
}
if selected {
if !exclusiveNodes.Has(candidateNode) {
if client, ok := conn.node2ClientMap.Load(node); ok {
return &candidateClient{
node: candidateNode,
Expand All @@ -232,23 +229,17 @@ func (conn *Connection) findCandidateClientConn(key string, exclusiveNodes ...st
}
candidateNodes := make([]string, 0)
for _, ringNode := range ringNodes {
candidate := true
for _, exclusiveNode := range exclusiveNodes {
if exclusiveNode == ringNode {
candidate = false
}
}
if candidate {
if !exclusiveNodes.Has(ringNode) {
candidateNodes = append(candidateNodes, ringNode)
}
}
logger.With("conn", conn.name).Infof("candidate result for hash key %s: all server node list: %v, exclusiveNodes node list: %v, candidate node list: %v",
key, ringNodes, exclusiveNodes, candidateNodes)
key, ringNodes, exclusiveNodes.List(), candidateNodes)
for _, candidateNode := range candidateNodes {
// Check whether there is a corresponding mapping client in the node2ClientMap
// TODO 下面部分可以直接调用loadOrCreate方法,但是日志没有这么调用打印全
if client, ok := conn.node2ClientMap.Load(candidateNode); ok {
logger.With("conn", conn.name).Infof("hit cache candidateNode %s for hash key %s", candidateNode, key)
logger.With("conn", conn.name).Debugf("hit cache candidateNode %s for hash key %s", candidateNode, key)
return &candidateClient{
node: candidateNode,
Ref: client,
Expand Down Expand Up @@ -334,6 +325,7 @@ func (conn *Connection) loadOrCreateClientConnByNode(node string) (clientConn *g
// stick whether hash key need already associated with specify node
func (conn *Connection) GetClientConn(hashKey string, stick bool) (*grpc.ClientConn, error) {
logger.With("conn", conn.name).Debugf("start to get client conn hashKey %s, stick %t", hashKey, stick)
defer logger.With("conn", conn.name).Debugf("get client conn done, hashKey %s, stick %t end", hashKey, stick)
conn.rwMutex.RLock()
node, ok := conn.key2NodeMap.Load(hashKey)
if stick && !ok {
Expand All @@ -351,12 +343,12 @@ func (conn *Connection) GetClientConn(hashKey string, stick bool) (*grpc.ClientC
}
return clientConn, nil
}
logger.With("conn", conn.name).Infof("no server node associated with hash key %s was found, start find candidate", hashKey)
logger.With("conn", conn.name).Infof("no server node associated with hash key %s was found, start find candidate server", hashKey)
conn.rwMutex.RUnlock()
// if absence
conn.rwMutex.Lock()
defer conn.rwMutex.Unlock()
client, err := conn.findCandidateClientConn(hashKey)
client, err := conn.findCandidateClientConn(hashKey, sets.NewString())
if err != nil {
return nil, errors.Wrapf(err, "prob candidate client conn for hash key %s", hashKey)
}
Expand All @@ -370,6 +362,10 @@ func (conn *Connection) GetClientConn(hashKey string, stick bool) (*grpc.ClientC
// preNode node before the migration
func (conn *Connection) TryMigrate(key string, cause error, exclusiveNodes []string) (preNode string, err error) {
logger.With("conn", conn.name).Infof("start try migrate server node for key %s, cause err: %v", key, cause)
if status.Code(cause) == codes.DeadlineExceeded || status.Code(cause) == codes.Canceled {
logger.With("conn", conn.name).Infof("migrate server node for key %s failed, cause err: %v", key, cause)
return "", cause
}
// TODO recover findCandidateClientConn error
if e, ok := cause.(*dferrors.DfError); ok {
if e.Code != dfcodes.ResourceLacked && e.Code != dfcodes.UnknownError {
Expand All @@ -378,16 +374,17 @@ func (conn *Connection) TryMigrate(key string, cause error, exclusiveNodes []str
}
currentNode := ""
conn.rwMutex.RLock()
if currentNode, ok := conn.key2NodeMap.Load(key); ok {
preNode = currentNode.(string)
exclusiveNodes = append(exclusiveNodes, currentNode.(string))
if node, ok := conn.key2NodeMap.Load(key); ok {
currentNode = node.(string)
preNode = currentNode
exclusiveNodes = append(exclusiveNodes, preNode)
} else {
logger.With("conn", conn.name).Warnf("failed to find server node for hash key %s", key)
}
conn.rwMutex.RUnlock()
conn.rwMutex.Lock()
defer conn.rwMutex.Unlock()
client, err := conn.findCandidateClientConn(key, exclusiveNodes...)
client, err := conn.findCandidateClientConn(key, sets.NewString(exclusiveNodes...))
if err != nil {
return "", errors.Wrapf(err, "find candidate client conn for hash key %s", key)
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/rpc/client_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"d7y.io/dragonfly/v2/internal/dfcodes"
Expand Down Expand Up @@ -190,6 +191,9 @@ func ExecuteWithRetry(f func() (interface{}, error), initBackoff float64, maxBac
return res, cause
}
}
if status.Code(cause) == codes.DeadlineExceeded || status.Code(cause) == codes.Canceled {
return res, cause
}
if i > 0 {
time.Sleep(mathutils.RandBackoff(initBackoff, maxBackoff, 2.0, i))
}
Expand Down
17 changes: 10 additions & 7 deletions pkg/rpc/dfdaemon/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sync"
"time"

logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/internal/idgen"
"d7y.io/dragonfly/v2/pkg/basic/dfnet"
"d7y.io/dragonfly/v2/pkg/rpc"
Expand Down Expand Up @@ -67,7 +68,7 @@ func GetElasticClientByAddrs(addrs []dfnet.NetAddr, opts ...grpc.DialOption) (Da
return elasticDaemonClient, nil
}

// see dfdaemon.DaemonClient
// DaemonClient see dfdaemon.DaemonClient
type DaemonClient interface {
Download(ctx context.Context, req *dfdaemon.DownRequest, opts ...grpc.CallOption) (*DownResultStream, error)

Expand Down Expand Up @@ -114,12 +115,11 @@ func (dc *daemonClient) GetPieceTasks(ctx context.Context, target dfnet.NetAddr,
}
return client.GetPieceTasks(ctx, ptr, opts...)
}, 0.2, 2.0, 3, nil)

if err == nil {
return res.(*base.PiecePacket), nil
if err != nil {
logger.WithTaskID(ptr.TaskId).Infof("GetPieceTasks: invoke daemon node %s GetPieceTasks failed: %v", target, err)
return nil, err
}

return nil, err
return res.(*base.PiecePacket), nil
}

func (dc *daemonClient) CheckHealth(ctx context.Context, target dfnet.NetAddr, opts ...grpc.CallOption) (err error) {
Expand All @@ -130,6 +130,9 @@ func (dc *daemonClient) CheckHealth(ctx context.Context, target dfnet.NetAddr, o
}
return client.CheckHealth(ctx, new(empty.Empty), opts...)
}, 0.2, 2.0, 3, nil)

if err != nil {
logger.Infof("CheckHealth: invoke daemon node %s CheckHealth failed: %v", target, err)
return
}
return
}
Loading

0 comments on commit 09824e5

Please sign in to comment.