diff --git a/core/core.go b/core/core.go index 2cc67eb6f17..d422a1aa80c 100644 --- a/core/core.go +++ b/core/core.go @@ -48,6 +48,7 @@ import ( "github.com/ipfs/go-ipfs/namesys" ipnsrp "github.com/ipfs/go-ipfs/namesys/republisher" "github.com/ipfs/go-ipfs/p2p" + "github.com/ipfs/go-ipfs/peering" "github.com/ipfs/go-ipfs/repo" ) @@ -83,6 +84,7 @@ type IpfsNode struct { // Online PeerHost p2phost.Host `optional:"true"` // the network host (server+client) + Peering peering.PeeringService `optional:"true"` Filters *ma.Filters `optional:"true"` Bootstrapper io.Closer `optional:"true"` // the periodic bootstrapper Routing routing.Routing `optional:"true"` // the routing system. recommend ipfs-dht diff --git a/core/node/groups.go b/core/node/groups.go index 9078f52b69f..ad51473452e 100644 --- a/core/node/groups.go +++ b/core/node/groups.go @@ -250,6 +250,8 @@ func Online(bcfg *BuildCfg, cfg *config.Config) fx.Option { fx.Provide(OnlineExchange(shouldBitswapProvide)), maybeProvide(Graphsync, cfg.Experimental.GraphsyncEnabled), fx.Provide(Namesys(ipnsCacheSize)), + fx.Provide(Peering), + PeerWith(cfg.Peering.Peers...), fx.Invoke(IpnsRepublisher(repubPeriod, recordLifetime)), diff --git a/core/node/peering.go b/core/node/peering.go new file mode 100644 index 00000000000..b5e7caadc33 --- /dev/null +++ b/core/node/peering.go @@ -0,0 +1,34 @@ +package node + +import ( + "context" + + "github.com/ipfs/go-ipfs/peering" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/peer" + "go.uber.org/fx" +) + +// Peering constructs the peering service and hooks it into fx's lifetime +// management system. +func Peering(lc fx.Lifecycle, host host.Host) *peering.PeeringService { + ps := peering.NewPeeringService(host) + lc.Append(fx.Hook{ + OnStart: func(context.Context) error { + return ps.Start() + }, + OnStop: func(context.Context) error { + return ps.Stop() + }, + }) + return ps +} + +// PeerWith configures the peering service to peer with the specified peers. +func PeerWith(peers ...peer.AddrInfo) fx.Option { + return fx.Invoke(func(ps *peering.PeeringService) { + for _, ai := range peers { + ps.AddPeer(ai) + } + }) +} diff --git a/docs/config.md b/docs/config.md index f2211954cb8..ac5e01682c1 100644 --- a/docs/config.md +++ b/docs/config.md @@ -139,6 +139,8 @@ documented in `ipfs config profile --help`. - [`Pubsub`](#pubsub) - [`Pubsub.Router`](#pubsubrouter) - [`Pubsub.DisableSigning`](#pubsubdisablesigning) + - [`Peering`](#peering) + - [`Peering.Peers`](#peeringpeers) - [`Reprovider`](#reprovider) - [`Reprovider.Interval`](#reproviderinterval) - [`Reprovider.Strategy`](#reproviderstrategy) @@ -157,6 +159,7 @@ documented in `ipfs config profile --help`. - [`Swarm.ConnMgr.HighWater`](#swarmconnmgrhighwater) - [`Swarm.ConnMgr.GracePeriod`](#swarmconnmgrgraceperiod) + ## `Addresses` Contains information about various listener addresses to be used by this node. @@ -703,6 +706,56 @@ intentionally re-using the real message's message ID. Default: `false` +### `Peering` + +Configures the peering subsystem. The peering subsystem configures go-ipfs to +connect to, remain connected to, and reconnect to a set of nodes. Nodes should +use this subsystem to create "sticky" links between frequently useful peers to +improve reliability. + +Use-cases: + +* An IPFS gateway connected to an IPFS cluster should peer to ensure that the + gateway can always fetch content from the cluster. +* A dapp may peer embedded go-ipfs nodes with a set of pinning services or + textile cafes/hubs. 
+* A set of friends may peer to ensure that they can always fetch each other's + content. + +When a node is added to the set of peered nodes, go-ipfs will: + +1. Protect connections to this node from the connection manager. That is, + go-ipfs will never automatically close the connection to this node and + connections to this node will not count towards the connection limit. +2. Connect to this node on startup. +3. Repeatedly try to reconnect to this node if the last connection dies or the + node goes offline. This repeated reconnect logic is governed by a randomized + exponential backoff delay ranging from ~5 seconds to ~10 minutes to avoid + repeatedly reconnecting to a node that's offline. + +Peering can be asymmetric or symmetric: + +* When symmetric, the connection will be protected by both nodes and will likely + be very stable. +* When asymmetric, only one node (the node that configured peering) will protect + the connection and attempt to reconnect to the peered node on disconnect. If + the peered node is under heavy load and/or has a low connection limit, the + connection may flap repeatedly. Be careful not to + overload peers when peering asymmetrically. + +#### `Peering.Peers` + +The set of peers with which to peer. Each entry is of the form: + +```js +{ + "ID": "QmSomePeerID", // The peer's ID. + "Addrs": ["/ip4/1.2.3.4/tcp/1234"] // Known addresses for the peer. If none are specified, the DHT will be queried. +} +``` + +Additional fields may be added in the future. + ## `Reprovider` ### `Reprovider.Interval` diff --git a/go.mod b/go.mod index 86db942a976..f3bebcd9da0 100644 --- a/go.mod +++ b/go.mod @@ -32,7 +32,7 @@ require ( github.com/ipfs/go-ipfs-blockstore v0.1.4 github.com/ipfs/go-ipfs-chunker v0.0.5 github.com/ipfs/go-ipfs-cmds v0.2.9 - github.com/ipfs/go-ipfs-config v0.6.1 + github.com/ipfs/go-ipfs-config v0.7.0 github.com/ipfs/go-ipfs-ds-help v0.1.1 github.com/ipfs/go-ipfs-exchange-interface v0.0.1 github.com/ipfs/go-ipfs-exchange-offline v0.0.1 @@ -94,6 +94,7 @@ require ( github.com/opentracing/opentracing-go v1.1.0 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.6.0 + github.com/stretchr/testify v1.5.1 github.com/syndtr/goleveldb v1.0.0 github.com/whyrusleeping/base32 v0.0.0-20170828182744-c30ac30633cc github.com/whyrusleeping/go-sysinfo v0.0.0-20190219211824-4a357d4b90b1 diff --git a/go.sum b/go.sum index f2e2219b1fe..5e6b4b75b16 100644 --- a/go.sum +++ b/go.sum @@ -301,8 +301,8 @@ github.com/ipfs/go-ipfs-chunker v0.0.5 h1:ojCf7HV/m+uS2vhUGWcogIIxiO5ubl5O57Q7Na github.com/ipfs/go-ipfs-chunker v0.0.5/go.mod h1:jhgdF8vxRHycr00k13FM8Y0E+6BoalYeobXmUyTreP8= github.com/ipfs/go-ipfs-cmds v0.2.9 h1:zQTENe9UJrtCb2bOtRoDGjtuo3rQjmuPdPnVlqoBV/M= github.com/ipfs/go-ipfs-cmds v0.2.9/go.mod h1:ZgYiWVnCk43ChwoH8hAmI1IRbuVtq3GSTHwtRB/Kqhk= -github.com/ipfs/go-ipfs-config v0.6.1 h1:d1f0fEEpUQ9R+6c0VZMNy2P+wCl4K4DO4VHJBvgWwFw= -github.com/ipfs/go-ipfs-config v0.6.1/go.mod h1:GQUxqb0NfkZmEU92PxqqqLVVFTLpoGGUlBaTyDaAqrE= +github.com/ipfs/go-ipfs-config v0.7.0 h1:cClINg8v28//KaYMwt1aSjbS8eGJjNKIEnahpT/2hYk= +github.com/ipfs/go-ipfs-config v0.7.0/go.mod h1:GQUxqb0NfkZmEU92PxqqqLVVFTLpoGGUlBaTyDaAqrE= github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw= github.com/ipfs/go-ipfs-delay v0.0.1 h1:r/UXYyRcddO6thwOnhiznIAiSvxMECGgtv35Xs1IeRQ= github.com/ipfs/go-ipfs-delay v0.0.1/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw= diff --git a/peering/peering.go b/peering/peering.go new file mode 100644 index
00000000000..ed0b43226c0 --- /dev/null +++ b/peering/peering.go @@ -0,0 +1,290 @@ +package peering + +import ( + "context" + "errors" + "math/rand" + "sync" + "time" + + "github.com/ipfs/go-log" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/multiformats/go-multiaddr" +) + +// Seed the random number generator. +// +// We don't need good randomness, but we do need randomness. +const ( + // maxBackoff is the maximum time between reconnect attempts. + maxBackoff = 10 * time.Minute + // The backoff will be cut off when we get within 10% of the actual max. + // If we go over the max, we'll adjust the delay down to a random value + // between 90-100% of the max backoff. + maxBackoffJitter = 10 // % + connmgrTag = "ipfs-peering" + // This needs to be sufficient to prevent two sides from simultaneously + // dialing. + initialDelay = 5 * time.Second +) + +var logger = log.Logger("peering") + +type state int + +const ( + stateInit state = iota + stateRunning + stateStopped +) + +// peerHandler keeps track of all state related to a specific "peering" peer. +type peerHandler struct { + peer peer.ID + host host.Host + ctx context.Context + cancel context.CancelFunc + + mu sync.Mutex + addrs []multiaddr.Multiaddr + reconnectTimer *time.Timer + + nextDelay time.Duration +} + +// setAddrs sets the addresses for this peer. +func (ph *peerHandler) setAddrs(addrs []multiaddr.Multiaddr) { + // Not strictly necessary, but it helps to not trust the calling code. + addrCopy := make([]multiaddr.Multiaddr, len(addrs)) + copy(addrCopy, addrs) + + ph.mu.Lock() + defer ph.mu.Unlock() + ph.addrs = addrCopy +} + +// getAddrs returns a shared slice of addresses for this peer. Do not modify. +func (ph *peerHandler) getAddrs() []multiaddr.Multiaddr { + ph.mu.Lock() + defer ph.mu.Unlock() + return ph.addrs +} + +// stop permanently stops the peer handler. +func (ph *peerHandler) stop() { + ph.cancel() + + ph.mu.Lock() + defer ph.mu.Unlock() + if ph.reconnectTimer != nil { + ph.reconnectTimer.Stop() + ph.reconnectTimer = nil + } +} + +func (ph *peerHandler) nextBackoff() time.Duration { + if ph.nextDelay < maxBackoff { + ph.nextDelay += ph.nextDelay/2 + time.Duration(rand.Int63n(int64(ph.nextDelay))) + } + + // If we've gone over the max backoff, reduce it under the max. + if ph.nextDelay > maxBackoff { + ph.nextDelay = maxBackoff + // randomize the backoff a bit (10%). + ph.nextDelay -= time.Duration(rand.Int63n(int64(maxBackoff) * maxBackoffJitter / 100)) + } + + return ph.nextDelay +} + +func (ph *peerHandler) reconnect() { + // Try connecting + addrs := ph.getAddrs() + logger.Debugw("reconnecting", "peer", ph.peer, "addrs", addrs) + + err := ph.host.Connect(ph.ctx, peer.AddrInfo{ID: ph.peer, Addrs: addrs}) + if err != nil { + logger.Debugw("failed to reconnect", "peer", ph.peer, "error", err) + // Ok, we failed. Extend the timeout. + ph.mu.Lock() + if ph.reconnectTimer != nil { + // Only counts if the reconnectTimer still exists. If not, a + // connection _was_ somehow established. + ph.reconnectTimer.Reset(ph.nextBackoff()) + } + // Otherwise, someone else has stopped us so we can assume that + // we're either connected or someone else will start us. + ph.mu.Unlock() + } + + // Always call this. We could have connected since we processed the + // error. 
+ ph.stopIfConnected() +} + +func (ph *peerHandler) stopIfConnected() { + ph.mu.Lock() + defer ph.mu.Unlock() + + if ph.reconnectTimer != nil && ph.host.Network().Connectedness(ph.peer) == network.Connected { + logger.Debugw("successfully reconnected", "peer", ph.peer) + ph.reconnectTimer.Stop() + ph.reconnectTimer = nil + ph.nextDelay = initialDelay + } +} + +// startIfDisconnected is the inverse of stopIfConnected. +func (ph *peerHandler) startIfDisconnected() { + ph.mu.Lock() + defer ph.mu.Unlock() + + if ph.reconnectTimer == nil && ph.host.Network().Connectedness(ph.peer) != network.Connected { + logger.Debugw("disconnected from peer", "peer", ph.peer) + // Always start with a short timeout so we can stagger things a bit. + ph.reconnectTimer = time.AfterFunc(ph.nextBackoff(), ph.reconnect) + } +} + +// PeeringService maintains connections to specified peers, reconnecting on +// disconnect with a back-off. +type PeeringService struct { + host host.Host + + mu sync.RWMutex + peers map[peer.ID]*peerHandler + state state +} + +// NewPeeringService constructs a new peering service. Peers can be added and +// removed immediately, but connections won't be formed until `Start` is called. +func NewPeeringService(host host.Host) *PeeringService { + return &PeeringService{host: host, peers: make(map[peer.ID]*peerHandler)} +} + +// Start starts the peering service, connecting and maintaining connections to +// all registered peers. It returns an error if the service has already been +// stopped. +func (ps *PeeringService) Start() error { + ps.mu.Lock() + defer ps.mu.Unlock() + + switch ps.state { + case stateInit: + logger.Infow("starting") + case stateRunning: + return nil + case stateStopped: + return errors.New("already stopped") + } + ps.host.Network().Notify((*netNotifee)(ps)) + ps.state = stateRunning + for _, handler := range ps.peers { + go handler.startIfDisconnected() + } + return nil +} + +// Stop stops the peering service. +func (ps *PeeringService) Stop() error { + ps.host.Network().StopNotify((*netNotifee)(ps)) + + ps.mu.Lock() + defer ps.mu.Unlock() + + switch ps.state { + case stateInit, stateRunning: + logger.Infow("stopping") + for _, handler := range ps.peers { + handler.stop() + } + ps.state = stateStopped + } + return nil +} + +// AddPeer adds a peer to the peering service. This function may be safely +// called at any time: before the service is started, while running, or after it +// stops. +// +// Add peer may also be called multiple times for the same peer. The new +// addresses will replace the old. +func (ps *PeeringService) AddPeer(info peer.AddrInfo) { + ps.mu.Lock() + defer ps.mu.Unlock() + + if handler, ok := ps.peers[info.ID]; ok { + logger.Infow("updating addresses", "peer", info.ID, "addrs", info.Addrs) + handler.setAddrs(info.Addrs) + } else { + logger.Infow("peer added", "peer", info.ID, "addrs", info.Addrs) + ps.host.ConnManager().Protect(info.ID, connmgrTag) + + handler = &peerHandler{ + host: ps.host, + peer: info.ID, + addrs: info.Addrs, + nextDelay: initialDelay, + } + handler.ctx, handler.cancel = context.WithCancel(context.Background()) + ps.peers[info.ID] = handler + switch ps.state { + case stateRunning: + go handler.startIfDisconnected() + case stateStopped: + // We still construct everything in this state because + // it's easier to reason about. But we should still free + // resources. + handler.cancel() + } + } +} + +// RemovePeer removes a peer from the peering service. 
This function may be +// safely called at any time: before the service is started, while running, or +// after it stops. +func (ps *PeeringService) RemovePeer(id peer.ID) { + ps.mu.Lock() + defer ps.mu.Unlock() + + if handler, ok := ps.peers[id]; ok { + logger.Infow("peer removed", "peer", id) + ps.host.ConnManager().Unprotect(id, connmgrTag) + + handler.stop() + delete(ps.peers, id) + } +} + +type netNotifee PeeringService + +func (nn *netNotifee) Connected(_ network.Network, c network.Conn) { + ps := (*PeeringService)(nn) + + p := c.RemotePeer() + ps.mu.RLock() + defer ps.mu.RUnlock() + + if handler, ok := ps.peers[p]; ok { + // use a goroutine to avoid blocking events. + go handler.stopIfConnected() + } +} +func (nn *netNotifee) Disconnected(_ network.Network, c network.Conn) { + ps := (*PeeringService)(nn) + + p := c.RemotePeer() + ps.mu.RLock() + defer ps.mu.RUnlock() + + if handler, ok := ps.peers[p]; ok { + // use a goroutine to avoid blocking events. + go handler.startIfDisconnected() + } +} +func (nn *netNotifee) OpenedStream(network.Network, network.Stream) {} +func (nn *netNotifee) ClosedStream(network.Network, network.Stream) {} +func (nn *netNotifee) Listen(network.Network, multiaddr.Multiaddr) {} +func (nn *netNotifee) ListenClose(network.Network, multiaddr.Multiaddr) {} diff --git a/peering/peering_test.go b/peering/peering_test.go new file mode 100644 index 00000000000..1f21b7816a2 --- /dev/null +++ b/peering/peering_test.go @@ -0,0 +1,158 @@ +package peering + +import ( + "context" + "testing" + "time" + + "github.com/libp2p/go-libp2p" + connmgr "github.com/libp2p/go-libp2p-connmgr" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + + "github.com/stretchr/testify/require" +) + +func newNode(ctx context.Context, t *testing.T) host.Host { + h, err := libp2p.New( + ctx, + libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"), + // We'd like to set the connection manager low water to 0, but + // that would disable the connection manager. + libp2p.ConnectionManager(connmgr.NewConnManager(1, 100, 0)), + ) + require.NoError(t, err) + return h +} + +func TestPeeringService(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + h1 := newNode(ctx, t) + ps1 := NewPeeringService(h1) + + h2 := newNode(ctx, t) + h3 := newNode(ctx, t) + h4 := newNode(ctx, t) + + // peer 1 -> 2 + ps1.AddPeer(peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}) + + // We haven't started so we shouldn't have any peers. + require.Never(t, func() bool { + return len(h1.Network().Peers()) > 0 + }, 1*time.Second, 100*time.Millisecond, "expected host 1 to have no peers") + + // Use p4 to take up the one slot we have in the connection manager. + for _, h := range []host.Host{h1, h2} { + require.NoError(t, h.Connect(ctx, peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()})) + h.ConnManager().TagPeer(h4.ID(), "sticky-peer", 1000) + } + + // Now start. + require.NoError(t, ps1.Start()) + // starting twice is fine. + require.NoError(t, ps1.Start()) + + // We should eventually connect. + require.Eventually(t, func() bool { + return h1.Network().Connectedness(h2.ID()) == network.Connected + }, 30*time.Second, 10*time.Millisecond) + + // Now explicitly connect to p3. 
+ require.NoError(t, h1.Connect(ctx, peer.AddrInfo{ID: h3.ID(), Addrs: h3.Addrs()})) + require.Eventually(t, func() bool { + return h1.Network().Connectedness(h2.ID()) == network.Connected + }, 30*time.Second, 100*time.Millisecond) + + require.Len(t, h1.Network().Peers(), 3) + + // force a disconnect + h1.ConnManager().TrimOpenConns(ctx) + + // Should disconnect from p3. + require.Eventually(t, func() bool { + return h1.Network().Connectedness(h3.ID()) != network.Connected + }, 5*time.Second, 10*time.Millisecond) + + // Should remain connected to p2 + require.Never(t, func() bool { + return h1.Network().Connectedness(h2.ID()) != network.Connected + }, 5*time.Second, 1*time.Second) + + // Now force h2 to disconnect (we have an asymmetric peering). + conns := h2.Network().ConnsToPeer(h1.ID()) + require.NotEmpty(t, conns) + h2.ConnManager().TrimOpenConns(ctx) + + // All conns to peer should eventually close. + for _, c := range conns { + require.Eventually(t, func() bool { + s, err := c.NewStream() + if s != nil { + _ = s.Reset() + } + return err != nil + }, 5*time.Second, 10*time.Millisecond) + } + + // Should eventually re-connect. + require.Eventually(t, func() bool { + return h1.Network().Connectedness(h2.ID()) == network.Connected + }, 30*time.Second, 1*time.Second) + + // Unprotect 2 from 1. + ps1.RemovePeer(h2.ID()) + + // Trim connections. + h1.ConnManager().TrimOpenConns(ctx) + + // Should disconnect + require.Eventually(t, func() bool { + return h1.Network().Connectedness(h2.ID()) != network.Connected + }, 5*time.Second, 10*time.Millisecond) + + // Should never reconnect. + require.Never(t, func() bool { + return h1.Network().Connectedness(h2.ID()) == network.Connected + }, 20*time.Second, 1*time.Second) + + // Until added back + ps1.AddPeer(peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}) + ps1.AddPeer(peer.AddrInfo{ID: h3.ID(), Addrs: h3.Addrs()}) + require.Eventually(t, func() bool { + return h1.Network().Connectedness(h2.ID()) == network.Connected + }, 30*time.Second, 1*time.Second) + require.Eventually(t, func() bool { + return h1.Network().Connectedness(h3.ID()) == network.Connected + }, 30*time.Second, 1*time.Second) + + // Should be able to repeatedly stop. + require.NoError(t, ps1.Stop()) + require.NoError(t, ps1.Stop()) + + // Adding and removing should work after stopping. + ps1.AddPeer(peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()}) + ps1.RemovePeer(h2.ID()) +} + +func TestNextBackoff(t *testing.T) { + minMaxBackoff := maxBackoff * (100 - maxBackoffJitter) / 100 + for x := 0; x < 1000; x++ { + ph := peerHandler{nextDelay: time.Second} + for min, max := time.Second*3/2, time.Second*5/2; min < minMaxBackoff; min, max = min*3/2, max*5/2 { + b := ph.nextBackoff() + if b > max || b < min { + t.Errorf("expected backoff %s to be between %s and %s", b, min, max) + } + } + for i := 0; i < 100; i++ { + b := ph.nextBackoff() + if b < minMaxBackoff || b > maxBackoff { + t.Fatal("failed to stay within max bounds") + } + } + } +} diff --git a/test/sharness/t0171-peering.sh b/test/sharness/t0171-peering.sh new file mode 100755 index 00000000000..9b775cb3cbe --- /dev/null +++ b/test/sharness/t0171-peering.sh @@ -0,0 +1,127 @@ +#!/usr/bin/env bash + +test_description="Test peering service" + +. 
lib/test-lib.sh + +NUM_NODES=3 + +test_expect_success 'init iptb' ' + rm -rf .iptb/ && + iptb testbed create -type localipfs -count $NUM_NODES -init +' + +test_expect_success 'disabling routing' ' + iptb run -- ipfs config Routing.Type none +' + +for i in $(seq 0 2); do + ADDR="$(printf '["/ip4/127.0.0.1/tcp/%s"]' "$(( 3000 + ( RANDOM % 1000 ) ))")" + test_expect_success "configuring node $i to listen on $ADDR" ' + ipfsi "$i" config --json Addresses.Swarm "$ADDR" + ' +done + +peer_id() { + ipfsi "$1" config Identity.PeerID +} + +peer_addrs() { + ipfsi "$1" config Addresses.Swarm +} + +peer() { + PEER1="$1" && + PEER2="$2" && + PEER_LIST="$(ipfsi "$PEER1" config Peering.Peers)" && + { [[ "$PEER_LIST" == "null" ]] || PEER_LIST_INNER="${PEER_LIST:1:-1}"; } && + ADDR_INFO="$(printf '[%s{"ID": "%s", "Addrs": %s}]' \ + "${PEER_LIST_INNER:+${PEER_LIST_INNER},}" \ + "$(peer_id "$PEER2")" \ + "$(peer_addrs "$PEER2")")" && + ipfsi "$PEER1" config --json Peering.Peers "${ADDR_INFO}" +} + +# Peer: +# - 0 <-> 1 +# - 1 -> 2 +test_expect_success 'configure peering' ' + peer 0 1 && + peer 1 0 && + peer 1 2 +' + +list_peers() { + ipfsi "$1" swarm peers | sed 's|.*/p2p/\([^/]*\)$|\1|' | sort -u +} + +check_peers() { + sleep 20 # give it some time to settle. + test_expect_success 'verifying peering for peer 0' ' + list_peers 0 > peers_0_actual && + peer_id 1 > peers_0_expected && + test_cmp peers_0_expected peers_0_actual + ' + + test_expect_success 'verifying peering for peer 1' ' + list_peers 1 > peers_1_actual && + { peer_id 0 && peer_id 2 ; } | sort -u > peers_1_expected && + test_cmp peers_1_expected peers_1_actual + ' + + test_expect_success 'verifying peering for peer 2' ' + list_peers 2 > peers_2_actual && + peer_id 1 > peers_2_expected && + test_cmp peers_2_expected peers_2_actual + ' +} + +test_expect_success 'startup cluster' ' + iptb start -wait && + iptb run -- ipfs log level peering debug +' + +check_peers + +disconnect() { + ipfsi "$1" swarm disconnect "/p2p/$(peer_id "$2")" +} + +# Bidirectional peering shouldn't cause problems (e.g., simultaneous connect +# issues). +test_expect_success 'disconnecting 0->1' ' + disconnect 0 1 +' + +check_peers + +# 1 should reconnect to 2 when 2 disconnects from 1. +test_expect_success 'disconnecting 2->1' ' + disconnect 2 1 +' + +check_peers + +# 2 isn't peering. This test ensures that 1 will re-peer with 2 when it comes +# back online. +test_expect_success 'stopping 2' ' + iptb stop 2 +' + +# Wait to disconnect +sleep 30 + +test_expect_success 'starting 2' ' + iptb start 2 +' + +# Wait for backoff +sleep 30 + +check_peers + +test_expect_success "stop testbed" ' + iptb stop +' + +test_done
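
The sharness test above exercises peering through the go-ipfs daemon and its config. As a rough standalone sketch of the `peering` package API introduced by this change (the two in-process libp2p hosts, the polling loop, and the timings are illustrative assumptions, not part of the changeset), the service can also be driven directly, mirroring what `core/node/peering.go` wires up via fx:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/ipfs/go-ipfs/peering"
	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Two throwaway local hosts; in go-ipfs the host is the node's PeerHost.
	h1, err := libp2p.New(ctx, libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"))
	if err != nil {
		panic(err)
	}
	h2, err := libp2p.New(ctx, libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"))
	if err != nil {
		panic(err)
	}

	// h1 peers with h2: the service protects the connection in the connection
	// manager and keeps reconnecting with randomized exponential backoff.
	ps := peering.NewPeeringService(h1)
	ps.AddPeer(peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()})

	if err := ps.Start(); err != nil {
		panic(err)
	}
	defer ps.Stop()

	// The first connect attempt fires after a randomized initial delay, so
	// poll for a while instead of expecting an immediate connection.
	deadline := time.Now().Add(30 * time.Second)
	for time.Now().Before(deadline) && h1.Network().Connectedness(h2.ID()) != network.Connected {
		time.Sleep(500 * time.Millisecond)
	}
	fmt.Println("connectedness:", h1.Network().Connectedness(h2.ID()))
}
```

As in the fx wiring, peers may be added before or after `Start`, and calling `AddPeer` again for the same ID simply replaces the stored addresses.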