From d83f05586179286c7eb8f5871c3b74782bbef911 Mon Sep 17 00:00:00 2001 From: Jim Ma Date: Tue, 30 Apr 2024 22:33:08 +0800 Subject: [PATCH 1/4] fix: client rpcserver subscriber hang Signed-off-by: Jim Ma --- client/daemon/rpcserver/subscriber.go | 52 +++++++++++++++++---------- 1 file changed, 34 insertions(+), 18 deletions(-) diff --git a/client/daemon/rpcserver/subscriber.go b/client/daemon/rpcserver/subscriber.go index 4028a9dd050..5143f5d9cfe 100644 --- a/client/daemon/rpcserver/subscriber.go +++ b/client/daemon/rpcserver/subscriber.go @@ -115,9 +115,13 @@ func searchNextPieceNum(sentMap map[int32]struct{}, cur uint32) (nextPieceNum ui } // sendExistPieces will send as much as possible pieces -func (s *subscriber) sendExistPieces(startNum uint32) (total int32, err error) { +func (s *subscriber) sendExistPieces(startNum uint32) error { s.request.StartNum = startNum - return sendExistPieces(s.sync.Context(), s.SugaredLoggerOnWith, s.getPieces, s.request, s.sync, s.sentMap, true) + total, err := sendExistPieces(s.sync.Context(), s.SugaredLoggerOnWith, s.getPieces, s.request, s.sync, s.sentMap, true) + if total > -1 && s.isUnknownTotalPieces() { + s.totalPieces = total + } + return err } func (s *subscriber) receiveRemainingPieceTaskRequests() { @@ -161,9 +165,18 @@ func (s *subscriber) receiveRemainingPieceTaskRequests() { } } +// totalPieces is [0, n) +func (s *subscriber) isKnownTotalPieces() bool { + return s.totalPieces > -1 +} + +func (s *subscriber) isUnknownTotalPieces() bool { + return !s.isKnownTotalPieces() +} + func (s *subscriber) sendRemainingPieceTasks() error { // nextPieceNum is the least piece num which did not send to remote peer - // may great then total piece count, check the total piece count when use it + // may great then total piece count var nextPieceNum uint32 s.Lock() for i := int32(s.skipPieceCount); ; i++ { @@ -173,6 +186,7 @@ func (s *subscriber) sendRemainingPieceTasks() error { } } s.Unlock() + s.Debugf("desired next piece num: %d", nextPieceNum) loop: for { select { @@ -182,29 +196,24 @@ loop: case info := <-s.PieceInfoChannel: s.Infof("receive piece info, num: %d, finished: %v", info.Num, info.Finished) // not desired piece - if s.totalPieces > -1 && uint32(info.Num) < nextPieceNum { + if uint32(info.Num) < nextPieceNum { continue } s.Lock() - total, err := s.sendExistPieces(uint32(info.Num)) + err := s.sendExistPieces(uint32(info.Num)) if err != nil { err = s.saveError(err) s.Unlock() return err } - if total > -1 && s.totalPieces == -1 { - s.totalPieces = total - } - if s.totalPieces > -1 && len(s.sentMap)+int(s.skipPieceCount) == int(s.totalPieces) { - s.Unlock() - break loop - } + if info.Finished { s.Unlock() break loop } nextPieceNum = s.searchNextPieceNum(nextPieceNum) + s.Debugf("update desired next piece num: %d", nextPieceNum) s.Unlock() case <-s.Success: s.Infof("peer task is success, send remaining pieces") @@ -215,18 +224,25 @@ loop: s.Unlock() break loop } - total, err := s.sendExistPieces(nextPieceNum) + + err := s.sendExistPieces(nextPieceNum) if err != nil { err = s.saveError(err) s.Unlock() return err } - if total > -1 && s.totalPieces == -1 { - s.totalPieces = total - } - if s.totalPieces > -1 && len(s.sentMap)+int(s.skipPieceCount) != int(s.totalPieces) { + + if s.isKnownTotalPieces() { + nextPieceNum = s.searchNextPieceNum(nextPieceNum) + if int32(nextPieceNum) < s.totalPieces { + s.Unlock() + msg := "task success, but not all pieces are sent out" + s.Errorf(msg) + return dferrors.Newf(commonv1.Code_ClientError, msg) + } + } else { s.Unlock() - msg := "peer task success, but can not send all pieces" + msg := "task success, but total pieces is unknown" s.Errorf(msg) return dferrors.Newf(commonv1.Code_ClientError, msg) } From 89a260119ac4d78b5a9606d87062392806048bf7 Mon Sep 17 00:00:00 2001 From: Jim Ma Date: Mon, 6 May 2024 10:41:55 +0800 Subject: [PATCH 2/4] chore: optimize rpcserver subscriber logic Signed-off-by: Jim Ma --- client/daemon/rpcserver/subscriber.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/client/daemon/rpcserver/subscriber.go b/client/daemon/rpcserver/subscriber.go index 5143f5d9cfe..be3435288b5 100644 --- a/client/daemon/rpcserver/subscriber.go +++ b/client/daemon/rpcserver/subscriber.go @@ -232,20 +232,20 @@ loop: return err } - if s.isKnownTotalPieces() { - nextPieceNum = s.searchNextPieceNum(nextPieceNum) - if int32(nextPieceNum) < s.totalPieces { - s.Unlock() - msg := "task success, but not all pieces are sent out" - s.Errorf(msg) - return dferrors.Newf(commonv1.Code_ClientError, msg) - } - } else { + if s.isUnknownTotalPieces() { s.Unlock() msg := "task success, but total pieces is unknown" s.Errorf(msg) return dferrors.Newf(commonv1.Code_ClientError, msg) } + + nextPieceNum = s.searchNextPieceNum(nextPieceNum) + if int32(nextPieceNum) < s.totalPieces { + s.Unlock() + msg := "task success, but not all pieces are sent out" + s.Errorf(msg) + return dferrors.Newf(commonv1.Code_ClientError, msg) + } s.Unlock() break loop case <-s.Fail: From 174f9d708b38d2da4ea6dbe64d59f115d557033a Mon Sep 17 00:00:00 2001 From: Jim Ma Date: Mon, 6 May 2024 14:59:52 +0800 Subject: [PATCH 3/4] chore: update comment Signed-off-by: Jim Ma --- client/daemon/rpcserver/subscriber.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/client/daemon/rpcserver/subscriber.go b/client/daemon/rpcserver/subscriber.go index be3435288b5..5ebbccf678b 100644 --- a/client/daemon/rpcserver/subscriber.go +++ b/client/daemon/rpcserver/subscriber.go @@ -165,7 +165,7 @@ func (s *subscriber) receiveRemainingPieceTaskRequests() { } } -// totalPieces is [0, n) +// totalPieces is -1, 0, n func (s *subscriber) isKnownTotalPieces() bool { return s.totalPieces > -1 } @@ -176,7 +176,8 @@ func (s *subscriber) isUnknownTotalPieces() bool { func (s *subscriber) sendRemainingPieceTasks() error { // nextPieceNum is the least piece num which did not send to remote peer - // may great then total piece count + // available values: [0, n], n is total piece count + // when nextPieceNum is n, indicate all pieces done var nextPieceNum uint32 s.Lock() for i := int32(s.skipPieceCount); ; i++ { From 53a4880c680cd04723471d51ad1e284862a71192 Mon Sep 17 00:00:00 2001 From: Jim Ma Date: Mon, 6 May 2024 15:56:09 +0800 Subject: [PATCH 4/4] chore: optimize isAllPiecesSent Signed-off-by: Jim Ma --- client/daemon/rpcserver/subscriber.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/client/daemon/rpcserver/subscriber.go b/client/daemon/rpcserver/subscriber.go index 5ebbccf678b..a9382f7adff 100644 --- a/client/daemon/rpcserver/subscriber.go +++ b/client/daemon/rpcserver/subscriber.go @@ -174,6 +174,10 @@ func (s *subscriber) isUnknownTotalPieces() bool { return !s.isKnownTotalPieces() } +func (s *subscriber) isAllPiecesSent(nextPieceNum uint32) bool { + return nextPieceNum == uint32(s.totalPieces) +} + func (s *subscriber) sendRemainingPieceTasks() error { // nextPieceNum is the least piece num which did not send to remote peer // available values: [0, n], n is total piece count @@ -209,19 +213,20 @@ loop: return err } - if info.Finished { + nextPieceNum = s.searchNextPieceNum(nextPieceNum) + s.Debugf("update desired next piece num: %d", nextPieceNum) + + if info.Finished && s.isAllPiecesSent(nextPieceNum) { s.Unlock() break loop } - nextPieceNum = s.searchNextPieceNum(nextPieceNum) - s.Debugf("update desired next piece num: %d", nextPieceNum) s.Unlock() case <-s.Success: s.Infof("peer task is success, send remaining pieces") s.Lock() // all pieces already sent // empty piece task will reach sendExistPieces to sync content length and piece count - if s.totalPieces > 0 && nextPieceNum == uint32(s.totalPieces) { + if s.totalPieces > 0 && s.isAllPiecesSent(nextPieceNum) { s.Unlock() break loop } @@ -241,7 +246,7 @@ loop: } nextPieceNum = s.searchNextPieceNum(nextPieceNum) - if int32(nextPieceNum) < s.totalPieces { + if !s.isAllPiecesSent(nextPieceNum) { s.Unlock() msg := "task success, but not all pieces are sent out" s.Errorf(msg)