Skip to content

Commit

Permalink
simplify peer disconnect detection logic to ensure a single outgoing …
Browse files Browse the repository at this point in the history
…stream
  • Loading branch information
vyzo committed Dec 13, 2018
1 parent 2621f89 commit fc7795c
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 8 deletions.
17 changes: 17 additions & 0 deletions comm.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,29 @@ func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing <-chan
}

go p.handleSendingMessages(ctx, s, outgoing)
go p.handlePeerEOF(ctx, s)
select {
case p.newPeerStream <- s:
case <-ctx.Done():
}
}

func (p *PubSub) handlePeerEOF(ctx context.Context, s inet.Stream) {
r := ggio.NewDelimitedReader(s, 1<<20)
rpc := new(RPC)
for {
err := r.ReadMsg(&rpc.RPC)
if err != nil {
select {
case p.peerDead <- s.Conn().RemotePeer():
case <-ctx.Done():
}
return
}
log.Warning("unexpected message from ", s.Conn().RemotePeer())
}
}

func (p *PubSub) handleSendingMessages(ctx context.Context, s inet.Stream, outgoing <-chan *RPC) {
bufw := bufio.NewWriter(s)
wc := ggio.NewDelimitedWriter(bufw)
Expand Down
6 changes: 0 additions & 6 deletions notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,6 @@ func (p *PubSubNotif) Connected(n inet.Network, c inet.Conn) {
}

func (p *PubSubNotif) Disconnected(n inet.Network, c inet.Conn) {
go func() {
select {
case p.peerDead <- c.RemotePeer():
case <-p.ctx.Done():
}
}()
}

func (p *PubSubNotif) Listen(n inet.Network, _ ma.Multiaddr) {
Expand Down
3 changes: 1 addition & 2 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,7 @@ func (p *PubSub) processLoop(ctx context.Context) {

if p.host.Network().Connectedness(pid) == inet.Connected {
// still connected, must be a duplicate connection being closed.
// we respawn the writer as we need to ensure there is at leat one active
// at worst we can end with two writers pushing messages from the same channel.
// we respawn the writer as we need to ensure there is a stream active
log.Warning("peer declared dead but still connected; respawning writer: ", pid)
go p.handleNewPeer(ctx, pid, ch)
continue
Expand Down

0 comments on commit fc7795c

Please sign in to comment.