From f23d45274fd4c9aae1017a8ffe0ffd8a78a837cf Mon Sep 17 00:00:00 2001 From: Peter Bourgon Date: Fri, 18 Sep 2015 10:53:29 +0200 Subject: [PATCH] Publish an io.Reader, not a bytes.Buffer --- xfer/background_publisher.go | 16 ++++++++-------- xfer/http_publisher.go | 6 +++--- xfer/http_publisher_test.go | 2 +- xfer/multi_publisher.go | 20 +++++++++++++++----- xfer/multi_publisher_test.go | 5 +++-- xfer/publisher.go | 8 ++++---- 6 files changed, 34 insertions(+), 23 deletions(-) diff --git a/xfer/background_publisher.go b/xfer/background_publisher.go index 726659dec1..cd74d23d3e 100644 --- a/xfer/background_publisher.go +++ b/xfer/background_publisher.go @@ -1,7 +1,7 @@ package xfer import ( - "bytes" + "io" "log" "time" ) @@ -16,7 +16,7 @@ const ( // concurrent publishes are dropped. type BackgroundPublisher struct { publisher Publisher - buffers chan *bytes.Buffer + readers chan io.Reader quit chan struct{} } @@ -24,7 +24,7 @@ type BackgroundPublisher struct { func NewBackgroundPublisher(p Publisher) *BackgroundPublisher { bp := &BackgroundPublisher{ publisher: p, - buffers: make(chan *bytes.Buffer), + readers: make(chan io.Reader), quit: make(chan struct{}), } go bp.loop() @@ -34,8 +34,8 @@ func NewBackgroundPublisher(p Publisher) *BackgroundPublisher { func (bp *BackgroundPublisher) loop() { backoff := initialBackoff - for buf := range bp.buffers { - err := bp.publisher.Publish(buf) + for r := range bp.readers { + err := bp.publisher.Publish(r) if err == nil { backoff = initialBackoff continue @@ -54,9 +54,9 @@ func (bp *BackgroundPublisher) loop() { } // Publish implements Publisher -func (bp *BackgroundPublisher) Publish(buf *bytes.Buffer) error { +func (bp *BackgroundPublisher) Publish(r io.Reader) error { select { - case bp.buffers <- buf: + case bp.readers <- r: default: } return nil @@ -64,7 +64,7 @@ func (bp *BackgroundPublisher) Publish(buf *bytes.Buffer) error { // Stop implements Publisher func (bp *BackgroundPublisher) Stop() { - close(bp.buffers) + close(bp.readers) close(bp.quit) bp.publisher.Stop() } diff --git a/xfer/http_publisher.go b/xfer/http_publisher.go index 87baa922a6..bb9aa3f576 100644 --- a/xfer/http_publisher.go +++ b/xfer/http_publisher.go @@ -1,9 +1,9 @@ package xfer import ( - "bytes" "encoding/json" "fmt" + "io" "net/http" "github.com/weaveworks/scope/common/sanitize" @@ -42,8 +42,8 @@ func (p HTTPPublisher) String() string { } // Publish publishes the report to the URL. -func (p HTTPPublisher) Publish(buf *bytes.Buffer) error { - req, err := http.NewRequest("POST", p.url, buf) +func (p HTTPPublisher) Publish(r io.Reader) error { + req, err := http.NewRequest("POST", p.url, r) if err != nil { return err } diff --git a/xfer/http_publisher_test.go b/xfer/http_publisher_test.go index e0147a7382..d023f44693 100644 --- a/xfer/http_publisher_test.go +++ b/xfer/http_publisher_test.go @@ -27,7 +27,7 @@ func TestHTTPPublisher(t *testing.T) { handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.URL.Path == "/api" { - json.NewEncoder(w).Encode(map[string]string{"id": "irrelevant"}) + _ = json.NewEncoder(w).Encode(map[string]string{"id": "irrelevant"}) return } diff --git a/xfer/multi_publisher.go b/xfer/multi_publisher.go index cda0bca32b..88d1455aaa 100644 --- a/xfer/multi_publisher.go +++ b/xfer/multi_publisher.go @@ -3,6 +3,8 @@ package xfer import ( "bytes" "errors" + "io" + "io/ioutil" "log" "strings" "sync" @@ -60,21 +62,29 @@ func (p *MultiPublisher) Delete(target string) { p.list = p.appendFilter([]tuple{}, func(t tuple) bool { return t.target != target }) } -// Publish implements Publisher by publishing the buffer to all of the -// underlying publishers sequentially. But, it will publish to one endpoint -// for each unique ID. Failed publishes don't count. -func (p *MultiPublisher) Publish(buf *bytes.Buffer) error { +// Publish implements Publisher by publishing the reader to all of the +// underlying publishers sequentially. To do that, it needs to drain the +// reader, and recreate new readers for each publisher. Note that it will +// publish to one endpoint for each unique ID. Failed publishes don't count. +func (p *MultiPublisher) Publish(r io.Reader) error { + buf, err := ioutil.ReadAll(r) + if err != nil { + return err + } + var ( ids = map[string]struct{}{} errs = []string{} ) + p.mtx.Lock() defer p.mtx.Unlock() + for _, t := range p.list { if _, ok := ids[t.id]; ok { continue } - if err := t.publisher.Publish(bytes.NewBuffer(buf.Bytes())); err != nil { + if err := t.publisher.Publish(bytes.NewReader(buf)); err != nil { errs = append(errs, err.Error()) continue } diff --git a/xfer/multi_publisher_test.go b/xfer/multi_publisher_test.go index 0ec919bde9..2664340b7d 100644 --- a/xfer/multi_publisher_test.go +++ b/xfer/multi_publisher_test.go @@ -3,6 +3,7 @@ package xfer_test import ( "bytes" "fmt" + "io" "testing" "github.com/weaveworks/scope/xfer" @@ -48,5 +49,5 @@ func TestMultiPublisher(t *testing.T) { type mockPublisher struct{ count int } -func (p *mockPublisher) Publish(*bytes.Buffer) error { p.count++; return nil } -func (p *mockPublisher) Stop() {} +func (p *mockPublisher) Publish(io.Reader) error { p.count++; return nil } +func (p *mockPublisher) Stop() {} diff --git a/xfer/publisher.go b/xfer/publisher.go index 10a7cd960d..7ac56a93eb 100644 --- a/xfer/publisher.go +++ b/xfer/publisher.go @@ -1,10 +1,10 @@ package xfer -import "bytes" +import "io" -// Publisher is something which can send a buffered set of data somewhere, -// probably to a remote collector. +// Publisher is something which can send a stream of data somewhere, probably +// to a remote collector. type Publisher interface { - Publish(*bytes.Buffer) error + Publish(io.Reader) error Stop() }