fix: broken tx retries for cluster clients after #697 #709

Merged: 3 commits, Dec 24, 2024
Changes from 1 commit
99 changes: 69 additions & 30 deletions cluster.go
@@ -525,9 +525,8 @@ func (c *clusterClient) toReplica(cmd Completed) bool {
 	return false
 }
 
-func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry) {
+func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry, init bool) {
 	last := cmds.InitSlot
-	init := false
 
 	for _, cmd := range multi {
 		if cmd.Slot() == cmds.InitSlot {
@@ -550,7 +549,7 @@ func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry) {
 				cc = c.pslots[cmd.Slot()]
 			}
 			if cc == nil {
-				return nil
+				return nil, false
 			}
 			count.m[cc]++
 		}
@@ -569,13 +568,13 @@ func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry) {
 				cc = c.pslots[cmd.Slot()]
 			}
 			if cc == nil { // check cc == nil again in case of non-deterministic SendToReplicas.
-				return nil
+				return nil, false
 			}
 			re := retries.m[cc]
 			re.commands = append(re.commands, cmd)
 			re.cIndexes = append(re.cIndexes, i)
 		}
-		return retries
+		return retries, init
 	}
 
 	inits := 0
@@ -589,25 +588,28 @@ func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry) {
 		} else if init && last != cmd.Slot() {
 			panic(panicMixCxSlot)
 		}
-		p := c.pslots[cmd.Slot()]
-		if p == nil {
-			return nil
+		cc := c.pslots[cmd.Slot()]
+		if cc == nil {
+			return nil, false
 		}
-		count.m[p]++
+		count.m[cc]++
[Comment by Collaborator (Author)] make naming consistent.

 	}
 
 	if last == cmds.InitSlot {
 		// if all commands have no slots, such as INFO, we pick a non-nil slot.
-		for i, p := range c.pslots {
-			if p != nil {
+		for i, cc := range c.pslots {
+			if cc != nil {
 				last = uint16(i)
-				count.m[p] = inits
+				count.m[cc] = inits
 				break
 			}
 		}
 		if last == cmds.InitSlot {
-			return nil
+			return nil, false
 		}
-	}
+	} else if init {
+		cc := c.pslots[last]
+		count.m[cc] += inits
[Comment by Collaborator (Author)] We don't count cmd.InitSlot commands to c.pslots[last] in the code above, so we add the count back here.

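A hedged, self-contained illustration of what this add-back fixes (toy types, not the library's own: slots are plain ints and -1 stands in for cmds.InitSlot). Slot-less commands such as MULTI/EXEC are skipped by the counting loop above, so without this branch the retry entry reserved for c.pslots[last] would be sized too small to carry them:

    package main

    import "fmt"

    func main() {
    	// -1 marks slot-less commands (MULTI, EXEC, PING, ...); 100 is a keyed slot.
    	slots := []int{-1, 100, 100, -1} // MULTI, SET, GET, EXEC
    	count := map[int]int{}
    	inits, last := 0, -1
    	for _, s := range slots {
    		if s == -1 {
    			inits++ // skipped, exactly like cmds.InitSlot in the loop above
    			continue
    		}
    		last = s
    		count[s]++
    	}
    	count[last] += inits // the add-back: slot 100's connection carries all four commands
    	fmt.Println(count)   // map[100:4]
    }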
+	}

 	retries = connretryp.Get(len(count.m), len(count.m))
@@ -627,25 +629,34 @@ func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry) {
 		re.commands = append(re.commands, cmd)
 		re.cIndexes = append(re.cIndexes, i)
 	}
-	return retries
+	return retries, init
 }

-func (c *clusterClient) pickMulti(ctx context.Context, multi []Completed) (*connretry, error) {
-	conns := c._pickMulti(multi)
+func (c *clusterClient) pickMulti(ctx context.Context, multi []Completed) (*connretry, bool, error) {
+	conns, hasInit := c._pickMulti(multi)
 	if conns == nil {
 		if err := c.refresh(ctx); err != nil {
-			return nil, err
+			return nil, false, err
 		}
-		if conns = c._pickMulti(multi); conns == nil {
-			return nil, ErrNoSlot
+		if conns, hasInit = c._pickMulti(multi); conns == nil {
+			return nil, false, ErrNoSlot
 		}
 	}
-	return conns, nil
+	return conns, hasInit, nil
 }

+func isMulti(cmd Completed) bool {
+	return len(cmd.Commands()) == 1 && cmd.Commands()[0] == "MULTI"
+}
+func isExec(cmd Completed) bool {
+	return len(cmd.Commands()) == 1 && cmd.Commands()[0] == "EXEC"
+}
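These helpers only match a bare, single-token command, which is how a DoMulti batch carries MULTI and EXEC. A stand-in sketch over raw token slices (hypothetical names; the real helpers take rueidis' Completed type):

    package main

    import "fmt"

    func isMultiTokens(ts []string) bool { return len(ts) == 1 && ts[0] == "MULTI" }
    func isExecTokens(ts []string) bool  { return len(ts) == 1 && ts[0] == "EXEC" }

    func main() {
    	fmt.Println(isMultiTokens([]string{"MULTI"}))      // true
    	fmt.Println(isExecTokens([]string{"EXEC"}))        // true
    	fmt.Println(isMultiTokens([]string{"GET", "key"})) // false: not a single token
    }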

 func (c *clusterClient) doresultfn(
-	ctx context.Context, results *redisresults, retries *connretry, mu *sync.Mutex, cc conn, cIndexes []int, commands []Completed, resps []RedisResult, attempts int,
+	ctx context.Context, results *redisresults, retries *connretry, mu *sync.Mutex, cc conn, cIndexes []int, commands []Completed, resps []RedisResult, attempts int, hasInit bool,
 ) (clean bool) {
+	mi := -1
+	ei := -1
 	clean = true
 	for i, resp := range resps {
 		clean = clean && resp.NonRedisError() == nil
@@ -664,6 +675,37 @@ func (c *clusterClient) doresultfn(
 			} else {
 				nc = c.redirectOrNew(addr, cc, cm.Slot(), mode)
 			}
+			if hasInit && ei < i { // find out if there is a transaction block or not.
+				for mi = i; mi >= 0 && !isMulti(commands[mi]) && !isExec(commands[mi]); mi-- {
+				}
+				for ei = i; ei < len(commands) && !isMulti(commands[ei]) && !isExec(commands[ei]); ei++ {
+				}
[Comment by Collaborator (Author) on lines +679 to +682] Both the mi and ei cursors will stop at either MULTI or EXEC, to avoid crossing tx boundaries.

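A worked example of that scan, assuming plain string commands (indices are positions in the batch, not Redis slots): given GET a, MULTI, SET k, INCR k, EXEC, GET b and a redirect error at index 3, mi walks left from 3 and stops at 1 (MULTI), while ei walks right and stops at 4 (EXEC):

    package main

    import "fmt"

    func main() {
    	commands := []string{"GET a", "MULTI", "SET k", "INCR k", "EXEC", "GET b"}
    	i := 3 // index of the command that hit a MOVED/ASK redirect
    	var mi, ei int
    	for mi = i; mi >= 0 && commands[mi] != "MULTI" && commands[mi] != "EXEC"; mi-- {
    	}
    	for ei = i; ei < len(commands) && commands[ei] != "MULTI" && commands[ei] != "EXEC"; ei++ {
    	}
    	fmt.Println(mi, ei) // 1 4: the enclosing MULTI ... EXEC window
    }

If the failing command sits outside any transaction (say index 0 or 5), the window endpoints do not come out as a MULTI/EXEC pair, so the transaction branch below never fires.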
+				if mi >= 0 && mi < ei && ei < len(commands) && isMulti(commands[mi]) && isExec(commands[ei]) && resps[mi].val.string == ok { // a transaction is found.
[Comment by Collaborator (Author)] If the nearest MULTI didn't succeed, we don't retry the tx.

+					mu.Lock()
+					retries.Redirects++
+					nr := retries.m[nc]
+					if nr == nil {
+						nr = retryp.Get(0, len(commands))
+						retries.m[nc] = nr
+					}
+					for i := mi; i <= ei; i++ {
+						ii := cIndexes[i]
+						cm := commands[i]
+						if mode == RedirectAsk {
+							nr.aIndexes = append(nr.aIndexes, ii)
+							nr.cAskings = append(nr.cAskings, cm)
+						} else {
+							nr.cIndexes = append(nr.cIndexes, ii)
+							nr.commands = append(nr.commands, cm)
+						}
+					}
+					mu.Unlock()
+					continue // the transaction has been added to the retries, go to the next cmd.
+				}
+			}
+			if hasInit && mi < i && i < ei && mi >= 0 && ei < len(commands) && isMulti(commands[mi]) {
+				continue // the current cmd is in the processed transaction and has been added to the retries.
+			}
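Taken together, the two new branches keep a transaction atomic across retries. A sketch of the intended behavior, assuming plain string commands and ignoring the additional resps[mi].val.string == ok precondition checked above: the first branch re-queues the whole MULTI..EXEC window onto the new connection once, and the second branch then skips every other member of that window so nothing is duplicated or split across connections:

    package main

    import "fmt"

    func isM(c string) bool { return c == "MULTI" }
    func isE(c string) bool { return c == "EXEC" }

    func main() {
    	commands := []string{"GET a", "MULTI", "SET k", "INCR k", "EXEC", "GET b"}
    	failed := map[int]bool{2: true, 3: true} // both in-tx commands got MOVED
    	mi, ei := -1, -1
    	var requeued []int // stands in for nr.cIndexes on the new connection
    	for i := range commands {
    		if !failed[i] {
    			continue
    		}
    		if ei < i { // locate the enclosing window, as in the diff above
    			for mi = i; mi >= 0 && !isM(commands[mi]) && !isE(commands[mi]); mi-- {
    			}
    			for ei = i; ei < len(commands) && !isM(commands[ei]) && !isE(commands[ei]); ei++ {
    			}
    			if mi >= 0 && mi < ei && ei < len(commands) && isM(commands[mi]) && isE(commands[ei]) {
    				for j := mi; j <= ei; j++ {
    					requeued = append(requeued, j) // whole tx, MULTI and EXEC included
    				}
    				continue
    			}
    		}
    		if mi < i && i < ei && mi >= 0 && ei < len(commands) && isM(commands[mi]) {
    			continue // already re-queued with its transaction
    		}
    		requeued = append(requeued, i) // plain single-command retry
    	}
    	fmt.Println(requeued) // [1 2 3 4]: re-queued once, not once per failing command
    }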
 			mu.Lock()
 			if mode != RedirectRetry {
 				retries.Redirects++
@@ -690,17 +732,17 @@
 }
 
 func (c *clusterClient) doretry(
-	ctx context.Context, cc conn, results *redisresults, retries *connretry, re *retry, mu *sync.Mutex, wg *sync.WaitGroup, attempts int,
+	ctx context.Context, cc conn, results *redisresults, retries *connretry, re *retry, mu *sync.Mutex, wg *sync.WaitGroup, attempts int, hasInit bool,
 ) {
 	clean := true
 	if len(re.commands) != 0 {
 		resps := cc.DoMulti(ctx, re.commands...)
-		clean = c.doresultfn(ctx, results, retries, mu, cc, re.cIndexes, re.commands, resps.s, attempts)
+		clean = c.doresultfn(ctx, results, retries, mu, cc, re.cIndexes, re.commands, resps.s, attempts, hasInit)
 		resultsp.Put(resps)
 	}
 	if len(re.cAskings) != 0 {
 		resps := askingMulti(cc, ctx, re.cAskings)
-		clean = c.doresultfn(ctx, results, retries, mu, cc, re.aIndexes, re.cAskings, resps.s, attempts) && clean
+		clean = c.doresultfn(ctx, results, retries, mu, cc, re.aIndexes, re.cAskings, resps.s, attempts, hasInit) && clean
 		resultsp.Put(resps)
 	}
 	if clean {
@@ -714,7 +756,7 @@ func (c *clusterClient) DoMulti(ctx context.Context, multi ...Completed) []RedisResult {
 		return nil
 	}
 
-	retries, err := c.pickMulti(ctx, multi)
+	retries, hasInit, err := c.pickMulti(ctx, multi)
 	if err != nil {
 		return fillErrs(len(multi), err)
 	}
@@ -742,18 +784,17 @@ retry:
 	}
 	for cc, re := range retries.m {
 		delete(retries.m, cc)
-		go c.doretry(ctx, cc, results, retries, re, &mu, &wg, attempts)
+		go c.doretry(ctx, cc, results, retries, re, &mu, &wg, attempts, hasInit)
 	}
 	mu.Unlock()
-	c.doretry(ctx, cc1, results, retries, re1, &mu, &wg, attempts)
+	c.doretry(ctx, cc1, results, retries, re1, &mu, &wg, attempts, hasInit)
 	wg.Wait()
 
 	if len(retries.m) != 0 {
 		if retries.Redirects > 0 {
 			retries.Redirects = 0
 			goto retry
 		}
-
 		if retries.RetryDelay >= 0 {
 			c.retryHandler.WaitForRetry(ctx, retries.RetryDelay)
 			attempts++
@@ -946,7 +987,6 @@ func (c *clusterClient) resultcachefn(
 			if !c.retry {
 				continue
 			}
-
 			retryDelay = c.retryHandler.RetryDelay(attempts, Completed(cm.Cmd), resp.Error())
 		} else {
 			nc = c.redirectOrNew(addr, cc, cm.Cmd.Slot(), mode)
@@ -1040,7 +1080,6 @@ retry:
 			retries.Redirects = 0
 			goto retry
 		}
-
 		if retries.RetryDelay >= 0 {
 			c.retryHandler.WaitForRetry(ctx, retries.RetryDelay)
 			attempts++
4 changes: 2 additions & 2 deletions syncp.go
@@ -221,8 +221,8 @@ func (r *conncount) ResetLen(n int) {
 type connretry struct {
 	m map[conn]*retry
 	n int
-	RetryDelay time.Duration // NOTE: This is not thread-safe.
 	Redirects uint32 // NOTE: This is not thread-safe.
+	RetryDelay time.Duration // NOTE: It is not thread-safe.
 }

 func (r *connretry) Capacity() int {
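In practice the NOTE holds because these fields are never synchronized by the struct itself: judging from the cluster.go changes above, DoMulti mutates Redirects only while holding its per-call mutex (see the mu.Lock() around retries.Redirects++ in doresultfn), and it reads and resets both counters only after wg.Wait(), once every doretry goroutine has finished.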
@@ -238,8 +238,8 @@ func (r *connretry) ResetLen(n int) {
 type connretrycache struct {
 	m map[conn]*retrycache
 	n int
-	RetryDelay time.Duration // NOTE: This is not thread-safe.
 	Redirects uint32 // NOTE: This is not thread-safe.
+	RetryDelay time.Duration // NOTE: It is not thread-safe.
 }

 func (r *connretrycache) Capacity() int {