Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: use dedicated connections for DoMulti() with 2000+ commands by default #631

Merged
merged 2 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type mux struct {
sc []*singleconnect
mu []sync.Mutex
maxp int
maxm int
}

func makeMux(dst string, option *ClientOption, dialFn dialFn) *mux {
Expand Down Expand Up @@ -88,6 +89,7 @@ func newMux(dst string, option *ClientOption, init, dead wire, wireFn wireFn, wi
mu: make([]sync.Mutex, multiplex),
sc: make([]*singleconnect, multiplex),
maxp: runtime.GOMAXPROCS(0),
maxm: option.BlockingPipeline,
}
m.clhks.Store(emptyclhks)
for i := 0; i < len(m.wire); i++ {
Expand Down Expand Up @@ -207,6 +209,9 @@ func (m *mux) Do(ctx context.Context, cmd Completed) (resp RedisResult) {
}

func (m *mux) DoMulti(ctx context.Context, multi ...Completed) (resp *redisresults) {
if len(multi) >= m.maxm && m.maxm > 0 {
goto block // use a dedicated connection if the pipeline is too large
}
for _, cmd := range multi {
if cmd.IsBlock() {
goto block
Expand Down
56 changes: 56 additions & 0 deletions mux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ func setupMuxWithOption(wires []*mockWire, option *ClientOption) (conn *mux, che
count++
return wires[count]
}
if option.BlockingPipeline == 0 {
option.BlockingPipeline = DefaultBlockingPipeline
}
return newMux("", option, (*mockWire)(nil), (*mockWire)(nil), wfn, wfn), func(t *testing.T) {
if count != len(wires)-1 {
t.Fatalf("there is %d remaining unused wires", len(wires)-count-1)
Expand Down Expand Up @@ -695,6 +698,59 @@ func TestMuxDelegation(t *testing.T) {
wg.Wait()
})

t.Run("multiple long pipeline", func(t *testing.T) {
blocked := make(chan struct{})
responses := make(chan RedisResult)

m, checkClean := setupMux([]*mockWire{
{
// leave first wire for pipeline calls
},
{
DoMultiFn: func(cmd ...Completed) *redisresults {
blocked <- struct{}{}
return &redisresults{s: []RedisResult{<-responses}}
},
},
{
DoMultiFn: func(cmd ...Completed) *redisresults {
blocked <- struct{}{}
return &redisresults{s: []RedisResult{<-responses}}
},
},
})
defer checkClean(t)
defer m.Close()
if err := m.Dial(); err != nil {
t.Fatalf("unexpected dial error %v", err)
}

wg := sync.WaitGroup{}
wg.Add(2)
for i := 0; i < 2; i++ {
go func() {
pipeline := make(Commands, DefaultBlockingPipeline)
for i := 0; i < len(pipeline); i++ {
pipeline[i] = cmds.NewCompleted([]string{"SET"})
}
if val, err := m.DoMulti(context.Background(), pipeline...).s[0].ToString(); err != nil {
t.Errorf("unexpected error %v", err)
} else if val != "BLOCK_COMMANDS_RESPONSE" {
t.Errorf("unexpected response %v", val)
} else {
wg.Done()
}
}()
}
for i := 0; i < 2; i++ {
<-blocked
}
for i := 0; i < 2; i++ {
responses <- newResult(RedisMessage{typ: '+', string: "BLOCK_COMMANDS_RESPONSE"}, nil)
}
wg.Wait()
})

t.Run("multi blocking no recycle the wire if err", func(t *testing.T) {
closed := false
m, checkClean := setupMux([]*mockWire{
Expand Down
7 changes: 7 additions & 0 deletions rueidis.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ const (
DefaultRingScale = 10
// DefaultPoolSize is the default value of ClientOption.BlockingPoolSize
DefaultPoolSize = 1000
// DefaultBlockingPipeline is the default value of ClientOption.BlockingPipeline
DefaultBlockingPipeline = 2000
// DefaultDialTimeout is the default value of ClientOption.Dialer.Timeout
DefaultDialTimeout = 5 * time.Second
// DefaultTCPKeepAlive is the default value of ClientOption.Dialer.KeepAlive
Expand Down Expand Up @@ -132,6 +134,8 @@ type ClientOption struct {
// BlockingPoolSize is the size of the connection pool shared by blocking commands (ex BLPOP, XREAD with BLOCK).
// The default is DefaultPoolSize.
BlockingPoolSize int
// BlockingPipeline is the threshold of a pipeline that will be treated as blocking commands when exceeding it.
BlockingPipeline int

// PipelineMultiplex determines how many tcp connections used to pipeline commands to one redis instance.
// The default for single and sentinel clients is 2, which means 4 connections (2^2).
Expand Down Expand Up @@ -336,6 +340,9 @@ func NewClient(option ClientOption) (client Client, err error) {
if option.ConnWriteTimeout == 0 {
option.ConnWriteTimeout = option.Dialer.KeepAlive * 10
}
if option.BlockingPipeline == 0 {
option.BlockingPipeline = DefaultBlockingPipeline
}
if option.ShuffleInit {
util.Shuffle(len(option.InitAddress), func(i, j int) {
option.InitAddress[i], option.InitAddress[j] = option.InitAddress[j], option.InitAddress[i]
Expand Down