From 8871aab6010eddfebe262ef828e96713baa5211b Mon Sep 17 00:00:00 2001 From: Rueian Date: Wed, 9 Oct 2024 17:49:04 -0700 Subject: [PATCH] feat: add DisableAutoPipelining to serve requests from the connection pool Signed-off-by: Rueian --- mux.go | 25 +++++++++++++++++-------- mux_test.go | 4 ++-- pipe.go | 29 ++++++++++++++++------------- redis_test.go | 10 ++++++++++ rueidis.go | 5 +++++ 5 files changed, 50 insertions(+), 23 deletions(-) diff --git a/mux.go b/mux.go index 925b1a5b..d09aa86f 100644 --- a/mux.go +++ b/mux.go @@ -57,6 +57,8 @@ type mux struct { mu []sync.Mutex maxp int maxm int + + usePool bool } func makeMux(dst string, option *ClientOption, dialFn dialFn) *mux { @@ -90,6 +92,8 @@ func newMux(dst string, option *ClientOption, init, dead wire, wireFn wireFn, wi sc: make([]*singleconnect, multiplex), maxp: runtime.GOMAXPROCS(0), maxm: option.BlockingPipeline, + + usePool: option.DisableAutoPipelining, } m.clhks.Store(emptyclhks) for i := 0; i < len(m.wire); i++ { @@ -200,7 +204,7 @@ func (m *mux) DoMultiStream(ctx context.Context, multi ...Completed) MultiRedisR } func (m *mux) Do(ctx context.Context, cmd Completed) (resp RedisResult) { - if cmd.IsBlock() { + if (m.usePool && !cmd.NoReply()) || cmd.IsBlock() { resp = m.blocking(ctx, cmd) } else { resp = m.pipeline(ctx, cmd) @@ -209,7 +213,7 @@ func (m *mux) Do(ctx context.Context, cmd Completed) (resp RedisResult) { } func (m *mux) DoMulti(ctx context.Context, multi ...Completed) (resp *redisresults) { - if len(multi) >= m.maxm && m.maxm > 0 { + if m.usePool || (len(multi) >= m.maxm && m.maxm > 0) { goto block // use a dedicated connection if the pipeline is too large } for _, cmd := range multi { @@ -220,21 +224,26 @@ func (m *mux) DoMulti(ctx context.Context, multi ...Completed) (resp *redisresul return m.pipelineMulti(ctx, multi) block: cmds.ToBlock(&multi[0]) // mark the first cmd as block if one of them is block to shortcut later check. - return m.blockingMulti(ctx, multi) + for _, cmd := range multi { + if cmd.NoReply() { + return m.blockingMulti(ctx, m.dpool, multi) + } + } + return m.blockingMulti(ctx, m.spool, multi) } func (m *mux) blocking(ctx context.Context, cmd Completed) (resp RedisResult) { - wire := m.dpool.Acquire() + wire := m.spool.Acquire() resp = wire.Do(ctx, cmd) if resp.NonRedisError() != nil { // abort the wire if blocking command return early (ex. context.DeadlineExceeded) wire.Close() } - m.dpool.Store(wire) + m.spool.Store(wire) return resp } -func (m *mux) blockingMulti(ctx context.Context, cmd []Completed) (resp *redisresults) { - wire := m.dpool.Acquire() +func (m *mux) blockingMulti(ctx context.Context, pool *pool, cmd []Completed) (resp *redisresults) { + wire := pool.Acquire() resp = wire.DoMulti(ctx, cmd...) for _, res := range resp.s { if res.NonRedisError() != nil { // abort the wire if blocking command return early (ex. context.DeadlineExceeded) @@ -242,7 +251,7 @@ func (m *mux) blockingMulti(ctx context.Context, cmd []Completed) (resp *redisre break } } - m.dpool.Store(wire) + pool.Store(wire) return resp } diff --git a/mux_test.go b/mux_test.go index 49dc7395..0eb2ad72 100644 --- a/mux_test.go +++ b/mux_test.go @@ -202,7 +202,7 @@ func TestMuxReuseWire(t *testing.T) { t.Fatalf("unexpected dial error %v", err) } - wire1 := m.Acquire() + wire1 := m.spool.Acquire() go func() { // this should use the second wire @@ -215,7 +215,7 @@ func TestMuxReuseWire(t *testing.T) { }() <-blocking - m.Store(wire1) + m.spool.Store(wire1) // this should use the first wire if val, err := m.Do(context.Background(), cmds.NewBlockingCompleted([]string{"PING"})).ToString(); err != nil { t.Fatalf("unexpected error %v", err) diff --git a/pipe.go b/pipe.go index 3096064a..fb90aa1e 100644 --- a/pipe.go +++ b/pipe.go @@ -91,15 +91,9 @@ func _newPipe(connFn func() (net.Conn, error), option *ClientOption, r2ps, nobg return nil, err } p = &pipe{ - conn: conn, - queue: newRing(option.RingScaleEachConn), - r: bufio.NewReaderSize(conn, option.ReadBufferEachConn), - w: bufio.NewWriterSize(conn, option.WriteBufferEachConn), - - nsubs: newSubs(), - psubs: newSubs(), - ssubs: newSubs(), - close: make(chan struct{}), + conn: conn, + r: bufio.NewReaderSize(conn, option.ReadBufferEachConn), + w: bufio.NewWriterSize(conn, option.WriteBufferEachConn), timeout: option.ConnWriteTimeout, pinggap: option.Dialer.KeepAlive, @@ -107,6 +101,13 @@ func _newPipe(connFn func() (net.Conn, error), option *ClientOption, r2ps, nobg r2ps: r2ps, } + if !nobg { + p.queue = newRing(option.RingScaleEachConn) + p.nsubs = newSubs() + p.psubs = newSubs() + p.ssubs = newSubs() + p.close = make(chan struct{}) + } if !r2ps { p.r2psFn = func() (p *pipe, err error) { return _newPipe(connFn, option, true, nobg) @@ -305,8 +306,10 @@ func _newPipe(connFn func() (net.Conn, error), option *ClientOption, r2ps, nobg } func (p *pipe) background() { - atomic.CompareAndSwapInt32(&p.state, 0, 1) - p.once.Do(func() { go p._background() }) + if p.queue != nil { + atomic.CompareAndSwapInt32(&p.state, 0, 1) + p.once.Do(func() { go p._background() }) + } } func (p *pipe) _exit(err error) { @@ -825,7 +828,7 @@ func (p *pipe) Do(ctx context.Context, cmd Completed) (resp RedisResult) { goto queue } dl, ok := ctx.Deadline() - if !ok && ctx.Done() != nil { + if p.queue != nil && !ok && ctx.Done() != nil { p.background() goto queue } @@ -925,7 +928,7 @@ func (p *pipe) DoMulti(ctx context.Context, multi ...Completed) *redisresults { goto queue } dl, ok := ctx.Deadline() - if !ok && ctx.Done() != nil { + if p.queue != nil && !ok && ctx.Done() != nil { p.background() goto queue } diff --git a/redis_test.go b/redis_test.go index f531593f..e50c5db5 100644 --- a/redis_test.go +++ b/redis_test.go @@ -5,6 +5,7 @@ import ( "context" "math/rand" "net" + "os" "reflect" "strconv" "sync" @@ -805,10 +806,13 @@ func TestSingleClientIntegration(t *testing.T) { t.Skip() } defer ShouldNotLeaked(SetupLeakDetection()) + client, err := NewClient(ClientOption{ InitAddress: []string{"127.0.0.1:6379"}, ConnWriteTimeout: 180 * time.Second, PipelineMultiplex: 1, + + DisableAutoPipelining: os.Getenv("DisableAutoPipelining") == "true", }) if err != nil { t.Fatal(err) @@ -825,6 +829,7 @@ func TestSentinelClientIntegration(t *testing.T) { t.Skip() } defer ShouldNotLeaked(SetupLeakDetection()) + client, err := NewClient(ClientOption{ InitAddress: []string{"127.0.0.1:26379"}, ConnWriteTimeout: 180 * time.Second, @@ -833,6 +838,8 @@ func TestSentinelClientIntegration(t *testing.T) { }, SelectDB: 2, // https://github.com/redis/rueidis/issues/138 PipelineMultiplex: 1, + + DisableAutoPipelining: os.Getenv("DisableAutoPipelining") == "true", }) if err != nil { t.Fatal(err) @@ -849,12 +856,15 @@ func TestClusterClientIntegration(t *testing.T) { t.Skip() } defer ShouldNotLeaked(SetupLeakDetection()) + client, err := NewClient(ClientOption{ InitAddress: []string{"127.0.0.1:7001", "127.0.0.1:7002", "127.0.0.1:7003"}, ConnWriteTimeout: 180 * time.Second, ShuffleInit: true, Dialer: net.Dialer{KeepAlive: -1}, PipelineMultiplex: 1, + + DisableAutoPipelining: os.Getenv("DisableAutoPipelining") == "true", }) if err != nil { t.Fatal(err) diff --git a/rueidis.go b/rueidis.go index 169ae8b9..6b13974b 100644 --- a/rueidis.go +++ b/rueidis.go @@ -167,6 +167,8 @@ type ClientOption struct { DisableRetry bool // DisableCache falls back Client.DoCache/Client.DoMultiCache to Client.Do/Client.DoMulti DisableCache bool + // DisableAutoPipelining makes rueidis.Client always pick a connection from the BlockingPool to serve each request. + DisableAutoPipelining bool // AlwaysPipelining makes rueidis.Client always pipeline redis commands even if they are not issued concurrently. AlwaysPipelining bool // AlwaysRESP2 makes rueidis.Client always uses RESP2, otherwise it will try using RESP3 first. @@ -354,6 +356,9 @@ func NewClient(option ClientOption) (client Client, err error) { if option.BlockingPipeline == 0 { option.BlockingPipeline = DefaultBlockingPipeline } + if option.DisableAutoPipelining { + option.AlwaysPipelining = false + } if option.ShuffleInit { util.Shuffle(len(option.InitAddress), func(i, j int) { option.InitAddress[i], option.InitAddress[j] = option.InitAddress[j], option.InitAddress[i]