Skip to content

Commit

Permalink
Merge pull request #1746 from aluzzardi/selective-logbroker
Browse files Browse the repository at this point in the history
broker: Send down subscriptions only to relevant nodes.
  • Loading branch information
aluzzardi authored Nov 17, 2016
2 parents 9a03fb8 + 6423921 commit 91c6e2d
Show file tree
Hide file tree
Showing 4 changed files with 466 additions and 93 deletions.
100 changes: 70 additions & 30 deletions manager/logbroker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/docker/swarmkit/ca"
"github.com/docker/swarmkit/identity"
"github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/manager/state/store"
"github.com/docker/swarmkit/watch"
"golang.org/x/net/context"
)
Expand All @@ -23,7 +24,7 @@ var (
errNotRunning = errors.New("broker is not running")
)

// LogBroker coordinates log subscriptions to services and tasks. Çlients can
// LogBroker coordinates log subscriptions to services and tasks. Clients can
// publish and subscribe to logs channels.
//
// Log subscriptions are pushed to the work nodes by creating log subscsription
Expand All @@ -33,15 +34,19 @@ type LogBroker struct {
logQueue *watch.Queue
subscriptionQueue *watch.Queue

registeredSubscriptions map[string]*api.SubscriptionMessage
registeredSubscriptions map[string]*subscription

pctx context.Context
cancelAll context.CancelFunc

store *store.MemoryStore
}

// New initializes and returns a new LogBroker
func New() *LogBroker {
return &LogBroker{}
func New(store *store.MemoryStore) *LogBroker {
return &LogBroker{
store: store,
}
}

// Run the log broker
Expand All @@ -56,7 +61,7 @@ func (lb *LogBroker) Run(ctx context.Context) error {
lb.pctx, lb.cancelAll = context.WithCancel(ctx)
lb.logQueue = watch.NewQueue()
lb.subscriptionQueue = watch.NewQueue()
lb.registeredSubscriptions = make(map[string]*api.SubscriptionMessage)
lb.registeredSubscriptions = make(map[string]*subscription)
lb.mu.Unlock()

select {
Expand Down Expand Up @@ -94,36 +99,60 @@ func validateSelector(selector *api.LogSelector) error {
return nil
}

func (lb *LogBroker) registerSubscription(subscription *api.SubscriptionMessage) {
func (lb *LogBroker) newSubscription(selector *api.LogSelector, options *api.LogSubscriptionOptions) *subscription {
lb.mu.RLock()
defer lb.mu.RUnlock()

subscription := newSubscription(lb.store, &api.SubscriptionMessage{
ID: identity.NewID(),
Selector: selector,
Options: options,
}, lb.subscriptionQueue)

return subscription
}

func (lb *LogBroker) registerSubscription(subscription *subscription) {
lb.mu.Lock()
defer lb.mu.Unlock()

lb.registeredSubscriptions[subscription.ID] = subscription
lb.registeredSubscriptions[subscription.message.ID] = subscription
lb.subscriptionQueue.Publish(subscription)
}

func (lb *LogBroker) unregisterSubscription(subscription *api.SubscriptionMessage) {
subscription = subscription.Copy()
subscription.Close = true

func (lb *LogBroker) unregisterSubscription(subscription *subscription) {
lb.mu.Lock()
defer lb.mu.Unlock()

delete(lb.registeredSubscriptions, subscription.ID)
delete(lb.registeredSubscriptions, subscription.message.ID)
subscription.message.Close = true
lb.subscriptionQueue.Publish(subscription)
}

func (lb *LogBroker) watchSubscriptions() ([]*api.SubscriptionMessage, chan events.Event, func()) {
// watchSubscriptions grabs all current subscriptions and notifies of any
// subscription change for this node.
//
// Subscriptions may fire multiple times and the caller has to protect against
// dupes.
func (lb *LogBroker) watchSubscriptions(nodeID string) ([]*subscription, chan events.Event, func()) {
lb.mu.RLock()
defer lb.mu.RUnlock()

subs := make([]*api.SubscriptionMessage, 0, len(lb.registeredSubscriptions))
for _, sub := range lb.registeredSubscriptions {
subs = append(subs, sub)
// Watch for subscription changes for this node.
ch, cancel := lb.subscriptionQueue.CallbackWatch(events.MatcherFunc(func(event events.Event) bool {
s := event.(*subscription)
return s.Contains(nodeID)
}))

// Grab current subscriptions.
subscriptions := make([]*subscription, 0, len(lb.registeredSubscriptions))
for _, s := range lb.registeredSubscriptions {
if s.Contains(nodeID) {
subscriptions = append(subscriptions, s)
}
}

ch, cancel := lb.subscriptionQueue.Watch()
return subs, ch, cancel
return subscriptions, ch, cancel
}

func (lb *LogBroker) subscribe(id string) (chan events.Event, func()) {
Expand Down Expand Up @@ -151,22 +180,20 @@ func (lb *LogBroker) SubscribeLogs(request *api.SubscribeLogsRequest, stream api
return err
}

subscription := &api.SubscriptionMessage{
ID: identity.NewID(),
Selector: request.Selector,
Options: request.Options,
}
subscription := lb.newSubscription(request.Selector, request.Options)
subscription.Run(lb.pctx)
defer subscription.Stop()

log := log.G(ctx).WithFields(
logrus.Fields{
"method": "(*LogBroker).SubscribeLogs",
"subscription.id": subscription.ID,
"subscription.id": subscription.message.ID,
},
)

log.Debug("subscribed")

publishCh, publishCancel := lb.subscribe(subscription.ID)
publishCh, publishCancel := lb.subscribe(subscription.message.ID)
defer publishCancel()

lb.registerSubscription(subscription)
Expand Down Expand Up @@ -202,11 +229,13 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest
"node": remote.NodeID,
},
)
subscriptions, subscriptionCh, subscriptionCancel := lb.watchSubscriptions()
subscriptions, subscriptionCh, subscriptionCancel := lb.watchSubscriptions(remote.NodeID)
defer subscriptionCancel()

log.Debug("node registered")

activeSubscriptions := make(map[string]struct{})

// Start by sending down all active subscriptions.
for _, subscription := range subscriptions {
select {
Expand All @@ -217,19 +246,30 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest
default:
}

if err := stream.Send(subscription); err != nil {
if err := stream.Send(subscription.message); err != nil {
log.Error(err)
return err
}
activeSubscriptions[subscription.message.ID] = struct{}{}
}

// Send down new subscriptions.
// TODO(aluzzardi): We should filter by relevant tasks for this node rather
for {
select {
case v := <-subscriptionCh:
subscription := v.(*api.SubscriptionMessage)
if err := stream.Send(subscription); err != nil {
subscription := v.(*subscription)

if subscription.message.Close {
log.WithField("subscription.id", subscription.message.ID).Debug("subscription closed")
} else {
// Avoid sending down the same subscription multiple times
if _, ok := activeSubscriptions[subscription.message.ID]; ok {
continue
}
activeSubscriptions[subscription.message.ID] = struct{}{}
log.WithField("subscription.id", subscription.message.ID).Debug("subscription added")
}
if err := stream.Send(subscription.message); err != nil {
log.Error(err)
return err
}
Expand Down
Loading

0 comments on commit 91c6e2d

Please sign in to comment.