Skip to content

Commit

Permalink
go/p2p/rpc: Fix peer grading and memory leaks in RPC multi calls
Browse files Browse the repository at this point in the history
  • Loading branch information
peternose committed Oct 29, 2022
1 parent b797444 commit 1aefc95
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 26 deletions.
4 changes: 4 additions & 0 deletions .changelog/5007.bugfix.1.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
go/p2p/rpc: Fix peer grading when context is canceled

When method `CallMulti` finishes early, the requests in progress are canceled
and unfairly recorded as failed.
4 changes: 4 additions & 0 deletions .changelog/5007.bugfix.2.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
go/p2p/rpc: Fix memory leak when RPC multi call finishes early

When method `CallMulti` finishes early, the result channel is never cleared.
Therefore, the channel never closes and leaves one go routine hanging.
53 changes: 27 additions & 26 deletions go/p2p/rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

"github.com/cenkalti/backoff/v4"
"github.com/eapache/channels"
"github.com/libp2p/go-libp2p/core"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
Expand Down Expand Up @@ -398,18 +397,16 @@ func (c *client) CallMulti(
type result struct {
rsp interface{}
pf PeerFeedback
err error
}
var resultChs []channels.SimpleOutChannel

// Prepare a non-blocking channel for workers to push their results.
resultCh := make(chan result, len(peers))

for _, peer := range peers {
peer := peer // Make sure goroutine below operates on the right instance.

ch := channels.NewNativeChannel(channels.BufferCap(1))
resultChs = append(resultChs, ch)

pool.Submit(func() {
defer close(ch)

// Abort early in case we are done.
select {
case <-peerCtx.Done():
Expand All @@ -419,36 +416,37 @@ func (c *client) CallMulti(

rsp := reflect.New(reflect.TypeOf(rspTyp)).Interface()
pf, err := c.timeCall(peerCtx, peer, &request, rsp, co.maxPeerResponseTime)
if err != nil {
// Ignore failed results.
return
}

ch.In() <- &result{rsp, pf}
resultCh <- result{rsp, pf, err}
})
}

if len(resultChs) == 0 {
return nil, nil, nil
}
resultCh := channels.NewNativeChannel(channels.None)
channels.Multiplex(resultCh, resultChs...)

// Gather results.
var (
rsps []interface{}
pfs []PeerFeedback
)
for r := range resultCh.Out() {
result := r.(*result)

rsps = append(rsps, result.rsp)
pfs = append(pfs, result.pf)

if co.aggregateFn != nil {
if !co.aggregateFn(result.rsp, result.pf) {
loop:
for i := 0; i < len(peers); i++ {
select {
case result := <-resultCh:
// Ignore failed results.
if result.err != nil {
break
}

rsps = append(rsps, result.rsp)
pfs = append(pfs, result.pf)

if co.aggregateFn != nil {
if !co.aggregateFn(result.rsp, result.pf) {
break loop
}
}

case <-peerCtx.Done():
break loop
}
}

Expand All @@ -472,7 +470,10 @@ func (c *client) timeCall(
latency := time.Since(start)

if err != nil {
c.recordFailure(peerID, latency)
// If the caller canceled the context we should not degrade the peer.
if !errors.Is(err, context.Canceled) {
c.recordFailure(peerID, latency)
}

c.logger.Debug("failed to call method",
"err", err,
Expand Down

0 comments on commit 1aefc95

Please sign in to comment.