Fix constructor ordering #698

Merged: 3 commits, Dec 9, 2020

dht.go (9 additions, 2 deletions)
@@ -218,14 +218,21 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
// handle providers
dht.proc.AddChild(dht.ProviderManager.Process())

-dht.proc.Go(dht.populatePeers)
-
// go-routine to make sure we ALWAYS have RT peer addresses in the peerstore
// since RT membership is decoupled from connectivity
go dht.persistRTPeersInPeerStore()

dht.proc.Go(dht.rtPeerLoop)

+// Fill routing table with currently connected peers that are DHT servers
+dht.plk.Lock()
+for _, p := range dht.host.Network().Peers() {
+	dht.peerFound(dht.ctx, p, false)
+}
+dht.plk.Unlock()
+
+dht.proc.Go(dht.populatePeers)
+
return dht, nil
}

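The effect of the reordering above: New now fills the routing table from the host's existing connections (the plk-guarded loop) before dht.populatePeers is started, so peers that were already connected when the DHT is constructed are considered for the routing table right away; the new TestPreconnectedNodes further down exercises that case. Below is a minimal, self-contained sketch of the same start-order pattern. It is illustrative only, not code from this repository; routingTable and the peer names are made up for the example.

package main

import (
	"fmt"
	"sync"
)

// routingTable is a toy stand-in for the DHT's routing table.
type routingTable struct {
	mu    sync.Mutex
	peers []string
}

func (rt *routingTable) add(p string) {
	rt.mu.Lock()
	defer rt.mu.Unlock()
	rt.peers = append(rt.peers, p)
}

func (rt *routingTable) size() int {
	rt.mu.Lock()
	defer rt.mu.Unlock()
	return len(rt.peers)
}

func main() {
	// Stand-in for host.Network().Peers(): peers known before construction.
	alreadyConnected := []string{"peerA", "peerB"}
	rt := &routingTable{}

	// Seed the table from existing connections first (the step this PR moves
	// ahead of dht.proc.Go(dht.populatePeers)).
	for _, p := range alreadyConnected {
		rt.add(p)
	}

	// Only start the background work once seeding is done; it now observes a
	// non-empty table on its first pass instead of racing the seeding step.
	done := make(chan struct{})
	go func() {
		defer close(done)
		fmt.Printf("background loop starts with %d peers\n", rt.size())
	}()
	<-done
}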
dht_options.go (1 addition, 1 deletion)
@@ -64,7 +64,7 @@ type config struct {
diversityFilter peerdiversity.PeerIPGroupFilter
}

-bootstrapPeers []peer.AddrInfo
+bootstrapPeers []peer.AddrInfo

// test specific config options
disableFixLowPeers bool
dht_test.go (74 additions, 4 deletions)
@@ -1871,12 +1871,12 @@ func TestV1ProtocolOverride(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

d1 := setupDHT(ctx, t, false, V1ProtocolOverride("/myproto") )
d2 := setupDHT(ctx, t, false, V1ProtocolOverride("/myproto") )
d1 := setupDHT(ctx, t, false, V1ProtocolOverride("/myproto"))
d2 := setupDHT(ctx, t, false, V1ProtocolOverride("/myproto"))
d3 := setupDHT(ctx, t, false, V1ProtocolOverride("/myproto2"))
d4 := setupDHT(ctx, t, false)

-dhts := []*IpfsDHT{d1,d2,d3,d4}
+dhts := []*IpfsDHT{d1, d2, d3, d4}

for i, dout := range dhts {
for _, din := range dhts[i+1:] {
@@ -1893,7 +1893,7 @@ func TestV1ProtocolOverride(t *testing.T) {
t.Fatal("should have one peer in the routing table")
}

-if d3.RoutingTable().Size() > 0 || d4.RoutingTable().Size() > 0{
+if d3.RoutingTable().Size() > 0 || d4.RoutingTable().Size() > 0 {
t.Fatal("should have an empty routing table")
}
}
@@ -2023,3 +2023,73 @@ func TestBootStrapWhenRTIsEmpty(t *testing.T) {
rt.Find(bootstrappers[2].self) != "" && rt.Find(bootstrapcons[1].self) != "" && rt.Find(bootstrapcons[2].self) != ""
}, 5*time.Second, 500*time.Millisecond)
}

func TestPreconnectedNodes(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// If this test fails it may hang so set a timeout
ctx, cancel = context.WithTimeout(ctx, time.Second*10)
defer cancel()

opts := []Option{
testPrefix,
DisableAutoRefresh(),
Mode(ModeServer),
}

// Create hosts
h1 := bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport))
h2 := bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport))

// Setup first DHT
d1, err := New(ctx, h1, opts...)
if err != nil {
t.Fatal(err)
}

// Connect the first host to the second
if err := h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}); err != nil {
t.Fatal(err)
}

// Wait until we know identify has completed by checking for supported protocols
// TODO: Is this needed? Could we do h2.Connect(h1) and that would wait for identify to complete.
for {
h1Protos, err := h2.Peerstore().SupportsProtocols(h1.ID(), d1.protocolsStrs...)
if err != nil {
t.Fatal(err)
}

if len(h1Protos) > 0 {
break
}

select {
case <-time.After(time.Millisecond * 100):
case <-ctx.Done():
t.Fatal("test hung")
}
}

// Setup the second DHT
d2, err := New(ctx, h2, opts...)
if err != nil {
t.Fatal(err)
}

// See if it works
peerCh, err := d2.GetClosestPeers(ctx, "testkey")
if err != nil {
t.Fatal(err)
}

select {
case p := <-peerCh:
if p == h1.ID() {
break
}
t.Fatal("could not find peer")
case <-ctx.Done():
t.Fatal("test hung")
}
}
subscriber_notifee.go (0 additions, 7 deletions)
@@ -56,13 +56,6 @@ func newSubscriberNotifiee(dht *IpfsDHT) (*subscriberNotifee, error) {
// register for network notifications
dht.host.Network().Notify(nn)

-// Fill routing table with currently connected peers that are DHT servers
-dht.plk.Lock()
-defer dht.plk.Unlock()
-for _, p := range dht.host.Network().Peers() {
-	dht.peerFound(dht.ctx, p, false)
-}
-
return nn, nil
}
