From 3185bbc5c8af65c1bd23d39ac97d13e64df248d0 Mon Sep 17 00:00:00 2001 From: proost Date: Wed, 11 Dec 2024 23:26:54 +0900 Subject: [PATCH 1/8] feat: add reader node selector --- cluster.go | 38 ++++++++- cluster_test.go | 221 ++++++++++++++++++++++++++++++++++++++++++++++++ rueidis.go | 14 +++ rueidis_test.go | 50 +++++++++++ 4 files changed, 321 insertions(+), 2 deletions(-) diff --git a/cluster.go b/cluster.go index 324507b5..9e28e7ca 100644 --- a/cluster.go +++ b/cluster.go @@ -18,6 +18,8 @@ import ( var ErrNoSlot = errors.New("the slot has no redis node") var ErrReplicaOnlyConflict = errors.New("ReplicaOnly conflicts with SendToReplicas option") var ErrInvalidShardsRefreshInterval = errors.New("ShardsRefreshInterval must be greater than or equal to 0") +var ErrReplicaOnlyConflictWithReaderNodeSelector = errors.New("ReplicaOnly conflicts with ReaderNodeSelector option") +var ErrSendToReplicasConflictWithReaderNodeSelector = errors.New("SendToReplicas conflicts with ReadPreference option") type clusterClient struct { pslots [16384]conn @@ -41,6 +43,14 @@ type connrole struct { replica bool } +var sendToReader = func(cmd Completed) bool { + return cmd.IsReadOnly() +} + +var replicaOnlySelector = func(_ uint16, replicas []ReplicaInfo) int { + return util.FastRand(len(replicas)) +} + func newClusterClient(opt *ClientOption, connFn connFn, retryer retryHandler) (*clusterClient, error) { client := &clusterClient{ cmd: cmds.NewBuilder(cmds.InitSlot), @@ -55,8 +65,20 @@ func newClusterClient(opt *ClientOption, connFn connFn, retryer retryHandler) (* if opt.ReplicaOnly && opt.SendToReplicas != nil { return nil, ErrReplicaOnlyConflict } + if opt.ReplicaOnly && opt.ReaderNodeSelector != nil { + return nil, ErrReplicaOnlyConflictWithReaderNodeSelector + } + if opt.SendToReplicas != nil && opt.ReaderNodeSelector != nil { + return nil, ErrSendToReplicasConflictWithReaderNodeSelector + } if opt.SendToReplicas != nil { + opt.ReaderNodeSelector = replicaOnlySelector + } else if opt.ReaderNodeSelector != nil { + opt.SendToReplicas = sendToReader + } + + if opt.SendToReplicas != nil || opt.ReaderNodeSelector != nil { rOpt := *opt rOpt.ReplicaOnly = true client.rOpt = &rOpt @@ -236,15 +258,27 @@ func (c *clusterClient) _refresh() (err error) { pslots[i] = conns[g.nodes[1+util.FastRand(nodesCount-1)]].conn } } - case c.rOpt != nil: // implies c.opt.SendToReplicas != nil + case c.rOpt != nil: if len(rslots) == 0 { // lazy init rslots = make([]conn, 16384) } if len(g.nodes) > 1 { + n := len(g.nodes) - 1 + replicas := make([]ReplicaInfo, 0, n) + for _, addr := range g.nodes[1:] { + replicas = append(replicas, ReplicaInfo{Addr: addr}) + } + for _, slot := range g.slots { for i := slot[0]; i <= slot[1]; i++ { pslots[i] = conns[master].conn - rslots[i] = conns[g.nodes[1+util.FastRand(len(g.nodes)-1)]].conn + + rIndex := c.opt.ReaderNodeSelector(uint16(i), replicas) + if rIndex >= 0 && rIndex < n { + rslots[i] = conns[g.nodes[1+rIndex]].conn + } else { + rslots[i] = conns[master].conn + } } } } else { diff --git a/cluster_test.go b/cluster_test.go index 82bf2a6b..27fd97d2 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -1117,6 +1117,227 @@ func TestClusterClientInit(t *testing.T) { t.Fatalf("unexpected err %v", err) } }) + + t.Run("Refresh cluster which has only primary node per shard with ReaderNodeSelector option", func(t *testing.T) { + m := &mockConn{ + DoFn: func(cmd Completed) RedisResult { + if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" { + return slotsMultiRespWithoutReplicas + } + return 
RedisResult{} + }, + } + + client, err := newClusterClient( + &ClientOption{ + InitAddress: []string{"127.0.0.1:0"}, + ReaderNodeSelector: func(slot uint16, replicas []ReplicaInfo) int { + return 0 + }, + }, + func(dst string, opt *ClientOption) conn { + copiedM := *m + return &copiedM + }, + newRetryer(defaultRetryDelayFn), + ) + if err != nil { + t.Fatalf("unexpected err %v", err) + } + + if client.pslots[0] != client.conns["127.0.0.1:0"].conn { + t.Fatalf("unexpected node assigned to pslot 0") + } + if client.pslots[8192] != client.conns["127.0.0.1:0"].conn { + t.Fatalf("unexpected node assigned to pslot 8192") + } + if client.pslots[8193] != client.conns["127.0.1.1:0"].conn { + t.Fatalf("unexpected node assigned to pslot 8193") + } + if client.pslots[16383] != client.conns["127.0.1.1:0"].conn { + t.Fatalf("unexpected node assigned to pslot 16383") + } + if client.rslots[0] != client.conns["127.0.0.1:0"].conn { + t.Fatalf("unexpected node assigned to rslot 0") + } + if client.rslots[8192] != client.conns["127.0.0.1:0"].conn { + t.Fatalf("unexpected node assigned to rslot 8192") + } + if client.rslots[8193] != client.conns["127.0.1.1:0"].conn { + t.Fatalf("unexpected node assigned to rslot 8193") + } + if client.rslots[16383] != client.conns["127.0.1.1:0"].conn { + t.Fatalf("unexpected node assigned to rslot 16383") + } + }) + + t.Run("Refresh cluster which has multi replicas per shard with ReaderNodeSelector option. Returned index is within range", func(t *testing.T) { + primaryNodeConn := &mockConn{ + DoFn: func(cmd Completed) RedisResult { + if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" { + return slotsMultiRespWithMultiReplicas + } + return RedisResult{ + err: errors.New("unexpected call"), + } + }, + } + replicaNodeConn1 := &mockConn{ + DoFn: func(cmd Completed) RedisResult { + return RedisResult{ + err: errors.New("unexpected call"), + } + }, + } + replicaNodeConn2 := &mockConn{ + DoFn: func(cmd Completed) RedisResult { + return RedisResult{ + err: errors.New("unexpected call"), + } + }, + } + replicaNodeConn3 := &mockConn{ + DoFn: func(cmd Completed) RedisResult { + return RedisResult{ + err: errors.New("unexpected call"), + } + }, + } + + client, err := newClusterClient( + &ClientOption{ + InitAddress: []string{"127.0.0.1:0"}, + ReaderNodeSelector: func(slot uint16, replicas []ReplicaInfo) int { + return 1 + }, + }, + func(dst string, opt *ClientOption) conn { + switch { + case dst == "127.0.0.2:1" || dst == "127.0.1.2:1": + return replicaNodeConn1 + case dst == "127.0.0.3:2" || dst == "127.0.1.3:2": + return replicaNodeConn2 + case dst == "127.0.0.4:3" || dst == "127.0.1.4:3": + return replicaNodeConn3 + default: + return primaryNodeConn + } + }, + newRetryer(defaultRetryDelayFn), + ) + if err != nil { + t.Fatalf("unexpected err %v", err) + } + + if client.pslots[0] != primaryNodeConn { + t.Fatalf("unexpected node assigned to pslot 0") + } + if client.pslots[8192] != primaryNodeConn { + t.Fatalf("unexpected node assigned to pslot 8192") + } + if client.pslots[8193] != primaryNodeConn { + t.Fatalf("unexpected node assigned to pslot 8193") + } + if client.pslots[16383] != primaryNodeConn { + t.Fatalf("unexpected node assigned to pslot 16383") + } + if client.rslots[0] != replicaNodeConn2 { + t.Fatalf("unexpected node assigned to rslot 0") + } + if client.rslots[8192] != replicaNodeConn2 { + t.Fatalf("unexpected node assigned to rslot 8192") + } + if client.rslots[8193] != replicaNodeConn2 { + t.Fatalf("unexpected node assigned to rslot 8193") + } + if 
client.rslots[16383] != replicaNodeConn2 { + t.Fatalf("unexpected node assigned to rslot 16383") + } + }) + + t.Run("Refresh cluster which has multi replicas per shard with ReaderNodeSelector option. Returned index is out of range", func(t *testing.T) { + primaryNodeConn := &mockConn{ + DoFn: func(cmd Completed) RedisResult { + if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" { + return slotsMultiRespWithMultiReplicas + } + return RedisResult{ + err: errors.New("unexpected call"), + } + }, + } + replicaNodeConn1 := &mockConn{ + DoFn: func(cmd Completed) RedisResult { + return RedisResult{ + err: errors.New("unexpected call"), + } + }, + } + replicaNodeConn2 := &mockConn{ + DoFn: func(cmd Completed) RedisResult { + return RedisResult{ + err: errors.New("unexpected call"), + } + }, + } + replicaNodeConn3 := &mockConn{ + DoFn: func(cmd Completed) RedisResult { + return RedisResult{ + err: errors.New("unexpected call"), + } + }, + } + + client, err := newClusterClient( + &ClientOption{ + InitAddress: []string{"127.0.0.1:0"}, + ReaderNodeSelector: func(slot uint16, replicas []ReplicaInfo) int { + return -1 + }, + }, + func(dst string, opt *ClientOption) conn { + switch { + case dst == "127.0.0.2:1" || dst == "127.0.1.2:1": + return replicaNodeConn1 + case dst == "127.0.0.3:2" || dst == "127.0.1.3:2": + return replicaNodeConn2 + case dst == "127.0.0.4:3" || dst == "127.0.1.4:3": + return replicaNodeConn3 + default: + return primaryNodeConn + } + }, + newRetryer(defaultRetryDelayFn), + ) + if err != nil { + t.Fatalf("unexpected err %v", err) + } + + if client.pslots[0] != primaryNodeConn { + t.Fatalf("unexpected node assigned to pslot 0") + } + if client.pslots[8192] != primaryNodeConn { + t.Fatalf("unexpected node assigned to pslot 8192") + } + if client.pslots[8193] != primaryNodeConn { + t.Fatalf("unexpected node assigned to pslot 8193") + } + if client.pslots[16383] != primaryNodeConn { + t.Fatalf("unexpected node assigned to pslot 16383") + } + if client.rslots[0] != primaryNodeConn { + t.Fatalf("unexpected node assigned to rslot 0") + } + if client.rslots[8192] != primaryNodeConn { + t.Fatalf("unexpected node assigned to rslot 8192") + } + if client.rslots[8193] != primaryNodeConn { + t.Fatalf("unexpected node assigned to rslot 8193") + } + if client.rslots[16383] != primaryNodeConn { + t.Fatalf("unexpected node assigned to rslot 16383") + } + }) } //gocyclo:ignore diff --git a/rueidis.go b/rueidis.go index 2c6c9ace..0f51cd86 100644 --- a/rueidis.go +++ b/rueidis.go @@ -208,6 +208,15 @@ type ClientOption struct { // ClusterOption is the options for the redis cluster client. ClusterOption ClusterOption + + // ReaderNodeSelector selects a reader node to send read-only commands to. + // Returned value is the index of the replica node in the replicas slice. + // If the returned value is out of range, the primary node will be selected. + // If replica nodes are empty, the primary node will be selected and + // the function will not be called. + // NOTE: This function can't be used with ReplicaOnly option. + // NOTE: This function can't be used with SendToReplicas option. + ReaderNodeSelector func(slot uint16, replicas []ReplicaInfo) int } // SentinelOption contains MasterSet, @@ -234,6 +243,11 @@ type ClusterOption struct { ShardsRefreshInterval time.Duration } +// ReplicaInfo is the information of a replica node in a redis cluster. +type ReplicaInfo struct { + Addr string +} + // Client is the redis client interface for both single redis instance and redis cluster. 
It should be created from the NewClient() type Client interface { CoreClient diff --git a/rueidis_test.go b/rueidis_test.go index e644b232..6ee4c763 100644 --- a/rueidis_test.go +++ b/rueidis_test.go @@ -156,6 +156,56 @@ func TestNewClusterClientError(t *testing.T) { t.Errorf("unexpected error %v", err) } }) + + t.Run("replica only and reader node selector option conflict", func(t *testing.T) { + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + defer ln.Close() + + _, port, _ := net.SplitHostPort(ln.Addr().String()) + client, err := NewClient(ClientOption{ + InitAddress: []string{"127.0.0.1:" + port}, + ReplicaOnly: true, + ReaderNodeSelector: func(slot uint16, replicas []ReplicaInfo) int { + return 0 + }, + }) + if client != nil || err == nil { + t.Errorf("unexpected return %v %v", client, err) + } + + if !strings.Contains(err.Error(), ErrReplicaOnlyConflictWithReaderNodeSelector.Error()) { + t.Errorf("unexpected error %v", err) + } + }) + + t.Run("send to replicas and reader node selector option conflict", func(t *testing.T) { + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + defer ln.Close() + + _, port, _ := net.SplitHostPort(ln.Addr().String()) + client, err := NewClient(ClientOption{ + InitAddress: []string{"127.0.0.1:" + port}, + SendToReplicas: func(cmd Completed) bool { + return true + }, + ReaderNodeSelector: func(slot uint16, replicas []ReplicaInfo) int { + return 0 + }, + }) + if client != nil || err == nil { + t.Errorf("unexpected return %v %v", client, err) + } + + if !strings.Contains(err.Error(), ErrSendToReplicasConflictWithReaderNodeSelector.Error()) { + t.Errorf("unexpected error %v", err) + } + }) } func TestFallBackSingleClient(t *testing.T) { From 9499b9bb40de3fbf2fe70dd61066afdb87e7edc0 Mon Sep 17 00:00:00 2001 From: proost Date: Wed, 11 Dec 2024 23:30:16 +0900 Subject: [PATCH 2/8] doc: add comments --- rueidis.go | 1 + 1 file changed, 1 insertion(+) diff --git a/rueidis.go b/rueidis.go index 0f51cd86..be1ab979 100644 --- a/rueidis.go +++ b/rueidis.go @@ -210,6 +210,7 @@ type ClientOption struct { ClusterOption ClusterOption // ReaderNodeSelector selects a reader node to send read-only commands to. + // If the function is set, the client will send read-only commands to the selected node. // Returned value is the index of the replica node in the replicas slice. // If the returned value is out of range, the primary node will be selected. 
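	// A hypothetical example (a sketch, not part of this patch): spread
	// reads across replicas by slot with
	//
	//	ReaderNodeSelector: func(slot uint16, replicas []ReplicaInfo) int {
	//		return int(slot) % len(replicas) // always in range; replicas is non-empty when called
	//	},
	//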
// If replica nodes are empty, the primary node will be selected and From 5261cb1b4739c52960aec28e086ad1f3098a4b3a Mon Sep 17 00:00:00 2001 From: proost Date: Wed, 11 Dec 2024 23:41:24 +0900 Subject: [PATCH 3/8] fix: wrong error message --- cluster.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cluster.go b/cluster.go index 9e28e7ca..b9613c9a 100644 --- a/cluster.go +++ b/cluster.go @@ -19,7 +19,7 @@ var ErrNoSlot = errors.New("the slot has no redis node") var ErrReplicaOnlyConflict = errors.New("ReplicaOnly conflicts with SendToReplicas option") var ErrInvalidShardsRefreshInterval = errors.New("ShardsRefreshInterval must be greater than or equal to 0") var ErrReplicaOnlyConflictWithReaderNodeSelector = errors.New("ReplicaOnly conflicts with ReaderNodeSelector option") -var ErrSendToReplicasConflictWithReaderNodeSelector = errors.New("SendToReplicas conflicts with ReadPreference option") +var ErrSendToReplicasConflictWithReaderNodeSelector = errors.New("SendToReplicas conflicts with ReaderNodeSelector option") type clusterClient struct { pslots [16384]conn From a7062fd078b11e7f779e8fec04d4f4fafe8d85cb Mon Sep 17 00:00:00 2001 From: proost Date: Wed, 11 Dec 2024 23:57:15 +0900 Subject: [PATCH 4/8] test: add send read-only command to reader node --- cluster_test.go | 594 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 594 insertions(+) diff --git a/cluster_test.go b/cluster_test.go index 27fd97d2..503655cf 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -5453,3 +5453,597 @@ func TestClusterClientLoadingRetry(t *testing.T) { } }) } + +//gocyclo:ignore +func TestClusterClient_SendReadOperationToReaderNodeWriteOperationToPrimaryNode(t *testing.T) { + defer ShouldNotLeaked(SetupLeakDetection()) + + primaryNodeConn := &mockConn{ + DoOverride: map[string]func(cmd Completed) RedisResult{ + "CLUSTER SLOTS": func(cmd Completed) RedisResult { + return slotsMultiResp + }, + "INFO": func(cmd Completed) RedisResult { + return newResult(RedisMessage{typ: '+', string: "INFO"}, nil) + }, + "SET Do V": func(cmd Completed) RedisResult { + return newResult(RedisMessage{typ: '+', string: "SET Do V"}, nil) + }, + "SET K2{a} V2{a}": func(cmd Completed) RedisResult { + return newResult(RedisMessage{typ: '+', string: "SET K2{a} V2{a}"}, nil) + }, + }, + DoMultiFn: func(multi ...Completed) *redisresults { + resps := make([]RedisResult, len(multi)) + for i, cmd := range multi { + if strings.HasPrefix(strings.Join(cmd.Commands(), " "), "SET K1") { + resps[i] = newResult(RedisMessage{typ: '+', string: strings.Join(cmd.Commands(), " ")}, nil) + continue + } + if strings.HasPrefix(strings.Join(cmd.Commands(), " "), "SET K2") { + resps[i] = newResult(RedisMessage{typ: '+', string: strings.Join(cmd.Commands(), " ")}, nil) + continue + } + if strings.HasPrefix(strings.Join(cmd.Commands(), " "), "MULTI") { + resps[i] = newResult(RedisMessage{typ: '+', string: "MULTI"}, nil) + continue + } + if strings.HasPrefix(strings.Join(cmd.Commands(), " "), "EXEC") { + resps[i] = newResult(RedisMessage{typ: '+', string: "EXEC"}, nil) + continue + } + + return &redisresults{ + s: []RedisResult{}, + } + } + return &redisresults{s: resps} + }, + } + replicaNodeConn := &mockConn{ + DoOverride: map[string]func(cmd Completed) RedisResult{ + "GET Do": func(cmd Completed) RedisResult { + return newResult(RedisMessage{typ: '+', string: "GET Do"}, nil) + }, + "GET K1{a}": func(cmd Completed) RedisResult { + return newResult(RedisMessage{typ: '+', string: "GET K1{a}"}, nil) + }, + "GET K2{a}": func(cmd 
Completed) RedisResult { + return newResult(RedisMessage{typ: '+', string: "GET K2{a}"}, nil) + }, + }, + DoMultiFn: func(multi ...Completed) *redisresults { + resps := make([]RedisResult, len(multi)) + for i, cmd := range multi { + if strings.HasPrefix(strings.Join(cmd.Commands(), " "), "GET K1") { + resps[i] = newResult(RedisMessage{typ: '+', string: strings.Join(cmd.Commands(), " ")}, nil) + continue + } + + return &redisresults{ + s: []RedisResult{}, + } + } + return &redisresults{s: resps} + }, + DoCacheOverride: map[string]func(cmd Cacheable, ttl time.Duration) RedisResult{ + "GET DoCache": func(cmd Cacheable, ttl time.Duration) RedisResult { + return newResult(RedisMessage{typ: '+', string: "GET DoCache"}, nil) + }, + "GET K1{a}": func(cmd Cacheable, ttl time.Duration) RedisResult { + return newResult(RedisMessage{typ: '+', string: "GET K1{a}"}, nil) + }, + "GET K2{a}": func(cmd Cacheable, ttl time.Duration) RedisResult { + return newResult(RedisMessage{typ: '+', string: "GET K2{a}"}, nil) + }, + }, + DoMultiCacheFn: func(multi ...CacheableTTL) *redisresults { + resps := make([]RedisResult, len(multi)) + for i, cmd := range multi { + if strings.HasPrefix(strings.Join(cmd.Cmd.Commands(), " "), "GET K1") { + resps[i] = newResult(RedisMessage{typ: '+', string: strings.Join(cmd.Cmd.Commands(), " ")}, nil) + continue + } + + return &redisresults{ + s: []RedisResult{}, + } + } + return &redisresults{s: resps} + }, + } + + client, err := newClusterClient( + &ClientOption{ + InitAddress: []string{"127.0.0.1:0"}, + ReaderNodeSelector: func(slot uint16, replicas []ReplicaInfo) int { + return 0 + }, + }, + func(dst string, opt *ClientOption) conn { + if dst == "127.0.0.1:0" || dst == "127.0.2.1:0" { // primary node + return primaryNodeConn + } else { // replica node + return replicaNodeConn + } + }, + newRetryer(defaultRetryDelayFn), + ) + if err != nil { + t.Fatalf("unexpected err %v", err) + } + + t.Run("Do with no slot", func(t *testing.T) { + c := client.B().Info().Build() + if v, err := client.Do(context.Background(), c).ToString(); err != nil || v != "INFO" { + t.Fatalf("unexpected response %v %v", v, err) + } + }) + + t.Run("Do read operation", func(t *testing.T) { + c := client.B().Get().Key("Do").Build() + if v, err := client.Do(context.Background(), c).ToString(); err != nil || v != "GET Do" { + t.Fatalf("unexpected response %v %v", v, err) + } + }) + + t.Run("Do write operation", func(t *testing.T) { + c := client.B().Set().Key("Do").Value("V").Build() + if v, err := client.Do(context.Background(), c).ToString(); err != nil || v != "SET Do V" { + t.Fatalf("unexpected response %v %v", v, err) + } + }) + + t.Run("DoMulti Single Slot All Read Operations", func(t *testing.T) { + c1 := client.B().Get().Key("K1{a}").Build() + c2 := client.B().Get().Key("K2{a}").Build() + resps := client.DoMulti(context.Background(), c1, c2) + if v, err := resps[0].ToString(); err != nil || v != "GET K1{a}" { + t.Fatalf("unexpected response %v %v", v, err) + } + if v, err := resps[1].ToString(); err != nil || v != "GET K2{a}" { + t.Fatalf("unexpected response %v %v", v, err) + } + }) + + t.Run("DoMulti Single Slot Read Operation And Write Operation", func(t *testing.T) { + c1 := client.B().Get().Key("K1{a}").Build() + c2 := client.B().Set().Key("K2{a}").Value("V2{a}").Build() + resps := client.DoMulti(context.Background(), c1, c2) + if v, err := resps[0].ToString(); err != nil || v != "GET K1{a}" { + t.Fatalf("unexpected response %v %v", v, err) + } + if v, err := resps[1].ToString(); err != nil || v != 
"SET K2{a} V2{a}" { + t.Fatalf("unexpected response %v %v", v, err) + } + }) + + t.Run("DoMulti Single Slot Operations + Init Slot", func(t *testing.T) { + c1 := client.B().Multi().Build() + c2 := client.B().Set().Key("K1{a}").Value("V1{a}").Build() + c3 := client.B().Set().Key("K2{a}").Value("V2{a}").Build() + c4 := client.B().Exec().Build() + resps := client.DoMulti(context.Background(), c1, c2, c3, c4) + if v, err := resps[0].ToString(); err != nil || v != "MULTI" { + t.Fatalf("unexpected response %v %v", v, err) + } + if v, err := resps[1].ToString(); err != nil || v != "SET K1{a} V1{a}" { + t.Fatalf("unexpected response %v %v", v, err) + } + if v, err := resps[2].ToString(); err != nil || v != "SET K2{a} V2{a}" { + t.Fatalf("unexpected response %v %v", v, err) + } + if v, err := resps[3].ToString(); err != nil || v != "EXEC" { + t.Fatalf("unexpected response %v %v", v, err) + } + }) + + t.Run("DoMulti Cross Slot + Init Slot", func(t *testing.T) { + defer func() { + if err := recover(); err != panicMixCxSlot { + t.Errorf("DoMulti should panic if Cross Slot + Init Slot") + } + }() + c1 := client.B().Get().Key("K1{a}").Build() + c2 := client.B().Get().Key("K1{b}").Build() + c3 := client.B().Info().Build() + client.DoMulti(context.Background(), c1, c2, c3) + }) + + t.Run("DoMulti Multi Slot All Read Operations", func(t *testing.T) { + multi := make([]Completed, 500) + for i := 0; i < len(multi); i++ { + multi[i] = client.B().Get().Key(fmt.Sprintf("K1{%d}", i)).Build() + } + resps := client.DoMulti(context.Background(), multi...) + for i := 0; i < len(multi); i++ { + if v, err := resps[i].ToString(); err != nil || v != fmt.Sprintf("GET K1{%d}", i) { + t.Fatalf("unexpected response %v %v", v, err) + } + } + }) + t.Run("DoMulti Multi Slot Read & Write Operations", func(t *testing.T) { + multi := make([]Completed, 500) + for i := 0; i < len(multi); i++ { + if i%2 == 0 { + multi[i] = client.B().Get().Key(fmt.Sprintf("K1{%d}", i)).Build() + } else { + multi[i] = client.B().Set().Key(fmt.Sprintf("K2{%d}", i)).Value(fmt.Sprintf("V2{%d}", i)).Build() + } + } + resps := client.DoMulti(context.Background(), multi...) + for i := 0; i < len(multi); i++ { + if i%2 == 0 { + if v, err := resps[i].ToString(); err != nil || v != fmt.Sprintf("GET K1{%d}", i) { + t.Fatalf("unexpected response %v %v", v, err) + } + } else { + if v, err := resps[i].ToString(); err != nil || v != fmt.Sprintf("SET K2{%d} V2{%d}", i, i) { + t.Fatalf("unexpected response %v %v", v, err) + } + } + } + }) + + t.Run("DoCache Operation", func(t *testing.T) { + c := client.B().Get().Key("DoCache").Cache() + if v, err := client.DoCache(context.Background(), c, 100).ToString(); err != nil || v != "GET DoCache" { + t.Fatalf("unexpected response %v %v", v, err) + } + }) + + t.Run("DoMultiCache Single Slot", func(t *testing.T) { + c1 := client.B().Get().Key("K1{a}").Cache() + c2 := client.B().Get().Key("K2{a}").Cache() + resps := client.DoMultiCache(context.Background(), CT(c1, time.Second), CT(c2, time.Second)) + if v, err := resps[0].ToString(); err != nil || v != "GET K1{a}" { + t.Fatalf("unexpected response %v %v", v, err) + } + if v, err := resps[1].ToString(); err != nil || v != "GET K2{a}" { + t.Fatalf("unexpected response %v %v", v, err) + } + }) + + t.Run("DoMultiCache Multi Slot", func(t *testing.T) { + multi := make([]CacheableTTL, 500) + for i := 0; i < len(multi); i++ { + multi[i] = CT(client.B().Get().Key(fmt.Sprintf("K1{%d}", i)).Cache(), time.Second) + } + resps := client.DoMultiCache(context.Background(), multi...) 
+ for i := 0; i < len(multi); i++ { + if v, err := resps[i].ToString(); err != nil || v != fmt.Sprintf("GET K1{%d}", i) { + t.Fatalf("unexpected response %v %v", v, err) + } + } + }) + + t.Run("Receive", func(t *testing.T) { + c := client.B().Subscribe().Channel("ch").Build() + hdl := func(message PubSubMessage) {} + primaryNodeConn.ReceiveFn = func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { + if !reflect.DeepEqual(subscribe.Commands(), c.Commands()) { + t.Fatalf("unexpected command %v", subscribe) + } + return nil + } + replicaNodeConn.ReceiveFn = func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { + if !reflect.DeepEqual(subscribe.Commands(), c.Commands()) { + t.Fatalf("unexpected command %v", subscribe) + } + return nil + } + if err := client.Receive(context.Background(), c, hdl); err != nil { + t.Fatalf("unexpected response %v", err) + } + }) + + t.Run("Receive Redis Err", func(t *testing.T) { + c := client.B().Ssubscribe().Channel("ch").Build() + e := &RedisError{} + primaryNodeConn.ReceiveFn = func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { + return e + } + replicaNodeConn.ReceiveFn = func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { + return e + } + if err := client.Receive(context.Background(), c, func(message PubSubMessage) {}); err != e { + t.Fatalf("unexpected response %v", err) + } + }) + + t.Run("Dedicated Cross Slot Err", func(t *testing.T) { + defer func() { + if err := recover(); err != panicMsgCxSlot { + t.Errorf("Dedicated should panic if cross slots is used") + } + }() + primaryNodeConn.AcquireFn = func() wire { return &mockWire{} } + client.Dedicated(func(c DedicatedClient) error { + c.Do(context.Background(), c.B().Get().Key("a").Build()).Error() + return c.Do(context.Background(), c.B().Get().Key("b").Build()).Error() + }) + }) + + t.Run("Dedicated Cross Slot Err Multi", func(t *testing.T) { + defer func() { + if err := recover(); err != panicMsgCxSlot { + t.Errorf("Dedicated should panic if cross slots is used") + } + }() + primaryNodeConn.AcquireFn = func() wire { + return &mockWire{ + DoMultiFn: func(multi ...Completed) *redisresults { + return &redisresults{s: []RedisResult{ + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '*', values: []RedisMessage{{typ: '+', string: "a"}}}, nil), + }} + }, + } + } + client.Dedicated(func(c DedicatedClient) (err error) { + c.DoMulti( + context.Background(), + c.B().Multi().Build(), + c.B().Get().Key("a").Build(), + c.B().Exec().Build(), + ) + c.DoMulti( + context.Background(), + c.B().Multi().Build(), + c.B().Get().Key("b").Build(), + c.B().Exec().Build(), + ) + return nil + }) + }) + + t.Run("Dedicated Multi Cross Slot Err", func(t *testing.T) { + primaryNodeConn.AcquireFn = func() wire { return &mockWire{} } + err := client.Dedicated(func(c DedicatedClient) (err error) { + defer func() { + err = errors.New(recover().(string)) + }() + c.DoMulti( + context.Background(), + c.B().Get().Key("a").Build(), + c.B().Get().Key("b").Build(), + ) + return nil + }) + if err == nil || err.Error() != panicMsgCxSlot { + t.Errorf("Multi should panic if cross slots is used") + } + }) + + t.Run("Dedicated Receive Redis Err", func(t *testing.T) { + e := &RedisError{} + w := &mockWire{ + ReceiveFn: func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { + return e + }, + } + 
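+		// Dedicated connections in these tests are acquired from the primary
+		// node, so only the primary mock overrides AcquireFn here.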
primaryNodeConn.AcquireFn = func() wire { + return w + } + if err := client.Dedicated(func(c DedicatedClient) error { + return c.Receive(context.Background(), c.B().Subscribe().Channel("a").Build(), func(msg PubSubMessage) {}) + }); err != e { + t.Fatalf("unexpected err %v", err) + } + }) + + t.Run("Dedicated", func(t *testing.T) { + closed := false + w := &mockWire{ + DoFn: func(cmd Completed) RedisResult { + return newResult(RedisMessage{typ: '+', string: "Delegate"}, nil) + }, + DoMultiFn: func(cmd ...Completed) *redisresults { + if len(cmd) == 4 { + return &redisresults{s: []RedisResult{ + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '*', values: []RedisMessage{ + {typ: '+', string: "Delegate0"}, + {typ: '+', string: "Delegate1"}, + }}, nil), + }} + } + return &redisresults{s: []RedisResult{ + newResult(RedisMessage{typ: '+', string: "Delegate0"}, nil), + newResult(RedisMessage{typ: '+', string: "Delegate1"}, nil), + }} + }, + ReceiveFn: func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { + return ErrClosing + }, + SetPubSubHooksFn: func(hooks PubSubHooks) <-chan error { + ch := make(chan error, 1) + ch <- ErrClosing + close(ch) + return ch + }, + ErrorFn: func() error { + return ErrClosing + }, + CloseFn: func() { + closed = true + }, + } + primaryNodeConn.AcquireFn = func() wire { + return w + } + stored := false + primaryNodeConn.StoreFn = func(ww wire) { + if ww != w { + t.Fatalf("received unexpected wire %v", ww) + } + stored = true + } + if err := client.Dedicated(func(c DedicatedClient) error { + ch := c.SetPubSubHooks(PubSubHooks{OnMessage: func(m PubSubMessage) {}}) + if v, err := c.Do(context.Background(), c.B().Get().Key("a").Build()).ToString(); err != nil || v != "Delegate" { + t.Fatalf("unexpected response %v %v", v, err) + } + if v := c.DoMulti(context.Background()); len(v) != 0 { + t.Fatalf("received unexpected response %v", v) + } + for i, resp := range c.DoMulti( + context.Background(), + c.B().Info().Build(), + c.B().Info().Build(), + ) { + if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) { + t.Fatalf("unexpected response %v %v", v, err) + } + } + for i, resp := range c.DoMulti( + context.Background(), + c.B().Get().Key("a").Build(), + c.B().Get().Key("a").Build(), + ) { + if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) { + t.Fatalf("unexpected response %v %v", v, err) + } + } + for i, resp := range c.DoMulti( + context.Background(), + c.B().Multi().Build(), + c.B().Get().Key("a").Build(), + c.B().Get().Key("a").Build(), + c.B().Exec().Build(), + )[3].val.values { + if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) { + t.Fatalf("unexpected response %v %v", v, err) + } + } + if err := c.Receive(context.Background(), c.B().Ssubscribe().Channel("a").Build(), func(msg PubSubMessage) {}); err != ErrClosing { + t.Fatalf("unexpected ret %v", err) + } + if err := <-ch; err != ErrClosing { + t.Fatalf("unexpected ret %v", err) + } + if err := <-c.SetPubSubHooks(PubSubHooks{OnMessage: func(m PubSubMessage) {}}); err != ErrClosing { + t.Fatalf("unexpected ret %v", err) + } + c.Close() + return nil + }); err != nil { + t.Fatalf("unexpected err %v", err) + } + if !stored { + t.Fatalf("Dedicated desn't put back the wire") + } + if !closed { + t.Fatalf("Dedicated desn't delegate Close") + } + }) + + t.Run("Dedicate", 
func(t *testing.T) { + closed := false + w := &mockWire{ + DoFn: func(cmd Completed) RedisResult { + return newResult(RedisMessage{typ: '+', string: "Delegate"}, nil) + }, + DoMultiFn: func(cmd ...Completed) *redisresults { + if len(cmd) == 4 { + return &redisresults{s: []RedisResult{ + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '*', values: []RedisMessage{ + {typ: '+', string: "Delegate0"}, + {typ: '+', string: "Delegate1"}, + }}, nil), + }} + } + return &redisresults{s: []RedisResult{ + newResult(RedisMessage{typ: '+', string: "Delegate0"}, nil), + newResult(RedisMessage{typ: '+', string: "Delegate1"}, nil), + }} + }, + ReceiveFn: func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { + return ErrClosing + }, + SetPubSubHooksFn: func(hooks PubSubHooks) <-chan error { + ch := make(chan error, 1) + ch <- ErrClosing + close(ch) + return ch + }, + ErrorFn: func() error { + return ErrClosing + }, + CloseFn: func() { + closed = true + }, + } + primaryNodeConn.AcquireFn = func() wire { + return w + } + stored := false + primaryNodeConn.StoreFn = func(ww wire) { + if ww != w { + t.Fatalf("received unexpected wire %v", ww) + } + stored = true + } + c, cancel := client.Dedicate() + ch := c.SetPubSubHooks(PubSubHooks{OnMessage: func(m PubSubMessage) {}}) + if v, err := c.Do(context.Background(), c.B().Get().Key("a").Build()).ToString(); err != nil || v != "Delegate" { + t.Fatalf("unexpected response %v %v", v, err) + } + if v := c.DoMulti(context.Background()); len(v) != 0 { + t.Fatalf("received unexpected response %v", v) + } + for i, resp := range c.DoMulti( + context.Background(), + c.B().Info().Build(), + c.B().Info().Build(), + ) { + if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) { + t.Fatalf("unexpected response %v %v", v, err) + } + } + for i, resp := range c.DoMulti( + context.Background(), + c.B().Get().Key("a").Build(), + c.B().Get().Key("a").Build(), + ) { + if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) { + t.Fatalf("unexpected response %v %v", v, err) + } + } + for i, resp := range c.DoMulti( + context.Background(), + c.B().Multi().Build(), + c.B().Get().Key("a").Build(), + c.B().Get().Key("a").Build(), + c.B().Exec().Build(), + )[3].val.values { + if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) { + t.Fatalf("unexpected response %v %v", v, err) + } + } + if err := c.Receive(context.Background(), c.B().Ssubscribe().Channel("a").Build(), func(msg PubSubMessage) {}); err != ErrClosing { + t.Fatalf("unexpected ret %v", err) + } + if err := <-ch; err != ErrClosing { + t.Fatalf("unexpected ret %v", err) + } + if err := <-c.SetPubSubHooks(PubSubHooks{OnMessage: func(m PubSubMessage) {}}); err != ErrClosing { + t.Fatalf("unexpected ret %v", err) + } + c.Close() + cancel() + + if !stored { + t.Fatalf("Dedicated desn't put back the wire") + } + if !closed { + t.Fatalf("Dedicated desn't delegate Close") + } + }) +} From 76f58a3081c07d67e5cd50f24bc718a390af966a Mon Sep 17 00:00:00 2001 From: proost Date: Fri, 13 Dec 2024 00:18:58 +0900 Subject: [PATCH 5/8] refactor: use sendtoreplicas and replicaselector --- cluster.go | 26 +- cluster_test.go | 1036 ++++++++++++++++++++++++++++++++++++++++++++++- rueidis.go | 13 +- rueidis_test.go | 15 +- 4 files changed, 1051 insertions(+), 39 deletions(-) diff --git a/cluster.go b/cluster.go 
index b9613c9a..a0e658b7 100644 --- a/cluster.go +++ b/cluster.go @@ -18,8 +18,8 @@ import ( var ErrNoSlot = errors.New("the slot has no redis node") var ErrReplicaOnlyConflict = errors.New("ReplicaOnly conflicts with SendToReplicas option") var ErrInvalidShardsRefreshInterval = errors.New("ShardsRefreshInterval must be greater than or equal to 0") -var ErrReplicaOnlyConflictWithReaderNodeSelector = errors.New("ReplicaOnly conflicts with ReaderNodeSelector option") -var ErrSendToReplicasConflictWithReaderNodeSelector = errors.New("SendToReplicas conflicts with ReaderNodeSelector option") +var ErrReplicaOnlyConflictWithReplicaSelector = errors.New("ReplicaOnly conflicts with ReplicaSelector option") +var ErrSendToReplicasNotSet = errors.New("SendToReplicas must be set when ReplicaSelector is set") type clusterClient struct { pslots [16384]conn @@ -43,10 +43,6 @@ type connrole struct { replica bool } -var sendToReader = func(cmd Completed) bool { - return cmd.IsReadOnly() -} - var replicaOnlySelector = func(_ uint16, replicas []ReplicaInfo) int { return util.FastRand(len(replicas)) } @@ -65,20 +61,18 @@ func newClusterClient(opt *ClientOption, connFn connFn, retryer retryHandler) (* if opt.ReplicaOnly && opt.SendToReplicas != nil { return nil, ErrReplicaOnlyConflict } - if opt.ReplicaOnly && opt.ReaderNodeSelector != nil { - return nil, ErrReplicaOnlyConflictWithReaderNodeSelector + if opt.ReplicaOnly && opt.ReplicaSelector != nil { + return nil, ErrReplicaOnlyConflictWithReplicaSelector } - if opt.SendToReplicas != nil && opt.ReaderNodeSelector != nil { - return nil, ErrSendToReplicasConflictWithReaderNodeSelector + if opt.ReplicaSelector != nil && opt.SendToReplicas == nil { + return nil, ErrSendToReplicasNotSet } - if opt.SendToReplicas != nil { - opt.ReaderNodeSelector = replicaOnlySelector - } else if opt.ReaderNodeSelector != nil { - opt.SendToReplicas = sendToReader + if opt.SendToReplicas != nil && opt.ReplicaSelector == nil { + opt.ReplicaSelector = replicaOnlySelector } - if opt.SendToReplicas != nil || opt.ReaderNodeSelector != nil { + if opt.SendToReplicas != nil { rOpt := *opt rOpt.ReplicaOnly = true client.rOpt = &rOpt @@ -273,7 +267,7 @@ func (c *clusterClient) _refresh() (err error) { for i := slot[0]; i <= slot[1]; i++ { pslots[i] = conns[master].conn - rIndex := c.opt.ReaderNodeSelector(uint16(i), replicas) + rIndex := c.opt.ReplicaSelector(uint16(i), replicas) if rIndex >= 0 && rIndex < n { rslots[i] = conns[g.nodes[1+rIndex]].conn } else { diff --git a/cluster_test.go b/cluster_test.go index 503655cf..e21694ca 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -1118,7 +1118,7 @@ func TestClusterClientInit(t *testing.T) { } }) - t.Run("Refresh cluster which has only primary node per shard with ReaderNodeSelector option", func(t *testing.T) { + t.Run("Refresh cluster which has only primary node per shard with ReplicaSelector option", func(t *testing.T) { m := &mockConn{ DoFn: func(cmd Completed) RedisResult { if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" { @@ -1131,7 +1131,10 @@ func TestClusterClientInit(t *testing.T) { client, err := newClusterClient( &ClientOption{ InitAddress: []string{"127.0.0.1:0"}, - ReaderNodeSelector: func(slot uint16, replicas []ReplicaInfo) int { + SendToReplicas: func(cmd Completed) bool { + return true + }, + ReplicaSelector: func(slot uint16, replicas []ReplicaInfo) int { return 0 }, }, @@ -1171,7 +1174,7 @@ func TestClusterClientInit(t *testing.T) { } }) - t.Run("Refresh cluster which has multi replicas per shard with 
ReaderNodeSelector option. Returned index is within range", func(t *testing.T) { + t.Run("Refresh cluster which has multi replicas per shard with ReplicaSelector option. Returned index is within range", func(t *testing.T) { primaryNodeConn := &mockConn{ DoFn: func(cmd Completed) RedisResult { if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" { @@ -1207,7 +1210,10 @@ func TestClusterClientInit(t *testing.T) { client, err := newClusterClient( &ClientOption{ InitAddress: []string{"127.0.0.1:0"}, - ReaderNodeSelector: func(slot uint16, replicas []ReplicaInfo) int { + SendToReplicas: func(cmd Completed) bool { + return true + }, + ReplicaSelector: func(slot uint16, replicas []ReplicaInfo) int { return 1 }, }, @@ -1255,7 +1261,7 @@ func TestClusterClientInit(t *testing.T) { } }) - t.Run("Refresh cluster which has multi replicas per shard with ReaderNodeSelector option. Returned index is out of range", func(t *testing.T) { + t.Run("Refresh cluster which has multi replicas per shard with ReplicaSelector option. Returned index is out of range", func(t *testing.T) { primaryNodeConn := &mockConn{ DoFn: func(cmd Completed) RedisResult { if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" { @@ -1291,7 +1297,10 @@ func TestClusterClientInit(t *testing.T) { client, err := newClusterClient( &ClientOption{ InitAddress: []string{"127.0.0.1:0"}, - ReaderNodeSelector: func(slot uint16, replicas []ReplicaInfo) int { + SendToReplicas: func(cmd Completed) bool { + return true + }, + ReplicaSelector: func(slot uint16, replicas []ReplicaInfo) int { return -1 }, }, @@ -5455,7 +5464,7 @@ func TestClusterClientLoadingRetry(t *testing.T) { } //gocyclo:ignore -func TestClusterClient_SendReadOperationToReaderNodeWriteOperationToPrimaryNode(t *testing.T) { +func TestClusterClient_SendReadOperationToReplicaNodeWriteOperationToPrimaryNode(t *testing.T) { defer ShouldNotLeaked(SetupLeakDetection()) primaryNodeConn := &mockConn{ @@ -5556,7 +5565,10 @@ func TestClusterClient_SendReadOperationToReaderNodeWriteOperationToPrimaryNode( client, err := newClusterClient( &ClientOption{ InitAddress: []string{"127.0.0.1:0"}, - ReaderNodeSelector: func(slot uint16, replicas []ReplicaInfo) int { + SendToReplicas: func(cmd Completed) bool { + return cmd.IsReadOnly() + }, + ReplicaSelector: func(slot uint16, replicas []ReplicaInfo) int { return 0 }, }, @@ -6047,3 +6059,1011 @@ func TestClusterClient_SendReadOperationToReaderNodeWriteOperationToPrimaryNode( } }) } + +//gocyclo:ignore +func TestClusterClient_SendToOnlyPrimaryNodeWhenPrimaryNodeSelected(t *testing.T) { + defer ShouldNotLeaked(SetupLeakDetection()) + + primaryNodeConn := &mockConn{ + DoOverride: map[string]func(cmd Completed) RedisResult{ + "CLUSTER SLOTS": func(cmd Completed) RedisResult { + return slotsMultiResp + }, + "GET Do": func(cmd Completed) RedisResult { + return newResult(RedisMessage{typ: '+', string: "GET Do"}, nil) + }, + "GET K1{a}": func(cmd Completed) RedisResult { + return newResult(RedisMessage{typ: '+', string: "GET K1{a}"}, nil) + }, + "GET K2{a}": func(cmd Completed) RedisResult { + return newResult(RedisMessage{typ: '+', string: "GET K2{a}"}, nil) + }, + "INFO": func(cmd Completed) RedisResult { + return newResult(RedisMessage{typ: '+', string: "INFO"}, nil) + }, + }, + DoMultiFn: func(multi ...Completed) *redisresults { + resps := make([]RedisResult, len(multi)) + for i, cmd := range multi { + resps[i] = newResult(RedisMessage{typ: '+', string: strings.Join(cmd.Commands(), " ")}, nil) + } + return &redisresults{s: resps} + }, + 
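+			// ReplicaSelector returns -1 below, so every slot falls back to the
+			// primary; the primary mock therefore also serves the cached reads.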
DoCacheOverride: map[string]func(cmd Cacheable, ttl time.Duration) RedisResult{ + "GET DoCache": func(cmd Cacheable, ttl time.Duration) RedisResult { + return newResult(RedisMessage{typ: '+', string: "GET DoCache"}, nil) + }, + "GET K1{a}": func(cmd Cacheable, ttl time.Duration) RedisResult { + return newResult(RedisMessage{typ: '+', string: "GET K1{a}"}, nil) + }, + "GET K2{a}": func(cmd Cacheable, ttl time.Duration) RedisResult { + return newResult(RedisMessage{typ: '+', string: "GET K2{a}"}, nil) + }, + }, + DoMultiCacheFn: func(multi ...CacheableTTL) *redisresults { + resps := make([]RedisResult, len(multi)) + for i, cmd := range multi { + resps[i] = newResult(RedisMessage{typ: '+', string: strings.Join(cmd.Cmd.Commands(), " ")}, nil) + } + return &redisresults{s: resps} + }, + } + replicaNodeConn := &mockConn{} + + client, err := newClusterClient( + &ClientOption{ + InitAddress: []string{"127.0.0.1:0"}, + SendToReplicas: func(cmd Completed) bool { + return true + }, + ReplicaSelector: func(slot uint16, replicas []ReplicaInfo) int { + return -1 + }, + }, + func(dst string, opt *ClientOption) conn { + if dst == "127.0.0.1:0" || dst == "127.0.2.1:0" { // primary nodes + return primaryNodeConn + } else { // replica nodes + return replicaNodeConn + } + }, + newRetryer(defaultRetryDelayFn), + ) + if err != nil { + t.Fatalf("unexpected err %v", err) + } + + t.Run("Do with no slot", func(t *testing.T) { + c := client.B().Info().Build() + if v, err := client.Do(context.Background(), c).ToString(); err != nil || v != "INFO" { + t.Fatalf("unexpected response %v %v", v, err) + } + }) + + t.Run("Do", func(t *testing.T) { + c := client.B().Get().Key("Do").Build() + if v, err := client.Do(context.Background(), c).ToString(); err != nil || v != "GET Do" { + t.Fatalf("unexpected response %v %v", v, err) + } + }) + + t.Run("DoMulti Single Slot", func(t *testing.T) { + c1 := client.B().Get().Key("K1{a}").Build() + c2 := client.B().Get().Key("K2{a}").Build() + resps := client.DoMulti(context.Background(), c1, c2) + if v, err := resps[0].ToString(); err != nil || v != "GET K1{a}" { + t.Fatalf("unexpected response %v %v", v, err) + } + if v, err := resps[1].ToString(); err != nil || v != "GET K2{a}" { + t.Fatalf("unexpected response %v %v", v, err) + } + }) + + t.Run("DoMulti Single Slot + Init Slot", func(t *testing.T) { + c1 := client.B().Get().Key("K1{a}").Build() + c2 := client.B().Info().Build() + resps := client.DoMulti(context.Background(), c1, c2) + if v, err := resps[0].ToString(); err != nil || v != "GET K1{a}" { + t.Fatalf("unexpected response %v %v", v, err) + } + if v, err := resps[1].ToString(); err != nil || v != "INFO" { + t.Fatalf("unexpected response %v %v", v, err) + } + }) + + t.Run("DoMulti Cross Slot + Init Slot", func(t *testing.T) { + defer func() { + if err := recover(); err != panicMixCxSlot { + t.Errorf("DoMulti should panic if Cross Slot + Init Slot") + } + }() + c1 := client.B().Get().Key("K1{a}").Build() + c2 := client.B().Get().Key("K1{b}").Build() + c3 := client.B().Info().Build() + client.DoMulti(context.Background(), c1, c2, c3) + }) + + t.Run("DoMulti Multi Slot", func(t *testing.T) { + multi := make([]Completed, 500) + for i := 0; i < len(multi); i++ { + multi[i] = client.B().Get().Key(fmt.Sprintf("K1{%d}", i)).Build() + } + resps := client.DoMulti(context.Background(), multi...) 
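+		// Even though these are read-only commands, the out-of-range selector
+		// keeps rslots pointing at the primary, so the primary answers all 500.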
+ for i := 0; i < len(multi); i++ { + if v, err := resps[i].ToString(); err != nil || v != fmt.Sprintf("GET K1{%d}", i) { + t.Fatalf("unexpected response %v %v", v, err) + } + } + }) + + t.Run("DoCache", func(t *testing.T) { + c := client.B().Get().Key("DoCache").Cache() + if v, err := client.DoCache(context.Background(), c, 100).ToString(); err != nil || v != "GET DoCache" { + t.Fatalf("unexpected response %v %v", v, err) + } + }) + + t.Run("DoMultiCache Single Slot", func(t *testing.T) { + c1 := client.B().Get().Key("K1{a}").Cache() + c2 := client.B().Get().Key("K2{a}").Cache() + resps := client.DoMultiCache(context.Background(), CT(c1, time.Second), CT(c2, time.Second)) + if v, err := resps[0].ToString(); err != nil || v != "GET K1{a}" { + t.Fatalf("unexpected response %v %v", v, err) + } + if v, err := resps[1].ToString(); err != nil || v != "GET K2{a}" { + t.Fatalf("unexpected response %v %v", v, err) + } + }) + + t.Run("DoMultiCache Multi Slot", func(t *testing.T) { + multi := make([]CacheableTTL, 500) + for i := 0; i < len(multi); i++ { + multi[i] = CT(client.B().Get().Key(fmt.Sprintf("K1{%d}", i)).Cache(), time.Second) + } + resps := client.DoMultiCache(context.Background(), multi...) + for i := 0; i < len(multi); i++ { + if v, err := resps[i].ToString(); err != nil || v != fmt.Sprintf("GET K1{%d}", i) { + t.Fatalf("unexpected response %v %v", v, err) + } + } + }) + + t.Run("Receive", func(t *testing.T) { + c := client.B().Subscribe().Channel("ch").Build() + hdl := func(message PubSubMessage) {} + primaryNodeConn.ReceiveFn = func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { + if !reflect.DeepEqual(subscribe.Commands(), c.Commands()) { + t.Fatalf("unexpected command %v", subscribe) + } + return nil + } + replicaNodeConn.ReceiveFn = func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { + if !reflect.DeepEqual(subscribe.Commands(), c.Commands()) { + t.Fatalf("unexpected command %v", subscribe) + } + return nil + } + + if err := client.Receive(context.Background(), c, hdl); err != nil { + t.Fatalf("unexpected response %v", err) + } + }) + + t.Run("Receive Redis Err", func(t *testing.T) { + c := client.B().Subscribe().Channel("ch").Build() + e := &RedisError{} + primaryNodeConn.ReceiveFn = func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { + return e + } + replicaNodeConn.ReceiveFn = func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { + return e + } + if err := client.Receive(context.Background(), c, func(message PubSubMessage) {}); err != e { + t.Fatalf("unexpected response %v", err) + } + }) + + t.Run("Dedicated Err", func(t *testing.T) { + v := errors.New("fn err") + if err := client.Dedicated(func(client DedicatedClient) error { + return v + }); err != v { + t.Fatalf("unexpected err %v", err) + } + }) + + t.Run("Dedicated Cross Slot Err", func(t *testing.T) { + defer func() { + if err := recover(); err != panicMsgCxSlot { + t.Errorf("Dedicated should panic if cross slots is used") + } + }() + primaryNodeConn.AcquireFn = func() wire { return &mockWire{} } + client.Dedicated(func(c DedicatedClient) error { + c.Do(context.Background(), c.B().Get().Key("a").Build()).Error() + return c.Do(context.Background(), c.B().Get().Key("b").Build()).Error() + }) + }) + + t.Run("Dedicated Cross Slot Err Multi", func(t *testing.T) { + defer func() { + if err := recover(); err != panicMsgCxSlot { + t.Errorf("Dedicated should panic if cross slots is used") + } + 
}() + primaryNodeConn.AcquireFn = func() wire { + return &mockWire{ + DoMultiFn: func(multi ...Completed) *redisresults { + return &redisresults{s: []RedisResult{ + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '*', values: []RedisMessage{{typ: '+', string: "a"}}}, nil), + }} + }, + } + } + client.Dedicated(func(c DedicatedClient) (err error) { + c.DoMulti( + context.Background(), + c.B().Multi().Build(), + c.B().Get().Key("a").Build(), + c.B().Exec().Build(), + ) + c.DoMulti( + context.Background(), + c.B().Multi().Build(), + c.B().Get().Key("b").Build(), + c.B().Exec().Build(), + ) + return nil + }) + }) + + t.Run("Dedicated Multi Cross Slot Err", func(t *testing.T) { + primaryNodeConn.AcquireFn = func() wire { return &mockWire{} } + err := client.Dedicated(func(c DedicatedClient) (err error) { + defer func() { + err = errors.New(recover().(string)) + }() + c.DoMulti( + context.Background(), + c.B().Get().Key("a").Build(), + c.B().Get().Key("b").Build(), + ) + return nil + }) + if err == nil || err.Error() != panicMsgCxSlot { + t.Errorf("Multi should panic if cross slots is used") + } + }) + + t.Run("Dedicated Receive Redis Err", func(t *testing.T) { + e := &RedisError{} + w := &mockWire{ + ReceiveFn: func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { + return e + }, + } + primaryNodeConn.AcquireFn = func() wire { + return w + } + if err := client.Dedicated(func(c DedicatedClient) error { + return c.Receive(context.Background(), c.B().Ssubscribe().Channel("a").Build(), func(msg PubSubMessage) {}) + }); err != e { + t.Fatalf("unexpected err %v", err) + } + }) + + t.Run("Dedicated", func(t *testing.T) { + closed := false + w := &mockWire{ + DoFn: func(cmd Completed) RedisResult { + return newResult(RedisMessage{typ: '+', string: "Delegate"}, nil) + }, + DoMultiFn: func(cmd ...Completed) *redisresults { + if len(cmd) == 4 { + return &redisresults{s: []RedisResult{ + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '*', values: []RedisMessage{ + {typ: '+', string: "Delegate0"}, + {typ: '+', string: "Delegate1"}, + }}, nil), + }} + } + return &redisresults{s: []RedisResult{ + newResult(RedisMessage{typ: '+', string: "Delegate0"}, nil), + newResult(RedisMessage{typ: '+', string: "Delegate1"}, nil), + }} + }, + ReceiveFn: func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { + return ErrClosing + }, + SetPubSubHooksFn: func(hooks PubSubHooks) <-chan error { + ch := make(chan error, 1) + ch <- ErrClosing + close(ch) + return ch + }, + ErrorFn: func() error { + return ErrClosing + }, + CloseFn: func() { + closed = true + }, + } + primaryNodeConn.AcquireFn = func() wire { + return w + } + stored := false + primaryNodeConn.StoreFn = func(ww wire) { + if ww != w { + t.Fatalf("received unexpected wire %v", ww) + } + stored = true + } + if err := client.Dedicated(func(c DedicatedClient) error { + ch := c.SetPubSubHooks(PubSubHooks{OnMessage: func(m PubSubMessage) {}}) + if v, err := c.Do(context.Background(), c.B().Get().Key("a").Build()).ToString(); err != nil || v != "Delegate" { + t.Fatalf("unexpected response %v %v", v, err) + } + if v := c.DoMulti(context.Background()); len(v) != 0 { + t.Fatalf("received unexpected response %v", v) + } + for i, resp := range c.DoMulti( + context.Background(), 
+ c.B().Info().Build(), + c.B().Info().Build(), + ) { + if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) { + t.Fatalf("unexpected response %v %v", v, err) + } + } + for i, resp := range c.DoMulti( + context.Background(), + c.B().Get().Key("a").Build(), + c.B().Get().Key("a").Build(), + ) { + if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) { + t.Fatalf("unexpected response %v %v", v, err) + } + } + for i, resp := range c.DoMulti( + context.Background(), + c.B().Multi().Build(), + c.B().Get().Key("a").Build(), + c.B().Get().Key("a").Build(), + c.B().Exec().Build(), + )[3].val.values { + if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) { + t.Fatalf("unexpected response %v %v", v, err) + } + } + if err := c.Receive(context.Background(), c.B().Ssubscribe().Channel("a").Build(), func(msg PubSubMessage) {}); err != ErrClosing { + t.Fatalf("unexpected ret %v", err) + } + if err := <-ch; err != ErrClosing { + t.Fatalf("unexpected ret %v", err) + } + if err := <-c.SetPubSubHooks(PubSubHooks{OnMessage: func(m PubSubMessage) {}}); err != ErrClosing { + t.Fatalf("unexpected ret %v", err) + } + c.Close() + return nil + }); err != nil { + t.Fatalf("unexpected err %v", err) + } + if !stored { + t.Fatalf("Dedicated desn't put back the wire") + } + if !closed { + t.Fatalf("Dedicated desn't delegate Close") + } + }) + + t.Run("Dedicate", func(t *testing.T) { + closed := false + w := &mockWire{ + DoFn: func(cmd Completed) RedisResult { + return newResult(RedisMessage{typ: '+', string: "Delegate"}, nil) + }, + DoMultiFn: func(cmd ...Completed) *redisresults { + if len(cmd) == 4 { + return &redisresults{s: []RedisResult{ + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '*', values: []RedisMessage{ + {typ: '+', string: "Delegate0"}, + {typ: '+', string: "Delegate1"}, + }}, nil), + }} + } + return &redisresults{s: []RedisResult{ + newResult(RedisMessage{typ: '+', string: "Delegate0"}, nil), + newResult(RedisMessage{typ: '+', string: "Delegate1"}, nil), + }} + }, + ReceiveFn: func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { + return ErrClosing + }, + SetPubSubHooksFn: func(hooks PubSubHooks) <-chan error { + ch := make(chan error, 1) + ch <- ErrClosing + close(ch) + return ch + }, + ErrorFn: func() error { + return ErrClosing + }, + CloseFn: func() { + closed = true + }, + } + primaryNodeConn.AcquireFn = func() wire { + return w + } + stored := false + primaryNodeConn.StoreFn = func(ww wire) { + if ww != w { + t.Fatalf("received unexpected wire %v", ww) + } + stored = true + } + c, cancel := client.Dedicate() + ch := c.SetPubSubHooks(PubSubHooks{OnMessage: func(m PubSubMessage) {}}) + if v, err := c.Do(context.Background(), c.B().Get().Key("a").Build()).ToString(); err != nil || v != "Delegate" { + t.Fatalf("unexpected response %v %v", v, err) + } + if v := c.DoMulti(context.Background()); len(v) != 0 { + t.Fatalf("received unexpected response %v", v) + } + for i, resp := range c.DoMulti( + context.Background(), + c.B().Info().Build(), + c.B().Info().Build(), + ) { + if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) { + t.Fatalf("unexpected response %v %v", v, err) + } + } + for i, resp := range c.DoMulti( + context.Background(), + c.B().Get().Key("a").Build(), + c.B().Get().Key("a").Build(), + ) { + if v, err := 
resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) { + t.Fatalf("unexpected response %v %v", v, err) + } + } + for i, resp := range c.DoMulti( + context.Background(), + c.B().Multi().Build(), + c.B().Get().Key("a").Build(), + c.B().Get().Key("a").Build(), + c.B().Exec().Build(), + )[3].val.values { + if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) { + t.Fatalf("unexpected response %v %v", v, err) + } + } + if err := c.Receive(context.Background(), c.B().Ssubscribe().Channel("a").Build(), func(msg PubSubMessage) {}); err != ErrClosing { + t.Fatalf("unexpected ret %v", err) + } + if err := <-ch; err != ErrClosing { + t.Fatalf("unexpected ret %v", err) + } + if err := <-c.SetPubSubHooks(PubSubHooks{OnMessage: func(m PubSubMessage) {}}); err != ErrClosing { + t.Fatalf("unexpected ret %v", err) + } + c.Close() + cancel() + + if !stored { + t.Fatalf("Dedicated desn't put back the wire") + } + if !closed { + t.Fatalf("Dedicated desn't delegate Close") + } + }) +} + +//gocyclo:ignore +func TestClusterClient_SendToOnlyReplicaNodeWhenReplicaNodeSelected(t *testing.T) { + defer ShouldNotLeaked(SetupLeakDetection()) + + primaryNodeConn := &mockConn{ + DoOverride: map[string]func(cmd Completed) RedisResult{ + "CLUSTER SLOTS": func(cmd Completed) RedisResult { + return slotsMultiResp + }, + "INFO": func(cmd Completed) RedisResult { + return newResult(RedisMessage{typ: '+', string: "INFO"}, nil) + }, + "GET K1{a}": func(cmd Completed) RedisResult { + return newResult(RedisMessage{typ: '+', string: "GET K1{a}"}, nil) + }, + }, + } + replicaNodeConn := &mockConn{ + DoMultiFn: func(multi ...Completed) *redisresults { + resps := make([]RedisResult, len(multi)) + for i, cmd := range multi { + resps[i] = newResult(RedisMessage{typ: '+', string: strings.Join(cmd.Commands(), " ")}, nil) + } + return &redisresults{s: resps} + }, + DoMultiCacheFn: func(multi ...CacheableTTL) *redisresults { + resps := make([]RedisResult, len(multi)) + for i, cmd := range multi { + resps[i] = newResult(RedisMessage{typ: '+', string: strings.Join(cmd.Cmd.Commands(), " ")}, nil) + } + return &redisresults{s: resps} + }, + DoOverride: map[string]func(cmd Completed) RedisResult{ + "GET Do": func(cmd Completed) RedisResult { + return newResult(RedisMessage{typ: '+', string: "GET Do"}, nil) + }, + "GET K1{a}": func(cmd Completed) RedisResult { + return newResult(RedisMessage{typ: '+', string: "GET K1{a}"}, nil) + }, + "GET K2{a}": func(cmd Completed) RedisResult { + return newResult(RedisMessage{typ: '+', string: "GET K2{a}"}, nil) + }, + }, + DoCacheOverride: map[string]func(cmd Cacheable, ttl time.Duration) RedisResult{ + "GET DoCache": func(cmd Cacheable, ttl time.Duration) RedisResult { + return newResult(RedisMessage{typ: '+', string: "GET DoCache"}, nil) + }, + }, + } + + client, err := newClusterClient( + &ClientOption{ + InitAddress: []string{"127.0.0.1:0"}, + SendToReplicas: func(cmd Completed) bool { + return true + }, + ReplicaSelector: func(slot uint16, replicas []ReplicaInfo) int { + return 0 + }, + }, + func(dst string, opt *ClientOption) conn { + if dst == "127.0.0.1:0" || dst == "127.0.2.1:0" { // primary nodes + return primaryNodeConn + } else { // replica nodes + return replicaNodeConn + } + }, + newRetryer(defaultRetryDelayFn), + ) + if err != nil { + t.Fatalf("unexpected err %v", err) + } + + t.Run("Do with no slot", func(t *testing.T) { + c := client.B().Info().Build() + if v, err := client.Do(context.Background(), c).ToString(); err != nil || v != "INFO" { + 
t.Fatalf("unexpected response %v %v", v, err) + } + }) + + t.Run("Do", func(t *testing.T) { + c := client.B().Get().Key("Do").Build() + if v, err := client.Do(context.Background(), c).ToString(); err != nil || v != "GET Do" { + t.Fatalf("unexpected response %v %v", v, err) + } + }) + + t.Run("DoMulti Single Slot", func(t *testing.T) { + c1 := client.B().Get().Key("K1{a}").Build() + c2 := client.B().Get().Key("K2{a}").Build() + resps := client.DoMulti(context.Background(), c1, c2) + if v, err := resps[0].ToString(); err != nil || v != "GET K1{a}" { + t.Fatalf("unexpected response %v %v", v, err) + } + if v, err := resps[1].ToString(); err != nil || v != "GET K2{a}" { + t.Fatalf("unexpected response %v %v", v, err) + } + }) + + t.Run("DoMulti Single Slot + Init Slot", func(t *testing.T) { + c1 := client.B().Get().Key("K1{a}").Build() + c2 := client.B().Info().Build() + resps := client.DoMulti(context.Background(), c1, c2) + if v, err := resps[0].ToString(); err != nil || v != "GET K1{a}" { + t.Fatalf("unexpected response %v %v", v, err) + } + if v, err := resps[1].ToString(); err != nil || v != "INFO" { + t.Fatalf("unexpected response %v %v", v, err) + } + }) + + t.Run("DoMulti Cross Slot + Init Slot", func(t *testing.T) { + defer func() { + if err := recover(); err != panicMixCxSlot { + t.Errorf("DoMulti should panic if Cross Slot + Init Slot") + } + }() + c1 := client.B().Get().Key("K1{a}").Build() + c2 := client.B().Get().Key("K1{b}").Build() + c3 := client.B().Info().Build() + client.DoMulti(context.Background(), c1, c2, c3) + }) + + t.Run("DoMulti Multi Slot", func(t *testing.T) { + multi := make([]Completed, 500) + for i := 0; i < len(multi); i++ { + multi[i] = client.B().Get().Key(fmt.Sprintf("K1{%d}", i)).Build() + } + resps := client.DoMulti(context.Background(), multi...) + for i := 0; i < len(multi); i++ { + if v, err := resps[i].ToString(); err != nil || v != fmt.Sprintf("GET K1{%d}", i) { + t.Fatalf("unexpected response %v %v", v, err) + } + } + }) + + t.Run("DoCache", func(t *testing.T) { + c := client.B().Get().Key("DoCache").Cache() + if v, err := client.DoCache(context.Background(), c, 100).ToString(); err != nil || v != "GET DoCache" { + t.Fatalf("unexpected response %v %v", v, err) + } + }) + + t.Run("DoMultiCache Single Slot", func(t *testing.T) { + c1 := client.B().Get().Key("K1{a}").Cache() + c2 := client.B().Get().Key("K2{a}").Cache() + resps := client.DoMultiCache(context.Background(), CT(c1, time.Second), CT(c2, time.Second)) + if v, err := resps[0].ToString(); err != nil || v != "GET K1{a}" { + t.Fatalf("unexpected response %v %v", v, err) + } + if v, err := resps[1].ToString(); err != nil || v != "GET K2{a}" { + t.Fatalf("unexpected response %v %v", v, err) + } + }) + + t.Run("DoMultiCache Multi Slot", func(t *testing.T) { + multi := make([]CacheableTTL, 500) + for i := 0; i < len(multi); i++ { + multi[i] = CT(client.B().Get().Key(fmt.Sprintf("K1{%d}", i)).Cache(), time.Second) + } + resps := client.DoMultiCache(context.Background(), multi...) 
+ for i := 0; i < len(multi); i++ { + if v, err := resps[i].ToString(); err != nil || v != fmt.Sprintf("GET K1{%d}", i) { + t.Fatalf("unexpected response %v %v", v, err) + } + } + }) + + t.Run("Receive", func(t *testing.T) { + c := client.B().Subscribe().Channel("ch").Build() + hdl := func(message PubSubMessage) {} + primaryNodeConn.ReceiveFn = func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { + if !reflect.DeepEqual(subscribe.Commands(), c.Commands()) { + t.Fatalf("unexpected command %v", subscribe) + } + return nil + } + replicaNodeConn.ReceiveFn = func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { + if !reflect.DeepEqual(subscribe.Commands(), c.Commands()) { + t.Fatalf("unexpected command %v", subscribe) + } + return nil + } + if err := client.Receive(context.Background(), c, hdl); err != nil { + t.Fatalf("unexpected response %v", err) + } + }) + + t.Run("Receive Redis Err", func(t *testing.T) { + c := client.B().Ssubscribe().Channel("ch").Build() + e := &RedisError{} + primaryNodeConn.ReceiveFn = func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { + return e + } + replicaNodeConn.ReceiveFn = func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { + return e + } + if err := client.Receive(context.Background(), c, func(message PubSubMessage) {}); err != e { + t.Fatalf("unexpected response %v", err) + } + }) + + t.Run("Dedicated Cross Slot Err", func(t *testing.T) { + defer func() { + if err := recover(); err != panicMsgCxSlot { + t.Errorf("Dedicated should panic if cross slots is used") + } + }() + primaryNodeConn.AcquireFn = func() wire { return &mockWire{} } + client.Dedicated(func(c DedicatedClient) error { + c.Do(context.Background(), c.B().Get().Key("a").Build()).Error() + return c.Do(context.Background(), c.B().Get().Key("b").Build()).Error() + }) + }) + + t.Run("Dedicated Cross Slot Err Multi", func(t *testing.T) { + defer func() { + if err := recover(); err != panicMsgCxSlot { + t.Errorf("Dedicated should panic if cross slots is used") + } + }() + primaryNodeConn.AcquireFn = func() wire { + return &mockWire{ + DoMultiFn: func(multi ...Completed) *redisresults { + return &redisresults{s: []RedisResult{ + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '+', string: "OK"}, nil), + newResult(RedisMessage{typ: '*', values: []RedisMessage{{typ: '+', string: "a"}}}, nil), + }} + }, + } + } + client.Dedicated(func(c DedicatedClient) (err error) { + c.DoMulti( + context.Background(), + c.B().Multi().Build(), + c.B().Get().Key("a").Build(), + c.B().Exec().Build(), + ) + c.DoMulti( + context.Background(), + c.B().Multi().Build(), + c.B().Get().Key("b").Build(), + c.B().Exec().Build(), + ) + return nil + }) + }) + + t.Run("Dedicated Multi Cross Slot Err", func(t *testing.T) { + primaryNodeConn.AcquireFn = func() wire { return &mockWire{} } + err := client.Dedicated(func(c DedicatedClient) (err error) { + defer func() { + err = errors.New(recover().(string)) + }() + c.DoMulti( + context.Background(), + c.B().Get().Key("a").Build(), + c.B().Get().Key("b").Build(), + ) + return nil + }) + if err == nil || err.Error() != panicMsgCxSlot { + t.Errorf("Multi should panic if cross slots is used") + } + }) + + t.Run("Dedicated Receive Redis Err", func(t *testing.T) { + e := &RedisError{} + w := &mockWire{ + ReceiveFn: func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { + return e + }, + } + 
primaryNodeConn.AcquireFn = func() wire {
+ return w
+ }
+ if err := client.Dedicated(func(c DedicatedClient) error {
+ return c.Receive(context.Background(), c.B().Subscribe().Channel("a").Build(), func(msg PubSubMessage) {})
+ }); err != e {
+ t.Fatalf("unexpected err %v", err)
+ }
+ })
+
+ t.Run("Dedicated", func(t *testing.T) {
+ closed := false
+ w := &mockWire{
+ DoFn: func(cmd Completed) RedisResult {
+ return newResult(RedisMessage{typ: '+', string: "Delegate"}, nil)
+ },
+ DoMultiFn: func(cmd ...Completed) *redisresults {
+ if len(cmd) == 4 {
+ return &redisresults{s: []RedisResult{
+ newResult(RedisMessage{typ: '+', string: "OK"}, nil),
+ newResult(RedisMessage{typ: '+', string: "OK"}, nil),
+ newResult(RedisMessage{typ: '+', string: "OK"}, nil),
+ newResult(RedisMessage{typ: '*', values: []RedisMessage{
+ {typ: '+', string: "Delegate0"},
+ {typ: '+', string: "Delegate1"},
+ }}, nil),
+ }}
+ }
+ return &redisresults{s: []RedisResult{
+ newResult(RedisMessage{typ: '+', string: "Delegate0"}, nil),
+ newResult(RedisMessage{typ: '+', string: "Delegate1"}, nil),
+ }}
+ },
+ ReceiveFn: func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error {
+ return ErrClosing
+ },
+ SetPubSubHooksFn: func(hooks PubSubHooks) <-chan error {
+ ch := make(chan error, 1)
+ ch <- ErrClosing
+ close(ch)
+ return ch
+ },
+ ErrorFn: func() error {
+ return ErrClosing
+ },
+ CloseFn: func() {
+ closed = true
+ },
+ }
+ primaryNodeConn.AcquireFn = func() wire {
+ return w
+ }
+ stored := false
+ primaryNodeConn.StoreFn = func(ww wire) {
+ if ww != w {
+ t.Fatalf("received unexpected wire %v", ww)
+ }
+ stored = true
+ }
+ if err := client.Dedicated(func(c DedicatedClient) error {
+ ch := c.SetPubSubHooks(PubSubHooks{OnMessage: func(m PubSubMessage) {}})
+ if v, err := c.Do(context.Background(), c.B().Get().Key("a").Build()).ToString(); err != nil || v != "Delegate" {
+ t.Fatalf("unexpected response %v %v", v, err)
+ }
+ if v := c.DoMulti(context.Background()); len(v) != 0 {
+ t.Fatalf("received unexpected response %v", v)
+ }
+ for i, resp := range c.DoMulti(
+ context.Background(),
+ c.B().Info().Build(),
+ c.B().Info().Build(),
+ ) {
+ if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) {
+ t.Fatalf("unexpected response %v %v", v, err)
+ }
+ }
+ for i, resp := range c.DoMulti(
+ context.Background(),
+ c.B().Get().Key("a").Build(),
+ c.B().Get().Key("a").Build(),
+ ) {
+ if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) {
+ t.Fatalf("unexpected response %v %v", v, err)
+ }
+ }
+ for i, resp := range c.DoMulti(
+ context.Background(),
+ c.B().Multi().Build(),
+ c.B().Get().Key("a").Build(),
+ c.B().Get().Key("a").Build(),
+ c.B().Exec().Build(),
+ )[3].val.values {
+ if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) {
+ t.Fatalf("unexpected response %v %v", v, err)
+ }
+ }
+ if err := c.Receive(context.Background(), c.B().Ssubscribe().Channel("a").Build(), func(msg PubSubMessage) {}); err != ErrClosing {
+ t.Fatalf("unexpected ret %v", err)
+ }
+ if err := <-ch; err != ErrClosing {
+ t.Fatalf("unexpected ret %v", err)
+ }
+ if err := <-c.SetPubSubHooks(PubSubHooks{OnMessage: func(m PubSubMessage) {}}); err != ErrClosing {
+ t.Fatalf("unexpected ret %v", err)
+ }
+ c.Close()
+ return nil
+ }); err != nil {
+ t.Fatalf("unexpected err %v", err)
+ }
+ if !stored {
+ t.Fatalf("Dedicated doesn't put back the wire")
+ }
+ if !closed {
+ t.Fatalf("Dedicated doesn't delegate Close")
+ }
+ })
+
+ t.Run("Dedicate",
func(t *testing.T) {
+ closed := false
+ w := &mockWire{
+ DoFn: func(cmd Completed) RedisResult {
+ return newResult(RedisMessage{typ: '+', string: "Delegate"}, nil)
+ },
+ DoMultiFn: func(cmd ...Completed) *redisresults {
+ if len(cmd) == 4 {
+ return &redisresults{s: []RedisResult{
+ newResult(RedisMessage{typ: '+', string: "OK"}, nil),
+ newResult(RedisMessage{typ: '+', string: "OK"}, nil),
+ newResult(RedisMessage{typ: '+', string: "OK"}, nil),
+ newResult(RedisMessage{typ: '*', values: []RedisMessage{
+ {typ: '+', string: "Delegate0"},
+ {typ: '+', string: "Delegate1"},
+ }}, nil),
+ }}
+ }
+ return &redisresults{s: []RedisResult{
+ newResult(RedisMessage{typ: '+', string: "Delegate0"}, nil),
+ newResult(RedisMessage{typ: '+', string: "Delegate1"}, nil),
+ }}
+ },
+ ReceiveFn: func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error {
+ return ErrClosing
+ },
+ SetPubSubHooksFn: func(hooks PubSubHooks) <-chan error {
+ ch := make(chan error, 1)
+ ch <- ErrClosing
+ close(ch)
+ return ch
+ },
+ ErrorFn: func() error {
+ return ErrClosing
+ },
+ CloseFn: func() {
+ closed = true
+ },
+ }
+ primaryNodeConn.AcquireFn = func() wire {
+ return w
+ }
+ stored := false
+ primaryNodeConn.StoreFn = func(ww wire) {
+ if ww != w {
+ t.Fatalf("received unexpected wire %v", ww)
+ }
+ stored = true
+ }
+ c, cancel := client.Dedicate()
+ ch := c.SetPubSubHooks(PubSubHooks{OnMessage: func(m PubSubMessage) {}})
+ if v, err := c.Do(context.Background(), c.B().Get().Key("a").Build()).ToString(); err != nil || v != "Delegate" {
+ t.Fatalf("unexpected response %v %v", v, err)
+ }
+ if v := c.DoMulti(context.Background()); len(v) != 0 {
+ t.Fatalf("received unexpected response %v", v)
+ }
+ for i, resp := range c.DoMulti(
+ context.Background(),
+ c.B().Info().Build(),
+ c.B().Info().Build(),
+ ) {
+ if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) {
+ t.Fatalf("unexpected response %v %v", v, err)
+ }
+ }
+ for i, resp := range c.DoMulti(
+ context.Background(),
+ c.B().Get().Key("a").Build(),
+ c.B().Get().Key("a").Build(),
+ ) {
+ if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) {
+ t.Fatalf("unexpected response %v %v", v, err)
+ }
+ }
+ for i, resp := range c.DoMulti(
+ context.Background(),
+ c.B().Multi().Build(),
+ c.B().Get().Key("a").Build(),
+ c.B().Get().Key("a").Build(),
+ c.B().Exec().Build(),
+ )[3].val.values {
+ if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) {
+ t.Fatalf("unexpected response %v %v", v, err)
+ }
+ }
+ if err := c.Receive(context.Background(), c.B().Ssubscribe().Channel("a").Build(), func(msg PubSubMessage) {}); err != ErrClosing {
+ t.Fatalf("unexpected ret %v", err)
+ }
+ if err := <-ch; err != ErrClosing {
+ t.Fatalf("unexpected ret %v", err)
+ }
+ if err := <-c.SetPubSubHooks(PubSubHooks{OnMessage: func(m PubSubMessage) {}}); err != ErrClosing {
+ t.Fatalf("unexpected ret %v", err)
+ }
+ c.Close()
+ cancel()
+
+ if !stored {
+ t.Fatalf("Dedicated doesn't put back the wire")
+ }
+ if !closed {
+ t.Fatalf("Dedicated doesn't delegate Close")
+ }
+ })
+}
diff --git a/rueidis.go b/rueidis.go
index be1ab979..17103333 100644
--- a/rueidis.go
+++ b/rueidis.go
@@ -209,15 +209,16 @@ type ClientOption struct {
 // ClusterOption is the options for the redis cluster client.
 ClusterOption ClusterOption

- // ReaderNodeSelector selects a reader node to send read-only commands to.
- // If the function is set, the client will send read-only commands to the selected node.
+ // ReplicaSelector selects a replica node when `SendToReplicas` returns true.
+ // If the function is set, the client will send the selected command to the replica node.
 // Returned value is the index of the replica node in the replicas slice.
 // If the returned value is out of range, the primary node will be selected.
- // If replica nodes are empty, the primary node will be selected and
- // the function will not be called.
+ // If the primary node does not have any replicas, the primary node will be selected
+ // and the function will not be called.
+ // currently only used for cluster client.
 // NOTE: This function can't be used with ReplicaOnly option.
- // NOTE: This function can't be used with SendToReplicas option.
- ReaderNodeSelector func(slot uint16, replicas []ReplicaInfo) int
+ // NOTE: This function must be used with the SendToReplicas option.
+ ReplicaSelector func(slot uint16, replicas []ReplicaInfo) int
 }

 // SentinelOption contains MasterSet,
diff --git a/rueidis_test.go b/rueidis_test.go
index 6ee4c763..4da05a95 100644
--- a/rueidis_test.go
+++ b/rueidis_test.go
@@ -157,7 +157,7 @@ func TestNewClusterClientError(t *testing.T) {
 }
 })

- t.Run("replica only and reader node selector option conflict", func(t *testing.T) {
+ t.Run("replica only and replica selector option conflict", func(t *testing.T) {
 ln, err := net.Listen("tcp", "127.0.0.1:0")
 if err != nil {
 t.Fatal(err)
 }
@@ -168,7 +168,7 @@ func TestNewClusterClientError(t *testing.T) {
 client, err := NewClient(ClientOption{
 InitAddress: []string{"127.0.0.1:" + port},
 ReplicaOnly: true,
- ReaderNodeSelector: func(slot uint16, replicas []ReplicaInfo) int {
+ ReplicaSelector: func(slot uint16, replicas []ReplicaInfo) int {
 return 0
 },
 })
@@ -176,12 +176,12 @@ func TestNewClusterClientError(t *testing.T) {
 t.Errorf("unexpected return %v %v", client, err)
 }

- if !strings.Contains(err.Error(), ErrReplicaOnlyConflictWithReaderNodeSelector.Error()) {
+ if !strings.Contains(err.Error(), ErrReplicaOnlyConflictWithReplicaSelector.Error()) {
 t.Errorf("unexpected error %v", err)
 }
 })

- t.Run("send to replicas and reader node selector option conflict", func(t *testing.T) {
+ t.Run("send to replicas should be set when replica selector is set", func(t *testing.T) {
 ln, err := net.Listen("tcp", "127.0.0.1:0")
 if err != nil {
 t.Fatal(err)
 }
@@ -191,10 +191,7 @@ func TestNewClusterClientError(t *testing.T) {
 _, port, _ := net.SplitHostPort(ln.Addr().String())
 client, err := NewClient(ClientOption{
 InitAddress: []string{"127.0.0.1:" + port},
- SendToReplicas: func(cmd Completed) bool {
- return true
- },
- ReaderNodeSelector: func(slot uint16, replicas []ReplicaInfo) int {
+ ReplicaSelector: func(slot uint16, replicas []ReplicaInfo) int {
 return 0
 },
 })
@@ -202,7 +199,7 @@ func TestNewClusterClientError(t *testing.T) {
 t.Errorf("unexpected return %v %v", client, err)
 }

- if !strings.Contains(err.Error(), ErrSendToReplicasConflictWithReaderNodeSelector.Error()) {
+ if !strings.Contains(err.Error(), ErrSendToReplicasNotSet.Error()) {
 t.Errorf("unexpected error %v", err)
 }
 })

From 90387d2a94e31990bb38b2b5e0d0bf68123db9d7 Mon Sep 17 00:00:00 2001
From: proost
Date: Fri, 13 Dec 2024 00:20:40 +0900
Subject: [PATCH 6/8] test: remove duplicated cases

---
 cluster_test.go | 499 ------------------------------------------------
 1 file changed, 499 deletions(-)

diff --git a/cluster_test.go b/cluster_test.go
index e21694ca..31389b91 100644
--- a/cluster_test.go
+++ b/cluster_test.go
@@ -6568,502 +6568,3 @@ func
TestClusterClient_SendToOnlyPrimaryNodeWhenPrimaryNodeSelected(t *testing.T } }) } - -//gocyclo:ignore -func TestClusterClient_SendToOnlyReplicaNodeWhenReplicaNodeSelected(t *testing.T) { - defer ShouldNotLeaked(SetupLeakDetection()) - - primaryNodeConn := &mockConn{ - DoOverride: map[string]func(cmd Completed) RedisResult{ - "CLUSTER SLOTS": func(cmd Completed) RedisResult { - return slotsMultiResp - }, - "INFO": func(cmd Completed) RedisResult { - return newResult(RedisMessage{typ: '+', string: "INFO"}, nil) - }, - "GET K1{a}": func(cmd Completed) RedisResult { - return newResult(RedisMessage{typ: '+', string: "GET K1{a}"}, nil) - }, - }, - } - replicaNodeConn := &mockConn{ - DoMultiFn: func(multi ...Completed) *redisresults { - resps := make([]RedisResult, len(multi)) - for i, cmd := range multi { - resps[i] = newResult(RedisMessage{typ: '+', string: strings.Join(cmd.Commands(), " ")}, nil) - } - return &redisresults{s: resps} - }, - DoMultiCacheFn: func(multi ...CacheableTTL) *redisresults { - resps := make([]RedisResult, len(multi)) - for i, cmd := range multi { - resps[i] = newResult(RedisMessage{typ: '+', string: strings.Join(cmd.Cmd.Commands(), " ")}, nil) - } - return &redisresults{s: resps} - }, - DoOverride: map[string]func(cmd Completed) RedisResult{ - "GET Do": func(cmd Completed) RedisResult { - return newResult(RedisMessage{typ: '+', string: "GET Do"}, nil) - }, - "GET K1{a}": func(cmd Completed) RedisResult { - return newResult(RedisMessage{typ: '+', string: "GET K1{a}"}, nil) - }, - "GET K2{a}": func(cmd Completed) RedisResult { - return newResult(RedisMessage{typ: '+', string: "GET K2{a}"}, nil) - }, - }, - DoCacheOverride: map[string]func(cmd Cacheable, ttl time.Duration) RedisResult{ - "GET DoCache": func(cmd Cacheable, ttl time.Duration) RedisResult { - return newResult(RedisMessage{typ: '+', string: "GET DoCache"}, nil) - }, - }, - } - - client, err := newClusterClient( - &ClientOption{ - InitAddress: []string{"127.0.0.1:0"}, - SendToReplicas: func(cmd Completed) bool { - return true - }, - ReplicaSelector: func(slot uint16, replicas []ReplicaInfo) int { - return 0 - }, - }, - func(dst string, opt *ClientOption) conn { - if dst == "127.0.0.1:0" || dst == "127.0.2.1:0" { // primary nodes - return primaryNodeConn - } else { // replica nodes - return replicaNodeConn - } - }, - newRetryer(defaultRetryDelayFn), - ) - if err != nil { - t.Fatalf("unexpected err %v", err) - } - - t.Run("Do with no slot", func(t *testing.T) { - c := client.B().Info().Build() - if v, err := client.Do(context.Background(), c).ToString(); err != nil || v != "INFO" { - t.Fatalf("unexpected response %v %v", v, err) - } - }) - - t.Run("Do", func(t *testing.T) { - c := client.B().Get().Key("Do").Build() - if v, err := client.Do(context.Background(), c).ToString(); err != nil || v != "GET Do" { - t.Fatalf("unexpected response %v %v", v, err) - } - }) - - t.Run("DoMulti Single Slot", func(t *testing.T) { - c1 := client.B().Get().Key("K1{a}").Build() - c2 := client.B().Get().Key("K2{a}").Build() - resps := client.DoMulti(context.Background(), c1, c2) - if v, err := resps[0].ToString(); err != nil || v != "GET K1{a}" { - t.Fatalf("unexpected response %v %v", v, err) - } - if v, err := resps[1].ToString(); err != nil || v != "GET K2{a}" { - t.Fatalf("unexpected response %v %v", v, err) - } - }) - - t.Run("DoMulti Single Slot + Init Slot", func(t *testing.T) { - c1 := client.B().Get().Key("K1{a}").Build() - c2 := client.B().Info().Build() - resps := client.DoMulti(context.Background(), c1, c2) - if v, err 
:= resps[0].ToString(); err != nil || v != "GET K1{a}" { - t.Fatalf("unexpected response %v %v", v, err) - } - if v, err := resps[1].ToString(); err != nil || v != "INFO" { - t.Fatalf("unexpected response %v %v", v, err) - } - }) - - t.Run("DoMulti Cross Slot + Init Slot", func(t *testing.T) { - defer func() { - if err := recover(); err != panicMixCxSlot { - t.Errorf("DoMulti should panic if Cross Slot + Init Slot") - } - }() - c1 := client.B().Get().Key("K1{a}").Build() - c2 := client.B().Get().Key("K1{b}").Build() - c3 := client.B().Info().Build() - client.DoMulti(context.Background(), c1, c2, c3) - }) - - t.Run("DoMulti Multi Slot", func(t *testing.T) { - multi := make([]Completed, 500) - for i := 0; i < len(multi); i++ { - multi[i] = client.B().Get().Key(fmt.Sprintf("K1{%d}", i)).Build() - } - resps := client.DoMulti(context.Background(), multi...) - for i := 0; i < len(multi); i++ { - if v, err := resps[i].ToString(); err != nil || v != fmt.Sprintf("GET K1{%d}", i) { - t.Fatalf("unexpected response %v %v", v, err) - } - } - }) - - t.Run("DoCache", func(t *testing.T) { - c := client.B().Get().Key("DoCache").Cache() - if v, err := client.DoCache(context.Background(), c, 100).ToString(); err != nil || v != "GET DoCache" { - t.Fatalf("unexpected response %v %v", v, err) - } - }) - - t.Run("DoMultiCache Single Slot", func(t *testing.T) { - c1 := client.B().Get().Key("K1{a}").Cache() - c2 := client.B().Get().Key("K2{a}").Cache() - resps := client.DoMultiCache(context.Background(), CT(c1, time.Second), CT(c2, time.Second)) - if v, err := resps[0].ToString(); err != nil || v != "GET K1{a}" { - t.Fatalf("unexpected response %v %v", v, err) - } - if v, err := resps[1].ToString(); err != nil || v != "GET K2{a}" { - t.Fatalf("unexpected response %v %v", v, err) - } - }) - - t.Run("DoMultiCache Multi Slot", func(t *testing.T) { - multi := make([]CacheableTTL, 500) - for i := 0; i < len(multi); i++ { - multi[i] = CT(client.B().Get().Key(fmt.Sprintf("K1{%d}", i)).Cache(), time.Second) - } - resps := client.DoMultiCache(context.Background(), multi...) 
- for i := 0; i < len(multi); i++ { - if v, err := resps[i].ToString(); err != nil || v != fmt.Sprintf("GET K1{%d}", i) { - t.Fatalf("unexpected response %v %v", v, err) - } - } - }) - - t.Run("Receive", func(t *testing.T) { - c := client.B().Subscribe().Channel("ch").Build() - hdl := func(message PubSubMessage) {} - primaryNodeConn.ReceiveFn = func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { - if !reflect.DeepEqual(subscribe.Commands(), c.Commands()) { - t.Fatalf("unexpected command %v", subscribe) - } - return nil - } - replicaNodeConn.ReceiveFn = func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { - if !reflect.DeepEqual(subscribe.Commands(), c.Commands()) { - t.Fatalf("unexpected command %v", subscribe) - } - return nil - } - if err := client.Receive(context.Background(), c, hdl); err != nil { - t.Fatalf("unexpected response %v", err) - } - }) - - t.Run("Receive Redis Err", func(t *testing.T) { - c := client.B().Ssubscribe().Channel("ch").Build() - e := &RedisError{} - primaryNodeConn.ReceiveFn = func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { - return e - } - replicaNodeConn.ReceiveFn = func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { - return e - } - if err := client.Receive(context.Background(), c, func(message PubSubMessage) {}); err != e { - t.Fatalf("unexpected response %v", err) - } - }) - - t.Run("Dedicated Cross Slot Err", func(t *testing.T) { - defer func() { - if err := recover(); err != panicMsgCxSlot { - t.Errorf("Dedicated should panic if cross slots is used") - } - }() - primaryNodeConn.AcquireFn = func() wire { return &mockWire{} } - client.Dedicated(func(c DedicatedClient) error { - c.Do(context.Background(), c.B().Get().Key("a").Build()).Error() - return c.Do(context.Background(), c.B().Get().Key("b").Build()).Error() - }) - }) - - t.Run("Dedicated Cross Slot Err Multi", func(t *testing.T) { - defer func() { - if err := recover(); err != panicMsgCxSlot { - t.Errorf("Dedicated should panic if cross slots is used") - } - }() - primaryNodeConn.AcquireFn = func() wire { - return &mockWire{ - DoMultiFn: func(multi ...Completed) *redisresults { - return &redisresults{s: []RedisResult{ - newResult(RedisMessage{typ: '+', string: "OK"}, nil), - newResult(RedisMessage{typ: '+', string: "OK"}, nil), - newResult(RedisMessage{typ: '*', values: []RedisMessage{{typ: '+', string: "a"}}}, nil), - }} - }, - } - } - client.Dedicated(func(c DedicatedClient) (err error) { - c.DoMulti( - context.Background(), - c.B().Multi().Build(), - c.B().Get().Key("a").Build(), - c.B().Exec().Build(), - ) - c.DoMulti( - context.Background(), - c.B().Multi().Build(), - c.B().Get().Key("b").Build(), - c.B().Exec().Build(), - ) - return nil - }) - }) - - t.Run("Dedicated Multi Cross Slot Err", func(t *testing.T) { - primaryNodeConn.AcquireFn = func() wire { return &mockWire{} } - err := client.Dedicated(func(c DedicatedClient) (err error) { - defer func() { - err = errors.New(recover().(string)) - }() - c.DoMulti( - context.Background(), - c.B().Get().Key("a").Build(), - c.B().Get().Key("b").Build(), - ) - return nil - }) - if err == nil || err.Error() != panicMsgCxSlot { - t.Errorf("Multi should panic if cross slots is used") - } - }) - - t.Run("Dedicated Receive Redis Err", func(t *testing.T) { - e := &RedisError{} - w := &mockWire{ - ReceiveFn: func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { - return e - }, - } - 
primaryNodeConn.AcquireFn = func() wire {
- return w
- }
- if err := client.Dedicated(func(c DedicatedClient) error {
- return c.Receive(context.Background(), c.B().Subscribe().Channel("a").Build(), func(msg PubSubMessage) {})
- }); err != e {
- t.Fatalf("unexpected err %v", err)
- }
- })
-
- t.Run("Dedicated", func(t *testing.T) {
- closed := false
- w := &mockWire{
- DoFn: func(cmd Completed) RedisResult {
- return newResult(RedisMessage{typ: '+', string: "Delegate"}, nil)
- },
- DoMultiFn: func(cmd ...Completed) *redisresults {
- if len(cmd) == 4 {
- return &redisresults{s: []RedisResult{
- newResult(RedisMessage{typ: '+', string: "OK"}, nil),
- newResult(RedisMessage{typ: '+', string: "OK"}, nil),
- newResult(RedisMessage{typ: '+', string: "OK"}, nil),
- newResult(RedisMessage{typ: '*', values: []RedisMessage{
- {typ: '+', string: "Delegate0"},
- {typ: '+', string: "Delegate1"},
- }}, nil),
- }}
- }
- return &redisresults{s: []RedisResult{
- newResult(RedisMessage{typ: '+', string: "Delegate0"}, nil),
- newResult(RedisMessage{typ: '+', string: "Delegate1"}, nil),
- }}
- },
- ReceiveFn: func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error {
- return ErrClosing
- },
- SetPubSubHooksFn: func(hooks PubSubHooks) <-chan error {
- ch := make(chan error, 1)
- ch <- ErrClosing
- close(ch)
- return ch
- },
- ErrorFn: func() error {
- return ErrClosing
- },
- CloseFn: func() {
- closed = true
- },
- }
- primaryNodeConn.AcquireFn = func() wire {
- return w
- }
- stored := false
- primaryNodeConn.StoreFn = func(ww wire) {
- if ww != w {
- t.Fatalf("received unexpected wire %v", ww)
- }
- stored = true
- }
- if err := client.Dedicated(func(c DedicatedClient) error {
- ch := c.SetPubSubHooks(PubSubHooks{OnMessage: func(m PubSubMessage) {}})
- if v, err := c.Do(context.Background(), c.B().Get().Key("a").Build()).ToString(); err != nil || v != "Delegate" {
- t.Fatalf("unexpected response %v %v", v, err)
- }
- if v := c.DoMulti(context.Background()); len(v) != 0 {
- t.Fatalf("received unexpected response %v", v)
- }
- for i, resp := range c.DoMulti(
- context.Background(),
- c.B().Info().Build(),
- c.B().Info().Build(),
- ) {
- if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) {
- t.Fatalf("unexpected response %v %v", v, err)
- }
- }
- for i, resp := range c.DoMulti(
- context.Background(),
- c.B().Get().Key("a").Build(),
- c.B().Get().Key("a").Build(),
- ) {
- if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) {
- t.Fatalf("unexpected response %v %v", v, err)
- }
- }
- for i, resp := range c.DoMulti(
- context.Background(),
- c.B().Multi().Build(),
- c.B().Get().Key("a").Build(),
- c.B().Get().Key("a").Build(),
- c.B().Exec().Build(),
- )[3].val.values {
- if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) {
- t.Fatalf("unexpected response %v %v", v, err)
- }
- }
- if err := c.Receive(context.Background(), c.B().Ssubscribe().Channel("a").Build(), func(msg PubSubMessage) {}); err != ErrClosing {
- t.Fatalf("unexpected ret %v", err)
- }
- if err := <-ch; err != ErrClosing {
- t.Fatalf("unexpected ret %v", err)
- }
- if err := <-c.SetPubSubHooks(PubSubHooks{OnMessage: func(m PubSubMessage) {}}); err != ErrClosing {
- t.Fatalf("unexpected ret %v", err)
- }
- c.Close()
- return nil
- }); err != nil {
- t.Fatalf("unexpected err %v", err)
- }
- if !stored {
- t.Fatalf("Dedicated doesn't put back the wire")
- }
- if !closed {
- t.Fatalf("Dedicated doesn't delegate Close")
- }
- })
-
- t.Run("Dedicate",
func(t *testing.T) {
- closed := false
- w := &mockWire{
- DoFn: func(cmd Completed) RedisResult {
- return newResult(RedisMessage{typ: '+', string: "Delegate"}, nil)
- },
- DoMultiFn: func(cmd ...Completed) *redisresults {
- if len(cmd) == 4 {
- return &redisresults{s: []RedisResult{
- newResult(RedisMessage{typ: '+', string: "OK"}, nil),
- newResult(RedisMessage{typ: '+', string: "OK"}, nil),
- newResult(RedisMessage{typ: '+', string: "OK"}, nil),
- newResult(RedisMessage{typ: '*', values: []RedisMessage{
- {typ: '+', string: "Delegate0"},
- {typ: '+', string: "Delegate1"},
- }}, nil),
- }}
- }
- return &redisresults{s: []RedisResult{
- newResult(RedisMessage{typ: '+', string: "Delegate0"}, nil),
- newResult(RedisMessage{typ: '+', string: "Delegate1"}, nil),
- }}
- },
- ReceiveFn: func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error {
- return ErrClosing
- },
- SetPubSubHooksFn: func(hooks PubSubHooks) <-chan error {
- ch := make(chan error, 1)
- ch <- ErrClosing
- close(ch)
- return ch
- },
- ErrorFn: func() error {
- return ErrClosing
- },
- CloseFn: func() {
- closed = true
- },
- }
- primaryNodeConn.AcquireFn = func() wire {
- return w
- }
- stored := false
- primaryNodeConn.StoreFn = func(ww wire) {
- if ww != w {
- t.Fatalf("received unexpected wire %v", ww)
- }
- stored = true
- }
- c, cancel := client.Dedicate()
- ch := c.SetPubSubHooks(PubSubHooks{OnMessage: func(m PubSubMessage) {}})
- if v, err := c.Do(context.Background(), c.B().Get().Key("a").Build()).ToString(); err != nil || v != "Delegate" {
- t.Fatalf("unexpected response %v %v", v, err)
- }
- if v := c.DoMulti(context.Background()); len(v) != 0 {
- t.Fatalf("received unexpected response %v", v)
- }
- for i, resp := range c.DoMulti(
- context.Background(),
- c.B().Info().Build(),
- c.B().Info().Build(),
- ) {
- if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) {
- t.Fatalf("unexpected response %v %v", v, err)
- }
- }
- for i, resp := range c.DoMulti(
- context.Background(),
- c.B().Get().Key("a").Build(),
- c.B().Get().Key("a").Build(),
- ) {
- if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) {
- t.Fatalf("unexpected response %v %v", v, err)
- }
- }
- for i, resp := range c.DoMulti(
- context.Background(),
- c.B().Multi().Build(),
- c.B().Get().Key("a").Build(),
- c.B().Get().Key("a").Build(),
- c.B().Exec().Build(),
- )[3].val.values {
- if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) {
- t.Fatalf("unexpected response %v %v", v, err)
- }
- }
- if err := c.Receive(context.Background(), c.B().Ssubscribe().Channel("a").Build(), func(msg PubSubMessage) {}); err != ErrClosing {
- t.Fatalf("unexpected ret %v", err)
- }
- if err := <-ch; err != ErrClosing {
- t.Fatalf("unexpected ret %v", err)
- }
- if err := <-c.SetPubSubHooks(PubSubHooks{OnMessage: func(m PubSubMessage) {}}); err != ErrClosing {
- t.Fatalf("unexpected ret %v", err)
- }
- c.Close()
- cancel()
-
- if !stored {
- t.Fatalf("Dedicated doesn't put back the wire")
- }
- if !closed {
- t.Fatalf("Dedicated doesn't delegate Close")
- }
- })
-}

From 91bbcf3d1555cf96061a922c5401256704eded26 Mon Sep 17 00:00:00 2001
From: proost
Date: Sat, 14 Dec 2024 00:04:19 +0900
Subject: [PATCH 7/8] test: remove cases

---
 cluster_test.go | 18 ------------------
 1 file changed, 18 deletions(-)

diff --git a/cluster_test.go b/cluster_test.go
index df9f91a8..d99abf14 100644
--- a/cluster_test.go
+++ b/cluster_test.go
@@ -6636,21 +6636,3 @@ func
TestClusterClient_SendToOnlyPrimaryNodeWhenPrimaryNodeSelected(t *testing.T } }) } - -type nodeInfo []ReplicaInfo - -func Test__(t *testing.T) { - nodes := nodeInfo{ - { - Addr: "aaaa", - }, - { - Addr: "bbbb", - }, - { - Addr: "cccc", - }, - } - - _ = []ReplicaInfo(nodes[1:]) -} From 50fad71c72cfbed5b35b6cde3589b75296f83fe6 Mon Sep 17 00:00:00 2001 From: proost Date: Sat, 14 Dec 2024 00:30:49 +0900 Subject: [PATCH 8/8] perf: introduce nodes --- cluster.go | 43 ++++++++++++++++++++----------------------- cluster_test.go | 34 +++++++++++++++++++--------------- rueidis.go | 3 ++- 3 files changed, 41 insertions(+), 39 deletions(-) diff --git a/cluster.go b/cluster.go index d6399d75..a54a89b1 100644 --- a/cluster.go +++ b/cluster.go @@ -210,12 +210,12 @@ func (c *clusterClient) _refresh() (err error) { for master, g := range groups { conns[master] = connrole{conn: c.connFn(master, c.opt)} if c.rOpt != nil { - for _, addr := range g.nodes[1:] { - conns[addr] = connrole{conn: c.connFn(addr, c.rOpt)} + for _, nodeInfo := range g.nodes[1:] { + conns[nodeInfo.Addr] = connrole{conn: c.connFn(nodeInfo.Addr, c.rOpt)} } } else { - for _, addr := range g.nodes[1:] { - conns[addr] = connrole{conn: c.connFn(addr, c.opt)} + for _, nodeInfo := range g.nodes[1:] { + conns[nodeInfo.Addr] = connrole{conn: c.connFn(nodeInfo.Addr, c.opt)} } } } @@ -250,7 +250,7 @@ func (c *clusterClient) _refresh() (err error) { nodesCount := len(g.nodes) for _, slot := range g.slots { for i := slot[0]; i <= slot[1]; i++ { - pslots[i] = conns[g.nodes[1+util.FastRand(nodesCount-1)]].conn + pslots[i] = conns[g.nodes[1+util.FastRand(nodesCount-1)].Addr].conn } } case c.rOpt != nil: @@ -259,18 +259,13 @@ func (c *clusterClient) _refresh() (err error) { } if len(g.nodes) > 1 { n := len(g.nodes) - 1 - replicas := make([]ReplicaInfo, 0, n) - for _, addr := range g.nodes[1:] { - replicas = append(replicas, ReplicaInfo{Addr: addr}) - } - for _, slot := range g.slots { for i := slot[0]; i <= slot[1]; i++ { pslots[i] = conns[master].conn - rIndex := c.opt.ReplicaSelector(uint16(i), replicas) + rIndex := c.opt.ReplicaSelector(uint16(i), g.nodes[1:]) if rIndex >= 0 && rIndex < n { - rslots[i] = conns[g.nodes[1+rIndex]].conn + rslots[i] = conns[g.nodes[1+rIndex].Addr].conn } else { rslots[i] = conns[master].conn } @@ -325,8 +320,10 @@ func (c *clusterClient) nodes() []string { return nodes } +type nodes []ReplicaInfo + type group struct { - nodes []string + nodes nodes slots [][2]int64 } @@ -352,10 +349,10 @@ func parseSlots(slots RedisMessage, defaultAddr string) map[string]group { g, ok := groups[master] if !ok { g.slots = make([][2]int64, 0) - g.nodes = make([]string, 0, len(v.values)-2) + g.nodes = make(nodes, 0, len(v.values)-2) for i := 2; i < len(v.values); i++ { if dst := parseEndpoint(defaultAddr, v.values[i].values[0].string, v.values[i].values[1].integer); dst != "" { - g.nodes = append(g.nodes, dst) + g.nodes = append(g.nodes, ReplicaInfo{Addr: dst}) } } } @@ -373,16 +370,16 @@ func parseShards(shards RedisMessage, defaultAddr string, tls bool) map[string]g m := -1 shard, _ := v.AsMap() slots := shard["slots"].values - nodes := shard["nodes"].values + _nodes := shard["nodes"].values g := group{ - nodes: make([]string, 0, len(nodes)), + nodes: make(nodes, 0, len(_nodes)), slots: make([][2]int64, len(slots)/2), } for i := range g.slots { g.slots[i][0], _ = slots[i*2].AsInt64() g.slots[i][1], _ = slots[i*2+1].AsInt64() } - for _, n := range nodes { + for _, n := range _nodes { dict, _ := n.AsMap() if dict["health"].string != "online" 
{
 continue
 }
@@ -395,12 +392,12 @@ func parseShards(shards RedisMessage, defaultAddr string, tls bool) map[string]g
 if dict["role"].string == "master" {
 m = len(g.nodes)
 }
- g.nodes = append(g.nodes, dst)
+ g.nodes = append(g.nodes, ReplicaInfo{Addr: dst})
 }
 }
 if m >= 0 {
 g.nodes[0], g.nodes[m] = g.nodes[m], g.nodes[0]
- groups[g.nodes[0]] = g
+ groups[g.nodes[0].Addr] = g
 }
 }
 return groups
@@ -1106,15 +1103,15 @@ func (c *clusterClient) Dedicate() (DedicatedClient, func()) {

 func (c *clusterClient) Nodes() map[string]Client {
 c.mu.RLock()
- nodes := make(map[string]Client, len(c.conns))
+ _nodes := make(map[string]Client, len(c.conns))
 disableCache := c.opt != nil && c.opt.DisableCache
 for addr, cc := range c.conns {
 if !cc.hidden {
- nodes[addr] = newSingleClientWithConn(cc.conn, c.cmd, c.retry, disableCache, c.retryHandler)
+ _nodes[addr] = newSingleClientWithConn(cc.conn, c.cmd, c.retry, disableCache, c.retryHandler)
 }
 }
 c.mu.RUnlock()
- return nodes
+ return _nodes
 }

 func (c *clusterClient) Close() {
diff --git a/cluster_test.go b/cluster_test.go
index d99abf14..1620b54c 100644
--- a/cluster_test.go
+++ b/cluster_test.go
@@ -5048,13 +5048,15 @@ func TestClusterShardsParsing(t *testing.T) {
 t.Fatalf("unexpected result %v", result)
 }
 for _, val := range result {
- nodes := val.nodes
- sort.Strings(nodes)
- if len(nodes) != 3 ||
- nodes[0] != "127.0.1.1:1" ||
- nodes[1] != "127.0.2.1:2" ||
- nodes[2] != "127.0.3.1:3" {
- t.Fatalf("unexpected nodes %v", nodes)
+ _nodes := val.nodes
+ sort.Slice(_nodes, func(i, j int) bool {
+ return _nodes[i].Addr < _nodes[j].Addr
+ })
+ if len(_nodes) != 3 ||
+ _nodes[0].Addr != "127.0.1.1:1" ||
+ _nodes[1].Addr != "127.0.2.1:2" ||
+ _nodes[2].Addr != "127.0.3.1:3" {
+ t.Fatalf("unexpected nodes %v", _nodes)
 }
 }
@@ -5063,13 +5065,15 @@ func TestClusterShardsParsing(t *testing.T) {
 t.Fatalf("unexpected result %v", result)
 }
 for _, val := range result {
- nodes := val.nodes
- sort.Strings(nodes)
- if len(nodes) != 3 ||
- nodes[0] != "127.0.1.1:0" ||
- nodes[1] != "127.0.2.1:0" ||
- nodes[2] != "127.0.3.1:3" {
- t.Fatalf("unexpected nodes %v", nodes)
+ _nodes := val.nodes
+ sort.Slice(_nodes, func(i, j int) bool {
+ return _nodes[i].Addr < _nodes[j].Addr
+ })
+ if len(_nodes) != 3 ||
+ _nodes[0].Addr != "127.0.1.1:0" ||
+ _nodes[1].Addr != "127.0.2.1:0" ||
+ _nodes[2].Addr != "127.0.3.1:3" {
+ t.Fatalf("unexpected nodes %v", _nodes)
 }
 }
 })
@@ -5080,7 +5084,7 @@ func TestClusterShardsParsing(t *testing.T) {
 t.Fatalf("unexpected result %v", result)
 }
 for master, group := range result {
- if len(group.nodes) == 0 || group.nodes[0] != master {
+ if len(group.nodes) == 0 || group.nodes[0].Addr != master {
 t.Fatalf("unexpected first node %v", group)
 }
 }
diff --git a/rueidis.go b/rueidis.go
index 17103333..bfe56648 100644
--- a/rueidis.go
+++ b/rueidis.go
@@ -215,7 +215,8 @@ type ClientOption struct {
 // If the returned value is out of range, the primary node will be selected.
 // If the primary node does not have any replicas, the primary node will be selected
 // and the function will not be called.
- // currently only used for cluster client.
+ // Currently only used by the cluster client.
+ // Each ReplicaInfo must not be modified.
 // NOTE: This function can't be used with ReplicaOnly option.
 // NOTE: This function must be used with the SendToReplicas option.
 ReplicaSelector func(slot uint16, replicas []ReplicaInfo) int
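For reference, the option wiring that this series converges on: SendToReplicas decides
whether a command is eligible for a replica at all, and ReplicaSelector picks which
replica serves a given slot. Below is a minimal usage sketch, not part of the patches
themselves — the import path, the cluster address, and the slot-modulo policy are
illustrative assumptions:

package main

import "github.com/redis/rueidis"

func main() {
	client, err := rueidis.NewClient(rueidis.ClientOption{
		InitAddress: []string{"127.0.0.1:7000"}, // illustrative cluster address
		// Route read-only commands to replicas.
		SendToReplicas: func(cmd rueidis.Completed) bool {
			return cmd.IsReadOnly()
		},
		// Pin each slot to one replica. Returning an index outside
		// [0, len(replicas)) falls back to the primary for that slot, and
		// the selector is not called for shards that have no replicas,
		// so len(replicas) is always at least 1 here.
		ReplicaSelector: func(slot uint16, replicas []rueidis.ReplicaInfo) int {
			return int(slot) % len(replicas)
		},
	})
	if err != nil {
		panic(err)
	}
	defer client.Close()
}

Returning a random index instead of the modulo above reproduces the random replica
routing that SendToReplicas performed on its own before this series, while a
slot-based policy like the sketch keeps each slot pinned to a single replica between
topology refreshes.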