Framework refactor #590

Merged · 8 commits · Aug 31, 2021
Changes from 1 commit
feat: optimize grpc
Signed-off-by: santong <[email protected]>
244372610 committed Aug 31, 2021
commit 8c6ec64ecd2b8d493981ed5a574dfa0d7e24f47d
58 changes: 31 additions & 27 deletions pkg/rpc/cdnsystem/client/piece_seed_stream.go
@@ -63,22 +63,23 @@ func newPieceSeedStream(ctx context.Context, sc *cdnClient, hashKey string, sr *
 }
 
 func (pss *PieceSeedStream) initStream() error {
+	var target string
 	stream, err := rpc.ExecuteWithRetry(func() (interface{}, error) {
-		client, target, err := pss.sc.getCdnClient(pss.hashKey, false)
+		var client cdnsystem.SeederClient
+		var err error
+		client, target, err = pss.sc.getCdnClient(pss.hashKey, false)
 		if err != nil {
 			return nil, err
 		}
+		logger.WithTaskID(pss.hashKey).Infof("initStream: invoke cdn node %s ObtainSeeds", target)
 		return client.ObtainSeeds(pss.ctx, pss.sr, pss.opts...)
 	}, pss.InitBackoff, pss.MaxBackOff, pss.MaxAttempts, nil)
-	if err == nil {
-		pss.stream = stream.(cdnsystem.Seeder_ObtainSeedsClient)
-		pss.StreamTimes = 1
-	}
 	if err != nil {
-		err = pss.replaceClient(pss.hashKey, err)
+		logger.WithTaskID(pss.hashKey).Infof("initStream: invoke cdn node %s ObtainSeeds failed: %v", target, err)
+		return pss.replaceClient(pss.hashKey, err)
 	}
-	return err
+	pss.stream = stream.(cdnsystem.Seeder_ObtainSeedsClient)
+	pss.StreamTimes = 1
+	return nil
 }

 func (pss *PieceSeedStream) Recv() (ps *cdnsystem.PieceSeed, err error) {
@@ -103,44 +104,47 @@ func (pss *PieceSeedStream) replaceStream(cause error) error {
 	if pss.StreamTimes >= pss.MaxAttempts {
 		return cause
 	}
+
+	var target string
 	stream, err := rpc.ExecuteWithRetry(func() (interface{}, error) {
-		client, target, err := pss.sc.getCdnClient(pss.hashKey, true)
+		var client cdnsystem.SeederClient
+		var err error
+		client, target, err = pss.sc.getCdnClient(pss.hashKey, true)
 		if err != nil {
 			return nil, err
 		}
+		logger.WithTaskID(pss.hashKey).Infof("replaceStream: invoke cdn node %s ObtainSeeds", target)
 		return client.ObtainSeeds(pss.ctx, pss.sr, pss.opts...)
 	}, pss.InitBackoff, pss.MaxBackOff, pss.MaxAttempts, cause)
-	if err == nil {
-		pss.stream = stream.(cdnsystem.Seeder_ObtainSeedsClient)
-		pss.StreamTimes++
-	} else {
-		err = pss.replaceStream(cause)
+	if err != nil {
+		logger.WithTaskID(pss.hashKey).Infof("replaceStream: invoke cdn node %s ObtainSeeds failed: %v", target, err)
+		return pss.replaceStream(cause)
 	}
-	return err
+	pss.stream = stream.(cdnsystem.Seeder_ObtainSeedsClient)
+	pss.StreamTimes++
+	return nil
 }

 func (pss *PieceSeedStream) replaceClient(key string, cause error) error {
 	preNode, err := pss.sc.TryMigrate(key, cause, pss.failedServers)
 	if err != nil {
-		return err
+		logger.WithTaskID(pss.hashKey).Infof("replaceClient: tryMigrate cdn node failed: %v", err)
+		return cause
 	}
 	pss.failedServers = append(pss.failedServers, preNode)
+
+	var target string
 	stream, err := rpc.ExecuteWithRetry(func() (interface{}, error) {
-		client, target, err := pss.sc.getCdnClient(key, true)
+		var client cdnsystem.SeederClient
+		var err error
+		client, target, err = pss.sc.getCdnClient(key, true)
 		if err != nil {
 			return nil, err
 		}
+		logger.WithTaskID(pss.hashKey).Infof("replaceClient: invoke cdn node %s ObtainSeeds", target)
 		return client.ObtainSeeds(pss.ctx, pss.sr, pss.opts...)
 	}, pss.InitBackoff, pss.MaxBackOff, pss.MaxAttempts, cause)
-	if err == nil {
-		pss.stream = stream.(cdnsystem.Seeder_ObtainSeedsClient)
-		pss.StreamTimes = 1
-	} else {
-		err = pss.replaceClient(key, cause)
+	if err != nil {
+		logger.WithTaskID(pss.hashKey).Infof("replaceClient: invoke cdn node %s ObtainSeeds failed: %v", target, err)
+		return pss.replaceClient(key, cause)
 	}
-	return err
+	pss.stream = stream.(cdnsystem.Seeder_ObtainSeedsClient)
+	pss.StreamTimes = 1
+	return nil
 }
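All three stream wrappers in this commit converge on the same shape: `target` is hoisted out of the retry closure and assigned with `=` rather than `:=`, so the closure updates the outer variable instead of shadowing it, which lets the failure log name the last node attempted; the error path then early-returns into the fallback instead of threading `err` through an `if`/`else`. Below is a minimal self-contained sketch of that shape; `executeWithRetry`, `pickNode`, and `seedStream` are hypothetical stand-ins for illustration, not the project's API.

```go
package main

import (
	"errors"
	"fmt"
	"math/rand"
	"time"
)

// executeWithRetry is a hypothetical stand-in for rpc.ExecuteWithRetry:
// call f up to maxAttempts times with capped exponential backoff.
func executeWithRetry(f func() (interface{}, error), initBackoff, maxBackoff time.Duration, maxAttempts int) (interface{}, error) {
	var lastErr error
	backoff := initBackoff
	for i := 0; i < maxAttempts; i++ {
		res, err := f()
		if err == nil {
			return res, nil
		}
		lastErr = err
		time.Sleep(backoff)
		if backoff *= 2; backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
	return nil, lastErr
}

type seedStream struct {
	hashKey     string
	stream      string // placeholder for the real gRPC client stream
	streamTimes int
}

// pickNode is a hypothetical stand-in for getCdnClient: resolve the key to a node.
func pickNode(hashKey string) (string, error) {
	if rand.Intn(3) == 0 {
		return "", errors.New("no available node")
	}
	return fmt.Sprintf("cdn-%d", len(hashKey)%4), nil
}

func (s *seedStream) initStream() error {
	// target is hoisted out of the closure so that, after the retries are
	// exhausted, the failure log can still name the last node attempted.
	var target string
	res, err := executeWithRetry(func() (interface{}, error) {
		var err error
		// Plain assignment (not :=) updates the enclosing target rather
		// than shadowing it with a closure-local copy.
		target, err = pickNode(s.hashKey)
		if err != nil {
			return nil, err
		}
		return "stream@" + target, nil // stand-in for client.ObtainSeeds(...)
	}, 100*time.Millisecond, time.Second, 3)
	if err != nil {
		fmt.Printf("initStream: invoke cdn node %s failed: %v\n", target, err)
		return err // the real code falls back to replaceClient here
	}
	s.stream = res.(string)
	s.streamTimes = 1
	return nil
}

func main() {
	s := &seedStream{hashKey: "task-1"}
	fmt.Println(s.initStream())
}
```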
2 changes: 1 addition & 1 deletion pkg/rpc/cdnsystem/server/server.go
@@ -39,7 +39,7 @@ func init() {
 	})
 }
 
-// see cdnsystem.SeederServer
+// SeederServer see cdnsystem.SeederServer
 type SeederServer interface {
 	ObtainSeeds(context.Context, *cdnsystem.SeedRequest, chan<- *cdnsystem.PieceSeed) error
 	GetPieceTasks(context.Context, *base.PieceTaskRequest) (*base.PiecePacket, error)
6 changes: 6 additions & 0 deletions pkg/rpc/client.go
@@ -25,7 +25,9 @@ import (
"github.com/pkg/errors"
"github.com/serialx/hashring"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"
"k8s.io/apimachinery/pkg/util/sets"

"d7y.io/dragonfly/v2/internal/dfcodes"
Expand Down Expand Up @@ -360,6 +362,10 @@ func (conn *Connection) GetClientConn(hashKey string, stick bool) (*grpc.ClientC
 // preNode node before the migration
 func (conn *Connection) TryMigrate(key string, cause error, exclusiveNodes []string) (preNode string, err error) {
 	logger.With("conn", conn.name).Infof("start try migrate server node for key %s, cause err: %v", key, cause)
+	if status.Code(cause) == codes.DeadlineExceeded || status.Code(cause) == codes.Canceled {
+		logger.With("conn", conn.name).Infof("migrate server node for key %s failed, cause err: %v", key, cause)
+		return "", cause
+	}
 	// TODO recover findCandidateClientConn error
 	if e, ok := cause.(*dferrors.DfError); ok {
 		if e.Code != dfcodes.ResourceLacked && e.Code != dfcodes.UnknownError {
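The new guard short-circuits migration when the failure is context-driven: once the caller's deadline has passed or the call was canceled, retrying against a different server node cannot succeed, so the cause is surfaced immediately. A small sketch of that classification, using the real google.golang.org/grpc/status and codes packages but a hypothetical shouldMigrate helper:

```go
package main

import (
	"context"
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// shouldMigrate reports whether an RPC failure is worth retrying on a
// different server node. DeadlineExceeded and Canceled mean the caller's
// context is finished, so another node cannot help.
func shouldMigrate(cause error) bool {
	switch status.Code(cause) {
	case codes.DeadlineExceeded, codes.Canceled:
		return false
	default:
		return true
	}
}

func main() {
	// status.FromContextError converts a context error into a gRPC status,
	// roughly what a failed RPC against a dead deadline would carry.
	err := status.FromContextError(context.DeadlineExceeded).Err()
	fmt.Println(shouldMigrate(err)) // false: do not migrate, surface the error
}
```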
2 changes: 1 addition & 1 deletion pkg/rpc/dfdaemon/client/client.go
@@ -67,7 +67,7 @@ func GetElasticClientByAddrs(addrs []dfnet.NetAddr, opts ...grpc.DialOption) (Da
 	return elasticDaemonClient, nil
 }
 
-// see dfdaemon.DaemonClient
+// DaemonClient see dfdaemon.DaemonClient
 type DaemonClient interface {
 	Download(ctx context.Context, req *dfdaemon.DownRequest, opts ...grpc.CallOption) (*DownResultStream, error)
 
58 changes: 31 additions & 27 deletions pkg/rpc/dfdaemon/client/down_result_stream.go
@@ -62,22 +62,23 @@ func newDownResultStream(ctx context.Context, dc *daemonClient, hashKey string,
 }
 
 func (drs *DownResultStream) initStream() error {
+	var target string
 	stream, err := rpc.ExecuteWithRetry(func() (interface{}, error) {
-		client, target, err := drs.dc.getDaemonClient(drs.hashKey, false)
+		var client dfdaemon.DaemonClient
+		var err error
+		client, target, err = drs.dc.getDaemonClient(drs.hashKey, false)
 		if err != nil {
 			return nil, err
 		}
+		logger.WithTaskID(drs.hashKey).Infof("initStream: invoke daemon node %s Download", target)
 		return client.Download(drs.ctx, drs.req, drs.opts...)
 	}, drs.InitBackoff, drs.MaxBackOff, drs.MaxAttempts, nil)
-	if err == nil {
-		drs.stream = stream.(dfdaemon.Daemon_DownloadClient)
-		drs.StreamTimes = 1
-	} else {
-		err = drs.replaceClient(err)
+	if err != nil {
+		logger.WithTaskID(drs.hashKey).Infof("initStream: invoke daemon node %s Download failed: %v", target, err)
+		return drs.replaceClient(err)
 	}
-
-	return err
+	drs.stream = stream.(dfdaemon.Daemon_DownloadClient)
+	drs.StreamTimes = 1
+	return nil
 }

 func (drs *DownResultStream) Recv() (dr *dfdaemon.DownResult, err error) {
@@ -114,45 +115,48 @@ func (drs *DownResultStream) replaceStream(cause error) error {
 	if drs.StreamTimes >= drs.MaxAttempts {
 		return cause
 	}
+
+	var target string
 	stream, err := rpc.ExecuteWithRetry(func() (interface{}, error) {
-		client, target, err := drs.dc.getDaemonClient(drs.hashKey, true)
+		var client dfdaemon.DaemonClient
+		var err error
+		client, target, err = drs.dc.getDaemonClient(drs.hashKey, true)
 		if err != nil {
 			return nil, err
 		}
+		logger.WithTaskID(drs.hashKey).Infof("replaceStream: invoke daemon node %s Download", target)
 		return client.Download(drs.ctx, drs.req, drs.opts...)
 	}, drs.InitBackoff, drs.MaxBackOff, drs.MaxAttempts, cause)
-	if err == nil {
-		drs.stream = stream.(dfdaemon.Daemon_DownloadClient)
-		drs.StreamTimes++
-	} else {
-		err = drs.replaceClient(cause)
+	if err != nil {
+		logger.WithTaskID(drs.hashKey).Infof("replaceStream: invoke daemon node %s Download failed: %v", target, err)
+		return drs.replaceClient(cause)
 	}
-	return err
+	drs.stream = stream.(dfdaemon.Daemon_DownloadClient)
+	drs.StreamTimes++
+	return nil
 }

 func (drs *DownResultStream) replaceClient(cause error) error {
 	preNode, err := drs.dc.TryMigrate(drs.hashKey, cause, drs.failedServers)
 	if err != nil {
-		return err
+		logger.WithTaskID(drs.hashKey).Infof("replaceClient: tryMigrate daemon node failed: %v", err)
+		return cause
 	}
 	drs.failedServers = append(drs.failedServers, preNode)
+
+	var target string
 	stream, err := rpc.ExecuteWithRetry(func() (interface{}, error) {
-		client, target, err := drs.dc.getDaemonClient(drs.hashKey, true)
+		var client dfdaemon.DaemonClient
+		var err error
+		client, target, err = drs.dc.getDaemonClient(drs.hashKey, true)
 		if err != nil {
 			return nil, err
 		}
+		logger.WithTaskID(drs.hashKey).Infof("replaceClient: invoke daemon node %s Download", target)
 		return client.Download(drs.ctx, drs.req, drs.opts...)
 	}, drs.InitBackoff, drs.MaxBackOff, drs.MaxAttempts, cause)
-	if err == nil {
-		drs.stream = stream.(dfdaemon.Daemon_DownloadClient)
-		drs.StreamTimes = 1
-	}
 	if err != nil {
-		err = drs.replaceClient(cause)
+		logger.WithTaskID(drs.hashKey).Infof("replaceClient: invoke daemon node %s Download failed: %v", target, err)
+		return drs.replaceClient(cause)
 	}
-	return err
+	drs.stream = stream.(dfdaemon.Daemon_DownloadClient)
+	drs.StreamTimes = 1
+	return nil
 }
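replaceClient now distinguishes the bookkeeping error from the original failure: when TryMigrate runs out of candidates, the method returns cause (the RPC error that started the failover) rather than the migration error, so the caller sees what actually broke. A simplified sketch of that failover loop; tryMigrate and its bookkeeping here are hypothetical (the real TryMigrate returns the node migrated away from, which the caller then excludes via failedServers), and the node is marked failed before each attempt purely to keep the recursion finite.

```go
package main

import (
	"errors"
	"fmt"
)

// tryMigrate is a hypothetical stand-in for Connection.TryMigrate: pick the
// next candidate node for key, skipping every node that already failed.
func tryMigrate(key string, candidates, failed []string) (string, error) {
	excluded := make(map[string]bool, len(failed))
	for _, f := range failed {
		excluded[f] = true
	}
	for _, c := range candidates {
		if !excluded[c] {
			return c, nil
		}
	}
	return "", errors.New("no candidate server nodes left")
}

// replaceClient mirrors the shape of the refactored methods above: record the
// node, reopen against it, recurse on failure, and always surface the
// original cause rather than the migration bookkeeping error.
func replaceClient(key string, cause error, candidates []string, failed *[]string, open func(node string) error) error {
	node, err := tryMigrate(key, candidates, *failed)
	if err != nil {
		fmt.Printf("replaceClient: tryMigrate failed: %v\n", err)
		return cause // exhausted: report what originally broke
	}
	*failed = append(*failed, node) // exclude this node from future attempts
	if err := open(node); err != nil {
		fmt.Printf("replaceClient: open %s failed: %v\n", node, err)
		return replaceClient(key, cause, candidates, failed, open)
	}
	return nil
}

func main() {
	failed := []string{"cdn-0"}
	cause := errors.New("stream reset by peer")
	err := replaceClient("task-1", cause, []string{"cdn-0", "cdn-1"}, &failed,
		func(node string) error {
			if node == "cdn-1" {
				return nil // pretend cdn-1 accepts the stream
			}
			return errors.New("dial failed")
		})
	fmt.Println(err, failed) // <nil> [cdn-0 cdn-1]
}
```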
57 changes: 30 additions & 27 deletions pkg/rpc/scheduler/client/peer_packet_stream.go
@@ -148,71 +148,74 @@ func (pps *peerPacketStream) retryRecv(cause error) (*scheduler.PeerPacket, erro
 }
 
 func (pps *peerPacketStream) initStream() error {
+	var target string
 	stream, err := rpc.ExecuteWithRetry(func() (interface{}, error) {
-		client, target, err := pps.sc.getSchedulerClient(pps.hashKey, true)
+		var client scheduler.SchedulerClient
+		var err error
+		client, target, err = pps.sc.getSchedulerClient(pps.hashKey, true)
 		if err != nil {
 			return nil, err
 		}
+		logger.WithTaskID(pps.hashKey).Infof("initStream: invoke scheduler node %s ReportPieceResult", target)
 		return client.ReportPieceResult(pps.ctx, pps.opts...)
 	}, pps.retryMeta.InitBackoff, pps.retryMeta.MaxBackOff, pps.retryMeta.MaxAttempts, nil)
-	if err == nil {
-		pps.stream = stream.(scheduler.Scheduler_ReportPieceResultClient)
-		pps.retryMeta.StreamTimes = 1
-	}
 	if err != nil {
-		err = pps.replaceClient(err)
+		logger.WithTaskID(pps.hashKey).Infof("initStream: invoke scheduler node %s ReportPieceResult failed: %v", target, err)
+		return pps.replaceClient(err)
 	}
-	return err
+	pps.stream = stream.(scheduler.Scheduler_ReportPieceResultClient)
+	pps.retryMeta.StreamTimes = 1
+	return nil
 }

 func (pps *peerPacketStream) replaceStream(cause error) error {
 	if pps.retryMeta.StreamTimes >= pps.retryMeta.MaxAttempts {
 		return cause
 	}
+	var target string
 	res, err := rpc.ExecuteWithRetry(func() (interface{}, error) {
-		client, target, err := pps.sc.getSchedulerClient(pps.hashKey, true)
+		var client scheduler.SchedulerClient
+		var err error
+		client, target, err = pps.sc.getSchedulerClient(pps.hashKey, true)
 		if err != nil {
 			return nil, err
 		}
+		logger.WithTaskID(pps.hashKey).Infof("replaceStream: invoke scheduler node %s ReportPieceResult", target)
 		return client.ReportPieceResult(pps.ctx, pps.opts...)
 	}, pps.retryMeta.InitBackoff, pps.retryMeta.MaxBackOff, pps.retryMeta.MaxAttempts, cause)
-	if err == nil {
-		pps.stream = res.(scheduler.Scheduler_ReportPieceResultClient)
-		pps.retryMeta.StreamTimes++
-	} else {
-		err = pps.replaceStream(cause)
+	if err != nil {
+		logger.WithTaskID(pps.hashKey).Infof("replaceStream: invoke scheduler node %s ReportPieceResult failed: %v", target, err)
+		return pps.replaceStream(cause)
 	}
-	return err
+	pps.stream = res.(scheduler.Scheduler_ReportPieceResultClient)
+	pps.retryMeta.StreamTimes++
+	return nil
 }

 func (pps *peerPacketStream) replaceClient(cause error) error {
 	preNode, err := pps.sc.TryMigrate(pps.hashKey, cause, pps.failedServers)
 	if err != nil {
-		return err
+		logger.WithTaskID(pps.hashKey).Infof("replaceClient: tryMigrate scheduler node failed: %v", err)
+		return cause
 	}
 	pps.failedServers = append(pps.failedServers, preNode)
+
+	var target string
 	stream, err := rpc.ExecuteWithRetry(func() (interface{}, error) {
-		client, target, err := pps.sc.getSchedulerClient(pps.hashKey, true)
+		var client scheduler.SchedulerClient
+		var err error
+		client, target, err = pps.sc.getSchedulerClient(pps.hashKey, true)
 		if err != nil {
 			return nil, err
 		}
 		//timeCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 		//defer cancel()
 		_, err = client.RegisterPeerTask(pps.ctx, pps.ptr)
 		if err != nil {
 			return nil, err
 		}
+		logger.WithTaskID(pps.hashKey).Infof("replaceClient: invoke scheduler node %s ReportPieceResult", target)
 		return client.ReportPieceResult(pps.ctx, pps.opts...)
 	}, pps.retryMeta.InitBackoff, pps.retryMeta.MaxBackOff, pps.retryMeta.MaxAttempts, cause)
-	if err == nil {
-		pps.stream = stream.(scheduler.Scheduler_ReportPieceResultClient)
-		pps.retryMeta.StreamTimes = 1
-	} else {
-		err = pps.replaceClient(cause)
+	if err != nil {
+		logger.WithTaskID(pps.hashKey).Infof("replaceClient: invoke scheduler node %s ReportPieceResult failed: %v", target, err)
+		return pps.replaceClient(cause)
 	}
-	return err
+	pps.stream = stream.(scheduler.Scheduler_ReportPieceResultClient)
+	pps.retryMeta.StreamTimes = 1
+	return nil
 }
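Unlike the CDN and daemon streams, the scheduler's replaceClient calls RegisterPeerTask on the newly chosen scheduler before reopening ReportPieceResult, since the replacement node has no state for the task. A toy sketch of that order of operations; every type and name below is a hypothetical stand-in, not the project's generated gRPC API.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Hypothetical, simplified shapes of the scheduler client used above.
type registerRequest struct{ taskID string }

type resultStream interface{ Send(piece string) error }

type schedulerClient interface {
	RegisterPeerTask(ctx context.Context, req *registerRequest) (bool, error)
	ReportPieceResult(ctx context.Context) (resultStream, error)
}

// reopenOnNewScheduler mirrors the closure in replaceClient above: a freshly
// chosen scheduler knows nothing about this task, so the peer task must be
// re-registered before the piece-result stream can be reopened.
func reopenOnNewScheduler(ctx context.Context, c schedulerClient, req *registerRequest) (resultStream, error) {
	if _, err := c.RegisterPeerTask(ctx, req); err != nil {
		return nil, err // registration failed: let the retry/migration loop handle it
	}
	return c.ReportPieceResult(ctx)
}

// fakeScheduler is a toy implementation for demonstration.
type fakeScheduler struct{ registered map[string]bool }

type fakeStream struct{}

func (fakeStream) Send(piece string) error { return nil }

func (f *fakeScheduler) RegisterPeerTask(_ context.Context, req *registerRequest) (bool, error) {
	f.registered[req.taskID] = true
	return true, nil
}

func (f *fakeScheduler) ReportPieceResult(context.Context) (resultStream, error) {
	if len(f.registered) == 0 {
		return nil, errors.New("unknown peer task")
	}
	return fakeStream{}, nil
}

func main() {
	s := &fakeScheduler{registered: map[string]bool{}}
	stream, err := reopenOnNewScheduler(context.Background(), s, &registerRequest{taskID: "task-1"})
	fmt.Println(stream != nil, err) // true <nil>
}
```

Reversing the two calls would open a stream the new scheduler immediately rejects, which is why the registration sits inside the same retried closure.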