Skip to content

Commit

Permalink
Merge branch 'redis:main' into issues/644
Browse files Browse the repository at this point in the history
  • Loading branch information
mingdaoy authored Oct 25, 2024
2 parents 76fe792 + 1928749 commit 80a3725
Show file tree
Hide file tree
Showing 12 changed files with 56 additions and 44 deletions.
9 changes: 0 additions & 9 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,15 +319,6 @@ func isRetryable(err error, w wire, ctx context.Context) bool {
return err != nil && w.Error() == nil && ctx.Err() == nil
}

func anyRetryable(resp []RedisResult, w wire, ctx context.Context) bool {
for _, r := range resp {
if isRetryable(r.NonRedisError(), w, ctx) {
return true
}
}
return false
}

func allReadOnly(multi []Completed) bool {
for _, cmd := range multi {
if cmd.IsWrite() {
Expand Down
2 changes: 1 addition & 1 deletion mock/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.21
replace github.com/redis/rueidis => ../

require (
github.com/redis/rueidis v1.0.47
github.com/redis/rueidis v1.0.48
go.uber.org/mock v0.4.0
)

Expand Down
2 changes: 1 addition & 1 deletion om/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ replace github.com/redis/rueidis => ../

require (
github.com/oklog/ulid/v2 v2.1.0
github.com/redis/rueidis v1.0.47
github.com/redis/rueidis v1.0.48
)

require golang.org/x/sys v0.24.0 // indirect
9 changes: 7 additions & 2 deletions pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
)

const LibName = "rueidis"
const LibVer = "1.0.47"
const LibVer = "1.0.48"

var noHello = regexp.MustCompile("unknown command .?(HELLO|hello).?")

Expand Down Expand Up @@ -73,6 +73,7 @@ type pipe struct {
waits int32
recvs int32
r2ps bool // identify this pipe is used for resp2 pubsub or not
noNoDelay bool
}

type pipeFn func(connFn func() (net.Conn, error), option *ClientOption) (p *pipe, err error)
Expand All @@ -98,6 +99,7 @@ func _newPipe(connFn func() (net.Conn, error), option *ClientOption, r2ps, nobg
timeout: option.ConnWriteTimeout,
pinggap: option.Dialer.KeepAlive,
maxFlushDelay: option.MaxFlushDelay,
noNoDelay: option.DisableTCPNoDelay,

r2ps: r2ps,
}
Expand All @@ -113,7 +115,7 @@ func _newPipe(connFn func() (net.Conn, error), option *ClientOption, r2ps, nobg
return _newPipe(connFn, option, true, nobg)
}
}
if !option.DisableCache {
if !nobg && !option.DisableCache {
cacheStoreFn := option.NewCacheStoreFn
if cacheStoreFn == nil {
cacheStoreFn = newLRU
Expand Down Expand Up @@ -321,6 +323,9 @@ func (p *pipe) _exit(err error) {

func (p *pipe) _background() {
p.conn.SetDeadline(time.Time{})
if conn, ok := p.conn.(*net.TCPConn); ok && p.noNoDelay {
conn.SetNoDelay(false)
}
go func() {
p._exit(p._backgroundWrite())
close(p.close)
Expand Down
6 changes: 4 additions & 2 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ func newPool(cap int, dead wire, makeFn func() wire) *pool {

return &pool{
size: 0,
cap: cap,
dead: dead,
make: makeFn,
list: make([]wire, 0, cap),
list: make([]wire, 0, 4),
cond: sync.NewCond(&sync.Mutex{}),
}
}
Expand All @@ -22,12 +23,13 @@ type pool struct {
make func() wire
list []wire
size int
cap int
down bool
}

func (p *pool) Acquire() (v wire) {
p.cond.L.Lock()
for len(p.list) == 0 && p.size == cap(p.list) && !p.down {
for len(p.list) == 0 && p.size == p.cap && !p.down {
p.cond.Wait()
}
if p.down {
Expand Down
40 changes: 18 additions & 22 deletions retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package rueidis

import (
"context"
"runtime"
"time"

"github.com/redis/rueidis/internal/util"
Expand Down Expand Up @@ -57,36 +58,31 @@ func (r *retryer) RetryDelay(attempts int, cmd Completed, err error) time.Durati
}

func (r *retryer) WaitForRetry(ctx context.Context, duration time.Duration) {
if duration <= 0 {
return
}

if ch := ctx.Done(); ch != nil {
tm := time.NewTimer(duration)
defer tm.Stop()
select {
case <-ch:
case <-tm.C:
if duration > 0 {
if ch := ctx.Done(); ch != nil {
tm := time.NewTimer(duration)
defer tm.Stop()
select {
case <-ch:
case <-tm.C:
}
} else {
time.Sleep(duration)
}
} else {
time.Sleep(duration)
}
}

func (r *retryer) WaitOrSkipRetry(
ctx context.Context, attempts int, cmd Completed, err error,
) bool {
delay := r.RetryDelay(attempts, cmd, err)
if delay < 0 {
return false
}
if delay == 0 {
return true
}

if dl, ok := ctx.Deadline(); !ok || time.Until(dl) > delay {
r.WaitForRetry(ctx, delay)
if delay := r.RetryDelay(attempts, cmd, err); delay == 0 {
runtime.Gosched()
return true
} else if delay > 0 {
if dl, ok := ctx.Deadline(); !ok || time.Until(dl) > delay {
r.WaitForRetry(ctx, delay)
return true
}
}
return false
}
13 changes: 13 additions & 0 deletions retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,19 @@ func TestRetrier_WaitOrSkipRetry(t *testing.T) {
}
})

t.Run("RetryDelayFn returns 0 delay", func(t *testing.T) {
r := &retryer{
RetryDelayFn: func(attempts int, _ Completed, err error) time.Duration {
return 0
},
}

shouldRetry := r.WaitOrSkipRetry(nil, 0, Completed{}, nil)
if !shouldRetry {
t.Error("WaitOrSkipRetry() = false; want true")
}
})

t.Run("context is canceled", func(t *testing.T) {
r := &retryer{
RetryDelayFn: func(attempts int, _ Completed, err error) time.Duration {
Expand Down
7 changes: 6 additions & 1 deletion rueidis.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const (
// DefaultRingScale is the default value of ClientOption.RingScaleEachConn, which results into having a ring of size 2^10 for each connection
DefaultRingScale = 10
// DefaultPoolSize is the default value of ClientOption.BlockingPoolSize
DefaultPoolSize = 1000
DefaultPoolSize = 1024
// DefaultBlockingPipeline is the default value of ClientOption.BlockingPipeline
DefaultBlockingPipeline = 2000
// DefaultDialTimeout is the default value of ClientOption.Dialer.Timeout
Expand Down Expand Up @@ -161,6 +161,11 @@ type ClientOption struct {
// produce notable CPU usage reduction under load. Ref: https://github.com/redis/rueidis/issues/156
MaxFlushDelay time.Duration

// DisableTCPNoDelay turns on Nagle's algorithm in pipelining mode by using conn.SetNoDelay(false).
// Turning this on can result in lower p99 latencies and lower CPU usages if all your requests are small.
// But if you have large requests or fast network, this might degrade the performance. Ref: https://github.com/redis/rueidis/pull/650
DisableTCPNoDelay bool

// ShuffleInit is a handy flag that shuffles the InitAddress after passing to the NewClient() if it is true
ShuffleInit bool
// ClientNoTouch controls whether commands alter LRU/LFU stats
Expand Down
4 changes: 2 additions & 2 deletions rueidiscompat/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ replace github.com/redis/rueidis/mock => ../mock
require (
github.com/onsi/ginkgo/v2 v2.20.1
github.com/onsi/gomega v1.34.1
github.com/redis/rueidis v1.0.47
github.com/redis/rueidis/mock v1.0.47
github.com/redis/rueidis v1.0.48
github.com/redis/rueidis/mock v1.0.48
go.uber.org/mock v0.4.0
)

Expand Down
4 changes: 2 additions & 2 deletions rueidishook/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ replace (
)

require (
github.com/redis/rueidis v1.0.47
github.com/redis/rueidis/mock v1.0.47
github.com/redis/rueidis v1.0.48
github.com/redis/rueidis/mock v1.0.48
go.uber.org/mock v0.4.0
)

Expand Down
2 changes: 1 addition & 1 deletion rueidisotel/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.21
replace github.com/redis/rueidis => ../

require (
github.com/redis/rueidis v1.0.47
github.com/redis/rueidis v1.0.48
go.opentelemetry.io/otel v1.28.0
go.opentelemetry.io/otel/metric v1.28.0
go.opentelemetry.io/otel/sdk v1.28.0
Expand Down
2 changes: 1 addition & 1 deletion rueidisprob/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.21
replace github.com/redis/rueidis => ../

require (
github.com/redis/rueidis v1.0.47
github.com/redis/rueidis v1.0.48
github.com/twmb/murmur3 v1.1.8
)

Expand Down

0 comments on commit 80a3725

Please sign in to comment.