diff --git a/pkg/manager/group_plugin_impl.go b/pkg/manager/group_plugin_impl.go index 2d714294a..0355f35ba 100644 --- a/pkg/manager/group_plugin_impl.go +++ b/pkg/manager/group_plugin_impl.go @@ -12,19 +12,18 @@ import ( func (m *manager) proxyForGroupPlugin(name string) (group.Plugin, error) { m.lock.Lock() defer m.lock.Unlock() - - endpoint, err := m.plugins.Find(name) - if err != nil { - return nil, err - } - m.backendName = name - return rpc.NewClient(endpoint.Address), nil -} -// This implements the Group Plugin interface to support single group-only operations -// This is contrast with the declarative semantics of commit. It offers an imperative -// operation by operation interface to the user. + // A late-binding proxy so that we don't have a problem with having to + // start up the manager as the last of all the plugins. + return NewProxy(func() (group.Plugin, error) { + endpoint, err := m.plugins.Find(name) + if err != nil { + return nil, err + } + return rpc.NewClient(endpoint.Address), nil + }), nil +} func (m *manager) updateConfig(spec group.Spec) error { log.Debugln("Updating config", spec) @@ -36,6 +35,8 @@ func (m *manager) updateConfig(spec group.Spec) error { stored := GlobalSpec{} err := m.snapshot.Load(&stored) + + log.Warningln("Error updating config:", spec, "with error=", err) // TODO: More robust (type-based) error handling. if err != nil && err.Error() != "not-found" { return err @@ -60,21 +61,35 @@ func (m *manager) updateConfig(spec group.Spec) error { return m.snapshot.Save(stored) } -func (m *manager) CommitGroup(grp group.Spec, pretend bool) (string, error) { - err := make(chan error) +// This implements/ overrides the Group Plugin interface to support single group-only operations +func (m *manager) CommitGroup(grp group.Spec, pretend bool) (resp string, err error) { + + resultChan := make(chan []interface{}) + m.backendOps <- backendOp{ - name: "watch", + name: "commit", operation: func() error { - log.Debugln("Proxy WatchGroup:", grp) + log.Infoln("Proxy CommitGroup:", grp) if !pretend { if err := m.updateConfig(grp); err != nil { + log.Warningln("Error updating", err) return err } } - _, err := m.Plugin.CommitGroup(grp, pretend) + resp, cerr := m.Plugin.CommitGroup(grp, pretend) + log.Infoln("Responses from CommitGroup:", resp, cerr) + resultChan <- []interface{}{resp, cerr} return err }, - err: err, } - return "TODO(chungers): Allow the commit details string to be plumbed through", <-err + + r := <-resultChan + + if v, has := r[0].(string); has { + resp = v + } + if v, has := r[1].(error); has && v != nil { + err = v + } + return } diff --git a/pkg/manager/group_proxy.go b/pkg/manager/group_proxy.go new file mode 100644 index 000000000..158ad51aa --- /dev/null +++ b/pkg/manager/group_proxy.go @@ -0,0 +1,74 @@ +package manager + +import ( + "sync" + + "github.com/docker/infrakit/pkg/spi/group" +) + +// NewProxy returns a plugin interface. The proxy is late-binding in that +// it does not resolve plugin until a method is called. +func NewProxy(finder func() (group.Plugin, error)) group.Plugin { + return &proxy{finder: finder} +} + +type proxy struct { + lock sync.Mutex + client group.Plugin + finder func() (group.Plugin, error) +} + +func (c *proxy) run(f func(group.Plugin) error) error { + c.lock.Lock() + defer c.lock.Unlock() + + if c.client == nil { + if p, err := c.finder(); err == nil { + c.client = p + } else { + return err + } + } + + return f(c.client) +} + +func (c *proxy) CommitGroup(grp group.Spec, pretend bool) (resp string, err error) { + err = c.run(func(g group.Plugin) error { + resp, err = g.CommitGroup(grp, pretend) + return err + }) + return +} + +func (c *proxy) FreeGroup(id group.ID) (err error) { + err = c.run(func(g group.Plugin) error { + err = g.FreeGroup(id) + return err + }) + return +} + +func (c *proxy) DescribeGroup(id group.ID) (desc group.Description, err error) { + err = c.run(func(g group.Plugin) error { + desc, err = g.DescribeGroup(id) + return err + }) + return +} + +func (c *proxy) DestroyGroup(id group.ID) (err error) { + err = c.run(func(g group.Plugin) error { + err = g.DestroyGroup(id) + return err + }) + return +} + +func (c *proxy) InspectGroups() (specs []group.Spec, err error) { + err = c.run(func(g group.Plugin) error { + specs, err = g.InspectGroups() + return err + }) + return +} diff --git a/pkg/manager/group_proxy_test.go b/pkg/manager/group_proxy_test.go new file mode 100644 index 000000000..f1fa96688 --- /dev/null +++ b/pkg/manager/group_proxy_test.go @@ -0,0 +1,110 @@ +package manager + +import ( + "errors" + "testing" + + "github.com/docker/infrakit/pkg/spi/group" + "github.com/stretchr/testify/require" +) + +func TestErrorOnCallsToNilPlugin(t *testing.T) { + + errMessage := "no-plugin" + proxy := NewProxy(func() (group.Plugin, error) { + return nil, errors.New(errMessage) + }) + + err := proxy.FreeGroup(group.ID("test")) + require.Error(t, err) + require.Equal(t, errMessage, err.Error()) +} + +type fakeGroupPlugin struct { + group.Plugin + commitGroup func(grp group.Spec, pretend bool) (string, error) + freeGroup func(id group.ID) error +} + +func (f *fakeGroupPlugin) CommitGroup(grp group.Spec, pretend bool) (string, error) { + return f.commitGroup(grp, pretend) +} +func (f *fakeGroupPlugin) FreeGroup(id group.ID) error { + return f.freeGroup(id) +} + +func TestDelayPluginLookupCallingMethod(t *testing.T) { + + called := false + fake := &fakeGroupPlugin{ + commitGroup: func(grp group.Spec, pretend bool) (string, error) { + called = true + require.Equal(t, group.Spec{ID: "foo"}, grp) + require.Equal(t, true, pretend) + return "some-response", nil + }, + } + + proxy := NewProxy(func() (group.Plugin, error) { return fake, nil }) + + require.False(t, called) + + actualStr, actualErr := proxy.CommitGroup(group.Spec{ID: "foo"}, true) + require.True(t, called) + require.NoError(t, actualErr) + require.Equal(t, "some-response", actualStr) +} + +func TestDelayPluginLookupCallingMethodReturnsError(t *testing.T) { + + called := false + fake := &fakeGroupPlugin{ + freeGroup: func(id group.ID) error { + called = true + require.Equal(t, group.ID("foo"), id) + return errors.New("can't-free") + }, + } + + proxy := NewProxy(func() (group.Plugin, error) { return fake, nil }) + + require.False(t, called) + + actualErr := proxy.FreeGroup(group.ID("foo")) + require.True(t, called) + require.Error(t, actualErr) + require.Equal(t, "can't-free", actualErr.Error()) +} + +func TestDelayPluginLookupCallingMultipleMethods(t *testing.T) { + + called := false + fake := &fakeGroupPlugin{ + commitGroup: func(grp group.Spec, pretend bool) (string, error) { + called = true + require.Equal(t, group.Spec{ID: "foo"}, grp) + require.Equal(t, true, pretend) + return "some-response", nil + }, + freeGroup: func(id group.ID) error { + called = true + require.Equal(t, group.ID("foo"), id) + return errors.New("can't-free") + }, + } + + proxy := NewProxy(func() (group.Plugin, error) { return fake, nil }) + + require.False(t, called) + + actualStr, actualErr := proxy.CommitGroup(group.Spec{ID: "foo"}, true) + require.True(t, called) + require.NoError(t, actualErr) + require.Equal(t, "some-response", actualStr) + + called = false + actualErr = proxy.FreeGroup(group.ID("foo")) + require.True(t, called) + require.Error(t, actualErr) + require.Equal(t, "can't-free", actualErr.Error()) +} diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index be7012eda..605df75be 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -5,7 +5,6 @@ import ( "fmt" "sync" - "errors" log "github.com/Sirupsen/logrus" "github.com/docker/infrakit/pkg/discovery" "github.com/docker/infrakit/pkg/leader" @@ -43,7 +42,6 @@ type manager struct { type backendOp struct { name string operation func() error - err chan<- error } // NewManager returns the manager which depends on other services to coordinate and manage @@ -114,9 +112,7 @@ func (m *manager) Start() (<-chan struct{}, error) { case op := <-backendOps: log.Debugln("Backend operation:", op) if m.isLeader { - op.err <- op.operation() - } else { - op.err <- errors.New("not-a-leader") + op.operation() } case <-stopWorkQueue: diff --git a/pkg/store/file/file.go b/pkg/store/file/file.go index a91affb4c..bf2503ce8 100644 --- a/pkg/store/file/file.go +++ b/pkg/store/file/file.go @@ -43,8 +43,12 @@ func (s *snapshot) Save(obj interface{}) error { // Load loads a snapshot and marshals into the given reference func (s *snapshot) Load(output interface{}) error { buff, err := ioutil.ReadFile(filepath.Join(s.dir, s.name)) - if err != nil { + if err == nil { + return json.Unmarshal(buff, output) + } + if os.IsExist(err) { + // if file exists and we have problem reading return err } - return json.Unmarshal(buff, output) + return nil } diff --git a/pkg/store/store.go b/pkg/store/store.go index da979344b..01abfd78d 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -7,6 +7,7 @@ type Snapshot interface { // Save marshals (encodes) and saves a snapshot of the given object. Save(obj interface{}) error - // Load loads a snapshot and marshals (decodes) into the given reference + // Load loads a snapshot and marshals (decodes) into the given reference. + // If no data is available to unmarshal into the given struct, the fuction returns nil. Load(output interface{}) error } diff --git a/pkg/store/swarm/swarm.go b/pkg/store/swarm/swarm.go index a1bae3023..9709f3f24 100644 --- a/pkg/store/swarm/swarm.go +++ b/pkg/store/swarm/swarm.go @@ -42,12 +42,17 @@ func (s *snapshot) Save(obj interface{}) error { // Load loads a snapshot and marshals into the given reference func (s *snapshot) Load(output interface{}) error { label, err := readSwarm(s.client) - if err != nil { + if err == nil { + return decode(label, output) + } + if err != errNotFound { return err } - return decode(label, output) + return nil } +var errNotFound = fmt.Errorf("not-found") + func readSwarm(client client.APIClient) (string, error) { info, err := client.SwarmInspect(context.Background()) if err != nil { @@ -60,7 +65,7 @@ func readSwarm(client client.APIClient) (string, error) { return l, nil } } - return "", fmt.Errorf("not-found") + return "", errNotFound } func writeSwarm(client client.APIClient, value string) error {