Skip to content

Commit

Permalink
fix: close connections that are still blocked while recycling them to…
Browse files Browse the repository at this point in the history
… pools (#667)

Signed-off-by: Rueian <[email protected]>
  • Loading branch information
rueian authored Nov 11, 2024
1 parent b9b810e commit adffb5f
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 1 deletion.
4 changes: 3 additions & 1 deletion pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,9 @@ func (p *pipe) Receive(ctx context.Context, subscribe Completed, fn func(message
}

func (p *pipe) CleanSubscriptions() {
if atomic.LoadInt32(&p.state) == 1 {
if atomic.LoadInt32(&p.blcksig) != 0 {
p.Close()
} else if atomic.LoadInt32(&p.state) == 1 {
if p.version >= 7 {
p.DoMulti(context.Background(), cmds.UnsubscribeCmd, cmds.PUnsubscribeCmd, cmds.SUnsubscribeCmd, cmds.DiscardCmd)
} else {
Expand Down
17 changes: 17 additions & 0 deletions pipe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4285,6 +4285,23 @@ func TestPipe_CleanSubscriptions_6(t *testing.T) {
)
}

func TestPipe_CleanSubscriptions_Blocking(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())
p, mock, cancel, _ := setup(t, ClientOption{ConnWriteTimeout: time.Second / 2, Dialer: net.Dialer{KeepAlive: time.Second / 3}})
defer cancel()
p.background()
ctx, cancel := context.WithCancel(context.Background())
go func() {
mock.Expect("BLPOP")
cancel()
}()
p.Do(ctx, cmds.NewBlockingCompleted([]string{"BLPOP"}))
p.CleanSubscriptions()
if p.Error() != ErrClosing {
t.Fatal("unexpected error")
}
}

func TestPipe_CleanSubscriptions_7(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())
p, mock, cancel, _ := setup(t, ClientOption{ConnWriteTimeout: time.Second / 2, Dialer: net.Dialer{KeepAlive: time.Second / 3}})
Expand Down

0 comments on commit adffb5f

Please sign in to comment.