Skip to content

Commit

Permalink
Add ReplicaOnly support for cluster client
Browse files Browse the repository at this point in the history
  • Loading branch information
NeoHuang committed Aug 28, 2023
1 parent 92d534f commit 345c26d
Show file tree
Hide file tree
Showing 7 changed files with 269 additions and 15 deletions.
4 changes: 4 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ func newSingleClient(opt *ClientOption, prev conn, connFn connFn) (*singleClient
return nil, ErrNoAddr
}

if opt.ReplicaOnly {
return nil, ErrReplicaOnlyNotSupported
}

conn := connFn(opt.InitAddress[0], opt)
conn.Override(prev)
if err := conn.Dial(); err != nil {
Expand Down
9 changes: 9 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,15 @@ func TestNewSingleClientNoNode(t *testing.T) {
}
}

func TestNewSingleClientReplicaOnlyNotSupported(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())
if _, err := newSingleClient(&ClientOption{ReplicaOnly: true, InitAddress: []string{"localhost"}}, nil, func(dst string, opt *ClientOption) conn {
return nil
}); err != ErrReplicaOnlyNotSupported {
t.Fatalf("unexpected err %v", err)
}
}

func TestNewSingleClientError(t *testing.T) {
v := errors.New("dail err")
if _, err := newSingleClient(&ClientOption{InitAddress: []string{""}}, nil, func(dst string, opt *ClientOption) conn {
Expand Down
8 changes: 6 additions & 2 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rueidis
import (
"context"
"errors"
"math/rand"
"net"
"runtime"
"strconv"
Expand Down Expand Up @@ -176,7 +177,6 @@ func (c *clusterClient) _refresh() (err error) {

groups := parseSlots(reply, addr)

// TODO support read from replicas
conns := make(map[string]conn, len(groups))
for _, g := range groups {
for _, addr := range g.nodes {
Expand Down Expand Up @@ -204,7 +204,11 @@ func (c *clusterClient) _refresh() (err error) {

slots := [16384]conn{}
for master, g := range groups {
cc := conns[master]
addr := master
if c.opt.ReplicaOnly && len(g.nodes) > 1 {
addr = g.nodes[1+rand.Intn(len(g.nodes)-1)]
}
cc := conns[addr]
for _, slot := range g.slots {
for i := slot[0]; i <= slot[1]; i++ {
slots[i] = cc
Expand Down
59 changes: 59 additions & 0 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1687,3 +1687,62 @@ func TestClusterClientRetry(t *testing.T) {
return c
})
}

func TestClusterClientReplicaOnly_PickReplica(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())
m := &mockConn{
DoFn: func(cmd Completed) RedisResult {
if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" {
return slotsMultiResp
}
return RedisResult{}
},
}

client, err := newClusterClient(&ClientOption{InitAddress: []string{"127.0.0.1:0"}, ReplicaOnly: true}, func(dst string, opt *ClientOption) conn {
copiedM := *m
return &copiedM
})
if err != nil {
t.Fatalf("unexpected err %v", err)
}
t.Run("replicas should be picked", func(t *testing.T) {
if client.slots[0] != client.conns["127.0.1.1:1"] {
t.Fatalf("unexpected replica node assigned to slot 0")
}
if client.slots[8192] != client.conns["127.0.1.1:1"] {
t.Fatalf("unexpected replica node assigned to slot 8192")
}
if client.slots[8193] != client.conns["127.0.3.1:1"] {
t.Fatalf("unexpected replica node assigned to slot 8193")
}
if client.slots[16383] != client.conns["127.0.3.1:1"] {
t.Fatalf("unexpected replica node assigned to slot 16383")
}
})
}

func TestClusterClientReplicaOnly_PickMasterIfNoReplica(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())
m := &mockConn{
DoFn: func(cmd Completed) RedisResult {
if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" {
return singleSlotResp
}
return RedisResult{}
},
}

client, err := newClusterClient(&ClientOption{InitAddress: []string{"127.0.0.1:0"}, ReplicaOnly: true}, func(dst string, opt *ClientOption) conn {
copiedM := *m
return &copiedM
})
if err != nil {
t.Fatalf("unexpected err %v", err)
}
t.Run("replicas should be picked", func(t *testing.T) {
if client.slots[0] != client.conns["127.0.0.1:0"] {
t.Fatalf("unexpected node assigned to slot 0")
}
})
}
16 changes: 15 additions & 1 deletion pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ func _newPipe(connFn func() (net.Conn, error), option *ClientOption, r2ps bool)
if option.SelectDB != 0 {
init = append(init, []string{"SELECT", strconv.Itoa(option.SelectDB)})
}
if option.ReplicaOnly && option.Sentinel.MasterSet == "" {
init = append(init, []string{"READONLY"})
}
if option.ClientNoTouch {
init = append(init, []string{"CLIENT", "NO-TOUCH", "ON"})
}
Expand Down Expand Up @@ -204,6 +207,10 @@ func _newPipe(connFn func() (net.Conn, error), option *ClientOption, r2ps bool)
err = r.Error()
}
if err != nil {
if init[i][0] == "READONLY" {
// igore READONLY command error
continue
}
if re, ok := err.(*RedisError); ok {
if !r2 && noHello.MatchString(re.string) {
r2 = true
Expand Down Expand Up @@ -247,6 +254,9 @@ func _newPipe(connFn func() (net.Conn, error), option *ClientOption, r2ps bool)
if option.SelectDB != 0 {
init = append(init, []string{"SELECT", strconv.Itoa(option.SelectDB)})
}
if option.ReplicaOnly && option.Sentinel.MasterSet == "" {
init = append(init, []string{"READONLY"})
}
if option.ClientNoTouch {
init = append(init, []string{"CLIENT", "NO-TOUCH", "ON"})
}
Expand All @@ -262,7 +272,11 @@ func _newPipe(connFn func() (net.Conn, error), option *ClientOption, r2ps bool)
if len(init) != 0 {
resp := p.DoMulti(ctx, cmds.NewMultiCompleted(init)...)
defer resultsp.Put(resp)
for _, r := range resp.s[:len(resp.s)-1] { // skip error checking on the last CLIENT SETINFO
for i, r := range resp.s[:len(resp.s)-1] { // skip error checking on the last CLIENT SETINFO
if init[i][0] == "READONLY" {
// igore READONLY command error
continue
}
if err = r.Error(); err != nil {
p.Close()
return nil, err
Expand Down
Loading

0 comments on commit 345c26d

Please sign in to comment.