Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: send readonly command #430

Merged
merged 13 commits into from
Dec 25, 2023
49 changes: 39 additions & 10 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (

// ErrNoSlot indicates that there is no redis node owns the key slot.
var ErrNoSlot = errors.New("the slot has no redis node")
var ErrReplicaOnlyConflict = errors.New("ReplicaOnly conflicts with SendToReplicas option")

type retry struct {
cIndexes []int
Expand Down Expand Up @@ -93,17 +92,13 @@ type connrole struct {
func newClusterClient(opt *ClientOption, connFn connFn) (client *clusterClient, err error) {
client = &clusterClient{
cmd: cmds.NewBuilder(cmds.InitSlot),
opt: opt,
connFn: connFn,
opt: opt,
conns: make(map[string]connrole),
retry: !opt.DisableRetry,
aws: len(opt.InitAddress) == 1 && strings.Contains(opt.InitAddress[0], "amazonaws.com"),
}

if opt.ReplicaOnly && opt.SendToReplicas != nil {
return nil, ErrReplicaOnlyConflict
}

client.connFn = func(dst string, opt *ClientOption) conn {
cc := connFn(dst, opt)
cc.SetOnCloseHook(func(err error) {
Expand Down Expand Up @@ -222,10 +217,42 @@ func (c *clusterClient) _refresh() (err error) {

groups := result.parse(c.opt.TLSConfig != nil)
conns := make(map[string]connrole, len(groups))
var wg sync.WaitGroup
for master, g := range groups {
conns[master] = connrole{conn: c.connFn(master, c.opt), replica: false}
conns[master] = connrole{
conn: c.connFn(master, c.opt),
replica: false,
}
for _, addr := range g.nodes[1:] {
conns[addr] = connrole{conn: c.connFn(addr, c.opt), replica: true}
cc := c.connFn(addr, c.opt)

if !c.opt.ReplicaOnly {
conns[addr] = connrole{
conn: cc,
replica: true,
}
continue
}

wg.Add(1)
go func(cc conn) {
defer wg.Done()

timeout := c.opt.Dialer.Timeout
if timeout <= 0 {
timeout = DefaultDialTimeout
}

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

cc.Do(ctx, cmds.NewCompleted([]string{"READONLY"})) // ignore error
}(cc)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @proost,

Thank you for the changes, but this goroutine looks wired to me and I don't expect there will be breaking change by reusing the ReplicaOnly. I am afraid we have some misunderstanding here.

What I previously suggested is that the pipe will send READONLY command already if its ReplicaOnly is true.
So I think we can reuse this behavior by modifying the c.connFn(addr, c.opt) in the refresh function, and then the above goroutine is not necessary.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that the signature of the connFn is func(dst string, opt *ClientOption) conn which accepts a ClientOption pointer, I think we can copy the c.opt to c.optReplica with the ReplicaOnly set to true and then we can use c.connFn(addr, c.optReplica) for replica connections.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

98040d7

@rueian
I'm sorry about serious misunderstanding. I figured out.


conns[addr] = connrole{
conn: cc,
replica: true,
}
}
}
// make sure InitAddress always be present
Expand All @@ -236,12 +263,14 @@ func (c *clusterClient) _refresh() (err error) {
}
}
}
wg.Wait()

var removes []conn

c.mu.RLock()
for addr, cc := range c.conns {
if fresh, ok := conns[addr]; ok {
fresh, ok := conns[addr]
if ok && cc.replica == fresh.replica {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if ok && cc.replica == fresh.replica {
if ok && (cc.replica == fresh.replica || c.opt.SendToReplicas == nil) {

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can still reuse the conn if c.opt.SendToReplicas is not configured.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @proost, thank you very much! It looks great!

conns[addr] = connrole{
conn: cc.conn,
replica: fresh.replica,
Expand All @@ -256,7 +285,7 @@ func (c *clusterClient) _refresh() (err error) {
var rslots []conn
for master, g := range groups {
switch {
case c.opt.ReplicaOnly && len(g.nodes) > 1:
case c.opt.SendToReplicas == nil && c.opt.ReplicaOnly && len(g.nodes) > 1:
nodesCount := len(g.nodes)
for _, slot := range g.slots {
for i := slot[0]; i <= slot[1]; i++ {
Expand Down
68 changes: 68 additions & 0 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -911,6 +911,74 @@ func TestClusterClientInit(t *testing.T) {
t.Fatalf("unexpected node assigned to rslot 16383")
}
})

t.Run("Refresh cluster which has multi nodes per shard with SendToReplica, ReplicaOnly options", func(t *testing.T) {
primaryNodeConn := &mockConn{
DoFn: func(cmd Completed) RedisResult {
if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" {
return slotsMultiResp
}
return RedisResult{
err: errors.New("unexpected call"),
}
},
}
replicaNodeConn := &mockConn{
DoFn: func(cmd Completed) RedisResult {
if strings.Join(cmd.Commands(), " ") == "READONLY" {
return newResult(RedisMessage{typ: '+', string: "READONLY"}, nil)
}
return RedisResult{
err: errors.New("unexpected call"),
}
},
}

client, err := newClusterClient(
&ClientOption{
InitAddress: []string{"127.0.0.1:0"},
SendToReplicas: func(cmd Completed) bool {
return true
},
ReplicaOnly: true,
},
func(dst string, opt *ClientOption) conn {
if dst == "127.0.0.1:0" || dst == "127.0.2.1:0" {
return primaryNodeConn
} else {
return replicaNodeConn
}
},
)
if err != nil {
t.Fatalf("unexpected err %v", err)
}

if client.pslots[0] != client.conns["127.0.0.1:0"].conn {
t.Fatalf("unexpected node assigned to pslot 0")
}
if client.pslots[8192] != client.conns["127.0.0.1:0"].conn {
t.Fatalf("unexpected node assigned to pslot 8192")
}
if client.pslots[8193] != client.conns["127.0.2.1:0"].conn {
t.Fatalf("unexpected node assigned to pslot 8193")
}
if client.pslots[16383] != client.conns["127.0.2.1:0"].conn {
t.Fatalf("unexpected node assigned to pslot 16383")
}
if client.rslots[0] != client.conns["127.0.1.1:1"].conn {
t.Fatalf("unexpected node assigned to rslot 0")
}
if client.rslots[8192] != client.conns["127.0.1.1:1"].conn {
t.Fatalf("unexpected node assigned to rslot 8192")
}
if client.rslots[8193] != client.conns["127.0.3.1:1"].conn {
t.Fatalf("unexpected node assigned to rslot 8193")
}
if client.rslots[16383] != client.conns["127.0.3.1:1"].conn {
t.Fatalf("unexpected node assigned to rslot 16383")
}
})
}

//gocyclo:ignore
Expand Down
2 changes: 1 addition & 1 deletion rueidis.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ type ClientOption struct {

// SendToReplicas is a function that returns true if the command should be sent to replicas.
// currently only used for cluster client.
// NOTE: This function can't be used with ReplicaOnly option.
SendToReplicas func(cmd Completed) bool

// Sentinel options, including MasterSet and Auth options
Expand Down Expand Up @@ -161,6 +160,7 @@ type ClientOption struct {
ForceSingleClient bool

// ReplicaOnly indicates that this client will only try to connect to readonly replicas of redis setup.
// If SendToReplicas is set, client will use read-only operations through replicas connection.
ReplicaOnly bool

// ClientNoEvict sets the client eviction mode for the current connection.
Expand Down
25 changes: 0 additions & 25 deletions rueidis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"math/big"
"net"
"os"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -132,30 +131,6 @@ func TestNewClusterClientError(t *testing.T) {
}
<-done
})

t.Run("replica only and send to replicas option conflict", func(t *testing.T) {
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
defer ln.Close()

_, port, _ := net.SplitHostPort(ln.Addr().String())
client, err := NewClient(ClientOption{
InitAddress: []string{"127.0.0.1:" + port},
ReplicaOnly: true,
SendToReplicas: func(cmd Completed) bool {
return true
},
})
if client != nil || err == nil {
t.Errorf("unexpected return %v %v", client, err)
}

if !strings.Contains(err.Error(), ErrReplicaOnlyConflict.Error()) {
t.Errorf("unexpected error %v", err)
}
})
}

func TestFallBackSingleClient(t *testing.T) {
Expand Down