diff --git a/cluster.go b/cluster.go index 2b5e796b..5eb75a0d 100644 --- a/cluster.go +++ b/cluster.go @@ -703,7 +703,7 @@ func (c *clusterClient) doresultfn( continue // the transaction has been added to the retries, go to the next cmd. } } - if hasInit && mi < i && i < ei && mi >= 0 && ei < len(commands) && isMulti(commands[mi]) { + if hasInit && (mi < i && i < ei && mi >= 0 && isMulti(commands[mi]) || i > ei && ei >= 0 && isMulti(commands[ei])) { continue // the current cmd is in the processed transaction and has been added to the retries. } mu.Lock() diff --git a/cluster_test.go b/cluster_test.go index 5221648a..90053fb5 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -4108,6 +4108,479 @@ func TestClusterClientErr(t *testing.T) { } }) + t.Run("slot moved DoMulti transactions", func(t *testing.T) { + var count int64 + client, err := newClusterClient( + &ClientOption{InitAddress: []string{":0"}}, + func(dst string, opt *ClientOption) conn { + return &mockConn{DoFn: func(cmd Completed) RedisResult { + return slotsMultiResp + }, DoMultiFn: func(multi ...Completed) *redisresults { + switch atomic.AddInt64(&count, 1) { + case 1: + return &redisresults{s: []RedisResult{ + newResult(RedisMessage{typ: '+', string: "1"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '-', string: "MOVED 0 :1"}, nil), + newResult(RedisMessage{typ: '-', string: "EXECABORT"}, nil), + newResult(RedisMessage{typ: '+', string: "4"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '-', string: "MOVED 0 :1"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '-', string: "EXECABORT"}, nil), + newResult(RedisMessage{typ: '+', string: "7"}, nil), + }} + case 2: + return &redisresults{s: []RedisResult{ + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '*', values: []RedisMessage{ + {typ: '+', string: "2"}, + {typ: '+', string: "3"}, + }}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '*', values: []RedisMessage{ + {typ: '+', string: "5"}, + {typ: '+', string: "6"}, + }}, nil), + }} + } + return nil + }} + }, + newRetryer(defaultRetryDelayFn), + ) + if err != nil { + t.Fatalf("unexpected err %v", err) + } + resps := client.DoMulti( + context.Background(), + client.B().Get().Key("1{t}").Build(), + client.B().Multi().Build(), + client.B().Get().Key("2{t}").Build(), + client.B().Get().Key("3{t}").Build(), + client.B().Exec().Build(), + client.B().Get().Key("4{t}").Build(), + client.B().Multi().Build(), + client.B().Get().Key("5{t}").Build(), + client.B().Get().Key("6{t}").Build(), + client.B().Exec().Build(), + client.B().Get().Key("7{t}").Build(), + ) + if v, err := resps[0].ToString(); err != nil || v != "1" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[1].ToString(); err != nil || v != "OK" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[2].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[3].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[4].AsStrSlice(); err != nil || !reflect.DeepEqual(v, []string{"2", "3"}) { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[5].ToString(); err != nil || v != "4" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[6].ToString(); err != nil || v != "OK" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[7].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[8].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[9].AsStrSlice(); err != nil || !reflect.DeepEqual(v, []string{"5", "6"}) { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[10].ToString(); err != nil || v != "7" { + t.Fatalf("unexpected resp %v %v", v, err) + } + }) + + t.Run("slot moved DoMulti except transactions", func(t *testing.T) { + var count int64 + client, err := newClusterClient( + &ClientOption{InitAddress: []string{":0"}}, + func(dst string, opt *ClientOption) conn { + return &mockConn{DoFn: func(cmd Completed) RedisResult { + return slotsMultiResp + }, DoMultiFn: func(multi ...Completed) *redisresults { + switch atomic.AddInt64(&count, 1) { + case 1: + return &redisresults{s: []RedisResult{ + newResult(RedisMessage{typ: '-', string: "MOVED 0 :1"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '*', values: []RedisMessage{ + {typ: '+', string: "2"}, + {typ: '+', string: "3"}, + }}, nil), + newResult(RedisMessage{typ: '-', string: "MOVED 0 :1"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '*', values: []RedisMessage{ + {typ: '+', string: "5"}, + {typ: '+', string: "6"}, + }}, nil), + newResult(RedisMessage{typ: '-', string: "MOVED 0 :1"}, nil), + }} + case 2: + return &redisresults{s: []RedisResult{ + newResult(RedisMessage{typ: '+', string: "1"}, nil), + newResult(RedisMessage{typ: '+', string: "4"}, nil), + newResult(RedisMessage{typ: '+', string: "7"}, nil), + }} + } + return nil + }} + }, + newRetryer(defaultRetryDelayFn), + ) + if err != nil { + t.Fatalf("unexpected err %v", err) + } + resps := client.DoMulti( + context.Background(), + client.B().Get().Key("1{t}").Build(), + client.B().Multi().Build(), + client.B().Get().Key("2{t}").Build(), + client.B().Get().Key("3{t}").Build(), + client.B().Exec().Build(), + client.B().Get().Key("4{t}").Build(), + client.B().Multi().Build(), + client.B().Get().Key("5{t}").Build(), + client.B().Get().Key("6{t}").Build(), + client.B().Exec().Build(), + client.B().Get().Key("7{t}").Build(), + ) + if v, err := resps[0].ToString(); err != nil || v != "1" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[1].ToString(); err != nil || v != "OK" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[2].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[3].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[4].AsStrSlice(); err != nil || !reflect.DeepEqual(v, []string{"2", "3"}) { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[5].ToString(); err != nil || v != "4" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[6].ToString(); err != nil || v != "OK" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[7].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[8].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[9].AsStrSlice(); err != nil || !reflect.DeepEqual(v, []string{"5", "6"}) { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[10].ToString(); err != nil || v != "7" { + t.Fatalf("unexpected resp %v %v", v, err) + } + }) + + t.Run("slot moved DoMulti transactions mixed", func(t *testing.T) { + var count int64 + client, err := newClusterClient( + &ClientOption{InitAddress: []string{":0"}}, + func(dst string, opt *ClientOption) conn { + return &mockConn{DoFn: func(cmd Completed) RedisResult { + return slotsMultiResp + }, DoMultiFn: func(multi ...Completed) *redisresults { + switch atomic.AddInt64(&count, 1) { + case 1: + return &redisresults{s: []RedisResult{ + newResult(RedisMessage{typ: '+', string: "1"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '-', string: "MOVED 0 :1"}, nil), + newResult(RedisMessage{typ: '-', string: "MOVED 0 :1"}, nil), + newResult(RedisMessage{typ: '-', string: "EXECABORT"}, nil), + newResult(RedisMessage{typ: '-', string: "MOVED 0 :1"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '-', string: "MOVED 0 :1"}, nil), + newResult(RedisMessage{typ: '-', string: "MOVED 0 :1"}, nil), + newResult(RedisMessage{typ: '-', string: "EXECABORT"}, nil), + newResult(RedisMessage{typ: '+', string: "7"}, nil), + }} + case 2: + return &redisresults{s: []RedisResult{ + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '*', values: []RedisMessage{ + {typ: '+', string: "2"}, + {typ: '+', string: "3"}, + }}, nil), + newResult(RedisMessage{typ: '+', string: "4"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '*', values: []RedisMessage{ + {typ: '+', string: "5"}, + {typ: '+', string: "6"}, + }}, nil), + }} + } + return nil + }} + }, + newRetryer(defaultRetryDelayFn), + ) + if err != nil { + t.Fatalf("unexpected err %v", err) + } + resps := client.DoMulti( + context.Background(), + client.B().Get().Key("1{t}").Build(), + client.B().Multi().Build(), + client.B().Get().Key("2{t}").Build(), + client.B().Get().Key("3{t}").Build(), + client.B().Exec().Build(), + client.B().Get().Key("4{t}").Build(), + client.B().Multi().Build(), + client.B().Get().Key("5{t}").Build(), + client.B().Get().Key("6{t}").Build(), + client.B().Exec().Build(), + client.B().Get().Key("7{t}").Build(), + ) + if v, err := resps[0].ToString(); err != nil || v != "1" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[1].ToString(); err != nil || v != "OK" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[2].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[3].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[4].AsStrSlice(); err != nil || !reflect.DeepEqual(v, []string{"2", "3"}) { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[5].ToString(); err != nil || v != "4" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[6].ToString(); err != nil || v != "OK" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[7].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[8].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[9].AsStrSlice(); err != nil || !reflect.DeepEqual(v, []string{"5", "6"}) { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[10].ToString(); err != nil || v != "7" { + t.Fatalf("unexpected resp %v %v", v, err) + } + }) + + t.Run("slot moved DoMulti transactions edge cases 1", func(t *testing.T) { + var count int64 + client, err := newClusterClient( + &ClientOption{InitAddress: []string{":0"}}, + func(dst string, opt *ClientOption) conn { + return &mockConn{DoFn: func(cmd Completed) RedisResult { + return slotsMultiResp + }, DoMultiFn: func(multi ...Completed) *redisresults { + switch atomic.AddInt64(&count, 1) { + case 1: + return &redisresults{s: []RedisResult{ + newResult(RedisMessage{typ: '+', string: "1"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '-', string: "ERR Command not allowed inside a transaction"}, nil), + newResult(RedisMessage{typ: '-', string: "MOVED 0 :1"}, nil), + newResult(RedisMessage{typ: '-', string: "MOVED 0 :1"}, nil), + newResult(RedisMessage{typ: '-', string: "EXECABORT"}, nil), + newResult(RedisMessage{typ: '-', string: "MOVED 0 :1"}, nil), + }} + case 2: + return &redisresults{s: []RedisResult{ + newResult(RedisMessage{typ: '+', string: "4"}, nil), + }} + } + return nil + }} + }, + newRetryer(defaultRetryDelayFn), + ) + if err != nil { + t.Fatalf("unexpected err %v", err) + } + resps := client.DoMulti( + context.Background(), + client.B().Get().Key("1{t}").Build(), + client.B().Multi().Build(), + client.B().Multi().Build(), // nested multi + client.B().Get().Key("2{t}").Build(), + client.B().Get().Key("3{t}").Build(), + client.B().Exec().Build(), + client.B().Get().Key("4{t}").Build(), + ) + if v, err := resps[0].ToString(); err != nil || v != "1" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[1].ToString(); err != nil || v != "OK" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if err := resps[2].Error(); err == nil || !strings.Contains(err.Error(), "Command not allowed inside a transaction") { + t.Fatalf("unexpected err %v", err) + } + if err := resps[3].Error(); err == nil || !strings.Contains(err.Error(), "MOVED 0 :1") { + t.Fatalf("unexpected err %v", err) + } + if err := resps[4].Error(); err == nil || !strings.Contains(err.Error(), "MOVED 0 :1") { + t.Fatalf("unexpected err %v", err) + } + if err := resps[5].Error(); err == nil || !strings.Contains(err.Error(), "EXECABORT") { + t.Fatalf("unexpected err %v", err) + } + if v, err := resps[6].ToString(); err != nil || v != "4" { + t.Fatalf("unexpected resp %v %v", v, err) + } + }) + + t.Run("slot moved DoMulti transactions edge cases 2", func(t *testing.T) { + var count int64 + client, err := newClusterClient( + &ClientOption{InitAddress: []string{":0"}}, + func(dst string, opt *ClientOption) conn { + return &mockConn{DoFn: func(cmd Completed) RedisResult { + return slotsMultiResp + }, DoMultiFn: func(multi ...Completed) *redisresults { + switch atomic.AddInt64(&count, 1) { + case 1: + return &redisresults{s: []RedisResult{ + newResult(RedisMessage{typ: '+', string: "1"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '-', string: "MOVED 0 :1"}, nil), + newResult(RedisMessage{typ: '-', string: "ERR Command not allowed inside a transaction"}, nil), + newResult(RedisMessage{typ: '-', string: "MOVED 0 :1"}, nil), + newResult(RedisMessage{typ: '-', string: "EXECABORT"}, nil), + newResult(RedisMessage{typ: '-', string: "MOVED 0 :1"}, nil), + }} + case 2: + return &redisresults{s: []RedisResult{ + newResult(RedisMessage{typ: '+', string: "4"}, nil), + }} + } + return nil + }} + }, + newRetryer(defaultRetryDelayFn), + ) + if err != nil { + t.Fatalf("unexpected err %v", err) + } + resps := client.DoMulti( + context.Background(), + client.B().Get().Key("1{t}").Build(), + client.B().Multi().Build(), + client.B().Get().Key("2{t}").Build(), + client.B().Multi().Build(), // nested multi + client.B().Get().Key("3{t}").Build(), + client.B().Exec().Build(), + client.B().Get().Key("4{t}").Build(), + ) + if v, err := resps[0].ToString(); err != nil || v != "1" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[1].ToString(); err != nil || v != "OK" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if err := resps[2].Error(); err == nil || !strings.Contains(err.Error(), "MOVED 0 :1") { + t.Fatalf("unexpected err %v", err) + } + if err := resps[3].Error(); err == nil || !strings.Contains(err.Error(), "Command not allowed inside a transaction") { + t.Fatalf("unexpected err %v", err) + } + if err := resps[4].Error(); err == nil || !strings.Contains(err.Error(), "MOVED 0 :1") { + t.Fatalf("unexpected err %v", err) + } + if err := resps[5].Error(); err == nil || !strings.Contains(err.Error(), "EXECABORT") { + t.Fatalf("unexpected err %v", err) + } + if v, err := resps[6].ToString(); err != nil || v != "4" { + t.Fatalf("unexpected resp %v %v", v, err) + } + }) + + t.Run("slot moved DoMulti transactions edge cases 3", func(t *testing.T) { + var count int64 + client, err := newClusterClient( + &ClientOption{InitAddress: []string{":0"}}, + func(dst string, opt *ClientOption) conn { + return &mockConn{DoFn: func(cmd Completed) RedisResult { + return slotsMultiResp + }, DoMultiFn: func(multi ...Completed) *redisresults { + switch atomic.AddInt64(&count, 1) { + case 1: + return &redisresults{s: []RedisResult{ + newResult(RedisMessage{typ: '+', string: "1"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '-', string: "MOVED 0 :1"}, nil), + newResult(RedisMessage{typ: '-', string: "ERR Command not allowed inside a transaction"}, nil), + newResult(RedisMessage{typ: '-', string: "MOVED 0 :1"}, nil), + }} + case 2: + fmt.Println(multi) + } + return nil + }} + }, + newRetryer(defaultRetryDelayFn), + ) + if err != nil { + t.Fatalf("unexpected err %v", err) + } + resps := client.DoMulti( + context.Background(), + client.B().Get().Key("1{t}").Build(), + client.B().Multi().Build(), + client.B().Get().Key("2{t}").Build(), + client.B().Multi().Build(), // nested multi + client.B().Get().Key("3{t}").Build(), + ) + if v, err := resps[0].ToString(); err != nil || v != "1" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[1].ToString(); err != nil || v != "OK" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if err := resps[2].Error(); err == nil || !strings.Contains(err.Error(), "MOVED 0 :1") { + t.Fatalf("unexpected err %v", err) + } + if err := resps[3].Error(); err == nil || !strings.Contains(err.Error(), "Command not allowed inside a transaction") { + t.Fatalf("unexpected err %v", err) + } + if err := resps[4].Error(); err == nil || !strings.Contains(err.Error(), "MOVED 0 :1") { + t.Fatalf("unexpected err %v", err) + } + }) + t.Run("slot moved DoMulti (multi)", func(t *testing.T) { var count int64 client, err := newClusterClient(