Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add clutser topology awareness #638

Merged
merged 15 commits into from
Sep 25, 2024
Prev Previous commit
Next Next commit
perf: set detection logic
  • Loading branch information
proost committed Sep 21, 2024
commit 9fd1da9a6bff4f2cf0e05e77ca66135c9aa50250
53 changes: 48 additions & 5 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,25 +182,68 @@ func (c *clusterClient) _refresh() (err error) {
pending = nil

groups := result.parse(c.opt.TLSConfig != nil)

// we need to check whether the new topology is different from the current one.
// so we don't need to early re-create the connections.
conns := make(map[string]connrole, len(groups))
for master, g := range groups {
conns[master] = connrole{conn: c.connFn(master, c.opt), replica: false}
conns[master] = connrole{replica: false}
for _, addr := range g.nodes[1:] {
if c.rOpt != nil {
conns[addr] = connrole{conn: c.connFn(addr, c.rOpt), replica: true}
conns[addr] = connrole{replica: true}
} else {
conns[addr] = connrole{conn: c.connFn(addr, c.opt), replica: true}
conns[addr] = connrole{replica: true}
}
}
}
// make sure InitAddress always be present
for _, addr := range c.opt.InitAddress {
if _, ok := conns[addr]; !ok {
conns[addr] = connrole{
conn: c.connFn(addr, c.opt),
conns[addr] = connrole{}
}
}

isChanged := false
c.mu.RLock()
// check if the new topology is different from the current one
for addr, cc := range conns {
old, ok := c.conns[addr]
if !ok || old.replica != cc.replica {
isChanged = true
break
}
}
// check if the current topology is different from the new one
if !isChanged {
for addr := range c.conns {
if _, ok := conns[addr]; !ok {
isChanged = true
break
}
}
}
c.mu.RUnlock()

if !isChanged {
return nil
}

for addr, cc := range conns {
if cc.replica {
if c.rOpt != nil {
cc.conn = c.connFn(addr, c.rOpt)
} else {
cc.conn = c.connFn(addr, c.opt)
}
} else {
cc.conn = c.connFn(addr, c.opt)
}

conns[addr] = connrole{
conn: cc.conn,
replica: cc.replica,
}
}

var removes []conn

Expand Down
8 changes: 4 additions & 4 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4611,7 +4611,7 @@ func TestClusterTopologyRefreshment(t *testing.T) {
if _, ok := conns["127.0.0.1:0"]; !ok {
t.Fatalf("unexpected conns %v", conns)
}
case <-time.After(30 * time.Second):
case <-time.After(60 * time.Second):
t.Fatal("timeout waiting for refresh")
}
})
Expand Down Expand Up @@ -4671,7 +4671,7 @@ func TestClusterTopologyRefreshment(t *testing.T) {
if _, ok := conns["127.0.1.1:1"]; !ok {
t.Fatalf("unexpected conns %v", conns)
}
case <-time.After(30 * time.Second):
case <-time.After(60 * time.Second):
t.Fatal("timeout waiting for refresh")
}
})
Expand Down Expand Up @@ -4731,7 +4731,7 @@ func TestClusterTopologyRefreshment(t *testing.T) {
if _, ok := conns["127.0.1.1:0"]; !ok {
t.Fatalf("unexpected conns %v", conns)
}
case <-time.After(30 * time.Second):
case <-time.After(60 * time.Second):
t.Fatal("timeout waiting for refresh")
}
})
Expand Down Expand Up @@ -4791,7 +4791,7 @@ func TestClusterTopologyRefreshment(t *testing.T) {
if cc, ok := conns["127.0.1.1:1"]; !ok || cc.replica {
t.Fatalf("unexpected conns %v", conns)
}
case <-time.After(30 * time.Second):
case <-time.After(60 * time.Second):
t.Fatal("timeout waiting for refresh")
}
})
Expand Down