From 73544b308ef686609c84bc5a4fb4eb67941bfc9e Mon Sep 17 00:00:00 2001 From: Rueian Date: Thu, 17 Oct 2024 21:04:54 -0700 Subject: [PATCH 1/6] perf: use less memory Signed-off-by: Rueian --- pipe.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipe.go b/pipe.go index eee7c6f3..01068fda 100644 --- a/pipe.go +++ b/pipe.go @@ -113,7 +113,7 @@ func _newPipe(connFn func() (net.Conn, error), option *ClientOption, r2ps, nobg return _newPipe(connFn, option, true, nobg) } } - if !option.DisableCache { + if !nobg && !option.DisableCache { cacheStoreFn := option.NewCacheStoreFn if cacheStoreFn == nil { cacheStoreFn = newLRU From 28fba8ca2acaf97a08392594501c68c732fa4930 Mon Sep 17 00:00:00 2001 From: Rueian Date: Sun, 20 Oct 2024 12:08:05 -0700 Subject: [PATCH 2/6] feat: add DisableTCPNoDelay (#650) * feat: add DisableTCPNoDelay to turn on Nagle's algorithm Signed-off-by: Rueian * perf: turn off TCP NoDelay on pipeline mode Signed-off-by: Rueian --------- Signed-off-by: Rueian --- pipe.go | 5 +++++ rueidis.go | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/pipe.go b/pipe.go index 01068fda..e54d94d6 100644 --- a/pipe.go +++ b/pipe.go @@ -73,6 +73,7 @@ type pipe struct { waits int32 recvs int32 r2ps bool // identify this pipe is used for resp2 pubsub or not + noNoDelay bool } type pipeFn func(connFn func() (net.Conn, error), option *ClientOption) (p *pipe, err error) @@ -98,6 +99,7 @@ func _newPipe(connFn func() (net.Conn, error), option *ClientOption, r2ps, nobg timeout: option.ConnWriteTimeout, pinggap: option.Dialer.KeepAlive, maxFlushDelay: option.MaxFlushDelay, + noNoDelay: option.DisableTCPNoDelay, r2ps: r2ps, } @@ -321,6 +323,9 @@ func (p *pipe) _exit(err error) { func (p *pipe) _background() { p.conn.SetDeadline(time.Time{}) + if conn, ok := p.conn.(*net.TCPConn); ok && p.noNoDelay { + conn.SetNoDelay(false) + } go func() { p._exit(p._backgroundWrite()) close(p.close) diff --git a/rueidis.go b/rueidis.go index 0fb8f62c..4e17104b 100644 --- a/rueidis.go +++ b/rueidis.go @@ -161,6 +161,11 @@ type ClientOption struct { // produce notable CPU usage reduction under load. Ref: https://github.com/redis/rueidis/issues/156 MaxFlushDelay time.Duration + // DisableTCPNoDelay turns on Nagle's algorithm in pipelining mode by using conn.SetNoDelay(false). + // Turning this on can result in lower p99 latencies and lower CPU usages if all your requests are small. + // But if you have large requests or fast network, this might degrade the performance. Ref: https://github.com/redis/rueidis/pull/650 + DisableTCPNoDelay bool + // ShuffleInit is a handy flag that shuffles the InitAddress after passing to the NewClient() if it is true ShuffleInit bool // ClientNoTouch controls whether commands alter LRU/LFU stats From a910fe89f277c74e2e7e73982a739b23a674dade Mon Sep 17 00:00:00 2001 From: Rueian Date: Sun, 20 Oct 2024 14:39:41 -0700 Subject: [PATCH 3/6] perf: improve memory efficiency of the connection pool Signed-off-by: Rueian --- pool.go | 6 ++++-- rueidis.go | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/pool.go b/pool.go index d45ede84..f0a69c63 100644 --- a/pool.go +++ b/pool.go @@ -9,9 +9,10 @@ func newPool(cap int, dead wire, makeFn func() wire) *pool { return &pool{ size: 0, + cap: cap, dead: dead, make: makeFn, - list: make([]wire, 0, cap), + list: make([]wire, 0, 4), cond: sync.NewCond(&sync.Mutex{}), } } @@ -22,12 +23,13 @@ type pool struct { make func() wire list []wire size int + cap int down bool } func (p *pool) Acquire() (v wire) { p.cond.L.Lock() - for len(p.list) == 0 && p.size == cap(p.list) && !p.down { + for len(p.list) == 0 && p.size == p.cap && !p.down { p.cond.Wait() } if p.down { diff --git a/rueidis.go b/rueidis.go index 4e17104b..d0919a1d 100644 --- a/rueidis.go +++ b/rueidis.go @@ -22,7 +22,7 @@ const ( // DefaultRingScale is the default value of ClientOption.RingScaleEachConn, which results into having a ring of size 2^10 for each connection DefaultRingScale = 10 // DefaultPoolSize is the default value of ClientOption.BlockingPoolSize - DefaultPoolSize = 1000 + DefaultPoolSize = 1024 // DefaultBlockingPipeline is the default value of ClientOption.BlockingPipeline DefaultBlockingPipeline = 2000 // DefaultDialTimeout is the default value of ClientOption.Dialer.Timeout From bbcfadd81c150c281db2c6ae62c81d124f84e748 Mon Sep 17 00:00:00 2001 From: Rueian Date: Mon, 21 Oct 2024 12:19:43 -0700 Subject: [PATCH 4/6] feat: bump v1.0.48 Signed-off-by: Rueian --- mock/go.mod | 2 +- om/go.mod | 2 +- pipe.go | 2 +- rueidiscompat/go.mod | 4 ++-- rueidishook/go.mod | 4 ++-- rueidisotel/go.mod | 2 +- rueidisprob/go.mod | 2 +- 7 files changed, 9 insertions(+), 9 deletions(-) diff --git a/mock/go.mod b/mock/go.mod index c0bfbf93..ecf194f4 100644 --- a/mock/go.mod +++ b/mock/go.mod @@ -5,7 +5,7 @@ go 1.21 replace github.com/redis/rueidis => ../ require ( - github.com/redis/rueidis v1.0.47 + github.com/redis/rueidis v1.0.48 go.uber.org/mock v0.4.0 ) diff --git a/om/go.mod b/om/go.mod index 44355263..f962427c 100644 --- a/om/go.mod +++ b/om/go.mod @@ -6,7 +6,7 @@ replace github.com/redis/rueidis => ../ require ( github.com/oklog/ulid/v2 v2.1.0 - github.com/redis/rueidis v1.0.47 + github.com/redis/rueidis v1.0.48 ) require golang.org/x/sys v0.24.0 // indirect diff --git a/pipe.go b/pipe.go index e54d94d6..c24802b4 100644 --- a/pipe.go +++ b/pipe.go @@ -20,7 +20,7 @@ import ( ) const LibName = "rueidis" -const LibVer = "1.0.47" +const LibVer = "1.0.48" var noHello = regexp.MustCompile("unknown command .?(HELLO|hello).?") diff --git a/rueidiscompat/go.mod b/rueidiscompat/go.mod index 09665d06..d23bc84c 100644 --- a/rueidiscompat/go.mod +++ b/rueidiscompat/go.mod @@ -9,8 +9,8 @@ replace github.com/redis/rueidis/mock => ../mock require ( github.com/onsi/ginkgo/v2 v2.20.1 github.com/onsi/gomega v1.34.1 - github.com/redis/rueidis v1.0.47 - github.com/redis/rueidis/mock v1.0.47 + github.com/redis/rueidis v1.0.48 + github.com/redis/rueidis/mock v1.0.48 go.uber.org/mock v0.4.0 ) diff --git a/rueidishook/go.mod b/rueidishook/go.mod index 8259fc17..95aa4ddd 100644 --- a/rueidishook/go.mod +++ b/rueidishook/go.mod @@ -8,8 +8,8 @@ replace ( ) require ( - github.com/redis/rueidis v1.0.47 - github.com/redis/rueidis/mock v1.0.47 + github.com/redis/rueidis v1.0.48 + github.com/redis/rueidis/mock v1.0.48 go.uber.org/mock v0.4.0 ) diff --git a/rueidisotel/go.mod b/rueidisotel/go.mod index 59ae91da..72bb6dba 100644 --- a/rueidisotel/go.mod +++ b/rueidisotel/go.mod @@ -5,7 +5,7 @@ go 1.21 replace github.com/redis/rueidis => ../ require ( - github.com/redis/rueidis v1.0.47 + github.com/redis/rueidis v1.0.48 go.opentelemetry.io/otel v1.28.0 go.opentelemetry.io/otel/metric v1.28.0 go.opentelemetry.io/otel/sdk v1.28.0 diff --git a/rueidisprob/go.mod b/rueidisprob/go.mod index 4552ef94..e95435a9 100644 --- a/rueidisprob/go.mod +++ b/rueidisprob/go.mod @@ -5,7 +5,7 @@ go 1.21 replace github.com/redis/rueidis => ../ require ( - github.com/redis/rueidis v1.0.47 + github.com/redis/rueidis v1.0.48 github.com/twmb/murmur3 v1.1.8 ) From 2b51b4260b05d9631a60a594107620a7e1e68e8f Mon Sep 17 00:00:00 2001 From: Rueian Date: Thu, 24 Oct 2024 09:59:42 -0700 Subject: [PATCH 5/6] chore: remove dead code Signed-off-by: Rueian --- client.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/client.go b/client.go index 5cfb2f44..0f4c9b5d 100644 --- a/client.go +++ b/client.go @@ -319,15 +319,6 @@ func isRetryable(err error, w wire, ctx context.Context) bool { return err != nil && w.Error() == nil && ctx.Err() == nil } -func anyRetryable(resp []RedisResult, w wire, ctx context.Context) bool { - for _, r := range resp { - if isRetryable(r.NonRedisError(), w, ctx) { - return true - } - } - return false -} - func allReadOnly(multi []Completed) bool { for _, cmd := range multi { if cmd.IsWrite() { From 192874902372ee252f18780ec2b1c8ca4ee36c35 Mon Sep 17 00:00:00 2001 From: Rueian Date: Thu, 24 Oct 2024 10:01:07 -0700 Subject: [PATCH 6/6] perf: runtime.Gosched() if RetryDelay returns 0 Signed-off-by: Rueian --- retry.go | 40 ++++++++++++++++++---------------------- retry_test.go | 13 +++++++++++++ 2 files changed, 31 insertions(+), 22 deletions(-) diff --git a/retry.go b/retry.go index 1f50bab2..5b116650 100644 --- a/retry.go +++ b/retry.go @@ -2,6 +2,7 @@ package rueidis import ( "context" + "runtime" "time" "github.com/redis/rueidis/internal/util" @@ -57,36 +58,31 @@ func (r *retryer) RetryDelay(attempts int, cmd Completed, err error) time.Durati } func (r *retryer) WaitForRetry(ctx context.Context, duration time.Duration) { - if duration <= 0 { - return - } - - if ch := ctx.Done(); ch != nil { - tm := time.NewTimer(duration) - defer tm.Stop() - select { - case <-ch: - case <-tm.C: + if duration > 0 { + if ch := ctx.Done(); ch != nil { + tm := time.NewTimer(duration) + defer tm.Stop() + select { + case <-ch: + case <-tm.C: + } + } else { + time.Sleep(duration) } - } else { - time.Sleep(duration) } } func (r *retryer) WaitOrSkipRetry( ctx context.Context, attempts int, cmd Completed, err error, ) bool { - delay := r.RetryDelay(attempts, cmd, err) - if delay < 0 { - return false - } - if delay == 0 { - return true - } - - if dl, ok := ctx.Deadline(); !ok || time.Until(dl) > delay { - r.WaitForRetry(ctx, delay) + if delay := r.RetryDelay(attempts, cmd, err); delay == 0 { + runtime.Gosched() return true + } else if delay > 0 { + if dl, ok := ctx.Deadline(); !ok || time.Until(dl) > delay { + r.WaitForRetry(ctx, delay) + return true + } } return false } diff --git a/retry_test.go b/retry_test.go index 765eb4fc..111e743f 100644 --- a/retry_test.go +++ b/retry_test.go @@ -124,6 +124,19 @@ func TestRetrier_WaitOrSkipRetry(t *testing.T) { } }) + t.Run("RetryDelayFn returns 0 delay", func(t *testing.T) { + r := &retryer{ + RetryDelayFn: func(attempts int, _ Completed, err error) time.Duration { + return 0 + }, + } + + shouldRetry := r.WaitOrSkipRetry(nil, 0, Completed{}, nil) + if !shouldRetry { + t.Error("WaitOrSkipRetry() = false; want true") + } + }) + t.Run("context is canceled", func(t *testing.T) { r := &retryer{ RetryDelayFn: func(attempts int, _ Completed, err error) time.Duration {