diff --git a/cluster.go b/cluster.go index ecaac6a3..a9b4d6bf 100644 --- a/cluster.go +++ b/cluster.go @@ -858,14 +858,26 @@ func (c *clusterClient) DoCache(ctx context.Context, cmd Cacheable, ttl time.Dur } func askingMulti(cc conn, ctx context.Context, multi []Completed) *redisresults { + var inTx bool commands := make([]Completed, 0, len(multi)*2) for _, cmd := range multi { - commands = append(commands, cmds.AskingCmd, cmd) + if inTx { + commands = append(commands, cmd) + } else { + commands = append(commands, cmds.AskingCmd, cmd) + } + if !inTx && isMulti(cmd) { + inTx = true + } else if inTx && isExec(cmd) { + inTx = false + } } results := resultsp.Get(0, len(multi)) resps := cc.DoMulti(ctx, commands...) - for i := 1; i < len(resps.s); i += 2 { - results.s = append(results.s, resps.s[i]) + for i, resp := range resps.s { + if commands[i] != cmds.AskingCmd { + results.s = append(results.s, resp) + } } resultsp.Put(resps) return results diff --git a/cluster_test.go b/cluster_test.go index 90053fb5..ee768432 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -4206,6 +4206,106 @@ func TestClusterClientErr(t *testing.T) { } }) + t.Run("slot moved DoMulti transactions ASKING", func(t *testing.T) { + var count int64 + client, err := newClusterClient( + &ClientOption{InitAddress: []string{":0"}}, + func(dst string, opt *ClientOption) conn { + return &mockConn{DoFn: func(cmd Completed) RedisResult { + return slotsMultiResp + }, DoMultiFn: func(multi ...Completed) *redisresults { + switch atomic.AddInt64(&count, 1) { + case 1: + return &redisresults{s: []RedisResult{ + newResult(RedisMessage{typ: '+', string: "1"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '-', string: "ASK 0 :1"}, nil), + newResult(RedisMessage{typ: '-', string: "EXECABORT"}, nil), + newResult(RedisMessage{typ: '+', string: "4"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '-', string: "ASK 0 :1"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '-', string: "EXECABORT"}, nil), + newResult(RedisMessage{typ: '+', string: "7"}, nil), + }} + case 2: + return &redisresults{s: []RedisResult{ + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '*', values: []RedisMessage{ + {typ: '+', string: "2"}, + {typ: '+', string: "3"}, + }}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '*', values: []RedisMessage{ + {typ: '+', string: "5"}, + {typ: '+', string: "6"}, + }}, nil), + }} + } + return nil + }} + }, + newRetryer(defaultRetryDelayFn), + ) + if err != nil { + t.Fatalf("unexpected err %v", err) + } + resps := client.DoMulti( + context.Background(), + client.B().Get().Key("1{t}").Build(), + client.B().Multi().Build(), + client.B().Get().Key("2{t}").Build(), + client.B().Get().Key("3{t}").Build(), + client.B().Exec().Build(), + client.B().Get().Key("4{t}").Build(), + client.B().Multi().Build(), + client.B().Get().Key("5{t}").Build(), + client.B().Get().Key("6{t}").Build(), + client.B().Exec().Build(), + client.B().Get().Key("7{t}").Build(), + ) + if v, err := resps[0].ToString(); err != nil || v != "1" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[1].ToString(); err != nil || v != "OK" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[2].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[3].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[4].AsStrSlice(); err != nil || !reflect.DeepEqual(v, []string{"2", "3"}) { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[5].ToString(); err != nil || v != "4" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[6].ToString(); err != nil || v != "OK" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[7].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[8].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[9].AsStrSlice(); err != nil || !reflect.DeepEqual(v, []string{"5", "6"}) { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[10].ToString(); err != nil || v != "7" { + t.Fatalf("unexpected resp %v %v", v, err) + } + }) + t.Run("slot moved DoMulti except transactions", func(t *testing.T) { var count int64 client, err := newClusterClient( @@ -4299,6 +4399,102 @@ func TestClusterClientErr(t *testing.T) { } }) + t.Run("slot moved DoMulti except transactions ASKING", func(t *testing.T) { + var count int64 + client, err := newClusterClient( + &ClientOption{InitAddress: []string{":0"}}, + func(dst string, opt *ClientOption) conn { + return &mockConn{DoFn: func(cmd Completed) RedisResult { + return slotsMultiResp + }, DoMultiFn: func(multi ...Completed) *redisresults { + switch atomic.AddInt64(&count, 1) { + case 1: + return &redisresults{s: []RedisResult{ + newResult(RedisMessage{typ: '-', string: "ASK 0 :1"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '*', values: []RedisMessage{ + {typ: '+', string: "2"}, + {typ: '+', string: "3"}, + }}, nil), + newResult(RedisMessage{typ: '-', string: "ASK 0 :1"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '*', values: []RedisMessage{ + {typ: '+', string: "5"}, + {typ: '+', string: "6"}, + }}, nil), + newResult(RedisMessage{typ: '-', string: "ASK 0 :1"}, nil), + }} + case 2: + return &redisresults{s: []RedisResult{ + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "1"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "4"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "7"}, nil), + }} + } + return nil + }} + }, + newRetryer(defaultRetryDelayFn), + ) + if err != nil { + t.Fatalf("unexpected err %v", err) + } + resps := client.DoMulti( + context.Background(), + client.B().Get().Key("1{t}").Build(), + client.B().Multi().Build(), + client.B().Get().Key("2{t}").Build(), + client.B().Get().Key("3{t}").Build(), + client.B().Exec().Build(), + client.B().Get().Key("4{t}").Build(), + client.B().Multi().Build(), + client.B().Get().Key("5{t}").Build(), + client.B().Get().Key("6{t}").Build(), + client.B().Exec().Build(), + client.B().Get().Key("7{t}").Build(), + ) + if v, err := resps[0].ToString(); err != nil || v != "1" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[1].ToString(); err != nil || v != "OK" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[2].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[3].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[4].AsStrSlice(); err != nil || !reflect.DeepEqual(v, []string{"2", "3"}) { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[5].ToString(); err != nil || v != "4" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[6].ToString(); err != nil || v != "OK" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[7].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[8].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[9].AsStrSlice(); err != nil || !reflect.DeepEqual(v, []string{"5", "6"}) { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[10].ToString(); err != nil || v != "7" { + t.Fatalf("unexpected resp %v %v", v, err) + } + }) + t.Run("slot moved DoMulti transactions mixed", func(t *testing.T) { var count int64 client, err := newClusterClient( @@ -4398,6 +4594,108 @@ func TestClusterClientErr(t *testing.T) { } }) + t.Run("slot moved DoMulti transactions mixed ASKING", func(t *testing.T) { + var count int64 + client, err := newClusterClient( + &ClientOption{InitAddress: []string{":0"}}, + func(dst string, opt *ClientOption) conn { + return &mockConn{DoFn: func(cmd Completed) RedisResult { + return slotsMultiResp + }, DoMultiFn: func(multi ...Completed) *redisresults { + switch atomic.AddInt64(&count, 1) { + case 1: + return &redisresults{s: []RedisResult{ + newResult(RedisMessage{typ: '+', string: "1"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '-', string: "ASK 0 :1"}, nil), + newResult(RedisMessage{typ: '-', string: "ASK 0 :1"}, nil), + newResult(RedisMessage{typ: '-', string: "EXECABORT"}, nil), + newResult(RedisMessage{typ: '-', string: "ASK 0 :1"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '-', string: "ASK 0 :1"}, nil), + newResult(RedisMessage{typ: '-', string: "ASK 0 :1"}, nil), + newResult(RedisMessage{typ: '-', string: "EXECABORT"}, nil), + newResult(RedisMessage{typ: '+', string: "7"}, nil), + }} + case 2: + return &redisresults{s: []RedisResult{ + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '*', values: []RedisMessage{ + {typ: '+', string: "2"}, + {typ: '+', string: "3"}, + }}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "4"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil), + newResult(RedisMessage{typ: '*', values: []RedisMessage{ + {typ: '+', string: "5"}, + {typ: '+', string: "6"}, + }}, nil), + }} + } + return nil + }} + }, + newRetryer(defaultRetryDelayFn), + ) + if err != nil { + t.Fatalf("unexpected err %v", err) + } + resps := client.DoMulti( + context.Background(), + client.B().Get().Key("1{t}").Build(), + client.B().Multi().Build(), + client.B().Get().Key("2{t}").Build(), + client.B().Get().Key("3{t}").Build(), + client.B().Exec().Build(), + client.B().Get().Key("4{t}").Build(), + client.B().Multi().Build(), + client.B().Get().Key("5{t}").Build(), + client.B().Get().Key("6{t}").Build(), + client.B().Exec().Build(), + client.B().Get().Key("7{t}").Build(), + ) + if v, err := resps[0].ToString(); err != nil || v != "1" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[1].ToString(); err != nil || v != "OK" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[2].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[3].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[4].AsStrSlice(); err != nil || !reflect.DeepEqual(v, []string{"2", "3"}) { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[5].ToString(); err != nil || v != "4" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[6].ToString(); err != nil || v != "OK" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[7].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[8].ToString(); err != nil || v != "QUEUED" { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[9].AsStrSlice(); err != nil || !reflect.DeepEqual(v, []string{"5", "6"}) { + t.Fatalf("unexpected resp %v %v", v, err) + } + if v, err := resps[10].ToString(); err != nil || v != "7" { + t.Fatalf("unexpected resp %v %v", v, err) + } + }) + t.Run("slot moved DoMulti transactions edge cases 1", func(t *testing.T) { var count int64 client, err := newClusterClient( @@ -4545,8 +4843,6 @@ func TestClusterClientErr(t *testing.T) { newResult(RedisMessage{typ: '-', string: "ERR Command not allowed inside a transaction"}, nil), newResult(RedisMessage{typ: '-', string: "MOVED 0 :1"}, nil), }} - case 2: - fmt.Println(multi) } return nil }}