Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dedupe probe POSTs #485

Merged
merged 7 commits into from
Sep 25, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions app/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@ import (
"github.com/weaveworks/scope/xfer"
)

// Set during buildtime.
var version = "dev"
var (
// Set at buildtime.
version = "dev"

// Set at runtime.
uniqueID = "0"
)

func main() {
var (
Expand All @@ -33,8 +38,8 @@ func main() {
}

rand.Seed(time.Now().UnixNano())
id := strconv.FormatInt(rand.Int63(), 16)
log.Printf("app starting, version %s, ID %s", version, id)
uniqueID = strconv.FormatInt(rand.Int63(), 16)
log.Printf("app starting, version %s, ID %s", version, uniqueID)

c := xfer.NewCollector(*window)
http.Handle("/", Router(c))
Expand Down
3 changes: 2 additions & 1 deletion app/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,12 @@ func captureTopology(rep xfer.Reporter, f func(xfer.Reporter, topologyView, http

// APIDetails are some generic details that can be fetched from /api
type APIDetails struct {
ID string `json:"id"`
Version string `json:"version"`
}

func apiHandler(w http.ResponseWriter, r *http.Request) {
respondWith(w, http.StatusOK, APIDetails{Version: version})
respondWith(w, http.StatusOK, APIDetails{ID: uniqueID, Version: version})
}

// Topology option labels should tell the current state. The first item must
Expand Down
39 changes: 39 additions & 0 deletions common/sanitize/sanitize.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package sanitize

import (
"fmt"
"log"
"net"
"net/url"
"strings"
)

// URL returns a function that sanitizes a URL string. It lets underspecified
// strings to be converted to usable URLs via some default arguments.
func URL(scheme string, port int, path string) func(string) string {
if scheme == "" {
scheme = "http://"
}
return func(s string) string {
if s == "" {
return s // can't do much here
}
if !strings.HasPrefix(s, "http") {
s = scheme + s
}
u, err := url.Parse(s)
if err != nil {
log.Printf("%q: %v", s, err)
return s // oh well
}
if port > 0 {
if _, _, err = net.SplitHostPort(u.Host); err != nil {
u.Host += fmt.Sprintf(":%d", port)
}
}
if path != "" && u.Path != path {
u.Path = path
}
return u.String()
}
}
34 changes: 34 additions & 0 deletions common/sanitize/sanitize_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package sanitize_test

import (
"testing"

"github.com/weaveworks/scope/common/sanitize"
)

func TestSanitizeURL(t *testing.T) {
for _, input := range []struct {
scheme string
port int
path string
input string
want string
}{
{"", 0, "", "", ""},
{"", 0, "", "foo", "http://foo"},
{"", 80, "", "foo", "http://foo:80"},
{"", 0, "some/path", "foo", "http://foo/some/path"},
{"", 0, "/some/path", "foo", "http://foo/some/path"},
{"https://", 0, "", "foo", "https://foo"},
{"https://", 80, "", "foo", "https://foo:80"},
{"https://", 0, "some/path", "foo", "https://foo/some/path"},
{"https://", 0, "", "http://foo", "http://foo"}, // specified scheme beats default...
{"", 9999, "", "foo:80", "http://foo:80"}, // specified port beats default...
{"", 0, "/bar", "foo/baz", "http://foo/bar"}, // ...but default path beats specified!
} {
if want, have := input.want, sanitize.URL(input.scheme, input.port, input.path)(input.input); want != have {
t.Errorf("sanitize.URL(%q, %d, %q)(%q): want %q, have %q", input.scheme, input.port, input.path, input.input, want, have)
continue
}
}
}
2 changes: 1 addition & 1 deletion experimental/demoprobe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func main() {
)
flag.Parse()

publisher, err := xfer.NewHTTPPublisher(*publish, "demoprobe", "demoprobe")
_, publisher, err := xfer.NewHTTPPublisher(*publish, "demoprobe", "demoprobe")
if err != nil {
log.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion experimental/fixprobe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func main() {
}
f.Close()

publisher, err := xfer.NewHTTPPublisher(*publish, "fixprobe", "fixprobe")
_, publisher, err := xfer.NewHTTPPublisher(*publish, "fixprobe", "fixprobe")
if err != nil {
log.Fatal(err)
}
Expand Down
17 changes: 7 additions & 10 deletions probe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,16 @@ func main() {
log.Printf("warning: process reporting enabled, but that requires root to find everything")
}

publisherFactory := func(target string) (xfer.Publisher, error) {
publisher, err := xfer.NewHTTPPublisher(target, *token, probeID)
factory := func(endpoint string) (string, xfer.Publisher, error) {
id, publisher, err := xfer.NewHTTPPublisher(endpoint, *token, probeID)
if err != nil {
return nil, err
return "", nil, err
}
return xfer.NewBackgroundPublisher(publisher), nil
return id, xfer.NewBackgroundPublisher(publisher), nil
}
publishers := xfer.NewMultiPublisher(publisherFactory)
publishers := xfer.NewMultiPublisher(factory)
defer publishers.Stop()
resolver := newStaticResolver(targets, publishers.Add)
resolver := newStaticResolver(targets, publishers.Set)
defer resolver.Stop()

addrs, err := net.InterfaceAddrs()
Expand Down Expand Up @@ -133,10 +133,7 @@ func main() {
}

if *weaveRouterAddr != "" {
weave, err := overlay.NewWeave(hostID, *weaveRouterAddr)
if err != nil {
log.Fatalf("failed to start Weave tagger: %v", err)
}
weave := overlay.NewWeave(hostID, *weaveRouterAddr)
tickers = append(tickers, weave)
taggers = append(taggers, weave)
reporters = append(reporters, weave)
Expand Down
35 changes: 4 additions & 31 deletions probe/overlay/weave.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,13 @@ import (
"encoding/json"
"fmt"
"log"
"net"
"net/http"
"net/url"
"regexp"
"strings"
"sync"

"github.com/weaveworks/scope/common/exec"
"github.com/weaveworks/scope/common/sanitize"
"github.com/weaveworks/scope/probe/docker"
"github.com/weaveworks/scope/report"
)
Expand Down Expand Up @@ -68,15 +67,11 @@ type weaveStatus struct {

// NewWeave returns a new Weave tagger based on the Weave router at
// address. The address should be an IP or FQDN, no port.
func NewWeave(hostID, weaveRouterAddress string) (*Weave, error) {
s, err := sanitize("http://", 6784, "/report")(weaveRouterAddress)
if err != nil {
return nil, err
}
func NewWeave(hostID, weaveRouterAddress string) *Weave {
return &Weave{
url: s,
url: sanitize.URL("http://", 6784, "/report")(weaveRouterAddress),
hostID: hostID,
}, nil
}
}

// Tick implements Ticker
Expand Down Expand Up @@ -202,25 +197,3 @@ func (w *Weave) Report() (report.Report, error) {
}
return r, nil
}

func sanitize(scheme string, port int, path string) func(string) (string, error) {
return func(s string) (string, error) {
if s == "" {
return "", fmt.Errorf("no host")
}
if !strings.HasPrefix(s, "http") {
s = scheme + s
}
u, err := url.Parse(s)
if err != nil {
return "", err
}
if _, _, err = net.SplitHostPort(u.Host); err != nil {
u.Host += fmt.Sprintf(":%d", port)
}
if u.Path != path {
u.Path = path
}
return u.String(), nil
}
}
6 changes: 1 addition & 5 deletions probe/overlay/weave_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,7 @@ func TestWeaveTaggerOverlayTopology(t *testing.T) {
s := httptest.NewServer(http.HandlerFunc(mockWeaveRouter))
defer s.Close()

w, err := overlay.NewWeave(mockHostID, s.URL)
if err != nil {
t.Fatal(err)
}

w := overlay.NewWeave(mockHostID, s.URL)
w.Tick()

{
Expand Down
122 changes: 63 additions & 59 deletions probe/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,92 +16,96 @@ var (
)

type staticResolver struct {
quit chan struct{}
add func(string)
peers []peer
set func(string, []string)
targets []target
quit chan struct{}
}

type peer struct {
hostname string
port string
}
type target struct{ host, port string }

func (t target) String() string { return net.JoinHostPort(t.host, t.port) }

// NewResolver starts a new resolver that periodically
// tries to resolve peers and the calls add() with all the
// resolved IPs. It explictiy supports hostnames which
// resolve to multiple IPs; it will repeatedly call
// add with the same IP, expecting the target to dedupe.
func newStaticResolver(peers []string, add func(string)) staticResolver {
// newStaticResolver periodically resolves the targets, and calls the set
// function with all the resolved IPs. It explictiy supports targets which
// resolve to multiple IPs.
func newStaticResolver(targets []string, set func(target string, endpoints []string)) staticResolver {
r := staticResolver{
quit: make(chan struct{}),
add: add,
peers: prepareNames(peers),
targets: prepare(targets),
set: set,
quit: make(chan struct{}),
}
go r.loop()
return r
}

func prepareNames(strs []string) []peer {
var results []peer
for _, s := range strs {
var (
hostname string
port string
)

if strings.Contains(s, ":") {
var err error
hostname, port, err = net.SplitHostPort(s)
if err != nil {
log.Printf("invalid address %s: %v", s, err)
continue
}
} else {
hostname, port = s, strconv.Itoa(xfer.AppPort)
}

results = append(results, peer{hostname, port})
}
return results
}

func (r staticResolver) loop() {
r.resolveHosts()
r.resolve()
t := tick(time.Minute)
for {
select {
case <-t:
r.resolveHosts()

r.resolve()
case <-r.quit:
return
}
}
}

func (r staticResolver) resolveHosts() {
for _, peer := range r.peers {
var addrs []net.IP
if addr := net.ParseIP(peer.hostname); addr != nil {
addrs = []net.IP{addr}
} else {
func (r staticResolver) Stop() {
close(r.quit)
}

func prepare(strs []string) []target {
var targets []target
for _, s := range strs {
var host, port string
if strings.Contains(s, ":") {
var err error
addrs, err = lookupIP(peer.hostname)
host, port, err = net.SplitHostPort(s)
if err != nil {
log.Printf("invalid address %s: %v", s, err)
continue
}
} else {
host, port = s, strconv.Itoa(xfer.AppPort)
}
targets = append(targets, target{host, port})
}
return targets
}

for _, addr := range addrs {
// For now, ignore IPv6
if addr.To4() == nil {
continue
}
r.add(net.JoinHostPort(addr.String(), peer.port))
}
func (r staticResolver) resolve() {
for t, endpoints := range resolveMany(r.targets) {
r.set(t.String(), endpoints)
}
}

func (r staticResolver) Stop() {
close(r.quit)
func resolveMany(targets []target) map[target][]string {
result := map[target][]string{}
for _, t := range targets {
result[t] = resolveOne(t)
}
return result
}

func resolveOne(t target) []string {
var addrs []net.IP
if addr := net.ParseIP(t.host); addr != nil {
addrs = []net.IP{addr}
} else {
var err error
addrs, err = lookupIP(t.host)
if err != nil {
return []string{}
}
}
endpoints := make([]string, 0, len(addrs))
for _, addr := range addrs {
// For now, ignore IPv6
if addr.To4() == nil {
continue
}
endpoints = append(endpoints, net.JoinHostPort(addr.String(), t.port))
}
return endpoints
}
Loading