Skip to content

Commit

Permalink
xfer: move Buffer to own file; update comment
Browse files Browse the repository at this point in the history
overlay: mutex for Weave status
  • Loading branch information
peterbourgon committed Sep 11, 2015
1 parent a8c163b commit ff3aae2
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 45 deletions.
17 changes: 14 additions & 3 deletions probe/overlay/weave.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/url"
"regexp"
"strings"
"sync"

"github.com/weaveworks/scope/common/exec"
"github.com/weaveworks/scope/probe/docker"
Expand Down Expand Up @@ -43,6 +44,8 @@ var ipMatch = regexp.MustCompile(`([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3
type Weave struct {
url string
hostID string

mtx sync.RWMutex
status weaveStatus
}

Expand Down Expand Up @@ -78,7 +81,6 @@ func NewWeave(hostID, weaveRouterAddress string) (*Weave, error) {

// Tick implements Ticker
func (w *Weave) Tick() error {
var result weaveStatus
req, err := http.NewRequest("GET", w.url, nil)
if err != nil {
return err
Expand All @@ -94,10 +96,13 @@ func (w *Weave) Tick() error {
return fmt.Errorf("Weave Tagger: got %d", resp.StatusCode)
}

var result weaveStatus
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return err
}

w.mtx.Lock()
defer w.mtx.Unlock()
w.status = result
return nil
}
Expand Down Expand Up @@ -140,7 +145,10 @@ func (w Weave) ps() ([]psEntry, error) {
}

// Tag implements Tagger.
func (w Weave) Tag(r report.Report) (report.Report, error) {
func (w *Weave) Tag(r report.Report) (report.Report, error) {
w.mtx.RLock()
defer w.mtx.RUnlock()

// Put information from weaveDNS on the container nodes
for _, entry := range w.status.DNS.Entries {
if entry.Tombstone > 0 {
Expand Down Expand Up @@ -181,7 +189,10 @@ func (w Weave) Tag(r report.Report) (report.Report, error) {
}

// Report implements Reporter.
func (w Weave) Report() (report.Report, error) {
func (w *Weave) Report() (report.Report, error) {
w.mtx.RLock()
defer w.mtx.RUnlock()

r := report.MakeReport()
for _, peer := range w.status.Router.Peers {
r.Overlay.Nodes[report.MakeOverlayNodeID(peer.Name)] = report.MakeNodeWith(map[string]string{
Expand Down
46 changes: 46 additions & 0 deletions xfer/buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package xfer

import (
"bytes"
"sync"
"sync/atomic"
)

// A Buffer is a reference counted bytes.Buffer, which belongs
// to a sync.Pool
type Buffer struct {
bytes.Buffer
pool *sync.Pool
refs int32
}

// NewBuffer creates a new buffer
func NewBuffer(pool *sync.Pool) *Buffer {
return &Buffer{
pool: pool,
refs: 0,
}
}

// Get increases the reference count. It is safe for concurrent calls.
func (b *Buffer) Get() {
atomic.AddInt32(&b.refs, 1)
}

// Put decreases the reference count, and when it hits zero, puts the
// buffer back in the pool.
func (b *Buffer) Put() {
if atomic.AddInt32(&b.refs, -1) == 0 {
b.Reset()
b.pool.Put(b)
}
}

// NewBufferPool creates a new buffer pool.
func NewBufferPool() *sync.Pool {
result := &sync.Pool{}
result.New = func() interface{} {
return NewBuffer(result)
}
return result
}
3 changes: 2 additions & 1 deletion xfer/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ const (
maxBackoff = 60 * time.Second
)

// Publisher is something which can send a report to a remote collector.
// Publisher is something which can send a buffered set of data somewhere,
// probably to a collector.
type Publisher interface {
Publish(*Buffer) error
Stop()
Expand Down
41 changes: 0 additions & 41 deletions xfer/report_publisher.go
Original file line number Diff line number Diff line change
@@ -1,54 +1,13 @@
package xfer

import (
"bytes"
"compress/gzip"
"encoding/gob"
"sync"
"sync/atomic"

"github.com/weaveworks/scope/report"
)

// A Buffer is a reference counted bytes.Buffer, which belongs
// to a sync.Pool
type Buffer struct {
bytes.Buffer
pool *sync.Pool
refs int32
}

// NewBuffer creates a new buffer
func NewBuffer(pool *sync.Pool) *Buffer {
return &Buffer{
pool: pool,
refs: 0,
}
}

// Get increases the reference count. It is safe for concurrent calls.
func (b *Buffer) Get() {
atomic.AddInt32(&b.refs, 1)
}

// Put decreases the reference count, and when it hits zero, puts the
// buffer back in the pool.
func (b *Buffer) Put() {
if atomic.AddInt32(&b.refs, -1) == 0 {
b.Reset()
b.pool.Put(b)
}
}

// NewBufferPool creates a new buffer pool.
func NewBufferPool() *sync.Pool {
result := &sync.Pool{}
result.New = func() interface{} {
return NewBuffer(result)
}
return result
}

// A ReportPublisher uses a buffer pool to serialise reports, which it
// then passes to a publisher
type ReportPublisher struct {
Expand Down

0 comments on commit ff3aae2

Please sign in to comment.