Skip to content

Commit

Permalink
WIP -- working on plugins, still includes some debugging printlns
Browse files Browse the repository at this point in the history
  • Loading branch information
paulbellamy committed Mar 4, 2016
1 parent 8c0fc31 commit ec2f8dc
Show file tree
Hide file tree
Showing 23 changed files with 864 additions and 196 deletions.
228 changes: 228 additions & 0 deletions plugins/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
package plugins

import (
"fmt"
"net"
"net/rpc"
"net/rpc/jsonrpc"
"os"
"path/filepath"
"strings"
"sync"
"time"

log "github.com/Sirupsen/logrus"
"github.com/fsnotify/fsnotify"

"github.com/weaveworks/scope/common/fs"
)

// Registry maintains a list of available plugins by name.
type Registry struct {
root string
handshakeMetadata map[string]string
pluginsBySocket map[string]*Plugin
lock sync.RWMutex
watcher *fsnotify.Watcher
done chan struct{}
}

// NewRegistry creates a new registry which watches the given dir root for new
// plugins, and adds them.
func NewRegistry(root string, handshakeMetadata map[string]string) (*Registry, error) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
return nil, err
}
r := &Registry{
root: root,
handshakeMetadata: handshakeMetadata,
pluginsBySocket: map[string]*Plugin{},
watcher: watcher,
done: make(chan struct{}),
}
if err := r.addPath(r.root); err != nil {
r.Close()
return nil, err
}
go r.loop()
return r, nil
}

// add recursively crawls the path provided, adding it to the watcher, and
// looking for any existing sockets, loading them as plugins.
func (r *Registry) addPath(path string) error {
info, err := os.Stat(path)
if err != nil {
return err
}
if err := r.watcher.Add(path); err != nil {
return err
}
switch {
case info.IsDir():
files, err := fs.ReadDir(path)
if err != nil {
return err
}
for _, file := range files {
if err := r.addPath(filepath.Join(path, file.Name())); err != nil {
return err
}
}
case info.Mode()&os.ModeSocket != 0:
if plugin, ok := r.pluginsBySocket[path]; ok {
log.Infof("plugins: plugin already exists %s: conflicting %s and %s", plugin.Name, plugin.socket, path)
} else {
if plugin, err := NewPlugin(path, r.handshakeMetadata); err != nil {
log.Errorf("plugins: error loading plugin %s: %v", path, err)
} else {
log.Infof("plugins: loaded plugin %s: %s", plugin.Name, strings.Join(plugin.Interfaces, ", "))
plugin.OnClose(r.remove)
r.pluginsBySocket[path] = plugin
}
}
default:
log.Infof("plugins: unknown filemode %s: %v", path, info.Mode())
}
return nil
}

func (r *Registry) removePath(path string) error {
r.watcher.Remove(path)
if plugin, ok := r.pluginsBySocket[path]; ok {
delete(r.pluginsBySocket, path)
return plugin.Close()
}
return nil
}

func (r *Registry) remove(p *Plugin) error {
return r.removePath(p.socket)
}

func (r *Registry) loop() {
for {
select {
case <-r.done:
return
case evt := <-r.watcher.Events:
handlers := map[fsnotify.Op]func(string) error{
fsnotify.Create: r.addPath,
fsnotify.Remove: r.removePath,
fsnotify.Chmod: r.addPath,
}
if handler, ok := handlers[evt.Op]; ok {
r.lock.Lock()
if err := handler(evt.Name); err != nil {
log.Errorf("plugins: event %v: error: %v", evt, err)
}
r.lock.Unlock()
} else {
log.Errorf("plugins: event %v: no handler", evt)
}
case err := <-r.watcher.Errors:
log.Errorf("plugins: error: %v", err)
}
}
}

// ForEach walks through all the plugins running f for each one.
func (r *Registry) ForEach(f func(p *Plugin)) {
r.lock.RLock()
defer r.lock.RUnlock()
for _, p := range r.pluginsBySocket {
f(p)
}
}

// Implementors walks the available plugins fulfilling the given interface
func (r *Registry) Implementors(iface string, f func(p *Plugin)) {
r.ForEach(func(p *Plugin) {
for _, piface := range p.Interfaces {
if piface == iface {
f(p)
}
}
})
}

// Close shuts down the registry. It can still be used after this, but will be
// out of date.
func (r *Registry) Close() error {
close(r.done)
r.lock.Lock()
defer r.lock.Unlock()
for _, plugin := range r.pluginsBySocket {
plugin.Close()
}
return r.watcher.Close()
}

type Plugin struct {
Name string
Interfaces []string
socket string
client *rpc.Client
onclose func(*Plugin) error
}

type handshakeResponse struct {
Name string `json:"name"`
Interfaces []string `json:"interfaces"`
}

// NewPlugin loads and initializes a new plugin
func NewPlugin(socketPath string, handshakeMetadata map[string]string) (*Plugin, error) {
log.Println("plugins: dialing", socketPath)
conn, err := net.DialTimeout("unix", socketPath, 1*time.Second)
if err != nil {
return nil, err
}
log.Println("plugins: connected", socketPath)
p := &Plugin{socket: socketPath, client: jsonrpc.NewClient(conn)}
var resp handshakeResponse
if err := p.Call("Handshake", handshakeMetadata, &resp); err != nil {
return nil, err
}
if resp.Name == "" {
return nil, fmt.Errorf("plugin did not provide a name")
}
log.Printf("plugins: %s: handshake successful", resp.Name)
p.Name = resp.Name
p.Interfaces = resp.Interfaces
return p, nil
}

// Call calls some method on the remote plugin. Should replace this with a
// dynamic interface thing maybe.
func (p *Plugin) Call(cmd string, args interface{}, reply interface{}) error {
err := p.client.Call("Plugin."+cmd, args, reply)
if err == rpc.ErrShutdown {
p.Close()
}
return err
}

// OnClose func is called just after the plugin is closed. Either by the user
// or by the remote
func (p *Plugin) OnClose(f func(*Plugin) error) {
p.onclose = f
}

func (p *Plugin) Close() error {
err := p.client.Close()
if p.onclose != nil {
err2 := p.onclose(p)
if err == nil {
err = err2
}
}
return err
}

/*
func pluginName(socketPath string) string {
return strings.TrimSuffix(filepath.Base(socketPath), filepath.Ext(socketPath))
}
*/
14 changes: 14 additions & 0 deletions plugins/registry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package plugins

import (
"testing"
)

func TestRegistryLoadsExistingPlugins(t *testing.T) { t.Error("pending") }
func TestRegistryDiscoversNewPlugins(t *testing.T) { t.Error("pending") }
func TestRegistryRemovesPlugins(t *testing.T) { t.Error("pending") }
func TestRegistryRemovesPluginsWhenTheyClose(t *testing.T) { t.Error("pending") }
func TestRegistryReturnsPluginsByInterface(t *testing.T) { t.Error("pending") }
func TestRegistryHandlesConflictingPlugins(t *testing.T) { t.Error("pending") }
func TestRegistryIgnoresDeadSockets(t *testing.T) { t.Error("pending") }
func TestRegistryIgnoresPluginsWhichHaveGoneAwayButLeftASocket(t *testing.T) { t.Error("pending") }
1 change: 1 addition & 0 deletions plugins/sysdig/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
sysdig-plugin
4 changes: 4 additions & 0 deletions plugins/sysdig/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
FROM sysdig/sysdig

COPY ./sysdig-plugin /usr/bin/sysdig-plugin
CMD ["/usr/bin/sysdig-plugin"]
18 changes: 18 additions & 0 deletions plugins/sysdig/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
.PHONY: run clean

EXE=sysdig-plugin
UPTODATE=.$(EXE).uptodate

run: $(UPTODATE)
docker run --rm -it --privileged -v /var/run/scope/plugins:/var/run/scope/plugins -v /var/run/docker.sock:/host/var/run/docker.sock -v /dev:/host/dev -v /proc:/host/proc:ro -v /boot:/host/boot:ro -v /lib/modules:/host/lib/modules:ro -v /usr:/host/usr:ro --name sysdig-plugin sysdig-plugin

$(UPTODATE): $(EXE)
docker build -t sysdig-plugin .
touch $@

$(EXE): main.go
docker run --rm -v "$$PWD":/usr/src/$(EXE) -w /usr/src/$(EXE) golang:1.6 go build -v

clean:
- rm -rf $(UPTODATE) $(EXE)

12 changes: 12 additions & 0 deletions plugins/sysdig/docker-entrypoint.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#!/bin/bash
#set -e

echo "* Setting up /usr/src links from host"

for i in $(ls $SYSDIG_HOST_ROOT/usr/src); do
ln -s $SYSDIG_HOST_ROOT/usr/src/$i /usr/src/$i
done

/usr/bin/sysdig-probe-loader

exec "$@"
Loading

0 comments on commit ec2f8dc

Please sign in to comment.