Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add DisableAutoPipelining to serve requests from the conn pool #646

Merged
merged 1 commit into from
Oct 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ type mux struct {
mu []sync.Mutex
maxp int
maxm int

usePool bool
}

func makeMux(dst string, option *ClientOption, dialFn dialFn) *mux {
Expand Down Expand Up @@ -90,6 +92,8 @@ func newMux(dst string, option *ClientOption, init, dead wire, wireFn wireFn, wi
sc: make([]*singleconnect, multiplex),
maxp: runtime.GOMAXPROCS(0),
maxm: option.BlockingPipeline,

usePool: option.DisableAutoPipelining,
}
m.clhks.Store(emptyclhks)
for i := 0; i < len(m.wire); i++ {
Expand Down Expand Up @@ -200,7 +204,7 @@ func (m *mux) DoMultiStream(ctx context.Context, multi ...Completed) MultiRedisR
}

func (m *mux) Do(ctx context.Context, cmd Completed) (resp RedisResult) {
if cmd.IsBlock() {
if (m.usePool && !cmd.NoReply()) || cmd.IsBlock() {
resp = m.blocking(ctx, cmd)
} else {
resp = m.pipeline(ctx, cmd)
Expand All @@ -209,40 +213,45 @@ func (m *mux) Do(ctx context.Context, cmd Completed) (resp RedisResult) {
}

func (m *mux) DoMulti(ctx context.Context, multi ...Completed) (resp *redisresults) {
if len(multi) >= m.maxm && m.maxm > 0 {
if m.usePool || (len(multi) >= m.maxm && m.maxm > 0) {
goto block // use a dedicated connection if the pipeline is too large
}
for _, cmd := range multi {
if cmd.IsBlock() {
cmds.ToBlock(&multi[0]) // mark the first cmd as block if one of them is block to shortcut later check.
goto block
}
}
return m.pipelineMulti(ctx, multi)
block:
cmds.ToBlock(&multi[0]) // mark the first cmd as block if one of them is block to shortcut later check.
for _, cmd := range multi {
if cmd.NoReply() {
return m.pipelineMulti(ctx, multi)
}
}
return m.blockingMulti(ctx, multi)
}

func (m *mux) blocking(ctx context.Context, cmd Completed) (resp RedisResult) {
wire := m.dpool.Acquire()
wire := m.spool.Acquire()
resp = wire.Do(ctx, cmd)
if resp.NonRedisError() != nil { // abort the wire if blocking command return early (ex. context.DeadlineExceeded)
wire.Close()
}
m.dpool.Store(wire)
m.spool.Store(wire)
return resp
}

func (m *mux) blockingMulti(ctx context.Context, cmd []Completed) (resp *redisresults) {
wire := m.dpool.Acquire()
wire := m.spool.Acquire()
resp = wire.DoMulti(ctx, cmd...)
for _, res := range resp.s {
if res.NonRedisError() != nil { // abort the wire if blocking command return early (ex. context.DeadlineExceeded)
wire.Close()
break
}
}
m.dpool.Store(wire)
m.spool.Store(wire)
return resp
}

Expand Down
4 changes: 2 additions & 2 deletions mux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func TestMuxReuseWire(t *testing.T) {
t.Fatalf("unexpected dial error %v", err)
}

wire1 := m.Acquire()
wire1 := m.spool.Acquire()

go func() {
// this should use the second wire
Expand All @@ -215,7 +215,7 @@ func TestMuxReuseWire(t *testing.T) {
}()
<-blocking

m.Store(wire1)
m.spool.Store(wire1)
// this should use the first wire
if val, err := m.Do(context.Background(), cmds.NewBlockingCompleted([]string{"PING"})).ToString(); err != nil {
t.Fatalf("unexpected error %v", err)
Expand Down
35 changes: 22 additions & 13 deletions pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,22 +91,23 @@ func _newPipe(connFn func() (net.Conn, error), option *ClientOption, r2ps, nobg
return nil, err
}
p = &pipe{
conn: conn,
queue: newRing(option.RingScaleEachConn),
r: bufio.NewReaderSize(conn, option.ReadBufferEachConn),
w: bufio.NewWriterSize(conn, option.WriteBufferEachConn),

nsubs: newSubs(),
psubs: newSubs(),
ssubs: newSubs(),
close: make(chan struct{}),
conn: conn,
r: bufio.NewReaderSize(conn, option.ReadBufferEachConn),
w: bufio.NewWriterSize(conn, option.WriteBufferEachConn),

timeout: option.ConnWriteTimeout,
pinggap: option.Dialer.KeepAlive,
maxFlushDelay: option.MaxFlushDelay,

r2ps: r2ps,
}
if !nobg {
p.queue = newRing(option.RingScaleEachConn)
p.nsubs = newSubs()
p.psubs = newSubs()
p.ssubs = newSubs()
p.close = make(chan struct{})
}
if !r2ps {
p.r2psFn = func() (p *pipe, err error) {
return _newPipe(connFn, option, true, nobg)
Expand Down Expand Up @@ -305,8 +306,10 @@ func _newPipe(connFn func() (net.Conn, error), option *ClientOption, r2ps, nobg
}

func (p *pipe) background() {
atomic.CompareAndSwapInt32(&p.state, 0, 1)
p.once.Do(func() { go p._background() })
if p.queue != nil {
atomic.CompareAndSwapInt32(&p.state, 0, 1)
p.once.Do(func() { go p._background() })
}
}

func (p *pipe) _exit(err error) {
Expand Down Expand Up @@ -825,7 +828,7 @@ func (p *pipe) Do(ctx context.Context, cmd Completed) (resp RedisResult) {
goto queue
}
dl, ok := ctx.Deadline()
if !ok && ctx.Done() != nil {
if p.queue != nil && !ok && ctx.Done() != nil {
p.background()
goto queue
}
Expand Down Expand Up @@ -896,6 +899,12 @@ func (p *pipe) DoMulti(ctx context.Context, multi ...Completed) *redisresults {

for _, cmd := range multi {
if cmd.IsBlock() {
if noReply != 0 {
for i := 0; i < len(resp.s); i++ {
resp.s[i] = newErrResult(ErrBlockingPubSubMixed)
}
return resp
}
atomic.AddInt32(&p.blcksig, 1)
defer func() {
for _, r := range resp.s {
Expand Down Expand Up @@ -925,7 +934,7 @@ func (p *pipe) DoMulti(ctx context.Context, multi ...Completed) *redisresults {
goto queue
}
dl, ok := ctx.Deadline()
if !ok && ctx.Done() != nil {
if p.queue != nil && !ok && ctx.Done() != nil {
p.background()
goto queue
}
Expand Down
16 changes: 16 additions & 0 deletions pipe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3033,6 +3033,22 @@ func TestPubSub(t *testing.T) {
}
})

t.Run("PubSub blocking mixed", func(t *testing.T) {
p, _, cancel, _ := setup(t, ClientOption{})
defer cancel()

commands := []Completed{
builder.Subscribe().Channel("a").Build(),
builder.Psubscribe().Pattern("b").Build(),
builder.Blpop().Key("c").Timeout(0).Build(),
}
for _, resp := range p.DoMulti(context.Background(), commands...).s {
if e := resp.Error(); e != ErrBlockingPubSubMixed {
t.Fatalf("unexpected err %v", e)
}
}
})

t.Run("RESP2 pubsub mixed", func(t *testing.T) {
p, _, cancel, _ := setup(t, ClientOption{})
p.version = 5
Expand Down
28 changes: 28 additions & 0 deletions redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"math/rand"
"net"
"os"
"reflect"
"strconv"
"sync"
Expand Down Expand Up @@ -805,10 +806,13 @@ func TestSingleClientIntegration(t *testing.T) {
t.Skip()
}
defer ShouldNotLeaked(SetupLeakDetection())

client, err := NewClient(ClientOption{
InitAddress: []string{"127.0.0.1:6379"},
ConnWriteTimeout: 180 * time.Second,
PipelineMultiplex: 1,

DisableAutoPipelining: os.Getenv("DisableAutoPipelining") == "true",
})
if err != nil {
t.Fatal(err)
Expand All @@ -820,11 +824,18 @@ func TestSingleClientIntegration(t *testing.T) {
client.Close()
}

func TestSingleClientIntegrationWithPool(t *testing.T) {
os.Setenv("DisableAutoPipelining", "true")
defer os.Unsetenv("DisableAutoPipelining")
TestSingleClientIntegration(t)
}

func TestSentinelClientIntegration(t *testing.T) {
if testing.Short() {
t.Skip()
}
defer ShouldNotLeaked(SetupLeakDetection())

client, err := NewClient(ClientOption{
InitAddress: []string{"127.0.0.1:26379"},
ConnWriteTimeout: 180 * time.Second,
Expand All @@ -833,6 +844,8 @@ func TestSentinelClientIntegration(t *testing.T) {
},
SelectDB: 2, // https://github.com/redis/rueidis/issues/138
PipelineMultiplex: 1,

DisableAutoPipelining: os.Getenv("DisableAutoPipelining") == "true",
})
if err != nil {
t.Fatal(err)
Expand All @@ -844,17 +857,26 @@ func TestSentinelClientIntegration(t *testing.T) {
client.Close()
}

func TestSentinelClientIntegrationWithPool(t *testing.T) {
os.Setenv("DisableAutoPipelining", "true")
defer os.Unsetenv("DisableAutoPipelining")
TestSentinelClientIntegration(t)
}

func TestClusterClientIntegration(t *testing.T) {
if testing.Short() {
t.Skip()
}
defer ShouldNotLeaked(SetupLeakDetection())

client, err := NewClient(ClientOption{
InitAddress: []string{"127.0.0.1:7001", "127.0.0.1:7002", "127.0.0.1:7003"},
ConnWriteTimeout: 180 * time.Second,
ShuffleInit: true,
Dialer: net.Dialer{KeepAlive: -1},
PipelineMultiplex: 1,

DisableAutoPipelining: os.Getenv("DisableAutoPipelining") == "true",
})
if err != nil {
t.Fatal(err)
Expand All @@ -864,6 +886,12 @@ func TestClusterClientIntegration(t *testing.T) {
client.Close()
}

func TestClusterClientIntegrationWithPool(t *testing.T) {
os.Setenv("DisableAutoPipelining", "true")
defer os.Unsetenv("DisableAutoPipelining")
TestClusterClientIntegration(t)
}

func TestSingleClient5Integration(t *testing.T) {
if testing.Short() {
t.Skip()
Expand Down
7 changes: 7 additions & 0 deletions rueidis.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ var (
ErrNoCache = errors.New("ClientOption.DisableCache must be true for redis not supporting client-side caching or not supporting RESP3")
// ErrRESP2PubSubMixed means your redis does not support RESP3 and rueidis can't handle SUBSCRIBE/PSUBSCRIBE/SSUBSCRIBE in mixed case
ErrRESP2PubSubMixed = errors.New("rueidis does not support SUBSCRIBE/PSUBSCRIBE/SSUBSCRIBE mixed with other commands in RESP2")
// ErrBlockingPubSubMixed rueidis can't handle SUBSCRIBE/PSUBSCRIBE/SSUBSCRIBE mixed with other blocking commands
ErrBlockingPubSubMixed = errors.New("rueidis does not support SUBSCRIBE/PSUBSCRIBE/SSUBSCRIBE mixed with other blocking commands")
// ErrDoCacheAborted means redis abort EXEC request or connection closed
ErrDoCacheAborted = errors.New("failed to fetch the cache because EXEC was aborted by redis or connection closed")
// ErrReplicaOnlyNotSupported means ReplicaOnly flag is not supported by
Expand Down Expand Up @@ -167,6 +169,8 @@ type ClientOption struct {
DisableRetry bool
// DisableCache falls back Client.DoCache/Client.DoMultiCache to Client.Do/Client.DoMulti
DisableCache bool
// DisableAutoPipelining makes rueidis.Client always pick a connection from the BlockingPool to serve each request.
DisableAutoPipelining bool
// AlwaysPipelining makes rueidis.Client always pipeline redis commands even if they are not issued concurrently.
AlwaysPipelining bool
// AlwaysRESP2 makes rueidis.Client always uses RESP2, otherwise it will try using RESP3 first.
Expand Down Expand Up @@ -354,6 +358,9 @@ func NewClient(option ClientOption) (client Client, err error) {
if option.BlockingPipeline == 0 {
option.BlockingPipeline = DefaultBlockingPipeline
}
if option.DisableAutoPipelining {
option.AlwaysPipelining = false
}
if option.ShuffleInit {
util.Shuffle(len(option.InitAddress), func(i, j int) {
option.InitAddress[i], option.InitAddress[j] = option.InitAddress[j], option.InitAddress[i]
Expand Down