From 57f44fe8432866168b69a8a10b5ce17d7716b46d Mon Sep 17 00:00:00 2001 From: Rueian Date: Wed, 9 Oct 2024 17:49:04 -0700 Subject: [PATCH] feat: add DisableAutoPipelining to serve requests from the connection pool Signed-off-by: Rueian --- mux.go | 8 ++++++-- redis_test.go | 28 ++++++++++++++++++++++++++++ rueidis.go | 5 +++++ 3 files changed, 39 insertions(+), 2 deletions(-) diff --git a/mux.go b/mux.go index 925b1a5b..240b5daa 100644 --- a/mux.go +++ b/mux.go @@ -57,6 +57,8 @@ type mux struct { mu []sync.Mutex maxp int maxm int + + usePool bool } func makeMux(dst string, option *ClientOption, dialFn dialFn) *mux { @@ -90,6 +92,8 @@ func newMux(dst string, option *ClientOption, init, dead wire, wireFn wireFn, wi sc: make([]*singleconnect, multiplex), maxp: runtime.GOMAXPROCS(0), maxm: option.BlockingPipeline, + + usePool: option.DisableAutoPipelining, } m.clhks.Store(emptyclhks) for i := 0; i < len(m.wire); i++ { @@ -200,7 +204,7 @@ func (m *mux) DoMultiStream(ctx context.Context, multi ...Completed) MultiRedisR } func (m *mux) Do(ctx context.Context, cmd Completed) (resp RedisResult) { - if cmd.IsBlock() { + if m.usePool || cmd.IsBlock() { resp = m.blocking(ctx, cmd) } else { resp = m.pipeline(ctx, cmd) @@ -209,7 +213,7 @@ func (m *mux) Do(ctx context.Context, cmd Completed) (resp RedisResult) { } func (m *mux) DoMulti(ctx context.Context, multi ...Completed) (resp *redisresults) { - if len(multi) >= m.maxm && m.maxm > 0 { + if m.usePool || (len(multi) >= m.maxm && m.maxm > 0) { goto block // use a dedicated connection if the pipeline is too large } for _, cmd := range multi { diff --git a/redis_test.go b/redis_test.go index f531593f..327f82ac 100644 --- a/redis_test.go +++ b/redis_test.go @@ -5,6 +5,7 @@ import ( "context" "math/rand" "net" + "os" "reflect" "strconv" "sync" @@ -805,10 +806,13 @@ func TestSingleClientIntegration(t *testing.T) { t.Skip() } defer ShouldNotLeaked(SetupLeakDetection()) + client, err := NewClient(ClientOption{ InitAddress: []string{"127.0.0.1:6379"}, ConnWriteTimeout: 180 * time.Second, PipelineMultiplex: 1, + + DisableAutoPipelining: os.Getenv("DisableAutoPipelining") == "true", }) if err != nil { t.Fatal(err) @@ -820,11 +824,18 @@ func TestSingleClientIntegration(t *testing.T) { client.Close() } +func TestSingleClientIntegrationWithPool(t *testing.T) { + os.Setenv("DisableAutoPipelining", "true") + defer os.Unsetenv("DisableAutoPipelining") + TestSingleClientIntegration(t) +} + func TestSentinelClientIntegration(t *testing.T) { if testing.Short() { t.Skip() } defer ShouldNotLeaked(SetupLeakDetection()) + client, err := NewClient(ClientOption{ InitAddress: []string{"127.0.0.1:26379"}, ConnWriteTimeout: 180 * time.Second, @@ -833,6 +844,8 @@ func TestSentinelClientIntegration(t *testing.T) { }, SelectDB: 2, // https://github.com/redis/rueidis/issues/138 PipelineMultiplex: 1, + + DisableAutoPipelining: os.Getenv("DisableAutoPipelining") == "true", }) if err != nil { t.Fatal(err) @@ -844,17 +857,26 @@ func TestSentinelClientIntegration(t *testing.T) { client.Close() } +func TestSentinelClientIntegrationWithPool(t *testing.T) { + os.Setenv("DisableAutoPipelining", "true") + defer os.Unsetenv("DisableAutoPipelining") + TestSentinelClientIntegration(t) +} + func TestClusterClientIntegration(t *testing.T) { if testing.Short() { t.Skip() } defer ShouldNotLeaked(SetupLeakDetection()) + client, err := NewClient(ClientOption{ InitAddress: []string{"127.0.0.1:7001", "127.0.0.1:7002", "127.0.0.1:7003"}, ConnWriteTimeout: 180 * time.Second, ShuffleInit: true, Dialer: net.Dialer{KeepAlive: -1}, PipelineMultiplex: 1, + + DisableAutoPipelining: os.Getenv("DisableAutoPipelining") == "true", }) if err != nil { t.Fatal(err) @@ -864,6 +886,12 @@ func TestClusterClientIntegration(t *testing.T) { client.Close() } +func TestClusterClientIntegrationWithPool(t *testing.T) { + os.Setenv("DisableAutoPipelining", "true") + defer os.Unsetenv("DisableAutoPipelining") + TestClusterClientIntegration(t) +} + func TestSingleClient5Integration(t *testing.T) { if testing.Short() { t.Skip() diff --git a/rueidis.go b/rueidis.go index 169ae8b9..6b13974b 100644 --- a/rueidis.go +++ b/rueidis.go @@ -167,6 +167,8 @@ type ClientOption struct { DisableRetry bool // DisableCache falls back Client.DoCache/Client.DoMultiCache to Client.Do/Client.DoMulti DisableCache bool + // DisableAutoPipelining makes rueidis.Client always pick a connection from the BlockingPool to serve each request. + DisableAutoPipelining bool // AlwaysPipelining makes rueidis.Client always pipeline redis commands even if they are not issued concurrently. AlwaysPipelining bool // AlwaysRESP2 makes rueidis.Client always uses RESP2, otherwise it will try using RESP3 first. @@ -354,6 +356,9 @@ func NewClient(option ClientOption) (client Client, err error) { if option.BlockingPipeline == 0 { option.BlockingPipeline = DefaultBlockingPipeline } + if option.DisableAutoPipelining { + option.AlwaysPipelining = false + } if option.ShuffleInit { util.Shuffle(len(option.InitAddress), func(i, j int) { option.InitAddress[i], option.InitAddress[j] = option.InitAddress[j], option.InitAddress[i]