From 5f75aa2068f276cc5140879cda7fed1f0b131afe Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Mon, 9 Mar 2020 17:04:37 -0700 Subject: [PATCH] feat(mock): reliable notifications * Export StreamComplement/ConnComplement convenience functions. * Make the TestNotifications test pass reliably, even when we have a bunch of streams (identify, etc.). * Make the mock net order disconnect events after connect events. * Make closing one side of a connection actually close both sides. * Make it possible to extract a mock stream's complement. * Fire remote events at the same time as the local events. --- p2p/net/mock/complement.go | 17 +++ p2p/net/mock/mock_conn.go | 36 ++++--- p2p/net/mock/mock_link.go | 18 +--- p2p/net/mock/mock_notif_test.go | 180 ++++++++++++++++---------------- p2p/net/mock/mock_peernet.go | 19 ++-- p2p/net/mock/mock_stream.go | 24 +++-- 6 files changed, 161 insertions(+), 133 deletions(-) create mode 100644 p2p/net/mock/complement.go diff --git a/p2p/net/mock/complement.go b/p2p/net/mock/complement.go new file mode 100644 index 0000000000..4a8b251da0 --- /dev/null +++ b/p2p/net/mock/complement.go @@ -0,0 +1,17 @@ +package mocknet + +import ( + "github.com/libp2p/go-libp2p-core/network" +) + +// StreamComplement returns the other end of the given stream. This function +// panics when passed a non-mocknet stream. +func StreamComplement(s network.Stream) network.Stream { + return s.(*stream).rstream +} + +// ConnComplement returns the other end of the given connection. This function +// panics when passed a non-mocknet connection. +func ConnComplement(c network.Conn) network.Conn { + return c.(*conn).rconn +} diff --git a/p2p/net/mock/mock_conn.go b/p2p/net/mock/mock_conn.go index 17f56528ca..882522b2ea 100644 --- a/p2p/net/mock/mock_conn.go +++ b/p2p/net/mock/mock_conn.go @@ -15,6 +15,8 @@ import ( // live connection between two peers. // it goes over a particular link. type conn struct { + notifLk sync.Mutex + local peer.ID remote peer.ID @@ -34,8 +36,8 @@ type conn struct { sync.RWMutex } -func newConn(ln, rn *peernet, l *link, dir network.Direction) *conn { - c := &conn{net: ln, link: l} +func newConn(p process.Process, ln, rn *peernet, l *link, dir network.Direction) *conn { + c := &conn{net: ln, link: l, proc: p} c.local = ln.peer c.remote = rn.peer c.stat = network.Stat{Direction: dir} @@ -46,7 +48,7 @@ func newConn(ln, rn *peernet, l *link, dir network.Direction) *conn { c.localPrivKey = ln.ps.PrivKey(ln.peer) c.remotePubKey = rn.ps.PubKey(rn.peer) - c.proc = process.WithTeardown(c.teardown) + c.proc.AddChild(process.WithTeardown(c.teardown)) return c } @@ -59,6 +61,9 @@ func (c *conn) teardown() error { s.Reset() } c.net.removeConn(c) + + c.notifLk.Lock() + defer c.notifLk.Unlock() c.net.notifyAll(func(n network.Notifiee) { n.Disconnected(c.net, c) }) @@ -69,18 +74,29 @@ func (c *conn) addStream(s *stream) { c.Lock() s.conn = c c.streams.PushBack(s) + s.notifLk.Lock() + defer s.notifLk.Unlock() c.Unlock() + c.net.notifyAll(func(n network.Notifiee) { + n.OpenedStream(c.net, s) + }) } func (c *conn) removeStream(s *stream) { c.Lock() - defer c.Unlock() for e := c.streams.Front(); e != nil; e = e.Next() { if s == e.Value { c.streams.Remove(e) - return + break } } + c.Unlock() + + s.notifLk.Lock() + defer s.notifLk.Unlock() + s.conn.net.notifyAll(func(n network.Notifiee) { + n.ClosedStream(s.conn.net, s) + }) } func (c *conn) allStreams() []network.Stream { @@ -98,18 +114,12 @@ func (c *conn) allStreams() []network.Stream { func (c *conn) remoteOpenedStream(s *stream) { c.addStream(s) c.net.handleNewStream(s) - c.net.notifyAll(func(n network.Notifiee) { - n.OpenedStream(c.net, s) - }) } func (c *conn) openStream() *stream { - sl, sr := c.link.newStreamPair() + sl, sr := newStreamPair() + go c.rconn.remoteOpenedStream(sr) c.addStream(sl) - c.net.notifyAll(func(n network.Notifiee) { - n.OpenedStream(c.net, sl) - }) - c.rconn.remoteOpenedStream(sr) return sl } diff --git a/p2p/net/mock/mock_link.go b/p2p/net/mock/mock_link.go index bd9481fb2d..294b2bc627 100644 --- a/p2p/net/mock/mock_link.go +++ b/p2p/net/mock/mock_link.go @@ -1,13 +1,13 @@ package mocknet import ( - // "fmt" - "io" "sync" "time" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" + + process "github.com/jbenet/goprocess" ) // link implements mocknet.Link @@ -33,8 +33,9 @@ func (l *link) newConnPair(dialer *peernet) (*conn, *conn) { l.RLock() defer l.RUnlock() - c1 := newConn(l.nets[0], l.nets[1], l, network.DirOutbound) - c2 := newConn(l.nets[1], l.nets[0], l, network.DirInbound) + parent := process.WithTeardown(func() error { return nil }) + c1 := newConn(parent, l.nets[0], l.nets[1], l, network.DirOutbound) + c2 := newConn(parent, l.nets[1], l.nets[0], l, network.DirInbound) c1.rconn = c2 c2.rconn = c1 @@ -44,15 +45,6 @@ func (l *link) newConnPair(dialer *peernet) (*conn, *conn) { return c2, c1 } -func (l *link) newStreamPair() (*stream, *stream) { - ra, wb := io.Pipe() - rb, wa := io.Pipe() - - sa := NewStream(wa, ra, network.DirOutbound) - sb := NewStream(wb, rb, network.DirInbound) - return sa, sb -} - func (l *link) Networks() []network.Network { l.RLock() defer l.RUnlock() diff --git a/p2p/net/mock/mock_notif_test.go b/p2p/net/mock/mock_notif_test.go index 4caecbedfa..f443d4f6f0 100644 --- a/p2p/net/mock/mock_notif_test.go +++ b/p2p/net/mock/mock_notif_test.go @@ -2,6 +2,7 @@ package mocknet import ( "context" + "sync" "testing" "time" @@ -13,8 +14,10 @@ import ( func TestNotifications(t *testing.T) { const swarmSize = 5 + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - mn, err := FullMeshLinked(context.Background(), swarmSize) + mn, err := FullMeshLinked(ctx, swarmSize) if err != nil { t.Fatal(err) } @@ -23,11 +26,13 @@ func TestNotifications(t *testing.T) { // signup notifs nets := mn.Nets() - notifiees := make([]*netNotifiee, len(nets)) - for i, pn := range nets { - n := newNetNotifiee(swarmSize) + notifiees := make(map[peer.ID]*netNotifiee, len(nets)) + for _, pn := range nets { + defer pn.Close() + + n := newNetNotifiee(t, swarmSize) pn.Notify(n) - notifiees[i] = n + notifiees[pn.LocalPeer()] = n } // connect all but self @@ -36,16 +41,16 @@ func TestNotifications(t *testing.T) { } // test everyone got the correct connection opened calls - for i, s := range nets { - n := notifiees[i] + for _, s1 := range nets { + n := notifiees[s1.LocalPeer()] notifs := make(map[peer.ID][]network.Conn) - for j, s2 := range nets { - if i == j { + for _, s2 := range nets { + if s2 == s1 { continue } // this feels a little sketchy, but its probably okay - for len(s.ConnsToPeer(s2.LocalPeer())) != len(notifs[s2.LocalPeer()]) { + for len(s1.ConnsToPeer(s2.LocalPeer())) != len(notifs[s2.LocalPeer()]) { select { case c := <-n.connected: nfp := notifs[c.RemotePeer()] @@ -57,7 +62,7 @@ func TestNotifications(t *testing.T) { } for p, cons := range notifs { - expect := s.ConnsToPeer(p) + expect := s1.ConnsToPeer(p) if len(expect) != len(cons) { t.Fatal("got different number of connections") } @@ -78,100 +83,49 @@ func TestNotifications(t *testing.T) { } } - complement := func(c network.Conn) (network.Network, *netNotifiee, *conn) { - for i, s := range nets { - for _, c2 := range s.Conns() { - if c2.(*conn).rconn == c { - return s, notifiees[i], c2.(*conn) - } - } - } - t.Fatal("complementary conn not found", c) - return nil, nil, nil - } - - testOCStream := func(n *netNotifiee, s network.Stream) { - var s2 network.Stream - select { - case s2 = <-n.openedStream: - t.Log("got notif for opened stream") - case <-time.After(timeout): - t.Fatal("timeout") - } - if s != nil && s != s2 { - t.Fatalf("got incorrect stream %p %p", s, s2) - } - - select { - case s2 = <-n.closedStream: - t.Log("got notif for closed stream") - case <-time.After(timeout): - t.Fatal("timeout") - } - if s != nil && s != s2 { - t.Fatalf("got incorrect stream %p %p", s, s2) - } - } - for _, s := range nets { s.SetStreamHandler(func(s network.Stream) { helpers.FullClose(s) }) } - // there's one stream per conn that we need to drain.... - // unsure where these are coming from - for i := range nets { - n := notifiees[i] - for j := 0; j < len(nets)-1; j++ { - testOCStream(n, nil) - } - } - - streams := make(chan network.Stream) for _, s := range nets { s.SetStreamHandler(func(s network.Stream) { - streams <- s helpers.FullClose(s) }) } - // open a streams in each conn - for i, s := range nets { + // Make sure we've received at last one stream per conn. + for _, s := range nets { conns := s.Conns() for _, c := range conns { - _, n2, c2 := complement(c) st1, err := c.NewStream() if err != nil { t.Error(err) - } else { - t.Logf("%s %s <--%p--> %s %s", c.LocalPeer(), c.LocalMultiaddr(), st1, c.RemotePeer(), c.RemoteMultiaddr()) - // st1.Write([]byte("hello")) - go helpers.FullClose(st1) - st2 := <-streams - t.Logf("%s %s <--%p--> %s %s", c2.LocalPeer(), c2.LocalMultiaddr(), st2, c2.RemotePeer(), c2.RemoteMultiaddr()) - testOCStream(notifiees[i], st1) - testOCStream(n2, st2) + continue } + t.Logf("%s %s <--%p--> %s %s", c.LocalPeer(), c.LocalMultiaddr(), st1, c.RemotePeer(), c.RemoteMultiaddr()) + helpers.FullClose(st1) } } // close conns - for i, s := range nets { - n := notifiees[i] - for _, c := range s.Conns() { - _, n2, c2 := complement(c) - c.(*conn).Close() - c2.Close() + for _, s1 := range nets { + n1 := notifiees[s1.LocalPeer()] + for _, c1 := range s1.Conns() { + c2 := ConnComplement(c1) + + n2 := notifiees[c2.LocalPeer()] + c1.Close() var c3, c4 network.Conn select { - case c3 = <-n.disconnected: + case c3 = <-n1.disconnected: case <-time.After(timeout): t.Fatal("timeout") } - if c != c3 { - t.Fatal("got incorrect conn", c, c3) + if c1 != c3 { + t.Fatal("got incorrect conn", c1, c3) } select { @@ -180,30 +134,53 @@ func TestNotifications(t *testing.T) { t.Fatal("timeout") } if c2 != c4 { - t.Fatal("got incorrect conn", c, c2) + t.Fatal("got incorrect conn", c1, c2) } } } + + for _, n1 := range notifiees { + n1.streamState.Lock() + for str1, ch1 := range n1.streamState.m { + <-ch1 + str2 := StreamComplement(str1) + n2 := notifiees[str1.Conn().RemotePeer()] + + n2.streamState.Lock() + ch2 := n2.streamState.m[str2] + n2.streamState.Unlock() + + <-ch2 + } + + n1.streamState.Unlock() + } } type netNotifiee struct { + t *testing.T + listen chan ma.Multiaddr listenClose chan ma.Multiaddr connected chan network.Conn disconnected chan network.Conn - openedStream chan network.Stream - closedStream chan network.Stream + + streamState struct { + sync.Mutex + m map[network.Stream]chan struct{} + } } -func newNetNotifiee(buffer int) *netNotifiee { - return &netNotifiee{ - listen: make(chan ma.Multiaddr, buffer), - listenClose: make(chan ma.Multiaddr, buffer), - connected: make(chan network.Conn, buffer), - disconnected: make(chan network.Conn, buffer), - openedStream: make(chan network.Stream, buffer), - closedStream: make(chan network.Stream, buffer), +func newNetNotifiee(t *testing.T, buffer int) *netNotifiee { + nn := &netNotifiee{ + t: t, + listen: make(chan ma.Multiaddr, 1), + listenClose: make(chan ma.Multiaddr, 1), + connected: make(chan network.Conn, buffer*2), + disconnected: make(chan network.Conn, buffer*2), } + nn.streamState.m = make(map[network.Stream]chan struct{}) + return nn } func (nn *netNotifiee) Listen(n network.Network, a ma.Multiaddr) { @@ -218,9 +195,28 @@ func (nn *netNotifiee) Connected(n network.Network, v network.Conn) { func (nn *netNotifiee) Disconnected(n network.Network, v network.Conn) { nn.disconnected <- v } -func (nn *netNotifiee) OpenedStream(n network.Network, v network.Stream) { - nn.openedStream <- v +func (nn *netNotifiee) OpenedStream(n network.Network, s network.Stream) { + nn.streamState.Lock() + defer nn.streamState.Unlock() + _, ok := nn.streamState.m[s] + if ok { + nn.t.Error("duplicate stream open") + return + } + nn.streamState.m[s] = make(chan struct{}) } -func (nn *netNotifiee) ClosedStream(n network.Network, v network.Stream) { - nn.closedStream <- v +func (nn *netNotifiee) ClosedStream(n network.Network, s network.Stream) { + nn.streamState.Lock() + defer nn.streamState.Unlock() + ch, ok := nn.streamState.m[s] + if !ok { + nn.t.Error("saw close event but no open event") + return + } + select { + case <-ch: + nn.t.Error("duplicate close event") + default: + close(ch) + } } diff --git a/p2p/net/mock/mock_peernet.go b/p2p/net/mock/mock_peernet.go index bcf0644656..ff549873e0 100644 --- a/p2p/net/mock/mock_peernet.go +++ b/p2p/net/mock/mock_peernet.go @@ -160,11 +160,8 @@ func (pn *peernet) connect(p peer.ID) (*conn, error) { func (pn *peernet) openConn(r peer.ID, l *link) *conn { lc, rc := l.newConnPair(pn) log.Debugf("%s opening connection to %s", pn.LocalPeer(), lc.RemotePeer()) + go rc.net.remoteOpenedConn(rc) pn.addConn(lc) - pn.notifyAll(func(n network.Notifiee) { - n.Connected(pn, lc) - }) - rc.net.remoteOpenedConn(rc) return lc } @@ -172,16 +169,12 @@ func (pn *peernet) remoteOpenedConn(c *conn) { log.Debugf("%s accepting connection from %s", pn.LocalPeer(), c.RemotePeer()) pn.addConn(c) pn.handleNewConn(c) - pn.notifyAll(func(n network.Notifiee) { - n.Connected(pn, c) - }) } // addConn constructs and adds a connection // to given remote peer over given link func (pn *peernet) addConn(c *conn) { pn.Lock() - defer pn.Unlock() _, found := pn.connsByPeer[c.RemotePeer()] if !found { @@ -194,6 +187,14 @@ func (pn *peernet) addConn(c *conn) { pn.connsByLink[c.link] = map[*conn]struct{}{} } pn.connsByLink[c.link][c] = struct{}{} + + c.notifLk.Lock() + defer c.notifLk.Unlock() + pn.Unlock() + + pn.notifyAll(func(n network.Notifiee) { + n.Connected(pn, c) + }) } // removeConn removes a given conn @@ -380,6 +381,6 @@ func (pn *peernet) notifyAll(notification func(f network.Notifiee)) { notification(n) }(n) } - wg.Wait() pn.notifmu.Unlock() + wg.Wait() } diff --git a/p2p/net/mock/mock_stream.go b/p2p/net/mock/mock_stream.go index d71e53c115..ecb32ddbbf 100644 --- a/p2p/net/mock/mock_stream.go +++ b/p2p/net/mock/mock_stream.go @@ -5,6 +5,7 @@ import ( "errors" "io" "net" + "sync" "sync/atomic" "time" @@ -15,9 +16,13 @@ import ( // stream implements network.Stream type stream struct { + notifLk sync.Mutex + + rstream *stream + conn *conn + write *io.PipeWriter read *io.PipeReader - conn *conn toDeliver chan *transportObject reset chan struct{} @@ -37,7 +42,18 @@ type transportObject struct { arrivalTime time.Time } -func NewStream(w *io.PipeWriter, r *io.PipeReader, dir network.Direction) *stream { +func newStreamPair() (*stream, *stream) { + ra, wb := io.Pipe() + rb, wa := io.Pipe() + + sa := newStream(wa, ra, network.DirOutbound) + sb := newStream(wb, rb, network.DirInbound) + sa.rstream = sb + sb.rstream = sa + return sa, sb +} + +func newStream(w *io.PipeWriter, r *io.PipeReader, dir network.Direction) *stream { s := &stream{ read: r, write: w, @@ -117,10 +133,6 @@ func (s *stream) teardown() { // Mark as closed. close(s.closed) - - s.conn.net.notifyAll(func(n network.Notifiee) { - n.ClosedStream(s.conn.net, s) - }) } func (s *stream) Conn() network.Conn {