From 9ff2841d85658dc5aa29036f217e02e326e95258 Mon Sep 17 00:00:00 2001
From: Hyeonho Kim
Date: Thu, 26 Sep 2024 01:03:38 +0900
Subject: [PATCH] feat: add cluster shards refresh interval (#638)

* feat: add cluster topology awareness

* fix: defend against closing a closed channel

* refactor: make conditional refresh

* test: fix broken test

* test: change flaky test

* perf: use fewer calls

* refactor: rollback

* perf: set detection logic

* refactor: rename cluster option

* refactor: divide lazy conditional refresh

* refactor: make reusable code

* refactor: merge refresh

* refactor: use parameter

* refactor: always refresh

* refactor: rename shardsrefreshinterval
---
 cluster.go      |  27 +++-
 cluster_test.go | 319 ++++++++++++++++++++++++++++++++++++++++++++++++
 rueidis.go      |  11 ++
 3 files changed, 356 insertions(+), 1 deletion(-)

diff --git a/cluster.go b/cluster.go
index c2044e8a..315b16c6 100644
--- a/cluster.go
+++ b/cluster.go
@@ -18,6 +18,7 @@ import (
 // ErrNoSlot indicates that there is no redis node owns the key slot.
 var ErrNoSlot = errors.New("the slot has no redis node")
 var ErrReplicaOnlyConflict = errors.New("ReplicaOnly conflicts with SendToReplicas option")
+var ErrInvalidShardsRefreshInterval = errors.New("ShardsRefreshInterval must be greater than or equal to 0")
 
 type clusterClient struct {
     pslots [16384]conn
@@ -31,6 +32,7 @@ type clusterClient struct {
     stop   uint32
     cmd    Builder
     retry  bool
+    stopCh chan struct{}
 }
 
 // NOTE: connrole and conn must be initialized at the same time
@@ -46,6 +48,7 @@ func newClusterClient(opt *ClientOption, connFn connFn) (*clusterClient, error)
         opt:    opt,
         conns:  make(map[string]connrole),
         retry:  !opt.DisableRetry,
+        stopCh: make(chan struct{}),
     }
 
     if opt.ReplicaOnly && opt.SendToReplicas != nil {
@@ -74,6 +77,12 @@ func newClusterClient(opt *ClientOption, connFn connFn) (*clusterClient, error)
         return client, err
     }
 
+    if opt.ClusterOption.ShardsRefreshInterval > 0 {
+        go client.runClusterTopologyRefreshment()
+    } else if opt.ClusterOption.ShardsRefreshInterval < 0 {
+        return nil, ErrInvalidShardsRefreshInterval
+    }
+
     return client, nil
 }
 
@@ -358,6 +367,19 @@ func parseShards(shards RedisMessage, defaultAddr string, tls bool) map[string]g
     return groups
 }
 
+func (c *clusterClient) runClusterTopologyRefreshment() {
+    ticker := time.NewTicker(c.opt.ClusterOption.ShardsRefreshInterval)
+    defer ticker.Stop()
+    for {
+        select {
+        case <-c.stopCh:
+            return
+        case <-ticker.C:
+            c.lazyRefresh()
+        }
+    }
+}
+
 func (c *clusterClient) _pick(slot uint16, toReplica bool) (p conn) {
     c.mu.RLock()
     if slot == cmds.InitSlot {
@@ -1018,7 +1040,10 @@ func (c *clusterClient) Nodes() map[string]Client {
 }
 
 func (c *clusterClient) Close() {
-    atomic.StoreUint32(&c.stop, 1)
+    if atomic.CompareAndSwapUint32(&c.stop, 0, 1) {
+        close(c.stopCh)
+    }
+
     c.mu.RLock()
     for _, cc := range c.conns {
         go cc.conn.Close()
diff --git a/cluster_test.go b/cluster_test.go
index 33b54aef..c7c368be 100644
--- a/cluster_test.go
+++ b/cluster_test.go
@@ -33,6 +33,23 @@ var slotsResp = newResult(RedisMessage{typ: '*', values: []RedisMessage{
     }},
 }}, nil)
 
+var slotsRespWithChangedRoll = newResult(RedisMessage{typ: '*', values: []RedisMessage{
+    {typ: '*', values: []RedisMessage{
+        {typ: ':', integer: 0},
+        {typ: ':', integer: 16383},
+        {typ: '*', values: []RedisMessage{ // master
+            {typ: '+', string: "127.0.1.1"},
+            {typ: ':', integer: 1},
+            {typ: '+', string: ""},
+        }},
+        {typ: '*', values: []RedisMessage{ // replica
+            {typ: '+', string: "127.0.0.1"},
+            {typ: ':', integer: 0},
+            {typ: '+', string: ""},
+        }},
+    }},
+}}, nil)
+
 var slotsMultiResp = newResult(RedisMessage{typ: '*', values: []RedisMessage{
     {typ: '*', values: []RedisMessage{
         {typ: ':', integer: 0},
@@ -897,6 +914,27 @@ func TestClusterClientInit(t *testing.T) {
             t.Fatalf("unexpected node assigned to rslot 16383")
         }
     })
+
+    t.Run("Negative ShardsRefreshInterval", func(t *testing.T) {
+        _, err := newClusterClient(
+            &ClientOption{
+                InitAddress: []string{"127.0.0.1:0"},
+                ClusterOption: ClusterOption{
+                    ShardsRefreshInterval: -1 * time.Millisecond,
+                },
+            },
+            func(dst string, opt *ClientOption) conn {
+                return &mockConn{
+                    DoFn: func(cmd Completed) RedisResult {
+                        return singleSlotResp
+                    },
+                }
+            },
+        )
+        if !errors.Is(err, ErrInvalidShardsRefreshInterval) {
+            t.Fatalf("unexpected err %v", err)
+        }
+    })
 }
 
 //gocyclo:ignore
@@ -1153,6 +1191,12 @@ func TestClusterClient(t *testing.T) {
         }
         client.Close()
         <-called
+        select {
+        case _, ok := <-client.stopCh:
+            if ok {
+                t.Fatalf("stopCh should be closed")
+            }
+        }
     })
 
     t.Run("Dedicated Err", func(t *testing.T) {
@@ -4477,3 +4521,278 @@ func TestConnectToNonAvailableCluster(t *testing.T) {
     }
     wg.Wait()
 }
+
+func TestClusterTopologyRefreshment(t *testing.T) {
+    defer ShouldNotLeaked(SetupLeakDetection())
+
+    t.Run("no refreshment", func(t *testing.T) {
+        var callCount int64
+        _, err := newClusterClient(
+            &ClientOption{
+                InitAddress: []string{"127.0.0.1:0"},
+                ClusterOption: ClusterOption{
+                    ShardsRefreshInterval: 0,
+                },
+            },
+            func(dst string, opt *ClientOption) conn {
+                return &mockConn{
+                    DoFn: func(cmd Completed) RedisResult {
+                        // initial call
+                        if atomic.CompareAndSwapInt64(&callCount, 0, 1) {
+                            return singleSlotResp
+                        }
+
+                        t.Fatalf("unexpected call")
+                        return singleSlotResp
+                    },
+                }
+            },
+        )
+        if err != nil {
+            t.Fatalf("unexpected err %v", err)
+        }
+
+        time.Sleep(3 * time.Second) // verify that no refreshment is called
+
+        if atomic.LoadInt64(&callCount) != 1 {
+            t.Fatalf("unexpected call count %d", callCount)
+        }
+    })
+
+    t.Run("nothing changed", func(t *testing.T) {
+        var callCount int64
+        refreshWaitCh := make(chan struct{})
+        cli, err := newClusterClient(
+            &ClientOption{
+                InitAddress: []string{"127.0.0.1:0"},
+                ClusterOption: ClusterOption{
+                    ShardsRefreshInterval: 2 * time.Second,
+                },
+            },
+            func(dst string, opt *ClientOption) conn {
+                return &mockConn{
+                    DoFn: func(cmd Completed) RedisResult {
+                        // initial call
+                        if atomic.CompareAndSwapInt64(&callCount, 0, 1) {
+                            return singleSlotResp
+                        }
+
+                        // call in refreshment scan
+                        if atomic.CompareAndSwapInt64(&callCount, 1, 2) {
+                            return singleSlotResp
+                        }
+
+                        // call in another refreshment scan
+                        if atomic.CompareAndSwapInt64(&callCount, 4, 5) {
+                            close(refreshWaitCh)
+                            return singleSlotResp
+                        }
+
+                        atomic.AddInt64(&callCount, 1)
+                        return singleSlotResp
+                    },
+                }
+            },
+        )
+        if err != nil {
+            t.Fatalf("unexpected err %v", err)
+        }
+
+        select {
+        case <-refreshWaitCh:
+            cli.Close()
+
+            cli.mu.Lock()
+            conns := cli.conns
+            cli.mu.Unlock()
+            if len(conns) != 1 {
+                t.Fatalf("unexpected conns %v", conns)
+            }
+            if _, ok := conns["127.0.0.1:0"]; !ok {
+                t.Fatalf("unexpected conns %v", conns)
+            }
+        case <-time.After(60 * time.Second):
+            t.Fatal("timeout waiting for refresh")
+        }
+    })
+
+    t.Run("replicas are changed", func(t *testing.T) {
+        var callCount int64
+        refreshWaitCh := make(chan struct{})
+        cli, err := newClusterClient(
+            &ClientOption{
+                InitAddress: []string{"127.0.0.1:0"},
+                ClusterOption: ClusterOption{
+                    ShardsRefreshInterval: 2 * time.Second,
+                },
+            },
+            func(dst string, opt *ClientOption) conn {
+                return &mockConn{
+                    DoFn: func(cmd Completed) RedisResult {
+                        // initial call
+                        if atomic.CompareAndSwapInt64(&callCount, 0, 1) {
+                            return singleSlotResp
+                        }
+
+                        // call in refreshment scan
+                        if atomic.CompareAndSwapInt64(&callCount, 1, 2) {
+                            return slotsResp
+                        }
+
+                        // call in another refreshment scan to verify that conns are changed.
+                        if atomic.CompareAndSwapInt64(&callCount, 4, 5) {
+                            close(refreshWaitCh)
+                            return slotsResp
+                        }
+
+                        atomic.AddInt64(&callCount, 1)
+                        return slotsResp
+                    },
+                }
+            },
+        )
+        if err != nil {
+            t.Fatalf("unexpected err %v", err)
+        }
+
+        select {
+        case <-refreshWaitCh:
+            cli.Close()
+
+            cli.mu.Lock()
+            conns := cli.conns
+            cli.mu.Unlock()
+            if len(conns) != 2 {
+                t.Fatalf("unexpected conns %v", conns)
+            }
+            if _, ok := conns["127.0.0.1:0"]; !ok {
+                t.Fatalf("unexpected conns %v", conns)
+            }
+            if _, ok := conns["127.0.1.1:1"]; !ok {
+                t.Fatalf("unexpected conns %v", conns)
+            }
+        case <-time.After(60 * time.Second):
+            t.Fatal("timeout waiting for refresh")
+        }
+    })
+
+    t.Run("shards are changed", func(t *testing.T) {
+        var callCount int64
+        refreshWaitCh := make(chan struct{})
+        cli, err := newClusterClient(
+            &ClientOption{
+                InitAddress: []string{"127.0.0.1:0"},
+                ClusterOption: ClusterOption{
+                    ShardsRefreshInterval: 2 * time.Second,
+                },
+            },
+            func(dst string, opt *ClientOption) conn {
+                return &mockConn{
+                    DoFn: func(cmd Completed) RedisResult {
+                        // initial call
+                        if atomic.CompareAndSwapInt64(&callCount, 0, 1) {
+                            return singleSlotResp
+                        }
+
+                        // call in refreshment scan
+                        if atomic.CompareAndSwapInt64(&callCount, 1, 2) {
+                            return slotsMultiRespWithoutReplicas
+                        }
+
+                        // call in another refreshment scan to verify that conns are changed.
+                        if atomic.CompareAndSwapInt64(&callCount, 4, 5) {
+                            close(refreshWaitCh)
+                            return slotsMultiRespWithoutReplicas
+                        }
+
+                        atomic.AddInt64(&callCount, 1)
+                        return slotsMultiRespWithoutReplicas
+                    },
+                }
+            },
+        )
+        if err != nil {
+            t.Fatalf("unexpected err %v", err)
+        }
+
+        select {
+        case <-refreshWaitCh:
+            cli.Close()
+
+            cli.mu.Lock()
+            conns := cli.conns
+            cli.mu.Unlock()
+            if len(conns) != 2 {
+                t.Fatalf("unexpected conns %v", conns)
+            }
+            if _, ok := conns["127.0.0.1:0"]; !ok {
+                t.Fatalf("unexpected conns %v", conns)
+            }
+            if _, ok := conns["127.0.1.1:0"]; !ok {
+                t.Fatalf("unexpected conns %v", conns)
+            }
+        case <-time.After(60 * time.Second):
+            t.Fatal("timeout waiting for refresh")
+        }
+    })
+
+    t.Run("node role is changed", func(t *testing.T) {
+        var callCount int64
+        refreshWaitCh := make(chan struct{})
+        cli, err := newClusterClient(
+            &ClientOption{
+                InitAddress: []string{"127.0.0.1:0"},
+                ClusterOption: ClusterOption{
+                    ShardsRefreshInterval: 2 * time.Second,
+                },
+            },
+            func(dst string, opt *ClientOption) conn {
+                return &mockConn{
+                    DoFn: func(cmd Completed) RedisResult {
+                        // initial call
+                        if atomic.CompareAndSwapInt64(&callCount, 0, 1) {
+                            return slotsResp
+                        }
+
+                        // call in refreshment scan
+                        if atomic.CompareAndSwapInt64(&callCount, 1, 2) {
+                            return slotsRespWithChangedRoll
+                        }
+
+                        // call in refreshment scan to verify that conns are changed.
+                        if atomic.CompareAndSwapInt64(&callCount, 4, 5) {
+                            close(refreshWaitCh)
+                            return slotsRespWithChangedRoll
+                        }
+
+                        atomic.AddInt64(&callCount, 1)
+                        return slotsRespWithChangedRoll
+                    },
+                }
+            },
+        )
+        if err != nil {
+            t.Fatalf("unexpected err %v", err)
+        }
+
+        select {
+        case <-refreshWaitCh:
+            cli.Close()
+
+            cli.mu.Lock()
+            conns := cli.conns
+            cli.mu.Unlock()
+            if len(conns) != 2 {
+                t.Fatalf("unexpected conns %v", conns)
+            }
+            if cc, ok := conns["127.0.0.1:0"]; !ok || !cc.replica {
+                t.Fatalf("unexpected conns %v", conns)
+            }
+            if cc, ok := conns["127.0.1.1:1"]; !ok || cc.replica {
+                t.Fatalf("unexpected conns %v", conns)
+            }
+        case <-time.After(60 * time.Second):
+            t.Fatal("timeout waiting for refresh")
+        }
+    })
+}
diff --git a/rueidis.go b/rueidis.go
index 77c6c673..169ae8b9 100644
--- a/rueidis.go
+++ b/rueidis.go
@@ -183,6 +183,9 @@ type ClientOption struct {
     // the current connection will be excluded from the client eviction process
     // even if we're above the configured client eviction threshold.
     ClientNoEvict bool
+
+    // ClusterOption contains the options for the redis cluster client.
+    ClusterOption ClusterOption
 }
 
 // SentinelOption contains MasterSet,
@@ -201,6 +204,14 @@ type SentinelOption struct {
     ClientName string
 }
 
+// ClusterOption contains the options for the redis cluster client.
+type ClusterOption struct {
+    // ShardsRefreshInterval is the interval at which to scan the cluster topology.
+    // If the value is zero, the periodic refresh is disabled.
+    // The cluster topology cache is always refreshed in the background after a successful scan.
+    ShardsRefreshInterval time.Duration
+}
+
 // Client is the redis client interface for both single redis instance and redis cluster. It should be created from the NewClient()
 type Client interface {
     CoreClient
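Usage note (not part of the patch): the sketch below shows how an application might opt into the periodic topology refresh introduced here. It assumes the github.com/redis/rueidis import path; the addresses and the 30-second interval are illustrative only. A zero ShardsRefreshInterval keeps the previous behavior (no background refresh), and a negative value makes client creation fail with ErrInvalidShardsRefreshInterval.

package main

import (
    "time"

    "github.com/redis/rueidis"
)

func main() {
    // Hypothetical configuration: rescan the cluster topology every 30 seconds
    // in the background goroutine started by the cluster client.
    client, err := rueidis.NewClient(rueidis.ClientOption{
        InitAddress: []string{"127.0.0.1:7001", "127.0.0.1:7002", "127.0.0.1:7003"},
        ClusterOption: rueidis.ClusterOption{
            ShardsRefreshInterval: 30 * time.Second, // illustrative value
        },
    })
    if err != nil {
        panic(err)
    }
    // Close also stops the refresh goroutine by closing the client's stop channel.
    defer client.Close()
}

Calling Close more than once is safe with this patch because the channel close is guarded by atomic.CompareAndSwapUint32 on the stop flag.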