Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: send readonly command #430

Merged
merged 13 commits into from
Dec 25, 2023
42 changes: 28 additions & 14 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,18 @@ var retrycachep = util.NewPool(func(capacity int) *retrycache {
})

type clusterClient struct {
pslots [16384]conn
rslots []conn
opt *ClientOption
conns map[string]connrole
connFn connFn
sc call
mu sync.RWMutex
stop uint32
cmd Builder
retry bool
aws bool
pslots [16384]conn
rslots []conn
opt *ClientOption
replicaOpt *ClientOption
conns map[string]connrole
connFn connFn
sc call
mu sync.RWMutex
stop uint32
cmd Builder
retry bool
aws bool
}

// NOTE: connrole and conn must be initialized at the same time
Expand All @@ -93,8 +94,8 @@ type connrole struct {
func newClusterClient(opt *ClientOption, connFn connFn) (client *clusterClient, err error) {
client = &clusterClient{
cmd: cmds.NewBuilder(cmds.InitSlot),
opt: opt,
connFn: connFn,
opt: opt,
conns: make(map[string]connrole),
retry: !opt.DisableRetry,
aws: len(opt.InitAddress) == 1 && strings.Contains(opt.InitAddress[0], "amazonaws.com"),
Expand All @@ -104,6 +105,12 @@ func newClusterClient(opt *ClientOption, connFn connFn) (client *clusterClient,
return nil, ErrReplicaOnlyConflict
}

if opt.SendToReplicas != nil {
replicaOpt := *opt
replicaOpt.ReplicaOnly = true
client.replicaOpt = &replicaOpt
}

client.connFn = func(dst string, opt *ClientOption) conn {
cc := connFn(dst, opt)
cc.SetOnCloseHook(func(err error) {
Expand Down Expand Up @@ -225,7 +232,13 @@ func (c *clusterClient) _refresh() (err error) {
for master, g := range groups {
conns[master] = connrole{conn: c.connFn(master, c.opt), replica: false}
for _, addr := range g.nodes[1:] {
conns[addr] = connrole{conn: c.connFn(addr, c.opt), replica: true}
var cc conn
if c.opt.SendToReplicas != nil {
cc = c.connFn(addr, c.replicaOpt)
} else {
cc = c.connFn(addr, c.opt)
}
conns[addr] = connrole{conn: cc, replica: true}
}
}
// make sure InitAddress always be present
Expand All @@ -241,7 +254,8 @@ func (c *clusterClient) _refresh() (err error) {

c.mu.RLock()
for addr, cc := range c.conns {
if fresh, ok := conns[addr]; ok {
fresh, ok := conns[addr]
if ok && (cc.replica == fresh.replica || c.opt.SendToReplicas == nil) {
conns[addr] = connrole{
conn: cc.conn,
replica: fresh.replica,
Expand Down
26 changes: 22 additions & 4 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -861,12 +861,21 @@ func TestClusterClientInit(t *testing.T) {
})

t.Run("Refresh cluster which has multi nodes per shard with SendToReplica option", func(t *testing.T) {
m := &mockConn{
primaryNodeConn := &mockConn{
DoFn: func(cmd Completed) RedisResult {
if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" {
return slotsMultiResp
}
return RedisResult{}
return RedisResult{
err: errors.New("unexpected call"),
}
},
}
replicaNodeConn := &mockConn{
DoFn: func(cmd Completed) RedisResult {
return RedisResult{
err: errors.New("unexpected call"),
}
},
}

Expand All @@ -878,8 +887,17 @@ func TestClusterClientInit(t *testing.T) {
},
},
func(dst string, opt *ClientOption) conn {
copiedM := *m
return &copiedM
if dst == "127.0.0.1:0" || dst == "127.0.2.1:0" {
if opt.ReplicaOnly {
t.Fatalf("unexpected replicaOnly option in primary node")
}
return primaryNodeConn
} else {
if !opt.ReplicaOnly {
t.Fatalf("unexpected replicaOnly option in replica node")
}
return replicaNodeConn
}
},
)
if err != nil {
Expand Down