Skip to content

Commit

Permalink
feat: add DisableAutoPipelining to serve requests from the connection…
Browse files Browse the repository at this point in the history
… pool

Signed-off-by: Rueian <[email protected]>
  • Loading branch information
rueian committed Oct 11, 2024
1 parent fc348f5 commit 4cf6b70
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 8 deletions.
16 changes: 10 additions & 6 deletions mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ type mux struct {
mu []sync.Mutex
maxp int
maxm int

usePool bool
}

func makeMux(dst string, option *ClientOption, dialFn dialFn) *mux {
Expand Down Expand Up @@ -90,6 +92,8 @@ func newMux(dst string, option *ClientOption, init, dead wire, wireFn wireFn, wi
sc: make([]*singleconnect, multiplex),
maxp: runtime.GOMAXPROCS(0),
maxm: option.BlockingPipeline,

usePool: option.DisableAutoPipelining,
}
m.clhks.Store(emptyclhks)
for i := 0; i < len(m.wire); i++ {
Expand Down Expand Up @@ -200,7 +204,7 @@ func (m *mux) DoMultiStream(ctx context.Context, multi ...Completed) MultiRedisR
}

func (m *mux) Do(ctx context.Context, cmd Completed) (resp RedisResult) {
if cmd.IsBlock() {
if m.usePool || cmd.IsBlock() {
resp = m.blocking(ctx, cmd)
} else {
resp = m.pipeline(ctx, cmd)
Expand All @@ -209,7 +213,7 @@ func (m *mux) Do(ctx context.Context, cmd Completed) (resp RedisResult) {
}

func (m *mux) DoMulti(ctx context.Context, multi ...Completed) (resp *redisresults) {
if len(multi) >= m.maxm && m.maxm > 0 {
if m.usePool || (len(multi) >= m.maxm && m.maxm > 0) {
goto block // use a dedicated connection if the pipeline is too large
}
for _, cmd := range multi {
Expand All @@ -224,25 +228,25 @@ block:
}

func (m *mux) blocking(ctx context.Context, cmd Completed) (resp RedisResult) {
wire := m.dpool.Acquire()
wire := m.spool.Acquire()
resp = wire.Do(ctx, cmd)
if resp.NonRedisError() != nil { // abort the wire if blocking command return early (ex. context.DeadlineExceeded)
wire.Close()
}
m.dpool.Store(wire)
m.spool.Store(wire)
return resp
}

func (m *mux) blockingMulti(ctx context.Context, cmd []Completed) (resp *redisresults) {
wire := m.dpool.Acquire()
wire := m.spool.Acquire()
resp = wire.DoMulti(ctx, cmd...)
for _, res := range resp.s {
if res.NonRedisError() != nil { // abort the wire if blocking command return early (ex. context.DeadlineExceeded)
wire.Close()
break
}
}
m.dpool.Store(wire)
m.spool.Store(wire)
return resp
}

Expand Down
4 changes: 2 additions & 2 deletions mux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func TestMuxReuseWire(t *testing.T) {
t.Fatalf("unexpected dial error %v", err)
}

wire1 := m.Acquire()
wire1 := m.spool.Acquire()

go func() {
// this should use the second wire
Expand All @@ -215,7 +215,7 @@ func TestMuxReuseWire(t *testing.T) {
}()
<-blocking

m.Store(wire1)
m.spool.Store(wire1)
// this should use the first wire
if val, err := m.Do(context.Background(), cmds.NewBlockingCompleted([]string{"PING"})).ToString(); err != nil {
t.Fatalf("unexpected error %v", err)
Expand Down
10 changes: 10 additions & 0 deletions redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"math/rand"
"net"
"os"
"reflect"
"strconv"
"sync"
Expand Down Expand Up @@ -805,10 +806,13 @@ func TestSingleClientIntegration(t *testing.T) {
t.Skip()
}
defer ShouldNotLeaked(SetupLeakDetection())

client, err := NewClient(ClientOption{
InitAddress: []string{"127.0.0.1:6379"},
ConnWriteTimeout: 180 * time.Second,
PipelineMultiplex: 1,

DisableAutoPipelining: os.Getenv("DisableAutoPipelining") == "true",
})
if err != nil {
t.Fatal(err)
Expand All @@ -825,6 +829,7 @@ func TestSentinelClientIntegration(t *testing.T) {
t.Skip()
}
defer ShouldNotLeaked(SetupLeakDetection())

client, err := NewClient(ClientOption{
InitAddress: []string{"127.0.0.1:26379"},
ConnWriteTimeout: 180 * time.Second,
Expand All @@ -833,6 +838,8 @@ func TestSentinelClientIntegration(t *testing.T) {
},
SelectDB: 2, // https://github.com/redis/rueidis/issues/138
PipelineMultiplex: 1,

DisableAutoPipelining: os.Getenv("DisableAutoPipelining") == "true",
})
if err != nil {
t.Fatal(err)
Expand All @@ -849,12 +856,15 @@ func TestClusterClientIntegration(t *testing.T) {
t.Skip()
}
defer ShouldNotLeaked(SetupLeakDetection())

client, err := NewClient(ClientOption{
InitAddress: []string{"127.0.0.1:7001", "127.0.0.1:7002", "127.0.0.1:7003"},
ConnWriteTimeout: 180 * time.Second,
ShuffleInit: true,
Dialer: net.Dialer{KeepAlive: -1},
PipelineMultiplex: 1,

DisableAutoPipelining: os.Getenv("DisableAutoPipelining") == "true",
})
if err != nil {
t.Fatal(err)
Expand Down
5 changes: 5 additions & 0 deletions rueidis.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ type ClientOption struct {
DisableRetry bool
// DisableCache falls back Client.DoCache/Client.DoMultiCache to Client.Do/Client.DoMulti
DisableCache bool
// DisableAutoPipelining makes rueidis.Client always pick a connection from the BlockingPool to serve each request.
DisableAutoPipelining bool
// AlwaysPipelining makes rueidis.Client always pipeline redis commands even if they are not issued concurrently.
AlwaysPipelining bool
// AlwaysRESP2 makes rueidis.Client always uses RESP2, otherwise it will try using RESP3 first.
Expand Down Expand Up @@ -354,6 +356,9 @@ func NewClient(option ClientOption) (client Client, err error) {
if option.BlockingPipeline == 0 {
option.BlockingPipeline = DefaultBlockingPipeline
}
if option.DisableAutoPipelining {
option.AlwaysPipelining = false
}
if option.ShuffleInit {
util.Shuffle(len(option.InitAddress), func(i, j int) {
option.InitAddress[i], option.InitAddress[j] = option.InitAddress[j], option.InitAddress[i]
Expand Down

0 comments on commit 4cf6b70

Please sign in to comment.