diff --git a/probe/plugins/registry.go b/probe/plugins/registry.go index 279bfad69a..b4394453ba 100644 --- a/probe/plugins/registry.go +++ b/probe/plugins/registry.go @@ -2,10 +2,8 @@ package plugins import ( "fmt" - "io" - "net" - "net/rpc" - "net/rpc/jsonrpc" + "net/http" + "net/url" "path/filepath" "sort" "strings" @@ -15,17 +13,19 @@ import ( log "github.com/Sirupsen/logrus" "github.com/fsnotify/fsnotify" + "github.com/ugorji/go/codec" + "golang.org/x/net/context" + "golang.org/x/net/context/ctxhttp" "github.com/weaveworks/scope/common/fs" "github.com/weaveworks/scope/common/fswatch" "github.com/weaveworks/scope/common/xfer" + "github.com/weaveworks/scope/report" ) +// Exposed for testing var ( - // made available for testing - dialer = net.DialTimeout - - ErrTimeout = fmt.Errorf("rpc call timeout") + transport = makeUnixRoundTripper ) const ( @@ -58,7 +58,7 @@ func NewRegistry(root, apiVersion string, handshakeMetadata map[string]string) ( watcher: watcher, done: make(chan struct{}), } - if err := r.addPath(r.root); err != nil { + if err := r.addPath(context.Background(), r.root); err != nil { r.Close() return nil, err } @@ -68,7 +68,7 @@ func NewRegistry(root, apiVersion string, handshakeMetadata map[string]string) ( // add recursively crawls the path provided, adding it to the watcher, and // looking for any existing sockets, loading them as plugins. -func (r *Registry) addPath(path string) error { +func (r *Registry) addPath(ctx context.Context, path string) error { var statT syscall.Stat_t if err := fs.Stat(path, &statT); err != nil { return err @@ -85,7 +85,7 @@ func (r *Registry) addPath(path string) error { return nil } for _, file := range files { - if err := r.addPath(filepath.Join(path, file.Name())); err != nil { + if err := r.addPath(ctx, filepath.Join(path, file.Name())); err != nil { log.Errorf("plugins: error loading path %s: %v", filepath.Join(path, file.Name()), err) return nil } @@ -95,14 +95,13 @@ func (r *Registry) addPath(path string) error { log.Infof("plugins: plugin already exists %s: conflicting %s", plugin.ID, path) return nil } - - conn, err := dialer("unix", path, 1*time.Second) + tr, err := transport(path, pluginTimeout) if err != nil { log.Errorf("plugins: error loading plugin %s: %v", path, err) return nil } - transport := &onClose{conn, func() error { r.removePath(path); return nil }} - plugin, err := NewPlugin(transport, r.apiVersion, r.handshakeMetadata) + client := &http.Client{Transport: tr, Timeout: pluginTimeout} + plugin, err := NewPlugin(ctx, path, client, r.apiVersion, r.handshakeMetadata) if err != nil { log.Errorf("plugins: error loading plugin %s: %v", path, err) return nil @@ -116,7 +115,7 @@ func (r *Registry) addPath(path string) error { return nil } -func (r *Registry) removePath(path string) error { +func (r *Registry) removePath(ctx context.Context, path string) error { r.watcher.Remove(path) if plugin, ok := r.pluginsBySocket[path]; ok { delete(r.pluginsBySocket, path) @@ -131,14 +130,14 @@ func (r *Registry) loop() { case <-r.done: return case evt := <-r.watcher.Events(): - handlers := map[fsnotify.Op]func(string) error{ + handlers := map[fsnotify.Op]func(context.Context, string) error{ fsnotify.Create: r.addPath, fsnotify.Remove: r.removePath, fsnotify.Chmod: r.addPath, } if handler, ok := handlers[evt.Op]; ok { r.lock.Lock() - if err := handler(evt.Name); err != nil { + if err := handler(context.Background(), evt.Name); err != nil { log.Errorf("plugins: event %v: error: %v", evt, err) } r.lock.Unlock() @@ -190,22 +189,22 @@ func (r *Registry) Close() error { type Plugin struct { xfer.PluginSpec - conn io.ReadWriteCloser - client *rpc.Client + context context.Context + socket string + client *http.Client } -type handshakeResponse struct { - Name string `json:"name"` - Description string `json:"description,omitempty"` - Interfaces []string `json:"interfaces"` - APIVersion string `json:"api_version,omitempty"` -} +// NewPlugin loads and initializes a new plugin. If client is nil, +// http.DefaultClient will be used. +func NewPlugin(ctx context.Context, socket string, client *http.Client, expectedAPIVersion string, handshakeMetadata map[string]string) (*Plugin, error) { + params := url.Values{} + for k, v := range handshakeMetadata { + params.Add(k, v) + } -// NewPlugin loads and initializes a new plugin -func NewPlugin(conn io.ReadWriteCloser, expectedAPIVersion string, handshakeMetadata map[string]string) (*Plugin, error) { - p := &Plugin{conn: conn, client: jsonrpc.NewClient(conn)} - var resp handshakeResponse - if err := p.Call("Handshake", handshakeMetadata, &resp); err != nil { + p := &Plugin{context: ctx, socket: socket, client: client} + resp, err := p.handshake(ctx, params) + if err != nil { return nil, err } if resp.Name == "" { @@ -220,44 +219,40 @@ func NewPlugin(conn io.ReadWriteCloser, expectedAPIVersion string, handshakeMeta return p, nil } -// Call calls some method on the remote plugin. Should replace this with a -// dynamic interface thing maybe. -func (p *Plugin) Call(cmd string, args interface{}, reply interface{}) error { - err := make(chan error, 1) - go func() { - err <- p.client.Call("Plugin."+cmd, args, reply) - }() - select { - case e := <-err: - if e == rpc.ErrShutdown { - p.Close() - } - return e - case <-time.After(pluginTimeout): - // timeout - } - return ErrTimeout +type handshakeResponse struct { + Name string `json:"name"` + Description string `json:"description,omitempty"` + Interfaces []string `json:"interfaces"` + APIVersion string `json:"api_version,omitempty"` } -// Close closes the rpc client -func (p *Plugin) Close() error { - return p.client.Close() +func (p *Plugin) handshake(ctx context.Context, params url.Values) (handshakeResponse, error) { + var result handshakeResponse + err := p.get("/", params, &result) + return result, err } -// onClose lets us attach a callback to be called after the underlying -// transport Closes -type onClose struct { - io.ReadWriteCloser - fn func() error +// Report gets the latest report from the plugin +func (p *Plugin) Report() (report.Report, error) { + result := report.MakeReport() + err := p.get("/report", nil, &result) + return result, err } -func (c *onClose) Close() error { - err := c.ReadWriteCloser.Close() - if c.fn != nil { - err2 := c.fn() - if err == nil { - err = err2 - } +// TODO(paulbellamy): better error handling on wrong status codes +func (p *Plugin) get(path string, params url.Values, result interface{}) error { + ctx, cancel := context.WithTimeout(context.Background(), pluginTimeout) + defer cancel() + resp, err := ctxhttp.Get(ctx, p.client, fmt.Sprintf("unix://%s?%s", path, params.Encode())) + if err != nil { + return err } - return err + defer resp.Body.Close() + return codec.NewDecoder(resp.Body, &codec.JsonHandle{}).Decode(&result) +} + +// Close closes the client +func (p *Plugin) Close() error { + // TODO(paulbellamy): cancel outstanding http requests here + return nil } diff --git a/probe/plugins/registry_test.go b/probe/plugins/registry_test.go index 64f3aae5ab..f1faee2064 100644 --- a/probe/plugins/registry_test.go +++ b/probe/plugins/registry_test.go @@ -1,12 +1,13 @@ package plugins import ( - "bytes" + "fmt" "io" - "io/ioutil" "net" + "net/http" + "net/http/httptest" + "net/http/httputil" "path/filepath" - "strings" "syscall" "testing" "time" @@ -20,20 +21,41 @@ import ( "github.com/weaveworks/scope/test/fswatch" ) -func stubDialer(fn func(network, address string, timeout time.Duration) (net.Conn, error)) { - dialer = fn +func stubTransport(fn func(socket string, timeout time.Duration) (http.RoundTripper, error)) { + transport = fn } -func restoreDialer() { dialer = net.DialTimeout } +func restoreTransport() { transport = makeUnixRoundTripper } -type NopWriteCloser struct{ io.Writer } +type readWriteCloseRoundTripper struct { + io.ReadWriteCloser +} -func (n NopWriteCloser) Close() error { return nil } +func (rwc readWriteCloseRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + conn := &closeableConn{ + Conn: &ionet.Conn{R: rwc, W: rwc}, + Closer: rwc, + } + client := httputil.NewClientConn(conn, nil) + defer client.Close() + return client.Do(req) +} + +// closeableConn gives us an overrideable Close, where ionet.Conn does not. +type closeableConn struct { + net.Conn + io.Closer +} + +func (c *closeableConn) Close() error { + c.Conn.Close() + return c.Closer.Close() +} type mockPlugin struct { - Name string - R io.Reader - W io.Writer - Closer io.Closer + t *testing.T + Name string + Handler http.Handler + Requests chan *http.Request } func (p mockPlugin) dir() string { @@ -49,11 +71,25 @@ func (p mockPlugin) base() string { } func (p mockPlugin) file() fs.File { + incomingR, incomingW := io.Pipe() + outgoingR, outgoingW := io.Pipe() + go func() { + conn := httputil.NewServerConn(&ionet.Conn{R: incomingR, W: outgoingW}, nil) + req, err := conn.Read() + if err != nil { + p.t.Fatal(err) + } + resp := httptest.NewRecorder() + p.Handler.ServeHTTP(resp, req) + fmt.Fprintf(outgoingW, "HTTP/1.1 200 OK\nContent-Length: %d\n\n%s", resp.Body.Len(), resp.Body.String()) + if p.Requests != nil { + p.Requests <- req + } + }() return fs.File{ FName: p.base(), - FReader: p.R, - FWriter: p.W, - FCloser: p.Closer, + FWriter: incomingW, + FReader: outgoingR, FStat: syscall.Stat_t{Mode: syscall.S_IFSOCK}, } } @@ -81,12 +117,9 @@ func setup(t *testing.T, mockPlugins ...mockPlugin) (fs.Entry, *fswatch.MockWatc fs_hook.Mock( mockFS) - stubDialer(func(network, address string, timeout time.Duration) (net.Conn, error) { - if network != "unix" { - t.Fatalf("Expected dial to unix socket, got: %q", network) - } - f, err := mockFS.Open(address) - return &closeableConn{&ionet.Conn{R: f, W: f}, f}, err + stubTransport(func(socket string, timeout time.Duration) (http.RoundTripper, error) { + f, err := mockFS.Open(socket) + return readWriteCloseRoundTripper{f}, err }) mockWatcher := fswatch.NewMockWatcher() @@ -94,21 +127,10 @@ func setup(t *testing.T, mockPlugins ...mockPlugin) (fs.Entry, *fswatch.MockWatc return mockFS, mockWatcher } -// closeableConn gives us an overrideable Close, where ionet.Conn does not. -type closeableConn struct { - net.Conn - io.Closer -} - -func (c *closeableConn) Close() error { - c.Conn.Close() - return c.Closer.Close() -} - func restore(t *testing.T) { fs_hook.Restore() fswatch_hook.Restore() - restoreDialer() + restoreTransport() } type iterator func(func(*Plugin)) @@ -130,9 +152,15 @@ func checkLoadedPlugins(t *testing.T, forEach iterator, expectedIDs []string) { } } +// stringHandler returns an http.Handler which just prints the given string +func stringHandler(j string) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprint(w, j) + }) +} + func TestRegistryLoadsExistingPlugins(t *testing.T) { - rBuf := bytes.NewBufferString(`{"id":0,"result":{"name":"testPlugin","interfaces":["reporter"],"api_version":"1"}}`) - setup(t, mockPlugin{Name: "testPlugin", R: rBuf, W: ioutil.Discard}) + setup(t, mockPlugin{t: t, Name: "testPlugin", Handler: stringHandler(`{"name":"testPlugin","interfaces":["reporter"],"api_version":"1"}`)}) defer restore(t) root := "/plugins" @@ -159,13 +187,11 @@ func TestRegistryDiscoversNewPlugins(t *testing.T) { checkLoadedPlugins(t, r.ForEach, []string{}) // Add the new plugin - rBuf := bytes.NewBufferString(`{"id":0,"result":{"name":"testPlugin","interfaces":["reporter"]}}`) - w := chanWriter(make(chan []byte)) - plugin := mockPlugin{Name: "testPlugin", R: rBuf, W: w} + plugin := mockPlugin{t: t, Name: "testPlugin", Requests: make(chan *http.Request), Handler: stringHandler(`{"name":"testPlugin","interfaces":["reporter"]}`)} mockFS.Add(plugin.dir(), plugin.file()) mockWatcher.Events() <- fsnotify.Event{Name: plugin.path(), Op: fsnotify.Create} select { - case <-w: + case <-plugin.Requests: // registry connected to this plugin case <-time.After(100 * time.Millisecond): // timeout @@ -180,8 +206,7 @@ func TestRegistryDiscoversNewPlugins(t *testing.T) { } func TestRegistryRemovesPlugins(t *testing.T) { - rBuf := bytes.NewBufferString(`{"id":0,"result":{"name":"testPlugin","interfaces":["reporter"]}}`) - plugin := mockPlugin{Name: "testPlugin", R: rBuf, Closer: chanWriter(make(chan []byte))} + plugin := mockPlugin{t: t, Name: "testPlugin", Requests: make(chan *http.Request), Handler: stringHandler(`{"name":"testPlugin","interfaces":["reporter"]}`)} _, mockWatcher := setup(t, plugin) defer restore(t) @@ -197,40 +222,7 @@ func TestRegistryRemovesPlugins(t *testing.T) { // Remove the plugin mockWatcher.Events() <- fsnotify.Event{Name: plugin.path(), Op: fsnotify.Remove} select { - case <-plugin.Closer.(chanWriter): - // registry closed connection to this plugin - case <-time.After(100 * time.Millisecond): - // timeout - t.Errorf("timeout waiting for registry to remove plugin") - } - - checkLoadedPlugins(t, r.ForEach, []string{}) - - if _, ok := mockWatcher.Watched()[plugin.path()]; ok { - t.Errorf("Expected registry not to be watching %s, but was", plugin.path()) - } -} - -func TestRegistryRemovesPluginsWhenTheyClose(t *testing.T) { - // the reader here will EOF after this message, which should count as the - // connection closing. - rBuf := strings.NewReader(`{"id":0,"result":{"name":"testPlugin","interfaces":["reporter"]}}`) - plugin := mockPlugin{Name: "testPlugin", R: rBuf, Closer: chanWriter(make(chan []byte))} - _, mockWatcher := setup(t, plugin) - defer restore(t) - - root := "/plugins" - r, err := NewRegistry(root, "", nil) - if err != nil { - t.Fatal(err) - } - defer r.Close() - - checkLoadedPlugins(t, r.ForEach, []string{"testPlugin"}) - - // Remove the plugin - select { - case <-plugin.Closer.(chanWriter): + case <-plugin.Requests: // registry closed connection to this plugin case <-time.After(100 * time.Millisecond): // timeout @@ -248,14 +240,14 @@ func TestRegistryReturnsPluginsByInterface(t *testing.T) { setup( t, mockPlugin{ - Name: "plugin1", - R: bytes.NewBufferString(`{"id":0,"result":{"name":"plugin1","interfaces":["reporter"]}}`), - W: ioutil.Discard, + t: t, + Name: "plugin1", + Handler: stringHandler(`{"name":"plugin1","interfaces":["reporter"]}`), }, mockPlugin{ - Name: "plugin2", - R: bytes.NewBufferString(`{"id":0,"result":{"name":"plugin2","interfaces":["other"]}}`), - W: ioutil.Discard, + t: t, + Name: "plugin2", + Handler: stringHandler(`{"name":"plugin2","interfaces":["other"]}`), }, ) defer restore(t) @@ -276,14 +268,14 @@ func TestRegistryHandlesConflictingPlugins(t *testing.T) { setup( t, mockPlugin{ - Name: "plugin1", - R: bytes.NewBufferString(`{"id":0,"result":{"name":"plugin1","interfaces":["reporter"]}}`), - W: ioutil.Discard, + t: t, + Name: "plugin1", + Handler: stringHandler(`{"name":"plugin1","interfaces":["reporter"]}`), }, mockPlugin{ - Name: "plugin1", - R: bytes.NewBufferString(`{"id":0,"result":{"name":"plugin2","interfaces":["other"]}}`), - W: ioutil.Discard, + t: t, + Name: "plugin1", + Handler: stringHandler(`{"name":"plugin2","interfaces":["other"]}`), }, ) defer restore(t) diff --git a/probe/plugins/reporter.go b/probe/plugins/reporter.go index 721a0fee4d..d6c7d78c74 100644 --- a/probe/plugins/reporter.go +++ b/probe/plugins/reporter.go @@ -1,12 +1,9 @@ package plugins import ( - "bytes" - "encoding/json" - log "github.com/Sirupsen/logrus" - "github.com/ugorji/go/codec" + "github.com/weaveworks/scope/probe" "github.com/weaveworks/scope/report" ) @@ -14,16 +11,12 @@ func Reporter(pluginRegistry *Registry) probe.Reporter { return probe.ReporterFunc("plugins", func() (report.Report, error) { rpt := report.MakeReport() pluginRegistry.Implementors("reporter", func(plugin *Plugin) { - var pluginReportJSON json.RawMessage - if err := plugin.Call("Report", nil, &pluginReportJSON); err != nil { + pluginReport, err := plugin.Report() + if err != nil { log.Errorf("plugins: error getting report from %s: %v", plugin.ID, err) + return } - - pluginReport := report.MakeReport() pluginReport.Plugins = pluginReport.Plugins.Add(plugin.PluginSpec) - if err := codec.NewDecoder(bytes.NewReader(pluginReportJSON), &codec.JsonHandle{}).Decode(&pluginReport); err != nil { - log.Errorf("plugins: error decoding report from %s: %v", plugin.ID, err) - } rpt = rpt.Merge(pluginReport) }) return rpt, nil diff --git a/probe/plugins/sysdig/main.go b/probe/plugins/sysdig/main.go index c249c3a87a..f7597cad64 100644 --- a/probe/plugins/sysdig/main.go +++ b/probe/plugins/sysdig/main.go @@ -2,13 +2,13 @@ package main import ( "bufio" + "encoding/json" "flag" "fmt" "io" "log" "net" - "net/rpc" - "net/rpc/jsonrpc" + "net/http" "os" "os/exec" "strconv" @@ -37,12 +37,12 @@ func main() { log.Println("Found sysdig...") os.Remove(*addr) - l, err := net.Listen("unix", *addr) + listener, err := net.Listen("unix", *addr) if err != nil { log.Fatal(err) } defer func() { - l.Close() + listener.Close() os.Remove(*addr) }() @@ -53,14 +53,11 @@ func main() { log.Println(err) return } - rpc.Register(&Plugin{httpLog: httpLog, HostID: *hostID}) - for { - conn, err := l.Accept() - if err != nil { - log.Printf("error: %v", err) - break - } - go rpc.ServeCodec(jsonrpc.NewServerCodec(conn)) + plugin := &Plugin{httpLog: httpLog, HostID: *hostID} + http.HandleFunc("/", plugin.Handshake) + http.HandleFunc("/report", plugin.Report) + if err := http.Serve(listener, nil); err != nil { + log.Printf("error: %v", err) } } @@ -69,18 +66,20 @@ type Plugin struct { HostID string } -func (p *Plugin) Handshake(args map[string]string, resp *map[string]interface{}) error { - log.Printf("Probe %s handshake", args["probe_id"]) - (*resp) = map[string]interface{}{ +func (p *Plugin) Handshake(w http.ResponseWriter, r *http.Request) { + log.Printf("Probe %s handshake", r.FormValue("probe_id")) + err := json.NewEncoder(w).Encode(map[string]interface{}{ "name": "sysdig", "description": "Displays HTTP request rates by process based on sysdig data", "interfaces": []string{"reporter"}, "api_version": "1", + }) + if err != nil { + log.Printf("error: %v", err) } - return nil } -func (p *Plugin) Report(args map[string]string, resp *map[string]interface{}) error { +func (p *Plugin) Report(w http.ResponseWriter, r *http.Request) { now := time.Now() nowISO := now.Format(time.RFC3339) nodes := map[string]interface{}{} @@ -107,7 +106,7 @@ func (p *Plugin) Report(args map[string]string, resp *map[string]interface{}) er }, } }) - (*resp) = map[string]interface{}{ + err := json.NewEncoder(w).Encode(map[string]interface{}{ "Process": map[string]interface{}{ "nodes": nodes, "metadata_templates": map[string]interface{}{ @@ -125,8 +124,10 @@ func (p *Plugin) Report(args map[string]string, resp *map[string]interface{}) er }, }, }, + }) + if err != nil { + log.Printf("error: %v", err) } - return nil } type latest struct { diff --git a/probe/plugins/unix_round_tripper.go b/probe/plugins/unix_round_tripper.go new file mode 100644 index 0000000000..0bbd49b51d --- /dev/null +++ b/probe/plugins/unix_round_tripper.go @@ -0,0 +1,27 @@ +package plugins + +import ( + "net" + "net/http" + "net/http/httputil" + "time" +) + +type unixRoundTripper struct { + address string + timeout time.Duration +} + +func makeUnixRoundTripper(address string, timeout time.Duration) (http.RoundTripper, error) { + return unixRoundTripper{address: address, timeout: timeout}, nil +} + +func (t unixRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + conn, err := net.DialTimeout("unix", t.address, t.timeout) + if err != nil { + return nil, err + } + client := httputil.NewClientConn(conn, nil) + defer client.Close() + return client.Do(req) +}