Feature/concurrent dead lock #509

Merged 7 commits on Jul 29, 2021
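PR summary (reconstructed from the diff below; the scraped page carries no description): `pkg/rpc/client.go` replaces the per-node `synclock.LockerPool` on `Connection` with a single `sync.RWMutex` guarding the hash ring, `key2NodeMap`, `node2ClientMap` and `accessNodeMap`, which is presumably the concurrent deadlock the title refers to; `findCandidateClientConn` gains a fast path that reuses a node already mapped to the hash key; and the peer-task lifecycle logs are tagged `step 1:` (register) through `step 4:` (leave task). The Go sketch below only illustrates the coarse read/write-lock pattern the PR adopts; the `serverTable`, `lookup` and `assign` names are invented for illustration and are not Dragonfly APIs.

```go
package main

import (
	"fmt"
	"sync"
)

// serverTable is a hypothetical stand-in for pkg/rpc.Connection: one coarse
// RWMutex guards all shared routing state, instead of a per-key locker pool
// whose lock ordering could deadlock under concurrent calls.
type serverTable struct {
	mu       sync.RWMutex
	key2Node map[string]string
}

// lookup only reads shared state, so it takes the read lock,
// mirroring GetServerNode / GetClientConnByTarget after this PR.
func (t *serverTable) lookup(key string) (string, bool) {
	t.mu.RLock()
	defer t.mu.RUnlock()
	node, ok := t.key2Node[key]
	return node, ok
}

// assign mutates shared state, so it takes the write lock,
// mirroring AddServerNodes / CorrectKey2NodeRelation after this PR.
func (t *serverTable) assign(key, node string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.key2Node[key] = node
}

func main() {
	t := &serverTable{key2Node: map[string]string{}}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			key := fmt.Sprintf("task-%d", i)
			t.assign(key, "node-a")
			if node, ok := t.lookup(key); ok {
				fmt.Println(key, "->", node)
			}
		}(i)
	}
	wg.Wait()
}
```
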
1 change: 0 additions & 1 deletion cdnsystem/server/service/cdn_seed_server.go
@@ -159,7 +159,6 @@ func (css *CdnSeedServer) GetPieceTasks(ctx context.Context, req *base.PieceTask
return nil, dferrors.Newf(dfcodes.BadRequest, "failed to validate seed request for task(%s): %v", req.TaskId, err)
}
task, err := css.taskMgr.Get(req.TaskId)
- logger.Debugf("task: %+v", task)
if err != nil {
if cdnerrors.IsDataNotFound(err) {
return nil, dferrors.Newf(dfcodes.CdnTaskNotFound, "failed to get task(%s) from cdn: %v", req.TaskId, err)
4 changes: 2 additions & 2 deletions client/daemon/daemon.go
@@ -111,9 +111,9 @@ func New(opt *config.DaemonOption) (Daemon, error) {
PeerId: request.PeerID,
})
if er != nil {
logger.Errorf("leave task %s/%s, error: %v", request.TaskID, request.PeerID, er)
logger.Errorf("step 4:leave task %s/%s, error: %v", request.TaskID, request.PeerID, er)
} else {
logger.Infof("leave task %s/%s state ok", request.TaskID, request.PeerID)
logger.Infof("step 4:leave task %s/%s state ok", request.TaskID, request.PeerID)
}
})
if err != nil {
8 changes: 5 additions & 3 deletions client/daemon/peer/peertask_base.go
@@ -685,7 +685,8 @@ func (pt *peerTask) getPieceTasks(span trace.Span, curPeerPacket *scheduler.Peer
span.RecordError(getErr)
// fast way to exit retry
if curPeerPacket != pt.peerPacket {
pt.Warnf("get piece tasks with error: %s, but peer packet changed, switch to new peer packet", getErr)
pt.Warnf("get piece tasks with error: %s, but peer packet changed, switch to new peer packet, current destPeer %s, new destPeer %s", getErr,
curPeerPacket.MainPeer.PeerId, pt.peerPacket.MainPeer.PeerId)
peerPacketChanged = true
return nil, true, nil
}
@@ -709,13 +710,14 @@ }
}
// fast way to exit retry
if curPeerPacket != pt.peerPacket {
pt.Warnf("get empty pieces and peer packet changed, switch to new peer packet")
pt.Warnf("get empty pieces and peer packet changed, switch to new peer packet, current destPeer %s, new destPeer %s",
curPeerPacket.MainPeer.PeerId, pt.peerPacket.MainPeer.PeerId)
peerPacketChanged = true
return nil, true, nil
}
span.AddEvent("retry due to empty pieces",
trace.WithAttributes(config.AttributeGetPieceRetry.Int(count)))
pt.Warnf("peer %s returns success but with empty pieces, retry later", peer.PeerId)
pt.Infof("peer %s returns success but with empty pieces, retry later", peer.PeerId)
return nil, false, dferrors.ErrEmptyValue
}
return pp, false, nil
9 changes: 6 additions & 3 deletions client/daemon/peer/peertask_file.go
@@ -89,11 +89,13 @@ func newFilePeerTask(ctx context.Context,
// trace register
_, regSpan := tracer.Start(ctx, config.SpanRegisterTask)
result, err := schedulerClient.RegisterPeerTask(ctx, request)
logger.Infof("step 1: peer %s start to register", request.PeerId)
regSpan.RecordError(err)
regSpan.End()

var backSource bool
if err != nil {
logger.Errorf("step 1: peer %s register failed: err", request.PeerId, err)
// check if it is back source error
if de, ok := err.(*dferrors.DfError); ok && de.Code == dfcodes.SchedNeedBackSource {
backSource = true
@@ -102,18 +104,17 @@
if !backSource {
span.RecordError(err)
span.End()
logger.Errorf("register peer task failed: %s, peer id: %s", err, request.PeerId)
return ctx, nil, nil, err
}
}
if result == nil {
defer span.End()
span.RecordError(err)
err = errors.Errorf("empty schedule result")
err = errors.Errorf("step 1: peer register result is nil")
return ctx, nil, nil, err
}
span.SetAttributes(config.AttributeTaskID.String(result.TaskId))
logger.Infof("register task success, task id: %s, peer id: %s, SizeScope: %s",
logger.Infof("step 1: register task success, task id: %s, peer id: %s, SizeScope: %s",
result.TaskId, request.PeerId, base.SizeScope_name[int32(result.SizeScope)])

var singlePiece *scheduler.SinglePiece
@@ -147,7 +148,9 @@ func newFilePeerTask(ctx context.Context,
}

peerPacketStream, err := schedulerClient.ReportPieceResult(ctx, result.TaskId, request)
logger.Infof("step 2: start report peer %s piece result", request.PeerId)
if err != nil {
logger.Errorf("step 2: peer %s report piece failed: err", request.PeerId, err)
defer span.End()
span.RecordError(err)
return ctx, nil, nil, err
8 changes: 4 additions & 4 deletions client/daemon/peer/peertask_file_callback.go
@@ -106,9 +106,9 @@ func (p *filePeerTaskCallback) Done(pt Task) error {
Code: dfcodes.Success,
})
if err != nil {
pt.Log().Errorf("report successful peer result, error: %v", err)
pt.Log().Errorf("step 3: report successful peer result, error: %v", err)
} else {
pt.Log().Infof("report successful peer result ok")
pt.Log().Infof("step 3: report successful peer result ok")
}
return nil
}
@@ -131,9 +131,9 @@ func (p *filePeerTaskCallback) Fail(pt Task, code base.Code, reason string) erro
Code: code,
})
if err != nil {
pt.Log().Errorf("report fail peer result, error: %v", err)
pt.Log().Errorf("step 3: report fail peer result, error: %v", err)
} else {
pt.Log().Infof("report fail peer result ok")
pt.Log().Infof("step 3: report fail peer result ok")
}
return nil
}
2 changes: 2 additions & 0 deletions client/daemon/peer/peertask_stream.go
@@ -72,6 +72,7 @@ func newStreamPeerTask(ctx context.Context,
// trace register
_, regSpan := tracer.Start(ctx, config.SpanRegisterTask)
result, err := schedulerClient.RegisterPeerTask(ctx, request)
logger.Infof("step 1: peer %s start to register", request.PeerId)
regSpan.RecordError(err)
regSpan.End()

@@ -128,6 +129,7 @@ func newStreamPeerTask(ctx context.Context,
}

peerPacketStream, err := schedulerClient.ReportPieceResult(ctx, result.TaskId, request)
logger.Infof("step 2: start report peer %s piece result", request.PeerId)
if err != nil {
defer span.End()
span.RecordError(err)
8 changes: 4 additions & 4 deletions client/daemon/peer/peertask_stream_callback.go
@@ -104,9 +104,9 @@ func (p *streamPeerTaskCallback) Done(pt Task) error {
Code: dfcodes.Success,
})
if err != nil {
pt.Log().Errorf("report successful peer result, error: %v", err)
pt.Log().Errorf("step 3: report successful peer result, error: %v", err)
} else {
pt.Log().Infof("report successful peer result ok")
pt.Log().Infof("step 3: report successful peer result ok")
}
return nil
}
@@ -129,9 +129,9 @@ func (p *streamPeerTaskCallback) Fail(pt Task, code base.Code, reason string) er
Code: code,
})
if err != nil {
pt.Log().Errorf("report fail peer result, error: %v", err)
pt.Log().Errorf("step 3: report fail peer result, error: %v", err)
} else {
pt.Log().Infof("report fail peer result ok")
pt.Log().Infof("step 3: report fail peer result ok")
}
return nil
}
2 changes: 1 addition & 1 deletion client/daemon/service/manager.go
@@ -105,7 +105,7 @@ func (m *manager) GetPieceTasks(ctx context.Context, request *base.PieceTaskRequ
return nil, dferrors.New(code, err.Error())
}

logger.Warnf("try to get piece tasks, "+
logger.Infof("try to get piece tasks, "+
"but target peer task is initializing, "+
"there is no available pieces, "+
"task id: %s, src peer: %s, dst peer: %s, piece num: %d, limit: %d",
3 changes: 2 additions & 1 deletion internal/dfcodes/rpc_code.go
@@ -41,7 +41,8 @@ const (
ClientRequestLimitFail base.Code = 4006

// scheduler response error 5000-5999
- SchedError base.Code = 5000
+ SchedError base.Code = 5000
+ /** @deprecated */
SchedNeedBackSource base.Code = 5001 // client should try to download from source
SchedPeerGone base.Code = 5002 // client should disconnect from scheduler
SchedPeerRegisterFail base.Code = 5003
66 changes: 39 additions & 27 deletions pkg/rpc/client.go
@@ -26,7 +26,6 @@ import (
"d7y.io/dragonfly/v2/internal/dferrors"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/basic/dfnet"
"d7y.io/dragonfly/v2/pkg/synclock"
"github.com/pkg/errors"
"github.com/serialx/hashring"
"google.golang.org/grpc"
@@ -55,7 +54,7 @@ type ConnStatus string
type Connection struct {
ctx context.Context
cancelFun context.CancelFunc
- rwMutex *synclock.LockerPool
+ rwMutex sync.RWMutex
dialOpts []grpc.DialOption
key2NodeMap sync.Map // key -> node(many to one)
node2ClientMap sync.Map // node -> clientConn(one to one)
@@ -75,11 +74,7 @@ func newDefaultConnection(ctx context.Context) *Connection {
return &Connection{
ctx: childCtx,
cancelFun: cancel,
- rwMutex: synclock.NewLockerPool(),
dialOpts: defaultClientOpts,
- key2NodeMap: sync.Map{},
- node2ClientMap: sync.Map{},
- accessNodeMap: sync.Map{},
connExpireTime: defaultConnExpireTime,
gcConnTimeout: defaultGcConnTimeout,
gcConnInterval: defaultGcConnInterval,
@@ -169,10 +164,10 @@ func (conn *Connection) CorrectKey2NodeRelation(tmpHashKey, realHashKey string)
if tmpHashKey == realHashKey {
return
}
+ conn.rwMutex.Lock()
+ defer conn.rwMutex.Unlock()
key, _ := conn.key2NodeMap.Load(tmpHashKey)
serverNode := key.(string)
- conn.rwMutex.Lock(serverNode, false)
- defer conn.rwMutex.UnLock(serverNode, false)
conn.key2NodeMap.Store(realHashKey, serverNode)
conn.key2NodeMap.Delete(tmpHashKey)
}
@@ -197,18 +192,33 @@ func (conn *Connection) UpdateAccessNodeMapByServerNode(serverNode string) {
}

func (conn *Connection) AddServerNodes(addrs []dfnet.NetAddr) error {
+ conn.rwMutex.Lock()
+ defer conn.rwMutex.Unlock()
for _, addr := range addrs {
serverNode := addr.GetEndpoint()
- conn.rwMutex.Lock(serverNode, false)
conn.hashRing = conn.hashRing.AddNode(serverNode)
- conn.rwMutex.UnLock(serverNode, false)
logger.With("conn", conn.name).Debugf("success add %s to server node list", addr)
}
return nil
}

// findCandidateClientConn find candidate node client conn other than exclusiveNodes
func (conn *Connection) findCandidateClientConn(key string, exclusiveNodes ...string) (*candidateClient, error) {
+ if node, ok := conn.key2NodeMap.Load(key); ok {
+ candidateNode := node.(string)
+ selected := true
+ for _, exclusiveNode := range exclusiveNodes {
+ if exclusiveNode == candidateNode {
+ selected = false
+ }
+ }
+ if selected {
+ if client, ok := conn.node2ClientMap.Load(node); ok {
+ return client.(*candidateClient), nil
+ }
+ }
+ }
+
ringNodes, ok := conn.hashRing.GetNodes(key, conn.hashRing.Size())
if !ok {
logger.Warnf("cannot obtain expected %d server nodes", conn.hashRing.Size())
@@ -231,12 +241,10 @@ func (conn *Connection) findCandidateClientConn(key string, exclusiveNodes ...st
logger.With("conn", conn.name).Infof("candidate result for hash key %s: all server node list: %v, exclusiveNodes node list: %v, candidate node list: %v",
key, ringNodes, exclusiveNodes, candidateNodes)
for _, candidateNode := range candidateNodes {
- conn.rwMutex.Lock(candidateNode, true)
// Check whether there is a corresponding mapping client in the node2ClientMap
// TODO The code below could call the loadOrCreate method directly, but then the logging would not be as complete as calling it this way
if client, ok := conn.node2ClientMap.Load(candidateNode); ok {
logger.With("conn", conn.name).Infof("hit cache candidateNode %s for hash key %s", candidateNode, key)
- conn.rwMutex.UnLock(candidateNode, true)
return &candidateClient{
node: candidateNode,
Ref: client,
@@ -246,15 +254,13 @@ func (conn *Connection) findCandidateClientConn(key string, exclusiveNodes ...st
clientConn, err := conn.createClient(candidateNode, append(defaultClientOpts, conn.dialOpts...)...)
if err == nil {
logger.With("conn", conn.name).Infof("success connect to candidateNode %s for hash key %s", candidateNode, key)
- conn.rwMutex.UnLock(candidateNode, true)
return &candidateClient{
node: candidateNode,
Ref: clientConn,
}, nil
}

logger.With("conn", conn.name).Infof("failed to connect candidateNode %s for hash key %s: %v", candidateNode, key, err)
- conn.rwMutex.UnLock(candidateNode, true)
}
return nil, dferrors.ErrNoCandidateNode
}
@@ -273,6 +279,8 @@ func (conn *Connection) createClient(target string, opts ...grpc.DialOption) (*g

// GetServerNode
func (conn *Connection) GetServerNode(hashKey string) (string, bool) {
+ conn.rwMutex.RLock()
+ defer conn.rwMutex.RUnlock()
node, ok := conn.key2NodeMap.Load(hashKey)
serverNode := node.(string)
if ok {
@@ -283,8 +291,8 @@ func (conn *Connection) GetServerNode(hashKey string) (string, bool) {

func (conn *Connection) GetClientConnByTarget(node string) (*grpc.ClientConn, error) {
logger.With("conn", conn.name).Debugf("start to get client conn by target %s", node)
- conn.rwMutex.Lock(node, true)
- defer conn.rwMutex.UnLock(node, true)
+ conn.rwMutex.RLock()
+ defer conn.rwMutex.RUnlock()
clientConn, err := conn.loadOrCreateClientConnByNode(node)
if err != nil {
return nil, errors.Wrapf(err, "get client conn by conn %s", node)
@@ -322,30 +330,32 @@ func (conn *Connection) loadOrCreateClientConnByNode(node string) (clientConn *g
// stick whether hash key need already associated with specify node
func (conn *Connection) GetClientConn(hashKey string, stick bool) (*grpc.ClientConn, error) {
logger.With("conn", conn.name).Debugf("start to get client conn hashKey %s, stick %t", hashKey, stick)
+ conn.rwMutex.RLock()
node, ok := conn.key2NodeMap.Load(hashKey)
if stick && !ok {
+ conn.rwMutex.RUnlock()
// if request is stateful, hash key must exist in key2NodeMap
return nil, fmt.Errorf("it is a stateful request, but cannot find hash key(%s) in key2NodeMap", hashKey)
}
if ok {
// if exist
serverNode := node.(string)
- conn.rwMutex.Lock(serverNode, true)
clientConn, err := conn.loadOrCreateClientConnByNode(serverNode)
- conn.rwMutex.UnLock(serverNode, true)
+ conn.rwMutex.RUnlock()
if err != nil {
return nil, err
}
return clientConn, nil
}
logger.With("conn", conn.name).Infof("no server node associated with hash key %s was found, start find candidate", hashKey)
+ conn.rwMutex.RUnlock()
// if absence
+ conn.rwMutex.Lock()
+ defer conn.rwMutex.Unlock()
client, err := conn.findCandidateClientConn(hashKey)
if err != nil {
return nil, errors.Wrapf(err, "prob candidate client conn for hash key %s", hashKey)
}
- conn.rwMutex.Lock(client.node, false)
- defer conn.rwMutex.UnLock(client.node, false)
conn.key2NodeMap.Store(hashKey, client.node)
conn.node2ClientMap.Store(client.node, client.Ref)
conn.accessNodeMap.Store(client.node, time.Now())
@@ -363,29 +373,32 @@ func (conn *Connection) TryMigrate(key string, cause error, exclusiveNodes []str
}
}
currentNode := ""
+ conn.rwMutex.RLock()
if currentNode, ok := conn.key2NodeMap.Load(key); ok {
preNode = currentNode.(string)
exclusiveNodes = append(exclusiveNodes, currentNode.(string))
} else {
logger.With("conn", conn.name).Warnf("failed to find server node for hash key %s", key)
}
+ conn.rwMutex.RUnlock()
+ conn.rwMutex.Lock()
+ defer conn.rwMutex.Unlock()
client, err := conn.findCandidateClientConn(key, exclusiveNodes...)
if err != nil {
return "", errors.Wrapf(err, "find candidate client conn for hash key %s", key)
}
logger.With("conn", conn.name).Infof("successfully migrate hash key %s from server node %s to %s", key, currentNode, client.node)
- conn.rwMutex.Lock(client.node, false)
- defer conn.rwMutex.UnLock(client.node, false)
conn.key2NodeMap.Store(key, client.node)
conn.node2ClientMap.Store(client.node, client.Ref)
conn.accessNodeMap.Store(client.node, time.Now())
return
}

func (conn *Connection) Close() error {
+ conn.rwMutex.Lock()
+ defer conn.rwMutex.Unlock()
for i := range conn.serverNodes {
serverNode := conn.serverNodes[i].GetEndpoint()
- conn.rwMutex.Lock(serverNode, false)
conn.hashRing.RemoveNode(serverNode)
value, ok := conn.node2ClientMap.Load(serverNode)
if ok {
@@ -406,19 +419,18 @@ func (conn *Connection) Close() error {
return true
})
conn.accessNodeMap.Delete(serverNode)
- conn.rwMutex.UnLock(serverNode, false)
}
conn.cancelFun()
return nil
}

func (conn *Connection) UpdateState(addrs []dfnet.NetAddr) {
- // TODO lock
- conn.serverNodes = addrs
var addresses []string
for _, addr := range addrs {
addresses = append(addresses, addr.GetEndpoint())
}
-
+ conn.rwMutex.Lock()
+ defer conn.rwMutex.Unlock()
+ conn.serverNodes = addrs
conn.hashRing = hashring.New(addresses)
}
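
A design note on the locking protocol above (an observation on the diff, not text from the PR): GetClientConn and TryMigrate release the read lock before acquiring the write lock, so another goroutine may map the same hash key in between; the write-locked path tolerates this because findCandidateClientConn now re-checks key2NodeMap first and the subsequent Store calls overwrite with a consistent value. Below is a minimal sketch of that read-then-upgrade shape, extending the hypothetical serverTable type from the sketch near the top of this page; getOrAssign and pick are invented names.

```go
// getOrAssign mirrors the RLock -> RUnlock -> Lock sequence in GetClientConn:
// a read-locked fast path, then a write-locked fallback that re-checks the map
// before assigning, since the state may have changed between the two locks.
func (t *serverTable) getOrAssign(key string, pick func() string) string {
	t.mu.RLock()
	if node, ok := t.key2Node[key]; ok {
		t.mu.RUnlock()
		return node
	}
	t.mu.RUnlock()

	t.mu.Lock()
	defer t.mu.Unlock()
	if node, ok := t.key2Node[key]; ok { // another goroutine may have assigned it
		return node
	}
	node := pick()
	t.key2Node[key] = node
	return node
}
```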