reuse streams for dht messaging #2817

Merged · 3 commits · Jun 8, 2016
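Summary of the change: the DHT now caches one messageSender per peer and reuses a single outbound stream for its messages instead of opening a new stream per message. Each sender lazily opens a stream, retries a failed write once on a fresh stream (older peers close the stream after a single message), and reverts to one-message-per-stream once that fallback has happened more than streamReuseTries times. The following is a minimal, self-contained sketch of that pattern for orientation only; sender, dial, and stream are invented names for this example, not the PR's API. The real code is in routing/dht/dht_net.go in the diff below.

package main

import "sync"

// streamReuseTries mirrors the constant added by this PR: after this many
// forced reopen-and-retry cycles, we stop trying to reuse the stream.
const streamReuseTries = 3

type stream interface {
	WriteMsg(msg []byte) error
	Close() error
}

type sender struct {
	lk        sync.Mutex
	s         stream                  // lazily opened, kept across messages
	dial      func() (stream, error) // opens a fresh stream to the peer
	singleMes int                     // counts writes that only succeeded after a reopen
}

func (sn *sender) prep() error {
	if sn.s != nil {
		return nil
	}
	s, err := sn.dial()
	if err != nil {
		return err
	}
	sn.s = s
	return nil
}

func (sn *sender) send(msg []byte) error {
	sn.lk.Lock()
	defer sn.lk.Unlock()
	if err := sn.prep(); err != nil {
		return err
	}
	if err := sn.s.WriteMsg(msg); err != nil {
		// The peer may have closed the stream after one message (the old
		// behaviour); reopen and retry exactly once.
		sn.s.Close()
		sn.s = nil
		if err := sn.prep(); err != nil {
			return err
		}
		if err := sn.s.WriteMsg(msg); err != nil {
			return err
		}
		sn.singleMes++
	}
	if sn.singleMes > streamReuseTries {
		// Reuse keeps failing; fall back to one message per stream.
		sn.s.Close()
		sn.s = nil
	}
	return nil
}

func main() {} // placeholder so the sketch compiles standalone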
4 changes: 4 additions & 0 deletions routing/dht/dht.go
@@ -58,6 +58,9 @@ type IpfsDHT struct {
 
 	ctx  context.Context
 	proc goprocess.Process
+
+	strmap map[peer.ID]*messageSender
+	smlk   sync.Mutex
 }

// NewDHT creates a new DHT object with the given peer as the 'local' host
@@ -77,6 +80,7 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.Datastore) *IpfsDHT {
 		return nil
 	})
 
+	dht.strmap = make(map[peer.ID]*messageSender)
 	dht.ctx = ctx
 
 	h.SetStreamHandler(ProtocolDHT, dht.handleNewStream)
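A note on the locking introduced here: smlk guards only the strmap lookup (see messageSenderForPeer in the next file), while each messageSender carries its own lk that is held across stream I/O, so a slow or stalled peer blocks only sends to that peer rather than the whole table.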
228 changes: 163 additions & 65 deletions routing/dht/dht_net.go

@@ -1,7 +1,7 @@
 package dht
 
 import (
-	"errors"
+	"sync"
 	"time"
 
 	pb "github.com/ipfs/go-ipfs/routing/dht/pb"
@@ -27,40 +27,42 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) {
 	w := ggio.NewDelimitedWriter(cw)
 	mPeer := s.Conn().RemotePeer()
 
-	// receive msg
-	pmes := new(pb.Message)
-	if err := r.ReadMsg(pmes); err != nil {
-		log.Debugf("Error unmarshaling data: %s", err)
-		return
-	}
-
-	// update the peer (on valid msgs only)
-	dht.updateFromMessage(ctx, mPeer, pmes)
-
-	// get handler for this msg type.
-	handler := dht.handlerForMsgType(pmes.GetType())
-	if handler == nil {
-		log.Debug("got back nil handler from handlerForMsgType")
-		return
-	}
-
-	// dispatch handler.
-	rpmes, err := handler(ctx, mPeer, pmes)
-	if err != nil {
-		log.Debugf("handle message error: %s", err)
-		return
-	}
-
-	// if nil response, return it before serializing
-	if rpmes == nil {
-		log.Debug("Got back nil response from request.")
-		return
-	}
-
-	// send out response msg
-	if err := w.WriteMsg(rpmes); err != nil {
-		log.Debugf("send response error: %s", err)
-		return
+	for {
+		// receive msg
+		pmes := new(pb.Message)
+		if err := r.ReadMsg(pmes); err != nil {
+			log.Debugf("Error unmarshaling data: %s", err)
+			return
+		}
+
+		// update the peer (on valid msgs only)
+		dht.updateFromMessage(ctx, mPeer, pmes)
+
+		// get handler for this msg type.
+		handler := dht.handlerForMsgType(pmes.GetType())
+		if handler == nil {
+			log.Debug("got back nil handler from handlerForMsgType")
+			return
+		}
+
+		// dispatch handler.
+		rpmes, err := handler(ctx, mPeer, pmes)
+		if err != nil {
+			log.Debugf("handle message error: %s", err)
+			return
+		}
+
+		// if nil response, return it before serializing
+		if rpmes == nil {
+			log.Debug("Got back nil response from request.")
+			continue
+		}
+
+		// send out response msg
+		if err := w.WriteMsg(rpmes); err != nil {
+			log.Debugf("send response error: %s", err)
+			return
+		}
 	}
 
 	return
@@ -70,32 +72,14 @@
 // measure the RTT for latency measurements.
 func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
 
-	log.Debugf("%s DHT starting stream", dht.self)
-	s, err := dht.host.NewStream(ctx, ProtocolDHT, p)
-	if err != nil {
-		return nil, err
-	}
-	defer s.Close()
-
-	cr := ctxio.NewReader(ctx, s) // ok to use. we defer close stream in this func
-	cw := ctxio.NewWriter(ctx, s) // ok to use. we defer close stream in this func
-	r := ggio.NewDelimitedReader(cr, inet.MessageSizeMax)
-	w := ggio.NewDelimitedWriter(cw)
+	ms := dht.messageSenderForPeer(p)
 
 	start := time.Now()
 
-	if err := w.WriteMsg(pmes); err != nil {
-		return nil, err
-	}
-	log.Event(ctx, "dhtSentMessage", dht.self, p, pmes)
-
-	rpmes := new(pb.Message)
-	if err := r.ReadMsg(rpmes); err != nil {
+	rpmes, err := ms.SendRequest(ctx, pmes)
+	if err != nil {
 		return nil, err
 	}
-	if rpmes == nil {
-		return nil, errors.New("no response to request")
-	}
 
 	// update the peer (on valid msgs only)
 	dht.updateFromMessage(ctx, p, rpmes)
@@ -108,24 +92,138 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
 // sendMessage sends out a message
 func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error {
 
-	log.Debugf("%s DHT starting stream", dht.self)
-	s, err := dht.host.NewStream(ctx, ProtocolDHT, p)
-	if err != nil {
-		return err
-	}
-	defer s.Close()
-
-	cw := ctxio.NewWriter(ctx, s) // ok to use. we defer close stream in this func
-	w := ggio.NewDelimitedWriter(cw)
-
-	if err := w.WriteMsg(pmes); err != nil {
+	ms := dht.messageSenderForPeer(p)
+
+	if err := ms.SendMessage(ctx, pmes); err != nil {
 		return err
 	}
 	log.Event(ctx, "dhtSentMessage", dht.self, p, pmes)
 	return nil
 }
 
 func (dht *IpfsDHT) updateFromMessage(ctx context.Context, p peer.ID, mes *pb.Message) error {
 	dht.Update(ctx, p)
 	return nil
 }
+
+func (dht *IpfsDHT) messageSenderForPeer(p peer.ID) *messageSender {
+	dht.smlk.Lock()
+	defer dht.smlk.Unlock()
+
+	ms, ok := dht.strmap[p]
+	if !ok {
+		ms = dht.newMessageSender(p)
+		dht.strmap[p] = ms
+	}
+
+	return ms
+}
+
+type messageSender struct {
+	s   inet.Stream
+	r   ggio.ReadCloser
+	w   ggio.WriteCloser
+	lk  sync.Mutex
+	p   peer.ID
+	dht *IpfsDHT
+
+	singleMes int
+}
+
+func (dht *IpfsDHT) newMessageSender(p peer.ID) *messageSender {
+	return &messageSender{p: p, dht: dht}
+}
+
+func (ms *messageSender) prep() error {
+	if ms.s != nil {
+		return nil
+	}
+
+	nstr, err := ms.dht.host.NewStream(ms.dht.ctx, ProtocolDHT, ms.p)
+	if err != nil {
+		return err
+	}
+
+	ms.r = ggio.NewDelimitedReader(nstr, inet.MessageSizeMax)
+	ms.w = ggio.NewDelimitedWriter(nstr)
+	ms.s = nstr
+
+	return nil
+}
+
+// streamReuseTries is the number of times we will try to reuse a stream to a
+// given peer before giving up and reverting to the old one-message-per-stream
+// behaviour.
+const streamReuseTries = 3
+
+func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) error {
+	ms.lk.Lock()
+	defer ms.lk.Unlock()
+	if err := ms.prep(); err != nil {
+		return err
+	}
+
+	if err := ms.writeMessage(pmes); err != nil {
+		return err
+	}
+
+	if ms.singleMes > streamReuseTries {
+		ms.s.Close()
+		ms.s = nil
+	}
+
+	return nil
+}
+
+func (ms *messageSender) writeMessage(pmes *pb.Message) error {
+	err := ms.w.WriteMsg(pmes)
+	if err != nil {
+		// If the other side isn't expecting us to be reusing streams, we're
+		// going to end up erroring here. To make sure things work seamlessly,
+		// let's retry once before continuing.
+		log.Infof("error writing message: %s", err)
+		ms.s.Close()
+		ms.s = nil
+		if err := ms.prep(); err != nil {
+			return err
+		}
+
+		if err := ms.w.WriteMsg(pmes); err != nil {
+			return err
+		}
Review comment (Member):
This area is a bit error-prone for anyone who comes in here to edit. AFAICT, the call graph is:

ms.SendMessage AND ms.SendRequest
  -> ms.prep
  -> ms.writeMessage
    -> ms.w.WriteMsg
    -> ms.prep
    -> ms.w.WriteMsg

with the stream being set to nil inside writeMessage. I would imagine this being a bit safer to edit:

ms.SendMessage AND ms.SendRequest
  -> ms.writeMessage
    -> for (tries = 2)
      -> ms.prep
      -> ms.w.WriteMsg

Basically, push the prep call down into writeMessage and reframe it a bit.

Review comment (Member):
Something like this could work (warning: have not tested):

// make sure it's always cleared exactly the same way
func (ms *messageSender) clearStream() {
  ms.s.Close()
  ms.s = nil
}

func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) error {
  ms.lk.Lock()
  defer ms.lk.Unlock()

  if err := ms.writeMessage(pmes); err != nil {
    return err
  }

  if ms.singleMes > streamReuseTries {
    ms.clearStream()
  }

  return nil
}

func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb.Message, error) {
  ms.lk.Lock()
  defer ms.lk.Unlock()

  if err := ms.writeMessage(pmes); err != nil {
    return nil, err
  }

  log.Event(ctx, "dhtSentMessage", ms.dht.self, ms.p, pmes)

  mes := new(pb.Message)
  if err := ms.r.ReadMsg(mes); err != nil {
    ms.clearStream()
    return nil, err
  }

  if ms.singleMes > streamReuseTries {
    ms.clearStream()
  }

  return mes, nil
}

func (ms *messageSender) writeMessage(pmes *pb.Message) error {

  var didRetrySending = false

  // alternatively this could be a for (i := 0; i < 2; i++) loop, but this seemed cleaner
trySending:

  if err := ms.prep(); err != nil {
    return err
  }

  if err := ms.w.WriteMsg(pmes); err != nil {

    // If the other side isn't expecting us to be reusing streams, we're going
    // to end up erroring here. To make sure things work seamlessly, let's retry
    // once before continuing.
    if !didRetrySending {
      log.Infof("error writing message: %s", err)
      ms.clearStream()
      didRetrySending = true
      goto trySending
    }

    return err
  }

  if didRetrySending { // and succeeded the second time.
    // Keep track of this happening. If it happens a few times, it's
    // likely we can assume the other side will never support stream reuse.
    ms.singleMes++
  }

  return nil
}
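A side note on the sketch above: the backward goto is legal Go here because the jump to trySending does not skip over any variable declaration (didRetrySending is declared before the label), and it behaves exactly like the two-iteration for loop mentioned in the comment.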

Review comment (Member):

Could maybe push this block into the beginning of writeMessage too:

if ms.singleMes > streamReuseTries {
    ms.clearStream()
}

But I'm not sure if leaving the stream hanging around for a while like that is bad. Probably, so I left it as is.


+
+		// keep track of this happening. If it happens a few times, it's
+		// likely we can assume the other side will never support stream reuse.
+		ms.singleMes++
+	}
+	return nil
+}

+func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb.Message, error) {
+	ms.lk.Lock()
+	defer ms.lk.Unlock()
+	if err := ms.prep(); err != nil {
+		return nil, err
+	}
+
+	if err := ms.writeMessage(pmes); err != nil {
+		return nil, err
+	}
+
+	log.Event(ctx, "dhtSentMessage", ms.dht.self, ms.p, pmes)
+
+	mes := new(pb.Message)
+	if err := ms.r.ReadMsg(mes); err != nil {
+		ms.s.Close()
+		ms.s = nil
+		return nil, err
+	}
+
+	if ms.singleMes > streamReuseTries {
+		ms.s.Close()
+		ms.s = nil
+	}
+
+	return mes, nil
+}
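One asymmetry worth noting in the merged code: writeMessage retries a failed write once on a fresh stream, but a failed read in SendRequest only discards the stream and returns the error to the caller; the request itself is not resent.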
6 changes: 2 additions & 4 deletions routing/dht/ext_test.go

@@ -2,7 +2,6 @@ package dht
 
 import (
 	"io"
-	"io/ioutil"
 	"math/rand"
 	"testing"
 	"time"
@@ -40,8 +39,7 @@ func TestGetFailures(t *testing.T) {
 
 	// Reply with failures to every message
 	hosts[1].SetStreamHandler(ProtocolDHT, func(s inet.Stream) {
-		defer s.Close()
-		io.Copy(ioutil.Discard, s)
+		s.Close()
 	})
 
 	// This one should time out
@@ -51,7 +49,7 @@ func TestGetFailures(t *testing.T) {
 			err = merr[0]
 		}
 
-		if err.Error() != "process closing" {
+		if err != io.EOF {
 			t.Fatal("Got different error than we expected", err)
 		}
 	} else {
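The expected error changes as a direct consequence of the new stream handling: the failing handler now closes its stream immediately instead of draining it, so the caller's read on its reused stream fails with io.EOF rather than the previous "process closing" error.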