fix: refresh slots and reconnect pubsub when cluster nodes leave
rueian committed Jan 17, 2022
1 parent 5b0518e commit a5afff2
Showing 9 changed files with 111 additions and 53 deletions.
12 changes: 11 additions & 1 deletion client.go
@@ -23,7 +23,17 @@ func newSingleClient(opt ClientOption, connFn connFn) (*singleClient, error) {
return nil, err
}

opt.PubSubOption.installHook(client.cmd, func() conn { return client.conn })
if opt.PubSubOption.onConnected != nil {
var install func(error)
install = func(prev error) {
if prev != ErrClosing {
dcc := &dedicatedSingleClient{cmd: client.cmd, wire: client.conn}
client.conn.OnDisconnected(install)
opt.PubSubOption.onConnected(prev, dcc)
}
}
install(nil)
}

return client, nil
}
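The hook above re-registers itself through OnDisconnected before invoking the user's onConnected callback, replacing the installHook helper removed from pubsub.go below. A minimal, standalone sketch of the pattern; fakeConn and the local errClosing are hypothetical stand-ins for the library's conn and ErrClosing, not rueidis APIs:

```go
package main

import (
	"errors"
	"fmt"
)

var errClosing = errors.New("rueidis: closing") // stand-in for rueidis.ErrClosing

// fakeConn stands in for the client's conn; it only records the latest
// disconnect hook so the demo can fire it by hand.
type fakeConn struct{ hook func(error) }

func (c *fakeConn) OnDisconnected(fn func(error)) { c.hook = fn }

func main() {
	conn := &fakeConn{}
	onConnected := func(prev error) { fmt.Println("connected, previous error:", prev) }

	// install re-registers itself before running the callback, so the
	// callback fires after every reconnect until the client is closing.
	var install func(error)
	install = func(prev error) {
		if prev != errClosing {
			conn.OnDisconnected(install)
			onConnected(prev)
		}
	}
	install(nil)

	conn.hook(errors.New("network down")) // simulated reconnect: callback runs again
	conn.hook(errClosing)                 // closing: the chain stops here
}
```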
89 changes: 69 additions & 20 deletions cluster.go
@@ -7,6 +7,7 @@ import (
"math/rand"
"runtime"
"sync"
"sync/atomic"
"time"

"github.com/rueian/rueidis/internal/cmds"
@@ -23,6 +24,8 @@ type clusterClient struct {
slots [16384]conn
conns map[string]conn
connFn connFn

closed uint32
}

func newClusterClient(opt ClientOption, connFn connFn) (client *clusterClient, err error) {
@@ -48,13 +51,18 @@ func newClusterClient(opt ClientOption, connFn connFn) (client *clusterClient, err error) {
return nil, err
}

opt.PubSubOption.installHook(client.cmd, func() (cc conn) {
var err error
for cc == nil && err != ErrClosing {
cc, err = client.pick(cmds.InitSlot)
if opt.PubSubOption.onConnected != nil {
var install func(error)
install = func(prev error) {
if atomic.LoadUint32(&client.closed) == 0 {
dcc := &dedicatedClusterClient{cmd: client.cmd, client: client, slot: cmds.InitSlot, pool: false, onDisconnect: install}
for cc := (conn)(nil); cc == nil; cc = dcc.getConn() {
opt.PubSubOption.onConnected(prev, dcc)
}
}
}
return cc
})
install(nil)
}

return client, nil
}
@@ -250,6 +258,9 @@ retry:
}
resp = cc.Do(cmd)
process:
if c.shouldRefreshRetry(resp.NonRedisError()) {
goto retry
}
if err := resp.RedisError(); err != nil {
if addr, ok := err.IsMoved(); ok {
go c.refresh()
@@ -277,6 +288,9 @@ retry:
}
resp = cc.DoCache(cmd, ttl)
process:
if c.shouldRefreshRetry(resp.NonRedisError()) {
goto retry
}
if err := resp.RedisError(); err != nil {
if addr, ok := err.IsMoved(); ok {
go c.refresh()
@@ -297,51 +311,76 @@ ret:
}

func (c *clusterClient) Dedicated(fn func(DedicatedClient) error) (err error) {
dcc := &dedicatedClusterClient{cmd: c.cmd, client: c, slot: cmds.InitSlot}
dcc := &dedicatedClusterClient{cmd: c.cmd, client: c, slot: cmds.NoSlot, pool: true}
err = fn(dcc)
dcc.release()
return err
}

func (c *clusterClient) Close() {
atomic.StoreUint32(&c.closed, 1)
c.mu.RLock()
for _, cc := range c.conns {
go cc.Close()
}
c.mu.RUnlock()
}

func (c *clusterClient) shouldRefreshRetry(err error) (should bool) {
if should = err == ErrClosing && atomic.LoadUint32(&c.closed) == 0; should {
c.refresh()
}
return should
}

type dedicatedClusterClient struct {
mu sync.Mutex
cmd *cmds.Builder
client *clusterClient
conn conn
wire wire
slot uint16
pool bool

onDisconnect func(error)
}

func (c *dedicatedClusterClient) check(slot uint16) {
if slot == cmds.InitSlot {
return
}
if c.slot == cmds.InitSlot {
c.mu.Lock()
defer c.mu.Unlock()
if c.slot == cmds.NoSlot {
c.slot = slot
} else if c.slot != slot {
panic(panicMsgCxSlot)
}
}

func (c *dedicatedClusterClient) acquire() (err error) {
func (c *dedicatedClusterClient) getConn() conn {
c.mu.Lock()
defer c.mu.Unlock()
return c.conn
}

func (c *dedicatedClusterClient) acquire() (wire wire, err error) {
c.mu.Lock()
defer c.mu.Unlock()
if c.wire != nil {
return nil
return c.wire, nil
}
if c.slot == cmds.InitSlot {
if c.slot == cmds.NoSlot {
panic(panicMsgNoSlot)
}
if c.conn, err = c.client.pick(c.slot); err != nil {
return err
return nil, err
}
if c.onDisconnect != nil {
c.conn.OnDisconnected(c.onDisconnect)
}
if !c.pool {
return c.conn, nil
}
c.wire = c.conn.Acquire()
return nil
return c.wire, nil
}

func (c *dedicatedClusterClient) release() {
@@ -356,10 +395,14 @@ func (c *dedicatedClusterClient) B() *cmds.Builder {

func (c *dedicatedClusterClient) Do(ctx context.Context, cmd cmds.Completed) (resp RedisResult) {
c.check(cmd.Slot())
if err := c.acquire(); err != nil {
retry:
if wire, err := c.acquire(); err != nil {
return newErrResult(err)
} else {
resp = c.wire.Do(cmd)
resp = wire.Do(cmd)
if c.client.shouldRefreshRetry(resp.NonRedisError()) {
goto retry
}
}
c.cmd.Put(cmd.CommandSlice())
return resp
@@ -372,8 +415,14 @@ func (c *dedicatedClusterClient) DoMulti(ctx context.Context, multi ...cmds.Completed) (resp []RedisResult) {
for _, cmd := range multi {
c.check(cmd.Slot())
}
if err := c.acquire(); err == nil {
resp = c.wire.DoMulti(multi...)
retry:
if wire, err := c.acquire(); err == nil {
resp = wire.DoMulti(multi...)
for _, resp := range resp {
if c.client.shouldRefreshRetry(resp.NonRedisError()) {
goto retry
}
}
} else {
resp = make([]RedisResult, len(multi))
for i := range resp {
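The cluster changes combine two pieces: Close() now flips an atomic closed flag, and shouldRefreshRetry treats ErrClosing from a connection, while that flag is still unset, as a sign the node left the cluster, so it refreshes the slot table and the caller retries via the retry labels above. Dedicated clients additionally pin themselves to the first slot they see (panicking on cross-slot commands) and, with pool set to false, reuse the node connection directly for pubsub. A compilable sketch of just the refresh-and-retry loop, with do, refresh, and the package-level flag as illustrative stand-ins rather than rueidis names:

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// Illustrative stand-ins: errClosing mirrors rueidis.ErrClosing and closed
// mirrors the new clusterClient.closed flag.
var (
	errClosing = errors.New("rueidis: closing")
	closed     uint32
)

// shouldRefreshRetry mirrors the cluster.go helper: errClosing from a conn
// while the client itself is still open means the node went away, so the
// slot table is refreshed and the caller is told to retry.
func shouldRefreshRetry(err error, refresh func()) bool {
	if err == errClosing && atomic.LoadUint32(&closed) == 0 {
		refresh()
		return true
	}
	return false
}

func main() {
	attempts := 0
	do := func() error { // the first attempt hits a node that just left
		if attempts++; attempts == 1 {
			return errClosing
		}
		return nil
	}
retry:
	if err := do(); shouldRefreshRetry(err, func() { fmt.Println("refreshing slots") }) {
		goto retry
	}
	fmt.Println("succeeded on attempt", attempts)
}
```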
3 changes: 2 additions & 1 deletion cluster_test.go
@@ -198,8 +198,9 @@ func TestClusterClient(t *testing.T) {
t.Errorf("Dedicated should panic if no slot is selected")
}
}()
builder := cmds.NewBuilder(cmds.NoSlot)
client.Dedicated(func(c DedicatedClient) error {
return c.Do(context.Background(), c.B().Info().Build()).Error()
return c.Do(context.Background(), builder.Info().Build()).Error()
})
})

6 changes: 5 additions & 1 deletion mux.go
@@ -42,11 +42,15 @@ type mux struct {
onDisconnected atomic.Value
}

func makeMux(dst string, option ClientOption, dialFn dialFn) *mux {
func makeMux(dst string, option ClientOption, dialFn dialFn, retryOnRefuse bool) *mux {
return newMux(dst, option, (*pipe)(nil), func(onDisconnected func(err error)) (w wire, err error) {
conn, err := dialFn(dst, option)
if err == nil {
w, err = newPipe(conn, option, onDisconnected)
} else if !retryOnRefuse {
if e, ok := err.(*net.OpError); ok && !e.Timeout() && !e.Temporary() {
return dead, nil
}
}
return w, err
})
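The new retryOnRefuse parameter decides what a failed dial means: the single client keeps retrying, while cluster connections map a hard, non-temporary dial error (typically a refused connection from a node that has been removed) to the pre-closed dead pipe introduced in pipe.go below, so the slot refresh can kick in promptly. A standalone sketch of that error classification; the address and messages are illustrative only:

```go
package main

import (
	"fmt"
	"net"
	"time"
)

func main() {
	// Port 1 on localhost is almost certainly closed, so this dial should
	// be refused rather than time out.
	conn, err := net.DialTimeout("tcp", "127.0.0.1:1", time.Second)
	if err != nil {
		// A non-timeout, non-temporary *net.OpError is a hard refusal: the
		// cluster path (retryOnRefuse=false) hands back the pre-closed dead
		// wire instead of endlessly retrying a node that no longer exists.
		if e, ok := err.(*net.OpError); ok && !e.Timeout() && !e.Temporary() {
			fmt.Println("hard failure, return the dead wire:", e)
			return
		}
		fmt.Println("transient failure, keep retrying:", err)
		return
	}
	conn.Close()
}
```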
4 changes: 2 additions & 2 deletions mux_test.go
@@ -46,7 +46,7 @@ func TestNewMux(t *testing.T) {
}()
m := makeMux("", ClientOption{}, func(dst string, opt ClientOption) (net.Conn, error) {
return n1, nil
})
}, true)
if err := m.Dial(); err != nil {
t.Fatalf("unexpected error %v", err)
}
@@ -511,7 +511,7 @@ func BenchmarkClientSideCaching(b *testing.B) {
setup := func(b *testing.B) *mux {
c := makeMux("127.0.0.1:6379", ClientOption{CacheSizeEachConn: DefaultCacheBytes}, func(dst string, opt ClientOption) (conn net.Conn, err error) {
return net.Dial("tcp", dst)
})
}, true)
if err := c.Dial(); err != nil {
panic(err)
}
10 changes: 9 additions & 1 deletion pipe.go
@@ -448,7 +448,7 @@ func (p *pipe) Error() error {
}

func (p *pipe) Close() {
swapped := p.error.CompareAndSwap(nil, &errs{error: ErrClosing})
swapped := p.error.CompareAndSwap(nil, errClosing)
atomic.CompareAndSwapInt32(&p.state, 0, 2)
atomic.CompareAndSwapInt32(&p.state, 1, 2)
p._awake()
@@ -462,12 +462,20 @@ func (p *pipe) Close() {
atomic.CompareAndSwapInt32(&p.state, 2, 3)
}

var dead *pipe

func init() {
dead = &pipe{state: 3}
dead.error.Store(errClosing)
}

const (
protocolbug = "protocol bug, message handled out of order"
prohibitmix = "mixing SUBSCRIBE, PSUBSCRIBE, UNSUBSCRIBE, PUNSUBSCRIBE with other commands in DoMulti is prohibited"
)

var cacheMark = &(RedisMessage{})
var errClosing = &errs{error: ErrClosing}

type errs struct{ error }

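dead is a sentinel pipe that starts in the terminal state with errClosing already stored, so a wire backed by it fails every call immediately with ErrClosing rather than requiring nil checks at each call site. A self-contained sketch of the same sentinel idea, using a cut-down pipe type in place of the real one:

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

var errClosing = errors.New("rueidis: closing") // stand-in for the errClosing above

// pipe is a cut-down stand-in for the library's pipe: state 3 is the final
// closed state and err holds the terminal error handed to every caller.
type pipe struct {
	state int32
	err   atomic.Value
}

func (p *pipe) Do() error {
	if atomic.LoadInt32(&p.state) == 3 {
		return p.err.Load().(error)
	}
	return nil // a live pipe would perform real I/O here
}

// dead mirrors the sentinel in pipe.go: it is born closed, so code that
// receives it fails fast with errClosing instead of panicking on a nil wire.
var dead = func() *pipe {
	p := &pipe{state: 3}
	p.err.Store(errClosing)
	return p
}()

func main() {
	fmt.Println(dead.Do()) // rueidis: closing
}
```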
18 changes: 0 additions & 18 deletions pubsub.go
@@ -1,7 +1,5 @@
package rueidis

import "github.com/rueian/rueidis/internal/cmds"

// NewPubSubOption creates a PubSubOption for client initialization.
// The onConnected callback is called when a connection to a redis node is established, including after auto reconnect.
// One should subscribe to channels in the onConnected callback to have them resubscribed after auto reconnection.
@@ -33,19 +31,3 @@ type PubSubOption struct {
onUnSubscribed func(channel string, active int64)
onConnected func(prev error, client DedicatedClient)
}

func (h PubSubOption) _install(prev error, builder *cmds.Builder, pick func() conn) {
if cc := pick(); cc != nil {
cc.OnDisconnected(func(err error) {
if err != ErrClosing {
h._install(err, builder, pick)
}
})
h.onConnected(prev, &dedicatedSingleClient{cmd: builder, wire: cc})
}
}
func (h PubSubOption) installHook(builder *cmds.Builder, pick func() conn) {
if h.onConnected != nil {
h._install(nil, builder, pick)
}
}
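Even with installHook gone, resubscription still follows the pattern the comment above describes: issue SUBSCRIBE inside onConnected so it is replayed on every reconnect. A hedged sketch of such a callback; the Subscribe builder chain follows the generated cmds API, while the exact NewPubSubOption constructor signature is deliberately left out rather than guessed:

```go
package main

import (
	"context"
	"log"

	"github.com/rueian/rueidis"
)

// onConnected runs on the first connect and after every auto reconnect, so
// issuing SUBSCRIBE here keeps the subscription alive when nodes leave and
// the client reconnects elsewhere.
func onConnected(prev error, client rueidis.DedicatedClient) {
	if prev != nil {
		log.Println("reconnected after:", prev)
	}
	cmd := client.B().Subscribe().Channel("news").Build()
	if err := client.Do(context.Background(), cmd).Error(); err != nil {
		log.Println("resubscribe failed:", err)
	}
}

func main() {
	// Register onConnected through NewPubSubOption and set the result on
	// ClientOption.PubSubOption; the constructor's exact argument order is
	// omitted here rather than guessed.
	_ = onConnected
}
```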
10 changes: 5 additions & 5 deletions pubsub_test.go
@@ -73,15 +73,15 @@ func TestClusterClientPubSubReconnect(t *testing.T) {
t.Fatalf("unexpected err %v", err)
}
m.TriggerDisconnect(errors.New("network")) // should trigger reconnect
m.TriggerDisconnect(ErrClosing) // should not trigger reconnect
m.TriggerDisconnect(ErrClosing) // ErrClosing for a cluster client should also trigger reconnect

for atomic.LoadInt64(&count) != 2 {
log.Printf("wait for pubsub reconnect count to be 2, got: %d\n", atomic.LoadInt64(&count))
for atomic.LoadInt64(&count) != 3 {
log.Printf("wait for pubsub reconnect count to be 3, got: %d\n", atomic.LoadInt64(&count))
time.Sleep(time.Millisecond * 100)
}

if atomic.LoadInt64(&errs) != 1 {
t.Fatalf("errs count should be 1")
if atomic.LoadInt64(&errs) != 2 {
t.Fatalf("errs count should be 2")
}
}

12 changes: 8 additions & 4 deletions rueidis.go
@@ -106,15 +106,19 @@ type DedicatedClient interface {
// It will first try to connect as a cluster client. If len(ClientOption.InitAddress) == 1 and
// the address does not enable cluster mode, NewClient() will use a single client instead.
func NewClient(option ClientOption) (client Client, err error) {
client, err = newClusterClient(option, makeConn)
client, err = newClusterClient(option, makeClusterConn)
if err != nil && len(option.InitAddress) == 1 && err.Error() == redisErrMsgClusterDisabled {
client, err = newSingleClient(option, makeConn)
client, err = newSingleClient(option, makeSingleConn)
}
return client, err
}

func makeConn(dst string, opt ClientOption) conn {
return makeMux(dst, opt, dial)
func makeClusterConn(dst string, opt ClientOption) conn {
return makeMux(dst, opt, dial, false)
}

func makeSingleConn(dst string, opt ClientOption) conn {
return makeMux(dst, opt, dial, true)
}

func dial(dst string, opt ClientOption) (conn net.Conn, err error) {
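For completeness, a minimal sketch of constructing a client through the split makeClusterConn/makeSingleConn path above; the address is an assumption, everything else follows the NewClient signature in this hunk:

```go
package main

import "github.com/rueian/rueidis"

func main() {
	// With one address, NewClient dials it as a cluster client first and
	// falls back to a single client if cluster mode is disabled there.
	client, err := rueidis.NewClient(rueidis.ClientOption{
		InitAddress: []string{"127.0.0.1:6379"}, // assumed local redis
	})
	if err != nil {
		panic(err)
	}
	defer client.Close()
}
```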
