Skip to content

Commit

Permalink
feat: add RUEIDIS_USE_CONN_POOLING env to serve requests from the con…
Browse files Browse the repository at this point in the history
…nection pool

Signed-off-by: Rueian <[email protected]>
  • Loading branch information
rueian committed Oct 10, 2024
1 parent fc348f5 commit a6f104d
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 40 deletions.
17 changes: 15 additions & 2 deletions mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package rueidis
import (
"context"
"net"
"os"
"runtime"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -42,7 +44,14 @@ type conn interface {
SetOnCloseHook(func(error))
}

const useConnPoolingEnv = "DISABLE_AUTO_PIPELINING"

var _ conn = (*mux)(nil)
var useConnPooling bool

func init() {
useConnPooling = strings.ToUpper(os.Getenv(useConnPoolingEnv)) == "TRUE"
}

type mux struct {
init wire
Expand All @@ -57,6 +66,8 @@ type mux struct {
mu []sync.Mutex
maxp int
maxm int

usePool bool
}

func makeMux(dst string, option *ClientOption, dialFn dialFn) *mux {
Expand Down Expand Up @@ -90,6 +101,8 @@ func newMux(dst string, option *ClientOption, init, dead wire, wireFn wireFn, wi
sc: make([]*singleconnect, multiplex),
maxp: runtime.GOMAXPROCS(0),
maxm: option.BlockingPipeline,

usePool: useConnPooling,
}
m.clhks.Store(emptyclhks)
for i := 0; i < len(m.wire); i++ {
Expand Down Expand Up @@ -200,7 +213,7 @@ func (m *mux) DoMultiStream(ctx context.Context, multi ...Completed) MultiRedisR
}

func (m *mux) Do(ctx context.Context, cmd Completed) (resp RedisResult) {
if cmd.IsBlock() {
if m.usePool || cmd.IsBlock() {
resp = m.blocking(ctx, cmd)
} else {
resp = m.pipeline(ctx, cmd)
Expand All @@ -209,7 +222,7 @@ func (m *mux) Do(ctx context.Context, cmd Completed) (resp RedisResult) {
}

func (m *mux) DoMulti(ctx context.Context, multi ...Completed) (resp *redisresults) {
if len(multi) >= m.maxm && m.maxm > 0 {
if m.usePool || (len(multi) >= m.maxm && m.maxm > 0) {
goto block // use a dedicated connection if the pipeline is too large
}
for _, cmd := range multi {
Expand Down
109 changes: 71 additions & 38 deletions redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"math/rand"
"net"
"os"
"reflect"
"strconv"
"sync"
Expand Down Expand Up @@ -805,63 +806,95 @@ func TestSingleClientIntegration(t *testing.T) {
t.Skip()
}
defer ShouldNotLeaked(SetupLeakDetection())
client, err := NewClient(ClientOption{
InitAddress: []string{"127.0.0.1:6379"},
ConnWriteTimeout: 180 * time.Second,
PipelineMultiplex: 1,
})
if err != nil {
t.Fatal(err)
}

run(t, client, testSETGETCSC, testMultiSETGETCSC, testMultiSETGETCSCHelpers, testMultiExec, testBlockingZPOP, testBlockingXREAD, testPubSub, testPubSubSharded, testLua)
run(t, client, testFlush)
for _, usePool := range []bool{false, true} {
if usePool {
os.Setenv(useConnPoolingEnv, "true")
}
client, err := NewClient(ClientOption{
InitAddress: []string{"127.0.0.1:6379"},
ConnWriteTimeout: 180 * time.Second,
PipelineMultiplex: 1,
})
if err != nil {
t.Fatal(err)
}

client.Close()
run(t, client, testSETGETCSC, testMultiSETGETCSC, testMultiSETGETCSCHelpers, testMultiExec, testBlockingZPOP, testBlockingXREAD, testPubSub, testPubSubSharded, testLua)
run(t, client, testFlush)

client.Close()

if usePool {
os.Unsetenv(useConnPoolingEnv)
}
}
}

func TestSentinelClientIntegration(t *testing.T) {
if testing.Short() {
t.Skip()
}
defer ShouldNotLeaked(SetupLeakDetection())
client, err := NewClient(ClientOption{
InitAddress: []string{"127.0.0.1:26379"},
ConnWriteTimeout: 180 * time.Second,
Sentinel: SentinelOption{
MasterSet: "test",
},
SelectDB: 2, // https://github.com/redis/rueidis/issues/138
PipelineMultiplex: 1,
})
if err != nil {
t.Fatal(err)
}

run(t, client, testSETGETCSC, testMultiSETGETCSC, testMultiSETGETCSCHelpers, testMultiExec, testBlockingZPOP, testBlockingXREAD, testPubSub, testPubSubSharded, testLua)
run(t, client, testFlush)
for _, usePool := range []bool{false, true} {
if usePool {
os.Setenv(useConnPoolingEnv, "true")
}

client.Close()
client, err := NewClient(ClientOption{
InitAddress: []string{"127.0.0.1:26379"},
ConnWriteTimeout: 180 * time.Second,
Sentinel: SentinelOption{
MasterSet: "test",
},
SelectDB: 2, // https://github.com/redis/rueidis/issues/138
PipelineMultiplex: 1,
})
if err != nil {
t.Fatal(err)
}

run(t, client, testSETGETCSC, testMultiSETGETCSC, testMultiSETGETCSCHelpers, testMultiExec, testBlockingZPOP, testBlockingXREAD, testPubSub, testPubSubSharded, testLua)
run(t, client, testFlush)

client.Close()

if usePool {
os.Unsetenv(useConnPoolingEnv)
}
}
}

func TestClusterClientIntegration(t *testing.T) {
if testing.Short() {
t.Skip()
}
defer ShouldNotLeaked(SetupLeakDetection())
client, err := NewClient(ClientOption{
InitAddress: []string{"127.0.0.1:7001", "127.0.0.1:7002", "127.0.0.1:7003"},
ConnWriteTimeout: 180 * time.Second,
ShuffleInit: true,
Dialer: net.Dialer{KeepAlive: -1},
PipelineMultiplex: 1,
})
if err != nil {
t.Fatal(err)
}
run(t, client, testSETGETCSC, testMultiSETGETCSC, testMultiSETGETCSCHelpers, testMultiExec, testBlockingZPOP, testBlockingXREAD, testPubSub, testPubSubSharded, testLua)

client.Close()
for _, usePool := range []bool{false, true} {
if usePool {
os.Setenv(useConnPoolingEnv, "true")
}

client, err := NewClient(ClientOption{
InitAddress: []string{"127.0.0.1:7001", "127.0.0.1:7002", "127.0.0.1:7003"},
ConnWriteTimeout: 180 * time.Second,
ShuffleInit: true,
Dialer: net.Dialer{KeepAlive: -1},
PipelineMultiplex: 1,
})
if err != nil {
t.Fatal(err)
}
run(t, client, testSETGETCSC, testMultiSETGETCSC, testMultiSETGETCSCHelpers, testMultiExec, testBlockingZPOP, testBlockingXREAD, testPubSub, testPubSubSharded, testLua)

client.Close()

if usePool {
os.Unsetenv(useConnPoolingEnv)
}
}
}

func TestSingleClient5Integration(t *testing.T) {
Expand Down

0 comments on commit a6f104d

Please sign in to comment.