diff --git a/mux.go b/mux.go index 925b1a5b..b1a35d7b 100644 --- a/mux.go +++ b/mux.go @@ -3,7 +3,9 @@ package rueidis import ( "context" "net" + "os" "runtime" + "strings" "sync" "sync/atomic" "time" @@ -42,6 +44,10 @@ type conn interface { SetOnCloseHook(func(error)) } +const useConnPoolingEnv = "DISABLE_AUTO_PIPELINING" + +var useConnPooling = strings.ToUpper(os.Getenv(useConnPoolingEnv)) == "TRUE" + var _ conn = (*mux)(nil) type mux struct { @@ -57,6 +63,8 @@ type mux struct { mu []sync.Mutex maxp int maxm int + + usePool bool } func makeMux(dst string, option *ClientOption, dialFn dialFn) *mux { @@ -90,6 +98,8 @@ func newMux(dst string, option *ClientOption, init, dead wire, wireFn wireFn, wi sc: make([]*singleconnect, multiplex), maxp: runtime.GOMAXPROCS(0), maxm: option.BlockingPipeline, + + usePool: useConnPooling, } m.clhks.Store(emptyclhks) for i := 0; i < len(m.wire); i++ { @@ -200,7 +210,7 @@ func (m *mux) DoMultiStream(ctx context.Context, multi ...Completed) MultiRedisR } func (m *mux) Do(ctx context.Context, cmd Completed) (resp RedisResult) { - if cmd.IsBlock() { + if m.usePool || cmd.IsBlock() { resp = m.blocking(ctx, cmd) } else { resp = m.pipeline(ctx, cmd) @@ -209,7 +219,7 @@ func (m *mux) Do(ctx context.Context, cmd Completed) (resp RedisResult) { } func (m *mux) DoMulti(ctx context.Context, multi ...Completed) (resp *redisresults) { - if len(multi) >= m.maxm && m.maxm > 0 { + if m.usePool || (len(multi) >= m.maxm && m.maxm > 0) { goto block // use a dedicated connection if the pipeline is too large } for _, cmd := range multi { diff --git a/redis_test.go b/redis_test.go index f531593f..9fc3ab46 100644 --- a/redis_test.go +++ b/redis_test.go @@ -5,6 +5,7 @@ import ( "context" "math/rand" "net" + "os" "reflect" "strconv" "sync" @@ -805,6 +806,7 @@ func TestSingleClientIntegration(t *testing.T) { t.Skip() } defer ShouldNotLeaked(SetupLeakDetection()) + client, err := NewClient(ClientOption{ InitAddress: []string{"127.0.0.1:6379"}, ConnWriteTimeout: 180 * time.Second, @@ -820,11 +822,18 @@ func TestSingleClientIntegration(t *testing.T) { client.Close() } +func TestSingleClientIntegrationWithPool(t *testing.T) { + os.Setenv(useConnPoolingEnv, "true") + defer os.Unsetenv(useConnPoolingEnv) + TestSingleClientIntegration(t) +} + func TestSentinelClientIntegration(t *testing.T) { if testing.Short() { t.Skip() } defer ShouldNotLeaked(SetupLeakDetection()) + client, err := NewClient(ClientOption{ InitAddress: []string{"127.0.0.1:26379"}, ConnWriteTimeout: 180 * time.Second, @@ -844,11 +853,18 @@ func TestSentinelClientIntegration(t *testing.T) { client.Close() } +func TestSentinelClientIntegrationWithPool(t *testing.T) { + os.Setenv(useConnPoolingEnv, "true") + defer os.Unsetenv(useConnPoolingEnv) + TestSentinelClientIntegration(t) +} + func TestClusterClientIntegration(t *testing.T) { if testing.Short() { t.Skip() } defer ShouldNotLeaked(SetupLeakDetection()) + client, err := NewClient(ClientOption{ InitAddress: []string{"127.0.0.1:7001", "127.0.0.1:7002", "127.0.0.1:7003"}, ConnWriteTimeout: 180 * time.Second, @@ -864,6 +880,12 @@ func TestClusterClientIntegration(t *testing.T) { client.Close() } +func TestClusterClientIntegrationWithPool(t *testing.T) { + os.Setenv(useConnPoolingEnv, "true") + defer os.Unsetenv(useConnPoolingEnv) + TestClusterClientIntegration(t) +} + func TestSingleClient5Integration(t *testing.T) { if testing.Short() { t.Skip()