Skip to content

Commit

Permalink
Merge pull request #836 from libp2p/feat/reliable-mock-notif
Browse files Browse the repository at this point in the history
feat(mock): reliable notifications
  • Loading branch information
Stebalien authored Mar 10, 2020
2 parents 4bbf43e + 5f75aa2 commit 9ad477b
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 133 deletions.
17 changes: 17 additions & 0 deletions p2p/net/mock/complement.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package mocknet

import (
"github.com/libp2p/go-libp2p-core/network"
)

// StreamComplement returns the other end of the given stream. This function
// panics when passed a non-mocknet stream.
func StreamComplement(s network.Stream) network.Stream {
return s.(*stream).rstream
}

// ConnComplement returns the other end of the given connection. This function
// panics when passed a non-mocknet connection.
func ConnComplement(c network.Conn) network.Conn {
return c.(*conn).rconn
}
36 changes: 23 additions & 13 deletions p2p/net/mock/mock_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
// live connection between two peers.
// it goes over a particular link.
type conn struct {
notifLk sync.Mutex

local peer.ID
remote peer.ID

Expand All @@ -34,8 +36,8 @@ type conn struct {
sync.RWMutex
}

func newConn(ln, rn *peernet, l *link, dir network.Direction) *conn {
c := &conn{net: ln, link: l}
func newConn(p process.Process, ln, rn *peernet, l *link, dir network.Direction) *conn {
c := &conn{net: ln, link: l, proc: p}
c.local = ln.peer
c.remote = rn.peer
c.stat = network.Stat{Direction: dir}
Expand All @@ -46,7 +48,7 @@ func newConn(ln, rn *peernet, l *link, dir network.Direction) *conn {
c.localPrivKey = ln.ps.PrivKey(ln.peer)
c.remotePubKey = rn.ps.PubKey(rn.peer)

c.proc = process.WithTeardown(c.teardown)
c.proc.AddChild(process.WithTeardown(c.teardown))
return c
}

Expand All @@ -59,6 +61,9 @@ func (c *conn) teardown() error {
s.Reset()
}
c.net.removeConn(c)

c.notifLk.Lock()
defer c.notifLk.Unlock()
c.net.notifyAll(func(n network.Notifiee) {
n.Disconnected(c.net, c)
})
Expand All @@ -69,18 +74,29 @@ func (c *conn) addStream(s *stream) {
c.Lock()
s.conn = c
c.streams.PushBack(s)
s.notifLk.Lock()
defer s.notifLk.Unlock()
c.Unlock()
c.net.notifyAll(func(n network.Notifiee) {
n.OpenedStream(c.net, s)
})
}

func (c *conn) removeStream(s *stream) {
c.Lock()
defer c.Unlock()
for e := c.streams.Front(); e != nil; e = e.Next() {
if s == e.Value {
c.streams.Remove(e)
return
break
}
}
c.Unlock()

s.notifLk.Lock()
defer s.notifLk.Unlock()
s.conn.net.notifyAll(func(n network.Notifiee) {
n.ClosedStream(s.conn.net, s)
})
}

func (c *conn) allStreams() []network.Stream {
Expand All @@ -98,18 +114,12 @@ func (c *conn) allStreams() []network.Stream {
func (c *conn) remoteOpenedStream(s *stream) {
c.addStream(s)
c.net.handleNewStream(s)
c.net.notifyAll(func(n network.Notifiee) {
n.OpenedStream(c.net, s)
})
}

func (c *conn) openStream() *stream {
sl, sr := c.link.newStreamPair()
sl, sr := newStreamPair()
go c.rconn.remoteOpenedStream(sr)
c.addStream(sl)
c.net.notifyAll(func(n network.Notifiee) {
n.OpenedStream(c.net, sl)
})
c.rconn.remoteOpenedStream(sr)
return sl
}

Expand Down
18 changes: 5 additions & 13 deletions p2p/net/mock/mock_link.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package mocknet

import (
// "fmt"
"io"
"sync"
"time"

"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"

process "github.com/jbenet/goprocess"
)

// link implements mocknet.Link
Expand All @@ -33,8 +33,9 @@ func (l *link) newConnPair(dialer *peernet) (*conn, *conn) {
l.RLock()
defer l.RUnlock()

c1 := newConn(l.nets[0], l.nets[1], l, network.DirOutbound)
c2 := newConn(l.nets[1], l.nets[0], l, network.DirInbound)
parent := process.WithTeardown(func() error { return nil })
c1 := newConn(parent, l.nets[0], l.nets[1], l, network.DirOutbound)
c2 := newConn(parent, l.nets[1], l.nets[0], l, network.DirInbound)
c1.rconn = c2
c2.rconn = c1

Expand All @@ -44,15 +45,6 @@ func (l *link) newConnPair(dialer *peernet) (*conn, *conn) {
return c2, c1
}

func (l *link) newStreamPair() (*stream, *stream) {
ra, wb := io.Pipe()
rb, wa := io.Pipe()

sa := NewStream(wa, ra, network.DirOutbound)
sb := NewStream(wb, rb, network.DirInbound)
return sa, sb
}

func (l *link) Networks() []network.Network {
l.RLock()
defer l.RUnlock()
Expand Down
Loading

0 comments on commit 9ad477b

Please sign in to comment.