Skip to content

Commit

Permalink
potential data race in sentinel (#9735)
Browse files Browse the repository at this point in the history
  • Loading branch information
domiwei authored Mar 18, 2024
1 parent 5bce91f commit a15603a
Showing 1 changed file with 34 additions and 24 deletions.
58 changes: 34 additions & 24 deletions cl/sentinel/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,8 @@ func (s *GossipManager) AddSubscription(topic string, sub *GossipSubscription) {
func (s *GossipManager) unsubscribe(topic string) {
s.mu.Lock()
defer s.mu.Unlock()
if _, ok := s.subscriptions[topic]; !ok {
return
}
sub := s.subscriptions[topic]
if sub == nil {
sub, ok := s.subscriptions[topic]
if !ok || sub == nil {
return
}
sub.Close()
Expand All @@ -170,8 +167,18 @@ func (s *Sentinel) forkWatcher() {
return
}
if prevDigest != digest {
subs := s.subManager.subscriptions
for path, sub := range subs {
copy := func() map[string]*GossipSubscription {
s.subManager.mu.Lock()
defer s.subManager.mu.Unlock()
// copy the map
copy := make(map[string]*GossipSubscription)
for k, v := range s.subManager.subscriptions {
copy[k] = v
}
return copy
}()
// unsubscribe and resubscribe to all topics
for path, sub := range copy {
s.subManager.unsubscribe(path)
newSub, err := s.SubscribeGossip(sub.gossip_topic)
if err != nil {
Expand Down Expand Up @@ -292,8 +299,9 @@ type GossipSubscription struct {
cf context.CancelFunc
rf pubsub.RelayCancelFunc

setup sync.Once
stopCh chan struct{}
setup sync.Once
stopCh chan struct{}
closeOnce sync.Once
}

func (sub *GossipSubscription) Listen() (err error) {
Expand All @@ -313,21 +321,23 @@ func (sub *GossipSubscription) Listen() (err error) {

// calls the cancel func for the subscriber and closes the topic and sub
func (s *GossipSubscription) Close() {
s.stopCh <- struct{}{}
if s.cf != nil {
s.cf()
}
if s.rf != nil {
s.rf()
}
if s.sub != nil {
s.sub.Cancel()
s.sub = nil
}
if s.topic != nil {
s.topic.Close()
s.topic = nil
}
s.closeOnce.Do(func() {
close(s.stopCh)
if s.cf != nil {
s.cf()
}
if s.rf != nil {
s.rf()
}
if s.sub != nil {
s.sub.Cancel()
s.sub = nil
}
if s.topic != nil {
s.topic.Close()
s.topic = nil
}
})
}

type GossipMessage struct {
Expand Down

0 comments on commit a15603a

Please sign in to comment.