Skip to content

Commit

Permalink
Merge pull request #485 from weaveworks/463-dedupe-probe-posts
Browse files Browse the repository at this point in the history
Dedupe probe POSTs
  • Loading branch information
peterbourgon committed Sep 25, 2015
2 parents fa4cbf4 + 8602132 commit b323388
Show file tree
Hide file tree
Showing 19 changed files with 599 additions and 376 deletions.
13 changes: 9 additions & 4 deletions app/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@ import (
"github.com/weaveworks/scope/xfer"
)

// Set during buildtime.
var version = "dev"
var (
// Set at buildtime.
version = "dev"

// Set at runtime.
uniqueID = "0"
)

func main() {
var (
Expand All @@ -33,8 +38,8 @@ func main() {
}

rand.Seed(time.Now().UnixNano())
id := strconv.FormatInt(rand.Int63(), 16)
log.Printf("app starting, version %s, ID %s", version, id)
uniqueID = strconv.FormatInt(rand.Int63(), 16)
log.Printf("app starting, version %s, ID %s", version, uniqueID)

c := xfer.NewCollector(*window)
http.Handle("/", Router(c))
Expand Down
3 changes: 2 additions & 1 deletion app/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,12 @@ func captureTopology(rep xfer.Reporter, f func(xfer.Reporter, topologyView, http

// APIDetails are some generic details that can be fetched from /api
type APIDetails struct {
ID string `json:"id"`
Version string `json:"version"`
}

func apiHandler(w http.ResponseWriter, r *http.Request) {
respondWith(w, http.StatusOK, APIDetails{Version: version})
respondWith(w, http.StatusOK, APIDetails{ID: uniqueID, Version: version})
}

// Topology option labels should tell the current state. The first item must
Expand Down
39 changes: 39 additions & 0 deletions common/sanitize/sanitize.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package sanitize

import (
"fmt"
"log"
"net"
"net/url"
"strings"
)

// URL returns a function that sanitizes a URL string. It lets underspecified
// strings to be converted to usable URLs via some default arguments.
func URL(scheme string, port int, path string) func(string) string {
if scheme == "" {
scheme = "http://"
}
return func(s string) string {
if s == "" {
return s // can't do much here
}
if !strings.HasPrefix(s, "http") {
s = scheme + s
}
u, err := url.Parse(s)
if err != nil {
log.Printf("%q: %v", s, err)
return s // oh well
}
if port > 0 {
if _, _, err = net.SplitHostPort(u.Host); err != nil {
u.Host += fmt.Sprintf(":%d", port)
}
}
if path != "" && u.Path != path {
u.Path = path
}
return u.String()
}
}
34 changes: 34 additions & 0 deletions common/sanitize/sanitize_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package sanitize_test

import (
"testing"

"github.com/weaveworks/scope/common/sanitize"
)

func TestSanitizeURL(t *testing.T) {
for _, input := range []struct {
scheme string
port int
path string
input string
want string
}{
{"", 0, "", "", ""},
{"", 0, "", "foo", "http://foo"},
{"", 80, "", "foo", "http://foo:80"},
{"", 0, "some/path", "foo", "http://foo/some/path"},
{"", 0, "/some/path", "foo", "http://foo/some/path"},
{"https://", 0, "", "foo", "https://foo"},
{"https://", 80, "", "foo", "https://foo:80"},
{"https://", 0, "some/path", "foo", "https://foo/some/path"},
{"https://", 0, "", "http://foo", "http://foo"}, // specified scheme beats default...
{"", 9999, "", "foo:80", "http://foo:80"}, // specified port beats default...
{"", 0, "/bar", "foo/baz", "http://foo/bar"}, // ...but default path beats specified!
} {
if want, have := input.want, sanitize.URL(input.scheme, input.port, input.path)(input.input); want != have {
t.Errorf("sanitize.URL(%q, %d, %q)(%q): want %q, have %q", input.scheme, input.port, input.path, input.input, want, have)
continue
}
}
}
2 changes: 1 addition & 1 deletion experimental/demoprobe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func main() {
)
flag.Parse()

publisher, err := xfer.NewHTTPPublisher(*publish, "demoprobe", "demoprobe")
_, publisher, err := xfer.NewHTTPPublisher(*publish, "demoprobe", "demoprobe")
if err != nil {
log.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion experimental/fixprobe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func main() {
}
f.Close()

publisher, err := xfer.NewHTTPPublisher(*publish, "fixprobe", "fixprobe")
_, publisher, err := xfer.NewHTTPPublisher(*publish, "fixprobe", "fixprobe")
if err != nil {
log.Fatal(err)
}
Expand Down
17 changes: 7 additions & 10 deletions probe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,16 @@ func main() {
log.Printf("warning: process reporting enabled, but that requires root to find everything")
}

publisherFactory := func(target string) (xfer.Publisher, error) {
publisher, err := xfer.NewHTTPPublisher(target, *token, probeID)
factory := func(endpoint string) (string, xfer.Publisher, error) {
id, publisher, err := xfer.NewHTTPPublisher(endpoint, *token, probeID)
if err != nil {
return nil, err
return "", nil, err
}
return xfer.NewBackgroundPublisher(publisher), nil
return id, xfer.NewBackgroundPublisher(publisher), nil
}
publishers := xfer.NewMultiPublisher(publisherFactory)
publishers := xfer.NewMultiPublisher(factory)
defer publishers.Stop()
resolver := newStaticResolver(targets, publishers.Add)
resolver := newStaticResolver(targets, publishers.Set)
defer resolver.Stop()

addrs, err := net.InterfaceAddrs()
Expand Down Expand Up @@ -133,10 +133,7 @@ func main() {
}

if *weaveRouterAddr != "" {
weave, err := overlay.NewWeave(hostID, *weaveRouterAddr)
if err != nil {
log.Fatalf("failed to start Weave tagger: %v", err)
}
weave := overlay.NewWeave(hostID, *weaveRouterAddr)
tickers = append(tickers, weave)
taggers = append(taggers, weave)
reporters = append(reporters, weave)
Expand Down
35 changes: 4 additions & 31 deletions probe/overlay/weave.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,13 @@ import (
"encoding/json"
"fmt"
"log"
"net"
"net/http"
"net/url"
"regexp"
"strings"
"sync"

"github.com/weaveworks/scope/common/exec"
"github.com/weaveworks/scope/common/sanitize"
"github.com/weaveworks/scope/probe/docker"
"github.com/weaveworks/scope/report"
)
Expand Down Expand Up @@ -68,15 +67,11 @@ type weaveStatus struct {

// NewWeave returns a new Weave tagger based on the Weave router at
// address. The address should be an IP or FQDN, no port.
func NewWeave(hostID, weaveRouterAddress string) (*Weave, error) {
s, err := sanitize("http://", 6784, "/report")(weaveRouterAddress)
if err != nil {
return nil, err
}
func NewWeave(hostID, weaveRouterAddress string) *Weave {
return &Weave{
url: s,
url: sanitize.URL("http://", 6784, "/report")(weaveRouterAddress),
hostID: hostID,
}, nil
}
}

// Tick implements Ticker
Expand Down Expand Up @@ -202,25 +197,3 @@ func (w *Weave) Report() (report.Report, error) {
}
return r, nil
}

func sanitize(scheme string, port int, path string) func(string) (string, error) {
return func(s string) (string, error) {
if s == "" {
return "", fmt.Errorf("no host")
}
if !strings.HasPrefix(s, "http") {
s = scheme + s
}
u, err := url.Parse(s)
if err != nil {
return "", err
}
if _, _, err = net.SplitHostPort(u.Host); err != nil {
u.Host += fmt.Sprintf(":%d", port)
}
if u.Path != path {
u.Path = path
}
return u.String(), nil
}
}
6 changes: 1 addition & 5 deletions probe/overlay/weave_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,7 @@ func TestWeaveTaggerOverlayTopology(t *testing.T) {
s := httptest.NewServer(http.HandlerFunc(mockWeaveRouter))
defer s.Close()

w, err := overlay.NewWeave(mockHostID, s.URL)
if err != nil {
t.Fatal(err)
}

w := overlay.NewWeave(mockHostID, s.URL)
w.Tick()

{
Expand Down
122 changes: 63 additions & 59 deletions probe/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,92 +16,96 @@ var (
)

type staticResolver struct {
quit chan struct{}
add func(string)
peers []peer
set func(string, []string)
targets []target
quit chan struct{}
}

type peer struct {
hostname string
port string
}
type target struct{ host, port string }

func (t target) String() string { return net.JoinHostPort(t.host, t.port) }

// NewResolver starts a new resolver that periodically
// tries to resolve peers and the calls add() with all the
// resolved IPs. It explictiy supports hostnames which
// resolve to multiple IPs; it will repeatedly call
// add with the same IP, expecting the target to dedupe.
func newStaticResolver(peers []string, add func(string)) staticResolver {
// newStaticResolver periodically resolves the targets, and calls the set
// function with all the resolved IPs. It explictiy supports targets which
// resolve to multiple IPs.
func newStaticResolver(targets []string, set func(target string, endpoints []string)) staticResolver {
r := staticResolver{
quit: make(chan struct{}),
add: add,
peers: prepareNames(peers),
targets: prepare(targets),
set: set,
quit: make(chan struct{}),
}
go r.loop()
return r
}

func prepareNames(strs []string) []peer {
var results []peer
for _, s := range strs {
var (
hostname string
port string
)

if strings.Contains(s, ":") {
var err error
hostname, port, err = net.SplitHostPort(s)
if err != nil {
log.Printf("invalid address %s: %v", s, err)
continue
}
} else {
hostname, port = s, strconv.Itoa(xfer.AppPort)
}

results = append(results, peer{hostname, port})
}
return results
}

func (r staticResolver) loop() {
r.resolveHosts()
r.resolve()
t := tick(time.Minute)
for {
select {
case <-t:
r.resolveHosts()

r.resolve()
case <-r.quit:
return
}
}
}

func (r staticResolver) resolveHosts() {
for _, peer := range r.peers {
var addrs []net.IP
if addr := net.ParseIP(peer.hostname); addr != nil {
addrs = []net.IP{addr}
} else {
func (r staticResolver) Stop() {
close(r.quit)
}

func prepare(strs []string) []target {
var targets []target
for _, s := range strs {
var host, port string
if strings.Contains(s, ":") {
var err error
addrs, err = lookupIP(peer.hostname)
host, port, err = net.SplitHostPort(s)
if err != nil {
log.Printf("invalid address %s: %v", s, err)
continue
}
} else {
host, port = s, strconv.Itoa(xfer.AppPort)
}
targets = append(targets, target{host, port})
}
return targets
}

for _, addr := range addrs {
// For now, ignore IPv6
if addr.To4() == nil {
continue
}
r.add(net.JoinHostPort(addr.String(), peer.port))
}
func (r staticResolver) resolve() {
for t, endpoints := range resolveMany(r.targets) {
r.set(t.String(), endpoints)
}
}

func (r staticResolver) Stop() {
close(r.quit)
func resolveMany(targets []target) map[target][]string {
result := map[target][]string{}
for _, t := range targets {
result[t] = resolveOne(t)
}
return result
}

func resolveOne(t target) []string {
var addrs []net.IP
if addr := net.ParseIP(t.host); addr != nil {
addrs = []net.IP{addr}
} else {
var err error
addrs, err = lookupIP(t.host)
if err != nil {
return []string{}
}
}
endpoints := make([]string, 0, len(addrs))
for _, addr := range addrs {
// For now, ignore IPv6
if addr.To4() == nil {
continue
}
endpoints = append(endpoints, net.JoinHostPort(addr.String(), t.port))
}
return endpoints
}
Loading

0 comments on commit b323388

Please sign in to comment.