diff --git a/README.md b/README.md index b6b2c4c7..e3f3219c 100644 --- a/README.md +++ b/README.md @@ -173,11 +173,11 @@ To receive messages from channels, the message handler should be registered when ```golang c, _ := rueidis.NewClient(rueidis.ClientOption{ InitAddress: []string{"127.0.0.1:6379"}, - PubSubHandlers: rueidis.NewPubSubHandlers(func(prev error, client rueidis.DedicatedClient) { + PubSubOption: rueidis.NewPubSubOption(func(prev error, client rueidis.DedicatedClient) { // Subscribe channels in this PubSubSetup hook for auto reconnecting after disconnected. // The "prev" err is previous disconnect error. err := client.Do(ctx, client.B().Subscribe().Channel("my_channel").Build()).Error() - }, rueidis.PubSubOption{ + }, rueidis.PubSubHandler{ OnMessage: func(channel, message string) { // handle the message }, diff --git a/client.go b/client.go index a8bb6735..7ef55e49 100644 --- a/client.go +++ b/client.go @@ -24,7 +24,7 @@ func newSingleClient(opt ClientOption, connFn connFn) (*singleClient, error) { return nil, err } - opt.PubSubHandlers.installHook(client.cmd, func() conn { return client.conn }) + opt.PubSubOption.installHook(client.cmd, func() conn { return client.conn }) return client, nil } diff --git a/cluster.go b/cluster.go index eeb1616c..2010785b 100644 --- a/cluster.go +++ b/cluster.go @@ -52,7 +52,7 @@ func newClusterClient(opt ClientOption, connFn connFn) (client *clusterClient, e return nil, err } - opt.PubSubHandlers.installHook(client.cmd, func() (cc conn) { + opt.PubSubOption.installHook(client.cmd, func() (cc conn) { var err error for cc == nil && err != ErrConnClosing { cc, err = client.pick(cmds.InitSlot) diff --git a/pipe.go b/pipe.go index 09dbbc02..a413ed92 100644 --- a/pipe.go +++ b/pipe.go @@ -45,7 +45,7 @@ type pipe struct { info map[string]proto.Message - cbs PubSubHandlers + cbs PubSubOption onDisconnected func(err error) } @@ -62,7 +62,7 @@ func newPipe(conn net.Conn, option ClientOption, onDisconnected func(err error)) r: bufio.NewReader(conn), w: bufio.NewWriter(conn), - cbs: option.PubSubHandlers, + cbs: option.PubSubOption, onDisconnected: onDisconnected, } diff --git a/pipe_test.go b/pipe_test.go index 3a08bfca..0498c24c 100644 --- a/pipe_test.go +++ b/pipe_test.go @@ -419,7 +419,7 @@ func TestPubSub(t *testing.T) { t.Run("PubSub Push Message", func(t *testing.T) { count := make([]int32, 4) p, mock, cancel, _ := setup(t, ClientOption{ - PubSubHandlers: PubSubHandlers{ + PubSubOption: PubSubOption{ onMessage: func(channel, message string) { if channel != "1" || message != "2" { t.Fatalf("unexpected onMessage") diff --git a/pubsub.go b/pubsub.go index 472a7091..a5daba4c 100644 --- a/pubsub.go +++ b/pubsub.go @@ -2,34 +2,32 @@ package rueidis import "github.com/rueian/rueidis/internal/cmds" -func NewPubSubHandlers(onConnected PubSubSetup, opt PubSubOption) PubSubHandlers { - return PubSubHandlers{ - onMessage: opt.OnMessage, - onPMessage: opt.OnPMessage, - onSubscribed: opt.OnSubscribed, - onUnSubscribed: opt.OnUnSubscribed, +func NewPubSubOption(onConnected func(prev error, client DedicatedClient), cbs PubSubHandler) PubSubOption { + return PubSubOption{ + onMessage: cbs.OnMessage, + onPMessage: cbs.OnPMessage, + onSubscribed: cbs.OnSubscribed, + onUnSubscribed: cbs.OnUnSubscribed, onConnected: onConnected, } } -type PubSubSetup func(prev error, client DedicatedClient) - -type PubSubOption struct { +type PubSubHandler struct { OnMessage func(channel, message string) OnPMessage func(pattern, channel, message string) OnSubscribed func(channel string, active int64) OnUnSubscribed func(channel string, active int64) } -type PubSubHandlers struct { +type PubSubOption struct { onMessage func(channel, message string) onPMessage func(pattern, channel, message string) onSubscribed func(channel string, active int64) onUnSubscribed func(channel string, active int64) - onConnected PubSubSetup + onConnected func(prev error, client DedicatedClient) } -func (h PubSubHandlers) _install(prev error, builder *cmds.Builder, pick func() conn) { +func (h PubSubOption) _install(prev error, builder *cmds.Builder, pick func() conn) { if cc := pick(); cc != nil { cc.OnDisconnected(func(err error) { if err != ErrConnClosing { @@ -39,7 +37,7 @@ func (h PubSubHandlers) _install(prev error, builder *cmds.Builder, pick func() go h.onConnected(prev, &dedicatedSingleClient{cmd: builder, wire: cc}) } } -func (h PubSubHandlers) installHook(builder *cmds.Builder, pick func() conn) { +func (h PubSubOption) installHook(builder *cmds.Builder, pick func() conn) { if h.onConnected != nil { h._install(nil, builder, pick) } diff --git a/pubsub_test.go b/pubsub_test.go index 510a89fe..88dd658f 100644 --- a/pubsub_test.go +++ b/pubsub_test.go @@ -21,7 +21,7 @@ func TestSingleClientPubSubReconnect(t *testing.T) { } _, err := newSingleClient(ClientOption{ InitAddress: []string{""}, - PubSubHandlers: NewPubSubHandlers(func(prev error, client DedicatedClient) { + PubSubOption: NewPubSubOption(func(prev error, client DedicatedClient) { if prev != nil { atomic.AddInt64(&errs, 1) } @@ -29,7 +29,7 @@ func TestSingleClientPubSubReconnect(t *testing.T) { t.Errorf("unexpected subscribe err %v", err) } atomic.AddInt64(&count, 1) - }, PubSubOption{})}, func(dst string, opt ClientOption) conn { + }, PubSubHandler{})}, func(dst string, opt ClientOption) conn { return m }) if err != nil { @@ -57,7 +57,7 @@ func TestClusterClientPubSubReconnect(t *testing.T) { } _, err := newClusterClient(ClientOption{ InitAddress: []string{":0"}, - PubSubHandlers: NewPubSubHandlers(func(prev error, client DedicatedClient) { + PubSubOption: NewPubSubOption(func(prev error, client DedicatedClient) { if prev != nil { atomic.AddInt64(&errs, 1) } @@ -65,7 +65,7 @@ func TestClusterClientPubSubReconnect(t *testing.T) { t.Errorf("unexpected subscribe err %v", err) } atomic.AddInt64(&count, 1) - }, PubSubOption{}), + }, PubSubHandler{}), }, func(dst string, opt ClientOption) conn { return m }) diff --git a/rueidis.go b/rueidis.go index f050bb39..358121f1 100644 --- a/rueidis.go +++ b/rueidis.go @@ -44,8 +44,8 @@ type ClientOption struct { DialTimeout time.Duration TLSConfig *tls.Config - // Redis PubSub callbacks - PubSubHandlers PubSubHandlers + // Redis PubSub callbacks, should be created from NewPubSubOption + PubSubOption PubSubOption } type Client interface {