Skip to content

Commit

Permalink
added retry to plugin handshake
Browse files Browse the repository at this point in the history
  • Loading branch information
paulbellamy committed Mar 31, 2016
1 parent 16d4133 commit 90fb733
Showing 1 changed file with 41 additions and 28 deletions.
69 changes: 41 additions & 28 deletions probe/plugins/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ var (

const (
pluginTimeout = 500 * time.Millisecond
pluginRetry = 5 * time.Second
)

// Registry maintains a list of available plugins by name.
Expand Down Expand Up @@ -101,14 +102,7 @@ func (r *Registry) addPath(ctx context.Context, path string) error {
return nil
}
client := &http.Client{Transport: tr, Timeout: pluginTimeout}
plugin, err := NewPlugin(ctx, path, client, r.apiVersion, r.handshakeMetadata)
if err != nil {
log.Errorf("plugins: error loading plugin %s: %v", path, err)
return nil
}

log.Infof("plugins: loaded plugin %s: %s", plugin.ID, strings.Join(plugin.Interfaces, ", "))
r.pluginsBySocket[path] = plugin
r.pluginsBySocket[path] = NewPlugin(ctx, path, client, r.apiVersion, r.handshakeMetadata)
default:
log.Infof("plugins: unknown filemode %s", path)
}
Expand Down Expand Up @@ -192,31 +186,20 @@ type Plugin struct {
context context.Context
socket string
client *http.Client
quit chan struct{}
}

// NewPlugin loads and initializes a new plugin. If client is nil,
// http.DefaultClient will be used.
func NewPlugin(ctx context.Context, socket string, client *http.Client, expectedAPIVersion string, handshakeMetadata map[string]string) (*Plugin, error) {
func NewPlugin(ctx context.Context, socket string, client *http.Client, expectedAPIVersion string, handshakeMetadata map[string]string) *Plugin {
params := url.Values{}
for k, v := range handshakeMetadata {
params.Add(k, v)
}

p := &Plugin{context: ctx, socket: socket, client: client}
resp, err := p.handshake(ctx, params)
if err != nil {
return nil, err
}
if resp.Name == "" {
return nil, fmt.Errorf("plugin did not provide a name")
}
if resp.APIVersion != expectedAPIVersion {
return nil, fmt.Errorf("plugin did not provide correct API version: expected %q, got %q", expectedAPIVersion, resp.APIVersion)
}
p.ID, p.Label = resp.Name, resp.Name
p.Description = resp.Description
p.Interfaces = resp.Interfaces
return p, nil
p := &Plugin{context: ctx, socket: socket, client: client, quit: make(chan struct{})}
p.handshake(ctx, expectedAPIVersion, params, 0)
return p
}

type handshakeResponse struct {
Expand All @@ -226,10 +209,39 @@ type handshakeResponse struct {
APIVersion string `json:"api_version,omitempty"`
}

func (p *Plugin) handshake(ctx context.Context, params url.Values) (handshakeResponse, error) {
var result handshakeResponse
err := p.get("/", params, &result)
return result, err
// handshake periodically retries the handshake with this plugin until it succeeds.
func (p *Plugin) handshake(ctx context.Context, expectedAPIVersion string, params url.Values, delay time.Duration) {
select {
case <-p.quit:
return
case <-time.After(delay):
// noop
}
if err := p.tryHandshake(ctx, expectedAPIVersion, params); err != nil {
log.Errorf("plugins: error loading plugin %s: %v", p.socket, err)
go p.handshake(ctx, expectedAPIVersion, params, pluginRetry)
return
}
log.Infof("plugins: loaded plugin %s: %s", p.ID, strings.Join(p.Interfaces, ", "))
}

// helper function to try a handshake once
func (p *Plugin) tryHandshake(ctx context.Context, expectedAPIVersion string, params url.Values) error {
var resp handshakeResponse
if err := p.get("/", params, &resp); err != nil {
return err
}

if resp.Name == "" {
return fmt.Errorf("plugin did not provide a name")
}
if resp.APIVersion != expectedAPIVersion {
return fmt.Errorf("plugin did not provide correct API version: expected %q, got %q", expectedAPIVersion, resp.APIVersion)
}
p.ID, p.Label = resp.Name, resp.Name
p.Description = resp.Description
p.Interfaces = resp.Interfaces
return nil
}

// Report gets the latest report from the plugin
Expand All @@ -254,5 +266,6 @@ func (p *Plugin) get(path string, params url.Values, result interface{}) error {
// Close closes the client
func (p *Plugin) Close() error {
// TODO(paulbellamy): cancel outstanding http requests here
close(p.quit)
return nil
}

0 comments on commit 90fb733

Please sign in to comment.