Skip to content

Commit

Permalink
perf: reduce goroutines used by DoMulti/DoMultiCache in a cluster client
Browse files Browse the repository at this point in the history
Signed-off-by: Rueian <[email protected]>
  • Loading branch information
rueian committed Dec 13, 2024
1 parent 7e284ae commit 0975196
Showing 1 changed file with 26 additions and 6 deletions.
32 changes: 26 additions & 6 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,16 +544,16 @@ func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry) {

if !init && c.rslots != nil && c.opt.SendToReplicas != nil {
for _, cmd := range multi {
var p conn
var cc conn
if c.opt.SendToReplicas(cmd) {
p = c.rslots[cmd.Slot()]
cc = c.rslots[cmd.Slot()]
} else {
p = c.pslots[cmd.Slot()]
cc = c.pslots[cmd.Slot()]
}
if p == nil {
if cc == nil {
return nil
}
count.m[p]++
count.m[cc]++
}

retries = connretryp.Get(len(count.m), len(count.m))
Expand All @@ -569,7 +569,9 @@ func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry) {
} else {
cc = c.pslots[cmd.Slot()]
}

if cc == nil { // check cc == nil again in case of non-deterministic SendToReplicas.
return nil
}
re := retries.m[cc]
re.commands = append(re.commands, cmd)
re.cIndexes = append(re.cIndexes, i)
Expand Down Expand Up @@ -726,13 +728,22 @@ func (c *clusterClient) DoMulti(ctx context.Context, multi ...Completed) []Redis
retry:
retries.RetryDelay = -1 // Assume no retry. Because client retry flag can be set to false.

var cc1 conn
var re1 *retry
wg.Add(len(retries.m))
mu.Lock()
for cc, re := range retries.m {
delete(retries.m, cc)
cc1 = cc
re1 = re
break
}
for cc, re := range retries.m {
delete(retries.m, cc)
go c.doretry(ctx, cc, results, retries, re, &mu, &wg, attempts)
}
mu.Unlock()
c.doretry(ctx, cc1, results, retries, re1, &mu, &wg, attempts)
wg.Wait()

if len(retries.m) != 0 {
Expand Down Expand Up @@ -997,13 +1008,22 @@ func (c *clusterClient) DoMultiCache(ctx context.Context, multi ...CacheableTTL)
retry:
retries.RetryDelay = -1 // Assume no retry. Because client retry flag can be set to false.

var cc1 conn
var re1 *retrycache
wg.Add(len(retries.m))
mu.Lock()
for cc, re := range retries.m {
delete(retries.m, cc)
cc1 = cc
re1 = re
break
}
for cc, re := range retries.m {
delete(retries.m, cc)
go c.doretrycache(ctx, cc, results, retries, re, &mu, &wg, attempts)
}
mu.Unlock()
c.doretrycache(ctx, cc1, results, retries, re1, &mu, &wg, attempts)
wg.Wait()

if len(retries.m) != 0 {
Expand Down

0 comments on commit 0975196

Please sign in to comment.