Skip to content

Commit

Permalink
Fix #1166 (#1249)
Browse files Browse the repository at this point in the history
  • Loading branch information
justxuewei authored Jun 14, 2021
1 parent 75fc696 commit 5c2e47e
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 23 deletions.
25 changes: 16 additions & 9 deletions registry/zookeeper/service_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,15 @@ import (
"dubbo.apache.org/dubbo-go/v3/registry/event"
)

var testName = "test"

var tc *zk.TestCluster
const testName = "test"

func prepareData(t *testing.T) *zk.TestCluster {
var err error
tc, err = zk.StartTestCluster(1, nil, nil, zk.WithRetryTimes(20))
tc, err := zk.StartTestCluster(1, nil, nil)
assert.NoError(t, err)
assert.NotNil(t, tc.Servers[0])
address := "127.0.0.1:" + strconv.Itoa(tc.Servers[0].Port)
//address := "127.0.0.1:2181"

config.GetBaseConfig().ServiceDiscoveries[testName] = &config.ServiceDiscoveryConfig{
Protocol: "zookeeper",
Expand All @@ -71,6 +70,7 @@ func TestNewZookeeperServiceDiscovery(t *testing.T) {
_, err := newZookeeperServiceDiscovery(name)

// the ServiceDiscoveryConfig not found
// err: could not init the instance because the config is invalid
assert.NotNil(t, err)

sdc := &config.ServiceDiscoveryConfig{
Expand All @@ -81,10 +81,20 @@ func TestNewZookeeperServiceDiscovery(t *testing.T) {
_, err = newZookeeperServiceDiscovery(name)

// RemoteConfig not found
// err: could not find the remote config for name: mock
assert.NotNil(t, err)
}

func TestCURDZookeeperServiceDiscovery(t *testing.T) {
func TestZookeeperServiceDiscovery_CURDAndListener(t *testing.T) {
tc := prepareData(t)
defer func() {
_ = tc.Stop()
}()
t.Run("testCURDZookeeperServiceDiscovery", testCURDZookeeperServiceDiscovery)
t.Run("testAddListenerZookeeperServiceDiscovery", testAddListenerZookeeperServiceDiscovery)
}

func testCURDZookeeperServiceDiscovery(t *testing.T) {
prepareData(t)
extension.SetEventDispatcher("mock", func() observer.EventDispatcher {
return dispatcher.NewMockEventDispatcher()
Expand Down Expand Up @@ -164,10 +174,7 @@ func TestCURDZookeeperServiceDiscovery(t *testing.T) {
assert.Nil(t, err)
}

func TestAddListenerZookeeperServiceDiscovery(t *testing.T) {
defer func() {
_ = tc.Stop()
}()
func testAddListenerZookeeperServiceDiscovery(t *testing.T) {
sd, err := newZookeeperServiceDiscovery(testName)
assert.Nil(t, err)
defer func() {
Expand Down
10 changes: 6 additions & 4 deletions remoting/getty/getty_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func testGetUser61(t *testing.T, c *Client) {

func testClient_AsyncCall(t *testing.T, client *Client) {
user := &User{}
lock := sync.Mutex{}
wg := sync.WaitGroup{}
request := remoting.NewRequest("2.0.2")
invocation := createInvocation("GetUser0", nil, nil, []interface{}{"4", nil, "username"},
[]reflect.Value{reflect.ValueOf("4"), reflect.ValueOf(nil), reflect.ValueOf("username")})
Expand All @@ -314,13 +314,13 @@ func testClient_AsyncCall(t *testing.T, client *Client) {
r := response.(remoting.AsyncCallbackResponse)
rst := *r.Reply.(*remoting.Response).Result.(*protocol.RPCResult)
assert.Equal(t, User{ID: "4", Name: "username"}, *(rst.Rest.(*User)))
lock.Unlock()
wg.Done()
}
lock.Lock()
wg.Add(1)
err := client.Request(request, 3*time.Second, rsp)
assert.NoError(t, err)
assert.Equal(t, User{}, *user)
time.Sleep(1 * time.Second)
wg.Done()
}

func InitTest(t *testing.T) (*Server, *common.URL) {
Expand Down Expand Up @@ -436,6 +436,8 @@ func (u *UserProvider) GetUser(ctx context.Context, req []interface{}, rsp *User
}

func (u *UserProvider) GetUser0(id string, k *User, name string) (User, error) {
// fix testClient_AsyncCall assertion bug(#1233)
time.Sleep(1 * time.Second)
return User{ID: id, Name: name}, nil
}

Expand Down
43 changes: 33 additions & 10 deletions remoting/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package zookeeper

import (
uatomic "go.uber.org/atomic"
"path"
"strings"
"sync"
Expand All @@ -38,13 +39,13 @@ import (
"dubbo.apache.org/dubbo-go/v3/remoting"
)

var defaultTTL = 15 * time.Minute
var defaultTTL = 10 * time.Minute

// nolint
type ZkEventListener struct {
client *gxzookeeper.ZookeeperClient
pathMapLock sync.Mutex
pathMap map[string]struct{}
pathMap map[string]*uatomic.Int32
wg sync.WaitGroup
exit chan struct{}
}
Expand All @@ -53,7 +54,7 @@ type ZkEventListener struct {
func NewZkEventListener(client *gxzookeeper.ZookeeperClient) *ZkEventListener {
return &ZkEventListener{
client: client,
pathMap: make(map[string]struct{}),
pathMap: make(map[string]*uatomic.Int32),
exit: make(chan struct{}),
}
}
Expand Down Expand Up @@ -81,6 +82,17 @@ func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener remotin
// nolint
func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remoting.DataListener) bool {
defer l.wg.Done()

l.pathMapLock.Lock()
a, ok := l.pathMap[zkPath]
if !ok || a.Load() > 1 {
l.pathMapLock.Unlock()
return false
}
a.Inc()
l.pathMapLock.Unlock()
defer a.Dec()

var zkEvent zk.Event
for {
keyEventCh, err := l.client.ExistW(zkPath)
Expand Down Expand Up @@ -158,9 +170,6 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
newNode string
)
for _, n := range newChildren {
if contains(children, n) {
continue
}

newNode = path.Join(zkPath, n)
logger.Infof("add zkNode{%s}", newNode)
Expand All @@ -176,6 +185,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
// listen l service node
l.wg.Add(1)
go func(node string, listener remoting.DataListener) {
// invoker l.wg.Done() in l.listenServiceNodeEvent
if l.listenServiceNodeEvent(node, listener) {
logger.Warnf("delete zkNode{%s}", node)
listener.DataChange(remoting.Event{Path: node, Action: remoting.EventTypeDel})
Expand Down Expand Up @@ -276,15 +286,15 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
// Save the path to avoid listen repeatedly
l.pathMapLock.Lock()
_, ok := l.pathMap[dubboPath]
if !ok {
l.pathMap[dubboPath] = uatomic.NewInt32(0)
}
l.pathMapLock.Unlock()
if ok {
logger.Warnf("@zkPath %s has already been listened.", dubboPath)
continue
}

l.pathMapLock.Lock()
l.pathMap[dubboPath] = struct{}{}
l.pathMapLock.Unlock()
// When Zk disconnected, the Conn will be set to nil, so here need check the value of Conn
l.client.RLock()
if l.client.Conn == nil {
Expand All @@ -303,6 +313,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
logger.Infof("listen dubbo service key{%s}", dubboPath)
l.wg.Add(1)
go func(zkPath string, listener remoting.DataListener) {
// invoker l.wg.Done() in l.listenServiceNodeEvent
if l.listenServiceNodeEvent(zkPath, listener) {
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
l.pathMapLock.Lock()
Expand All @@ -324,12 +335,24 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
}
}
// Periodically update provider information
ticker := time.NewTicker(ttl)
tickerTTL := ttl
if tickerTTL > 20e9 {
tickerTTL = 20e9
}
ticker := time.NewTicker(tickerTTL)
WATCH:
for {
select {
case <-ticker.C:
l.handleZkNodeEvent(zkPath, children, listener)
if tickerTTL < ttl {
tickerTTL *= 2
if tickerTTL > ttl {
tickerTTL = ttl
}
ticker.Stop()
ticker = time.NewTicker(tickerTTL)
}
case zkEvent = <-childEventCh:
logger.Warnf("get a zookeeper childEventCh{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, gxzookeeper.StateToString(zkEvent.State), zkEvent.Err)
Expand Down

0 comments on commit 5c2e47e

Please sign in to comment.