Skip to content

Commit

Permalink
feat: rename PubSubHandlers to PubSubOption
Browse files Browse the repository at this point in the history
  • Loading branch information
rueian committed Jan 1, 2022
1 parent d43eb8e commit 8f7815b
Show file tree
Hide file tree
Showing 8 changed files with 24 additions and 26 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,11 @@ To receive messages from channels, the message handler should be registered when
```golang
c, _ := rueidis.NewClient(rueidis.ClientOption{
InitAddress: []string{"127.0.0.1:6379"},
PubSubHandlers: rueidis.NewPubSubHandlers(func(prev error, client rueidis.DedicatedClient) {
PubSubOption: rueidis.NewPubSubOption(func(prev error, client rueidis.DedicatedClient) {
// Subscribe channels in this PubSubSetup hook for auto reconnecting after disconnected.
// The "prev" err is previous disconnect error.
err := client.Do(ctx, client.B().Subscribe().Channel("my_channel").Build()).Error()
}, rueidis.PubSubOption{
}, rueidis.PubSubHandler{
OnMessage: func(channel, message string) {
// handle the message
},
Expand Down
2 changes: 1 addition & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func newSingleClient(opt ClientOption, connFn connFn) (*singleClient, error) {
return nil, err
}

opt.PubSubHandlers.installHook(client.cmd, func() conn { return client.conn })
opt.PubSubOption.installHook(client.cmd, func() conn { return client.conn })

return client, nil
}
Expand Down
2 changes: 1 addition & 1 deletion cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func newClusterClient(opt ClientOption, connFn connFn) (client *clusterClient, e
return nil, err
}

opt.PubSubHandlers.installHook(client.cmd, func() (cc conn) {
opt.PubSubOption.installHook(client.cmd, func() (cc conn) {
var err error
for cc == nil && err != ErrConnClosing {
cc, err = client.pick(cmds.InitSlot)
Expand Down
4 changes: 2 additions & 2 deletions pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type pipe struct {

info map[string]proto.Message

cbs PubSubHandlers
cbs PubSubOption

onDisconnected func(err error)
}
Expand All @@ -62,7 +62,7 @@ func newPipe(conn net.Conn, option ClientOption, onDisconnected func(err error))
r: bufio.NewReader(conn),
w: bufio.NewWriter(conn),

cbs: option.PubSubHandlers,
cbs: option.PubSubOption,
onDisconnected: onDisconnected,
}

Expand Down
2 changes: 1 addition & 1 deletion pipe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ func TestPubSub(t *testing.T) {
t.Run("PubSub Push Message", func(t *testing.T) {
count := make([]int32, 4)
p, mock, cancel, _ := setup(t, ClientOption{
PubSubHandlers: PubSubHandlers{
PubSubOption: PubSubOption{
onMessage: func(channel, message string) {
if channel != "1" || message != "2" {
t.Fatalf("unexpected onMessage")
Expand Down
24 changes: 11 additions & 13 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,32 @@ package rueidis

import "github.com/rueian/rueidis/internal/cmds"

func NewPubSubHandlers(onConnected PubSubSetup, opt PubSubOption) PubSubHandlers {
return PubSubHandlers{
onMessage: opt.OnMessage,
onPMessage: opt.OnPMessage,
onSubscribed: opt.OnSubscribed,
onUnSubscribed: opt.OnUnSubscribed,
func NewPubSubOption(onConnected func(prev error, client DedicatedClient), cbs PubSubHandler) PubSubOption {
return PubSubOption{
onMessage: cbs.OnMessage,
onPMessage: cbs.OnPMessage,
onSubscribed: cbs.OnSubscribed,
onUnSubscribed: cbs.OnUnSubscribed,
onConnected: onConnected,
}
}

type PubSubSetup func(prev error, client DedicatedClient)

type PubSubOption struct {
type PubSubHandler struct {
OnMessage func(channel, message string)
OnPMessage func(pattern, channel, message string)
OnSubscribed func(channel string, active int64)
OnUnSubscribed func(channel string, active int64)
}

type PubSubHandlers struct {
type PubSubOption struct {
onMessage func(channel, message string)
onPMessage func(pattern, channel, message string)
onSubscribed func(channel string, active int64)
onUnSubscribed func(channel string, active int64)
onConnected PubSubSetup
onConnected func(prev error, client DedicatedClient)
}

func (h PubSubHandlers) _install(prev error, builder *cmds.Builder, pick func() conn) {
func (h PubSubOption) _install(prev error, builder *cmds.Builder, pick func() conn) {
if cc := pick(); cc != nil {
cc.OnDisconnected(func(err error) {
if err != ErrConnClosing {
Expand All @@ -39,7 +37,7 @@ func (h PubSubHandlers) _install(prev error, builder *cmds.Builder, pick func()
go h.onConnected(prev, &dedicatedSingleClient{cmd: builder, wire: cc})
}
}
func (h PubSubHandlers) installHook(builder *cmds.Builder, pick func() conn) {
func (h PubSubOption) installHook(builder *cmds.Builder, pick func() conn) {
if h.onConnected != nil {
h._install(nil, builder, pick)
}
Expand Down
8 changes: 4 additions & 4 deletions pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ func TestSingleClientPubSubReconnect(t *testing.T) {
}
_, err := newSingleClient(ClientOption{
InitAddress: []string{""},
PubSubHandlers: NewPubSubHandlers(func(prev error, client DedicatedClient) {
PubSubOption: NewPubSubOption(func(prev error, client DedicatedClient) {
if prev != nil {
atomic.AddInt64(&errs, 1)
}
if err := client.Do(context.Background(), client.B().Subscribe().Channel("a").Build()).Error(); err != nil {
t.Errorf("unexpected subscribe err %v", err)
}
atomic.AddInt64(&count, 1)
}, PubSubOption{})}, func(dst string, opt ClientOption) conn {
}, PubSubHandler{})}, func(dst string, opt ClientOption) conn {
return m
})
if err != nil {
Expand Down Expand Up @@ -57,15 +57,15 @@ func TestClusterClientPubSubReconnect(t *testing.T) {
}
_, err := newClusterClient(ClientOption{
InitAddress: []string{":0"},
PubSubHandlers: NewPubSubHandlers(func(prev error, client DedicatedClient) {
PubSubOption: NewPubSubOption(func(prev error, client DedicatedClient) {
if prev != nil {
atomic.AddInt64(&errs, 1)
}
if err := client.Do(context.Background(), client.B().Subscribe().Channel("a").Build()).Error(); err != nil {
t.Errorf("unexpected subscribe err %v", err)
}
atomic.AddInt64(&count, 1)
}, PubSubOption{}),
}, PubSubHandler{}),
}, func(dst string, opt ClientOption) conn {
return m
})
Expand Down
4 changes: 2 additions & 2 deletions rueidis.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ type ClientOption struct {
DialTimeout time.Duration
TLSConfig *tls.Config

// Redis PubSub callbacks
PubSubHandlers PubSubHandlers
// Redis PubSub callbacks, should be created from NewPubSubOption
PubSubOption PubSubOption
}

type Client interface {
Expand Down

0 comments on commit 8f7815b

Please sign in to comment.