From 168fe7f76bcfcef04f7d418e57d29e32c22c8937 Mon Sep 17 00:00:00 2001 From: FZambia Date: Thu, 25 Jul 2024 18:00:03 +0300 Subject: [PATCH 1/3] avoid panic on use after close, return error instead --- client.go | 23 +++++++++++++++++------ client_test.go | 24 +++++++++++++----------- cluster.go | 6 ++++-- cluster_test.go | 24 +++++++++++++----------- rueidis.go | 1 - 5 files changed, 47 insertions(+), 31 deletions(-) diff --git a/client.go b/client.go index 5eee918c..1eea7733 100644 --- a/client.go +++ b/client.go @@ -177,7 +177,9 @@ func (c *dedicatedSingleClient) B() Builder { func (c *dedicatedSingleClient) Do(ctx context.Context, cmd Completed) (resp RedisResult) { retry: - c.check() + if err := c.check(); err != nil { + return newErrResult(err) + } resp = c.wire.Do(ctx, cmd) if c.retry && cmd.IsReadOnly() && isRetryable(resp.NonRedisError(), c.wire, ctx) { goto retry @@ -197,7 +199,9 @@ func (c *dedicatedSingleClient) DoMulti(ctx context.Context, multi ...Completed) retryable = allReadOnly(multi) } retry: - c.check() + if err := c.check(); err != nil { + return fillErrs(len(multi), err) + } resp = c.wire.DoMulti(ctx, multi...).s if retryable && anyRetryable(resp, c.wire, ctx) { goto retry @@ -212,7 +216,9 @@ retry: func (c *dedicatedSingleClient) Receive(ctx context.Context, subscribe Completed, fn func(msg PubSubMessage)) (err error) { retry: - c.check() + if err := c.check(); err != nil { + return err + } err = c.wire.Receive(ctx, subscribe, fn) if c.retry { if _, ok := err.(*RedisError); !ok && isRetryable(err, c.wire, ctx) { @@ -226,7 +232,11 @@ retry: } func (c *dedicatedSingleClient) SetPubSubHooks(hooks PubSubHooks) <-chan error { - c.check() + if err := c.check(); err != nil { + ch := make(chan error, 1) + ch <- err + return ch + } return c.wire.SetPubSubHooks(hooks) } @@ -235,10 +245,11 @@ func (c *dedicatedSingleClient) Close() { c.release() } -func (c *dedicatedSingleClient) check() { +func (c *dedicatedSingleClient) check() error { if atomic.LoadUint32(&c.mark) != 0 { - panic(dedicatedClientUsedAfterReleased) + return ErrClosing } + return nil } func (c *dedicatedSingleClient) release() { diff --git a/client_test.go b/client_test.go index 1d7205ad..8b9a4774 100644 --- a/client_test.go +++ b/client_test.go @@ -548,10 +548,10 @@ func TestSingleClient(t *testing.T) { } }) - t.Run("Dedicate panic after released", func(t *testing.T) { + t.Run("Dedicate ErrClosing after released", func(t *testing.T) { m.AcquireFn = func() wire { return &mockWire{} } - check := func() { - if err := recover(); err != dedicatedClientUsedAfterReleased { + check := func(err error) { + if !errors.Is(err, ErrClosing) { t.Fatalf("unexpected err %v", err) } } @@ -567,20 +567,22 @@ func TestSingleClient(t *testing.T) { closeFn(c, cancel) for _, fn := range []func(){ func() { - defer check() - c.Do(context.Background(), c.B().Get().Key("k").Build()) + resp := c.Do(context.Background(), c.B().Get().Key("k").Build()) + check(resp.Error()) }, func() { - defer check() - c.DoMulti(context.Background(), c.B().Get().Key("k").Build()) + resp := c.DoMulti(context.Background(), c.B().Get().Key("k").Build()) + for _, r := range resp { + check(r.Error()) + } }, func() { - defer check() - c.Receive(context.Background(), c.B().Subscribe().Channel("k").Build(), func(msg PubSubMessage) {}) + err := c.Receive(context.Background(), c.B().Subscribe().Channel("k").Build(), func(msg PubSubMessage) {}) + check(err) }, func() { - defer check() - c.SetPubSubHooks(PubSubHooks{}) + ch := c.SetPubSubHooks(PubSubHooks{}) + check(<-ch) }, } { fn() diff --git a/cluster.go b/cluster.go index 679ddebd..26bf9cca 100644 --- a/cluster.go +++ b/cluster.go @@ -1111,7 +1111,7 @@ func (c *dedicatedClusterClient) acquire(ctx context.Context, slot uint16) (wire c.mu.Lock() defer c.mu.Unlock() if c.mark { - panic(dedicatedClientUsedAfterReleased) + return nil, ErrClosing } if c.slot == cmds.NoSlot { c.slot = slot @@ -1241,7 +1241,9 @@ func (c *dedicatedClusterClient) SetPubSubHooks(hooks PubSubHooks) <-chan error c.mu.Lock() defer c.mu.Unlock() if c.mark { - panic(dedicatedClientUsedAfterReleased) + ch := make(chan error, 1) + ch <- ErrClosing + return ch } if p := c.pshks; p != nil { c.pshks = nil diff --git a/cluster_test.go b/cluster_test.go index 3cd4cb07..49f9aea3 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -1520,9 +1520,9 @@ func TestClusterClient(t *testing.T) { } }) - t.Run("Dedicate panic after released", func(t *testing.T) { - check := func() { - if err := recover(); err != dedicatedClientUsedAfterReleased { + t.Run("Dedicate ErrClosing after released", func(t *testing.T) { + check := func(err error) { + if !errors.Is(err, ErrClosing) { t.Fatalf("unexpected err %v", err) } } @@ -1538,20 +1538,22 @@ func TestClusterClient(t *testing.T) { closeFn(c, cancel) for _, fn := range []func(){ func() { - defer check() - c.Do(context.Background(), c.B().Get().Key("k").Build()) + resp := c.Do(context.Background(), c.B().Get().Key("k").Build()) + check(resp.Error()) }, func() { - defer check() - c.DoMulti(context.Background(), c.B().Get().Key("k").Build()) + resp := c.DoMulti(context.Background(), c.B().Get().Key("k").Build()) + for _, r := range resp { + check(r.Error()) + } }, func() { - defer check() - c.Receive(context.Background(), c.B().Subscribe().Channel("k").Build(), func(msg PubSubMessage) {}) + err := c.Receive(context.Background(), c.B().Subscribe().Channel("k").Build(), func(msg PubSubMessage) {}) + check(err) }, func() { - defer check() - c.SetPubSubHooks(PubSubHooks{}) + ch := c.SetPubSubHooks(PubSubHooks{}) + check(<-ch) }, } { fn() diff --git a/rueidis.go b/rueidis.go index 71d13657..baefdd88 100644 --- a/rueidis.go +++ b/rueidis.go @@ -392,4 +392,3 @@ func dial(dst string, opt *ClientOption) (conn net.Conn, err error) { } const redisErrMsgCommandNotAllow = "command is not allowed" -const dedicatedClientUsedAfterReleased = "DedicatedClient should not be used after recycled" From c4e69cd4dbdac0e8bb9e038ce62c25c3b6226f03 Mon Sep 17 00:00:00 2001 From: FZambia Date: Fri, 26 Jul 2024 11:46:52 +0300 Subject: [PATCH 2/3] introduce ErrDedicatedClientRecycled --- client.go | 2 +- client_test.go | 4 ++-- cluster.go | 4 ++-- cluster_test.go | 4 ++-- rueidis.go | 2 ++ 5 files changed, 9 insertions(+), 7 deletions(-) diff --git a/client.go b/client.go index 1eea7733..1341b043 100644 --- a/client.go +++ b/client.go @@ -247,7 +247,7 @@ func (c *dedicatedSingleClient) Close() { func (c *dedicatedSingleClient) check() error { if atomic.LoadUint32(&c.mark) != 0 { - return ErrClosing + return ErrDedicatedClientRecycled } return nil } diff --git a/client_test.go b/client_test.go index 8b9a4774..d97200eb 100644 --- a/client_test.go +++ b/client_test.go @@ -548,10 +548,10 @@ func TestSingleClient(t *testing.T) { } }) - t.Run("Dedicate ErrClosing after released", func(t *testing.T) { + t.Run("Dedicate ErrDedicatedClientRecycled after released", func(t *testing.T) { m.AcquireFn = func() wire { return &mockWire{} } check := func(err error) { - if !errors.Is(err, ErrClosing) { + if !errors.Is(err, ErrDedicatedClientRecycled) { t.Fatalf("unexpected err %v", err) } } diff --git a/cluster.go b/cluster.go index 26bf9cca..e5a8bb2d 100644 --- a/cluster.go +++ b/cluster.go @@ -1111,7 +1111,7 @@ func (c *dedicatedClusterClient) acquire(ctx context.Context, slot uint16) (wire c.mu.Lock() defer c.mu.Unlock() if c.mark { - return nil, ErrClosing + return nil, ErrDedicatedClientRecycled } if c.slot == cmds.NoSlot { c.slot = slot @@ -1242,7 +1242,7 @@ func (c *dedicatedClusterClient) SetPubSubHooks(hooks PubSubHooks) <-chan error defer c.mu.Unlock() if c.mark { ch := make(chan error, 1) - ch <- ErrClosing + ch <- ErrDedicatedClientRecycled return ch } if p := c.pshks; p != nil { diff --git a/cluster_test.go b/cluster_test.go index 49f9aea3..33b54aef 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -1520,9 +1520,9 @@ func TestClusterClient(t *testing.T) { } }) - t.Run("Dedicate ErrClosing after released", func(t *testing.T) { + t.Run("Dedicate ErrDedicatedClientRecycled after released", func(t *testing.T) { check := func(err error) { - if !errors.Is(err, ErrClosing) { + if !errors.Is(err, ErrDedicatedClientRecycled) { t.Fatalf("unexpected err %v", err) } } diff --git a/rueidis.go b/rueidis.go index baefdd88..9ae77ebc 100644 --- a/rueidis.go +++ b/rueidis.go @@ -51,6 +51,8 @@ var ( ErrReplicaOnlyNotSupported = errors.New("ReplicaOnly is not supported for single client") // ErrWrongPipelineMultiplex means wrong value for ClientOption.PipelineMultiplex ErrWrongPipelineMultiplex = errors.New("ClientOption.PipelineMultiplex must not be bigger than MaxPipelineMultiplex") + // ErrDedicatedClientRecycled means the caller attempted to use the client which has been already recycled (after canceled/closed). + ErrDedicatedClientRecycled = errors.New("dedicated client should not be used after recycled") ) // ClientOption should be passed to NewClient to construct a Client From 8fd499529e60419bd513901b5fdff7973f360380 Mon Sep 17 00:00:00 2001 From: FZambia Date: Fri, 26 Jul 2024 12:12:37 +0300 Subject: [PATCH 3/3] better comment for ErrDedicatedClientRecycled --- rueidis.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rueidis.go b/rueidis.go index 9ae77ebc..b066fa7c 100644 --- a/rueidis.go +++ b/rueidis.go @@ -51,7 +51,7 @@ var ( ErrReplicaOnlyNotSupported = errors.New("ReplicaOnly is not supported for single client") // ErrWrongPipelineMultiplex means wrong value for ClientOption.PipelineMultiplex ErrWrongPipelineMultiplex = errors.New("ClientOption.PipelineMultiplex must not be bigger than MaxPipelineMultiplex") - // ErrDedicatedClientRecycled means the caller attempted to use the client which has been already recycled (after canceled/closed). + // ErrDedicatedClientRecycled means the caller attempted to use the dedicated client which has been already recycled (after canceled/closed). ErrDedicatedClientRecycled = errors.New("dedicated client should not be used after recycled") )