Skip to content

Commit

Permalink
feat: send DISCARD command when conn recycled (#580)
Browse files Browse the repository at this point in the history
* feat: send DISCARD command when conn recycled

* feat: add tests

* fix: add suggestion

Co-authored-by: Rueian <[email protected]>

* fix: DISCARD should reply a OK message instead of a push message.

---------

Co-authored-by: Anuragkillswitch <[email protected]>
Co-authored-by: Rueian <[email protected]>
  • Loading branch information
3 people authored Jun 27, 2024
1 parent a5b0dc5 commit 45cdf7e
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 7 deletions.
5 changes: 5 additions & 0 deletions internal/cmds/cmds.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ var (
cs: newCommandSlice([]string{"UNSUBSCRIBE", "+sentinel", "+slave", "-sdown", "+sdown", "+switch-master", "+reboot"}),
cf: noRetTag,
}

// DiscardCmd is predefined DISCARD
DiscardCmd = Completed{
cs: newCommandSlice([]string{"DISCARD"}),
}
)

// ToBlock marks the command with blockTag
Expand Down
4 changes: 2 additions & 2 deletions pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -751,9 +751,9 @@ func (p *pipe) Receive(ctx context.Context, subscribe Completed, fn func(message
func (p *pipe) CleanSubscriptions() {
if atomic.LoadInt32(&p.state) == 1 {
if p.version >= 7 {
p.DoMulti(context.Background(), cmds.UnsubscribeCmd, cmds.PUnsubscribeCmd, cmds.SUnsubscribeCmd)
p.DoMulti(context.Background(), cmds.UnsubscribeCmd, cmds.PUnsubscribeCmd, cmds.SUnsubscribeCmd, cmds.DiscardCmd)
} else {
p.DoMulti(context.Background(), cmds.UnsubscribeCmd, cmds.PUnsubscribeCmd)
p.DoMulti(context.Background(), cmds.UnsubscribeCmd, cmds.PUnsubscribeCmd, cmds.DiscardCmd)
}
}
}
Expand Down
14 changes: 9 additions & 5 deletions pipe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4119,7 +4119,7 @@ func TestPipe_CleanSubscriptions_6(t *testing.T) {
go func() {
p.CleanSubscriptions()
}()
mock.Expect("UNSUBSCRIBE").Expect("PUNSUBSCRIBE").Reply(
mock.Expect("UNSUBSCRIBE").Expect("PUNSUBSCRIBE").Expect("DISCARD").Reply(
RedisMessage{typ: '>', values: []RedisMessage{
{typ: '+', string: "unsubscribe"},
{typ: '_'},
Expand All @@ -4129,7 +4129,9 @@ func TestPipe_CleanSubscriptions_6(t *testing.T) {
{typ: '+', string: "punsubscribe"},
{typ: '_'},
{typ: ':', integer: 2},
}})
}},
RedisMessage{typ: '+', string: "OK"},
)
}

func TestPipe_CleanSubscriptions_7(t *testing.T) {
Expand All @@ -4141,7 +4143,7 @@ func TestPipe_CleanSubscriptions_7(t *testing.T) {
go func() {
p.CleanSubscriptions()
}()
mock.Expect("UNSUBSCRIBE").Expect("PUNSUBSCRIBE").Expect("SUNSUBSCRIBE").Reply(
mock.Expect("UNSUBSCRIBE").Expect("PUNSUBSCRIBE").Expect("SUNSUBSCRIBE").Expect("DISCARD").Reply(
RedisMessage{typ: '>', values: []RedisMessage{
{typ: '+', string: "unsubscribe"},
{typ: '_'},
Expand All @@ -4155,8 +4157,10 @@ func TestPipe_CleanSubscriptions_7(t *testing.T) {
RedisMessage{typ: '>', values: []RedisMessage{
{typ: '+', string: "sunsubscribe"},
{typ: '_'},
{typ: ':', integer: 2},
}})
{typ: ':', integer: 3},
}},
RedisMessage{typ: '+', string: "OK"},
)
}

func TestPingOnConnError(t *testing.T) {
Expand Down

0 comments on commit 45cdf7e

Please sign in to comment.