From 501c1db26c2ae16acc9af1697175963d9a403f30 Mon Sep 17 00:00:00 2001 From: Hannah Howard Date: Wed, 28 Oct 2020 19:27:40 -0700 Subject: [PATCH] Update graphsync & fix in-progress request memory leak by consuming responses (#109) * feat(deps): update to graphsync v0.4.1 * feat(graphsync): consume response channel consume response channel so graphsync does not buffer responses in memory * fix(deps): update graphsync to fix bug --- go.mod | 4 ++-- go.sum | 12 ++++++------ testutil/fakegraphsync.go | 3 ++- transport/graphsync/graphsync.go | 25 ++++++++++--------------- transport/graphsync/graphsync_test.go | 3 +++ 5 files changed, 23 insertions(+), 24 deletions(-) diff --git a/go.mod b/go.mod index b9f6c58f..852dba13 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/ipfs/go-cid v0.0.7 github.com/ipfs/go-datastore v0.4.5 github.com/ipfs/go-ds-badger v0.2.1 - github.com/ipfs/go-graphsync v0.3.1 + github.com/ipfs/go-graphsync v0.4.2 github.com/ipfs/go-ipfs-blockstore v1.0.1 github.com/ipfs/go-ipfs-blocksutil v0.0.1 github.com/ipfs/go-ipfs-chunker v0.0.5 @@ -26,7 +26,7 @@ require ( github.com/ipfs/go-log/v2 v2.0.3 github.com/ipfs/go-merkledag v0.3.2 github.com/ipfs/go-unixfs v0.2.4 - github.com/ipld/go-ipld-prime v0.5.1-0.20200828233916-988837377a7f + github.com/ipld/go-ipld-prime v0.5.1-0.20201021195245-109253e8a018 github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c github.com/jpillora/backoff v1.0.0 github.com/libp2p/go-libp2p v0.6.0 diff --git a/go.sum b/go.sum index b4eae04a..1d0aae27 100644 --- a/go.sum +++ b/go.sum @@ -212,8 +212,8 @@ github.com/ipfs/go-ds-badger v0.2.1 h1:RsC9DDlwFhFdfT+s2PeC8joxbSp2YMufK8w/RBOxK github.com/ipfs/go-ds-badger v0.2.1/go.mod h1:Tx7l3aTph3FMFrRS838dcSJh+jjA7cX9DrGVwx/NOwE= github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc= github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s= -github.com/ipfs/go-graphsync v0.3.1 h1:dJLYrck4oyJDfMVhGEKiWHxaY8oYMWko4m2Fi+4bofo= -github.com/ipfs/go-graphsync v0.3.1/go.mod h1:bw4LiLM5Oq/uLdzEtih9LK8GrwSijv+XqYiWCTxHMqs= +github.com/ipfs/go-graphsync v0.4.2 h1:Y/jt5r619yj0LI7OLtGKh4jYm8goYUcuJ09y7TZ3zMo= +github.com/ipfs/go-graphsync v0.4.2/go.mod h1:/VmbZTUdUMTbNkgzAiCEucIIAU3BkLE2cZrDCVUhyi0= github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08= github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw= github.com/ipfs/go-ipfs-blockstore v0.1.4 h1:2SGI6U1B44aODevza8Rde3+dY30Pb+lbcObe1LETxOQ= @@ -290,10 +290,10 @@ github.com/ipfs/go-unixfs v0.2.4 h1:6NwppOXefWIyysZ4LR/qUBPvXd5//8J3jiMdvpbw6Lo= github.com/ipfs/go-unixfs v0.2.4/go.mod h1:SUdisfUjNoSDzzhGVxvCL9QO/nKdwXdr+gbMUdqcbYw= github.com/ipfs/go-verifcid v0.0.1 h1:m2HI7zIuR5TFyQ1b79Da5N9dnnCP1vcu2QqawmWlK2E= github.com/ipfs/go-verifcid v0.0.1/go.mod h1:5Hrva5KBeIog4A+UpqlaIU+DEstipcJYQQZc0g37pY0= -github.com/ipld/go-ipld-prime v0.5.1-0.20200828233916-988837377a7f h1:XpOuNQ5GbXxUcSukbQcW9jkE7REpaFGJU2/T00fo9kA= -github.com/ipld/go-ipld-prime v0.5.1-0.20200828233916-988837377a7f/go.mod h1:0xEgdD6MKbZ1vF0GC+YcR/C4SQCAlRuOjIJ2i0HxqzM= -github.com/ipld/go-ipld-prime-proto v0.0.0-20200922192210-9a2bfd4440a6 h1:6Mq+tZGSEMEoJJ1NbJRhddeelkXZcU8yfH/ZRYUo/Es= -github.com/ipld/go-ipld-prime-proto v0.0.0-20200922192210-9a2bfd4440a6/go.mod h1:3pHYooM9Ea65jewRwrb2u5uHZCNkNTe9ABsVB+SrkH0= +github.com/ipld/go-ipld-prime v0.5.1-0.20201021195245-109253e8a018 h1:RbRHv8epkmvBYA5cGfz68GUSbOgx5j/7ObLIl4Rsif0= +github.com/ipld/go-ipld-prime v0.5.1-0.20201021195245-109253e8a018/go.mod h1:0xEgdD6MKbZ1vF0GC+YcR/C4SQCAlRuOjIJ2i0HxqzM= +github.com/ipld/go-ipld-prime-proto v0.1.0 h1:j7gjqrfwbT4+gXpHwEx5iMssma3mnctC7YaCimsFP70= +github.com/ipld/go-ipld-prime-proto v0.1.0/go.mod h1:11zp8f3sHVgIqtb/c9Kr5ZGqpnCLF1IVTNOez9TopzE= github.com/jackpal/gateway v1.0.5 h1:qzXWUJfuMdlLMtt0a3Dgt+xkWQiA5itDEITVJtuSwMc= github.com/jackpal/gateway v1.0.5/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA= github.com/jackpal/go-nat-pmp v1.0.1 h1:i0LektDkO1QlrTm/cSuP+PyBCDnYvjPLGl4LdWEMiaA= diff --git a/testutil/fakegraphsync.go b/testutil/fakegraphsync.go index 3dd426a5..25e7a035 100644 --- a/testutil/fakegraphsync.go +++ b/testutil/fakegraphsync.go @@ -43,6 +43,7 @@ type ReceivedGraphSyncRequest struct { Root ipld.Link Selector ipld.Node Extensions []graphsync.ExtensionData + ResponseChan chan graphsync.ResponseProgress ResponseErrChan chan error } @@ -247,8 +248,8 @@ func (fgs *FakeGraphSync) AssertDoesNotHavePersistenceOption(t *testing.T, name // Request initiates a new GraphSync request to the given peer using the given selector spec. func (fgs *FakeGraphSync) Request(ctx context.Context, p peer.ID, root ipld.Link, selector ipld.Node, extensions ...graphsync.ExtensionData) (<-chan graphsync.ResponseProgress, <-chan error) { errors := make(chan error) - fgs.requests <- ReceivedGraphSyncRequest{ctx, p, root, selector, extensions, errors} responses := make(chan graphsync.ResponseProgress) + fgs.requests <- ReceivedGraphSyncRequest{ctx, p, root, selector, extensions, responses, errors} if !fgs.leaveRequestsOpen { close(responses) close(errors) diff --git a/transport/graphsync/graphsync.go b/transport/graphsync/graphsync.go index bb3944b7..8d751e95 100644 --- a/transport/graphsync/graphsync.go +++ b/transport/graphsync/graphsync.go @@ -126,29 +126,24 @@ func (t *Transport) OpenChannel(ctx context.Context, Data: bz} exts = append(exts, doNotSendExt) } - _, errChan := t.gs.Request(internalCtx, dataSender, root, stor, exts...) + responseChan, errChan := t.gs.Request(internalCtx, dataSender, root, stor, exts...) - go t.executeGsRequest(ctx, channelID, errChan) + go t.executeGsRequest(ctx, channelID, responseChan, errChan) return nil } -func (t *Transport) consumeResponses(ctx context.Context, errChan <-chan error) error { +func (t *Transport) consumeResponses(responseChan <-chan graphsync.ResponseProgress, errChan <-chan error) error { var lastError error - for { - select { - case <-ctx.Done(): - return errContextCancelled - case err, ok := <-errChan: - if !ok { - return lastError - } - lastError = err - } + for range responseChan { + } + for err := range errChan { + lastError = err } + return lastError } -func (t *Transport) executeGsRequest(ctx context.Context, channelID datatransfer.ChannelID, errChan <-chan error) { - lastError := t.consumeResponses(ctx, errChan) +func (t *Transport) executeGsRequest(ctx context.Context, channelID datatransfer.ChannelID, responseChan <-chan graphsync.ResponseProgress, errChan <-chan error) { + lastError := t.consumeResponses(responseChan, errChan) if _, ok := lastError.(graphsync.RequestContextCancelledErr); ok { log.Warnf("graphsync request context cancelled, channel Id: %v", channelID) diff --git a/transport/graphsync/graphsync_test.go b/transport/graphsync/graphsync_test.go index 0e6f767a..7eb90359 100644 --- a/transport/graphsync/graphsync_test.go +++ b/transport/graphsync/graphsync_test.go @@ -710,6 +710,7 @@ func TestManager(t *testing.T) { }, check: func(t *testing.T, events *fakeEvents, gsData *harness) { requestReceived := gsData.fgs.AssertRequestReceived(gsData.ctx, t) + close(requestReceived.ResponseChan) close(requestReceived.ResponseErrChan) require.Eventually(t, func() bool { @@ -734,6 +735,7 @@ func TestManager(t *testing.T) { }, check: func(t *testing.T, events *fakeEvents, gsData *harness) { requestReceived := gsData.fgs.AssertRequestReceived(gsData.ctx, t) + close(requestReceived.ResponseChan) requestReceived.ResponseErrChan <- graphsync.RequestFailedUnknownErr{} close(requestReceived.ResponseErrChan) @@ -789,6 +791,7 @@ func TestManager(t *testing.T) { }, check: func(t *testing.T, events *fakeEvents, gsData *harness) { requestReceived := gsData.fgs.AssertRequestReceived(gsData.ctx, t) + close(requestReceived.ResponseChan) requestReceived.ResponseErrChan <- graphsync.RequestContextCancelledErr{} close(requestReceived.ResponseErrChan)