From f899f4451bc8d9028576a0f0d611420a1af0afc1 Mon Sep 17 00:00:00 2001 From: Paul Bellamy Date: Mon, 14 Mar 2016 11:38:21 +0000 Subject: [PATCH 01/16] adding github.com/paypal/ionet for testing --- vendor/github.com/paypal/ionet/LICENSE | 27 ++ vendor/github.com/paypal/ionet/README.md | 23 ++ .../github.com/paypal/ionet/contributing.md | 6 + vendor/github.com/paypal/ionet/ionet.go | 389 ++++++++++++++++++ .../paypal/ionet/ionet_example_test.go | 60 +++ vendor/github.com/paypal/ionet/ionet_test.go | 354 ++++++++++++++++ vendor/manifest | 6 + 7 files changed, 865 insertions(+) create mode 100644 vendor/github.com/paypal/ionet/LICENSE create mode 100644 vendor/github.com/paypal/ionet/README.md create mode 100644 vendor/github.com/paypal/ionet/contributing.md create mode 100644 vendor/github.com/paypal/ionet/ionet.go create mode 100644 vendor/github.com/paypal/ionet/ionet_example_test.go create mode 100644 vendor/github.com/paypal/ionet/ionet_test.go diff --git a/vendor/github.com/paypal/ionet/LICENSE b/vendor/github.com/paypal/ionet/LICENSE new file mode 100644 index 0000000000..058ab47893 --- /dev/null +++ b/vendor/github.com/paypal/ionet/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2013 PayPal Inc. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of PayPal Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. \ No newline at end of file diff --git a/vendor/github.com/paypal/ionet/README.md b/vendor/github.com/paypal/ionet/README.md new file mode 100644 index 0000000000..3ab50cba2c --- /dev/null +++ b/vendor/github.com/paypal/ionet/README.md @@ -0,0 +1,23 @@ +[![Build Status](https://travis-ci.org/paypal/ionet.png)](https://travis-ci.org/paypal/ionet) + +ionet provides a [net.Conn](http://golang.org/pkg/net/#Conn) and a +[net.Listener](http://golang.org/pkg/net/#Listener) in which connections +use an [io.Reader](http://golang.org/pkg/io/#Reader) and an +[io.Writer](http://golang.org/pkg/io/#Writer) instead of a traditional +network stack. + +This can be handy in unit tests, because it enables you to mock out +the network. + +It's also useful when using an external network stack. At PayPal, ionet +is used in [PayPal Beacon](https://www.paypal.com/beacon). Beacon +uses a Bluetooth Low Energy chip accessed over a serial connection. +ionet enables the use of net-based code, such as the +stdlib's [net/http]((http://golang.org/pkg/net/http/), with a +mediated network. + +`go get github.com/paypal/ionet` + +See godoc for usage. + +ionet requires Go 1.1 or later, and is released under a BSD-style license similar to Go's. diff --git a/vendor/github.com/paypal/ionet/contributing.md b/vendor/github.com/paypal/ionet/contributing.md new file mode 100644 index 0000000000..9d3cf40989 --- /dev/null +++ b/vendor/github.com/paypal/ionet/contributing.md @@ -0,0 +1,6 @@ +Contributions are welcome. Please follow the basics: + +* Use `gofmt` and `godoc`. +* Keep master green; changes must be backwards-compatible. +* Leave documentation and tests better than you found them. +* If you're considering a major change, ask first. diff --git a/vendor/github.com/paypal/ionet/ionet.go b/vendor/github.com/paypal/ionet/ionet.go new file mode 100644 index 0000000000..fafcf635b2 --- /dev/null +++ b/vendor/github.com/paypal/ionet/ionet.go @@ -0,0 +1,389 @@ +// Package ionet is a bridge between the stdlib's net and io packages. +// +// ionet provides a net.Conn and a net.Listener in which connections +// use an io.Reader and an io.Writer instead of a traditional network stack. +// +// This can be handy in unit tests, because it enables you to mock out +// the network. +// +// It's also useful when using an external network stack. At PayPal, ionet +// is used in PayPal Beacon. Beacon uses a Bluetooth Low Energy chip accessed +// over a serial connection. ionet enables the use of net-based code, such as +// the stdlib's net/http, with a mediated network. +package ionet + +import ( + "fmt" + "io" + "io/ioutil" + "net" + "sync" + "time" +) + +// Conn is a net.Conn backed by an io.Reader and an io.Writer. +// The zero value for Conn uses a reader that always returns EOF +// and ioutil.Discard as a writer. +// +// "Reader" and "Writer" are relative to which half of +// the connection you are on. R and W in Conn are named from +// the server's (listener's) perspective. That is, the server reads from R +// and writes to W; the client (dialer) does the opposite. See also the +// documentation for Dial. +// +// Conn serializes reads and writes to R and W, so R and W do +// not need to be concurrency-safe. After being closed, no +// new reads/writes will be issued to R or W. However, reads/writes +// that were requested before closing (and which were perhaps blocked) +// may still be passed to R or W. +type Conn struct { + R io.Reader + W io.Writer + rmu sync.Mutex // used to serialize reads from R (io.Reader is not guaranteed to be concurrency-safe) + wmu sync.Mutex // used serialize writes to W (io.Writer is not guaranteed to be concurrency-safe) + + closing chan struct{} // closing will be closed when the Conn is closed + closingmu sync.Mutex // protect closing from concurrent changes (getting set, being closed) + + rdead time.Time // read deadline + rdeadmu sync.RWMutex // lock when being set; rlock when being used + + wdead time.Time // write deadline + wdeadmu sync.RWMutex // lock when being set; rlock when being used +} + +// nerr is a Read/Write result +type nerr struct { + n int + err error +} + +// Read implements the net.Conn interface. +// Read returns net.OpError errors. +func (c *Conn) Read(b []byte) (int, error) { + if c.R == nil { + return 0, c.readErr(false, io.EOF) + } + + c.initClosing() + + // stop now if we're already closed + select { + case <-c.closing: + return 0, c.readErr(false, connClosed) + default: + } + + // stop now if we're already timed out + c.rdeadmu.RLock() + defer c.rdeadmu.RUnlock() + if !c.rdead.IsZero() && c.rdead.Before(time.Now()) { + return 0, c.readErr(true, timedOut) + } + + // start read + readc := make(chan nerr, 1) + go func() { + c.rmu.Lock() + n, err := c.R.Read(b) + c.rmu.Unlock() + readc <- nerr{n, err} + }() + + // set up deadline timeout + var timeout <-chan time.Time + timer := deadlineTimer(c.rdead) // c.rdeadmu read lock already held above + if timer != nil { + timeout = timer.C + defer timer.Stop() + } + + // wait for read success, timeout, or closing + select { + case <-c.closing: + return 0, c.readErr(false, connClosed) + case <-timeout: + return 0, c.readErr(true, timedOut) + case nerr := <-readc: + if nerr.err != nil { + // wrap the error + return nerr.n, c.readErr(false, nerr.err) + } + return nerr.n, nil + } +} + +// Write implements the net.Conn interface. +// Write returns net.OpError errors. +func (c *Conn) Write(b []byte) (int, error) { + if c.W == nil { + // all writes to Discard succeed, so there's no need to wrap errors + return ioutil.Discard.Write(b) + } + + c.initClosing() + + // stop now if we're already closed + select { + case <-c.closing: + return 0, c.writeErr(false, connClosed) + default: + } + + // stop now if we're already timed out + c.wdeadmu.RLock() + defer c.wdeadmu.RUnlock() + if !c.wdead.IsZero() && c.wdead.Before(time.Now()) { + return 0, c.writeErr(true, timedOut) + } + + // start write + writec := make(chan nerr, 1) + go func() { + c.wmu.Lock() + n, err := c.W.Write(b) + c.wmu.Unlock() + writec <- nerr{n, err} + }() + + // set up deadline timeout + var timeout <-chan time.Time + c.wdeadmu.RLock() + timer := deadlineTimer(c.wdead) // c.wdeadmu read lock already held above + if timer != nil { + timeout = timer.C + defer timer.Stop() + } + + // wait for write success, timeout, or closing + select { + case <-c.closing: + return 0, c.writeErr(false, connClosed) + case <-timeout: + return 0, c.writeErr(true, timedOut) + case nerr := <-writec: + if nerr.err != nil { + return nerr.n, c.writeErr(false, nerr.err) // wrap the error + } + return nerr.n, nil + } +} + +// Close implements the net.Conn interface. +// Closing an already closed Conn will +// return an error (a net.OpError). +func (c *Conn) Close() error { + c.initClosing() + + c.closingmu.Lock() + defer c.closingmu.Unlock() + + // short-circuit with an error if we are already closed + select { + case <-c.closing: + return &net.OpError{ + Op: "close", + Net: network, + Addr: c.LocalAddr(), + Err: neterr{timeout: false, err: connClosed}, + } + default: + close(c.closing) + } + + return nil +} + +// Wait blocks until the Conn is closed. +func (c *Conn) Wait() { + c.initClosing() + <-c.closing +} + +// SetDeadline implements the net.Conn interface. +func (c *Conn) SetDeadline(t time.Time) error { + c.SetReadDeadline(t) + c.SetWriteDeadline(t) + return nil +} + +// SetReadDeadline implements the net.Conn interface. +func (c *Conn) SetReadDeadline(t time.Time) error { + c.rdeadmu.Lock() + c.rdead = t + c.rdeadmu.Unlock() + return nil +} + +// SetWriteDeadline implements the net.Conn interface. +func (c *Conn) SetWriteDeadline(t time.Time) error { + c.wdeadmu.Lock() + c.wdead = t + c.wdeadmu.Unlock() + return nil +} + +// LocalAddr implements the net.Conn interface. +func (c *Conn) LocalAddr() net.Addr { return addr("local") } + +// RemoteAddr implements the net.Conn interface. +func (c *Conn) RemoteAddr() net.Addr { return addr("remote") } + +// deadlineTimer returns a time.Timer that fires at the +// provided deadline. If the deadline is 0, it returns nil. +func deadlineTimer(t time.Time) *time.Timer { + if t.IsZero() { + return nil + } + return time.NewTimer(t.Sub(time.Now())) +} + +// initClosing lazily initializes c.closing. +// This helps with the bookkeeping needed +// to make the zero value of Conn usable. +func (c *Conn) initClosing() { + c.closingmu.Lock() + if c.closing == nil { + c.closing = make(chan struct{}) + } + c.closingmu.Unlock() +} + +// readErr wraps a read error in a net.OpError. +func (c *Conn) readErr(timeout bool, e error) *net.OpError { + return &net.OpError{ + Op: "read", + Net: network, + Addr: c.RemoteAddr(), + Err: neterr{timeout: timeout, err: e}, + } +} + +// writeErr wraps a write error in a net.OpError. +func (c *Conn) writeErr(timeout bool, e error) *net.OpError { + return &net.OpError{ + Op: "write", + Net: network, + Addr: c.RemoteAddr(), + Err: neterr{timeout: timeout, err: e}, + } +} + +// Listener is a net.Listener that accepts Conn connections. +type Listener struct { + sync.Mutex // hold when mutating any listener state + connc chan *Conn // channel of available connections + closing chan struct{} // closing will be closed when the listener is closed +} + +// init lazily initializes a Listener. +func (l *Listener) init() { + l.Lock() + defer l.Unlock() + if l.closing == nil { + l.closing = make(chan struct{}) + } + + // Initialize l.connc only if the listener is not yet closed. + select { + case <-l.closing: + default: + if l.connc == nil { + l.connc = make(chan *Conn) + } + } +} + +// Accept implements the net.Listener interface. +// Accept returns net.OpError errors. +func (l *Listener) Accept() (net.Conn, error) { + l.init() + select { + case conn := <-l.connc: + return conn, nil + case <-l.closing: + operr := &net.OpError{ + Op: "accept", + Net: network, + Addr: l.Addr(), + Err: neterr{timeout: false, err: listenerClosed}, + } + return nil, operr + } +} + +// Close implements the net.Listener interface. +// Closing an already closed Listener will +// return an error (a net.OpError). +func (l *Listener) Close() error { + l.init() + + l.Lock() + defer l.Unlock() + + // short-circuit with an error if we are already closed + select { + case <-l.closing: + return &net.OpError{ + Op: "close", + Net: network, + Addr: l.Addr(), + Err: neterr{timeout: false, err: listenerClosed}, + } + default: + l.connc = nil + close(l.closing) + } + + return nil +} + +// Dial connects to a Listener. r and w may be nil; see Conn. +// Note that r and w here are named from the server's perspective, +// so data that you are sending across the connection will be read +// from r, and responses from the connection will be written to w. +// See the documentation in Conn. +func (l *Listener) Dial(r io.Reader, w io.Writer) (*Conn, error) { + l.init() + c := &Conn{R: r, W: w} + select { + case <-l.closing: + operr := &net.OpError{ + Op: "dial", + Net: network, + Addr: l.Addr(), + Err: neterr{timeout: false, err: listenerClosed}, + } + return nil, operr + case l.connc <- c: + return c, nil + } +} + +// Addr implements the net.Listener interface. +func (l *Listener) Addr() net.Addr { return addr("local") } + +// addr is a trivial net.Addr +type addr string + +func (a addr) Network() string { return network } +func (a addr) String() string { return string(a) } + +const network = "ionet" + +// neterr is a simple net.Error +type neterr struct { + temporary bool + timeout bool + err error +} + +func (e neterr) Temporary() bool { return e.temporary } +func (e neterr) Timeout() bool { return e.timeout } +func (e neterr) Error() string { return e.err.Error() } + +var ( + connClosed = fmt.Errorf("conn closed") + timedOut = fmt.Errorf("timed out") + listenerClosed = fmt.Errorf("listener closed") +) diff --git a/vendor/github.com/paypal/ionet/ionet_example_test.go b/vendor/github.com/paypal/ionet/ionet_example_test.go new file mode 100644 index 0000000000..b91dff2b4e --- /dev/null +++ b/vendor/github.com/paypal/ionet/ionet_example_test.go @@ -0,0 +1,60 @@ +package ionet + +import ( + "bufio" + "bytes" + "fmt" + "io/ioutil" + "net/http" +) + +// ExampleListener uses ionet to start a http server, +// connects to it with byte buffers for readers/writers, +// makes a request, and receives and parses the response. +func ExampleListener() { + // Create an ionet.Listener + l := new(Listener) + + // Set up an http server that handles requests using that listener + mux := http.NewServeMux() + mux.Handle("/", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + rw.Write([]byte("Hello!")) + })) + server := &http.Server{Handler: mux} + go server.Serve(l) + + // Dial our listener with an http request; write the response to a buffer + // The response buffer is called w, as in writer. That is intentional; + // all ionet variables are named from the server's perspective, and the + // server writes into the response buffer. See the Conn documentation. + r := bytes.NewBufferString("GET / HTTP/1.1\r\nConnection: close\r\n\r\n") + w := new(bytes.Buffer) + conn, err := l.Dial(r, w) + if err != nil { + fmt.Printf("Dial error: %v\n", err) + return + } + + // Wait for the connection to close + conn.Wait() + + // Parse the response (stored in w) + buf := bufio.NewReader(w) + resp, err := http.ReadResponse(buf, new(http.Request)) + if err != nil { + fmt.Printf("Response parse error: %v\n", err) + return + } + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + fmt.Printf("Response read error: %v\n", err) + return + } + resp.Body.Close() + + // Display the response + fmt.Println(string(body)) + + // Output: + // Hello! +} diff --git a/vendor/github.com/paypal/ionet/ionet_test.go b/vendor/github.com/paypal/ionet/ionet_test.go new file mode 100644 index 0000000000..1f924cc8a6 --- /dev/null +++ b/vendor/github.com/paypal/ionet/ionet_test.go @@ -0,0 +1,354 @@ +package ionet + +import ( + "bytes" + "fmt" + "net" + "sync" + "testing" + "time" +) + +// timedWaitGroup extends sync.WaitGroup with an option +// to wait for a limited time. +type timedWaitGroup struct{ sync.WaitGroup } + +// WaitFor blocks until the WaitGroup counter is zero or duration d +// has elapsed. It returns true iff it was unblocked due to the counter +// dropping to zero. +func (t *timedWaitGroup) WaitFor(d time.Duration) (completed bool) { + timeout := time.After(d) + success := make(chan struct{}) + go func() { + t.WaitGroup.Wait() + close(success) + }() + select { + case <-success: + return true + case <-timeout: + return false + } +} + +func TestConnBasics(t *testing.T) { + in := []byte("you can't read from a closed connection (but you can be happy, if you've a mind to)") + r := bytes.NewBuffer(in) + w := new(bytes.Buffer) + c := &Conn{R: r, W: w} + b := make([]byte, len(in)) + + if c.LocalAddr() == nil || c.RemoteAddr() == nil { + t.Errorf("conn should have a local and remote addr") + } + + n, err := c.Read(b) + if n == 0 || err != nil { + t.Errorf("conn read failed with err %v", err) + } + if !bytes.Equal(b, in) { + t.Errorf("read incorrect bytes, want %q got %q", in, b) + } + + n, err = c.Write(b) + if n == 0 || err != nil { + t.Errorf("conn write failed with err %v", err) + } + if !bytes.Equal(w.Bytes(), in) { + t.Errorf("wrote incorrect bytes, want %q got %q", w.Bytes(), in) + } + + err = c.Close() + if n == 0 || err != nil { + t.Errorf("close err %v", err) + } + n, err = c.Read(b) + if n != 0 || err == nil { + t.Errorf("conn read should fail after close, instead read %d bytes", n) + } + n, err = c.Write(b) + if n != 0 || err == nil { + t.Errorf("conn write should fail after close, instead read %d bytes", n) + } +} + +func TestConnDoubleClose(t *testing.T) { + c := &Conn{} + err := c.Close() + if err != nil { + t.Errorf("conn close should succeed the first time, got error: %s", err) + } + err = c.Close() + if err == nil { + t.Errorf("conn close should fail with an error the second time") + } else if err := err.(*net.OpError); err.Temporary() { + t.Errorf("conn close should fail with a permanent error the second time") + } +} + +func TestConnZero(t *testing.T) { + c := new(Conn) + b := []byte{0x00} + n, err := c.Read(b) + if n != 0 || err == nil { + t.Errorf("zero conn reads should return 0, wrapped EOF, got %d, %v", n, err) + } + n, err = c.Write(b) + if n != len(b) || err != nil { + t.Errorf("zero conn writes should succeed after writing %d bytes, wrote %d with err %v", len(b), n, err) + } + err = c.Close() + if err != nil { + t.Errorf("zero conn should close without error, got %v", err) + } +} + +// blockrw is an io.ReadWriter that blocks forever on reads and writes. +type blockrw struct{} + +func (*blockrw) Read(b []byte) (int, error) { select {} } +func (*blockrw) Write(b []byte) (int, error) { select {} } + +func TestConnCloseUnblocksReadWrites(t *testing.T) { + c := &Conn{R: &blockrw{}, W: &blockrw{}} + b := []byte{} + var w timedWaitGroup + w.Add(2) + go func() { + c.Read(b) + w.Done() + }() + go func() { + c.Write(b) + w.Done() + }() + time.Sleep(time.Millisecond * 5) // ensure that the read/write has started + c.Close() + c.Wait() + if !w.WaitFor(time.Millisecond * 10) { + t.Errorf("conn close did not unblock read/write") + } +} + +func TestConnTimeouts(t *testing.T) { + c := &Conn{R: &blockrw{}, W: &blockrw{}} + c.SetDeadline(time.Now().Add(time.Millisecond * 10)) + b := []byte{} + var w timedWaitGroup + w.Add(2) + go func() { + c.Read(b) + w.Done() + }() + go func() { + c.Write(b) + w.Done() + }() + if !w.WaitFor(time.Millisecond * 50) { + t.Errorf("conn timeout did not unblock read/write (deadline after 10ms, waited 50ms)") + } +} + +// errorrw is an io.ReadWriter that fails tests when read from or written to. +type errorrw struct { + t *testing.T + msg string +} + +func (e *errorrw) Read(b []byte) (int, error) { + e.t.Errorf("unexpected read: %v", e.msg) + return 0, fmt.Errorf("errorrw does not read") +} + +func (e *errorrw) Write(b []byte) (int, error) { + e.t.Errorf("unexpected write: %v", e.msg) + return 0, fmt.Errorf("errorrw does not write") +} + +func TestConnNoReadWriteAfterClose(t *testing.T) { + erw := &errorrw{t: t, msg: "no reads or writes after connection close"} + b := []byte{} + c := &Conn{R: erw, W: erw} + c.Close() + c.Read(b) + c.Write(b) +} + +func TestConnNoReadWriteAfterTimeout(t *testing.T) { + erw := &errorrw{t: t, msg: "no reads or writes after timeout"} + b := []byte{} + c := &Conn{R: erw, W: erw} + c.SetDeadline(time.Now().Add(-time.Second)) + _, err := c.Read(b) + if err := err.(*net.OpError); !err.Timeout() { + t.Errorf("read timeout errors should be marked as timeouts") + } + _, err = c.Write(b) + if err := err.(*net.OpError); !err.Timeout() { + t.Errorf("write timeout errors should be marked as timeouts") + } +} + +// seqrw is an io.ReadWriter that does not handle concurrency. +// It panics when multiple concurrent reads/writes occur. +// And it reads and writes slowly, to provide ample opportunity +// for concurrent access. +type seqrw struct { + reading bool + writing bool + t *testing.T + d time.Duration +} + +func (s *seqrw) Read(b []byte) (int, error) { + if s.reading { + s.t.Errorf("non-sequential read") + } + s.reading = true + time.Sleep(s.d) + s.reading = false + return 0, fmt.Errorf("seqrw does not read") +} + +func (s *seqrw) Write(b []byte) (int, error) { + if s.writing { + s.t.Errorf("non-sequential write") + } + s.writing = true + time.Sleep(s.d) + s.writing = false + return 0, fmt.Errorf("seqrw does not write") +} + +func TestConnEnforcesSequentialReadWrites(t *testing.T) { + s := &seqrw{t: t, d: time.Millisecond * 10} + b := []byte{0x00} + c := &Conn{R: s, W: s} + var w sync.WaitGroup + for i := 0; i < 2; i++ { + w.Add(1) + go func() { + c.Read(b) + w.Done() + }() + w.Add(1) + go func() { + c.Write(b) + w.Done() + }() + } + w.Wait() +} + +func TestListenerAcceptsWhenDialed(t *testing.T) { + l := Listener{} + timeout := time.After(time.Millisecond * 10) + successc := make(chan struct{}, 1) + go func() { + conn, err := l.Accept() + if conn == nil || err != nil { + t.Errorf("accept error: %v", err) + } + successc <- struct{}{} + }() + go func() { + l.Dial(nil, nil) + }() + select { + case <-successc: + case <-timeout: + t.Errorf("listener was dialed but failed to accept") + } +} + +func TestListenerAcceptManyConcurrently(t *testing.T) { + l := Listener{} + n := 10 + wg := sync.WaitGroup{} + timeout := time.After(time.Millisecond * 25) // enough time for 10 concurrent 10ms reads/writes, but not enough to 10 serial ones + successc := make(chan struct{}, 1) + // Fire off a bunch of accepts + for i := 0; i < n; i++ { + wg.Add(1) + go func() { + conn, err := l.Accept() + if conn == nil || err != nil { + t.Errorf("accept error: %v", err) + } + b := []byte{0x00} + conn.Read(b) + conn.Write(b) + wg.Done() + }() + } + // Fire off a bunch of dials with slow read/writers + for i := 0; i < n; i++ { + s := &seqrw{t: t, d: time.Millisecond * 5} + go func() { + l.Dial(s, s) + }() + } + // Wait for all reads and writes to complete; + // this will happen in time iff they are concurrent. + go func() { + wg.Wait() + successc <- struct{}{} + }() + select { + case <-successc: + case <-timeout: + t.Errorf("listener did not accept concurrently (too slow)") + } +} + +func TestListenerDialFailsWhenClosed(t *testing.T) { + l := Listener{} + l.Close() + conn, err := l.Dial(nil, nil) + if conn != nil || err == nil { + t.Errorf("dialing a listener should fail with an error when the listener is closed") + } +} + +func TestListenerCloseUnblocksAccept(t *testing.T) { + l := Listener{} + var w timedWaitGroup + w.Add(1) + go func() { + l.Accept() + w.Done() + }() + l.Close() + if !w.WaitFor(time.Millisecond * 10) { + t.Errorf("listener close did not unblock accept") + } +} + +func TestListenerDoubleClose(t *testing.T) { + l := &Listener{} + err := l.Close() + if err != nil { + t.Errorf("listener close should succeed the first time, got error: %s", err) + } + err = l.Close() + if err == nil { + t.Errorf("listener close should fail with an error the second time") + } else if err := err.(*net.OpError); err.Temporary() { + t.Errorf("listener close should fail with a permanent error the second time") + } +} + +// When you're at 9x% test coverage, and the remaining lines are trivial, it's so +// very, very hard to resist the temptation to push to 100%. :) +func TestNothingAtAllImportant(t *testing.T) { + a := addr("addr") + if a.Network() == "" || a.String() == "" { + t.Errorf("addrs should have a non-empty network and string description") + } + + s := "dummy err" + e := neterr{err: fmt.Errorf(s)} + if e.Error() != s { + t.Errorf("err did not return the wrapped error") + } +} diff --git a/vendor/manifest b/vendor/manifest index 5bd3479367..107c66eb23 100644 --- a/vendor/manifest +++ b/vendor/manifest @@ -760,6 +760,12 @@ "branch": "master", "path": "/libcontainer/utils" }, + { + "importpath": "github.com/paypal/ionet", + "repository": "https://github.com/paypal/ionet", + "revision": "ed0aaebc541736bab972353125af442dcf829af2", + "branch": "master" + }, { "importpath": "github.com/pborman/uuid", "repository": "https://github.com/pborman/uuid", From 7632e0b3c5f4e06321228b235efdea72dcd6ec81 Mon Sep 17 00:00:00 2001 From: Paul Bellamy Date: Fri, 4 Mar 2016 17:40:48 +0000 Subject: [PATCH 02/16] Adding support for plugins, with basic example of iowait, and ebpf Squash of: * Include plugins in the report * show plugin list in the UI * moving metric and metadata templates into the probe reports * update js for prime -> priority * added retry to plugin handshake * added iowait plugin * review feedback * plugin documentation --- app/api_topology.go | 4 +- app/router.go | 21 +- client/app/scripts/actions/app-actions.js | 3 +- client/app/scripts/components/app.js | 3 + .../node-details/node-details-info.js | 2 +- client/app/scripts/components/plugins.js | 22 ++ client/app/scripts/stores/app-store.js | 7 + common/xfer/constants.go | 7 +- common/xfer/plugin_spec.go | 223 ++++++++++++++ common/xfer/plugin_spec_internal_test.go | 225 ++++++++++++++ examples/plugins/README.md | 57 ++++ examples/plugins/iowait/.gitignore | 1 + examples/plugins/iowait/Dockerfile | 6 + examples/plugins/iowait/Makefile | 19 ++ examples/plugins/iowait/main.go | 138 +++++++++ experimental/graphviz/render.go | 2 +- probe/docker/reporter.go | 31 +- probe/host/reporter.go | 20 ++ probe/kubernetes/reporter.go | 24 +- probe/overlay/weave.go | 4 + probe/plugins/registry.go | 265 ++++++++++++++++ probe/plugins/registry_internal_test.go | 285 ++++++++++++++++++ probe/plugins/reporter.go | 25 ++ probe/plugins/unix_round_tripper.go | 27 ++ probe/probe.go | 13 + probe/process/reporter.go | 20 +- prog/main.go | 5 +- prog/probe.go | 19 ++ render/detailed/connections.go | 18 +- render/detailed/docker_labels.go | 6 +- render/detailed/docker_labels_test.go | 4 +- render/detailed/metadata.go | 179 +---------- render/detailed/metadata_test.go | 20 +- render/detailed/metrics.go | 167 +--------- render/detailed/metrics_test.go | 115 ++++--- render/detailed/node.go | 15 +- render/detailed/node_test.go | 92 +++--- render/detailed/summary.go | 38 +-- render/detailed/summary_test.go | 39 ++- render/mapping.go | 10 +- report/counters.go | 4 +- report/metadata_template.go | 162 ++++++++++ report/metric_row.go | 130 ++++++++ report/metric_template.go | 86 ++++++ report/metrics.go | 6 + report/report.go | 10 + report/topology.go | 40 ++- scope | 1 + test/fixture/report_fixture.go | 8 + test/fs/fs.go | 77 ++++- 50 files changed, 2168 insertions(+), 537 deletions(-) create mode 100644 client/app/scripts/components/plugins.js create mode 100644 common/xfer/plugin_spec.go create mode 100644 common/xfer/plugin_spec_internal_test.go create mode 100644 examples/plugins/README.md create mode 100644 examples/plugins/iowait/.gitignore create mode 100644 examples/plugins/iowait/Dockerfile create mode 100644 examples/plugins/iowait/Makefile create mode 100644 examples/plugins/iowait/main.go create mode 100644 probe/plugins/registry.go create mode 100644 probe/plugins/registry_internal_test.go create mode 100644 probe/plugins/reporter.go create mode 100644 probe/plugins/unix_round_tripper.go create mode 100644 report/metadata_template.go create mode 100644 report/metric_row.go create mode 100644 report/metric_template.go diff --git a/app/api_topology.go b/app/api_topology.go index e05b385b2d..57796d90f8 100644 --- a/app/api_topology.go +++ b/app/api_topology.go @@ -35,7 +35,7 @@ func handleTopology(ctx context.Context, rep Reporter, renderer render.Renderer, return } respondWith(w, http.StatusOK, APITopology{ - Nodes: detailed.Summaries(renderer.Render(report)), + Nodes: detailed.Summaries(report, renderer.Render(report)), }) } @@ -119,7 +119,7 @@ func handleWebsocket( log.Errorf("Error generating report: %v", err) return } - newTopo := detailed.Summaries(renderer.Render(report)) + newTopo := detailed.Summaries(report, renderer.Render(report)) diff := detailed.TopoDiff(previousTopo, newTopo) previousTopo = newTopo diff --git a/app/router.go b/app/router.go index 0f9f197804..260a08a55e 100644 --- a/app/router.go +++ b/app/router.go @@ -84,7 +84,7 @@ func gzipHandler(h http.HandlerFunc) http.HandlerFunc { func RegisterTopologyRoutes(router *mux.Router, r Reporter) { get := router.Methods("GET").Subrouter() get.HandleFunc("/api", - gzipHandler(requestContextDecorator(apiHandler))) + gzipHandler(requestContextDecorator(apiHandler(r)))) get.HandleFunc("/api/topology", gzipHandler(requestContextDecorator(topologyRegistry.makeTopologyList(r)))) get.HandleFunc("/api/topology/{topology}", @@ -130,10 +130,17 @@ func RegisterReportPostHandler(a Adder, router *mux.Router) { })) } -func apiHandler(_ context.Context, w http.ResponseWriter, r *http.Request) { - respondWith(w, http.StatusOK, xfer.Details{ - ID: UniqueID, - Version: Version, - Hostname: hostname.Get(), - }) +func apiHandler(rep Reporter) CtxHandlerFunc { + return func(ctx context.Context, w http.ResponseWriter, r *http.Request) { + report, err := rep.Report(ctx) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } + respondWith(w, http.StatusOK, xfer.Details{ + ID: UniqueID, + Version: Version, + Hostname: hostname.Get(), + Plugins: report.Plugins, + }) + } } diff --git a/client/app/scripts/actions/app-actions.js b/client/app/scripts/actions/app-actions.js index 61327d4bb8..2094e257ba 100644 --- a/client/app/scripts/actions/app-actions.js +++ b/client/app/scripts/actions/app-actions.js @@ -317,7 +317,8 @@ export function receiveApiDetails(apiDetails) { AppDispatcher.dispatch({ type: ActionTypes.RECEIVE_API_DETAILS, hostname: apiDetails.hostname, - version: apiDetails.version + version: apiDetails.version, + plugins: apiDetails.plugins }); } diff --git a/client/app/scripts/components/app.js b/client/app/scripts/components/app.js index ab9babe273..198d764415 100644 --- a/client/app/scripts/components/app.js +++ b/client/app/scripts/components/app.js @@ -11,6 +11,7 @@ import HelpPanel from './help-panel'; import Status from './status.js'; import Topologies from './topologies.js'; import TopologyOptions from './topology-options.js'; +import Plugins from './plugins.js'; import { getApiDetails, getTopologies } from '../utils/web-api-utils'; import { pinNextMetric, hitEsc, unpinMetric, selectMetric, toggleHelp } from '../actions/app-actions'; @@ -53,6 +54,7 @@ function getStateFromStores() { updatePaused: AppStore.isUpdatePaused(), updatePausedAt: AppStore.getUpdatePausedAt(), version: AppStore.getVersion(), + plugins: AppStore.getPlugins(), websocketClosed: AppStore.isWebsocketClosed() }; } @@ -178,6 +180,7 @@ export default class App extends React.Component { +