Skip to content

Commit

Permalink
moving plugins interface to http instead of jsonrpc
Browse files Browse the repository at this point in the history
  • Loading branch information
paulbellamy committed Mar 31, 2016
1 parent 9592cab commit 16d4133
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 181 deletions.
123 changes: 59 additions & 64 deletions probe/plugins/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package plugins

import (
"fmt"
"io"
"net"
"net/rpc"
"net/rpc/jsonrpc"
"net/http"
"net/url"
"path/filepath"
"sort"
"strings"
Expand All @@ -15,17 +13,19 @@ import (

log "github.com/Sirupsen/logrus"
"github.com/fsnotify/fsnotify"
"github.com/ugorji/go/codec"
"golang.org/x/net/context"
"golang.org/x/net/context/ctxhttp"

"github.com/weaveworks/scope/common/fs"
"github.com/weaveworks/scope/common/fswatch"
"github.com/weaveworks/scope/common/xfer"
"github.com/weaveworks/scope/report"
)

// Exposed for testing
var (
// made available for testing
dialer = net.DialTimeout

ErrTimeout = fmt.Errorf("rpc call timeout")
transport = makeUnixRoundTripper
)

const (
Expand Down Expand Up @@ -58,7 +58,7 @@ func NewRegistry(root, apiVersion string, handshakeMetadata map[string]string) (
watcher: watcher,
done: make(chan struct{}),
}
if err := r.addPath(r.root); err != nil {
if err := r.addPath(context.Background(), r.root); err != nil {
r.Close()
return nil, err
}
Expand All @@ -68,7 +68,7 @@ func NewRegistry(root, apiVersion string, handshakeMetadata map[string]string) (

// add recursively crawls the path provided, adding it to the watcher, and
// looking for any existing sockets, loading them as plugins.
func (r *Registry) addPath(path string) error {
func (r *Registry) addPath(ctx context.Context, path string) error {
var statT syscall.Stat_t
if err := fs.Stat(path, &statT); err != nil {
return err
Expand All @@ -85,7 +85,7 @@ func (r *Registry) addPath(path string) error {
return nil
}
for _, file := range files {
if err := r.addPath(filepath.Join(path, file.Name())); err != nil {
if err := r.addPath(ctx, filepath.Join(path, file.Name())); err != nil {
log.Errorf("plugins: error loading path %s: %v", filepath.Join(path, file.Name()), err)
return nil
}
Expand All @@ -95,14 +95,13 @@ func (r *Registry) addPath(path string) error {
log.Infof("plugins: plugin already exists %s: conflicting %s", plugin.ID, path)
return nil
}

conn, err := dialer("unix", path, 1*time.Second)
tr, err := transport(path, pluginTimeout)
if err != nil {
log.Errorf("plugins: error loading plugin %s: %v", path, err)
return nil
}
transport := &onClose{conn, func() error { r.removePath(path); return nil }}
plugin, err := NewPlugin(transport, r.apiVersion, r.handshakeMetadata)
client := &http.Client{Transport: tr, Timeout: pluginTimeout}
plugin, err := NewPlugin(ctx, path, client, r.apiVersion, r.handshakeMetadata)
if err != nil {
log.Errorf("plugins: error loading plugin %s: %v", path, err)
return nil
Expand All @@ -116,7 +115,7 @@ func (r *Registry) addPath(path string) error {
return nil
}

func (r *Registry) removePath(path string) error {
func (r *Registry) removePath(ctx context.Context, path string) error {
r.watcher.Remove(path)
if plugin, ok := r.pluginsBySocket[path]; ok {
delete(r.pluginsBySocket, path)
Expand All @@ -131,14 +130,14 @@ func (r *Registry) loop() {
case <-r.done:
return
case evt := <-r.watcher.Events():
handlers := map[fsnotify.Op]func(string) error{
handlers := map[fsnotify.Op]func(context.Context, string) error{
fsnotify.Create: r.addPath,
fsnotify.Remove: r.removePath,
fsnotify.Chmod: r.addPath,
}
if handler, ok := handlers[evt.Op]; ok {
r.lock.Lock()
if err := handler(evt.Name); err != nil {
if err := handler(context.Background(), evt.Name); err != nil {
log.Errorf("plugins: event %v: error: %v", evt, err)
}
r.lock.Unlock()
Expand Down Expand Up @@ -190,22 +189,22 @@ func (r *Registry) Close() error {

type Plugin struct {
xfer.PluginSpec
conn io.ReadWriteCloser
client *rpc.Client
context context.Context
socket string
client *http.Client
}

type handshakeResponse struct {
Name string `json:"name"`
Description string `json:"description,omitempty"`
Interfaces []string `json:"interfaces"`
APIVersion string `json:"api_version,omitempty"`
}
// NewPlugin loads and initializes a new plugin. If client is nil,
// http.DefaultClient will be used.
func NewPlugin(ctx context.Context, socket string, client *http.Client, expectedAPIVersion string, handshakeMetadata map[string]string) (*Plugin, error) {
params := url.Values{}
for k, v := range handshakeMetadata {
params.Add(k, v)
}

// NewPlugin loads and initializes a new plugin
func NewPlugin(conn io.ReadWriteCloser, expectedAPIVersion string, handshakeMetadata map[string]string) (*Plugin, error) {
p := &Plugin{conn: conn, client: jsonrpc.NewClient(conn)}
var resp handshakeResponse
if err := p.Call("Handshake", handshakeMetadata, &resp); err != nil {
p := &Plugin{context: ctx, socket: socket, client: client}
resp, err := p.handshake(ctx, params)
if err != nil {
return nil, err
}
if resp.Name == "" {
Expand All @@ -220,44 +219,40 @@ func NewPlugin(conn io.ReadWriteCloser, expectedAPIVersion string, handshakeMeta
return p, nil
}

// Call calls some method on the remote plugin. Should replace this with a
// dynamic interface thing maybe.
func (p *Plugin) Call(cmd string, args interface{}, reply interface{}) error {
err := make(chan error, 1)
go func() {
err <- p.client.Call("Plugin."+cmd, args, reply)
}()
select {
case e := <-err:
if e == rpc.ErrShutdown {
p.Close()
}
return e
case <-time.After(pluginTimeout):
// timeout
}
return ErrTimeout
type handshakeResponse struct {
Name string `json:"name"`
Description string `json:"description,omitempty"`
Interfaces []string `json:"interfaces"`
APIVersion string `json:"api_version,omitempty"`
}

// Close closes the rpc client
func (p *Plugin) Close() error {
return p.client.Close()
func (p *Plugin) handshake(ctx context.Context, params url.Values) (handshakeResponse, error) {
var result handshakeResponse
err := p.get("/", params, &result)
return result, err
}

// onClose lets us attach a callback to be called after the underlying
// transport Closes
type onClose struct {
io.ReadWriteCloser
fn func() error
// Report gets the latest report from the plugin
func (p *Plugin) Report() (report.Report, error) {
result := report.MakeReport()
err := p.get("/report", nil, &result)
return result, err
}

func (c *onClose) Close() error {
err := c.ReadWriteCloser.Close()
if c.fn != nil {
err2 := c.fn()
if err == nil {
err = err2
}
// TODO(paulbellamy): better error handling on wrong status codes
func (p *Plugin) get(path string, params url.Values, result interface{}) error {
ctx, cancel := context.WithTimeout(context.Background(), pluginTimeout)
defer cancel()
resp, err := ctxhttp.Get(ctx, p.client, fmt.Sprintf("unix://%s?%s", path, params.Encode()))
if err != nil {
return err
}
return err
defer resp.Body.Close()
return codec.NewDecoder(resp.Body, &codec.JsonHandle{}).Decode(&result)
}

// Close closes the client
func (p *Plugin) Close() error {
// TODO(paulbellamy): cancel outstanding http requests here
return nil
}
Loading

0 comments on commit 16d4133

Please sign in to comment.