fix:#1143 Feature/reduce etcd registry conn; wait group modify #1297

Merged
Changes from 5 commits
51 changes: 5 additions & 46 deletions remoting/etcdv3/facade.go
@@ -23,14 +23,11 @@ import (
)

import (
"github.com/apache/dubbo-getty"
gxetcd "github.com/dubbogo/gost/database/kv/etcd/v3"
perrors "github.com/pkg/errors"
)

import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/logger"
)

@@ -46,54 +43,16 @@ type clientFacade interface {

// HandleClientRestart keeps the connection between client and server
func HandleClientRestart(r clientFacade) {
var (
err error
failTimes int
)

defer r.WaitGroup().Done()
Contributor

The Done, Wait, and Add calls on r.WaitGroup() are scattered across the codebase. Since remoting is a base component, could someone call HandleClientRestart directly and then hit a negative WaitGroup counter when the function returns?
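
To illustrate the concern, here is a minimal standalone sketch, not code from this PR. The names `worker` and `callWithoutAdd` are hypothetical; `worker` stands in for any function that, like HandleClientRestart, defers `Done` on a WaitGroup it did not `Add` to itself.

```go
package main

import (
	"fmt"
	"sync"
)

// worker is a hypothetical stand-in for a function that unconditionally
// calls Done on a WaitGroup owned by its caller.
func worker(wg *sync.WaitGroup) {
	defer wg.Done()
}

// callWithoutAdd invokes worker without a matching Add and returns the
// message of the panic that results.
func callWithoutAdd() (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	var wg sync.WaitGroup
	worker(&wg) // no wg.Add(1) beforehand, so the counter drops below zero
	return ""
}

func main() {
	fmt.Println(callWithoutAdd())
}
```

Keeping `Add` and `Done` in the same place (for example, both inside the function that starts the goroutine) is one way to make this kind of misuse harder.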

Contributor Author

That is possible. Would it be acceptable to limit the number of HandleClientRestart calls with a once? The method is invoked when the package is imported (at init time), and it is invoked only once.

LOOP:
for {
select {
case <-r.Client().GetCtx().Done():
r.RestartCallBack()
// re-register all services
time.Sleep(10 * time.Microsecond)
Contributor

Why is a Sleep needed here?
As far as I can tell, RestartCallBack re-registers the services, while the logic below rebuilds the client and already includes retry logic. What is the purpose of calling RestartCallBack here?

Contributor Author (@WilliamLeaves, Jul 9, 2021)

The implementation follows the zk optimization for too many connections and does something similar.
<-GetCtx().Done() means the connection has already been lost, so RestartCallBack must be called to re-register.

case <-r.Done():
logger.Warnf("(ETCDV3ProviderRegistry)reconnectETCDV3 goroutine exit now...")
break LOOP
// re-register all services
case <-r.Client().Done():
r.ClientLock().Lock()
clientName := gxetcd.RegistryETCDV3Client
timeout, _ := time.ParseDuration(r.GetURL().GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT))
endpoints := r.Client().GetEndPoints()
r.Client().Close()
r.SetClient(nil)
r.ClientLock().Unlock()

// try to connect to etcd,
failTimes = 0
for {
select {
case <-r.Done():
logger.Warnf("(ETCDV3ProviderRegistry)reconnectETCDRegistry goroutine exit now...")
break LOOP
case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * gxetcd.ConnDelay)): // avoid connect frequent
}
err = ValidateClient(
r,
gxetcd.WithName(clientName),
gxetcd.WithEndpoints(endpoints...),
gxetcd.WithTimeout(timeout),
gxetcd.WithHeartbeat(1),
)
logger.Infof("ETCDV3ProviderRegistry.validateETCDV3Client(etcd Addr{%s}) = error{%#v}",
endpoints, perrors.WithStack(err))
if err == nil && r.RestartCallBack() {
break
}
failTimes++
if gxetcd.MaxFailTimes <= failTimes {
failTimes = gxetcd.MaxFailTimes
}
}
return
}
}
}