Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: send readonly command #430

Merged
merged 13 commits into from
Dec 25, 2023
22 changes: 13 additions & 9 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (

// ErrNoSlot indicates that there is no redis node owns the key slot.
var ErrNoSlot = errors.New("the slot has no redis node")
var ErrReplicaOnlyConflict = errors.New("ReplicaOnly conflicts with SendToReplicas option")

type retry struct {
cIndexes []int
Expand Down Expand Up @@ -93,17 +92,13 @@ type connrole struct {
func newClusterClient(opt *ClientOption, connFn connFn) (client *clusterClient, err error) {
client = &clusterClient{
cmd: cmds.NewBuilder(cmds.InitSlot),
opt: opt,
connFn: connFn,
opt: opt,
conns: make(map[string]connrole),
retry: !opt.DisableRetry,
aws: len(opt.InitAddress) == 1 && strings.Contains(opt.InitAddress[0], "amazonaws.com"),
}

if opt.ReplicaOnly && opt.SendToReplicas != nil {
return nil, ErrReplicaOnlyConflict
}

client.connFn = func(dst string, opt *ClientOption) conn {
cc := connFn(dst, opt)
cc.SetOnCloseHook(func(err error) {
Expand Down Expand Up @@ -225,7 +220,15 @@ func (c *clusterClient) _refresh() (err error) {
for master, g := range groups {
conns[master] = connrole{conn: c.connFn(master, c.opt), replica: false}
for _, addr := range g.nodes[1:] {
conns[addr] = connrole{conn: c.connFn(addr, c.opt), replica: true}
var cc conn
if c.opt.ReplicaOnly {
opt := *c.opt
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am still a little confused. So now, are we requiring users to set ReplicaOnly as well even if they use SendToReplicas?

I thought we would only set ReplicaOnly transparently for users here to the opt.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix it. 3ec3548

when SendToReplicas is configured and ReplicaOnly is true,

  • connection to master: do not send "READONLY"
  • connection to replica: send "READONLY"

when SendToReplicas is configured and ReplicaOnly is false,

  • connection to master: do not send "READONLY"
  • connection to replica: do not send "READONLY"

when SendToReplicas is not configured and ReplicaOnly is true,

  • connection to master: send "READONLY"

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we don't send "READONLY" to replicas when SendToReplicas is configured and ReplicaOnly is false?

Isn't it the original issue you want to fix?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cd6a674

Yes, it doesn't matter HOW. i just want to send "READONLY" command to replica connections automatically. but if you think read-only mode as a default is better than i changed it.

cc = c.connFn(addr, &opt)
} else {
cc = c.connFn(addr, c.opt)
}

conns[addr] = connrole{conn: cc, replica: true}
}
}
// make sure InitAddress always be present
Expand All @@ -241,7 +244,8 @@ func (c *clusterClient) _refresh() (err error) {

c.mu.RLock()
for addr, cc := range c.conns {
if fresh, ok := conns[addr]; ok {
fresh, ok := conns[addr]
if ok && cc.replica == fresh.replica {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if ok && cc.replica == fresh.replica {
if ok && (cc.replica == fresh.replica || c.opt.SendToReplicas == nil) {

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can still reuse the conn if c.opt.SendToReplicas is not configured.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @proost, thank you very much! It looks great!

conns[addr] = connrole{
conn: cc.conn,
replica: fresh.replica,
Expand All @@ -256,7 +260,7 @@ func (c *clusterClient) _refresh() (err error) {
var rslots []conn
for master, g := range groups {
switch {
case c.opt.ReplicaOnly && len(g.nodes) > 1:
case c.opt.SendToReplicas == nil && c.opt.ReplicaOnly && len(g.nodes) > 1:
nodesCount := len(g.nodes)
for _, slot := range g.slots {
for i := slot[0]; i <= slot[1]; i++ {
Expand Down
68 changes: 68 additions & 0 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -911,6 +911,74 @@ func TestClusterClientInit(t *testing.T) {
t.Fatalf("unexpected node assigned to rslot 16383")
}
})

t.Run("Refresh cluster which has multi nodes per shard with SendToReplica, ReplicaOnly options", func(t *testing.T) {
primaryNodeConn := &mockConn{
DoFn: func(cmd Completed) RedisResult {
if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" {
return slotsMultiResp
}
return RedisResult{
err: errors.New("unexpected call"),
}
},
}
replicaNodeConn := &mockConn{
DoFn: func(cmd Completed) RedisResult {
if strings.Join(cmd.Commands(), " ") == "READONLY" {
return newResult(RedisMessage{typ: '+', string: "READONLY"}, nil)
}
return RedisResult{
err: errors.New("unexpected call"),
}
},
}

client, err := newClusterClient(
&ClientOption{
InitAddress: []string{"127.0.0.1:0"},
SendToReplicas: func(cmd Completed) bool {
return true
},
ReplicaOnly: true,
},
func(dst string, opt *ClientOption) conn {
if dst == "127.0.0.1:0" || dst == "127.0.2.1:0" {
return primaryNodeConn
} else {
return replicaNodeConn
}
},
)
if err != nil {
t.Fatalf("unexpected err %v", err)
}

if client.pslots[0] != client.conns["127.0.0.1:0"].conn {
t.Fatalf("unexpected node assigned to pslot 0")
}
if client.pslots[8192] != client.conns["127.0.0.1:0"].conn {
t.Fatalf("unexpected node assigned to pslot 8192")
}
if client.pslots[8193] != client.conns["127.0.2.1:0"].conn {
t.Fatalf("unexpected node assigned to pslot 8193")
}
if client.pslots[16383] != client.conns["127.0.2.1:0"].conn {
t.Fatalf("unexpected node assigned to pslot 16383")
}
if client.rslots[0] != client.conns["127.0.1.1:1"].conn {
t.Fatalf("unexpected node assigned to rslot 0")
}
if client.rslots[8192] != client.conns["127.0.1.1:1"].conn {
t.Fatalf("unexpected node assigned to rslot 8192")
}
if client.rslots[8193] != client.conns["127.0.3.1:1"].conn {
t.Fatalf("unexpected node assigned to rslot 8193")
}
if client.rslots[16383] != client.conns["127.0.3.1:1"].conn {
t.Fatalf("unexpected node assigned to rslot 16383")
}
})
}

//gocyclo:ignore
Expand Down
2 changes: 1 addition & 1 deletion rueidis.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ type ClientOption struct {

// SendToReplicas is a function that returns true if the command should be sent to replicas.
// currently only used for cluster client.
// NOTE: This function can't be used with ReplicaOnly option.
SendToReplicas func(cmd Completed) bool

// Sentinel options, including MasterSet and Auth options
Expand Down Expand Up @@ -161,6 +160,7 @@ type ClientOption struct {
ForceSingleClient bool

// ReplicaOnly indicates that this client will only try to connect to readonly replicas of redis setup.
// If SendToReplicas is set, client will use read-only operations through replicas connection.
ReplicaOnly bool

// ClientNoEvict sets the client eviction mode for the current connection.
Expand Down
25 changes: 0 additions & 25 deletions rueidis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"math/big"
"net"
"os"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -132,30 +131,6 @@ func TestNewClusterClientError(t *testing.T) {
}
<-done
})

t.Run("replica only and send to replicas option conflict", func(t *testing.T) {
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
defer ln.Close()

_, port, _ := net.SplitHostPort(ln.Addr().String())
client, err := NewClient(ClientOption{
InitAddress: []string{"127.0.0.1:" + port},
ReplicaOnly: true,
SendToReplicas: func(cmd Completed) bool {
return true
},
})
if client != nil || err == nil {
t.Errorf("unexpected return %v %v", client, err)
}

if !strings.Contains(err.Error(), ErrReplicaOnlyConflict.Error()) {
t.Errorf("unexpected error %v", err)
}
})
}

func TestFallBackSingleClient(t *testing.T) {
Expand Down