Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add clutser topology awareness #638

Merged
merged 15 commits into from
Sep 25, 2024
27 changes: 26 additions & 1 deletion cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
// ErrNoSlot indicates that there is no redis node owns the key slot.
var ErrNoSlot = errors.New("the slot has no redis node")
var ErrReplicaOnlyConflict = errors.New("ReplicaOnly conflicts with SendToReplicas option")
var ErrInvalidScanInterval = errors.New("scan interval must be greater than or equal to 0")

type clusterClient struct {
pslots [16384]conn
Expand All @@ -31,6 +32,7 @@ type clusterClient struct {
stop uint32
cmd Builder
retry bool
stopCh chan struct{}
}

// NOTE: connrole and conn must be initialized at the same time
Expand All @@ -46,6 +48,7 @@ func newClusterClient(opt *ClientOption, connFn connFn) (*clusterClient, error)
opt: opt,
conns: make(map[string]connrole),
retry: !opt.DisableRetry,
stopCh: make(chan struct{}),
}

if opt.ReplicaOnly && opt.SendToReplicas != nil {
Expand Down Expand Up @@ -74,6 +77,12 @@ func newClusterClient(opt *ClientOption, connFn connFn) (*clusterClient, error)
return client, err
}

if opt.ClusterOption.ScanInterval > 0 {
go client.runClusterTopologyRefreshment()
} else if opt.ClusterOption.ScanInterval < 0 {
return nil, ErrInvalidScanInterval
}

return client, nil
}

Expand Down Expand Up @@ -358,6 +367,19 @@ func parseShards(shards RedisMessage, defaultAddr string, tls bool) map[string]g
return groups
}

func (c *clusterClient) runClusterTopologyRefreshment() {
ticker := time.NewTicker(c.opt.ClusterOption.ScanInterval)
defer ticker.Stop()
for {
select {
case <-c.stopCh:
return
case <-ticker.C:
c.lazyRefresh()
}
}
}

func (c *clusterClient) _pick(slot uint16, toReplica bool) (p conn) {
c.mu.RLock()
if slot == cmds.InitSlot {
Expand Down Expand Up @@ -1018,7 +1040,10 @@ func (c *clusterClient) Nodes() map[string]Client {
}

func (c *clusterClient) Close() {
atomic.StoreUint32(&c.stop, 1)
if atomic.CompareAndSwapUint32(&c.stop, 0, 1) {
close(c.stopCh)
}

c.mu.RLock()
for _, cc := range c.conns {
go cc.conn.Close()
Expand Down
Loading
Loading