Skip to content

Commit

Permalink
Add X-Scope-Probe-ID header to POSTs
Browse files Browse the repository at this point in the history
  • Loading branch information
peterbourgon committed Aug 12, 2015
1 parent 514f4bc commit c06cbf2
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 8 deletions.
2 changes: 1 addition & 1 deletion experimental/demoprobe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func main() {
)
flag.Parse()

publisher, err := xfer.NewHTTPPublisher(*publish, "demoprobe")
publisher, err := xfer.NewHTTPPublisher(*publish, "demoprobe", "demoprobe")
if err != nil {
log.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion experimental/fixprobe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func main() {
}
f.Close()

publisher, err := xfer.NewHTTPPublisher(*publish, "fixprobe")
publisher, err := xfer.NewHTTPPublisher(*publish, "fixprobe", "fixprobe")
if err != nil {
log.Fatal(err)
}
Expand Down
11 changes: 7 additions & 4 deletions probe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,12 @@ func main() {
)
flag.Parse()

log.Printf("probe starting, version %s", version)
var (
hostName = hostname()
hostID = hostName // TODO: we should sanitize the hostname
probeID = hostName // TODO: does this need to be a random string instead?
)
log.Printf("probe starting, version %s, ID %s", version, probeID)

if len(flag.Args()) > 0 {
targets = flag.Args()
Expand All @@ -74,7 +79,7 @@ func main() {
log.Printf("warning: process reporting enabled, but that requires root to find everything")
}

publisherFactory := func(target string) (xfer.Publisher, error) { return xfer.NewHTTPPublisher(target, *token) }
publisherFactory := func(target string) (xfer.Publisher, error) { return xfer.NewHTTPPublisher(target, *token, probeID) }
publishers := xfer.NewMultiPublisher(publisherFactory)
resolver := newStaticResolver(targets, publishers.Add)
defer resolver.Stop()
Expand All @@ -92,8 +97,6 @@ func main() {
}

var (
hostName = hostname()
hostID = hostName // TODO: we should sanitize the hostname
taggers = []Tagger{newTopologyTagger(), host.NewTagger(hostID)}
reporters = []Reporter{host.NewReporter(hostID, hostName, localNets), endpoint.NewReporter(hostID, hostName, *spyProcs)}
processCache *process.CachingWalker
Expand Down
12 changes: 11 additions & 1 deletion xfer/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,18 @@ type Publisher interface {
type HTTPPublisher struct {
url string
token string
id string
}

// ScopeProbeIDHeader is the header we use to carry the probe's unique ID. The
// ID is generated at runtime and has the same lifetime as the probe process.
// It's designed to deduplicate reports from the same probe to the same
// receiver, in case the probe is configured to publish to multiple receivers
// that resolve to the same app.
const ScopeProbeIDHeader = "X-Scope-Probe-ID"

// NewHTTPPublisher returns an HTTPPublisher ready for use.
func NewHTTPPublisher(target, token string) (*HTTPPublisher, error) {
func NewHTTPPublisher(target, token, id string) (*HTTPPublisher, error) {
if !strings.HasPrefix(target, "http") {
target = "http://" + target
}
Expand All @@ -39,6 +47,7 @@ func NewHTTPPublisher(target, token string) (*HTTPPublisher, error) {
return &HTTPPublisher{
url: u.String(),
token: token,
id: id,
}, nil
}

Expand All @@ -53,6 +62,7 @@ func (p HTTPPublisher) Publish(rpt report.Report) error {
return err
}
req.Header.Set("Authorization", AuthorizationHeader(p.token))
req.Header.Set(ScopeProbeIDHeader, p.id)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
Expand Down
15 changes: 14 additions & 1 deletion xfer/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/http/httptest"
"reflect"
"testing"
"time"

"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/test"
Expand All @@ -15,13 +16,18 @@ import (
func TestHTTPPublisher(t *testing.T) {
var (
token = "abcdefg"
id = "1234567"
rpt = report.MakeReport()
done = make(chan struct{})
)

s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if want, have := xfer.AuthorizationHeader(token), r.Header.Get("Authorization"); want != have {
t.Errorf("want %q, have %q", want, have)
}
if want, have := id, r.Header.Get(xfer.ScopeProbeIDHeader); want != have {
t.Errorf("want %q, have %q", want, have)
}
var have report.Report
if err := gob.NewDecoder(r.Body).Decode(&have); err != nil {
t.Error(err)
Expand All @@ -32,16 +38,23 @@ func TestHTTPPublisher(t *testing.T) {
return
}
w.WriteHeader(http.StatusOK)
close(done)
}))
defer s.Close()

p, err := xfer.NewHTTPPublisher(s.URL, token)
p, err := xfer.NewHTTPPublisher(s.URL, token, id)
if err != nil {
t.Fatal(err)
}
if err := p.Publish(rpt); err != nil {
t.Error(err)
}

select {
case <-done:
case <-time.After(time.Millisecond):
t.Error("timeout")
}
}

func TestMultiPublisher(t *testing.T) {
Expand Down

0 comments on commit c06cbf2

Please sign in to comment.