From 5ce9446937b2c164c7228492ee855c8fc52921d2 Mon Sep 17 00:00:00 2001 From: Rueian Date: Wed, 9 Oct 2024 17:49:04 -0700 Subject: [PATCH] feat: add RUEIDIS_USE_CONN_POOLING env to serve requests from the connection pool Signed-off-by: Rueian --- mux.go | 17 ++++++++++-- redis_test.go | 77 +++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+), 2 deletions(-) diff --git a/mux.go b/mux.go index 925b1a5b..e40865fa 100644 --- a/mux.go +++ b/mux.go @@ -3,7 +3,9 @@ package rueidis import ( "context" "net" + "os" "runtime" + "strings" "sync" "sync/atomic" "time" @@ -42,7 +44,14 @@ type conn interface { SetOnCloseHook(func(error)) } +const useConnPoolingEnv = "RUEIDIS_USE_CONN_POOLING" + var _ conn = (*mux)(nil) +var useConnPooling bool + +func init() { + useConnPooling = strings.ToUpper(os.Getenv(useConnPoolingEnv)) == "TRUE" +} type mux struct { init wire @@ -57,6 +66,8 @@ type mux struct { mu []sync.Mutex maxp int maxm int + + usePool bool } func makeMux(dst string, option *ClientOption, dialFn dialFn) *mux { @@ -90,6 +101,8 @@ func newMux(dst string, option *ClientOption, init, dead wire, wireFn wireFn, wi sc: make([]*singleconnect, multiplex), maxp: runtime.GOMAXPROCS(0), maxm: option.BlockingPipeline, + + usePool: useConnPooling, } m.clhks.Store(emptyclhks) for i := 0; i < len(m.wire); i++ { @@ -200,7 +213,7 @@ func (m *mux) DoMultiStream(ctx context.Context, multi ...Completed) MultiRedisR } func (m *mux) Do(ctx context.Context, cmd Completed) (resp RedisResult) { - if cmd.IsBlock() { + if m.usePool || cmd.IsBlock() { resp = m.blocking(ctx, cmd) } else { resp = m.pipeline(ctx, cmd) @@ -209,7 +222,7 @@ func (m *mux) Do(ctx context.Context, cmd Completed) (resp RedisResult) { } func (m *mux) DoMulti(ctx context.Context, multi ...Completed) (resp *redisresults) { - if len(multi) >= m.maxm && m.maxm > 0 { + if m.usePool || (len(multi) >= m.maxm && m.maxm > 0) { goto block // use a dedicated connection if the pipeline is too large } for _, cmd := range multi { diff --git a/redis_test.go b/redis_test.go index f531593f..7c9ef6d1 100644 --- a/redis_test.go +++ b/redis_test.go @@ -5,6 +5,7 @@ import ( "context" "math/rand" "net" + "os" "reflect" "strconv" "sync" @@ -820,6 +821,30 @@ func TestSingleClientIntegration(t *testing.T) { client.Close() } +func TestSingleClientIntegrationWithPool(t *testing.T) { + if testing.Short() { + t.Skip() + } + defer ShouldNotLeaked(SetupLeakDetection()) + + os.Setenv(useConnPoolingEnv, "true") + defer os.Unsetenv(useConnPoolingEnv) + + client, err := NewClient(ClientOption{ + InitAddress: []string{"127.0.0.1:6379"}, + ConnWriteTimeout: 180 * time.Second, + PipelineMultiplex: 1, + }) + if err != nil { + t.Fatal(err) + } + + run(t, client, testSETGETCSC, testMultiSETGETCSC, testMultiSETGETCSCHelpers, testMultiExec, testBlockingZPOP, testBlockingXREAD, testPubSub, testPubSubSharded, testLua) + run(t, client, testFlush) + + client.Close() +} + func TestSentinelClientIntegration(t *testing.T) { if testing.Short() { t.Skip() @@ -844,6 +869,34 @@ func TestSentinelClientIntegration(t *testing.T) { client.Close() } +func TestSentinelClientIntegrationWithPool(t *testing.T) { + if testing.Short() { + t.Skip() + } + defer ShouldNotLeaked(SetupLeakDetection()) + + os.Setenv(useConnPoolingEnv, "true") + defer os.Unsetenv(useConnPoolingEnv) + + client, err := NewClient(ClientOption{ + InitAddress: []string{"127.0.0.1:26379"}, + ConnWriteTimeout: 180 * time.Second, + Sentinel: SentinelOption{ + MasterSet: "test", + }, + SelectDB: 2, // https://github.com/redis/rueidis/issues/138 + PipelineMultiplex: 1, + }) + if err != nil { + t.Fatal(err) + } + + run(t, client, testSETGETCSC, testMultiSETGETCSC, testMultiSETGETCSCHelpers, testMultiExec, testBlockingZPOP, testBlockingXREAD, testPubSub, testPubSubSharded, testLua) + run(t, client, testFlush) + + client.Close() +} + func TestClusterClientIntegration(t *testing.T) { if testing.Short() { t.Skip() @@ -864,6 +917,30 @@ func TestClusterClientIntegration(t *testing.T) { client.Close() } +func TestClusterClientIntegrationWithPool(t *testing.T) { + if testing.Short() { + t.Skip() + } + defer ShouldNotLeaked(SetupLeakDetection()) + + os.Setenv(useConnPoolingEnv, "true") + defer os.Unsetenv(useConnPoolingEnv) + + client, err := NewClient(ClientOption{ + InitAddress: []string{"127.0.0.1:7001", "127.0.0.1:7002", "127.0.0.1:7003"}, + ConnWriteTimeout: 180 * time.Second, + ShuffleInit: true, + Dialer: net.Dialer{KeepAlive: -1}, + PipelineMultiplex: 1, + }) + if err != nil { + t.Fatal(err) + } + run(t, client, testSETGETCSC, testMultiSETGETCSC, testMultiSETGETCSCHelpers, testMultiExec, testBlockingZPOP, testBlockingXREAD, testPubSub, testPubSubSharded, testLua) + + client.Close() +} + func TestSingleClient5Integration(t *testing.T) { if testing.Short() { t.Skip()