Skip to content

Commit

Permalink
[mesh]: Support nacos service registry
Browse files Browse the repository at this point in the history
  • Loading branch information
xxx7xxxx committed Aug 24, 2021
1 parent 13b4a27 commit 299989b
Show file tree
Hide file tree
Showing 12 changed files with 1,390 additions and 437 deletions.
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/alexflint/go-filemutex v0.0.0-20171022225611-72bdc8eae2ae/go.mod h1:CgnQgUtFrFz9mxFNtED3jI5tLDjKlOM+oUF/sTk6ps0=
github.com/aliyun/alibaba-cloud-sdk-go v1.61.18 h1:zOVTBdCKFd9JbCKz9/nt+FovbjPFmb7mUnp8nH9fQBA=
github.com/aliyun/alibaba-cloud-sdk-go v1.61.18/go.mod h1:v8ESoHo4SyHmuB4b1tJqDHxfTGEciD+yhvOU/5s1Rfk=
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8=
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c=
Expand Down Expand Up @@ -195,6 +196,7 @@ github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D
github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g=
github.com/bshuster-repo/logrus-logstash-hook v0.4.1/go.mod h1:zsTqEiSzDgAa/8GZR7E1qaXrhYNDKBYy5/dWPTIflbk=
github.com/buger/jsonparser v0.0.0-20180808090653-f4dd9f5a6b44/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s=
github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23 h1:D21IyuvjDCshj1/qq+pCNd3VZOAEI9jy6Bi131YlXgI=
github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s=
github.com/bugsnag/bugsnag-go v0.0.0-20141110184014-b1d153021fcd/go.mod h1:2oa8nejYd4cQ/b0hMIopN0lCRxU0bueqREvZLWFrtK8=
github.com/bugsnag/osext v0.0.0-20130617224835-0dd3f918b21b/go.mod h1:obH5gd0BsqsP2LwDJ9aOkm/6J86V6lyAXCoQWGw3K50=
Expand Down Expand Up @@ -430,6 +432,7 @@ github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 h1:JWuenKqqX8nojt
github.com/facebookgo/stack v0.0.0-20160209184415-751773369052/go.mod h1:UbMTZqLaRiH3MsBH8va0n7s1pQYcu3uTb8G4tygF4Zg=
github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 h1:7HZCaLC5+BZpmbhCOZJ293Lz68O7PYrF2EzeiFMwCLk=
github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4/go.mod h1:5tD+neXqOorC30/tWg0LCSkrqj/AR6gu8yY8/fpw1q0=
github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 h1:Ghm4eQYC0nEPnSJdVkTrXpu9KtoVCSo1hg7mtI7G9KU=
github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239/go.mod h1:Gdwt2ce0yfBxPvZrHkprdPPTTS3N5rwmLE8T22KBXlw=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.9.0 h1:8xPHl4/q1VyqGIPif1F+1V3Y3lSmrq01EabUW3CoW5s=
Expand Down Expand Up @@ -460,6 +463,7 @@ github.com/globalsign/mgo v0.0.0-20180905125535-1ca0a4f7cbcb/go.mod h1:xkRDCp4j0
github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8/go.mod h1:xkRDCp4j0OGD1HRkm4kmhM+pmpv3AKq5SU7GMg4oO/Q=
github.com/go-chi/chi/v5 v5.0.3 h1:khYQBdPivkYG1s1TAzDQG1f6eX4kD2TItYVZexL5rS4=
github.com/go-chi/chi/v5 v5.0.3/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
github.com/go-errors/errors v1.0.1 h1:LUHzmkK3GUKUrL/1gfBUxAHzcev3apQlezX/+O7ma6w=
github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
Expand Down Expand Up @@ -783,6 +787,7 @@ github.com/jcmturner/gokrb5/v8 v8.4.2 h1:6ZIM6b/JJN0X8UM43ZOM6Z4SJzla+a/u7scXFJz
github.com/jcmturner/gokrb5/v8 v8.4.2/go.mod h1:sb+Xq/fTY5yktf/VxLsE3wlfPqQjp0aWNYyvBVK62bc=
github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY=
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869 h1:IPJ3dvxmJ4uczJe5YQdrYB16oTJlGSC/OyZDqUk9xX4=
github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869/go.mod h1:cJ6Cj7dQo+O6GJNiMx+Pa94qKj+TG8ONdKHgMNIyyag=
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
Expand Down Expand Up @@ -844,8 +849,11 @@ github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/lestrrat/go-envload v0.0.0-20180220120943-6ed08b54a570 h1:0iQektZGS248WXmGIYOwRXSQhD4qn3icjMpuxwO7qlo=
github.com/lestrrat/go-envload v0.0.0-20180220120943-6ed08b54a570/go.mod h1:BLt8L9ld7wVsvEWQbuLrUZnCMnUmLZ+CGDzKtclrTlE=
github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f h1:sgUSP4zdTUZYZgAGGtN5Lxk92rK+JUFOwf+FT99EEI4=
github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f/go.mod h1:UGmTpUd3rjbtfIpwAPrcfmGf/Z1HS95TATB+m57TPB8=
github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042 h1:Bvq8AziQ5jFF4BHGAEDSqwPW1NJS3XshxbRCxtjFAZc=
github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042/go.mod h1:TPpsiPUEh0zFL1Snz4crhMlBe60PYxRHr5oFF3rRYg0=
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de h1:9TO3cAIGXtEhnIaL+V+BEER86oLrvS+kWobKpbJuye0=
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de/go.mod h1:zAbeS9B/r2mtpb6U+EI2rYA5OAXxsYw6wTamcNW+zcE=
Expand Down Expand Up @@ -1253,6 +1261,7 @@ github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cb
github.com/tchap/go-patricia v2.2.6+incompatible/go.mod h1:bmLyhP68RS6kStMGxByiQ23RP/odRBOTVjwp2cDyi6I=
github.com/tcnksm/go-httpstat v0.2.1-0.20191008022543-e866bb274419 h1:elOIj31UL4RZWgLfLV4pWZA0j5QqGO95/Dll2WIwOZU=
github.com/tcnksm/go-httpstat v0.2.1-0.20191008022543-e866bb274419/go.mod h1:s3JVJFtQxtBEBC9dwcdTTXS9xFnM3SXAZwPG41aurT8=
github.com/tebeka/strftime v0.1.3 h1:5HQXOqWKYRFfNyBMNVc9z5+QzuBtIXy03psIhtdJYto=
github.com/tebeka/strftime v0.1.3/go.mod h1:7wJm3dZlpr4l/oVK0t1HYIc4rMzQ2XJlOMIUJUJH6XQ=
github.com/tidwall/gjson v1.8.0 h1:Qt+orfosKn0rbNTZqHYDqBrmm3UDA4KRkv70fDzG+PQ=
github.com/tidwall/gjson v1.8.0/go.mod h1:5/xDoumyyDNerp2U36lyolv46b3uF/9Bu6OfyQ9GImk=
Expand All @@ -1267,6 +1276,7 @@ github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 h1:uruHq4
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tomasen/realip v0.0.0-20180522021738-f0c99a92ddce h1:fb190+cK2Xz/dvi9Hv8eCYJYvIGUTN2/KLq1pT6CjEc=
github.com/tomasen/realip v0.0.0-20180522021738-f0c99a92ddce/go.mod h1:o8v6yHRoik09Xen7gje4m9ERNah1d1PPsVq1VEx9vE4=
github.com/toolkits/concurrent v0.0.0-20150624120057-a4371d70e3e3 h1:kF/7m/ZU+0D4Jj5eZ41Zm3IH/J8OElK1Qtd7tVKAwLk=
github.com/toolkits/concurrent v0.0.0-20150624120057-a4371d70e3e3/go.mod h1:QDlpd3qS71vYtakd2hmdpqhJ9nwv6mD6A30bQ1BPBFE=
github.com/tsenart/go-tsz v0.0.0-20180814232043-cdeb9e1e981e/go.mod h1:SWZznP1z5Ki7hDT2ioqiFKEse8K9tU2OUvaRI0NeGQo=
github.com/tsenart/go-tsz v0.0.0-20180814235614-0bd30b3df1c3/go.mod h1:SWZznP1z5Ki7hDT2ioqiFKEse8K9tU2OUvaRI0NeGQo=
Expand Down
68 changes: 34 additions & 34 deletions pkg/filter/proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ const (
type (
servers struct {
poolSpec *PoolSpec
super *supervisor.Supervisor

mutex sync.Mutex
serviceRegistry *serviceregistry.ServiceRegistry
Expand Down Expand Up @@ -95,6 +96,7 @@ func (lb LoadBalance) Validate() error {
func newServers(super *supervisor.Supervisor, poolSpec *PoolSpec) *servers {
s := &servers{
poolSpec: poolSpec,
super: super,
done: make(chan struct{}),
}

Expand All @@ -104,74 +106,68 @@ func newServers(super *supervisor.Supervisor, poolSpec *PoolSpec) *servers {
return s
}

s.serviceRegistry = super.MustGetSystemController(serviceregistry.Kind).
Instance().(*serviceregistry.ServiceRegistry)
s.serviceWatcher = s.serviceRegistry.NewServiceWatcher(poolSpec.ServiceRegistry, poolSpec.ServiceName)

go s.watchService()

return s
}

func (s *servers) watchService() {
serviceSpec, err := s.serviceRegistry.GetService(s.poolSpec.ServiceRegistry, s.poolSpec.ServiceName)
if err != nil {
logger.Warnf("ger service %s/%s failed: %v",
s.poolSpec.ServiceRegistry, s.poolSpec.ServiceName, err)
}
s.tryUseService()

s.useService(serviceSpec)
s.serviceRegistry = s.super.MustGetSystemController(serviceregistry.Kind).
Instance().(*serviceregistry.ServiceRegistry)
s.serviceWatcher = s.serviceRegistry.NewServiceWatcher(s.poolSpec.ServiceRegistry, s.poolSpec.ServiceName)

for {
select {
case <-s.done:
return
case event := <-s.serviceWatcher.Watch():
s.handleServiceEvent(event)
s.handleEvent(event)
}
}
}

func (s *servers) handleServiceEvent(event *serviceregistry.ServiceEvent) {
if event.Delete != nil {
logger.Warnf("service %s delete, use static servers", s.poolSpec.ServiceName)
s.useStaticServers()
return
}
func (s *servers) handleEvent(event *serviceregistry.ServiceEvent) {
s.useService(event.Instances)
}

err := s.useService(event.Apply)
func (s *servers) tryUseService() {
serviceInstanceSpecs, err := s.serviceRegistry.ListServiceInstances(s.poolSpec.ServiceRegistry, s.poolSpec.ServiceName)
if err != nil {
logger.Warnf("use service %s failed: %v", s.poolSpec.ServiceName, err)
logger.Errorf("get service %s/%s failed: %v",
s.poolSpec.ServiceRegistry, s.poolSpec.ServiceName, err)
s.useStaticServers()
return
}
s.useService(serviceInstanceSpecs)
}

func (s *servers) useStaticServers() {
s.mutex.Lock()
defer s.mutex.Unlock()
s.static = newStaticServers(s.poolSpec.Servers,
s.poolSpec.ServersTags,
s.poolSpec.LoadBalance)
}

func (s *servers) useService(serviceSpec *serviceregistry.ServiceSpec) error {
func (s *servers) useService(serviceInstanceSpecs map[string]*serviceregistry.ServiceInstanceSpec) {
var servers []*Server
for _, serviceInstance := range serviceSpec.Instances {
for _, instance := range serviceInstanceSpecs {
servers = append(servers, &Server{
URL: serviceInstance.URL(),
Tags: serviceInstance.Tags,
Weight: serviceInstance.Weight,
URL: instance.URL(),
Tags: instance.Tags,
Weight: instance.Weight,
})
}
if len(servers) == 0 {
return fmt.Errorf("empty service instance")
logger.Errorf("%s/%s: empty service instance",
s.poolSpec.ServiceRegistry, s.poolSpec.ServiceName)
s.useStaticServers()
return
}

s.mutex.Lock()
defer s.mutex.Unlock()
s.static = newStaticServers(servers, s.poolSpec.ServersTags, s.poolSpec.LoadBalance)
}

return nil
func (s *servers) useStaticServers() {
s.mutex.Lock()
defer s.mutex.Unlock()
s.static = newStaticServers(s.poolSpec.Servers, s.poolSpec.ServersTags, s.poolSpec.LoadBalance)
}

func (s *servers) snapshot() *staticServers {
Expand Down Expand Up @@ -206,6 +202,10 @@ func (s *servers) close() {
}

func newStaticServers(servers []*Server, tags []string, lb *LoadBalance) *staticServers {
if servers == nil {
servers = make([]*Server, 0)
}

ss := &staticServers{}
if lb == nil {
ss.lb.Policy = PolicyRoundRobin
Expand Down
Loading

0 comments on commit 299989b

Please sign in to comment.