From 4ccbbc8d783870eab1aa1a87e755ec295bc4f86e Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Thu, 10 Jan 2019 11:52:41 -0800 Subject: [PATCH 1/3] fix(sessions): explicitly connect found peers when providers are found in a session, explicitly connect them so they get added to the peer manager fix #53 --- bitswap_with_sessions_test.go | 40 +++++++++++++++++++ session/session.go | 2 +- sessionpeermanager/sessionpeermanager.go | 18 ++++++++- sessionpeermanager/sessionpeermanager_test.go | 6 ++- 4 files changed, 63 insertions(+), 3 deletions(-) diff --git a/bitswap_with_sessions_test.go b/bitswap_with_sessions_test.go index 5034aaee..f0b97ba8 100644 --- a/bitswap_with_sessions_test.go +++ b/bitswap_with_sessions_test.go @@ -152,6 +152,46 @@ func TestSessionSplitFetch(t *testing.T) { } } +func TestFetchNotConnected(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + vnet := getVirtualNetwork() + sesgen := NewTestSessionGenerator(vnet) + defer sesgen.Close() + bgen := blocksutil.NewBlockGenerator() + + other := sesgen.Next() + + blks := bgen.Blocks(10) + for _, block := range blks { + if err := other.Exchange.HasBlock(block); err != nil { + t.Fatal(err) + } + } + + var cids []cid.Cid + for _, blk := range blks { + cids = append(cids, blk.Cid()) + } + + thisNode := sesgen.Next() + ses := thisNode.Exchange.NewSession(ctx).(*bssession.Session) + ses.SetBaseTickDelay(time.Millisecond * 10) + + ch, err := ses.GetBlocks(ctx, cids) + if err != nil { + t.Fatal(err) + } + + var got []blocks.Block + for b := range ch { + got = append(got, b) + } + if err := assertBlockLists(got, blks); err != nil { + t.Fatal(err) + } +} func TestInterestCacheOverflow(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/session/session.go b/session/session.go index bae52bd0..c17b45a5 100644 --- a/session/session.go +++ b/session/session.go @@ -222,7 +222,7 @@ func (s *Session) SetBaseTickDelay(baseTickDelay time.Duration) { } } -const provSearchDelay = time.Second * 10 +const provSearchDelay = time.Second // Session run loop -- everything function below here should not be called // of this loop diff --git a/sessionpeermanager/sessionpeermanager.go b/sessionpeermanager/sessionpeermanager.go index 3b951c42..ebd3cb5f 100644 --- a/sessionpeermanager/sessionpeermanager.go +++ b/sessionpeermanager/sessionpeermanager.go @@ -4,12 +4,17 @@ import ( "context" "fmt" "math/rand" + "sync" + + logging "github.com/ipfs/go-log" cid "github.com/ipfs/go-cid" ifconnmgr "github.com/libp2p/go-libp2p-interface-connmgr" peer "github.com/libp2p/go-libp2p-peer" ) +var log = logging.Logger("bitswap") + const ( maxOptimizedPeers = 32 reservePeers = 2 @@ -18,6 +23,7 @@ const ( // PeerNetwork is an interface for finding providers and managing connections type PeerNetwork interface { ConnectionManager() ifconnmgr.ConnManager + ConnectTo(context.Context, peer.ID) error FindProvidersAsync(context.Context, cid.Cid, int) <-chan peer.ID } @@ -101,9 +107,19 @@ func (spm *SessionPeerManager) FindMorePeers(ctx context.Context, c cid.Cid) { // - manage timeouts // - ensure two 'findprovs' calls for the same block don't run concurrently // - share peers between sessions based on interest set + wg := &sync.WaitGroup{} for p := range spm.network.FindProvidersAsync(ctx, k, 10) { - spm.peerMessages <- &peerFoundMessage{p} + wg.Add(1) + go func(p peer.ID) { + defer wg.Done() + err := spm.network.ConnectTo(ctx, p) + if err != nil { + log.Debugf("failed to connect to provider %s: %s", p, err) + } + spm.peerMessages <- &peerFoundMessage{p} + }(p) } + wg.Wait() }(c) } diff --git a/sessionpeermanager/sessionpeermanager_test.go b/sessionpeermanager/sessionpeermanager_test.go index ba23c87d..b4e723b1 100644 --- a/sessionpeermanager/sessionpeermanager_test.go +++ b/sessionpeermanager/sessionpeermanager_test.go @@ -2,8 +2,8 @@ package sessionpeermanager import ( "context" - "sync" "math/rand" + "sync" "testing" "time" @@ -24,6 +24,10 @@ func (fpn *fakePeerNetwork) ConnectionManager() ifconnmgr.ConnManager { return fpn.connManager } +func (fpn *fakePeerNetwork) ConnectTo(context.Context, peer.ID) error { + return nil +} + func (fpn *fakePeerNetwork) FindProvidersAsync(ctx context.Context, c cid.Cid, num int) <-chan peer.ID { peerCh := make(chan peer.ID) go func() { From 48875c4da4317d10fc0ad093e8c39e7ddb12900b Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Fri, 11 Jan 2019 15:13:54 -0800 Subject: [PATCH 2/3] fix(session): make provSearchDelay configurable --- bitswap_with_sessions_test.go | 1 + session/session.go | 7 ++++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/bitswap_with_sessions_test.go b/bitswap_with_sessions_test.go index f0b97ba8..0be7bc97 100644 --- a/bitswap_with_sessions_test.go +++ b/bitswap_with_sessions_test.go @@ -156,6 +156,7 @@ func TestFetchNotConnected(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() + bssession.SetProviderSearchDelay(10 * time.Millisecond) vnet := getVirtualNetwork() sesgen := NewTestSessionGenerator(vnet) defer sesgen.Close() diff --git a/session/session.go b/session/session.go index c17b45a5..b57f472e 100644 --- a/session/session.go +++ b/session/session.go @@ -222,7 +222,12 @@ func (s *Session) SetBaseTickDelay(baseTickDelay time.Duration) { } } -const provSearchDelay = time.Second +var provSearchDelay = time.Second + +// SetProviderSearchDelay overwrites the global provider search delay +func SetProviderSearchDelay(newProvSearchDelay time.Duration) { + provSearchDelay = newProvSearchDelay +} // Session run loop -- everything function below here should not be called // of this loop From 6f7a77e0658c25b573bfdd226ee9056d58727ef1 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Fri, 11 Jan 2019 15:15:32 -0800 Subject: [PATCH 3/3] fix(sessionpeermanager): remove waitGroup Remove sync.waitGroup in SessionPeerManager till it's needed --- sessionpeermanager/sessionpeermanager.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/sessionpeermanager/sessionpeermanager.go b/sessionpeermanager/sessionpeermanager.go index ebd3cb5f..2e733832 100644 --- a/sessionpeermanager/sessionpeermanager.go +++ b/sessionpeermanager/sessionpeermanager.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "math/rand" - "sync" logging "github.com/ipfs/go-log" @@ -107,11 +106,8 @@ func (spm *SessionPeerManager) FindMorePeers(ctx context.Context, c cid.Cid) { // - manage timeouts // - ensure two 'findprovs' calls for the same block don't run concurrently // - share peers between sessions based on interest set - wg := &sync.WaitGroup{} for p := range spm.network.FindProvidersAsync(ctx, k, 10) { - wg.Add(1) go func(p peer.ID) { - defer wg.Done() err := spm.network.ConnectTo(ctx, p) if err != nil { log.Debugf("failed to connect to provider %s: %s", p, err) @@ -119,7 +115,6 @@ func (spm *SessionPeerManager) FindMorePeers(ctx context.Context, c cid.Cid) { spm.peerMessages <- &peerFoundMessage{p} }(p) } - wg.Wait() }(c) }