Skip to content

Commit

Permalink
feat: add DisableAutoPipelining to serve requests from the connection…
Browse files Browse the repository at this point in the history
… pool

Signed-off-by: Rueian <[email protected]>
  • Loading branch information
rueian committed Oct 12, 2024
1 parent fc348f5 commit 8871aab
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 23 deletions.
25 changes: 17 additions & 8 deletions mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ type mux struct {
mu []sync.Mutex
maxp int
maxm int

usePool bool
}

func makeMux(dst string, option *ClientOption, dialFn dialFn) *mux {
Expand Down Expand Up @@ -90,6 +92,8 @@ func newMux(dst string, option *ClientOption, init, dead wire, wireFn wireFn, wi
sc: make([]*singleconnect, multiplex),
maxp: runtime.GOMAXPROCS(0),
maxm: option.BlockingPipeline,

usePool: option.DisableAutoPipelining,
}
m.clhks.Store(emptyclhks)
for i := 0; i < len(m.wire); i++ {
Expand Down Expand Up @@ -200,7 +204,7 @@ func (m *mux) DoMultiStream(ctx context.Context, multi ...Completed) MultiRedisR
}

func (m *mux) Do(ctx context.Context, cmd Completed) (resp RedisResult) {
if cmd.IsBlock() {
if (m.usePool && !cmd.NoReply()) || cmd.IsBlock() {
resp = m.blocking(ctx, cmd)
} else {
resp = m.pipeline(ctx, cmd)
Expand All @@ -209,7 +213,7 @@ func (m *mux) Do(ctx context.Context, cmd Completed) (resp RedisResult) {
}

func (m *mux) DoMulti(ctx context.Context, multi ...Completed) (resp *redisresults) {
if len(multi) >= m.maxm && m.maxm > 0 {
if m.usePool || (len(multi) >= m.maxm && m.maxm > 0) {
goto block // use a dedicated connection if the pipeline is too large
}
for _, cmd := range multi {
Expand All @@ -220,29 +224,34 @@ func (m *mux) DoMulti(ctx context.Context, multi ...Completed) (resp *redisresul
return m.pipelineMulti(ctx, multi)
block:
cmds.ToBlock(&multi[0]) // mark the first cmd as block if one of them is block to shortcut later check.
return m.blockingMulti(ctx, multi)
for _, cmd := range multi {
if cmd.NoReply() {
return m.blockingMulti(ctx, m.dpool, multi)
}
}
return m.blockingMulti(ctx, m.spool, multi)
}

func (m *mux) blocking(ctx context.Context, cmd Completed) (resp RedisResult) {
wire := m.dpool.Acquire()
wire := m.spool.Acquire()
resp = wire.Do(ctx, cmd)
if resp.NonRedisError() != nil { // abort the wire if blocking command return early (ex. context.DeadlineExceeded)
wire.Close()
}
m.dpool.Store(wire)
m.spool.Store(wire)
return resp
}

func (m *mux) blockingMulti(ctx context.Context, cmd []Completed) (resp *redisresults) {
wire := m.dpool.Acquire()
func (m *mux) blockingMulti(ctx context.Context, pool *pool, cmd []Completed) (resp *redisresults) {
wire := pool.Acquire()
resp = wire.DoMulti(ctx, cmd...)
for _, res := range resp.s {
if res.NonRedisError() != nil { // abort the wire if blocking command return early (ex. context.DeadlineExceeded)
wire.Close()
break
}
}
m.dpool.Store(wire)
pool.Store(wire)
return resp
}

Expand Down
4 changes: 2 additions & 2 deletions mux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func TestMuxReuseWire(t *testing.T) {
t.Fatalf("unexpected dial error %v", err)
}

wire1 := m.Acquire()
wire1 := m.spool.Acquire()

go func() {
// this should use the second wire
Expand All @@ -215,7 +215,7 @@ func TestMuxReuseWire(t *testing.T) {
}()
<-blocking

m.Store(wire1)
m.spool.Store(wire1)
// this should use the first wire
if val, err := m.Do(context.Background(), cmds.NewBlockingCompleted([]string{"PING"})).ToString(); err != nil {
t.Fatalf("unexpected error %v", err)
Expand Down
29 changes: 16 additions & 13 deletions pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,22 +91,23 @@ func _newPipe(connFn func() (net.Conn, error), option *ClientOption, r2ps, nobg
return nil, err
}
p = &pipe{
conn: conn,
queue: newRing(option.RingScaleEachConn),
r: bufio.NewReaderSize(conn, option.ReadBufferEachConn),
w: bufio.NewWriterSize(conn, option.WriteBufferEachConn),

nsubs: newSubs(),
psubs: newSubs(),
ssubs: newSubs(),
close: make(chan struct{}),
conn: conn,
r: bufio.NewReaderSize(conn, option.ReadBufferEachConn),
w: bufio.NewWriterSize(conn, option.WriteBufferEachConn),

timeout: option.ConnWriteTimeout,
pinggap: option.Dialer.KeepAlive,
maxFlushDelay: option.MaxFlushDelay,

r2ps: r2ps,
}
if !nobg {
p.queue = newRing(option.RingScaleEachConn)
p.nsubs = newSubs()
p.psubs = newSubs()
p.ssubs = newSubs()
p.close = make(chan struct{})
}
if !r2ps {
p.r2psFn = func() (p *pipe, err error) {
return _newPipe(connFn, option, true, nobg)
Expand Down Expand Up @@ -305,8 +306,10 @@ func _newPipe(connFn func() (net.Conn, error), option *ClientOption, r2ps, nobg
}

func (p *pipe) background() {
atomic.CompareAndSwapInt32(&p.state, 0, 1)
p.once.Do(func() { go p._background() })
if p.queue != nil {
atomic.CompareAndSwapInt32(&p.state, 0, 1)
p.once.Do(func() { go p._background() })
}
}

func (p *pipe) _exit(err error) {
Expand Down Expand Up @@ -825,7 +828,7 @@ func (p *pipe) Do(ctx context.Context, cmd Completed) (resp RedisResult) {
goto queue
}
dl, ok := ctx.Deadline()
if !ok && ctx.Done() != nil {
if p.queue != nil && !ok && ctx.Done() != nil {
p.background()
goto queue
}
Expand Down Expand Up @@ -925,7 +928,7 @@ func (p *pipe) DoMulti(ctx context.Context, multi ...Completed) *redisresults {
goto queue
}
dl, ok := ctx.Deadline()
if !ok && ctx.Done() != nil {
if p.queue != nil && !ok && ctx.Done() != nil {
p.background()
goto queue
}
Expand Down
10 changes: 10 additions & 0 deletions redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"math/rand"
"net"
"os"
"reflect"
"strconv"
"sync"
Expand Down Expand Up @@ -805,10 +806,13 @@ func TestSingleClientIntegration(t *testing.T) {
t.Skip()
}
defer ShouldNotLeaked(SetupLeakDetection())

client, err := NewClient(ClientOption{
InitAddress: []string{"127.0.0.1:6379"},
ConnWriteTimeout: 180 * time.Second,
PipelineMultiplex: 1,

DisableAutoPipelining: os.Getenv("DisableAutoPipelining") == "true",
})
if err != nil {
t.Fatal(err)
Expand All @@ -825,6 +829,7 @@ func TestSentinelClientIntegration(t *testing.T) {
t.Skip()
}
defer ShouldNotLeaked(SetupLeakDetection())

client, err := NewClient(ClientOption{
InitAddress: []string{"127.0.0.1:26379"},
ConnWriteTimeout: 180 * time.Second,
Expand All @@ -833,6 +838,8 @@ func TestSentinelClientIntegration(t *testing.T) {
},
SelectDB: 2, // https://github.com/redis/rueidis/issues/138
PipelineMultiplex: 1,

DisableAutoPipelining: os.Getenv("DisableAutoPipelining") == "true",
})
if err != nil {
t.Fatal(err)
Expand All @@ -849,12 +856,15 @@ func TestClusterClientIntegration(t *testing.T) {
t.Skip()
}
defer ShouldNotLeaked(SetupLeakDetection())

client, err := NewClient(ClientOption{
InitAddress: []string{"127.0.0.1:7001", "127.0.0.1:7002", "127.0.0.1:7003"},
ConnWriteTimeout: 180 * time.Second,
ShuffleInit: true,
Dialer: net.Dialer{KeepAlive: -1},
PipelineMultiplex: 1,

DisableAutoPipelining: os.Getenv("DisableAutoPipelining") == "true",
})
if err != nil {
t.Fatal(err)
Expand Down
5 changes: 5 additions & 0 deletions rueidis.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ type ClientOption struct {
DisableRetry bool
// DisableCache falls back Client.DoCache/Client.DoMultiCache to Client.Do/Client.DoMulti
DisableCache bool
// DisableAutoPipelining makes rueidis.Client always pick a connection from the BlockingPool to serve each request.
DisableAutoPipelining bool
// AlwaysPipelining makes rueidis.Client always pipeline redis commands even if they are not issued concurrently.
AlwaysPipelining bool
// AlwaysRESP2 makes rueidis.Client always uses RESP2, otherwise it will try using RESP3 first.
Expand Down Expand Up @@ -354,6 +356,9 @@ func NewClient(option ClientOption) (client Client, err error) {
if option.BlockingPipeline == 0 {
option.BlockingPipeline = DefaultBlockingPipeline
}
if option.DisableAutoPipelining {
option.AlwaysPipelining = false
}
if option.ShuffleInit {
util.Shuffle(len(option.InitAddress), func(i, j int) {
option.InitAddress[i], option.InitAddress[j] = option.InitAddress[j], option.InitAddress[i]
Expand Down

0 comments on commit 8871aab

Please sign in to comment.