Skip to content

Commit f9406f3

Browse files
Make subscriber close safe to concurrently close (#44)
Signed-off-by: GitHub Actions Bot <[email protected]> Co-authored-by: GitHub Actions Bot <[email protected]>
1 parent 7897117 commit f9406f3

File tree

1 file changed

+4
-5
lines changed

1 file changed

+4
-5
lines changed

pkg/sql/subscriber.go

+4-5
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"errors"
77
"fmt"
88
"sync"
9+
"sync/atomic"
910
"time"
1011

1112
"github.com/jackc/pgx/v5"
@@ -115,7 +116,7 @@ type Subscriber struct {
115116

116117
subscribeWg *sync.WaitGroup
117118
closing chan struct{}
118-
closed bool
119+
closed uint32
119120

120121
logger watermill.LoggerAdapter
121122
}
@@ -167,7 +168,7 @@ func newSubscriberID() ([]byte, string, error) {
167168
}
168169

169170
func (s *Subscriber) Subscribe(ctx context.Context, topic string) (o <-chan *message.Message, err error) {
170-
if s.closed {
171+
if atomic.LoadUint32(&s.closed) == 1 {
171172
return nil, ErrSubscriberClosed
172173
}
173174

@@ -486,12 +487,10 @@ ResendLoop:
486487
}
487488

488489
func (s *Subscriber) Close() error {
489-
if s.closed {
490+
if !atomic.CompareAndSwapUint32(&s.closed, 0, 1) {
490491
return nil
491492
}
492493

493-
s.closed = true
494-
495494
close(s.closing)
496495
s.subscribeWg.Wait()
497496

0 commit comments

Comments
 (0)