Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add latency and bandwidth options to mocknet #1431

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion exchange/bitswap/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (i *Instance) SetBlockstoreLatency(t time.Duration) time.Duration {
// NB: It's easy make mistakes by providing the same peer ID to two different
// sessions. To safeguard, use the SessionGenerator to generate sessions. It's
// just a much better idea.
func session(ctx context.Context, net tn.Network, p testutil.Identity) Instance {
func Session(ctx context.Context, net tn.Network, p testutil.Identity) Instance {
bsdelay := delay.Fixed(0)
const writeCacheElems = 100

Expand Down
8 changes: 4 additions & 4 deletions p2p/net/mock/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@
package mocknet

import (
"io"
"time"

ic "github.com/ipfs/go-ipfs/p2p/crypto"
host "github.com/ipfs/go-ipfs/p2p/host"
inet "github.com/ipfs/go-ipfs/p2p/net"
peer "github.com/ipfs/go-ipfs/p2p/peer"
"io"
"time"

ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
)
Expand Down Expand Up @@ -59,13 +58,14 @@ type Mocknet interface {
ConnectNets(inet.Network, inet.Network) (inet.Conn, error)
DisconnectPeers(peer.ID, peer.ID) error
DisconnectNets(inet.Network, inet.Network) error
LinkAll() error
}

// LinkOptions are used to change aspects of the links.
// Sorry but they dont work yet :(
type LinkOptions struct {
Latency time.Duration
Bandwidth int // in bytes-per-second
Bandwidth float64 // in bytes-per-second
// we can make these values distributions down the road.
}

Expand Down
33 changes: 26 additions & 7 deletions p2p/net/mock/mock_link.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package mocknet

import (
// "fmt"
"io"
"sync"
"time"

inet "github.com/ipfs/go-ipfs/p2p/net"
peer "github.com/ipfs/go-ipfs/p2p/peer"
Expand All @@ -11,17 +13,20 @@ import (
// link implements mocknet.Link
// and, for simplicity, inet.Conn
type link struct {
mock *mocknet
nets []*peernet
opts LinkOptions

mock *mocknet
nets []*peernet
opts LinkOptions
ratelimiter *ratelimiter
// this could have addresses on both sides.

sync.RWMutex
}

func newLink(mn *mocknet, opts LinkOptions) *link {
return &link{mock: mn, opts: opts}
l := &link{mock: mn,
opts: opts,
ratelimiter: NewRatelimiter(opts.Bandwidth)}
return l
}

func (l *link) newConnPair(dialer *peernet) (*conn, *conn) {
Expand Down Expand Up @@ -57,8 +62,13 @@ func (l *link) newStreamPair() (*stream, *stream) {
r1, w1 := io.Pipe()
r2, w2 := io.Pipe()

s1 := &stream{Reader: r1, Writer: w2}
s2 := &stream{Reader: r2, Writer: w1}
s1 := &stream{Reader: r1, Writer: w2, toDeliver: make(chan *transportObject),
done: make(chan bool)}
s2 := &stream{Reader: r2, Writer: w1, toDeliver: make(chan *transportObject),
done: make(chan bool)}

go s1.transport()
go s2.transport()
return s1, s2
}

Expand Down Expand Up @@ -86,8 +96,17 @@ func (l *link) Peers() []peer.ID {

func (l *link) SetOptions(o LinkOptions) {
l.opts = o
l.ratelimiter.UpdateBandwidth(l.opts.Bandwidth)
}

func (l *link) Options() LinkOptions {
return l.opts
}

func (l *link) GetLatency() time.Duration {
return l.opts.Latency
}

func (l *link) RateLimit(dataSize int) time.Duration {
return l.ratelimiter.Limit(dataSize)
}
79 changes: 76 additions & 3 deletions p2p/net/mock/mock_stream.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,41 @@
package mocknet

import (
"io"

"bytes"
"fmt"
inet "github.com/ipfs/go-ipfs/p2p/net"
"io"
"time"
)

// stream implements inet.Stream
type stream struct {
io.Reader
io.Writer
conn *conn
conn *conn
toDeliver chan *transportObject
done chan bool
}

type transportObject struct {
msg []byte
arrivalTime time.Time
}

// How to handle errors with writes?
func (s *stream) Write(p []byte) (n int, err error) {
l := s.conn.link
delay := l.GetLatency() + l.RateLimit(len(p))
t := time.Now().Add(delay)
s.toDeliver <- &transportObject{msg: p, arrivalTime: t}
return len(p), nil
}

func (s *stream) Close() error {
s.conn.removeStream(s)
close(s.toDeliver)
// wait for transport to finish writing/sleeping before closing stream
<-s.done
if r, ok := (s.Reader).(io.Closer); ok {
r.Close()
}
Expand All @@ -30,3 +51,55 @@ func (s *stream) Close() error {
func (s *stream) Conn() inet.Conn {
return s.conn
}

// transport will grab message arrival times, wait until that time, and
// then write the message out when it is scheduled to arrive
func (s *stream) transport() {
bufsize := 256
buf := new(bytes.Buffer)
ticker := time.NewTicker(time.Millisecond * 4)
loop:
for {
select {
case o, ok := <-s.toDeliver:
if !ok {
close(s.done)
return
}

buffered := len(o.msg) + buf.Len()

now := time.Now()
if now.Before(o.arrivalTime) {
if buffered < bufsize {
buf.Write(o.msg)
continue loop
} else {
time.Sleep(o.arrivalTime.Sub(now))
}
}

if buf.Len() > 0 {
_, err := s.Writer.Write(buf.Bytes())
if err != nil {
return
}
buf.Reset()
}

_, err := s.Writer.Write(o.msg)
if err != nil {
fmt.Println("mock_stream", err)
}

case <-ticker.C:
if buf.Len() > 0 {
_, err := s.Writer.Write(buf.Bytes())
if err != nil {
return
}
buf.Reset()
}
}
}
}
24 changes: 24 additions & 0 deletions p2p/net/mock/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package mocknet
import (
"bytes"
"io"
"math"
"math/rand"
"sync"
"testing"
"time"

inet "github.com/ipfs/go-ipfs/p2p/net"
peer "github.com/ipfs/go-ipfs/p2p/peer"
Expand Down Expand Up @@ -478,3 +480,25 @@ func TestAdding(t *testing.T) {
}

}

func TestRateLimiting(t *testing.T) {
rl := NewRatelimiter(10)

if !within(rl.Limit(10), time.Duration(float32(time.Second)), time.Millisecond/10) {
t.Fail()
}

rl.UpdateBandwidth(50)
if !within(rl.Limit(75), time.Duration(float32(time.Second)*1.5), time.Millisecond/10) {
t.Fail()
}

rl.UpdateBandwidth(100)
if !within(rl.Limit(1), time.Duration(time.Millisecond*10), time.Millisecond/10) {
t.Fail()
}
}

func within(t1 time.Duration, t2 time.Duration, tolerance time.Duration) bool {
return math.Abs(float64(t1)-float64(t2)) < float64(tolerance)
}
63 changes: 63 additions & 0 deletions p2p/net/mock/ratelimiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package mocknet

import (
"time"
)

type ratelimiter struct {
bandwidth float64 // bytes per nanosecond
allowance float64 // in bytes
maxAllowance float64 // in bytes
lastUpdate time.Time // when allowance was updated last
count int // number of times rate limiting was applied
duration time.Duration // total delay introduced due to rate limiting
}

func NewRatelimiter(bandwidth float64) *ratelimiter {
// convert bandwidth to bytes per nanosecond
b := bandwidth / float64(time.Second)
return &ratelimiter{
bandwidth: b,
allowance: 0,
maxAllowance: bandwidth,
lastUpdate: time.Now(),
}
}

func (r *ratelimiter) UpdateBandwidth(bandwidth float64) {
b := bandwidth / float64(time.Second)
r.bandwidth = b
r.allowance = 0
r.maxAllowance = bandwidth
r.lastUpdate = time.Now()
}

func (r *ratelimiter) Limit(dataSize int) time.Duration {
// update time
var duration time.Duration = time.Duration(0)
if r.bandwidth == 0 {
return duration
}
current := time.Now()
elapsedTime := current.Sub(r.lastUpdate)
r.lastUpdate = current

allowance := r.allowance + float64(elapsedTime)*r.bandwidth
// allowance can't exceed bandwidth
if allowance > r.maxAllowance {
allowance = r.maxAllowance
}

allowance -= float64(dataSize)
if allowance < 0 {
// sleep until allowance is back to 0
duration = time.Duration(-allowance / r.bandwidth)
allowance = 0
// rate limiting was applied, record stats
r.count++
r.duration += duration
}

r.allowance = allowance
return duration
}