From 82e1ff585f5bd6d09e3347bfccb7374e853d0a5b Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sun, 4 Feb 2018 15:09:03 -0800 Subject: [PATCH 1/8] shutdown notifications engine when closing a bitswap session License: MIT Signed-off-by: Jeromy --- exchange/bitswap/session.go | 1 + 1 file changed, 1 insertion(+) diff --git a/exchange/bitswap/session.go b/exchange/bitswap/session.go index 07444ad36f4..049be4e9e20 100644 --- a/exchange/bitswap/session.go +++ b/exchange/bitswap/session.go @@ -83,6 +83,7 @@ func (bs *Bitswap) NewSession(ctx context.Context) *Session { } func (bs *Bitswap) removeSession(s *Session) { + s.notif.Shutdown() bs.sessLk.Lock() defer bs.sessLk.Unlock() for i := 0; i < len(bs.sessions); i++ { From 64c19cc9044a834855691cceb2e460aecbaccedc Mon Sep 17 00:00:00 2001 From: Jeromy Date: Mon, 5 Feb 2018 12:14:35 -0800 Subject: [PATCH 2/8] WIP: fix wantlist clearing by closing down session License: MIT Signed-off-by: Jeromy --- exchange/bitswap/session.go | 8 ++++++++ exchange/bitswap/session_test.go | 33 ++++++++++++++++++++++++++++++++ 2 files changed, 41 insertions(+) diff --git a/exchange/bitswap/session.go b/exchange/bitswap/session.go index 049be4e9e20..bc824dbee82 100644 --- a/exchange/bitswap/session.go +++ b/exchange/bitswap/session.go @@ -84,6 +84,14 @@ func (bs *Bitswap) NewSession(ctx context.Context) *Session { func (bs *Bitswap) removeSession(s *Session) { s.notif.Shutdown() + + live := make([]*cid.Cid, 0, len(s.liveWants)) + for c := range s.liveWants { + cs, _ := cid.Cast([]byte(c)) + live = append(live, cs) + } + bs.CancelWants(live, s.id) + bs.sessLk.Lock() defer bs.sessLk.Unlock() for i := 0; i < len(bs.sessions); i++ { diff --git a/exchange/bitswap/session_test.go b/exchange/bitswap/session_test.go index 6458904548a..2fe4672b06c 100644 --- a/exchange/bitswap/session_test.go +++ b/exchange/bitswap/session_test.go @@ -285,3 +285,36 @@ func TestMultipleSessions(t *testing.T) { } _ = blkch } + +func TestWantlistClearsOnCancel(t *testing.T) { + 
ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + vnet := getVirtualNetwork() + sesgen := NewTestSessionGenerator(vnet) + defer sesgen.Close() + bgen := blocksutil.NewBlockGenerator() + + blks := bgen.Blocks(10) + var cids []*cid.Cid + for _, blk := range blks { + cids = append(cids, blk.Cid()) + } + + inst := sesgen.Instances(1) + + a := inst[0] + + ctx1, cancel1 := context.WithCancel(ctx) + ses := a.Exchange.NewSession(ctx1) + + _, err := ses.GetBlocks(ctx, cids) + if err != nil { + t.Fatal(err) + } + cancel1() + + if len(a.Exchange.GetWantlist()) > 0 { + t.Fatal("expected empty wantlist") + } +} From 2b99858dda9d1f24ec33f38e982c2abde333cc4c Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Thu, 8 Feb 2018 17:48:22 -0800 Subject: [PATCH 3/8] remove excessive time.Now() calls from bitswap sessions License: MIT Signed-off-by: Steven Allen --- exchange/bitswap/session.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/exchange/bitswap/session.go b/exchange/bitswap/session.go index bc824dbee82..937376723e2 100644 --- a/exchange/bitswap/session.go +++ b/exchange/bitswap/session.go @@ -279,8 +279,9 @@ func (s *Session) receiveBlock(ctx context.Context, blk blocks.Block) { } func (s *Session) wantBlocks(ctx context.Context, ks []*cid.Cid) { + now := time.Now() for _, c := range ks { - s.liveWants[c.KeyString()] = time.Now() + s.liveWants[c.KeyString()] = now } s.bs.wm.WantBlocks(ctx, ks, s.activePeersArr, s.id) } From 53958266d659acd2df33466c8d20ad46751322e3 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Fri, 9 Feb 2018 12:19:21 -0800 Subject: [PATCH 4/8] bitswap: finish unsubscribing from the pubsub instance before shutting it down Otherwise, we'll deadlock and leak a goroutine. This fix is kind of crappy but modifying the pubsub library would have been worse (and, really, it *is* reasonable to say "don't use the pubsub instance after shutting it down"). 
License: MIT Signed-off-by: Steven Allen --- .../bitswap/notifications/notifications.go | 46 +++++++++++++++++-- 1 file changed, 43 insertions(+), 3 deletions(-) diff --git a/exchange/bitswap/notifications/notifications.go b/exchange/bitswap/notifications/notifications.go index ba5b379ec8b..defea700aee 100644 --- a/exchange/bitswap/notifications/notifications.go +++ b/exchange/bitswap/notifications/notifications.go @@ -2,6 +2,7 @@ package notifications import ( "context" + "sync" blocks "gx/ipfs/Qmej7nf81hi2x2tvjRBF3mcp74sQyuDH4VMYDGd1YtXjb2/go-block-format" @@ -18,18 +19,33 @@ type PubSub interface { } func New() PubSub { - return &impl{*pubsub.New(bufferSize)} + return &impl{ + wrapped: *pubsub.New(bufferSize), + cancel: make(chan struct{}), + } } type impl struct { wrapped pubsub.PubSub + + // These two fields make up a shutdown "lock". + // We need them as calling, e.g., `Unsubscribe` after calling `Shutdown` + // blocks forever and fixing this in pubsub would be rather invasive. + cancel chan struct{} + wg sync.WaitGroup } func (ps *impl) Publish(block blocks.Block) { ps.wrapped.Pub(block, block.Cid().KeyString()) } +// Not safe to call more than once. func (ps *impl) Shutdown() { + // Interrupt in-progress subscriptions. + close(ps.cancel) + // Wait for them to finish. + ps.wg.Wait() + // shutdown the pubsub. ps.wrapped.Shutdown() } @@ -44,12 +60,34 @@ func (ps *impl) Subscribe(ctx context.Context, keys ...*cid.Cid) <-chan blocks.B close(blocksCh) return blocksCh } + + // prevent shutdown + ps.wg.Add(1) + + // check if shutdown *after* preventing shutdowns. + select { + case <-ps.cancel: + // abort, allow shutdown to continue. + ps.wg.Done() + close(blocksCh) + return blocksCh + default: + } + ps.wrapped.AddSubOnceEach(valuesCh, toStrings(keys)...) go func() { - defer close(blocksCh) - defer ps.wrapped.Unsub(valuesCh) // with a len(keys) buffer, this is an optimization + defer func() { + ps.wrapped.Unsub(valuesCh) + close(blocksCh) + + // Unblock shutdown. 
+ ps.wg.Done() + }() + for { select { + case <-ps.cancel: + return case <-ctx.Done(): return case val, ok := <-valuesCh: @@ -61,6 +99,8 @@ func (ps *impl) Subscribe(ctx context.Context, keys ...*cid.Cid) <-chan blocks.B return } select { + case <-ps.cancel: + return case <-ctx.Done(): return case blocksCh <- block: // continue From 2baa3312d195089c74eac1531f4126d2d67494b4 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Fri, 9 Feb 2018 17:33:57 -0800 Subject: [PATCH 5/8] bitswap: test canceling subscription context after shutting down License: MIT Signed-off-by: Steven Allen --- .../notifications/notifications_test.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/exchange/bitswap/notifications/notifications_test.go b/exchange/bitswap/notifications/notifications_test.go index 0377c307d6a..a70a0755a00 100644 --- a/exchange/bitswap/notifications/notifications_test.go +++ b/exchange/bitswap/notifications/notifications_test.go @@ -100,6 +100,25 @@ func TestDuplicateSubscribe(t *testing.T) { assertBlocksEqual(t, e1, r2) } +func TestShutdownBeforeUnsubscribe(t *testing.T) { + e1 := blocks.NewBlock([]byte("1")) + + n := New() + ctx, cancel := context.WithCancel(context.Background()) + ch := n.Subscribe(ctx, e1.Cid()) // subscribe to a single key + n.Shutdown() + cancel() + + select { + case _, ok := <-ch: + if ok { + t.Fatal("channel should have been closed") + } + default: + t.Fatal("channel should have been closed") + } +} + func TestSubscribeIsANoopWhenCalledWithNoKeys(t *testing.T) { n := New() defer n.Shutdown() From 1a37c0a870357a71c0b21d8f8e8fe2d28d0aefa8 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Sun, 11 Feb 2018 12:51:50 -0800 Subject: [PATCH 6/8] avoid publishing if notification system has been shut down (will deadlock) License: MIT Signed-off-by: Steven Allen --- exchange/bitswap/notifications/notifications.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/exchange/bitswap/notifications/notifications.go
b/exchange/bitswap/notifications/notifications.go index defea700aee..9a6f10b5256 100644 --- a/exchange/bitswap/notifications/notifications.go +++ b/exchange/bitswap/notifications/notifications.go @@ -36,6 +36,16 @@ type impl struct { } func (ps *impl) Publish(block blocks.Block) { + ps.wg.Add(1) + defer ps.wg.Done() + + select { + case <-ps.cancel: + // Already shutdown, bail. + return + default: + } + ps.wrapped.Pub(block, block.Cid().KeyString()) } From d4d30f4e8a0d08a95cbbe635d05a4318be095793 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Mon, 12 Feb 2018 21:02:19 -0800 Subject: [PATCH 7/8] bitswap: actually *update* wantlist entries in outbound wantlist messages Before, we weren't using a pointer so we were throwing away the update. License: MIT Signed-off-by: Steven Allen --- exchange/bitswap/message/message.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/exchange/bitswap/message/message.go b/exchange/bitswap/message/message.go index 7ede57f8724..9a166c942e5 100644 --- a/exchange/bitswap/message/message.go +++ b/exchange/bitswap/message/message.go @@ -50,7 +50,7 @@ type Exportable interface { type impl struct { full bool - wantlist map[string]Entry + wantlist map[string]*Entry blocks map[string]blocks.Block } @@ -61,7 +61,7 @@ func New(full bool) BitSwapMessage { func newMsg(full bool) *impl { return &impl{ blocks: make(map[string]blocks.Block), - wantlist: make(map[string]Entry), + wantlist: make(map[string]*Entry), full: full, } } @@ -122,7 +122,7 @@ func (m *impl) Empty() bool { func (m *impl) Wantlist() []Entry { out := make([]Entry, 0, len(m.wantlist)) for _, e := range m.wantlist { - out = append(out, e) + out = append(out, *e) } return out } @@ -151,7 +151,7 @@ func (m *impl) addEntry(c *cid.Cid, priority int, cancel bool) { e.Priority = priority e.Cancel = cancel } else { - m.wantlist[k] = Entry{ + m.wantlist[k] = &Entry{ Entry: &wantlist.Entry{ Cid: c, Priority: priority, From 0dd0f252710eca9c6de191b39fc2932eca2b9abf 
Mon Sep 17 00:00:00 2001 From: Jeromy Date: Mon, 12 Feb 2018 23:40:15 -0800 Subject: [PATCH 8/8] bitswap virtual test net code should send messages in order License: MIT Signed-off-by: Jeromy --- exchange/bitswap/testnet/virtual.go | 61 ++++++++++++++++++++++++++--- 1 file changed, 55 insertions(+), 6 deletions(-) diff --git a/exchange/bitswap/testnet/virtual.go b/exchange/bitswap/testnet/virtual.go index c5ba6e0ae3c..0524d17c53c 100644 --- a/exchange/bitswap/testnet/virtual.go +++ b/exchange/bitswap/testnet/virtual.go @@ -4,6 +4,7 @@ import ( "context" "errors" "sync" + "time" bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message" bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network" @@ -22,7 +23,7 @@ var log = logging.Logger("bstestnet") func VirtualNetwork(rs mockrouting.Server, d delay.D) Network { return &network{ - clients: make(map[peer.ID]bsnet.Receiver), + clients: make(map[peer.ID]*receiverQueue), delay: d, routingserver: rs, conns: make(map[string]struct{}), @@ -31,12 +32,28 @@ func VirtualNetwork(rs mockrouting.Server, d delay.D) Network { type network struct { mu sync.Mutex - clients map[peer.ID]bsnet.Receiver + clients map[peer.ID]*receiverQueue routingserver mockrouting.Server delay delay.D conns map[string]struct{} } +type message struct { + from peer.ID + msg bsmsg.BitSwapMessage + shouldSend time.Time +} + +// receiverQueue queues up a set of messages to be sent, and sends them *in +// order* with their delays respected as much as sending them in order allows +// for +type receiverQueue struct { + receiver bsnet.Receiver + queue []*message + active bool + lk sync.Mutex +} + func (n *network) Adapter(p testutil.Identity) bsnet.BitSwapNetwork { n.mu.Lock() defer n.mu.Unlock() @@ -46,7 +63,7 @@ func (n *network) Adapter(p testutil.Identity) bsnet.BitSwapNetwork { network: n, routing: n.routingserver.Client(p), } - n.clients[p.ID()] = client + n.clients[p.ID()] = &receiverQueue{receiver: client} return client } @@ -64,7 +81,7 @@ func (n *network) 
SendMessage( ctx context.Context, from peer.ID, to peer.ID, - message bsmsg.BitSwapMessage) error { + mes bsmsg.BitSwapMessage) error { n.mu.Lock() defer n.mu.Unlock() @@ -77,7 +94,12 @@ func (n *network) SendMessage( // nb: terminate the context since the context wouldn't actually be passed // over the network in a real scenario - go n.deliver(receiver, from, message) + msg := &message{ + from: from, + msg: mes, + shouldSend: time.Now().Add(n.delay.Get()), + } + receiver.enqueue(msg) return nil } @@ -191,11 +213,38 @@ func (nc *networkClient) ConnectTo(_ context.Context, p peer.ID) error { // TODO: add handling for disconnects - otherClient.PeerConnected(nc.local) + otherClient.receiver.PeerConnected(nc.local) nc.Receiver.PeerConnected(p) return nil } +func (rq *receiverQueue) enqueue(m *message) { + rq.lk.Lock() + defer rq.lk.Unlock() + rq.queue = append(rq.queue, m) + if !rq.active { + rq.active = true + go rq.process() + } +} + +func (rq *receiverQueue) process() { + for { + rq.lk.Lock() + if len(rq.queue) == 0 { + rq.active = false + rq.lk.Unlock() + return + } + m := rq.queue[0] + rq.queue = rq.queue[1:] + rq.lk.Unlock() + + time.Sleep(time.Until(m.shouldSend)) + rq.receiver.ReceiveMessage(context.TODO(), m.from, m.msg) + } +} + func tagForPeers(a, b peer.ID) string { if a < b { return string(a + b)