Skip to content

Commit

Permalink
Merge pull request #726 from weaveworks/630-merge-appclient
Browse files Browse the repository at this point in the history
Merge http publisher and app client.
  • Loading branch information
tomwilkie committed Dec 7, 2015
2 parents 72cb96b + cb8ae53 commit b857467
Show file tree
Hide file tree
Showing 14 changed files with 389 additions and 540 deletions.
8 changes: 6 additions & 2 deletions experimental/demoprobe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,15 @@ func main() {
)
flag.Parse()

_, publisher, err := xfer.NewHTTPPublisher(*publish, *publish, "demoprobe", "demoprobe", false)
client, err := xfer.NewAppClient(xfer.ProbeConfig{
Token: "demoprobe",
ProbeID: "demoprobe",
Insecure: false,
}, *publish, *publish)
if err != nil {
log.Fatal(err)
}
rp := xfer.NewReportPublisher(publisher)
rp := xfer.NewReportPublisher(client)

rand.Seed(time.Now().UnixNano())
for range time.Tick(*publishInterval) {
Expand Down
8 changes: 6 additions & 2 deletions experimental/fixprobe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,16 @@ func main() {
}
f.Close()

_, publisher, err := xfer.NewHTTPPublisher(*publish, *publish, "fixprobe", "fixprobe", false)
client, err := xfer.NewAppClient(xfer.ProbeConfig{
Token: "fixprobe",
ProbeID: "fixprobe",
Insecure: false,
}, *publish, *publish)
if err != nil {
log.Fatal(err)
}

rp := xfer.NewReportPublisher(publisher)
rp := xfer.NewReportPublisher(client)
for range time.Tick(*publishInterval) {
rp.Publish(fixedReport)
}
Expand Down
15 changes: 2 additions & 13 deletions prog/probe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,32 +101,21 @@ func main() {
}
log.Printf("publishing to: %s", strings.Join(targets, ", "))

factory := func(hostname, endpoint string) (string, xfer.Publisher, error) {
id, publisher, err := xfer.NewHTTPPublisher(hostname, endpoint, *token, probeID, *insecure)
if err != nil {
return "", nil, err
}
return id, xfer.NewBackgroundPublisher(publisher), nil
}

publishers := xfer.NewMultiPublisher(factory)
defer publishers.Stop()

clients := xfer.NewMultiAppClient(xfer.ProbeConfig{
Token: *token,
ProbeID: probeID,
Insecure: *insecure,
}, xfer.ControlHandlerFunc(controls.HandleControlRequest), xfer.NewAppClient)
defer clients.Stop()

resolver := xfer.NewStaticResolver(targets, publishers.Set, clients.Set)
resolver := xfer.NewStaticResolver(targets, clients.Set)
defer resolver.Stop()

endpointReporter := endpoint.NewReporter(hostID, hostName, *spyProcs, *useConntrack)
defer endpointReporter.Stop()

processCache := process.NewCachingWalker(process.NewWalker(*procRoot))
p := probe.New(*spyInterval, *publishInterval, publishers)
p := probe.New(*spyInterval, *publishInterval, clients)
p.AddTicker(processCache)
p.AddReporter(
endpointReporter,
Expand Down
117 changes: 94 additions & 23 deletions xfer/app_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package xfer

import (
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"net/rpc"
Expand All @@ -13,6 +15,11 @@ import (
"github.com/weaveworks/scope/common/sanitize"
)

const (
initialBackoff = 1 * time.Second
maxBackoff = 60 * time.Second
)

// Details are some generic details that can be fetched from /api
type Details struct {
ID string `json:"id"`
Expand All @@ -24,6 +31,7 @@ type Details struct {
type AppClient interface {
Details() (Details, error)
ControlConnection(handler ControlHandler)
Publish(r io.Reader) error
Stop()
}

Expand All @@ -35,6 +43,11 @@ type appClient struct {
insecure bool
client http.Client

// For publish
publishLoop sync.Once
readers chan io.Reader

// For controls
controlServerCodecMtx sync.Mutex
controlServerCodec rpc.ServerCodec
}
Expand All @@ -46,20 +59,24 @@ func NewAppClient(pc ProbeConfig, hostname, target string) (AppClient, error) {
return nil, err
}

return &appClient{
appClient := &appClient{
ProbeConfig: pc,
quit: make(chan struct{}),
readers: make(chan io.Reader),
target: target,
client: http.Client{
Transport: httpTransport,
},
}, nil
}

return appClient, nil
}

// Stop stops the appClient.
func (c *appClient) Stop() {
c.controlServerCodecMtx.Lock()
defer c.controlServerCodecMtx.Unlock()
close(c.readers)
close(c.quit)
if c.controlServerCodec != nil {
c.controlServerCodec.Close()
Expand All @@ -82,6 +99,32 @@ func (c *appClient) Details() (Details, error) {
return result, json.NewDecoder(resp.Body).Decode(&result)
}

func (c *appClient) doWithBackoff(msg string, f func() (bool, error)) {
backoff := initialBackoff

for {
done, err := f()
if done {
return
}
if err == nil {
backoff = initialBackoff
continue
}

log.Printf("Error doing %s for %s, backing off %s: %v", msg, c.target, backoff, err)
select {
case <-time.After(backoff):
case <-c.quit:
return
}
backoff *= 2
if backoff > maxBackoff {
backoff = maxBackoff
}
}
}

func (c *appClient) controlConnection(handler ControlHandler) error {
dialer := websocket.Dialer{}
headers := http.Header{}
Expand Down Expand Up @@ -123,30 +166,58 @@ func (c *appClient) controlConnection(handler ControlHandler) error {
return nil
}

func (c *appClient) controlConnectionLoop(handler ControlHandler) {
defer log.Printf("Control connection to %s exiting", c.target)
backoff := initialBackoff
func (c *appClient) ControlConnection(handler ControlHandler) {
go func() {
log.Printf("Control connection to %s starting", c.target)
defer log.Printf("Control connection to %s exiting", c.target)
c.doWithBackoff("controls", func() (bool, error) {
return false, c.controlConnection(handler)
})
}()
}

for {
err := c.controlConnection(handler)
if err == nil {
backoff = initialBackoff
continue
}
func (c *appClient) publish(r io.Reader) error {
url := sanitize.URL("", 0, "/api/report")(c.target)
req, err := c.ProbeConfig.authorizedRequest("POST", url, r)
if err != nil {
return err
}
req.Header.Set("Content-Encoding", "gzip")
// req.Header.Set("Content-Type", "application/binary") // TODO: we should use http.DetectContentType(..) on the gob'ed

log.Printf("Error doing controls for %s, backing off %s: %v", c.target, backoff, err)
select {
case <-time.After(backoff):
case <-c.quit:
return
}
backoff *= 2
if backoff > maxBackoff {
backoff = maxBackoff
}
resp, err := c.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf(resp.Status)
}
return nil
}

func (c *appClient) ControlConnection(handler ControlHandler) {
go c.controlConnectionLoop(handler)
func (c *appClient) startPublishing() {
go func() {
log.Printf("Publish loop for %s starting", c.target)
defer log.Printf("Publish loop for %s exiting", c.target)
c.doWithBackoff("publish", func() (bool, error) {
r := <-c.readers
if r == nil {
return true, nil
}
return false, c.publish(r)
})
}()
}

// Publish implements Publisher
func (c *appClient) Publish(r io.Reader) error {
// Lazily start the background publishing loop.
c.publishLoop.Do(c.startPublishing)
select {
case c.readers <- r:
default:
}
return nil
}
Loading

0 comments on commit b857467

Please sign in to comment.