Skip to content

Commit

Permalink
fix: send READONLY to replicas when SendToReplicas option is set (#430)
Browse files Browse the repository at this point in the history
* fix: send readonly command

* refactor: rollback

* refactor: rollback

* BREAKING: remove error

* refactor: send readonly command in _refresh

* refactor: use shallow copy

* style: unify style

* refactor: call conn function

* fix: wrong code

* refactor: default to read-only mode

* refactor: change more simpler

* refactor: rollback

* fix: check send replicas
  • Loading branch information
proost authored Dec 25, 2023
1 parent 3ec15df commit b611f0c
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 18 deletions.
42 changes: 28 additions & 14 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,18 @@ var retrycachep = util.NewPool(func(capacity int) *retrycache {
})

type clusterClient struct {
pslots [16384]conn
rslots []conn
opt *ClientOption
conns map[string]connrole
connFn connFn
sc call
mu sync.RWMutex
stop uint32
cmd Builder
retry bool
aws bool
pslots [16384]conn
rslots []conn
opt *ClientOption
replicaOpt *ClientOption
conns map[string]connrole
connFn connFn
sc call
mu sync.RWMutex
stop uint32
cmd Builder
retry bool
aws bool
}

// NOTE: connrole and conn must be initialized at the same time
Expand All @@ -93,8 +94,8 @@ type connrole struct {
func newClusterClient(opt *ClientOption, connFn connFn) (client *clusterClient, err error) {
client = &clusterClient{
cmd: cmds.NewBuilder(cmds.InitSlot),
opt: opt,
connFn: connFn,
opt: opt,
conns: make(map[string]connrole),
retry: !opt.DisableRetry,
aws: len(opt.InitAddress) == 1 && strings.Contains(opt.InitAddress[0], "amazonaws.com"),
Expand All @@ -104,6 +105,12 @@ func newClusterClient(opt *ClientOption, connFn connFn) (client *clusterClient,
return nil, ErrReplicaOnlyConflict
}

if opt.SendToReplicas != nil {
replicaOpt := *opt
replicaOpt.ReplicaOnly = true
client.replicaOpt = &replicaOpt
}

client.connFn = func(dst string, opt *ClientOption) conn {
cc := connFn(dst, opt)
cc.SetOnCloseHook(func(err error) {
Expand Down Expand Up @@ -225,7 +232,13 @@ func (c *clusterClient) _refresh() (err error) {
for master, g := range groups {
conns[master] = connrole{conn: c.connFn(master, c.opt), replica: false}
for _, addr := range g.nodes[1:] {
conns[addr] = connrole{conn: c.connFn(addr, c.opt), replica: true}
var cc conn
if c.opt.SendToReplicas != nil {
cc = c.connFn(addr, c.replicaOpt)
} else {
cc = c.connFn(addr, c.opt)
}
conns[addr] = connrole{conn: cc, replica: true}
}
}
// make sure InitAddress always be present
Expand All @@ -241,7 +254,8 @@ func (c *clusterClient) _refresh() (err error) {

c.mu.RLock()
for addr, cc := range c.conns {
if fresh, ok := conns[addr]; ok {
fresh, ok := conns[addr]
if ok && (cc.replica == fresh.replica || c.opt.SendToReplicas == nil) {
conns[addr] = connrole{
conn: cc.conn,
replica: fresh.replica,
Expand Down
26 changes: 22 additions & 4 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -861,12 +861,21 @@ func TestClusterClientInit(t *testing.T) {
})

t.Run("Refresh cluster which has multi nodes per shard with SendToReplica option", func(t *testing.T) {
m := &mockConn{
primaryNodeConn := &mockConn{
DoFn: func(cmd Completed) RedisResult {
if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" {
return slotsMultiResp
}
return RedisResult{}
return RedisResult{
err: errors.New("unexpected call"),
}
},
}
replicaNodeConn := &mockConn{
DoFn: func(cmd Completed) RedisResult {
return RedisResult{
err: errors.New("unexpected call"),
}
},
}

Expand All @@ -878,8 +887,17 @@ func TestClusterClientInit(t *testing.T) {
},
},
func(dst string, opt *ClientOption) conn {
copiedM := *m
return &copiedM
if dst == "127.0.0.1:0" || dst == "127.0.2.1:0" {
if opt.ReplicaOnly {
t.Fatalf("unexpected replicaOnly option in primary node")
}
return primaryNodeConn
} else {
if !opt.ReplicaOnly {
t.Fatalf("unexpected replicaOnly option in replica node")
}
return replicaNodeConn
}
},
)
if err != nil {
Expand Down

0 comments on commit b611f0c

Please sign in to comment.