Skip to content
This repository has been archived by the owner on Jan 21, 2020. It is now read-only.

Manager now uses a late binding proxy for group plugin #354

Merged
merged 3 commits into from
Jan 9, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 33 additions & 18 deletions pkg/manager/group_plugin_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,18 @@ import (
func (m *manager) proxyForGroupPlugin(name string) (group.Plugin, error) {
m.lock.Lock()
defer m.lock.Unlock()

endpoint, err := m.plugins.Find(name)
if err != nil {
return nil, err
}

m.backendName = name
return rpc.NewClient(endpoint.Address), nil
}

// This implements the Group Plugin interface to support single group-only operations
// This is contrast with the declarative semantics of commit. It offers an imperative
// operation by operation interface to the user.
// A late-binding proxy so that we don't have a problem with having to
// start up the manager as the last of all the plugins.
return NewProxy(func() (group.Plugin, error) {
endpoint, err := m.plugins.Find(name)
if err != nil {
return nil, err
}
return rpc.NewClient(endpoint.Address), nil
}), nil
}

func (m *manager) updateConfig(spec group.Spec) error {
log.Debugln("Updating config", spec)
Expand All @@ -36,6 +35,8 @@ func (m *manager) updateConfig(spec group.Spec) error {
stored := GlobalSpec{}

err := m.snapshot.Load(&stored)

log.Warningln("Error updating config:", spec, "with error=", err)
// TODO: More robust (type-based) error handling.
if err != nil && err.Error() != "not-found" {
return err
Expand All @@ -60,21 +61,35 @@ func (m *manager) updateConfig(spec group.Spec) error {
return m.snapshot.Save(stored)
}

func (m *manager) CommitGroup(grp group.Spec, pretend bool) (string, error) {
err := make(chan error)
// This implements/ overrides the Group Plugin interface to support single group-only operations
func (m *manager) CommitGroup(grp group.Spec, pretend bool) (resp string, err error) {

resultChan := make(chan []interface{})

m.backendOps <- backendOp{
name: "watch",
name: "commit",
operation: func() error {
log.Debugln("Proxy WatchGroup:", grp)
log.Infoln("Proxy CommitGroup:", grp)
if !pretend {
if err := m.updateConfig(grp); err != nil {
log.Warningln("Error updating", err)
return err
}
}
_, err := m.Plugin.CommitGroup(grp, pretend)
resp, cerr := m.Plugin.CommitGroup(grp, pretend)
log.Infoln("Responses from CommitGroup:", resp, cerr)
resultChan <- []interface{}{resp, cerr}
return err
},
err: err,
}
return "TODO(chungers): Allow the commit details string to be plumbed through", <-err

r := <-resultChan

if v, has := r[0].(string); has {
resp = v
}
if v, has := r[1].(error); has && v != nil {
err = v
}
return
}
74 changes: 74 additions & 0 deletions pkg/manager/group_proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package manager

import (
"sync"

"github.com/docker/infrakit/pkg/spi/group"
)

// NewProxy returns a plugin interface. The proxy is late-binding in that
// it does not resolve plugin until a method is called.
func NewProxy(finder func() (group.Plugin, error)) group.Plugin {
return &proxy{finder: finder}
}

type proxy struct {
lock sync.Mutex
client group.Plugin
finder func() (group.Plugin, error)
}

func (c *proxy) run(f func(group.Plugin) error) error {
c.lock.Lock()
defer c.lock.Unlock()

if c.client == nil {
if p, err := c.finder(); err == nil {
c.client = p
} else {
return err
}
}

return f(c.client)
}

func (c *proxy) CommitGroup(grp group.Spec, pretend bool) (resp string, err error) {
err = c.run(func(g group.Plugin) error {
resp, err = g.CommitGroup(grp, pretend)
return err
})
return
}

func (c *proxy) FreeGroup(id group.ID) (err error) {
err = c.run(func(g group.Plugin) error {
err = g.FreeGroup(id)
return err
})
return
}

func (c *proxy) DescribeGroup(id group.ID) (desc group.Description, err error) {
err = c.run(func(g group.Plugin) error {
desc, err = g.DescribeGroup(id)
return err
})
return
}

func (c *proxy) DestroyGroup(id group.ID) (err error) {
err = c.run(func(g group.Plugin) error {
err = g.DestroyGroup(id)
return err
})
return
}

func (c *proxy) InspectGroups() (specs []group.Spec, err error) {
err = c.run(func(g group.Plugin) error {
specs, err = g.InspectGroups()
return err
})
return
}
110 changes: 110 additions & 0 deletions pkg/manager/group_proxy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package manager

import (
"errors"
"testing"

"github.com/docker/infrakit/pkg/spi/group"
"github.com/stretchr/testify/require"
)

func TestErrorOnCallsToNilPlugin(t *testing.T) {

errMessage := "no-plugin"
proxy := NewProxy(func() (group.Plugin, error) {
return nil, errors.New(errMessage)
})

err := proxy.FreeGroup(group.ID("test"))
require.Error(t, err)
require.Equal(t, errMessage, err.Error())
}

type fakeGroupPlugin struct {
group.Plugin
commitGroup func(grp group.Spec, pretend bool) (string, error)
freeGroup func(id group.ID) error
}

func (f *fakeGroupPlugin) CommitGroup(grp group.Spec, pretend bool) (string, error) {
return f.commitGroup(grp, pretend)
}
func (f *fakeGroupPlugin) FreeGroup(id group.ID) error {
return f.freeGroup(id)
}

func TestDelayPluginLookupCallingMethod(t *testing.T) {

called := false
fake := &fakeGroupPlugin{
commitGroup: func(grp group.Spec, pretend bool) (string, error) {
called = true
require.Equal(t, group.Spec{ID: "foo"}, grp)
require.Equal(t, true, pretend)
return "some-response", nil
},
}

proxy := NewProxy(func() (group.Plugin, error) { return fake, nil })

require.False(t, called)

actualStr, actualErr := proxy.CommitGroup(group.Spec{ID: "foo"}, true)
require.True(t, called)
require.NoError(t, actualErr)
require.Equal(t, "some-response", actualStr)
}

func TestDelayPluginLookupCallingMethodReturnsError(t *testing.T) {

called := false
fake := &fakeGroupPlugin{
freeGroup: func(id group.ID) error {
called = true
require.Equal(t, group.ID("foo"), id)
return errors.New("can't-free")
},
}

proxy := NewProxy(func() (group.Plugin, error) { return fake, nil })

require.False(t, called)

actualErr := proxy.FreeGroup(group.ID("foo"))
require.True(t, called)
require.Error(t, actualErr)
require.Equal(t, "can't-free", actualErr.Error())
}

func TestDelayPluginLookupCallingMultipleMethods(t *testing.T) {

called := false
fake := &fakeGroupPlugin{
commitGroup: func(grp group.Spec, pretend bool) (string, error) {
called = true
require.Equal(t, group.Spec{ID: "foo"}, grp)
require.Equal(t, true, pretend)
return "some-response", nil
},
freeGroup: func(id group.ID) error {
called = true
require.Equal(t, group.ID("foo"), id)
return errors.New("can't-free")
},
}

proxy := NewProxy(func() (group.Plugin, error) { return fake, nil })

require.False(t, called)

actualStr, actualErr := proxy.CommitGroup(group.Spec{ID: "foo"}, true)
require.True(t, called)
require.NoError(t, actualErr)
require.Equal(t, "some-response", actualStr)

called = false
actualErr = proxy.FreeGroup(group.ID("foo"))
require.True(t, called)
require.Error(t, actualErr)
require.Equal(t, "can't-free", actualErr.Error())
}
6 changes: 1 addition & 5 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"sync"

"errors"
log "github.com/Sirupsen/logrus"
"github.com/docker/infrakit/pkg/discovery"
"github.com/docker/infrakit/pkg/leader"
Expand Down Expand Up @@ -43,7 +42,6 @@ type manager struct {
type backendOp struct {
name string
operation func() error
err chan<- error
}

// NewManager returns the manager which depends on other services to coordinate and manage
Expand Down Expand Up @@ -114,9 +112,7 @@ func (m *manager) Start() (<-chan struct{}, error) {
case op := <-backendOps:
log.Debugln("Backend operation:", op)
if m.isLeader {
op.err <- op.operation()
} else {
op.err <- errors.New("not-a-leader")
op.operation()
}

case <-stopWorkQueue:
Expand Down
8 changes: 6 additions & 2 deletions pkg/store/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,12 @@ func (s *snapshot) Save(obj interface{}) error {
// Load loads a snapshot and marshals into the given reference
func (s *snapshot) Load(output interface{}) error {
buff, err := ioutil.ReadFile(filepath.Join(s.dir, s.name))
if err != nil {
if err == nil {
return json.Unmarshal(buff, output)
}
if os.IsExist(err) {
// if file exists and we have problem reading
return err
}
return json.Unmarshal(buff, output)
return nil
}
3 changes: 2 additions & 1 deletion pkg/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ type Snapshot interface {
// Save marshals (encodes) and saves a snapshot of the given object.
Save(obj interface{}) error

// Load loads a snapshot and marshals (decodes) into the given reference
// Load loads a snapshot and marshals (decodes) into the given reference.
// If no data is available to unmarshal into the given struct, the fuction returns nil.
Load(output interface{}) error
}
11 changes: 8 additions & 3 deletions pkg/store/swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,17 @@ func (s *snapshot) Save(obj interface{}) error {
// Load loads a snapshot and marshals into the given reference
func (s *snapshot) Load(output interface{}) error {
label, err := readSwarm(s.client)
if err != nil {
if err == nil {
return decode(label, output)
}
if err != errNotFound {
return err
}
return decode(label, output)
return nil
}

var errNotFound = fmt.Errorf("not-found")

func readSwarm(client client.APIClient) (string, error) {
info, err := client.SwarmInspect(context.Background())
if err != nil {
Expand All @@ -60,7 +65,7 @@ func readSwarm(client client.APIClient) (string, error) {
return l, nil
}
}
return "", fmt.Errorf("not-found")
return "", errNotFound
}

func writeSwarm(client client.APIClient, value string) error {
Expand Down