feat: add cluster topology awareness #638

Merged
15 commits, merged on Sep 25, 2024
cluster.go: 165 changes (139 additions, 26 deletions)
@@ -18,6 +18,7 @@ import (
// ErrNoSlot indicates that no redis node owns the key slot.
var ErrNoSlot = errors.New("the slot has no redis node")
var ErrReplicaOnlyConflict = errors.New("ReplicaOnly conflicts with SendToReplicas option")
var ErrInvalidScanInterval = errors.New("scan interval must be greater than or equal to 0")

type clusterClient struct {
pslots [16384]conn
@@ -31,6 +32,7 @@ type clusterClient struct {
stop uint32
cmd Builder
retry bool
stopCh chan struct{}
}

// NOTE: connrole and conn must be initialized at the same time
@@ -46,6 +48,7 @@ func newClusterClient(opt *ClientOption, connFn connFn) (*clusterClient, error)
opt: opt,
conns: make(map[string]connrole),
retry: !opt.DisableRetry,
stopCh: make(chan struct{}),
}

if opt.ReplicaOnly && opt.SendToReplicas != nil {
@@ -74,6 +77,12 @@ func newClusterClient(opt *ClientOption, connFn connFn) (*clusterClient, error)
return client, err
}

if opt.ClusterTopologyRefreshmentOption.ScanInterval > 0 {
go client.runClusterTopologyRefreshment()
} else if opt.ClusterTopologyRefreshmentOption.ScanInterval < 0 {
return nil, ErrInvalidScanInterval
}

return client, nil
}
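For reference, a minimal usage sketch of the new option: the ClusterTopologyRefreshmentOption.ScanInterval field follows this diff, while the rueidis import path, the NewClient call, and the option struct's exact type name are assumptions about the surrounding library.

```go
package main

import (
	"time"

	"github.com/redis/rueidis" // assumed import path for this client library
)

func main() {
	// A positive ScanInterval starts the periodic topology refresher,
	// zero leaves it disabled, and a negative value is rejected with
	// ErrInvalidScanInterval (per the validation added above).
	client, err := rueidis.NewClient(rueidis.ClientOption{
		InitAddress: []string{"127.0.0.1:7001", "127.0.0.1:7002"},
		ClusterTopologyRefreshmentOption: rueidis.ClusterTopologyRefreshmentOption{ // type name assumed
			ScanInterval: 30 * time.Second, // re-fetch CLUSTER SLOTS/SHARDS every 30s
		},
	})
	if err != nil {
		panic(err)
	}
	defer client.Close() // Close stops the refresher goroutine via stopCh
}
```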

@@ -144,6 +153,37 @@ func getClusterSlots(c conn, timeout time.Duration) clusterslots {
}

func (c *clusterClient) _refresh() (err error) {
result, err := c.getClusterTopology()
if err != nil {
return err
}

groups := result.parse(c.opt.TLSConfig != nil)
conns := make(map[string]connrole, len(groups))
for master, g := range groups {
conns[master] = connrole{conn: c.connFn(master, c.opt), replica: false}
for _, addr := range g.nodes[1:] {
if c.rOpt != nil {
conns[addr] = connrole{conn: c.connFn(addr, c.rOpt), replica: true}
} else {
conns[addr] = connrole{conn: c.connFn(addr, c.opt), replica: true}
}
}
}
// make sure InitAddress is always present
for _, addr := range c.opt.InitAddress {
if _, ok := conns[addr]; !ok {
conns[addr] = connrole{
conn: c.connFn(addr, c.opt),
}
}
}

c.updateClusterTopologyCache(groups, conns)
return nil
}

func (c *clusterClient) getClusterTopology() (result clusterslots, err error) {
c.mu.RLock()
results := make(chan clusterslots, len(c.conns))
pending := make([]conn, 0, len(c.conns))
@@ -152,7 +192,6 @@ func (c *clusterClient) _refresh() (err error) {
}
c.mu.RUnlock()

var result clusterslots
for i := 0; i < cap(results); i++ {
if i&3 == 0 { // batch CLUSTER SLOTS/CLUSTER SHARDS for every 4 connections
for j := i; j < i+4 && j < len(pending); j++ {
@@ -168,31 +207,16 @@ }
}
}
if err != nil {
return err
return
}
pending = nil

groups := result.parse(c.opt.TLSConfig != nil)
conns := make(map[string]connrole, len(groups))
for master, g := range groups {
conns[master] = connrole{conn: c.connFn(master, c.opt), replica: false}
for _, addr := range g.nodes[1:] {
if c.rOpt != nil {
conns[addr] = connrole{conn: c.connFn(addr, c.rOpt), replica: true}
} else {
conns[addr] = connrole{conn: c.connFn(addr, c.opt), replica: true}
}
}
}
// make sure InitAddress is always present
for _, addr := range c.opt.InitAddress {
if _, ok := conns[addr]; !ok {
conns[addr] = connrole{
conn: c.connFn(addr, c.opt),
}
}
}
pending = nil
return result, nil
}

func (c *clusterClient) updateClusterTopologyCache(
groups map[string]group, conns map[string]connrole,
) {
var removes []conn

c.mu.RLock()
@@ -262,8 +286,6 @@ func (c *clusterClient) _refresh() (err error) {
}
}(removes)
}

return nil
}

func (c *clusterClient) single() (conn conn) {
@@ -358,6 +380,94 @@ func parseShards(shards RedisMessage, defaultAddr string, tls bool) map[string]group {
return groups
}

func (c *clusterClient) runClusterTopologyRefreshment() {
ticker := time.NewTicker(c.opt.ClusterTopologyRefreshmentOption.ScanInterval)
defer ticker.Stop()
for {
select {
case <-c.stopCh:
return
case <-ticker.C:
c.conditionalRefresh()
Collaborator:
Yes, now there is no duplicated getClusterTopology at the end. However, I still want all the refresh processes, including the new conditional one, to be gated by the lazyRefresh function to ensure at most one getClusterTopology per second and not to flood the servers. lazyRefresh serves the same purpose as your initial cool-down period proposal. Can we merge the _refresh and conditionalRefresh and then just use lazyRefresh here?

Suggested change:
- c.conditionalRefresh()
+ c.lazyRefresh()
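(Illustrative aside: a minimal sketch of the cool-down gating described above, i.e. at most one topology fetch per window. The type and field names here, and the atomic-timestamp approach, are assumptions for the sketch rather than the library's actual lazyRefresh implementation.)

```go
package topology

import (
	"sync/atomic"
	"time"
)

// refreshGate lets at most one refresh run per cool-down window,
// no matter how many callers ask for one.
type refreshGate struct {
	lastRefresh atomic.Int64  // unix nanoseconds of the last accepted refresh
	cooldown    time.Duration // e.g. time.Second, matching "at most one getClusterTopology per second"
}

func (g *refreshGate) maybeRefresh(refresh func()) {
	now := time.Now().UnixNano()
	last := g.lastRefresh.Load()
	if now-last < int64(g.cooldown) {
		return // still cooling down; drop this attempt
	}
	// only the caller that wins the CAS performs the refresh
	if g.lastRefresh.CompareAndSwap(last, now) {
		refresh()
	}
}
```

In the merged code, the existing lazyRefresh already provides this kind of gating, which is why the reviewer asks to reuse it here instead of calling conditionalRefresh directly.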

Contributor Author:

826de2c

just call lazyRefresh

Collaborator:

Yes, I am OK with this one, but why did you delete your detection logic?

Contributor Author:

Oh, I thought you wanted to use the original one. I changed it in 9fd1da9.

Contributor Author (@proost, Sep 21, 2024):

fa0b58a

We should not merge _refresh and conditionalRefresh.

_refresh is called:

  1. In the constructor. This case must initialize all conns, pslots, and rslots, so the full logic must run.
  2. When there is no connection for a slot. This case should refresh.
  3. When an error is returned from the redis cluster. I think a forced refresh is reasonable in this case.

Collaborator:

Would 1 be an issue if we made _refresh conditional? I thought in that case the checking logic would be bypassed.

For 2 and 3, I don't get your point about not making _refresh conditional. They are not related to the refresh process itself but to how we act on its result.

}
}
}

func (c *clusterClient) conditionalRefresh() {
result, err := c.getClusterTopology()
if err != nil {
c.lazyRefresh()
return
}

groups := result.parse(c.opt.TLSConfig != nil)

// check whether the new topology differs from the current one first,
// so we don't re-create the connections unnecessarily.
conns := make(map[string]connrole, len(groups))
for master, g := range groups {
conns[master] = connrole{replica: false}
for _, addr := range g.nodes[1:] {
conns[addr] = connrole{replica: true}
}
}
// make sure InitAddress is always present
for _, addr := range c.opt.InitAddress {
if _, ok := conns[addr]; !ok {
conns[addr] = connrole{}
}
}

isChanged := false
c.mu.RLock()
// check if the new topology is different from the current one
for addr, cc := range conns {
old, ok := c.conns[addr]
if !ok || old.replica != cc.replica {
isChanged = true
break
}
}

// check if the current topology is different from the new one
if !isChanged {
for addr := range c.conns {
if _, ok := conns[addr]; !ok {
isChanged = true
break
}
}
}
c.mu.RUnlock()

if !isChanged {
return
}

for addr, cc := range conns {
if cc.replica {
if c.rOpt != nil {
cc.conn = c.connFn(addr, c.rOpt)
} else {
cc.conn = c.connFn(addr, c.opt)
}
} else {
cc.conn = c.connFn(addr, c.opt)
}

conns[addr] = connrole{
conn: cc.conn,
replica: cc.replica,
}
}

c.updateClusterTopologyCache(groups, conns)
}

func (c *clusterClient) _pick(slot uint16, toReplica bool) (p conn) {
c.mu.RLock()
if slot == cmds.InitSlot {
@@ -1018,7 +1128,10 @@ func (c *clusterClient) Nodes() map[string]Client {
}

func (c *clusterClient) Close() {
atomic.StoreUint32(&c.stop, 1)
if atomic.CompareAndSwapUint32(&c.stop, 0, 1) {
close(c.stopCh)
}

c.mu.RLock()
for _, cc := range c.conns {
go cc.conn.Close()