From 383a2a1aed0d828ed6d3577c9d36222f06fe1458 Mon Sep 17 00:00:00 2001
From: wyxloading
Date: Fri, 13 Dec 2024 22:17:11 +0800
Subject: [PATCH] fix: single slot DoMulti writes should be retried on MOVED / ASK (#697)

Co-authored-by: wuyuxiang
---
 cluster.go      | 84 +++++++++++++++++--------------------------------
 cluster_test.go | 69 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 98 insertions(+), 55 deletions(-)

diff --git a/cluster.go b/cluster.go
index 8babd083..04b175b3 100644
--- a/cluster.go
+++ b/cluster.go
@@ -555,10 +555,17 @@ func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry, last
 		return retries, last, toReplica
 	}
 
+	inits := 0
 	for _, cmd := range multi {
 		if cmd.Slot() == cmds.InitSlot {
+			inits++
 			continue
 		}
+		if last == cmds.InitSlot {
+			last = cmd.Slot()
+		} else if init && last != cmd.Slot() {
+			panic(panicMixCxSlot)
+		}
 		p := c.pslots[cmd.Slot()]
 		if p == nil {
 			return nil, 0, false
@@ -566,6 +573,20 @@
 		count.m[p]++
 	}
 
+	if last == cmds.InitSlot {
+		// if all commands have no slots, such as INFO, we pick a non-nil slot.
+		for i, p := range c.pslots {
+			if p != nil {
+				last = uint16(i)
+				count.m[p] = inits
+				break
+			}
+		}
+		if last == cmds.InitSlot {
+			return nil, 0, false
+		}
+	}
+
 	retries = connretryp.Get(len(count.m), len(count.m))
 	for cc, n := range count.m {
 		retries.m[cc] = retryp.Get(0, n)
@@ -573,17 +594,15 @@
 	conncountp.Put(count)
 
 	for i, cmd := range multi {
+		var cc conn
 		if cmd.Slot() != cmds.InitSlot {
-			if last == cmds.InitSlot {
-				last = cmd.Slot()
-			} else if init && last != cmd.Slot() {
-				panic(panicMixCxSlot)
-			}
-			cc := c.pslots[cmd.Slot()]
-			re := retries.m[cc]
-			re.commands = append(re.commands, cmd)
-			re.cIndexes = append(re.cIndexes, i)
+			cc = c.pslots[cmd.Slot()]
+		} else {
+			cc = c.pslots[last]
 		}
+		re := retries.m[cc]
+		re.commands = append(re.commands, cmd)
+		re.cIndexes = append(re.cIndexes, i)
 	}
 	return retries, last, false
 }
@@ -669,19 +688,12 @@ func (c *clusterClient) DoMulti(ctx context.Context, multi ...Completed) []RedisResult {
 		return nil
 	}
 
-	retries, slot, toReplica, err := c.pickMulti(ctx, multi)
+	retries, _, _, err := c.pickMulti(ctx, multi)
 	if err != nil {
 		return fillErrs(len(multi), err)
 	}
 	defer connretryp.Put(retries)
 
-	if len(retries.m) <= 1 {
-		for _, re := range retries.m {
-			retryp.Put(re)
-		}
-		return c.doMulti(ctx, slot, multi, toReplica)
-	}
-
 	var wg sync.WaitGroup
 	var mu sync.Mutex
 
@@ -730,44 +742,6 @@ func fillErrs(n int, err error) (results []RedisResult) {
 	return results
 }
 
-func (c *clusterClient) doMulti(ctx context.Context, slot uint16, multi []Completed, toReplica bool) []RedisResult {
-	attempts := 1
-
-retry:
-	cc, err := c.pick(ctx, slot, toReplica)
-	if err != nil {
-		return fillErrs(len(multi), err)
-	}
-	resps := cc.DoMulti(ctx, multi...)
-process:
-	for i, resp := range resps.s {
-		switch addr, mode := c.shouldRefreshRetry(resp.Error(), ctx); mode {
-		case RedirectMove:
-			if c.retry && allReadOnly(multi) {
-				resultsp.Put(resps)
-				resps = c.redirectOrNew(addr, cc, multi[i].Slot(), mode).DoMulti(ctx, multi...)
-				goto process
-			}
-		case RedirectAsk:
-			if c.retry && allReadOnly(multi) {
-				resultsp.Put(resps)
-				resps = askingMulti(c.redirectOrNew(addr, cc, multi[i].Slot(), mode), ctx, multi)
-				goto process
-			}
-		case RedirectRetry:
-			if c.retry && allReadOnly(multi) {
-				shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, multi[i], resp.Error())
-				if shouldRetry {
-					resultsp.Put(resps)
-					attempts++
-					goto retry
-				}
-			}
-		}
-	}
-	return resps.s
-}
-
 func (c *clusterClient) doCache(ctx context.Context, cmd Cacheable, ttl time.Duration) (resp RedisResult) {
 	attempts := 1
 
diff --git a/cluster_test.go b/cluster_test.go
index df7ddc77..9a152cb2 100644
--- a/cluster_test.go
+++ b/cluster_test.go
@@ -5239,3 +5239,72 @@ func TestClusterClientLoadingRetry(t *testing.T) {
 		}
 	})
 }
+
+func TestClusterClientMovedRetry(t *testing.T) {
+	defer ShouldNotLeaked(SetupLeakDetection())
+
+	setup := func() (*clusterClient, *mockConn) {
+		m := &mockConn{
+			DoFn: func(cmd Completed) RedisResult {
+				if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" {
+					return slotsMultiResp
+				}
+				return RedisResult{}
+			},
+		}
+		client, err := newClusterClient(
+			&ClientOption{InitAddress: []string{":0"}},
+			func(dst string, opt *ClientOption) conn { return m },
+			newRetryer(defaultRetryDelayFn),
+		)
+		if err != nil {
+			t.Fatalf("unexpected err %v", err)
+		}
+		return client, m
+	}
+
+	t.Run("DoMulti Retry on MOVED", func(t *testing.T) {
+		client, m := setup()
+
+		attempts := 0
+		m.DoMultiFn = func(multi ...Completed) *redisresults {
+			attempts++
+			if attempts == 1 {
+				return &redisresults{s: []RedisResult{newResult(RedisMessage{typ: '-', string: "MOVED 0 127.0.0.1"}, nil)}}
+			}
+			return &redisresults{s: []RedisResult{newResult(RedisMessage{typ: '+', string: "OK"}, nil)}}
+		}
+
+		cmd := client.B().Set().Key("test").Value(`test`).Build()
+		resps := client.DoMulti(context.Background(), cmd)
+		if len(resps) != 1 {
+			t.Fatalf("unexpected response length %v", len(resps))
+		}
+		if v, err := resps[0].ToString(); err != nil || v != "OK" {
+			t.Fatalf("unexpected response %v %v", v, err)
+		}
+	})
+
+	t.Run("DoMulti Retry on ASK", func(t *testing.T) {
+		client, m := setup()
+
+		attempts := 0
+		m.DoMultiFn = func(multi ...Completed) *redisresults {
+			attempts++
+			if attempts == 1 {
+				return &redisresults{s: []RedisResult{newResult(RedisMessage{typ: '-', string: "ASK 0 127.0.0.1"}, nil)}}
+			}
+			return &redisresults{s: []RedisResult{newResult(RedisMessage{typ: '+', string: "OK"}, nil), newResult(RedisMessage{typ: '+', string: "OK"}, nil)}}
+		}
+
+		cmd := client.B().Set().Key("test").Value(`test`).Build()
+		resps := client.DoMulti(context.Background(), cmd)
+		if len(resps) != 1 {
+			t.Fatalf("unexpected response length %v", len(resps))
+		}
+		if v, err := resps[0].ToString(); err != nil || v != "OK" {
+			t.Fatalf("unexpected response %v %v", v, err)
+		}
+	})
+
+}
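
A minimal caller-side sketch of the behavior this patch fixes, using rueidis's public API rather than the internals changed above; the seed address and keys are hypothetical. Both SETs share the {user:1} hash tag and therefore map to a single slot, which is exactly the case that previously took the removed doMulti fast path: its MOVED/ASK handling was guarded by allReadOnly(multi), so during a slot migration these writes would surface the redirection error instead of being retried. After the patch, single-slot batches go through the same per-connection retry map as multi-slot batches and follow the redirection transparently.

package main

import (
	"context"
	"fmt"

	"github.com/redis/rueidis"
)

func main() {
	// Hypothetical cluster seed node; any reachable member works.
	client, err := rueidis.NewClient(rueidis.ClientOption{
		InitAddress: []string{"127.0.0.1:7000"},
	})
	if err != nil {
		panic(err)
	}
	defer client.Close()

	// Both keys hash to one slot via the {user:1} tag, so this batch is
	// sent to a single node. If that slot is being migrated, the node
	// replies MOVED/ASK; with this patch the batch is redirected and
	// retried even though SET is a write command.
	for _, resp := range client.DoMulti(context.Background(),
		client.B().Set().Key("{user:1}:name").Value("alice").Build(),
		client.B().Set().Key("{user:1}:age").Value("30").Build(),
	) {
		if err := resp.Error(); err != nil {
			panic(err)
		}
	}
	fmt.Println("both writes acknowledged")
}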