diff --git a/client.go b/client.go index 5cfb2f44..0f4c9b5d 100644 --- a/client.go +++ b/client.go @@ -319,15 +319,6 @@ func isRetryable(err error, w wire, ctx context.Context) bool { return err != nil && w.Error() == nil && ctx.Err() == nil } -func anyRetryable(resp []RedisResult, w wire, ctx context.Context) bool { - for _, r := range resp { - if isRetryable(r.NonRedisError(), w, ctx) { - return true - } - } - return false -} - func allReadOnly(multi []Completed) bool { for _, cmd := range multi { if cmd.IsWrite() { diff --git a/mock/go.mod b/mock/go.mod index c0bfbf93..ecf194f4 100644 --- a/mock/go.mod +++ b/mock/go.mod @@ -5,7 +5,7 @@ go 1.21 replace github.com/redis/rueidis => ../ require ( - github.com/redis/rueidis v1.0.47 + github.com/redis/rueidis v1.0.48 go.uber.org/mock v0.4.0 ) diff --git a/om/go.mod b/om/go.mod index 44355263..f962427c 100644 --- a/om/go.mod +++ b/om/go.mod @@ -6,7 +6,7 @@ replace github.com/redis/rueidis => ../ require ( github.com/oklog/ulid/v2 v2.1.0 - github.com/redis/rueidis v1.0.47 + github.com/redis/rueidis v1.0.48 ) require golang.org/x/sys v0.24.0 // indirect diff --git a/pipe.go b/pipe.go index eee7c6f3..c24802b4 100644 --- a/pipe.go +++ b/pipe.go @@ -20,7 +20,7 @@ import ( ) const LibName = "rueidis" -const LibVer = "1.0.47" +const LibVer = "1.0.48" var noHello = regexp.MustCompile("unknown command .?(HELLO|hello).?") @@ -73,6 +73,7 @@ type pipe struct { waits int32 recvs int32 r2ps bool // identify this pipe is used for resp2 pubsub or not + noNoDelay bool } type pipeFn func(connFn func() (net.Conn, error), option *ClientOption) (p *pipe, err error) @@ -98,6 +99,7 @@ func _newPipe(connFn func() (net.Conn, error), option *ClientOption, r2ps, nobg timeout: option.ConnWriteTimeout, pinggap: option.Dialer.KeepAlive, maxFlushDelay: option.MaxFlushDelay, + noNoDelay: option.DisableTCPNoDelay, r2ps: r2ps, } @@ -113,7 +115,7 @@ func _newPipe(connFn func() (net.Conn, error), option *ClientOption, r2ps, nobg return _newPipe(connFn, option, true, nobg) } } - if !option.DisableCache { + if !nobg && !option.DisableCache { cacheStoreFn := option.NewCacheStoreFn if cacheStoreFn == nil { cacheStoreFn = newLRU @@ -321,6 +323,9 @@ func (p *pipe) _exit(err error) { func (p *pipe) _background() { p.conn.SetDeadline(time.Time{}) + if conn, ok := p.conn.(*net.TCPConn); ok && p.noNoDelay { + conn.SetNoDelay(false) + } go func() { p._exit(p._backgroundWrite()) close(p.close) diff --git a/pool.go b/pool.go index d45ede84..f0a69c63 100644 --- a/pool.go +++ b/pool.go @@ -9,9 +9,10 @@ func newPool(cap int, dead wire, makeFn func() wire) *pool { return &pool{ size: 0, + cap: cap, dead: dead, make: makeFn, - list: make([]wire, 0, cap), + list: make([]wire, 0, 4), cond: sync.NewCond(&sync.Mutex{}), } } @@ -22,12 +23,13 @@ type pool struct { make func() wire list []wire size int + cap int down bool } func (p *pool) Acquire() (v wire) { p.cond.L.Lock() - for len(p.list) == 0 && p.size == cap(p.list) && !p.down { + for len(p.list) == 0 && p.size == p.cap && !p.down { p.cond.Wait() } if p.down { diff --git a/retry.go b/retry.go index 1f50bab2..5b116650 100644 --- a/retry.go +++ b/retry.go @@ -2,6 +2,7 @@ package rueidis import ( "context" + "runtime" "time" "github.com/redis/rueidis/internal/util" @@ -57,36 +58,31 @@ func (r *retryer) RetryDelay(attempts int, cmd Completed, err error) time.Durati } func (r *retryer) WaitForRetry(ctx context.Context, duration time.Duration) { - if duration <= 0 { - return - } - - if ch := ctx.Done(); ch != nil { - tm := time.NewTimer(duration) - defer tm.Stop() - select { - case <-ch: - case <-tm.C: + if duration > 0 { + if ch := ctx.Done(); ch != nil { + tm := time.NewTimer(duration) + defer tm.Stop() + select { + case <-ch: + case <-tm.C: + } + } else { + time.Sleep(duration) } - } else { - time.Sleep(duration) } } func (r *retryer) WaitOrSkipRetry( ctx context.Context, attempts int, cmd Completed, err error, ) bool { - delay := r.RetryDelay(attempts, cmd, err) - if delay < 0 { - return false - } - if delay == 0 { - return true - } - - if dl, ok := ctx.Deadline(); !ok || time.Until(dl) > delay { - r.WaitForRetry(ctx, delay) + if delay := r.RetryDelay(attempts, cmd, err); delay == 0 { + runtime.Gosched() return true + } else if delay > 0 { + if dl, ok := ctx.Deadline(); !ok || time.Until(dl) > delay { + r.WaitForRetry(ctx, delay) + return true + } } return false } diff --git a/retry_test.go b/retry_test.go index 765eb4fc..111e743f 100644 --- a/retry_test.go +++ b/retry_test.go @@ -124,6 +124,19 @@ func TestRetrier_WaitOrSkipRetry(t *testing.T) { } }) + t.Run("RetryDelayFn returns 0 delay", func(t *testing.T) { + r := &retryer{ + RetryDelayFn: func(attempts int, _ Completed, err error) time.Duration { + return 0 + }, + } + + shouldRetry := r.WaitOrSkipRetry(nil, 0, Completed{}, nil) + if !shouldRetry { + t.Error("WaitOrSkipRetry() = false; want true") + } + }) + t.Run("context is canceled", func(t *testing.T) { r := &retryer{ RetryDelayFn: func(attempts int, _ Completed, err error) time.Duration { diff --git a/rueidis.go b/rueidis.go index 0fb8f62c..d0919a1d 100644 --- a/rueidis.go +++ b/rueidis.go @@ -22,7 +22,7 @@ const ( // DefaultRingScale is the default value of ClientOption.RingScaleEachConn, which results into having a ring of size 2^10 for each connection DefaultRingScale = 10 // DefaultPoolSize is the default value of ClientOption.BlockingPoolSize - DefaultPoolSize = 1000 + DefaultPoolSize = 1024 // DefaultBlockingPipeline is the default value of ClientOption.BlockingPipeline DefaultBlockingPipeline = 2000 // DefaultDialTimeout is the default value of ClientOption.Dialer.Timeout @@ -161,6 +161,11 @@ type ClientOption struct { // produce notable CPU usage reduction under load. Ref: https://github.com/redis/rueidis/issues/156 MaxFlushDelay time.Duration + // DisableTCPNoDelay turns on Nagle's algorithm in pipelining mode by using conn.SetNoDelay(false). + // Turning this on can result in lower p99 latencies and lower CPU usages if all your requests are small. + // But if you have large requests or fast network, this might degrade the performance. Ref: https://github.com/redis/rueidis/pull/650 + DisableTCPNoDelay bool + // ShuffleInit is a handy flag that shuffles the InitAddress after passing to the NewClient() if it is true ShuffleInit bool // ClientNoTouch controls whether commands alter LRU/LFU stats diff --git a/rueidiscompat/go.mod b/rueidiscompat/go.mod index 09665d06..d23bc84c 100644 --- a/rueidiscompat/go.mod +++ b/rueidiscompat/go.mod @@ -9,8 +9,8 @@ replace github.com/redis/rueidis/mock => ../mock require ( github.com/onsi/ginkgo/v2 v2.20.1 github.com/onsi/gomega v1.34.1 - github.com/redis/rueidis v1.0.47 - github.com/redis/rueidis/mock v1.0.47 + github.com/redis/rueidis v1.0.48 + github.com/redis/rueidis/mock v1.0.48 go.uber.org/mock v0.4.0 ) diff --git a/rueidishook/go.mod b/rueidishook/go.mod index 8259fc17..95aa4ddd 100644 --- a/rueidishook/go.mod +++ b/rueidishook/go.mod @@ -8,8 +8,8 @@ replace ( ) require ( - github.com/redis/rueidis v1.0.47 - github.com/redis/rueidis/mock v1.0.47 + github.com/redis/rueidis v1.0.48 + github.com/redis/rueidis/mock v1.0.48 go.uber.org/mock v0.4.0 ) diff --git a/rueidisotel/go.mod b/rueidisotel/go.mod index 59ae91da..72bb6dba 100644 --- a/rueidisotel/go.mod +++ b/rueidisotel/go.mod @@ -5,7 +5,7 @@ go 1.21 replace github.com/redis/rueidis => ../ require ( - github.com/redis/rueidis v1.0.47 + github.com/redis/rueidis v1.0.48 go.opentelemetry.io/otel v1.28.0 go.opentelemetry.io/otel/metric v1.28.0 go.opentelemetry.io/otel/sdk v1.28.0 diff --git a/rueidisprob/go.mod b/rueidisprob/go.mod index 4552ef94..e95435a9 100644 --- a/rueidisprob/go.mod +++ b/rueidisprob/go.mod @@ -5,7 +5,7 @@ go 1.21 replace github.com/redis/rueidis => ../ require ( - github.com/redis/rueidis v1.0.47 + github.com/redis/rueidis v1.0.48 github.com/twmb/murmur3 v1.1.8 )