diff --git a/cl/sentinel/gossip.go b/cl/sentinel/gossip.go index 0eccede3469..a0d4476fb2f 100644 --- a/cl/sentinel/gossip.go +++ b/cl/sentinel/gossip.go @@ -141,11 +141,8 @@ func (s *GossipManager) AddSubscription(topic string, sub *GossipSubscription) { func (s *GossipManager) unsubscribe(topic string) { s.mu.Lock() defer s.mu.Unlock() - if _, ok := s.subscriptions[topic]; !ok { - return - } - sub := s.subscriptions[topic] - if sub == nil { + sub, ok := s.subscriptions[topic] + if !ok || sub == nil { return } sub.Close() @@ -170,8 +167,18 @@ func (s *Sentinel) forkWatcher() { return } if prevDigest != digest { - subs := s.subManager.subscriptions - for path, sub := range subs { + copy := func() map[string]*GossipSubscription { + s.subManager.mu.Lock() + defer s.subManager.mu.Unlock() + // copy the map + copy := make(map[string]*GossipSubscription) + for k, v := range s.subManager.subscriptions { + copy[k] = v + } + return copy + }() + // unsubscribe and resubscribe to all topics + for path, sub := range copy { s.subManager.unsubscribe(path) newSub, err := s.SubscribeGossip(sub.gossip_topic) if err != nil { @@ -292,8 +299,9 @@ type GossipSubscription struct { cf context.CancelFunc rf pubsub.RelayCancelFunc - setup sync.Once - stopCh chan struct{} + setup sync.Once + stopCh chan struct{} + closeOnce sync.Once } func (sub *GossipSubscription) Listen() (err error) { @@ -313,21 +321,23 @@ func (sub *GossipSubscription) Listen() (err error) { // calls the cancel func for the subscriber and closes the topic and sub func (s *GossipSubscription) Close() { - s.stopCh <- struct{}{} - if s.cf != nil { - s.cf() - } - if s.rf != nil { - s.rf() - } - if s.sub != nil { - s.sub.Cancel() - s.sub = nil - } - if s.topic != nil { - s.topic.Close() - s.topic = nil - } + s.closeOnce.Do(func() { + close(s.stopCh) + if s.cf != nil { + s.cf() + } + if s.rf != nil { + s.rf() + } + if s.sub != nil { + s.sub.Cancel() + s.sub = nil + } + if s.topic != nil { + s.topic.Close() + s.topic = nil + } + }) } type GossipMessage struct {