Skip to content

Commit

Permalink
feat: add DisableAutoPipelining to serve requests from the connection…
Browse files Browse the repository at this point in the history
… pool

Signed-off-by: Rueian <[email protected]>
  • Loading branch information
rueian committed Oct 10, 2024
1 parent fc348f5 commit 57f44fe
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 2 deletions.
8 changes: 6 additions & 2 deletions mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ type mux struct {
mu []sync.Mutex
maxp int
maxm int

usePool bool
}

func makeMux(dst string, option *ClientOption, dialFn dialFn) *mux {
Expand Down Expand Up @@ -90,6 +92,8 @@ func newMux(dst string, option *ClientOption, init, dead wire, wireFn wireFn, wi
sc: make([]*singleconnect, multiplex),
maxp: runtime.GOMAXPROCS(0),
maxm: option.BlockingPipeline,

usePool: option.DisableAutoPipelining,
}
m.clhks.Store(emptyclhks)
for i := 0; i < len(m.wire); i++ {
Expand Down Expand Up @@ -200,7 +204,7 @@ func (m *mux) DoMultiStream(ctx context.Context, multi ...Completed) MultiRedisR
}

func (m *mux) Do(ctx context.Context, cmd Completed) (resp RedisResult) {
if cmd.IsBlock() {
if m.usePool || cmd.IsBlock() {
resp = m.blocking(ctx, cmd)
} else {
resp = m.pipeline(ctx, cmd)
Expand All @@ -209,7 +213,7 @@ func (m *mux) Do(ctx context.Context, cmd Completed) (resp RedisResult) {
}

func (m *mux) DoMulti(ctx context.Context, multi ...Completed) (resp *redisresults) {
if len(multi) >= m.maxm && m.maxm > 0 {
if m.usePool || (len(multi) >= m.maxm && m.maxm > 0) {
goto block // use a dedicated connection if the pipeline is too large
}
for _, cmd := range multi {
Expand Down
28 changes: 28 additions & 0 deletions redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"math/rand"
"net"
"os"
"reflect"
"strconv"
"sync"
Expand Down Expand Up @@ -805,10 +806,13 @@ func TestSingleClientIntegration(t *testing.T) {
t.Skip()
}
defer ShouldNotLeaked(SetupLeakDetection())

client, err := NewClient(ClientOption{
InitAddress: []string{"127.0.0.1:6379"},
ConnWriteTimeout: 180 * time.Second,
PipelineMultiplex: 1,

DisableAutoPipelining: os.Getenv("DisableAutoPipelining") == "true",
})
if err != nil {
t.Fatal(err)
Expand All @@ -820,11 +824,18 @@ func TestSingleClientIntegration(t *testing.T) {
client.Close()
}

func TestSingleClientIntegrationWithPool(t *testing.T) {
os.Setenv("DisableAutoPipelining", "true")
defer os.Unsetenv("DisableAutoPipelining")
TestSingleClientIntegration(t)
}

func TestSentinelClientIntegration(t *testing.T) {
if testing.Short() {
t.Skip()
}
defer ShouldNotLeaked(SetupLeakDetection())

client, err := NewClient(ClientOption{
InitAddress: []string{"127.0.0.1:26379"},
ConnWriteTimeout: 180 * time.Second,
Expand All @@ -833,6 +844,8 @@ func TestSentinelClientIntegration(t *testing.T) {
},
SelectDB: 2, // https://github.com/redis/rueidis/issues/138
PipelineMultiplex: 1,

DisableAutoPipelining: os.Getenv("DisableAutoPipelining") == "true",
})
if err != nil {
t.Fatal(err)
Expand All @@ -844,17 +857,26 @@ func TestSentinelClientIntegration(t *testing.T) {
client.Close()
}

func TestSentinelClientIntegrationWithPool(t *testing.T) {
os.Setenv("DisableAutoPipelining", "true")
defer os.Unsetenv("DisableAutoPipelining")
TestSentinelClientIntegration(t)
}

func TestClusterClientIntegration(t *testing.T) {
if testing.Short() {
t.Skip()
}
defer ShouldNotLeaked(SetupLeakDetection())

client, err := NewClient(ClientOption{
InitAddress: []string{"127.0.0.1:7001", "127.0.0.1:7002", "127.0.0.1:7003"},
ConnWriteTimeout: 180 * time.Second,
ShuffleInit: true,
Dialer: net.Dialer{KeepAlive: -1},
PipelineMultiplex: 1,

DisableAutoPipelining: os.Getenv("DisableAutoPipelining") == "true",
})
if err != nil {
t.Fatal(err)
Expand All @@ -864,6 +886,12 @@ func TestClusterClientIntegration(t *testing.T) {
client.Close()
}

func TestClusterClientIntegrationWithPool(t *testing.T) {
os.Setenv("DisableAutoPipelining", "true")
defer os.Unsetenv("DisableAutoPipelining")
TestClusterClientIntegration(t)
}

func TestSingleClient5Integration(t *testing.T) {
if testing.Short() {
t.Skip()
Expand Down
5 changes: 5 additions & 0 deletions rueidis.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ type ClientOption struct {
DisableRetry bool
// DisableCache falls back Client.DoCache/Client.DoMultiCache to Client.Do/Client.DoMulti
DisableCache bool
// DisableAutoPipelining makes rueidis.Client always pick a connection from the BlockingPool to serve each request.
DisableAutoPipelining bool
// AlwaysPipelining makes rueidis.Client always pipeline redis commands even if they are not issued concurrently.
AlwaysPipelining bool
// AlwaysRESP2 makes rueidis.Client always uses RESP2, otherwise it will try using RESP3 first.
Expand Down Expand Up @@ -354,6 +356,9 @@ func NewClient(option ClientOption) (client Client, err error) {
if option.BlockingPipeline == 0 {
option.BlockingPipeline = DefaultBlockingPipeline
}
if option.DisableAutoPipelining {
option.AlwaysPipelining = false
}
if option.ShuffleInit {
util.Shuffle(len(option.InitAddress), func(i, j int) {
option.InitAddress[i], option.InitAddress[j] = option.InitAddress[j], option.InitAddress[i]
Expand Down

0 comments on commit 57f44fe

Please sign in to comment.