Skip to content

Commit

Permalink
defeat too much node listen goroutine
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexStocks committed Jun 2, 2021
1 parent cdae603 commit 0600ed2
Showing 1 changed file with 36 additions and 9 deletions.
45 changes: 36 additions & 9 deletions remoting/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"github.com/dubbogo/go-zookeeper/zk"
gxzookeeper "github.com/dubbogo/gost/database/kv/zk"
perrors "github.com/pkg/errors"

uatomic "go.uber.org/atomic"
)

import (
Expand All @@ -46,7 +48,7 @@ var (
type ZkEventListener struct {
client *gxzookeeper.ZookeeperClient
pathMapLock sync.Mutex
pathMap map[string]struct{}
pathMap map[string]*uatomic.Int32
wg sync.WaitGroup
exit chan struct{}
}
Expand All @@ -55,7 +57,7 @@ type ZkEventListener struct {
func NewZkEventListener(client *gxzookeeper.ZookeeperClient) *ZkEventListener {
return &ZkEventListener{
client: client,
pathMap: make(map[string]struct{}),
pathMap: make(map[string]*uatomic.Int32),
exit: make(chan struct{}),
}
}
Expand All @@ -70,21 +72,41 @@ func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener remotin
// listen l service node
l.wg.Add(1)
go func(zkPath string, listener remoting.DataListener) {
var delFlag bool
if l.listenServiceNodeEvent(zkPath, listener) {
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
l.pathMapLock.Lock()
delete(l.pathMap, zkPath)
if a, ok := l.pathMap[zkPath]; ok && a.Load() <= 0 {
delFlag = true
delete(l.pathMap, zkPath)
}
l.pathMapLock.Unlock()
}
logger.Warnf("ListenServiceNodeEvent->listenSelf(zk path{%s}) goroutine exit now", zkPath)
logger.Warnf("ListenServiceNodeEvent->listenSelf(zk path{%s}) goroutine exit now, delFlag:%v", zkPath, delFlag)
}(zkPath, listener)
}

// nolint
func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remoting.DataListener) bool {
defer l.wg.Done()

l.pathMapLock.Lock()
a, ok := l.pathMap[zkPath]
if ok {
a.Inc()
}
l.pathMapLock.Unlock()

var zkEvent zk.Event
for {
l.pathMapLock.Lock()
a, ok := l.pathMap[zkPath]
l.pathMapLock.Unlock()
if ok && a.Load() > 1 {
a.Dec()
return false
}

keyEventCh, err := l.client.ExistW(zkPath)
if err != nil {
logger.Warnf("existW{key:%s} = error{%v}", zkPath, err)
Expand Down Expand Up @@ -174,6 +196,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
// listen l service node
l.wg.Add(1)
go func(node string, listener remoting.DataListener) {
// invoker l.wg.Done() in l.listenServiceNodeEvent
if l.listenServiceNodeEvent(node, listener) {
logger.Warnf("delete zkNode{%s}", node)
listener.DataChange(remoting.Event{Path: node, Action: remoting.EventTypeDel})
Expand Down Expand Up @@ -271,15 +294,15 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
// Save the path to avoid listen repeatedly
l.pathMapLock.Lock()
_, ok := l.pathMap[dubboPath]
if !ok {
l.pathMap[dubboPath] = uatomic.NewInt32(0)
}
l.pathMapLock.Unlock()
if ok {
logger.Warnf("@zkPath %s has already been listened.", dubboPath)
continue
}

l.pathMapLock.Lock()
l.pathMap[dubboPath] = struct{}{}
l.pathMapLock.Unlock()
// When Zk disconnected, the Conn will be set to nil, so here need check the value of Conn
l.client.RLock()
if l.client.Conn == nil {
Expand All @@ -298,13 +321,17 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
logger.Infof("listen dubbo service key{%s}", dubboPath)
l.wg.Add(1)
go func(zkPath string, listener remoting.DataListener) {
var delFlag bool
if l.listenServiceNodeEvent(zkPath, listener) {
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
l.pathMapLock.Lock()
delete(l.pathMap, zkPath)
if a, ok := l.pathMap[zkPath]; ok && a.Load() <= 0 {
delFlag = true
delete(l.pathMap, zkPath)
}
l.pathMapLock.Unlock()
}
logger.Warnf("listenDirEvent->listenSelf(zk path{%s}) goroutine exit now", zkPath)
logger.Warnf("listenDirEvent->listenSelf(zk path{%s}) goroutine exit now, delFlag:%v", zkPath, delFlag)
}(dubboPath, listener)

// listen sub path recursive
Expand Down

0 comments on commit 0600ed2

Please sign in to comment.