From aba2496f7ca498921e3a05c1595b151fe5512ca0 Mon Sep 17 00:00:00 2001 From: proost Date: Wed, 18 Sep 2024 00:54:53 +0900 Subject: [PATCH 01/15] feat: add cluster topology awareness --- cluster.go | 190 ++++++++++++++++++++++---------- cluster_test.go | 282 ++++++++++++++++++++++++++++++++++++++++++++++++ rueidis.go | 11 ++ 3 files changed, 424 insertions(+), 59 deletions(-) diff --git a/cluster.go b/cluster.go index 8f991323..372a097c 100644 --- a/cluster.go +++ b/cluster.go @@ -18,19 +18,21 @@ import ( // ErrNoSlot indicates that there is no redis node owns the key slot. var ErrNoSlot = errors.New("the slot has no redis node") var ErrReplicaOnlyConflict = errors.New("ReplicaOnly conflicts with SendToReplicas option") +var ErrInvalidScanInterval = errors.New("scan interval must be greater than or equal to 0") type clusterClient struct { - pslots [16384]conn - rslots []conn - opt *ClientOption - rOpt *ClientOption - conns map[string]connrole - connFn connFn - sc call - mu sync.RWMutex - stop uint32 - cmd Builder - retry bool + pslots [16384]conn + rslots []conn + opt *ClientOption + rOpt *ClientOption + conns map[string]connrole + connFn connFn + sc call + mu sync.RWMutex + stop uint32 + cmd Builder + retry bool + refreshStop chan struct{} } // NOTE: connrole and conn must be initialized at the same time @@ -41,11 +43,12 @@ type connrole struct { func newClusterClient(opt *ClientOption, connFn connFn) (*clusterClient, error) { client := &clusterClient{ - cmd: cmds.NewBuilder(cmds.InitSlot), - connFn: connFn, - opt: opt, - conns: make(map[string]connrole), - retry: !opt.DisableRetry, + cmd: cmds.NewBuilder(cmds.InitSlot), + connFn: connFn, + opt: opt, + conns: make(map[string]connrole), + retry: !opt.DisableRetry, + refreshStop: make(chan struct{}), } if opt.ReplicaOnly && opt.SendToReplicas != nil { @@ -74,6 +77,12 @@ func newClusterClient(opt *ClientOption, connFn connFn) (*clusterClient, error) return client, err } + if opt.ClusterTopologyRefreshmentOption.ScanInterval > 0 { + go client.runClusterTopologyRefreshment() + } else if opt.ClusterTopologyRefreshmentOption.ScanInterval < 0 { + return nil, ErrInvalidScanInterval + } + return client, nil } @@ -144,54 +153,14 @@ func getClusterSlots(c conn, timeout time.Duration) clusterslots { } func (c *clusterClient) _refresh() (err error) { - c.mu.RLock() - results := make(chan clusterslots, len(c.conns)) - pending := make([]conn, 0, len(c.conns)) - for _, cc := range c.conns { - pending = append(pending, cc.conn) - } - c.mu.RUnlock() - - var result clusterslots - for i := 0; i < cap(results); i++ { - if i&3 == 0 { // batch CLUSTER SLOTS/CLUSTER SHARDS for every 4 connections - for j := i; j < i+4 && j < len(pending); j++ { - go func(c conn, timeout time.Duration) { - results <- getClusterSlots(c, timeout) - }(pending[j], c.opt.ConnWriteTimeout) - } - } - result = <-results - err = result.reply.Error() - if len(result.reply.val.values) != 0 { - break - } - } + result, err := c.getClusterTopology() if err != nil { return err } - pending = nil groups := result.parse(c.opt.TLSConfig != nil) - conns := make(map[string]connrole, len(groups)) - for master, g := range groups { - conns[master] = connrole{conn: c.connFn(master, c.opt), replica: false} - for _, addr := range g.nodes[1:] { - if c.rOpt != nil { - conns[addr] = connrole{conn: c.connFn(addr, c.rOpt), replica: true} - } else { - conns[addr] = connrole{conn: c.connFn(addr, c.opt), replica: true} - } - } - } - // make sure InitAddress always be present - for _, addr := range c.opt.InitAddress 
{ - if _, ok := conns[addr]; !ok { - conns[addr] = connrole{ - conn: c.connFn(addr, c.opt), - } - } - } + + conns := c.newConns(groups) var removes []conn @@ -266,6 +235,60 @@ func (c *clusterClient) _refresh() (err error) { return nil } +func (c *clusterClient) getClusterTopology() (result clusterslots, err error) { + c.mu.RLock() + results := make(chan clusterslots, len(c.conns)) + pending := make([]conn, 0, len(c.conns)) + for _, cc := range c.conns { + pending = append(pending, cc.conn) + } + c.mu.RUnlock() + + for i := 0; i < cap(results); i++ { + if i&3 == 0 { // batch CLUSTER SLOTS/CLUSTER SHARDS for every 4 connections + for j := i; j < i+4 && j < len(pending); j++ { + go func(c conn, timeout time.Duration) { + results <- getClusterSlots(c, timeout) + }(pending[j], c.opt.ConnWriteTimeout) + } + } + result = <-results + err = result.reply.Error() + if len(result.reply.val.values) != 0 { + break + } + } + if err != nil { + return + } + + pending = nil + return result, nil +} + +func (c *clusterClient) newConns(groups map[string]group) map[string]connrole { + conns := make(map[string]connrole, len(groups)) + for master, g := range groups { + conns[master] = connrole{conn: c.connFn(master, c.opt), replica: false} + for _, addr := range g.nodes[1:] { + if c.rOpt != nil { + conns[addr] = connrole{conn: c.connFn(addr, c.rOpt), replica: true} + } else { + conns[addr] = connrole{conn: c.connFn(addr, c.opt), replica: true} + } + } + } + // make sure InitAddress always be present + for _, addr := range c.opt.InitAddress { + if _, ok := conns[addr]; !ok { + conns[addr] = connrole{ + conn: c.connFn(addr, c.opt), + } + } + } + return conns +} + func (c *clusterClient) single() (conn conn) { return c._pick(cmds.InitSlot, false) } @@ -358,6 +381,53 @@ func parseShards(shards RedisMessage, defaultAddr string, tls bool) map[string]g return groups } +func (c *clusterClient) runClusterTopologyRefreshment() { + ticker := time.NewTicker(c.opt.ClusterTopologyRefreshmentOption.ScanInterval) + defer ticker.Stop() + for { + select { + case <-c.refreshStop: + return + case <-ticker.C: + result, err := c.getClusterTopology() + if err != nil { + c.lazyRefresh() + continue + } + + groups := result.parse(c.opt.TLSConfig != nil) + + conns := c.newConns(groups) + + isChanged := false + c.mu.RLock() + // check if the new topology is different from the current one + for addr, cc := range conns { + old, ok := c.conns[addr] + if !ok || old.replica != cc.replica { + isChanged = true + break + } + } + + // check if the current topology is different from the new one + if !isChanged { + for addr := range c.conns { + if _, ok := conns[addr]; !ok { + isChanged = true + break + } + } + } + c.mu.RUnlock() + + if isChanged { + c.lazyRefresh() + } + } + } +} + func (c *clusterClient) _pick(slot uint16, toReplica bool) (p conn) { c.mu.RLock() if slot == cmds.InitSlot { @@ -1018,6 +1088,8 @@ func (c *clusterClient) Nodes() map[string]Client { } func (c *clusterClient) Close() { + close(c.refreshStop) + atomic.StoreUint32(&c.stop, 1) c.mu.RLock() for _, cc := range c.conns { diff --git a/cluster_test.go b/cluster_test.go index 33b54aef..8cc4d880 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -33,6 +33,23 @@ var slotsResp = newResult(RedisMessage{typ: '*', values: []RedisMessage{ }}, }}, nil) +var slotsRespWithChangedRoll = newResult(RedisMessage{typ: '*', values: []RedisMessage{ + {typ: '*', values: []RedisMessage{ + {typ: ':', integer: 0}, + {typ: ':', integer: 16383}, + {typ: '*', values: []RedisMessage{ // master + {typ: 
'+', string: "127.0.1.1"}, + {typ: ':', integer: 1}, + {typ: '+', string: ""}, + }}, + {typ: '*', values: []RedisMessage{ // replica + {typ: '+', string: "127.0.0.1"}, + {typ: ':', integer: 0}, + {typ: '+', string: ""}, + }}, + }}, +}}, nil) + var slotsMultiResp = newResult(RedisMessage{typ: '*', values: []RedisMessage{ {typ: '*', values: []RedisMessage{ {typ: ':', integer: 0}, @@ -897,6 +914,27 @@ func TestClusterClientInit(t *testing.T) { t.Fatalf("unexpected node assigned to rslot 16383") } }) + + t.Run("Negative Cluster Topology Scan Interval", func(t *testing.T) { + _, err := newClusterClient( + &ClientOption{ + InitAddress: []string{"127.0.0.1:0"}, + ClusterTopologyRefreshmentOption: ClusterTopologyRefreshmentOption{ + ScanInterval: -1 * time.Millisecond, + }, + }, + func(dst string, opt *ClientOption) conn { + return &mockConn{ + DoFn: func(cmd Completed) RedisResult { + return singleSlotResp + }, + } + }, + ) + if !errors.Is(err, ErrInvalidScanInterval) { + t.Fatalf("unexpected err %v", err) + } + }) } //gocyclo:ignore @@ -1153,6 +1191,12 @@ func TestClusterClient(t *testing.T) { } client.Close() <-called + select { + case _, ok := <-client.refreshStop: + if ok { + t.Fatalf("refreshStop should be closed") + } + } }) t.Run("Dedicated Err", func(t *testing.T) { @@ -4477,3 +4521,241 @@ func TestConnectToNonAvailableCluster(t *testing.T) { } wg.Wait() } + +func TestClusterTopologyRefreshment(t *testing.T) { + defer ShouldNotLeaked(SetupLeakDetection()) + + t.Run("nothing changed", func(t *testing.T) { + var callCount int64 + var client *clusterClient + refreshWaitCh := make(chan struct{}) + client, err := newClusterClient( + &ClientOption{ + InitAddress: []string{"127.0.0.1:0"}, + ClusterTopologyRefreshmentOption: ClusterTopologyRefreshmentOption{ + ScanInterval: 100 * time.Millisecond, + }, + }, + func(dst string, opt *ClientOption) conn { + return &mockConn{ + DoFn: func(cmd Completed) RedisResult { + // initial call + if atomic.CompareAndSwapInt64(&callCount, 0, 1) { + return singleSlotResp + } + + // call in refreshment scan + if atomic.CompareAndSwapInt64(&callCount, 1, 2) { + close(client.refreshStop) // stop refreshment + go func() { + time.Sleep(100 * time.Millisecond) // verify that refreshment is stopped + close(refreshWaitCh) + }() + return singleSlotResp + } + + t.Fatalf("unexpected call") + return singleSlotResp + }, + } + }, + ) + if err != nil { + t.Fatalf("unexpected err %v", err) + } + + select { + case <-refreshWaitCh: + case <-time.After(5 * time.Second): + t.Fatal("timeout waiting for refresh") + } + }) + + t.Run("replicas are changed", func(t *testing.T) { + var callCount int64 + var client *clusterClient + refreshWaitCh := make(chan struct{}) + client, err := newClusterClient( + &ClientOption{ + InitAddress: []string{"127.0.0.1:0"}, + ClusterTopologyRefreshmentOption: ClusterTopologyRefreshmentOption{ + ScanInterval: 100 * time.Millisecond, + }, + }, + func(dst string, opt *ClientOption) conn { + return &mockConn{ + DoFn: func(cmd Completed) RedisResult { + // initial call + if atomic.CompareAndSwapInt64(&callCount, 0, 1) { + return singleSlotResp2 + } + + // call in refreshment scan + if atomic.CompareAndSwapInt64(&callCount, 1, 2) { + close(client.refreshStop) // stop refreshment + return slotsResp + } + + // call in _refresh + if atomic.CompareAndSwapInt64(&callCount, 2, 3) { + close(refreshWaitCh) + return slotsResp + } + + return slotsResp + }, + } + }, + ) + if err != nil { + t.Fatalf("unexpected err %v", err) + } + + select { + case <-refreshWaitCh: + case 
<-time.After(5 * time.Second): + t.Fatal("timeout waiting for refresh") + } + }) + + t.Run("shards are changed", func(t *testing.T) { + var callCount int64 + var client *clusterClient + refreshWaitCh := make(chan struct{}) + client, err := newClusterClient( + &ClientOption{ + InitAddress: []string{"127.0.0.1:0"}, + ClusterTopologyRefreshmentOption: ClusterTopologyRefreshmentOption{ + ScanInterval: 100 * time.Millisecond, + }, + }, + func(dst string, opt *ClientOption) conn { + return &mockConn{ + DoFn: func(cmd Completed) RedisResult { + // initial call + if atomic.CompareAndSwapInt64(&callCount, 0, 1) { + return singleSlotResp + } + + // call in refreshment scan + if atomic.CompareAndSwapInt64(&callCount, 1, 2) { + close(client.refreshStop) // stop refreshment + return slotsMultiRespWithoutReplicas + } + + // call in _refresh + if atomic.CompareAndSwapInt64(&callCount, 2, 3) { + close(refreshWaitCh) + return slotsMultiRespWithoutReplicas + } + + return slotsMultiRespWithoutReplicas + }, + } + }, + ) + if err != nil { + t.Fatalf("unexpected err %v", err) + } + + select { + case <-refreshWaitCh: + case <-time.After(5 * time.Second): + t.Fatal("timeout waiting for refresh") + } + }) + + t.Run("node roll are changed", func(t *testing.T) { + var callCount int64 + var client *clusterClient + refreshWaitCh := make(chan struct{}) + client, err := newClusterClient( + &ClientOption{ + InitAddress: []string{"127.0.0.1:0"}, + ClusterTopologyRefreshmentOption: ClusterTopologyRefreshmentOption{ + ScanInterval: 100 * time.Millisecond, + }, + }, + func(dst string, opt *ClientOption) conn { + return &mockConn{ + DoFn: func(cmd Completed) RedisResult { + // initial call + if atomic.CompareAndSwapInt64(&callCount, 0, 1) { + return slotsResp + } + + // call in refreshment scan + if atomic.CompareAndSwapInt64(&callCount, 1, 2) { + close(client.refreshStop) // stop refreshment + return slotsRespWithChangedRoll + } + + // call in _refresh + if atomic.CompareAndSwapInt64(&callCount, 2, 3) { + close(refreshWaitCh) + return slotsRespWithChangedRoll + } + + return slotsRespWithChangedRoll + }, + } + }, + ) + if err != nil { + t.Fatalf("unexpected err %v", err) + } + + select { + case <-refreshWaitCh: + case <-time.After(190 * time.Second): + t.Fatal("timeout waiting for refresh") + } + }) + + t.Run("failed to get cluster topology", func(t *testing.T) { + var callCount int64 + var client *clusterClient + refreshWaitCh := make(chan struct{}) + client, err := newClusterClient( + &ClientOption{ + InitAddress: []string{"127.0.0.1:0"}, + ClusterTopologyRefreshmentOption: ClusterTopologyRefreshmentOption{ + ScanInterval: 100 * time.Millisecond, + }, + }, + func(dst string, opt *ClientOption) conn { + return &mockConn{ + DoFn: func(cmd Completed) RedisResult { + // initial call + if atomic.CompareAndSwapInt64(&callCount, 0, 1) { + return singleSlotResp + } + + // call in refreshment scan + if atomic.CompareAndSwapInt64(&callCount, 1, 2) { + close(client.refreshStop) // stop refreshment + return newErrResult(errors.New("failed to get cluster topology")) + } + + // call in _refresh + if atomic.CompareAndSwapInt64(&callCount, 2, 3) { + close(refreshWaitCh) + return slotsMultiRespWithoutReplicas + } + + return slotsMultiRespWithoutReplicas + }, + } + }, + ) + if err != nil { + t.Fatalf("unexpected err %v", err) + } + + select { + case <-refreshWaitCh: + case <-time.After(5 * time.Second): + t.Fatal("timeout waiting for refresh") + } + }) +} diff --git a/rueidis.go b/rueidis.go index 77c6c673..41479736 100644 --- a/rueidis.go 
+++ b/rueidis.go @@ -183,6 +183,9 @@ type ClientOption struct { // the current connection will be excluded from the client eviction process // even if we're above the configured client eviction threshold. ClientNoEvict bool + + // ClusterTopologyRefreshmentOption is the options for the cluster topology refreshment. + ClusterTopologyRefreshmentOption ClusterTopologyRefreshmentOption } // SentinelOption contains MasterSet, @@ -201,6 +204,14 @@ type SentinelOption struct { ClientName string } +// ClusterTopologyRefreshmentOption is the options for the cluster topology refreshment. +// Cluster Refresh happens only when the cluster topology is not up-to-date. +type ClusterTopologyRefreshmentOption struct { + // ScanInterval is the interval to scan the cluster topology. + // If the value is zero, refreshment will be disabled. + ScanInterval time.Duration +} + // Client is the redis client interface for both single redis instance and redis cluster. It should be created from the NewClient() type Client interface { CoreClient From 1181ffbe8041a47911e438c99bb2464e46a6d00e Mon Sep 17 00:00:00 2001 From: proost Date: Wed, 18 Sep 2024 20:03:38 +0900 Subject: [PATCH 02/15] fix: defend against closing a closed channel --- cluster.go | 43 ++++++++++++++++++++++--------------------- cluster_test.go | 14 +++++++------- 2 files changed, 29 insertions(+), 28 deletions(-) diff --git a/cluster.go b/cluster.go index 372a097c..2b6ac1e3 100644 --- a/cluster.go +++ b/cluster.go @@ -21,18 +21,18 @@ var ErrReplicaOnlyConflict = errors.New("ReplicaOnly conflicts with SendToReplic var ErrInvalidScanInterval = errors.New("scan interval must be greater than or equal to 0") type clusterClient struct { - pslots [16384]conn - rslots []conn - opt *ClientOption - rOpt *ClientOption - conns map[string]connrole - connFn connFn - sc call - mu sync.RWMutex - stop uint32 - cmd Builder - retry bool - refreshStop chan struct{} + pslots [16384]conn + rslots []conn + opt *ClientOption + rOpt *ClientOption + conns map[string]connrole + connFn connFn + sc call + mu sync.RWMutex + stop uint32 + cmd Builder + retry bool + stopCh chan struct{} } // NOTE: connrole and conn must be initialized at the same time @@ -43,12 +43,12 @@ type connrole struct { func newClusterClient(opt *ClientOption, connFn connFn) (*clusterClient, error) { client := &clusterClient{ - cmd: cmds.NewBuilder(cmds.InitSlot), - connFn: connFn, - opt: opt, - conns: make(map[string]connrole), - retry: !opt.DisableRetry, - refreshStop: make(chan struct{}), + cmd: cmds.NewBuilder(cmds.InitSlot), + connFn: connFn, + opt: opt, + conns: make(map[string]connrole), + retry: !opt.DisableRetry, + stopCh: make(chan struct{}), } if opt.ReplicaOnly && opt.SendToReplicas != nil { @@ -386,7 +386,7 @@ func (c *clusterClient) runClusterTopologyRefreshment() { defer ticker.Stop() for { select { - case <-c.refreshStop: + case <-c.stopCh: return case <-ticker.C: @@ -1088,9 +1088,10 @@ func (c *clusterClient) Nodes() map[string]Client { } func (c *clusterClient) Close() { - close(c.refreshStop) + if atomic.CompareAndSwapUint32(&c.stop, 0, 1) { + close(c.stopCh) + } - atomic.StoreUint32(&c.stop, 1) c.mu.RLock() for _, cc := range c.conns { go cc.conn.Close() diff --git a/cluster_test.go b/cluster_test.go index 8cc4d880..aaa543d9 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -1192,9 +1192,9 @@ func TestClusterClient(t *testing.T) { client.Close() <-called select { - case _, ok := <-client.refreshStop: + case _, ok := <-client.stopCh: if ok { - 
t.Fatalf("refreshStop should be closed") + t.Fatalf("stopCh should be closed") } } }) @@ -4546,7 +4546,7 @@ func TestClusterTopologyRefreshment(t *testing.T) { // call in refreshment scan if atomic.CompareAndSwapInt64(&callCount, 1, 2) { - close(client.refreshStop) // stop refreshment + close(client.stopCh) // stop refreshment go func() { time.Sleep(100 * time.Millisecond) // verify that refreshment is stopped close(refreshWaitCh) @@ -4592,7 +4592,7 @@ func TestClusterTopologyRefreshment(t *testing.T) { // call in refreshment scan if atomic.CompareAndSwapInt64(&callCount, 1, 2) { - close(client.refreshStop) // stop refreshment + close(client.stopCh) // stop refreshment return slotsResp } @@ -4639,7 +4639,7 @@ func TestClusterTopologyRefreshment(t *testing.T) { // call in refreshment scan if atomic.CompareAndSwapInt64(&callCount, 1, 2) { - close(client.refreshStop) // stop refreshment + close(client.stopCh) // stop refreshment return slotsMultiRespWithoutReplicas } @@ -4686,7 +4686,7 @@ func TestClusterTopologyRefreshment(t *testing.T) { // call in refreshment scan if atomic.CompareAndSwapInt64(&callCount, 1, 2) { - close(client.refreshStop) // stop refreshment + close(client.stopCh) // stop refreshment return slotsRespWithChangedRoll } @@ -4733,7 +4733,7 @@ func TestClusterTopologyRefreshment(t *testing.T) { // call in refreshment scan if atomic.CompareAndSwapInt64(&callCount, 1, 2) { - close(client.refreshStop) // stop refreshment + close(client.stopCh) // stop refreshment return newErrResult(errors.New("failed to get cluster topology")) } From 027671b8a08e824c58cc35a069c8cf980c1abc49 Mon Sep 17 00:00:00 2001 From: proost Date: Wed, 18 Sep 2024 20:16:09 +0900 Subject: [PATCH 03/15] refactor: make conditional refresh --- cluster.go | 62 +++++++++++++++++++++++++++++------------------------- 1 file changed, 33 insertions(+), 29 deletions(-) diff --git a/cluster.go b/cluster.go index 2b6ac1e3..7dbee024 100644 --- a/cluster.go +++ b/cluster.go @@ -389,43 +389,47 @@ func (c *clusterClient) runClusterTopologyRefreshment() { case <-c.stopCh: return case <-ticker.C: - result, err := c.getClusterTopology() - if err != nil { - c.lazyRefresh() - continue - } + c.conditionalRefresh() + } + } +} - groups := result.parse(c.opt.TLSConfig != nil) +func (c *clusterClient) conditionalRefresh() { + result, err := c.getClusterTopology() + if err != nil { + c.lazyRefresh() + return + } - conns := c.newConns(groups) + groups := result.parse(c.opt.TLSConfig != nil) - isChanged := false - c.mu.RLock() - // check if the new topology is different from the current one - for addr, cc := range conns { - old, ok := c.conns[addr] - if !ok || old.replica != cc.replica { - isChanged = true - break - } - } + conns := c.newConns(groups) - // check if the current topology is different from the new one - if !isChanged { - for addr := range c.conns { - if _, ok := conns[addr]; !ok { - isChanged = true - break - } - } - } - c.mu.RUnlock() + isChanged := false + c.mu.RLock() + // check if the new topology is different from the current one + for addr, cc := range conns { + old, ok := c.conns[addr] + if !ok || old.replica != cc.replica { + isChanged = true + break + } + } - if isChanged { - c.lazyRefresh() + // check if the current topology is different from the new one + if !isChanged { + for addr := range c.conns { + if _, ok := conns[addr]; !ok { + isChanged = true + break } } } + c.mu.RUnlock() + + if isChanged { + c.lazyRefresh() + } } func (c *clusterClient) _pick(slot uint16, toReplica bool) (p conn) { From 
931eddffa234b3c8f2d06dcb9a129cb5bedc13ee Mon Sep 17 00:00:00 2001 From: proost Date: Wed, 18 Sep 2024 21:55:00 +0900 Subject: [PATCH 04/15] test: fix broken test --- cluster_test.go | 35 ++++++++++++++++++++--------------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/cluster_test.go b/cluster_test.go index aaa543d9..590b6727 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -4527,9 +4527,10 @@ func TestClusterTopologyRefreshment(t *testing.T) { t.Run("nothing changed", func(t *testing.T) { var callCount int64 - var client *clusterClient + var cli *clusterClient + var err error refreshWaitCh := make(chan struct{}) - client, err := newClusterClient( + cli, err = newClusterClient( &ClientOption{ InitAddress: []string{"127.0.0.1:0"}, ClusterTopologyRefreshmentOption: ClusterTopologyRefreshmentOption{ @@ -4546,7 +4547,7 @@ func TestClusterTopologyRefreshment(t *testing.T) { // call in refreshment scan if atomic.CompareAndSwapInt64(&callCount, 1, 2) { - close(client.stopCh) // stop refreshment + close(cli.stopCh) // stop refreshment go func() { time.Sleep(100 * time.Millisecond) // verify that refreshment is stopped close(refreshWaitCh) @@ -4573,9 +4574,10 @@ func TestClusterTopologyRefreshment(t *testing.T) { t.Run("replicas are changed", func(t *testing.T) { var callCount int64 - var client *clusterClient + var cli *clusterClient + var err error refreshWaitCh := make(chan struct{}) - client, err := newClusterClient( + cli, err = newClusterClient( &ClientOption{ InitAddress: []string{"127.0.0.1:0"}, ClusterTopologyRefreshmentOption: ClusterTopologyRefreshmentOption{ @@ -4592,7 +4594,7 @@ func TestClusterTopologyRefreshment(t *testing.T) { // call in refreshment scan if atomic.CompareAndSwapInt64(&callCount, 1, 2) { - close(client.stopCh) // stop refreshment + close(cli.stopCh) // stop refreshment return slotsResp } @@ -4620,9 +4622,10 @@ func TestClusterTopologyRefreshment(t *testing.T) { t.Run("shards are changed", func(t *testing.T) { var callCount int64 - var client *clusterClient + var cli *clusterClient + var err error refreshWaitCh := make(chan struct{}) - client, err := newClusterClient( + cli, err = newClusterClient( &ClientOption{ InitAddress: []string{"127.0.0.1:0"}, ClusterTopologyRefreshmentOption: ClusterTopologyRefreshmentOption{ @@ -4639,7 +4642,7 @@ func TestClusterTopologyRefreshment(t *testing.T) { // call in refreshment scan if atomic.CompareAndSwapInt64(&callCount, 1, 2) { - close(client.stopCh) // stop refreshment + close(cli.stopCh) // stop refreshment return slotsMultiRespWithoutReplicas } @@ -4667,9 +4670,10 @@ func TestClusterTopologyRefreshment(t *testing.T) { t.Run("node roll are changed", func(t *testing.T) { var callCount int64 - var client *clusterClient + var cli *clusterClient + var err error refreshWaitCh := make(chan struct{}) - client, err := newClusterClient( + cli, err = newClusterClient( &ClientOption{ InitAddress: []string{"127.0.0.1:0"}, ClusterTopologyRefreshmentOption: ClusterTopologyRefreshmentOption{ @@ -4686,7 +4690,7 @@ func TestClusterTopologyRefreshment(t *testing.T) { // call in refreshment scan if atomic.CompareAndSwapInt64(&callCount, 1, 2) { - close(client.stopCh) // stop refreshment + close(cli.stopCh) // stop refreshment return slotsRespWithChangedRoll } @@ -4714,9 +4718,10 @@ func TestClusterTopologyRefreshment(t *testing.T) { t.Run("failed to get cluster topology", func(t *testing.T) { var callCount int64 - var client *clusterClient + var cli *clusterClient + var err error refreshWaitCh := make(chan struct{}) - 
client, err := newClusterClient( + cli, err = newClusterClient( &ClientOption{ InitAddress: []string{"127.0.0.1:0"}, ClusterTopologyRefreshmentOption: ClusterTopologyRefreshmentOption{ @@ -4733,7 +4738,7 @@ func TestClusterTopologyRefreshment(t *testing.T) { // call in refreshment scan if atomic.CompareAndSwapInt64(&callCount, 1, 2) { - close(client.stopCh) // stop refreshment + close(cli.stopCh) // stop refreshment return newErrResult(errors.New("failed to get cluster topology")) } From e743162a5c6be151ec9403fea5f910fdf1dacc9f Mon Sep 17 00:00:00 2001 From: proost Date: Wed, 18 Sep 2024 23:35:34 +0900 Subject: [PATCH 05/15] test: change flaky test --- cluster_test.go | 165 ++++++++++++++++++++++++++++-------------------- 1 file changed, 96 insertions(+), 69 deletions(-) diff --git a/cluster_test.go b/cluster_test.go index 590b6727..775b8855 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -4525,16 +4525,13 @@ func TestConnectToNonAvailableCluster(t *testing.T) { func TestClusterTopologyRefreshment(t *testing.T) { defer ShouldNotLeaked(SetupLeakDetection()) - t.Run("nothing changed", func(t *testing.T) { + t.Run("no refreshment", func(t *testing.T) { var callCount int64 - var cli *clusterClient - var err error - refreshWaitCh := make(chan struct{}) - cli, err = newClusterClient( + _, err := newClusterClient( &ClientOption{ InitAddress: []string{"127.0.0.1:0"}, ClusterTopologyRefreshmentOption: ClusterTopologyRefreshmentOption{ - ScanInterval: 100 * time.Millisecond, + ScanInterval: 0, }, }, func(dst string, opt *ClientOption) conn { @@ -4545,16 +4542,6 @@ func TestClusterTopologyRefreshment(t *testing.T) { return singleSlotResp } - // call in refreshment scan - if atomic.CompareAndSwapInt64(&callCount, 1, 2) { - close(cli.stopCh) // stop refreshment - go func() { - time.Sleep(100 * time.Millisecond) // verify that refreshment is stopped - close(refreshWaitCh) - }() - return singleSlotResp - } - t.Fatalf("unexpected call") return singleSlotResp }, @@ -4565,23 +4552,21 @@ func TestClusterTopologyRefreshment(t *testing.T) { t.Fatalf("unexpected err %v", err) } - select { - case <-refreshWaitCh: - case <-time.After(5 * time.Second): - t.Fatal("timeout waiting for refresh") + time.Sleep(2 * time.Second) // verify that no refreshment is called + + if atomic.LoadInt64(&callCount) != 1 { + t.Fatalf("unexpected call count %d", callCount) } }) - t.Run("replicas are changed", func(t *testing.T) { + t.Run("nothing changed", func(t *testing.T) { var callCount int64 - var cli *clusterClient - var err error refreshWaitCh := make(chan struct{}) - cli, err = newClusterClient( + cli, err := newClusterClient( &ClientOption{ InitAddress: []string{"127.0.0.1:0"}, ClusterTopologyRefreshmentOption: ClusterTopologyRefreshmentOption{ - ScanInterval: 100 * time.Millisecond, + ScanInterval: 2 * time.Second, }, }, func(dst string, opt *ClientOption) conn { @@ -4589,22 +4574,22 @@ func TestClusterTopologyRefreshment(t *testing.T) { DoFn: func(cmd Completed) RedisResult { // initial call if atomic.CompareAndSwapInt64(&callCount, 0, 1) { - return singleSlotResp2 + return singleSlotResp } // call in refreshment scan if atomic.CompareAndSwapInt64(&callCount, 1, 2) { - close(cli.stopCh) // stop refreshment - return slotsResp + return singleSlotResp } - // call in _refresh + // call in refreshment scan if atomic.CompareAndSwapInt64(&callCount, 2, 3) { close(refreshWaitCh) - return slotsResp + return singleSlotResp } - return slotsResp + atomic.AddInt64(&callCount, 1) + return singleSlotResp }, } }, @@ -4615,21 +4600,28 
@@ func TestClusterTopologyRefreshment(t *testing.T) { select { case <-refreshWaitCh: + cli.Close() + + conns := cli.conns + if len(conns) != 1 { + t.Fatalf("unexpected conns %v", conns) + } + if _, ok := conns["127.0.0.1:0"]; !ok { + t.Fatalf("unexpected conns %v", conns) + } case <-time.After(5 * time.Second): t.Fatal("timeout waiting for refresh") } }) - t.Run("shards are changed", func(t *testing.T) { + t.Run("replicas are changed", func(t *testing.T) { var callCount int64 - var cli *clusterClient - var err error refreshWaitCh := make(chan struct{}) - cli, err = newClusterClient( + cli, err := newClusterClient( &ClientOption{ InitAddress: []string{"127.0.0.1:0"}, ClusterTopologyRefreshmentOption: ClusterTopologyRefreshmentOption{ - ScanInterval: 100 * time.Millisecond, + ScanInterval: 2 * time.Second, }, }, func(dst string, opt *ClientOption) conn { @@ -4642,17 +4634,18 @@ func TestClusterTopologyRefreshment(t *testing.T) { // call in refreshment scan if atomic.CompareAndSwapInt64(&callCount, 1, 2) { - close(cli.stopCh) // stop refreshment - return slotsMultiRespWithoutReplicas + return slotsResp } - // call in _refresh - if atomic.CompareAndSwapInt64(&callCount, 2, 3) { + // call in _refresh or refreshment scan + if atomic.CompareAndSwapInt64(&callCount, 5, 6) { close(refreshWaitCh) - return slotsMultiRespWithoutReplicas + return slotsResp } - return slotsMultiRespWithoutReplicas + // call in _refresh or refreshment scan + atomic.AddInt64(&callCount, 1) + return slotsResp }, } }, @@ -4663,21 +4656,31 @@ func TestClusterTopologyRefreshment(t *testing.T) { select { case <-refreshWaitCh: - case <-time.After(5 * time.Second): + cli.Close() + + conns := cli.conns + if len(conns) != 2 { + t.Fatalf("unexpected conns %v", conns) + } + if _, ok := conns["127.0.0.1:0"]; !ok { + t.Fatalf("unexpected conns %v", conns) + } + if _, ok := conns["127.0.1.1:1"]; !ok { + t.Fatalf("unexpected conns %v", conns) + } + case <-time.After(30 * time.Second): t.Fatal("timeout waiting for refresh") } }) - t.Run("node roll are changed", func(t *testing.T) { + t.Run("shards are changed", func(t *testing.T) { var callCount int64 - var cli *clusterClient - var err error refreshWaitCh := make(chan struct{}) - cli, err = newClusterClient( + cli, err := newClusterClient( &ClientOption{ InitAddress: []string{"127.0.0.1:0"}, ClusterTopologyRefreshmentOption: ClusterTopologyRefreshmentOption{ - ScanInterval: 100 * time.Millisecond, + ScanInterval: 2 * time.Second, }, }, func(dst string, opt *ClientOption) conn { @@ -4685,22 +4688,23 @@ func TestClusterTopologyRefreshment(t *testing.T) { DoFn: func(cmd Completed) RedisResult { // initial call if atomic.CompareAndSwapInt64(&callCount, 0, 1) { - return slotsResp + return singleSlotResp } // call in refreshment scan if atomic.CompareAndSwapInt64(&callCount, 1, 2) { - close(cli.stopCh) // stop refreshment - return slotsRespWithChangedRoll + return slotsMultiRespWithoutReplicas } - // call in _refresh - if atomic.CompareAndSwapInt64(&callCount, 2, 3) { + // call in _refresh or refreshment scan + if atomic.CompareAndSwapInt64(&callCount, 5, 6) { close(refreshWaitCh) - return slotsRespWithChangedRoll + return slotsMultiRespWithoutReplicas } - return slotsRespWithChangedRoll + // call in _refresh or refreshment scan + atomic.AddInt64(&callCount, 1) + return slotsMultiRespWithoutReplicas }, } }, @@ -4711,21 +4715,31 @@ func TestClusterTopologyRefreshment(t *testing.T) { select { case <-refreshWaitCh: - case <-time.After(190 * time.Second): + cli.Close() + + conns := cli.conns + if 
len(conns) != 2 { + t.Fatalf("unexpected conns %v", conns) + } + if _, ok := conns["127.0.0.1:0"]; !ok { + t.Fatalf("unexpected conns %v", conns) + } + if _, ok := conns["127.0.1.1:0"]; !ok { + t.Fatalf("unexpected conns %v", conns) + } + case <-time.After(30 * time.Second): t.Fatal("timeout waiting for refresh") } }) - t.Run("failed to get cluster topology", func(t *testing.T) { + t.Run("node roll are changed", func(t *testing.T) { var callCount int64 - var cli *clusterClient - var err error refreshWaitCh := make(chan struct{}) - cli, err = newClusterClient( + cli, err := newClusterClient( &ClientOption{ InitAddress: []string{"127.0.0.1:0"}, ClusterTopologyRefreshmentOption: ClusterTopologyRefreshmentOption{ - ScanInterval: 100 * time.Millisecond, + ScanInterval: 2 * time.Second, }, }, func(dst string, opt *ClientOption) conn { @@ -4733,22 +4747,23 @@ func TestClusterTopologyRefreshment(t *testing.T) { DoFn: func(cmd Completed) RedisResult { // initial call if atomic.CompareAndSwapInt64(&callCount, 0, 1) { - return singleSlotResp + return slotsResp } // call in refreshment scan if atomic.CompareAndSwapInt64(&callCount, 1, 2) { - close(cli.stopCh) // stop refreshment - return newErrResult(errors.New("failed to get cluster topology")) + return slotsRespWithChangedRoll } - // call in _refresh - if atomic.CompareAndSwapInt64(&callCount, 2, 3) { + // call in _refresh or refreshment scan + if atomic.CompareAndSwapInt64(&callCount, 5, 6) { close(refreshWaitCh) - return slotsMultiRespWithoutReplicas + return slotsRespWithChangedRoll } - return slotsMultiRespWithoutReplicas + // call in _refresh or refreshment scan + atomic.AddInt64(&callCount, 1) + return slotsRespWithChangedRoll }, } }, @@ -4759,7 +4774,19 @@ func TestClusterTopologyRefreshment(t *testing.T) { select { case <-refreshWaitCh: - case <-time.After(5 * time.Second): + cli.Close() + + conns := cli.conns + if len(conns) != 2 { + t.Fatalf("unexpected conns %v", conns) + } + if conn, ok := conns["127.0.0.1:0"]; !ok || !conn.replica { + t.Fatalf("unexpected conns %v", conns) + } + if conn, ok := conns["127.0.1.1:1"]; !ok || conn.replica { + t.Fatalf("unexpected conns %v", conns) + } + case <-time.After(30 * time.Second): t.Fatal("timeout waiting for refresh") } }) From b075e699c15affca0f3595532edcf9120cf7b0ce Mon Sep 17 00:00:00 2001 From: proost Date: Fri, 20 Sep 2024 01:59:22 +0900 Subject: [PATCH 06/15] perf: use less call --- cluster.go | 156 +++++++++++++++++++++++++++++------------------- cluster_test.go | 43 +++++++------ 2 files changed, 120 insertions(+), 79 deletions(-) diff --git a/cluster.go b/cluster.go index 7dbee024..e3e95ff4 100644 --- a/cluster.go +++ b/cluster.go @@ -159,9 +159,64 @@ func (c *clusterClient) _refresh() (err error) { } groups := result.parse(c.opt.TLSConfig != nil) + conns := make(map[string]connrole, len(groups)) + for master, g := range groups { + conns[master] = connrole{conn: c.connFn(master, c.opt), replica: false} + for _, addr := range g.nodes[1:] { + if c.rOpt != nil { + conns[addr] = connrole{conn: c.connFn(addr, c.rOpt), replica: true} + } else { + conns[addr] = connrole{conn: c.connFn(addr, c.opt), replica: true} + } + } + } + // make sure InitAddress always be present + for _, addr := range c.opt.InitAddress { + if _, ok := conns[addr]; !ok { + conns[addr] = connrole{ + conn: c.connFn(addr, c.opt), + } + } + } - conns := c.newConns(groups) + c.updateClusterTopologyCache(groups, conns) + return nil +} +func (c *clusterClient) getClusterTopology() (result clusterslots, err error) { + c.mu.RLock() 
+ results := make(chan clusterslots, len(c.conns)) + pending := make([]conn, 0, len(c.conns)) + for _, cc := range c.conns { + pending = append(pending, cc.conn) + } + c.mu.RUnlock() + + for i := 0; i < cap(results); i++ { + if i&3 == 0 { // batch CLUSTER SLOTS/CLUSTER SHARDS for every 4 connections + for j := i; j < i+4 && j < len(pending); j++ { + go func(c conn, timeout time.Duration) { + results <- getClusterSlots(c, timeout) + }(pending[j], c.opt.ConnWriteTimeout) + } + } + result = <-results + err = result.reply.Error() + if len(result.reply.val.values) != 0 { + break + } + } + if err != nil { + return + } + + pending = nil + return result, nil +} + +func (c *clusterClient) updateClusterTopologyCache( + groups map[string]group, conns map[string]connrole, +) { var removes []conn c.mu.RLock() @@ -231,62 +286,6 @@ func (c *clusterClient) _refresh() (err error) { } }(removes) } - - return nil -} - -func (c *clusterClient) getClusterTopology() (result clusterslots, err error) { - c.mu.RLock() - results := make(chan clusterslots, len(c.conns)) - pending := make([]conn, 0, len(c.conns)) - for _, cc := range c.conns { - pending = append(pending, cc.conn) - } - c.mu.RUnlock() - - for i := 0; i < cap(results); i++ { - if i&3 == 0 { // batch CLUSTER SLOTS/CLUSTER SHARDS for every 4 connections - for j := i; j < i+4 && j < len(pending); j++ { - go func(c conn, timeout time.Duration) { - results <- getClusterSlots(c, timeout) - }(pending[j], c.opt.ConnWriteTimeout) - } - } - result = <-results - err = result.reply.Error() - if len(result.reply.val.values) != 0 { - break - } - } - if err != nil { - return - } - - pending = nil - return result, nil -} - -func (c *clusterClient) newConns(groups map[string]group) map[string]connrole { - conns := make(map[string]connrole, len(groups)) - for master, g := range groups { - conns[master] = connrole{conn: c.connFn(master, c.opt), replica: false} - for _, addr := range g.nodes[1:] { - if c.rOpt != nil { - conns[addr] = connrole{conn: c.connFn(addr, c.rOpt), replica: true} - } else { - conns[addr] = connrole{conn: c.connFn(addr, c.opt), replica: true} - } - } - } - // make sure InitAddress always be present - for _, addr := range c.opt.InitAddress { - if _, ok := conns[addr]; !ok { - conns[addr] = connrole{ - conn: c.connFn(addr, c.opt), - } - } - } - return conns } func (c *clusterClient) single() (conn conn) { @@ -403,7 +402,25 @@ func (c *clusterClient) conditionalRefresh() { groups := result.parse(c.opt.TLSConfig != nil) - conns := c.newConns(groups) + // we need to check if the new topology is different from the current one. + // so we don't need to early re-create the connections. 
+ conns := make(map[string]connrole, len(groups)) + for master, g := range groups { + conns[master] = connrole{replica: false} + for _, addr := range g.nodes[1:] { + if c.rOpt != nil { + conns[addr] = connrole{replica: true} + } else { + conns[addr] = connrole{replica: true} + } + } + } + // make sure InitAddress always be present + for _, addr := range c.opt.InitAddress { + if _, ok := conns[addr]; !ok { + conns[addr] = connrole{} + } + } isChanged := false c.mu.RLock() @@ -427,9 +444,28 @@ func (c *clusterClient) conditionalRefresh() { } c.mu.RUnlock() - if isChanged { - c.lazyRefresh() + if !isChanged { + return + } + + for addr, cc := range conns { + if cc.replica { + if c.rOpt != nil { + cc.conn = c.connFn(addr, c.rOpt) + } else { + cc.conn = c.connFn(addr, c.opt) + } + } else { + cc.conn = c.connFn(addr, c.opt) + } + + conns[addr] = connrole{ + conn: cc.conn, + replica: cc.replica, + } } + + c.updateClusterTopologyCache(groups, conns) } func (c *clusterClient) _pick(slot uint16, toReplica bool) (p conn) { diff --git a/cluster_test.go b/cluster_test.go index 775b8855..d38b4333 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -4552,7 +4552,7 @@ func TestClusterTopologyRefreshment(t *testing.T) { t.Fatalf("unexpected err %v", err) } - time.Sleep(2 * time.Second) // verify that no refreshment is called + time.Sleep(3 * time.Second) // verify that no refreshment is called if atomic.LoadInt64(&callCount) != 1 { t.Fatalf("unexpected call count %d", callCount) @@ -4566,7 +4566,7 @@ func TestClusterTopologyRefreshment(t *testing.T) { &ClientOption{ InitAddress: []string{"127.0.0.1:0"}, ClusterTopologyRefreshmentOption: ClusterTopologyRefreshmentOption{ - ScanInterval: 2 * time.Second, + ScanInterval: 500 * time.Millisecond, }, }, func(dst string, opt *ClientOption) conn { @@ -4582,8 +4582,8 @@ func TestClusterTopologyRefreshment(t *testing.T) { return singleSlotResp } - // call in refreshment scan - if atomic.CompareAndSwapInt64(&callCount, 2, 3) { + // call in another refreshment scan + if atomic.CompareAndSwapInt64(&callCount, 5, 6) { close(refreshWaitCh) return singleSlotResp } @@ -4602,14 +4602,16 @@ func TestClusterTopologyRefreshment(t *testing.T) { case <-refreshWaitCh: cli.Close() + cli.mu.Lock() conns := cli.conns + cli.mu.Unlock() if len(conns) != 1 { t.Fatalf("unexpected conns %v", conns) } if _, ok := conns["127.0.0.1:0"]; !ok { t.Fatalf("unexpected conns %v", conns) } - case <-time.After(5 * time.Second): + case <-time.After(10 * time.Second): t.Fatal("timeout waiting for refresh") } }) @@ -4621,7 +4623,7 @@ func TestClusterTopologyRefreshment(t *testing.T) { &ClientOption{ InitAddress: []string{"127.0.0.1:0"}, ClusterTopologyRefreshmentOption: ClusterTopologyRefreshmentOption{ - ScanInterval: 2 * time.Second, + ScanInterval: 500 * time.Millisecond, }, }, func(dst string, opt *ClientOption) conn { @@ -4637,13 +4639,12 @@ func TestClusterTopologyRefreshment(t *testing.T) { return slotsResp } - // call in _refresh or refreshment scan + // call in another refreshment scan to verify that conns are changed. 
if atomic.CompareAndSwapInt64(&callCount, 5, 6) { close(refreshWaitCh) return slotsResp } - // call in _refresh or refreshment scan atomic.AddInt64(&callCount, 1) return slotsResp }, @@ -4658,7 +4659,9 @@ func TestClusterTopologyRefreshment(t *testing.T) { case <-refreshWaitCh: cli.Close() + cli.mu.Lock() conns := cli.conns + cli.mu.Unlock() if len(conns) != 2 { t.Fatalf("unexpected conns %v", conns) } @@ -4668,7 +4671,7 @@ func TestClusterTopologyRefreshment(t *testing.T) { if _, ok := conns["127.0.1.1:1"]; !ok { t.Fatalf("unexpected conns %v", conns) } - case <-time.After(30 * time.Second): + case <-time.After(10 * time.Second): t.Fatal("timeout waiting for refresh") } }) @@ -4680,7 +4683,7 @@ func TestClusterTopologyRefreshment(t *testing.T) { &ClientOption{ InitAddress: []string{"127.0.0.1:0"}, ClusterTopologyRefreshmentOption: ClusterTopologyRefreshmentOption{ - ScanInterval: 2 * time.Second, + ScanInterval: 500 * time.Millisecond, }, }, func(dst string, opt *ClientOption) conn { @@ -4696,13 +4699,12 @@ func TestClusterTopologyRefreshment(t *testing.T) { return slotsMultiRespWithoutReplicas } - // call in _refresh or refreshment scan + // call in another refreshment scan to verify that conns are changed. if atomic.CompareAndSwapInt64(&callCount, 5, 6) { close(refreshWaitCh) return slotsMultiRespWithoutReplicas } - // call in _refresh or refreshment scan atomic.AddInt64(&callCount, 1) return slotsMultiRespWithoutReplicas }, @@ -4717,7 +4719,9 @@ func TestClusterTopologyRefreshment(t *testing.T) { case <-refreshWaitCh: cli.Close() + cli.mu.Lock() conns := cli.conns + cli.mu.Unlock() if len(conns) != 2 { t.Fatalf("unexpected conns %v", conns) } @@ -4727,7 +4731,7 @@ func TestClusterTopologyRefreshment(t *testing.T) { if _, ok := conns["127.0.1.1:0"]; !ok { t.Fatalf("unexpected conns %v", conns) } - case <-time.After(30 * time.Second): + case <-time.After(10 * time.Second): t.Fatal("timeout waiting for refresh") } }) @@ -4739,7 +4743,7 @@ func TestClusterTopologyRefreshment(t *testing.T) { &ClientOption{ InitAddress: []string{"127.0.0.1:0"}, ClusterTopologyRefreshmentOption: ClusterTopologyRefreshmentOption{ - ScanInterval: 2 * time.Second, + ScanInterval: 500 * time.Millisecond, }, }, func(dst string, opt *ClientOption) conn { @@ -4755,13 +4759,12 @@ func TestClusterTopologyRefreshment(t *testing.T) { return slotsRespWithChangedRoll } - // call in _refresh or refreshment scan + // call in refreshment scan to verify that conns are changed. 
if atomic.CompareAndSwapInt64(&callCount, 5, 6) { close(refreshWaitCh) return slotsRespWithChangedRoll } - // call in _refresh or refreshment scan atomic.AddInt64(&callCount, 1) return slotsRespWithChangedRoll }, @@ -4776,17 +4779,19 @@ func TestClusterTopologyRefreshment(t *testing.T) { case <-refreshWaitCh: cli.Close() + cli.mu.Lock() conns := cli.conns + cli.mu.Unlock() if len(conns) != 2 { t.Fatalf("unexpected conns %v", conns) } - if conn, ok := conns["127.0.0.1:0"]; !ok || !conn.replica { + if cc, ok := conns["127.0.0.1:0"]; !ok || !cc.replica { t.Fatalf("unexpected conns %v", conns) } - if conn, ok := conns["127.0.1.1:1"]; !ok || conn.replica { + if cc, ok := conns["127.0.1.1:1"]; !ok || cc.replica { t.Fatalf("unexpected conns %v", conns) } - case <-time.After(30 * time.Second): + case <-time.After(10 * time.Second): t.Fatal("timeout waiting for refresh") } }) From 826de2c76a7906b25727ee529ba91d28622d6457 Mon Sep 17 00:00:00 2001 From: proost Date: Fri, 20 Sep 2024 23:25:40 +0900 Subject: [PATCH 07/15] refactor: rollback --- cluster.go | 140 +++++++++--------------------------------------- cluster_test.go | 24 ++++----- 2 files changed, 38 insertions(+), 126 deletions(-) diff --git a/cluster.go b/cluster.go index e3e95ff4..8257a56c 100644 --- a/cluster.go +++ b/cluster.go @@ -153,37 +153,6 @@ func getClusterSlots(c conn, timeout time.Duration) clusterslots { } func (c *clusterClient) _refresh() (err error) { - result, err := c.getClusterTopology() - if err != nil { - return err - } - - groups := result.parse(c.opt.TLSConfig != nil) - conns := make(map[string]connrole, len(groups)) - for master, g := range groups { - conns[master] = connrole{conn: c.connFn(master, c.opt), replica: false} - for _, addr := range g.nodes[1:] { - if c.rOpt != nil { - conns[addr] = connrole{conn: c.connFn(addr, c.rOpt), replica: true} - } else { - conns[addr] = connrole{conn: c.connFn(addr, c.opt), replica: true} - } - } - } - // make sure InitAddress always be present - for _, addr := range c.opt.InitAddress { - if _, ok := conns[addr]; !ok { - conns[addr] = connrole{ - conn: c.connFn(addr, c.opt), - } - } - } - - c.updateClusterTopologyCache(groups, conns) - return nil -} - -func (c *clusterClient) getClusterTopology() (result clusterslots, err error) { c.mu.RLock() results := make(chan clusterslots, len(c.conns)) pending := make([]conn, 0, len(c.conns)) @@ -192,6 +161,7 @@ func (c *clusterClient) getClusterTopology() (result clusterslots, err error) { } c.mu.RUnlock() + var result clusterslots for i := 0; i < cap(results); i++ { if i&3 == 0 { // batch CLUSTER SLOTS/CLUSTER SHARDS for every 4 connections for j := i; j < i+4 && j < len(pending); j++ { @@ -207,16 +177,31 @@ func (c *clusterClient) getClusterTopology() (result clusterslots, err error) { } } if err != nil { - return + return err } - pending = nil - return result, nil -} -func (c *clusterClient) updateClusterTopologyCache( - groups map[string]group, conns map[string]connrole, -) { + groups := result.parse(c.opt.TLSConfig != nil) + conns := make(map[string]connrole, len(groups)) + for master, g := range groups { + conns[master] = connrole{conn: c.connFn(master, c.opt), replica: false} + for _, addr := range g.nodes[1:] { + if c.rOpt != nil { + conns[addr] = connrole{conn: c.connFn(addr, c.rOpt), replica: true} + } else { + conns[addr] = connrole{conn: c.connFn(addr, c.opt), replica: true} + } + } + } + // make sure InitAddress always be present + for _, addr := range c.opt.InitAddress { + if _, ok := conns[addr]; !ok { + conns[addr] = 
connrole{ + conn: c.connFn(addr, c.opt), + } + } + } + var removes []conn c.mu.RLock() @@ -286,6 +271,8 @@ func (c *clusterClient) updateClusterTopologyCache( } }(removes) } + + return nil } func (c *clusterClient) single() (conn conn) { @@ -388,84 +375,9 @@ func (c *clusterClient) runClusterTopologyRefreshment() { case <-c.stopCh: return case <-ticker.C: - c.conditionalRefresh() - } - } -} - -func (c *clusterClient) conditionalRefresh() { - result, err := c.getClusterTopology() - if err != nil { - c.lazyRefresh() - return - } - - groups := result.parse(c.opt.TLSConfig != nil) - - // we need to check if the new topology is different from the current one. - // so we don't need to early re-create the connections. - conns := make(map[string]connrole, len(groups)) - for master, g := range groups { - conns[master] = connrole{replica: false} - for _, addr := range g.nodes[1:] { - if c.rOpt != nil { - conns[addr] = connrole{replica: true} - } else { - conns[addr] = connrole{replica: true} - } - } - } - // make sure InitAddress always be present - for _, addr := range c.opt.InitAddress { - if _, ok := conns[addr]; !ok { - conns[addr] = connrole{} - } - } - - isChanged := false - c.mu.RLock() - // check if the new topology is different from the current one - for addr, cc := range conns { - old, ok := c.conns[addr] - if !ok || old.replica != cc.replica { - isChanged = true - break - } - } - - // check if the current topology is different from the new one - if !isChanged { - for addr := range c.conns { - if _, ok := conns[addr]; !ok { - isChanged = true - break - } - } - } - c.mu.RUnlock() - - if !isChanged { - return - } - - for addr, cc := range conns { - if cc.replica { - if c.rOpt != nil { - cc.conn = c.connFn(addr, c.rOpt) - } else { - cc.conn = c.connFn(addr, c.opt) - } - } else { - cc.conn = c.connFn(addr, c.opt) - } - - conns[addr] = connrole{ - conn: cc.conn, - replica: cc.replica, + c.lazyRefresh() } } - - c.updateClusterTopologyCache(groups, conns) } func (c *clusterClient) _pick(slot uint16, toReplica bool) (p conn) { diff --git a/cluster_test.go b/cluster_test.go index d38b4333..9a114129 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -4566,7 +4566,7 @@ func TestClusterTopologyRefreshment(t *testing.T) { &ClientOption{ InitAddress: []string{"127.0.0.1:0"}, ClusterTopologyRefreshmentOption: ClusterTopologyRefreshmentOption{ - ScanInterval: 500 * time.Millisecond, + ScanInterval: 2 * time.Second, }, }, func(dst string, opt *ClientOption) conn { @@ -4583,7 +4583,7 @@ func TestClusterTopologyRefreshment(t *testing.T) { } // call in another refreshment scan - if atomic.CompareAndSwapInt64(&callCount, 5, 6) { + if atomic.CompareAndSwapInt64(&callCount, 4, 5) { close(refreshWaitCh) return singleSlotResp } @@ -4611,7 +4611,7 @@ func TestClusterTopologyRefreshment(t *testing.T) { if _, ok := conns["127.0.0.1:0"]; !ok { t.Fatalf("unexpected conns %v", conns) } - case <-time.After(10 * time.Second): + case <-time.After(30 * time.Second): t.Fatal("timeout waiting for refresh") } }) @@ -4623,7 +4623,7 @@ func TestClusterTopologyRefreshment(t *testing.T) { &ClientOption{ InitAddress: []string{"127.0.0.1:0"}, ClusterTopologyRefreshmentOption: ClusterTopologyRefreshmentOption{ - ScanInterval: 500 * time.Millisecond, + ScanInterval: 2 * time.Second, }, }, func(dst string, opt *ClientOption) conn { @@ -4640,7 +4640,7 @@ func TestClusterTopologyRefreshment(t *testing.T) { } // call in another refreshment scan to verify that conns are changed. 
- if atomic.CompareAndSwapInt64(&callCount, 5, 6) { + if atomic.CompareAndSwapInt64(&callCount, 4, 5) { close(refreshWaitCh) return slotsResp } @@ -4671,7 +4671,7 @@ func TestClusterTopologyRefreshment(t *testing.T) { if _, ok := conns["127.0.1.1:1"]; !ok { t.Fatalf("unexpected conns %v", conns) } - case <-time.After(10 * time.Second): + case <-time.After(30 * time.Second): t.Fatal("timeout waiting for refresh") } }) @@ -4683,7 +4683,7 @@ func TestClusterTopologyRefreshment(t *testing.T) { &ClientOption{ InitAddress: []string{"127.0.0.1:0"}, ClusterTopologyRefreshmentOption: ClusterTopologyRefreshmentOption{ - ScanInterval: 500 * time.Millisecond, + ScanInterval: 2 * time.Second, }, }, func(dst string, opt *ClientOption) conn { @@ -4700,7 +4700,7 @@ func TestClusterTopologyRefreshment(t *testing.T) { } // call in another refreshment scan to verify that conns are changed. - if atomic.CompareAndSwapInt64(&callCount, 5, 6) { + if atomic.CompareAndSwapInt64(&callCount, 4, 5) { close(refreshWaitCh) return slotsMultiRespWithoutReplicas } @@ -4731,7 +4731,7 @@ func TestClusterTopologyRefreshment(t *testing.T) { if _, ok := conns["127.0.1.1:0"]; !ok { t.Fatalf("unexpected conns %v", conns) } - case <-time.After(10 * time.Second): + case <-time.After(30 * time.Second): t.Fatal("timeout waiting for refresh") } }) @@ -4743,7 +4743,7 @@ func TestClusterTopologyRefreshment(t *testing.T) { &ClientOption{ InitAddress: []string{"127.0.0.1:0"}, ClusterTopologyRefreshmentOption: ClusterTopologyRefreshmentOption{ - ScanInterval: 500 * time.Millisecond, + ScanInterval: 2 * time.Second, }, }, func(dst string, opt *ClientOption) conn { @@ -4760,7 +4760,7 @@ func TestClusterTopologyRefreshment(t *testing.T) { } // call in refreshment scan to verify that conns are changed. - if atomic.CompareAndSwapInt64(&callCount, 5, 6) { + if atomic.CompareAndSwapInt64(&callCount, 4, 5) { close(refreshWaitCh) return slotsRespWithChangedRoll } @@ -4791,7 +4791,7 @@ func TestClusterTopologyRefreshment(t *testing.T) { if cc, ok := conns["127.0.1.1:1"]; !ok || cc.replica { t.Fatalf("unexpected conns %v", conns) } - case <-time.After(10 * time.Second): + case <-time.After(30 * time.Second): t.Fatal("timeout waiting for refresh") } }) From 9fd1da9a6bff4f2cf0e05e77ca66135c9aa50250 Mon Sep 17 00:00:00 2001 From: proost Date: Sat, 21 Sep 2024 12:34:37 +0900 Subject: [PATCH 08/15] perf: set detection logic --- cluster.go | 53 ++++++++++++++++++++++++++++++++++++++++++++----- cluster_test.go | 8 ++++---- 2 files changed, 52 insertions(+), 9 deletions(-) diff --git a/cluster.go b/cluster.go index 8257a56c..5c88f9fb 100644 --- a/cluster.go +++ b/cluster.go @@ -182,25 +182,68 @@ func (c *clusterClient) _refresh() (err error) { pending = nil groups := result.parse(c.opt.TLSConfig != nil) + + // we need to check whether the new topology is different from the current one. + // so we don't need to early re-create the connections. 
conns := make(map[string]connrole, len(groups)) for master, g := range groups { - conns[master] = connrole{conn: c.connFn(master, c.opt), replica: false} + conns[master] = connrole{replica: false} for _, addr := range g.nodes[1:] { if c.rOpt != nil { - conns[addr] = connrole{conn: c.connFn(addr, c.rOpt), replica: true} + conns[addr] = connrole{replica: true} } else { - conns[addr] = connrole{conn: c.connFn(addr, c.opt), replica: true} + conns[addr] = connrole{replica: true} } } } // make sure InitAddress always be present for _, addr := range c.opt.InitAddress { if _, ok := conns[addr]; !ok { - conns[addr] = connrole{ - conn: c.connFn(addr, c.opt), + conns[addr] = connrole{} + } + } + + isChanged := false + c.mu.RLock() + // check if the new topology is different from the current one + for addr, cc := range conns { + old, ok := c.conns[addr] + if !ok || old.replica != cc.replica { + isChanged = true + break + } + } + // check if the current topology is different from the new one + if !isChanged { + for addr := range c.conns { + if _, ok := conns[addr]; !ok { + isChanged = true + break } } } + c.mu.RUnlock() + + if !isChanged { + return nil + } + + for addr, cc := range conns { + if cc.replica { + if c.rOpt != nil { + cc.conn = c.connFn(addr, c.rOpt) + } else { + cc.conn = c.connFn(addr, c.opt) + } + } else { + cc.conn = c.connFn(addr, c.opt) + } + + conns[addr] = connrole{ + conn: cc.conn, + replica: cc.replica, + } + } var removes []conn diff --git a/cluster_test.go b/cluster_test.go index 9a114129..fbc57bf9 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -4611,7 +4611,7 @@ func TestClusterTopologyRefreshment(t *testing.T) { if _, ok := conns["127.0.0.1:0"]; !ok { t.Fatalf("unexpected conns %v", conns) } - case <-time.After(30 * time.Second): + case <-time.After(60 * time.Second): t.Fatal("timeout waiting for refresh") } }) @@ -4671,7 +4671,7 @@ func TestClusterTopologyRefreshment(t *testing.T) { if _, ok := conns["127.0.1.1:1"]; !ok { t.Fatalf("unexpected conns %v", conns) } - case <-time.After(30 * time.Second): + case <-time.After(60 * time.Second): t.Fatal("timeout waiting for refresh") } }) @@ -4731,7 +4731,7 @@ func TestClusterTopologyRefreshment(t *testing.T) { if _, ok := conns["127.0.1.1:0"]; !ok { t.Fatalf("unexpected conns %v", conns) } - case <-time.After(30 * time.Second): + case <-time.After(60 * time.Second): t.Fatal("timeout waiting for refresh") } }) @@ -4791,7 +4791,7 @@ func TestClusterTopologyRefreshment(t *testing.T) { if cc, ok := conns["127.0.1.1:1"]; !ok || cc.replica { t.Fatalf("unexpected conns %v", conns) } - case <-time.After(30 * time.Second): + case <-time.After(60 * time.Second): t.Fatal("timeout waiting for refresh") } }) From 09a9403301ee2aecab963f684a7c499f93a79039 Mon Sep 17 00:00:00 2001 From: proost Date: Sat, 21 Sep 2024 12:36:55 +0900 Subject: [PATCH 09/15] refactor: rename cluster option --- cluster.go | 6 +++--- cluster_test.go | 12 ++++++------ rueidis.go | 10 +++++----- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/cluster.go b/cluster.go index 5c88f9fb..8424094d 100644 --- a/cluster.go +++ b/cluster.go @@ -77,9 +77,9 @@ func newClusterClient(opt *ClientOption, connFn connFn) (*clusterClient, error) return client, err } - if opt.ClusterTopologyRefreshmentOption.ScanInterval > 0 { + if opt.ClusterOption.ScanInterval > 0 { go client.runClusterTopologyRefreshment() - } else if opt.ClusterTopologyRefreshmentOption.ScanInterval < 0 { + } else if opt.ClusterOption.ScanInterval < 0 { return nil, ErrInvalidScanInterval } @@ -411,7 
+411,7 @@ func parseShards(shards RedisMessage, defaultAddr string, tls bool) map[string]g } func (c *clusterClient) runClusterTopologyRefreshment() { - ticker := time.NewTicker(c.opt.ClusterTopologyRefreshmentOption.ScanInterval) + ticker := time.NewTicker(c.opt.ClusterOption.ScanInterval) defer ticker.Stop() for { select { diff --git a/cluster_test.go b/cluster_test.go index fbc57bf9..f239b1b6 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -919,7 +919,7 @@ func TestClusterClientInit(t *testing.T) { _, err := newClusterClient( &ClientOption{ InitAddress: []string{"127.0.0.1:0"}, - ClusterTopologyRefreshmentOption: ClusterTopologyRefreshmentOption{ + ClusterOption: ClusterOption{ ScanInterval: -1 * time.Millisecond, }, }, @@ -4530,7 +4530,7 @@ func TestClusterTopologyRefreshment(t *testing.T) { _, err := newClusterClient( &ClientOption{ InitAddress: []string{"127.0.0.1:0"}, - ClusterTopologyRefreshmentOption: ClusterTopologyRefreshmentOption{ + ClusterOption: ClusterOption{ ScanInterval: 0, }, }, @@ -4565,7 +4565,7 @@ func TestClusterTopologyRefreshment(t *testing.T) { cli, err := newClusterClient( &ClientOption{ InitAddress: []string{"127.0.0.1:0"}, - ClusterTopologyRefreshmentOption: ClusterTopologyRefreshmentOption{ + ClusterOption: ClusterOption{ ScanInterval: 2 * time.Second, }, }, @@ -4622,7 +4622,7 @@ func TestClusterTopologyRefreshment(t *testing.T) { cli, err := newClusterClient( &ClientOption{ InitAddress: []string{"127.0.0.1:0"}, - ClusterTopologyRefreshmentOption: ClusterTopologyRefreshmentOption{ + ClusterOption: ClusterOption{ ScanInterval: 2 * time.Second, }, }, @@ -4682,7 +4682,7 @@ func TestClusterTopologyRefreshment(t *testing.T) { cli, err := newClusterClient( &ClientOption{ InitAddress: []string{"127.0.0.1:0"}, - ClusterTopologyRefreshmentOption: ClusterTopologyRefreshmentOption{ + ClusterOption: ClusterOption{ ScanInterval: 2 * time.Second, }, }, @@ -4742,7 +4742,7 @@ func TestClusterTopologyRefreshment(t *testing.T) { cli, err := newClusterClient( &ClientOption{ InitAddress: []string{"127.0.0.1:0"}, - ClusterTopologyRefreshmentOption: ClusterTopologyRefreshmentOption{ + ClusterOption: ClusterOption{ ScanInterval: 2 * time.Second, }, }, diff --git a/rueidis.go b/rueidis.go index 41479736..7a42ce22 100644 --- a/rueidis.go +++ b/rueidis.go @@ -184,8 +184,8 @@ type ClientOption struct { // even if we're above the configured client eviction threshold. ClientNoEvict bool - // ClusterTopologyRefreshmentOption is the options for the cluster topology refreshment. - ClusterTopologyRefreshmentOption ClusterTopologyRefreshmentOption + // ClusterOption is the options for the redis cluster client. + ClusterOption ClusterOption } // SentinelOption contains MasterSet, @@ -204,11 +204,11 @@ type SentinelOption struct { ClientName string } -// ClusterTopologyRefreshmentOption is the options for the cluster topology refreshment. -// Cluster Refresh happens only when the cluster topology is not up-to-date. -type ClusterTopologyRefreshmentOption struct { +// ClusterOption is the options for the redis cluster client. +type ClusterOption struct { // ScanInterval is the interval to scan the cluster topology. // If the value is zero, refreshment will be disabled. + // Cluster Refresh happens only when the cluster topology is not up-to-date. 
ScanInterval time.Duration } From fa0b58a22e8af7dc63d426d4bd0423c4c8182e25 Mon Sep 17 00:00:00 2001 From: proost Date: Sat, 21 Sep 2024 18:00:07 +0900 Subject: [PATCH 10/15] refactor: divide lazy conditional refresh --- cluster.go | 129 ++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 128 insertions(+), 1 deletion(-) diff --git a/cluster.go b/cluster.go index 8424094d..843c5a6e 100644 --- a/cluster.go +++ b/cluster.go @@ -129,6 +129,10 @@ func (c *clusterClient) lazyRefresh() { c.sc.LazyDo(time.Second, c._refresh) } +func (c *clusterClient) lazyConditionalRefresh() { + c.sc.LazyDo(time.Second, c.conditionalRefresh) +} + type clusterslots struct { addr string reply RedisResult @@ -181,6 +185,129 @@ func (c *clusterClient) _refresh() (err error) { } pending = nil + groups := result.parse(c.opt.TLSConfig != nil) + conns := make(map[string]connrole, len(groups)) + for master, g := range groups { + conns[master] = connrole{conn: c.connFn(master, c.opt), replica: false} + for _, addr := range g.nodes[1:] { + if c.rOpt != nil { + conns[addr] = connrole{conn: c.connFn(addr, c.rOpt), replica: true} + } else { + conns[addr] = connrole{conn: c.connFn(addr, c.opt), replica: true} + } + } + } + // make sure InitAddress always be present + for _, addr := range c.opt.InitAddress { + if _, ok := conns[addr]; !ok { + conns[addr] = connrole{ + conn: c.connFn(addr, c.opt), + } + } + } + + var removes []conn + + c.mu.RLock() + for addr, cc := range c.conns { + fresh, ok := conns[addr] + if ok && (cc.replica == fresh.replica || c.rOpt == nil) { + conns[addr] = connrole{ + conn: cc.conn, + replica: fresh.replica, + } + } else { + removes = append(removes, cc.conn) + } + } + c.mu.RUnlock() + + pslots := [16384]conn{} + var rslots []conn + for master, g := range groups { + switch { + case c.opt.ReplicaOnly && len(g.nodes) > 1: + nodesCount := len(g.nodes) + for _, slot := range g.slots { + for i := slot[0]; i <= slot[1]; i++ { + pslots[i] = conns[g.nodes[1+util.FastRand(nodesCount-1)]].conn + } + } + case c.rOpt != nil: // implies c.opt.SendToReplicas != nil + if len(rslots) == 0 { // lazy init + rslots = make([]conn, 16384) + } + if len(g.nodes) > 1 { + for _, slot := range g.slots { + for i := slot[0]; i <= slot[1]; i++ { + pslots[i] = conns[master].conn + rslots[i] = conns[g.nodes[1+util.FastRand(len(g.nodes)-1)]].conn + } + } + } else { + for _, slot := range g.slots { + for i := slot[0]; i <= slot[1]; i++ { + pslots[i] = conns[master].conn + rslots[i] = conns[master].conn + } + } + } + default: + for _, slot := range g.slots { + for i := slot[0]; i <= slot[1]; i++ { + pslots[i] = conns[master].conn + } + } + } + } + + c.mu.Lock() + c.pslots = pslots + c.rslots = rslots + c.conns = conns + c.mu.Unlock() + + if len(removes) > 0 { + go func(removes []conn) { + time.Sleep(time.Second * 5) + for _, cc := range removes { + cc.Close() + } + }(removes) + } + + return nil +} + +func (c *clusterClient) conditionalRefresh() (err error) { + c.mu.RLock() + results := make(chan clusterslots, len(c.conns)) + pending := make([]conn, 0, len(c.conns)) + for _, cc := range c.conns { + pending = append(pending, cc.conn) + } + c.mu.RUnlock() + + var result clusterslots + for i := 0; i < cap(results); i++ { + if i&3 == 0 { // batch CLUSTER SLOTS/CLUSTER SHARDS for every 4 connections + for j := i; j < i+4 && j < len(pending); j++ { + go func(c conn, timeout time.Duration) { + results <- getClusterSlots(c, timeout) + }(pending[j], c.opt.ConnWriteTimeout) + } + } + result = <-results + err = 
result.reply.Error() + if len(result.reply.val.values) != 0 { + break + } + } + if err != nil { + return err + } + pending = nil + groups := result.parse(c.opt.TLSConfig != nil) // we need to check whether the new topology is different from the current one. @@ -418,7 +545,7 @@ func (c *clusterClient) runClusterTopologyRefreshment() { case <-c.stopCh: return case <-ticker.C: - c.lazyRefresh() + c.lazyConditionalRefresh() } } } From 42c0b0d164f8ffe0ebdfa4eeb5384f466e11bd1a Mon Sep 17 00:00:00 2001 From: proost Date: Sat, 21 Sep 2024 18:25:21 +0900 Subject: [PATCH 11/15] refactor: make reusable code --- cluster.go | 157 ++++++++++++++--------------------------------------- 1 file changed, 40 insertions(+), 117 deletions(-) diff --git a/cluster.go b/cluster.go index 843c5a6e..3ce24b0e 100644 --- a/cluster.go +++ b/cluster.go @@ -157,6 +157,38 @@ func getClusterSlots(c conn, timeout time.Duration) clusterslots { } func (c *clusterClient) _refresh() (err error) { + result, err := c.getClusterTopology() + if err != nil { + return err + } + + groups := result.parse(c.opt.TLSConfig != nil) + conns := make(map[string]connrole, len(groups)) + for master, g := range groups { + conns[master] = connrole{conn: c.connFn(master, c.opt), replica: false} + for _, addr := range g.nodes[1:] { + if c.rOpt != nil { + conns[addr] = connrole{conn: c.connFn(addr, c.rOpt), replica: true} + } else { + conns[addr] = connrole{conn: c.connFn(addr, c.opt), replica: true} + } + } + } + // make sure InitAddress always be present + for _, addr := range c.opt.InitAddress { + if _, ok := conns[addr]; !ok { + conns[addr] = connrole{ + conn: c.connFn(addr, c.opt), + } + } + } + + c.updateClusterTopologyCache(conns, groups) + + return nil +} + +func (c *clusterClient) getClusterTopology() (result clusterslots, err error) { c.mu.RLock() results := make(chan clusterslots, len(c.conns)) pending := make([]conn, 0, len(c.conns)) @@ -165,7 +197,6 @@ func (c *clusterClient) _refresh() (err error) { } c.mu.RUnlock() - var result clusterslots for i := 0; i < cap(results); i++ { if i&3 == 0 { // batch CLUSTER SLOTS/CLUSTER SHARDS for every 4 connections for j := i; j < i+4 && j < len(pending); j++ { @@ -181,31 +212,16 @@ func (c *clusterClient) _refresh() (err error) { } } if err != nil { - return err + return } pending = nil - groups := result.parse(c.opt.TLSConfig != nil) - conns := make(map[string]connrole, len(groups)) - for master, g := range groups { - conns[master] = connrole{conn: c.connFn(master, c.opt), replica: false} - for _, addr := range g.nodes[1:] { - if c.rOpt != nil { - conns[addr] = connrole{conn: c.connFn(addr, c.rOpt), replica: true} - } else { - conns[addr] = connrole{conn: c.connFn(addr, c.opt), replica: true} - } - } - } - // make sure InitAddress always be present - for _, addr := range c.opt.InitAddress { - if _, ok := conns[addr]; !ok { - conns[addr] = connrole{ - conn: c.connFn(addr, c.opt), - } - } - } + return +} +func (c *clusterClient) updateClusterTopologyCache( + conns map[string]connrole, groups map[string]group, +) { var removes []conn c.mu.RLock() @@ -275,38 +291,13 @@ func (c *clusterClient) _refresh() (err error) { } }(removes) } - - return nil } func (c *clusterClient) conditionalRefresh() (err error) { - c.mu.RLock() - results := make(chan clusterslots, len(c.conns)) - pending := make([]conn, 0, len(c.conns)) - for _, cc := range c.conns { - pending = append(pending, cc.conn) - } - c.mu.RUnlock() - - var result clusterslots - for i := 0; i < cap(results); i++ { - if i&3 == 0 { // batch CLUSTER 
SLOTS/CLUSTER SHARDS for every 4 connections - for j := i; j < i+4 && j < len(pending); j++ { - go func(c conn, timeout time.Duration) { - results <- getClusterSlots(c, timeout) - }(pending[j], c.opt.ConnWriteTimeout) - } - } - result = <-results - err = result.reply.Error() - if len(result.reply.val.values) != 0 { - break - } - } + result, err := c.getClusterTopology() if err != nil { return err } - pending = nil groups := result.parse(c.opt.TLSConfig != nil) @@ -372,75 +363,7 @@ func (c *clusterClient) conditionalRefresh() (err error) { } } - var removes []conn - - c.mu.RLock() - for addr, cc := range c.conns { - fresh, ok := conns[addr] - if ok && (cc.replica == fresh.replica || c.rOpt == nil) { - conns[addr] = connrole{ - conn: cc.conn, - replica: fresh.replica, - } - } else { - removes = append(removes, cc.conn) - } - } - c.mu.RUnlock() - - pslots := [16384]conn{} - var rslots []conn - for master, g := range groups { - switch { - case c.opt.ReplicaOnly && len(g.nodes) > 1: - nodesCount := len(g.nodes) - for _, slot := range g.slots { - for i := slot[0]; i <= slot[1]; i++ { - pslots[i] = conns[g.nodes[1+util.FastRand(nodesCount-1)]].conn - } - } - case c.rOpt != nil: // implies c.opt.SendToReplicas != nil - if len(rslots) == 0 { // lazy init - rslots = make([]conn, 16384) - } - if len(g.nodes) > 1 { - for _, slot := range g.slots { - for i := slot[0]; i <= slot[1]; i++ { - pslots[i] = conns[master].conn - rslots[i] = conns[g.nodes[1+util.FastRand(len(g.nodes)-1)]].conn - } - } - } else { - for _, slot := range g.slots { - for i := slot[0]; i <= slot[1]; i++ { - pslots[i] = conns[master].conn - rslots[i] = conns[master].conn - } - } - } - default: - for _, slot := range g.slots { - for i := slot[0]; i <= slot[1]; i++ { - pslots[i] = conns[master].conn - } - } - } - } - - c.mu.Lock() - c.pslots = pslots - c.rslots = rslots - c.conns = conns - c.mu.Unlock() - - if len(removes) > 0 { - go func(removes []conn) { - time.Sleep(time.Second * 5) - for _, cc := range removes { - cc.Close() - } - }(removes) - } + c.updateClusterTopologyCache(conns, groups) return nil } From c02c2562a7276d3bf265651534aab1e7403bf0cf Mon Sep 17 00:00:00 2001 From: proost Date: Sun, 22 Sep 2024 23:47:38 +0900 Subject: [PATCH 12/15] refactor: merge refresh --- cluster.go | 161 +++++++++++++++++------------------------------------ 1 file changed, 51 insertions(+), 110 deletions(-) diff --git a/cluster.go b/cluster.go index 3ce24b0e..11262fa7 100644 --- a/cluster.go +++ b/cluster.go @@ -129,10 +129,6 @@ func (c *clusterClient) lazyRefresh() { c.sc.LazyDo(time.Second, c._refresh) } -func (c *clusterClient) lazyConditionalRefresh() { - c.sc.LazyDo(time.Second, c.conditionalRefresh) -} - type clusterslots struct { addr string reply RedisResult @@ -157,10 +153,33 @@ func getClusterSlots(c conn, timeout time.Duration) clusterslots { } func (c *clusterClient) _refresh() (err error) { - result, err := c.getClusterTopology() + c.mu.RLock() + results := make(chan clusterslots, len(c.conns)) + pending := make([]conn, 0, len(c.conns)) + for _, cc := range c.conns { + pending = append(pending, cc.conn) + } + c.mu.RUnlock() + + var result clusterslots + for i := 0; i < cap(results); i++ { + if i&3 == 0 { // batch CLUSTER SLOTS/CLUSTER SHARDS for every 4 connections + for j := i; j < i+4 && j < len(pending); j++ { + go func(c conn, timeout time.Duration) { + results <- getClusterSlots(c, timeout) + }(pending[j], c.opt.ConnWriteTimeout) + } + } + result = <-results + err = result.reply.Error() + if len(result.reply.val.values) != 0 { 
+ break + } + } if err != nil { return err } + pending = nil groups := result.parse(c.opt.TLSConfig != nil) conns := make(map[string]connrole, len(groups)) @@ -183,45 +202,40 @@ func (c *clusterClient) _refresh() (err error) { } } - c.updateClusterTopologyCache(conns, groups) - - return nil -} - -func (c *clusterClient) getClusterTopology() (result clusterslots, err error) { + shouldRefresh := false c.mu.RLock() - results := make(chan clusterslots, len(c.conns)) - pending := make([]conn, 0, len(c.conns)) - for _, cc := range c.conns { - pending = append(pending, cc.conn) + // check if the new topology is different from the current one + for addr, cc := range conns { + old, ok := c.conns[addr] + if !ok || old.replica != cc.replica { + shouldRefresh = true + break + } } - c.mu.RUnlock() - - for i := 0; i < cap(results); i++ { - if i&3 == 0 { // batch CLUSTER SLOTS/CLUSTER SHARDS for every 4 connections - for j := i; j < i+4 && j < len(pending); j++ { - go func(c conn, timeout time.Duration) { - results <- getClusterSlots(c, timeout) - }(pending[j], c.opt.ConnWriteTimeout) + // check if the current topology is different from the new one + if !shouldRefresh { + for addr := range c.conns { + if _, ok := conns[addr]; !ok { + shouldRefresh = true + break } } - result = <-results - err = result.reply.Error() - if len(result.reply.val.values) != 0 { - break - } } - if err != nil { - return + // check if cluster client is initialized. + if !shouldRefresh { + for _, cc := range c.pslots { + if cc == nil { + shouldRefresh = true + break + } + } } - pending = nil + c.mu.RUnlock() - return -} + if !shouldRefresh { + return nil + } -func (c *clusterClient) updateClusterTopologyCache( - conns map[string]connrole, groups map[string]group, -) { var removes []conn c.mu.RLock() @@ -291,79 +305,6 @@ func (c *clusterClient) updateClusterTopologyCache( } }(removes) } -} - -func (c *clusterClient) conditionalRefresh() (err error) { - result, err := c.getClusterTopology() - if err != nil { - return err - } - - groups := result.parse(c.opt.TLSConfig != nil) - - // we need to check whether the new topology is different from the current one. - // so we don't need to early re-create the connections. 
- conns := make(map[string]connrole, len(groups)) - for master, g := range groups { - conns[master] = connrole{replica: false} - for _, addr := range g.nodes[1:] { - if c.rOpt != nil { - conns[addr] = connrole{replica: true} - } else { - conns[addr] = connrole{replica: true} - } - } - } - // make sure InitAddress always be present - for _, addr := range c.opt.InitAddress { - if _, ok := conns[addr]; !ok { - conns[addr] = connrole{} - } - } - - isChanged := false - c.mu.RLock() - // check if the new topology is different from the current one - for addr, cc := range conns { - old, ok := c.conns[addr] - if !ok || old.replica != cc.replica { - isChanged = true - break - } - } - // check if the current topology is different from the new one - if !isChanged { - for addr := range c.conns { - if _, ok := conns[addr]; !ok { - isChanged = true - break - } - } - } - c.mu.RUnlock() - - if !isChanged { - return nil - } - - for addr, cc := range conns { - if cc.replica { - if c.rOpt != nil { - cc.conn = c.connFn(addr, c.rOpt) - } else { - cc.conn = c.connFn(addr, c.opt) - } - } else { - cc.conn = c.connFn(addr, c.opt) - } - - conns[addr] = connrole{ - conn: cc.conn, - replica: cc.replica, - } - } - - c.updateClusterTopologyCache(conns, groups) return nil } @@ -468,7 +409,7 @@ func (c *clusterClient) runClusterTopologyRefreshment() { case <-c.stopCh: return case <-ticker.C: - c.lazyConditionalRefresh() + c.lazyRefresh() } } } From 9f886c503d27f212422fdedc97f1b3c050eff45c Mon Sep 17 00:00:00 2001 From: proost Date: Tue, 24 Sep 2024 00:34:31 +0900 Subject: [PATCH 13/15] refactor: use parameter --- cluster.go | 88 ++++++++++++++++++++++--------------------------- cluster_test.go | 2 +- 2 files changed, 40 insertions(+), 50 deletions(-) diff --git a/cluster.go b/cluster.go index 11262fa7..beaeea1b 100644 --- a/cluster.go +++ b/cluster.go @@ -73,7 +73,7 @@ func newClusterClient(opt *ClientOption, connFn connFn) (*clusterClient, error) return nil, err } - if err := client.refresh(context.Background()); err != nil { + if err := client.refresh(context.Background(), true); err != nil { return client, err } @@ -121,12 +121,16 @@ func (c *clusterClient) init() error { return es[0] } -func (c *clusterClient) refresh(ctx context.Context) (err error) { - return c.sc.Do(ctx, c._refresh) +func (c *clusterClient) refresh(ctx context.Context, forceRefresh bool) (err error) { + return c.sc.Do(ctx, func() error { + return c._refresh(forceRefresh) + }) } func (c *clusterClient) lazyRefresh() { - c.sc.LazyDo(time.Second, c._refresh) + c.sc.LazyDo(time.Second, func() error { + return c._refresh(false) + }) } type clusterslots struct { @@ -152,7 +156,7 @@ func getClusterSlots(c conn, timeout time.Duration) clusterslots { return clusterslots{reply: c.Do(ctx, cmds.ShardsCmd), addr: c.Addr(), ver: v} } -func (c *clusterClient) _refresh() (err error) { +func (c *clusterClient) _refresh(forceRefresh bool) (err error) { c.mu.RLock() results := make(chan clusterslots, len(c.conns)) pending := make([]conn, 0, len(c.conns)) @@ -202,55 +206,41 @@ func (c *clusterClient) _refresh() (err error) { } } - shouldRefresh := false - c.mu.RLock() - // check if the new topology is different from the current one - for addr, cc := range conns { - old, ok := c.conns[addr] - if !ok || old.replica != cc.replica { - shouldRefresh = true - break - } - } - // check if the current topology is different from the new one - if !shouldRefresh { - for addr := range c.conns { - if _, ok := conns[addr]; !ok { + var removes []conn + + if !forceRefresh { + 
shouldRefresh := false + + c.mu.RLock() + // check if the current topology is different from the new one + for addr, cc := range c.conns { + fresh, ok := conns[addr] + if ok && (cc.replica == fresh.replica || c.rOpt == nil) { + conns[addr] = connrole{ + conn: cc.conn, + replica: fresh.replica, + } + } else { shouldRefresh = true - break + removes = append(removes, cc.conn) } } - } - // check if cluster client is initialized. - if !shouldRefresh { - for _, cc := range c.pslots { - if cc == nil { - shouldRefresh = true - break + // check if the new topology is different from the current one + if !shouldRefresh { + for addr, cc := range conns { + old, ok := c.conns[addr] + if !ok || old.replica != cc.replica { + shouldRefresh = true + break + } } } - } - c.mu.RUnlock() - - if !shouldRefresh { - return nil - } + c.mu.RUnlock() - var removes []conn - - c.mu.RLock() - for addr, cc := range c.conns { - fresh, ok := conns[addr] - if ok && (cc.replica == fresh.replica || c.rOpt == nil) { - conns[addr] = connrole{ - conn: cc.conn, - replica: fresh.replica, - } - } else { - removes = append(removes, cc.conn) + if !shouldRefresh { + return nil } } - c.mu.RUnlock() pslots := [16384]conn{} var rslots []conn @@ -435,7 +425,7 @@ func (c *clusterClient) _pick(slot uint16, toReplica bool) (p conn) { func (c *clusterClient) pick(ctx context.Context, slot uint16, toReplica bool) (p conn, err error) { if p = c._pick(slot, toReplica); p == nil { - if err := c.refresh(ctx); err != nil { + if err := c.refresh(ctx, false); err != nil { return nil, err } if p = c._pick(slot, toReplica); p == nil { @@ -617,7 +607,7 @@ func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry, last func (c *clusterClient) pickMulti(ctx context.Context, multi []Completed) (*connretry, uint16, bool, error) { conns, slot, toReplica := c._pickMulti(multi) if conns == nil { - if err := c.refresh(ctx); err != nil { + if err := c.refresh(ctx, false); err != nil { return nil, 0, false, err } if conns, slot, toReplica = c._pickMulti(multi); conns == nil { @@ -897,7 +887,7 @@ func (c *clusterClient) _pickMultiCache(multi []CacheableTTL) *connretrycache { func (c *clusterClient) pickMultiCache(ctx context.Context, multi []CacheableTTL) (*connretrycache, error) { conns := c._pickMultiCache(multi) if conns == nil { - if err := c.refresh(ctx); err != nil { + if err := c.refresh(ctx, false); err != nil { return nil, err } if conns = c._pickMultiCache(multi); conns == nil { diff --git a/cluster_test.go b/cluster_test.go index f239b1b6..9cc2d636 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -718,7 +718,7 @@ func TestClusterClientInit(t *testing.T) { atomic.AddInt64(num, 1) - if err := client.refresh(context.Background()); err != nil { + if err := client.refresh(context.Background(), true); err != nil { t.Fatalf("unexpected err %v", err) } From 3712efd3a6d66db27019ae895c5bda0599ab5b3f Mon Sep 17 00:00:00 2001 From: proost Date: Wed, 25 Sep 2024 23:41:53 +0900 Subject: [PATCH 14/15] refactor: always refresh --- cluster.go | 60 +++++++++++++++---------------------------------- cluster_test.go | 2 +- rueidis.go | 2 +- 3 files changed, 20 insertions(+), 44 deletions(-) diff --git a/cluster.go b/cluster.go index beaeea1b..858cb165 100644 --- a/cluster.go +++ b/cluster.go @@ -73,7 +73,7 @@ func newClusterClient(opt *ClientOption, connFn connFn) (*clusterClient, error) return nil, err } - if err := client.refresh(context.Background(), true); err != nil { + if err := client.refresh(context.Background()); err != nil { return client, err 
} @@ -121,16 +121,12 @@ func (c *clusterClient) init() error { return es[0] } -func (c *clusterClient) refresh(ctx context.Context, forceRefresh bool) (err error) { - return c.sc.Do(ctx, func() error { - return c._refresh(forceRefresh) - }) +func (c *clusterClient) refresh(ctx context.Context) (err error) { + return c.sc.Do(ctx, c._refresh) } func (c *clusterClient) lazyRefresh() { - c.sc.LazyDo(time.Second, func() error { - return c._refresh(false) - }) + c.sc.LazyDo(time.Second, c._refresh) } type clusterslots struct { @@ -156,7 +152,7 @@ func getClusterSlots(c conn, timeout time.Duration) clusterslots { return clusterslots{reply: c.Do(ctx, cmds.ShardsCmd), addr: c.Addr(), ver: v} } -func (c *clusterClient) _refresh(forceRefresh bool) (err error) { +func (c *clusterClient) _refresh() (err error) { c.mu.RLock() results := make(chan clusterslots, len(c.conns)) pending := make([]conn, 0, len(c.conns)) @@ -208,39 +204,19 @@ func (c *clusterClient) _refresh(forceRefresh bool) (err error) { var removes []conn - if !forceRefresh { - shouldRefresh := false - - c.mu.RLock() - // check if the current topology is different from the new one - for addr, cc := range c.conns { - fresh, ok := conns[addr] - if ok && (cc.replica == fresh.replica || c.rOpt == nil) { - conns[addr] = connrole{ - conn: cc.conn, - replica: fresh.replica, - } - } else { - shouldRefresh = true - removes = append(removes, cc.conn) - } - } - // check if the new topology is different from the current one - if !shouldRefresh { - for addr, cc := range conns { - old, ok := c.conns[addr] - if !ok || old.replica != cc.replica { - shouldRefresh = true - break - } + c.mu.RLock() + for addr, cc := range c.conns { + fresh, ok := conns[addr] + if ok && (cc.replica == fresh.replica || c.rOpt == nil) { + conns[addr] = connrole{ + conn: cc.conn, + replica: fresh.replica, } - } - c.mu.RUnlock() - - if !shouldRefresh { - return nil + } else { + removes = append(removes, cc.conn) } } + c.mu.RUnlock() pslots := [16384]conn{} var rslots []conn @@ -425,7 +401,7 @@ func (c *clusterClient) _pick(slot uint16, toReplica bool) (p conn) { func (c *clusterClient) pick(ctx context.Context, slot uint16, toReplica bool) (p conn, err error) { if p = c._pick(slot, toReplica); p == nil { - if err := c.refresh(ctx, false); err != nil { + if err := c.refresh(ctx); err != nil { return nil, err } if p = c._pick(slot, toReplica); p == nil { @@ -607,7 +583,7 @@ func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry, last func (c *clusterClient) pickMulti(ctx context.Context, multi []Completed) (*connretry, uint16, bool, error) { conns, slot, toReplica := c._pickMulti(multi) if conns == nil { - if err := c.refresh(ctx, false); err != nil { + if err := c.refresh(ctx); err != nil { return nil, 0, false, err } if conns, slot, toReplica = c._pickMulti(multi); conns == nil { @@ -887,7 +863,7 @@ func (c *clusterClient) _pickMultiCache(multi []CacheableTTL) *connretrycache { func (c *clusterClient) pickMultiCache(ctx context.Context, multi []CacheableTTL) (*connretrycache, error) { conns := c._pickMultiCache(multi) if conns == nil { - if err := c.refresh(ctx, false); err != nil { + if err := c.refresh(ctx); err != nil { return nil, err } if conns = c._pickMultiCache(multi); conns == nil { diff --git a/cluster_test.go b/cluster_test.go index 9cc2d636..f239b1b6 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -718,7 +718,7 @@ func TestClusterClientInit(t *testing.T) { atomic.AddInt64(num, 1) - if err := client.refresh(context.Background(), true); err != 
nil { + if err := client.refresh(context.Background()); err != nil { t.Fatalf("unexpected err %v", err) } diff --git a/rueidis.go b/rueidis.go index 7a42ce22..f5b16000 100644 --- a/rueidis.go +++ b/rueidis.go @@ -208,7 +208,7 @@ type SentinelOption struct { type ClusterOption struct { // ScanInterval is the interval to scan the cluster topology. // If the value is zero, refreshment will be disabled. - // Cluster Refresh happens only when the cluster topology is not up-to-date. + // Cluster topology cache refresh happens always in the background after successful scan. ScanInterval time.Duration } From 3c97bc8f0a770ec8d630f7568fa8cf940aa4c477 Mon Sep 17 00:00:00 2001 From: proost Date: Wed, 25 Sep 2024 23:57:54 +0900 Subject: [PATCH 15/15] refactor: rename shardsrefreshinterval --- cluster.go | 10 +++++----- cluster_test.go | 16 ++++++++-------- rueidis.go | 4 ++-- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/cluster.go b/cluster.go index 858cb165..e8114129 100644 --- a/cluster.go +++ b/cluster.go @@ -18,7 +18,7 @@ import ( // ErrNoSlot indicates that there is no redis node owns the key slot. var ErrNoSlot = errors.New("the slot has no redis node") var ErrReplicaOnlyConflict = errors.New("ReplicaOnly conflicts with SendToReplicas option") -var ErrInvalidScanInterval = errors.New("scan interval must be greater than or equal to 0") +var ErrInvalidShardsRefreshInterval = errors.New("ShardsRefreshInterval must be greater than or equal to 0") type clusterClient struct { pslots [16384]conn @@ -77,10 +77,10 @@ func newClusterClient(opt *ClientOption, connFn connFn) (*clusterClient, error) return client, err } - if opt.ClusterOption.ScanInterval > 0 { + if opt.ClusterOption.ShardsRefreshInterval > 0 { go client.runClusterTopologyRefreshment() - } else if opt.ClusterOption.ScanInterval < 0 { - return nil, ErrInvalidScanInterval + } else if opt.ClusterOption.ShardsRefreshInterval < 0 { + return nil, ErrInvalidShardsRefreshInterval } return client, nil @@ -368,7 +368,7 @@ func parseShards(shards RedisMessage, defaultAddr string, tls bool) map[string]g } func (c *clusterClient) runClusterTopologyRefreshment() { - ticker := time.NewTicker(c.opt.ClusterOption.ScanInterval) + ticker := time.NewTicker(c.opt.ClusterOption.ShardsRefreshInterval) defer ticker.Stop() for { select { diff --git a/cluster_test.go b/cluster_test.go index f239b1b6..c7c368be 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -915,12 +915,12 @@ func TestClusterClientInit(t *testing.T) { } }) - t.Run("Negative Cluster Topology Scan Interval", func(t *testing.T) { + t.Run("Negative ShardRefreshInterval", func(t *testing.T) { _, err := newClusterClient( &ClientOption{ InitAddress: []string{"127.0.0.1:0"}, ClusterOption: ClusterOption{ - ScanInterval: -1 * time.Millisecond, + ShardsRefreshInterval: -1 * time.Millisecond, }, }, func(dst string, opt *ClientOption) conn { @@ -931,7 +931,7 @@ func TestClusterClientInit(t *testing.T) { } }, ) - if !errors.Is(err, ErrInvalidScanInterval) { + if !errors.Is(err, ErrInvalidShardsRefreshInterval) { t.Fatalf("unexpected err %v", err) } }) @@ -4531,7 +4531,7 @@ func TestClusterTopologyRefreshment(t *testing.T) { &ClientOption{ InitAddress: []string{"127.0.0.1:0"}, ClusterOption: ClusterOption{ - ScanInterval: 0, + ShardsRefreshInterval: 0, }, }, func(dst string, opt *ClientOption) conn { @@ -4566,7 +4566,7 @@ func TestClusterTopologyRefreshment(t *testing.T) { &ClientOption{ InitAddress: []string{"127.0.0.1:0"}, ClusterOption: ClusterOption{ - ScanInterval: 2 * time.Second, + 
ShardsRefreshInterval: 2 * time.Second, }, }, func(dst string, opt *ClientOption) conn { @@ -4623,7 +4623,7 @@ func TestClusterTopologyRefreshment(t *testing.T) { &ClientOption{ InitAddress: []string{"127.0.0.1:0"}, ClusterOption: ClusterOption{ - ScanInterval: 2 * time.Second, + ShardsRefreshInterval: 2 * time.Second, }, }, func(dst string, opt *ClientOption) conn { @@ -4683,7 +4683,7 @@ func TestClusterTopologyRefreshment(t *testing.T) { &ClientOption{ InitAddress: []string{"127.0.0.1:0"}, ClusterOption: ClusterOption{ - ScanInterval: 2 * time.Second, + ShardsRefreshInterval: 2 * time.Second, }, }, func(dst string, opt *ClientOption) conn { @@ -4743,7 +4743,7 @@ func TestClusterTopologyRefreshment(t *testing.T) { &ClientOption{ InitAddress: []string{"127.0.0.1:0"}, ClusterOption: ClusterOption{ - ScanInterval: 2 * time.Second, + ShardsRefreshInterval: 2 * time.Second, }, }, func(dst string, opt *ClientOption) conn { diff --git a/rueidis.go b/rueidis.go index f5b16000..169ae8b9 100644 --- a/rueidis.go +++ b/rueidis.go @@ -206,10 +206,10 @@ type SentinelOption struct { // ClusterOption is the options for the redis cluster client. type ClusterOption struct { - // ScanInterval is the interval to scan the cluster topology. + // ShardsRefreshInterval is the interval to scan the cluster topology. // If the value is zero, refreshment will be disabled. // Cluster topology cache refresh happens always in the background after successful scan. - ScanInterval time.Duration + ShardsRefreshInterval time.Duration } // Client is the redis client interface for both single redis instance and redis cluster. It should be created from the NewClient()
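
For reference, a minimal usage sketch of the option added by this series. It is an illustration only: the github.com/redis/rueidis import path, the sample addresses, and the 30-second interval are assumptions, while ClientOption, ClusterOption, ShardsRefreshInterval, NewClient, and ErrInvalidShardsRefreshInterval come from the diffs above.

package main

import (
	"time"

	"github.com/redis/rueidis" // assumed import path
)

func main() {
	// A positive ShardsRefreshInterval starts the background topology scan;
	// zero disables it, and a negative value is rejected with
	// ErrInvalidShardsRefreshInterval when the cluster client is created.
	client, err := rueidis.NewClient(rueidis.ClientOption{
		InitAddress: []string{"127.0.0.1:7001", "127.0.0.2:7001"}, // placeholder addresses
		ClusterOption: rueidis.ClusterOption{
			ShardsRefreshInterval: 30 * time.Second, // arbitrary example interval
		},
	})
	if err != nil {
		panic(err)
	}
	defer client.Close()
}
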
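A condensed sketch of the change-detection check used in the intermediate revisions of this series (the conditionalRefresh / shouldRefresh logic), which compared the freshly parsed shard map with the cached one in both directions before rebuilding connections; the later "always refresh" patch drops this gate. This is not the library's exact code: it substitutes a plain map[string]bool (addr -> replica flag) for the internal connrole map.

// topologyChanged reports whether the scanned topology differs from the cached one.
func topologyChanged(cached, fresh map[string]bool) bool {
	// a node appeared or changed role
	for addr, replica := range fresh {
		if old, ok := cached[addr]; !ok || old != replica {
			return true
		}
	}
	// a node disappeared
	for addr := range cached {
		if _, ok := fresh[addr]; !ok {
			return true
		}
	}
	return false
}
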