Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: client rpcserver subscriber hang #3246

Merged
merged 4 commits into from
May 7, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 42 additions & 20 deletions client/daemon/rpcserver/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,13 @@ func searchNextPieceNum(sentMap map[int32]struct{}, cur uint32) (nextPieceNum ui
}

// sendExistPieces will send as much as possible pieces
func (s *subscriber) sendExistPieces(startNum uint32) (total int32, err error) {
func (s *subscriber) sendExistPieces(startNum uint32) error {
s.request.StartNum = startNum
return sendExistPieces(s.sync.Context(), s.SugaredLoggerOnWith, s.getPieces, s.request, s.sync, s.sentMap, true)
total, err := sendExistPieces(s.sync.Context(), s.SugaredLoggerOnWith, s.getPieces, s.request, s.sync, s.sentMap, true)
if total > -1 && s.isUnknownTotalPieces() {
s.totalPieces = total
}
return err
}

func (s *subscriber) receiveRemainingPieceTaskRequests() {
Expand Down Expand Up @@ -161,9 +165,23 @@ func (s *subscriber) receiveRemainingPieceTaskRequests() {
}
}

// totalPieces is -1, 0, n
func (s *subscriber) isKnownTotalPieces() bool {
return s.totalPieces > -1
}

func (s *subscriber) isUnknownTotalPieces() bool {
return !s.isKnownTotalPieces()
}

func (s *subscriber) isAllPiecesSent(nextPieceNum uint32) bool {
return nextPieceNum == uint32(s.totalPieces)
}

func (s *subscriber) sendRemainingPieceTasks() error {
// nextPieceNum is the least piece num which did not send to remote peer
// may great then total piece count, check the total piece count when use it
// available values: [0, n], n is total piece count
// when nextPieceNum is n, indicate all pieces done
var nextPieceNum uint32
s.Lock()
for i := int32(s.skipPieceCount); ; i++ {
Expand All @@ -173,6 +191,7 @@ func (s *subscriber) sendRemainingPieceTasks() error {
}
}
s.Unlock()
s.Debugf("desired next piece num: %d", nextPieceNum)
loop:
for {
select {
Expand All @@ -182,51 +201,54 @@ loop:
case info := <-s.PieceInfoChannel:
s.Infof("receive piece info, num: %d, finished: %v", info.Num, info.Finished)
// not desired piece
if s.totalPieces > -1 && uint32(info.Num) < nextPieceNum {
if uint32(info.Num) < nextPieceNum {
continue
}

s.Lock()
total, err := s.sendExistPieces(uint32(info.Num))
err := s.sendExistPieces(uint32(info.Num))
if err != nil {
err = s.saveError(err)
s.Unlock()
return err
}
if total > -1 && s.totalPieces == -1 {
s.totalPieces = total
}
if s.totalPieces > -1 && len(s.sentMap)+int(s.skipPieceCount) == int(s.totalPieces) {
s.Unlock()
break loop
}
if info.Finished {

nextPieceNum = s.searchNextPieceNum(nextPieceNum)
s.Debugf("update desired next piece num: %d", nextPieceNum)

if info.Finished && s.isAllPiecesSent(nextPieceNum) {
s.Unlock()
break loop
}
nextPieceNum = s.searchNextPieceNum(nextPieceNum)
s.Unlock()
case <-s.Success:
s.Infof("peer task is success, send remaining pieces")
s.Lock()
// all pieces already sent
// empty piece task will reach sendExistPieces to sync content length and piece count
if s.totalPieces > 0 && nextPieceNum == uint32(s.totalPieces) {
if s.totalPieces > 0 && s.isAllPiecesSent(nextPieceNum) {
s.Unlock()
break loop
}
total, err := s.sendExistPieces(nextPieceNum)

err := s.sendExistPieces(nextPieceNum)
if err != nil {
err = s.saveError(err)
s.Unlock()
return err
}
if total > -1 && s.totalPieces == -1 {
s.totalPieces = total

if s.isUnknownTotalPieces() {
s.Unlock()
msg := "task success, but total pieces is unknown"
s.Errorf(msg)
return dferrors.Newf(commonv1.Code_ClientError, msg)
}
if s.totalPieces > -1 && len(s.sentMap)+int(s.skipPieceCount) != int(s.totalPieces) {

nextPieceNum = s.searchNextPieceNum(nextPieceNum)
if !s.isAllPiecesSent(nextPieceNum) {
s.Unlock()
msg := "peer task success, but can not send all pieces"
msg := "task success, but not all pieces are sent out"
s.Errorf(msg)
return dferrors.Newf(commonv1.Code_ClientError, msg)
}
Expand Down
Loading