From 90fb7333aee19a27831724fce6b7c3d016d656d2 Mon Sep 17 00:00:00 2001 From: Paul Bellamy Date: Thu, 31 Mar 2016 14:38:04 +0100 Subject: [PATCH] added retry to plugin handshake --- probe/plugins/registry.go | 69 +++++++++++++++++++++++---------------- 1 file changed, 41 insertions(+), 28 deletions(-) diff --git a/probe/plugins/registry.go b/probe/plugins/registry.go index b4394453ba..100623d4cd 100644 --- a/probe/plugins/registry.go +++ b/probe/plugins/registry.go @@ -30,6 +30,7 @@ var ( const ( pluginTimeout = 500 * time.Millisecond + pluginRetry = 5 * time.Second ) // Registry maintains a list of available plugins by name. @@ -101,14 +102,7 @@ func (r *Registry) addPath(ctx context.Context, path string) error { return nil } client := &http.Client{Transport: tr, Timeout: pluginTimeout} - plugin, err := NewPlugin(ctx, path, client, r.apiVersion, r.handshakeMetadata) - if err != nil { - log.Errorf("plugins: error loading plugin %s: %v", path, err) - return nil - } - - log.Infof("plugins: loaded plugin %s: %s", plugin.ID, strings.Join(plugin.Interfaces, ", ")) - r.pluginsBySocket[path] = plugin + r.pluginsBySocket[path] = NewPlugin(ctx, path, client, r.apiVersion, r.handshakeMetadata) default: log.Infof("plugins: unknown filemode %s", path) } @@ -192,31 +186,20 @@ type Plugin struct { context context.Context socket string client *http.Client + quit chan struct{} } // NewPlugin loads and initializes a new plugin. If client is nil, // http.DefaultClient will be used. -func NewPlugin(ctx context.Context, socket string, client *http.Client, expectedAPIVersion string, handshakeMetadata map[string]string) (*Plugin, error) { +func NewPlugin(ctx context.Context, socket string, client *http.Client, expectedAPIVersion string, handshakeMetadata map[string]string) *Plugin { params := url.Values{} for k, v := range handshakeMetadata { params.Add(k, v) } - p := &Plugin{context: ctx, socket: socket, client: client} - resp, err := p.handshake(ctx, params) - if err != nil { - return nil, err - } - if resp.Name == "" { - return nil, fmt.Errorf("plugin did not provide a name") - } - if resp.APIVersion != expectedAPIVersion { - return nil, fmt.Errorf("plugin did not provide correct API version: expected %q, got %q", expectedAPIVersion, resp.APIVersion) - } - p.ID, p.Label = resp.Name, resp.Name - p.Description = resp.Description - p.Interfaces = resp.Interfaces - return p, nil + p := &Plugin{context: ctx, socket: socket, client: client, quit: make(chan struct{})} + p.handshake(ctx, expectedAPIVersion, params, 0) + return p } type handshakeResponse struct { @@ -226,10 +209,39 @@ type handshakeResponse struct { APIVersion string `json:"api_version,omitempty"` } -func (p *Plugin) handshake(ctx context.Context, params url.Values) (handshakeResponse, error) { - var result handshakeResponse - err := p.get("/", params, &result) - return result, err +// handshake periodically retries the handshake with this plugin until it succeeds. +func (p *Plugin) handshake(ctx context.Context, expectedAPIVersion string, params url.Values, delay time.Duration) { + select { + case <-p.quit: + return + case <-time.After(delay): + // noop + } + if err := p.tryHandshake(ctx, expectedAPIVersion, params); err != nil { + log.Errorf("plugins: error loading plugin %s: %v", p.socket, err) + go p.handshake(ctx, expectedAPIVersion, params, pluginRetry) + return + } + log.Infof("plugins: loaded plugin %s: %s", p.ID, strings.Join(p.Interfaces, ", ")) +} + +// helper function to try a handshake once +func (p *Plugin) tryHandshake(ctx context.Context, expectedAPIVersion string, params url.Values) error { + var resp handshakeResponse + if err := p.get("/", params, &resp); err != nil { + return err + } + + if resp.Name == "" { + return fmt.Errorf("plugin did not provide a name") + } + if resp.APIVersion != expectedAPIVersion { + return fmt.Errorf("plugin did not provide correct API version: expected %q, got %q", expectedAPIVersion, resp.APIVersion) + } + p.ID, p.Label = resp.Name, resp.Name + p.Description = resp.Description + p.Interfaces = resp.Interfaces + return nil } // Report gets the latest report from the plugin @@ -254,5 +266,6 @@ func (p *Plugin) get(path string, params url.Values, result interface{}) error { // Close closes the client func (p *Plugin) Close() error { // TODO(paulbellamy): cancel outstanding http requests here + close(p.quit) return nil }