Skip to content

Commit

Permalink
net/mock: mimic Swarm's event and notification behavior in MockNet (#…
Browse files Browse the repository at this point in the history
…2287)

* net/mock: mimic Swarm's event and notificaion behavior in MockNet

* go fmt
  • Loading branch information
Wondertan authored May 16, 2023
1 parent 301a197 commit 8719fc4
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 26 deletions.
12 changes: 2 additions & 10 deletions p2p/net/mock/mock_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,20 +86,12 @@ func (c *conn) Close() error {
return nil
}

func (c *conn) teardown() error {
func (c *conn) teardown() {
for _, s := range c.allStreams() {
s.Reset()
}
c.net.removeConn(c)

go func() {
c.notifLk.Lock()
defer c.notifLk.Unlock()
c.net.notifyAll(func(n network.Notifiee) {
n.Disconnected(c.net, c)
})
}()
return nil
c.net.removeConn(c)
}

func (c *conn) addStream(s *stream) {
Expand Down
5 changes: 4 additions & 1 deletion p2p/net/mock/mock_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"

ma "github.com/multiformats/go-multiaddr"
Expand Down Expand Up @@ -108,14 +109,16 @@ func (mn *mocknet) AddPeer(k ic.PrivKey, a ma.Multiaddr) (host.Host, error) {
}

func (mn *mocknet) AddPeerWithPeerstore(p peer.ID, ps peerstore.Peerstore) (host.Host, error) {
n, err := newPeernet(mn, p, ps)
bus := eventbus.NewBus()
n, err := newPeernet(mn, p, ps, bus)
if err != nil {
return nil, err
}

opts := &bhost.HostOpts{
NegotiationTimeout: -1,
DisableSignedPeerRecord: true,
EventBus: bus,
}

h, err := bhost.NewHost(n, opts)
Expand Down
50 changes: 35 additions & 15 deletions p2p/net/mock/mock_peernet.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,20 @@ import (
"math/rand"
"sync"

"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"

ma "github.com/multiformats/go-multiaddr"
)

// peernet implements network.Network
type peernet struct {
mocknet *mocknet // parent

peer peer.ID
ps peerstore.Peerstore
peer peer.ID
ps peerstore.Peerstore
emitter event.Emitter

// conns are actual live connections between peers.
// many conns could run over each link.
Expand All @@ -37,11 +38,17 @@ type peernet struct {
}

// newPeernet constructs a new peernet
func newPeernet(m *mocknet, p peer.ID, ps peerstore.Peerstore) (*peernet, error) {
func newPeernet(m *mocknet, p peer.ID, ps peerstore.Peerstore, bus event.Bus) (*peernet, error) {
emitter, err := bus.Emitter(&event.EvtPeerConnectednessChanged{})
if err != nil {
return nil, err
}

n := &peernet{
mocknet: m,
peer: p,
ps: ps,
emitter: emitter,

connsByPeer: map[peer.ID]map[*conn]struct{}{},
connsByLink: map[*link]map[*conn]struct{}{},
Expand All @@ -57,6 +64,7 @@ func (pn *peernet) Close() error {
for _, c := range pn.allConns() {
c.Close()
}
pn.emitter.Close()
return pn.ps.Close()
}

Expand Down Expand Up @@ -192,13 +200,16 @@ func (pn *peernet) addConn(c *conn) {
pn.notifyAll(func(n network.Notifiee) {
n.Connected(pn, c)
})

pn.emitter.Emit(event.EvtPeerConnectednessChanged{
Peer: c.remote,
Connectedness: network.Connected,
})
}

// removeConn removes a given conn
func (pn *peernet) removeConn(c *conn) {
pn.Lock()
defer pn.Unlock()

cs, found := pn.connsByLink[c.link]
if !found || len(cs) < 1 {
panic(fmt.Sprintf("attempting to remove a conn that doesnt exist %p", c.link))
Expand All @@ -210,6 +221,22 @@ func (pn *peernet) removeConn(c *conn) {
panic(fmt.Sprintf("attempting to remove a conn that doesnt exist %v", c.remote))
}
delete(cs, c)
pn.Unlock()

// notify asynchronously to mimic Swarm
// FIXME: IIRC, we wanted to make notify for Close synchronous
go func() {
c.notifLk.Lock()
defer c.notifLk.Unlock()
pn.notifyAll(func(n network.Notifiee) {
n.Disconnected(c.net, c)
})
}()

c.net.emitter.Emit(event.EvtPeerConnectednessChanged{
Peer: c.remote,
Connectedness: network.NotConnected,
})
}

// LocalPeer the network's LocalPeer
Expand Down Expand Up @@ -355,18 +382,11 @@ func (pn *peernet) StopNotify(f network.Notifiee) {
// notifyAll runs the notification function on all Notifiees
func (pn *peernet) notifyAll(notification func(f network.Notifiee)) {
pn.notifmu.Lock()
var wg sync.WaitGroup
// notify synchronously to mimic Swarm
for n := range pn.notifs {
// make sure we dont block
// and they dont block each other.
wg.Add(1)
go func(n network.Notifiee) {
defer wg.Done()
notification(n)
}(n)
notification(n)
}
pn.notifmu.Unlock()
wg.Wait()
}

func (pn *peernet) ResourceManager() network.ResourceManager {
Expand Down
81 changes: 81 additions & 0 deletions p2p/net/mock/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
Expand Down Expand Up @@ -600,3 +601,83 @@ func TestStreamsWithLatency(t *testing.T) {
t.Fatalf("Expected write to take ~%s (+/- %s), but took %s", latency.String(), tolerance.String(), delta.String())
}
}

func TestEventBus(t *testing.T) {
const peers = 2

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

mn, err := FullMeshLinked(peers)
if err != nil {
t.Fatal(err)
}
defer mn.Close()

sub0, err := mn.Hosts()[0].EventBus().Subscribe(new(event.EvtPeerConnectednessChanged))
if err != nil {
t.Fatal(err)
}
defer sub0.Close()
sub1, err := mn.Hosts()[1].EventBus().Subscribe(new(event.EvtPeerConnectednessChanged))
if err != nil {
t.Fatal(err)
}
defer sub1.Close()

id0, id1 := mn.Hosts()[0].ID(), mn.Hosts()[1].ID()

_, err = mn.ConnectPeers(id0, id1)
if err != nil {
t.Fatal(err)
}
for range make([]int, peers) {
select {
case evt := <-sub0.Out():
evnt := evt.(event.EvtPeerConnectednessChanged)
if evnt.Peer != id1 {
t.Fatal("wrong remote peer")
}
if evnt.Connectedness != network.Connected {
t.Fatal("wrong connectedness type")
}
case evt := <-sub1.Out():
evnt := evt.(event.EvtPeerConnectednessChanged)
if evnt.Peer != id0 {
t.Fatal("wrong remote peer")
}
if evnt.Connectedness != network.Connected {
t.Fatal("wrong connectedness type")
}
case <-ctx.Done():
t.Fatal("didn't get connectedness events in time")
}
}

err = mn.DisconnectPeers(id0, id1)
if err != nil {
t.Fatal(err)
}
for range make([]int, peers) {
select {
case evt := <-sub0.Out():
evnt := evt.(event.EvtPeerConnectednessChanged)
if evnt.Peer != id1 {
t.Fatal("wrong remote peer")
}
if evnt.Connectedness != network.NotConnected {
t.Fatal("wrong connectedness type")
}
case evt := <-sub1.Out():
evnt := evt.(event.EvtPeerConnectednessChanged)
if evnt.Peer != id0 {
t.Fatal("wrong remote peer")
}
if evnt.Connectedness != network.NotConnected {
t.Fatal("wrong connectedness type")
}
case <-ctx.Done():
t.Fatal("didn't get connectedness events in time")
}
}
}

0 comments on commit 8719fc4

Please sign in to comment.