Skip to content

Commit

Permalink
Merge pull request apache#7 from ztelur/williamfeng323-feature/regist…
Browse files Browse the repository at this point in the history
…ry-listener

add waitEventAndHandlePeriod and fix ci
  • Loading branch information
williamfeng323 authored Oct 2, 2021
2 parents 0e613e9 + 61f4546 commit ba830d5
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 243 deletions.
6 changes: 4 additions & 2 deletions integrate_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ make PROJECT_DIR=$P_DIR PIXIU_DIR=$PIXIU_DIR PROJECT_NAME=$(basename $P_DIR) BAS
sleep 0.5
# start server
make PROJECT_DIR=$P_DIR PIXIU_DIR=$PIXIU_DIR PROJECT_NAME=$(basename $P_DIR) BASE_DIR=$P_DIR/dist -f igt/Makefile start
sleep 0.5
sleep 2
# start server
make PROJECT_DIR=$P_DIR PIXIU_DIR=$PIXIU_DIR PROJECT_NAME=$(basename $P_DIR) BASE_DIR=$P_DIR/dist -f igt/Makefile buildPixiu
sleep 0.5
sleep 2
# start integration
make PROJECT_DIR=$P_DIR PIXIU_DIR=$PIXIU_DIR PROJECT_NAME=$(basename $P_DIR) BASE_DIR=$P_DIR/dist -f igt/Makefile integration
result=$?
Expand All @@ -38,4 +38,6 @@ make PROJECT_DIR=$P_DIR PIXIU_DIR=$PIXIU_DIR PROJECT_NAME=$(basename $P_DIR) BAS

make PROJECT_DIR=$P_DIR PIXIU_DIR=$PIXIU_DIR PROJECT_NAME=$(basename $P_DIR) BASE_DIR=$P_DIR/dist -f igt/Makefile docker-down

sleep 1

exit $((result))
Original file line number Diff line number Diff line change
Expand Up @@ -110,30 +110,33 @@ func (z *zkAppListener) watch() {
continue
}
failTimes = 0
tickerTTL := defaultTTL
ticker := time.NewTicker(tickerTTL)
z.handleEvent(children)
WATCH:
for {
select {
case <-ticker.C:
z.handleEvent(children)
case zkEvent := <-e:
logger.Warnf("get a zookeeper e{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, zookeeper.StateToString(zkEvent.State), zkEvent.Err)
ticker.Stop()
if zkEvent.Type != zk.EventNodeChildrenChanged {
break WATCH
}
z.handleEvent(children)
break WATCH
case <-z.exit:
logger.Warnf("listen(path{%s}) goroutine exit now...", z.servicesPath)
ticker.Stop()
return
}
if continueLoop := z.waitEventAndHandlePeriod(children, e); !continueLoop {
return
}
}
}

func (z *zkAppListener) waitEventAndHandlePeriod(children []string, e <-chan zk.Event) bool {
tickerTTL := defaultTTL
ticker := time.NewTicker(tickerTTL)
defer ticker.Stop()
z.handleEvent(children)
for {
select {
case <-ticker.C:
z.handleEvent(children)
case zkEvent := <-e:
logger.Warnf("get a zookeeper e{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, zookeeper.StateToString(zkEvent.State), zkEvent.Err)
if zkEvent.Type != zk.EventNodeChildrenChanged {
return true
}
z.handleEvent(children)
return true
case <-z.exit:
logger.Warnf("listen(path{%s}) goroutine exit now...", z.servicesPath)
return false
}
}
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -94,33 +94,37 @@ func (asl *applicationServiceListener) WatchAndHandle() {
continue
}
failTimes = 0
tickerTTL := defaultTTL
ticker := time.NewTicker(tickerTTL)
asl.handleEvent(children)
WATCH:
for {
select {
case <-ticker.C:
asl.handleEvent(children)
case zkEvent := <-e:
logger.Warnf("get a zookeeper e{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, zookeeper.StateToString(zkEvent.State), zkEvent.Err)
ticker.Stop()
if zkEvent.Type != zk.EventNodeChildrenChanged {
break WATCH
}
asl.handleEvent(children)
break WATCH
case <-asl.exit:
logger.Warnf("listen(path{%s}) goroutine exit now...", asl.servicePath)
ticker.Stop()
return
}
if continueLoop := asl.waitEventAndHandlePeriod(children, e); !continueLoop {
return
}

}
}

func (asl *applicationServiceListener) waitEventAndHandlePeriod(children []string, e <-chan zk.Event) bool {
tickerTTL := defaultTTL
ticker := time.NewTicker(tickerTTL)
defer ticker.Stop()
asl.handleEvent(children)
for {
select {
case <-ticker.C:
asl.handleEvent(children)
case zkEvent := <-e:
logger.Warnf("get a zookeeper e{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, zookeeper.StateToString(zkEvent.State), zkEvent.Err)
if zkEvent.Type != zk.EventNodeChildrenChanged {
return true
}
asl.handleEvent(children)
return true
case <-asl.exit:
logger.Warnf("listen(path{%s}) goroutine exit now...", asl.servicePath)
return false
}
}
}

func (asl *applicationServiceListener) handleEvent(children []string) {
fetchChildren, err := asl.client.GetChildren(asl.servicePath)
if err != nil {
Expand Down
46 changes: 25 additions & 21 deletions pkg/adapter/dubboregistry/registry/zookeeper/interface_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,28 +101,32 @@ func (z *zkIntfListener) watch() {
continue
}
failTimes = 0
tickerTTL := defaultTTL
ticker := time.NewTicker(tickerTTL)
z.handleEvent(z.path)
WATCH:
for {
select {
case <-ticker.C:
z.handleEvent(z.path)
case zkEvent := <-e:
logger.Warnf("get a zookeeper e{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, zookeeper.StateToString(zkEvent.State), zkEvent.Err)
ticker.Stop()
if zkEvent.Type != zk.EventNodeChildrenChanged {
break WATCH
}
z.handleEvent(zkEvent.Path)
break WATCH
case <-z.exit:
logger.Warnf("listen(path{%s}) goroutine exit now...", z.path)
ticker.Stop()
return
if continueLoop := z.waitEventAndHandlePeriod(z.path, e); !continueLoop {
return
}
}
}

func (z *zkIntfListener) waitEventAndHandlePeriod(path string, e <-chan zk.Event) bool {
tickerTTL := defaultTTL
ticker := time.NewTicker(tickerTTL)
defer ticker.Stop()
z.handleEvent(z.path)
for {
select {
case <-ticker.C:
z.handleEvent(z.path)
case zkEvent := <-e:
logger.Warnf("get a zookeeper e{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, zookeeper.StateToString(zkEvent.State), zkEvent.Err)
if zkEvent.Type != zk.EventNodeChildrenChanged {
return true
}
z.handleEvent(zkEvent.Path)
return true
case <-z.exit:
logger.Warnf("listen(path{%s}) goroutine exit now...", z.path)
return false
}
}
}
Expand Down
2 changes: 0 additions & 2 deletions pkg/adapter/dubboregistry/registry/zookeeper/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,6 @@ func initZKListeners(reg *ZKRegistry) {
reg.zkListeners = make(map[registry.RegisteredType]registry.Listener)
reg.zkListeners[registry.RegisteredTypeInterface] = newZKIntfListener(reg.client, reg, reg.AdapterListener)
go reg.zkListeners[registry.RegisteredTypeInterface].WatchAndHandle()
reg.zkListeners[registry.RegisteredTypeApplication] = newZkAppListener(reg.client, reg, reg.AdapterListener)
go reg.zkListeners[registry.RegisteredTypeApplication].WatchAndHandle()
}

func (r *ZKRegistry) GetClient() *zk.ZooKeeperClient {
Expand Down
Loading

0 comments on commit ba830d5

Please sign in to comment.