diff --git a/go.sum b/go.sum index a10c455d6d..b5cfd87f2c 100644 --- a/go.sum +++ b/go.sum @@ -147,6 +147,7 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/alexflint/go-filemutex v0.0.0-20171022225611-72bdc8eae2ae/go.mod h1:CgnQgUtFrFz9mxFNtED3jI5tLDjKlOM+oUF/sTk6ps0= +github.com/aliyun/alibaba-cloud-sdk-go v1.61.18 h1:zOVTBdCKFd9JbCKz9/nt+FovbjPFmb7mUnp8nH9fQBA= github.com/aliyun/alibaba-cloud-sdk-go v1.61.18/go.mod h1:v8ESoHo4SyHmuB4b1tJqDHxfTGEciD+yhvOU/5s1Rfk= github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8= github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c= @@ -195,6 +196,7 @@ github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g= github.com/bshuster-repo/logrus-logstash-hook v0.4.1/go.mod h1:zsTqEiSzDgAa/8GZR7E1qaXrhYNDKBYy5/dWPTIflbk= github.com/buger/jsonparser v0.0.0-20180808090653-f4dd9f5a6b44/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s= +github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23 h1:D21IyuvjDCshj1/qq+pCNd3VZOAEI9jy6Bi131YlXgI= github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s= github.com/bugsnag/bugsnag-go v0.0.0-20141110184014-b1d153021fcd/go.mod h1:2oa8nejYd4cQ/b0hMIopN0lCRxU0bueqREvZLWFrtK8= github.com/bugsnag/osext v0.0.0-20130617224835-0dd3f918b21b/go.mod h1:obH5gd0BsqsP2LwDJ9aOkm/6J86V6lyAXCoQWGw3K50= @@ -430,6 +432,7 @@ github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 h1:JWuenKqqX8nojt github.com/facebookgo/stack v0.0.0-20160209184415-751773369052/go.mod h1:UbMTZqLaRiH3MsBH8va0n7s1pQYcu3uTb8G4tygF4Zg= github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 h1:7HZCaLC5+BZpmbhCOZJ293Lz68O7PYrF2EzeiFMwCLk= github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4/go.mod h1:5tD+neXqOorC30/tWg0LCSkrqj/AR6gu8yY8/fpw1q0= +github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 h1:Ghm4eQYC0nEPnSJdVkTrXpu9KtoVCSo1hg7mtI7G9KU= github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239/go.mod h1:Gdwt2ce0yfBxPvZrHkprdPPTTS3N5rwmLE8T22KBXlw= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.9.0 h1:8xPHl4/q1VyqGIPif1F+1V3Y3lSmrq01EabUW3CoW5s= @@ -460,6 +463,7 @@ github.com/globalsign/mgo v0.0.0-20180905125535-1ca0a4f7cbcb/go.mod h1:xkRDCp4j0 github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8/go.mod h1:xkRDCp4j0OGD1HRkm4kmhM+pmpv3AKq5SU7GMg4oO/Q= github.com/go-chi/chi/v5 v5.0.3 h1:khYQBdPivkYG1s1TAzDQG1f6eX4kD2TItYVZexL5rS4= github.com/go-chi/chi/v5 v5.0.3/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8= +github.com/go-errors/errors v1.0.1 h1:LUHzmkK3GUKUrL/1gfBUxAHzcev3apQlezX/+O7ma6w= github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= @@ -783,6 +787,7 @@ github.com/jcmturner/gokrb5/v8 v8.4.2 h1:6ZIM6b/JJN0X8UM43ZOM6Z4SJzla+a/u7scXFJz github.com/jcmturner/gokrb5/v8 v8.4.2/go.mod h1:sb+Xq/fTY5yktf/VxLsE3wlfPqQjp0aWNYyvBVK62bc= github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= +github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869 h1:IPJ3dvxmJ4uczJe5YQdrYB16oTJlGSC/OyZDqUk9xX4= github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869/go.mod h1:cJ6Cj7dQo+O6GJNiMx+Pa94qKj+TG8ONdKHgMNIyyag= github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= @@ -844,8 +849,11 @@ github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/lestrrat/go-envload v0.0.0-20180220120943-6ed08b54a570 h1:0iQektZGS248WXmGIYOwRXSQhD4qn3icjMpuxwO7qlo= github.com/lestrrat/go-envload v0.0.0-20180220120943-6ed08b54a570/go.mod h1:BLt8L9ld7wVsvEWQbuLrUZnCMnUmLZ+CGDzKtclrTlE= +github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f h1:sgUSP4zdTUZYZgAGGtN5Lxk92rK+JUFOwf+FT99EEI4= github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f/go.mod h1:UGmTpUd3rjbtfIpwAPrcfmGf/Z1HS95TATB+m57TPB8= +github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042 h1:Bvq8AziQ5jFF4BHGAEDSqwPW1NJS3XshxbRCxtjFAZc= github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042/go.mod h1:TPpsiPUEh0zFL1Snz4crhMlBe60PYxRHr5oFF3rRYg0= github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de h1:9TO3cAIGXtEhnIaL+V+BEER86oLrvS+kWobKpbJuye0= github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de/go.mod h1:zAbeS9B/r2mtpb6U+EI2rYA5OAXxsYw6wTamcNW+zcE= @@ -1253,6 +1261,7 @@ github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cb github.com/tchap/go-patricia v2.2.6+incompatible/go.mod h1:bmLyhP68RS6kStMGxByiQ23RP/odRBOTVjwp2cDyi6I= github.com/tcnksm/go-httpstat v0.2.1-0.20191008022543-e866bb274419 h1:elOIj31UL4RZWgLfLV4pWZA0j5QqGO95/Dll2WIwOZU= github.com/tcnksm/go-httpstat v0.2.1-0.20191008022543-e866bb274419/go.mod h1:s3JVJFtQxtBEBC9dwcdTTXS9xFnM3SXAZwPG41aurT8= +github.com/tebeka/strftime v0.1.3 h1:5HQXOqWKYRFfNyBMNVc9z5+QzuBtIXy03psIhtdJYto= github.com/tebeka/strftime v0.1.3/go.mod h1:7wJm3dZlpr4l/oVK0t1HYIc4rMzQ2XJlOMIUJUJH6XQ= github.com/tidwall/gjson v1.8.0 h1:Qt+orfosKn0rbNTZqHYDqBrmm3UDA4KRkv70fDzG+PQ= github.com/tidwall/gjson v1.8.0/go.mod h1:5/xDoumyyDNerp2U36lyolv46b3uF/9Bu6OfyQ9GImk= @@ -1267,6 +1276,7 @@ github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 h1:uruHq4 github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tomasen/realip v0.0.0-20180522021738-f0c99a92ddce h1:fb190+cK2Xz/dvi9Hv8eCYJYvIGUTN2/KLq1pT6CjEc= github.com/tomasen/realip v0.0.0-20180522021738-f0c99a92ddce/go.mod h1:o8v6yHRoik09Xen7gje4m9ERNah1d1PPsVq1VEx9vE4= +github.com/toolkits/concurrent v0.0.0-20150624120057-a4371d70e3e3 h1:kF/7m/ZU+0D4Jj5eZ41Zm3IH/J8OElK1Qtd7tVKAwLk= github.com/toolkits/concurrent v0.0.0-20150624120057-a4371d70e3e3/go.mod h1:QDlpd3qS71vYtakd2hmdpqhJ9nwv6mD6A30bQ1BPBFE= github.com/tsenart/go-tsz v0.0.0-20180814232043-cdeb9e1e981e/go.mod h1:SWZznP1z5Ki7hDT2ioqiFKEse8K9tU2OUvaRI0NeGQo= github.com/tsenart/go-tsz v0.0.0-20180814235614-0bd30b3df1c3/go.mod h1:SWZznP1z5Ki7hDT2ioqiFKEse8K9tU2OUvaRI0NeGQo= diff --git a/pkg/filter/proxy/server.go b/pkg/filter/proxy/server.go index a0a5e6e9fb..df8179420c 100644 --- a/pkg/filter/proxy/server.go +++ b/pkg/filter/proxy/server.go @@ -50,6 +50,7 @@ const ( type ( servers struct { poolSpec *PoolSpec + super *supervisor.Supervisor mutex sync.Mutex serviceRegistry *serviceregistry.ServiceRegistry @@ -95,6 +96,7 @@ func (lb LoadBalance) Validate() error { func newServers(super *supervisor.Supervisor, poolSpec *PoolSpec) *servers { s := &servers{ poolSpec: poolSpec, + super: super, done: make(chan struct{}), } @@ -104,74 +106,68 @@ func newServers(super *supervisor.Supervisor, poolSpec *PoolSpec) *servers { return s } - s.serviceRegistry = super.MustGetSystemController(serviceregistry.Kind). - Instance().(*serviceregistry.ServiceRegistry) - s.serviceWatcher = s.serviceRegistry.NewServiceWatcher(poolSpec.ServiceRegistry, poolSpec.ServiceName) - go s.watchService() return s } func (s *servers) watchService() { - serviceSpec, err := s.serviceRegistry.GetService(s.poolSpec.ServiceRegistry, s.poolSpec.ServiceName) - if err != nil { - logger.Warnf("ger service %s/%s failed: %v", - s.poolSpec.ServiceRegistry, s.poolSpec.ServiceName, err) - } + s.tryUseService() - s.useService(serviceSpec) + s.serviceRegistry = s.super.MustGetSystemController(serviceregistry.Kind). + Instance().(*serviceregistry.ServiceRegistry) + s.serviceWatcher = s.serviceRegistry.NewServiceWatcher(s.poolSpec.ServiceRegistry, s.poolSpec.ServiceName) for { select { case <-s.done: return case event := <-s.serviceWatcher.Watch(): - s.handleServiceEvent(event) + s.handleEvent(event) } } } -func (s *servers) handleServiceEvent(event *serviceregistry.ServiceEvent) { - if event.Delete != nil { - logger.Warnf("service %s delete, use static servers", s.poolSpec.ServiceName) - s.useStaticServers() - return - } +func (s *servers) handleEvent(event *serviceregistry.ServiceEvent) { + s.useService(event.Instances) +} - err := s.useService(event.Apply) +func (s *servers) tryUseService() { + serviceInstanceSpecs, err := s.serviceRegistry.ListServiceInstances(s.poolSpec.ServiceRegistry, s.poolSpec.ServiceName) if err != nil { - logger.Warnf("use service %s failed: %v", s.poolSpec.ServiceName, err) + logger.Errorf("get service %s/%s failed: %v", + s.poolSpec.ServiceRegistry, s.poolSpec.ServiceName, err) + s.useStaticServers() return } + s.useService(serviceInstanceSpecs) } -func (s *servers) useStaticServers() { - s.mutex.Lock() - defer s.mutex.Unlock() - s.static = newStaticServers(s.poolSpec.Servers, - s.poolSpec.ServersTags, - s.poolSpec.LoadBalance) -} - -func (s *servers) useService(serviceSpec *serviceregistry.ServiceSpec) error { +func (s *servers) useService(serviceInstanceSpecs map[string]*serviceregistry.ServiceInstanceSpec) { var servers []*Server - for _, serviceInstance := range serviceSpec.Instances { + for _, instance := range serviceInstanceSpecs { servers = append(servers, &Server{ - URL: serviceInstance.URL(), - Tags: serviceInstance.Tags, - Weight: serviceInstance.Weight, + URL: instance.URL(), + Tags: instance.Tags, + Weight: instance.Weight, }) } if len(servers) == 0 { - return fmt.Errorf("empty service instance") + logger.Errorf("%s/%s: empty service instance", + s.poolSpec.ServiceRegistry, s.poolSpec.ServiceName) + s.useStaticServers() + return } s.mutex.Lock() defer s.mutex.Unlock() s.static = newStaticServers(servers, s.poolSpec.ServersTags, s.poolSpec.LoadBalance) +} - return nil +func (s *servers) useStaticServers() { + s.mutex.Lock() + defer s.mutex.Unlock() + s.static = newStaticServers(s.poolSpec.Servers, s.poolSpec.ServersTags, s.poolSpec.LoadBalance) } func (s *servers) snapshot() *staticServers { @@ -206,6 +202,10 @@ func (s *servers) close() { } func newStaticServers(servers []*Server, tags []string, lb *LoadBalance) *staticServers { + if servers == nil { + servers = make([]*Server, 0) + } + ss := &staticServers{} if lb == nil { ss.lb.Policy = PolicyRoundRobin diff --git a/pkg/object/consulserviceregistry/consulserviceregistry.go b/pkg/object/consulserviceregistry/consulserviceregistry.go index f0a966052d..0ed33a3259 100644 --- a/pkg/object/consulserviceregistry/consulserviceregistry.go +++ b/pkg/object/consulserviceregistry/consulserviceregistry.go @@ -18,6 +18,7 @@ package consulserviceregistry import ( + "fmt" "sync" "time" @@ -34,6 +35,11 @@ const ( // Kind is the kind of ConsulServiceRegistry. Kind = "ConsulServiceRegistry" + + // MetaKeyRegistryName is the key of service metadata. + // NOTE: Namespace is only available for Consul Enterprise, + // instead we use this field to work around. + MetaKeyRegistryName = "RegistryName" ) func init() { @@ -48,14 +54,14 @@ type ( serviceRegistry *serviceregistry.ServiceRegistry firstDone bool - serviceSpecs map[string]*serviceregistry.ServiceSpec + instances map[string]*serviceregistry.ServiceInstanceSpec notify chan *serviceregistry.RegistryEvent clientMutex sync.RWMutex client *api.Client - statusMutex sync.Mutex - serviceInstancesNum map[string]int + statusMutex sync.Mutex + instancesNum map[string]int done chan struct{} } @@ -74,7 +80,7 @@ type ( // Status is the status of ConsulServiceRegistry. Status struct { Health string `yaml:"health"` - ServiceInstancesNum map[string]int `yaml:"serviceInstancesNum"` + ServiceInstancesNum map[string]int `yaml:"instancesNum"` } ) @@ -114,8 +120,9 @@ func (c *ConsulServiceRegistry) reload() { Instance().(*serviceregistry.ServiceRegistry) c.serviceRegistry.RegisterRegistry(c) c.notify = make(chan *serviceregistry.RegistryEvent, 10) + c.firstDone = false - c.serviceInstancesNum = map[string]int{} + c.instancesNum = map[string]int{} c.done = make(chan struct{}) _, err := c.getClient() @@ -158,6 +165,7 @@ func (c *ConsulServiceRegistry) buildClient() (*api.Client, error) { if config.Token != "" { config.Token = c.spec.Token } + if config.Namespace != "" { config.Namespace = c.spec.Namespace } @@ -204,83 +212,33 @@ func (c *ConsulServiceRegistry) run() { } func (c *ConsulServiceRegistry) update() { - client, err := c.getClient() + instances, err := c.ListAllServiceInstances() if err != nil { - logger.Errorf("%s get consul client failed: %v", - c.superSpec.Name(), err) + logger.Errorf("list all service instances failed: %v", err) return } - q := &api.QueryOptions{ - Namespace: c.spec.Namespace, - Datacenter: c.spec.Datacenter, - } - catalog := client.Catalog() - - resp, _, err := catalog.Services(q) - if err != nil { - logger.Errorf("%s pull catalog services failed: %v", - c.superSpec.Name(), err) - return - } - - serviceSpecs := make(map[string]*serviceregistry.ServiceSpec) - serviceInstancesNum := map[string]int{} - for serviceName := range resp { - services, _, err := catalog.ServiceMultipleTags(serviceName, - c.spec.ServiceTags, q) - if err != nil { - logger.Errorf("%s pull catalog service %s failed: %v", - c.superSpec.Name(), serviceName, err) - continue - } - for _, service := range services { - serviceInstanceSpec := &serviceregistry.ServiceInstanceSpec{ - ServiceName: serviceName, - } - serviceInstanceSpec.HostIP = service.ServiceAddress - if serviceInstanceSpec.HostIP == "" { - serviceInstanceSpec.HostIP = service.Address - } - serviceInstanceSpec.Port = uint16(service.ServicePort) - serviceInstanceSpec.Tags = service.ServiceTags - - if err := serviceInstanceSpec.Validate(); err != nil { - logger.Errorf("invalid server: %v", err) - continue - } - - serviceSpec, exists := serviceSpecs[serviceName] - if !exists { - serviceSpecs[serviceName] = &serviceregistry.ServiceSpec{ - RegistryName: c.Name(), - ServiceName: serviceName, - Instances: []*serviceregistry.ServiceInstanceSpec{serviceInstanceSpec}, - } - } else { - serviceSpec.Instances = append(serviceSpec.Instances, serviceInstanceSpec) - } - - serviceInstancesNum[serviceName]++ - } + instancesNum := make(map[string]int) + for _, instance := range instances { + instancesNum[instance.ServiceName]++ } var event *serviceregistry.RegistryEvent if !c.firstDone { c.firstDone = true event = &serviceregistry.RegistryEvent{ - RegistryName: c.Name(), - Replace: serviceSpecs, + SourceRegistryName: c.Name(), + Replace: instances, } } else { - event = serviceregistry.NewRegistryEventFromDiff(c.Name(), c.serviceSpecs, serviceSpecs) + event = serviceregistry.NewRegistryEventFromDiff(c.Name(), c.instances, instances) } c.notify <- event - c.serviceSpecs = serviceSpecs + c.instances = instances c.statusMutex.Lock() - c.serviceInstancesNum = serviceInstancesNum + c.instancesNum = instancesNum c.statusMutex.Unlock() } @@ -296,7 +254,7 @@ func (c *ConsulServiceRegistry) Status() *supervisor.Status { } c.statusMutex.Lock() - serversNum := c.serviceInstancesNum + serversNum := c.instancesNum c.statusMutex.Unlock() s.ServiceInstancesNum = serversNum @@ -324,20 +282,160 @@ func (c *ConsulServiceRegistry) Notify() <-chan *serviceregistry.RegistryEvent { return c.notify } -// ApplyServices applies service specs to consul registry. -func (c *ConsulServiceRegistry) ApplyServices(serviceSpec []*serviceregistry.ServiceSpec) error { - // TODO +// ApplyServiceInstances applies service instances to the registry. +func (c *ConsulServiceRegistry) ApplyServiceInstances(instances map[string]*serviceregistry.ServiceInstanceSpec) error { + client, err := c.getClient() + if err != nil { + return fmt.Errorf("%s get consul client failed: %v", + c.superSpec.Name(), err) + } + + for _, instance := range instances { + err := instance.Validate() + if err != nil { + return fmt.Errorf("%+v is invalid: %v", instance, err) + } + } + + for _, instance := range instances { + registration := c.serviceInstanceToRegistration(instance) + err = client.Agent().ServiceRegister(registration) + if err != nil { + return err + } + } + return nil } -// GetService applies service specs to consul registry. -func (c *ConsulServiceRegistry) GetService(serviceName string) (*serviceregistry.ServiceSpec, error) { - // TODO - return nil, nil +// DeleteServiceInstances applies service instances to the registry. +func (c *ConsulServiceRegistry) DeleteServiceInstances(instances map[string]*serviceregistry.ServiceInstanceSpec) error { + client, err := c.getClient() + if err != nil { + return fmt.Errorf("%s get consul client failed: %v", + c.superSpec.Name(), err) + } + + for _, instance := range instances { + err := client.Agent().ServiceDeregister(instance.InstanceID) + if err != nil { + return err + } + } + + return nil } -// ListServices lists service specs from consul registry. -func (c *ConsulServiceRegistry) ListServices() ([]*serviceregistry.ServiceSpec, error) { - // TODO - return nil, nil +// GetServiceInstance get service instance from the registry. +func (c *ConsulServiceRegistry) GetServiceInstance(serviceName, instanceID string) (*serviceregistry.ServiceInstanceSpec, error) { + instances, err := c.ListServiceInstances(serviceName) + if err != nil { + return nil, err + } + + for _, instance := range instances { + if instance.InstanceID == instanceID { + return instance, nil + } + } + + return nil, fmt.Errorf("%s/%s not found", serviceName, instanceID) +} + +// ListServiceInstances list service instances of one service from the registry. +func (c *ConsulServiceRegistry) ListServiceInstances(serviceName string) (map[string]*serviceregistry.ServiceInstanceSpec, error) { + client, err := c.getClient() + if err != nil { + return nil, fmt.Errorf("%s get consul client failed: %v", + c.superSpec.Name(), err) + } + + catalogServices, _, err := client.Catalog().Service(serviceName, "", &api.QueryOptions{}) + + if err != nil { + return nil, err + } + + instances := make(map[string]*serviceregistry.ServiceInstanceSpec) + for _, catalog := range catalogServices { + serviceInstance := c.catalogServiceToServiceInstance(catalog) + err := serviceInstance.Validate() + if err != nil { + return nil, fmt.Errorf("%+v is invalid: %v", serviceInstance, err) + } + + instances[serviceInstance.Key()] = serviceInstance + } + + return instances, nil +} + +// ListAllServiceInstances list all service instances from the registry. +func (c *ConsulServiceRegistry) ListAllServiceInstances() (map[string]*serviceregistry.ServiceInstanceSpec, error) { + client, err := c.getClient() + if err != nil { + return nil, fmt.Errorf("%s get consul client failed: %v", + c.superSpec.Name(), err) + } + + resp, _, err := client.Catalog().Services(&api.QueryOptions{}) + if err != nil { + return nil, fmt.Errorf("%s pull catalog services failed: %v", + c.superSpec.Name(), err) + } + + instances := make(map[string]*serviceregistry.ServiceInstanceSpec) + for serviceName := range resp { + services, _, err := client.Catalog().Service(serviceName, "", &api.QueryOptions{}) + if err != nil { + return nil, fmt.Errorf("%s pull catalog service %s failed: %v", + c.superSpec.Name(), serviceName, err) + } + + for _, service := range services { + serviceInstance := c.catalogServiceToServiceInstance(service) + if err := serviceInstance.Validate(); err != nil { + return nil, fmt.Errorf("%+v is invalid: %v", serviceInstance, err) + } + + instances[serviceInstance.Key()] = serviceInstance + } + } + + return instances, nil +} + +func (c *ConsulServiceRegistry) serviceInstanceToRegistration(serviceInstance *serviceregistry.ServiceInstanceSpec) *api.AgentServiceRegistration { + return &api.AgentServiceRegistration{ + Kind: api.ServiceKindTypical, + ID: serviceInstance.InstanceID, + Name: serviceInstance.ServiceName, + Tags: serviceInstance.Tags, + Port: int(serviceInstance.Port), + Meta: map[string]string{ + MetaKeyRegistryName: serviceInstance.RegistryName, + }, + } +} + +func (c *ConsulServiceRegistry) catalogServiceToServiceInstance(catalogService *api.CatalogService) *serviceregistry.ServiceInstanceSpec { + registryName := c.Name() + if catalogService.ServiceMeta != nil && + catalogService.ServiceMeta[MetaKeyRegistryName] != "" { + registryName = catalogService.ServiceMeta[MetaKeyRegistryName] + } + + hostIP := catalogService.ServiceAddress + if hostIP == "" { + hostIP = catalogService.Address + } + + return &serviceregistry.ServiceInstanceSpec{ + RegistryName: registryName, + ServiceName: catalogService.ServiceName, + InstanceID: catalogService.ServiceID, + Port: uint16(catalogService.ServicePort), + Tags: catalogService.ServiceTags, + HostIP: hostIP, + } } diff --git a/pkg/object/etcdserviceregistry/etcdserviceregistry.go b/pkg/object/etcdserviceregistry/etcdserviceregistry.go index cca03a522d..c58e8e5dcb 100644 --- a/pkg/object/etcdserviceregistry/etcdserviceregistry.go +++ b/pkg/object/etcdserviceregistry/etcdserviceregistry.go @@ -19,15 +19,17 @@ package eserviceregistry import ( "context" + "fmt" + "path/filepath" "sync" "time" - "github.com/ghodss/yaml" - clientv3 "go.etcd.io/etcd/client/v3" - "github.com/megaease/easegress/pkg/logger" "github.com/megaease/easegress/pkg/object/serviceregistry" "github.com/megaease/easegress/pkg/supervisor" + + clientv3 "go.etcd.io/etcd/client/v3" + "gopkg.in/yaml.v2" ) const ( @@ -50,14 +52,14 @@ type ( serviceRegistry *serviceregistry.ServiceRegistry firstDone bool - serviceSpecs map[string]*serviceregistry.ServiceSpec + instances map[string]*serviceregistry.ServiceInstanceSpec notify chan *serviceregistry.RegistryEvent clientMutex sync.RWMutex client *clientv3.Client - statusMutex sync.Mutex - serviceInstancesNum map[string]int + statusMutex sync.Mutex + instancesNum map[string]int done chan struct{} } @@ -72,7 +74,7 @@ type ( // Status is the status of EtcdServiceRegistry. Status struct { Health string `yaml:"health"` - ServiceInstancesNum map[string]int `yaml:"serviceInstancesNum"` + ServiceInstancesNum map[string]int `yaml:"instancesNum"` } ) @@ -110,9 +112,10 @@ func (e *EtcdServiceRegistry) reload() { e.serviceRegistry = e.superSpec.Super().MustGetSystemController(serviceregistry.Kind). Instance().(*serviceregistry.ServiceRegistry) e.serviceRegistry.RegisterRegistry(e) + e.firstDone = false e.notify = make(chan *serviceregistry.RegistryEvent, 10) - e.serviceInstancesNum = map[string]int{} + e.instancesNum = map[string]int{} e.done = make(chan struct{}) _, err := e.getClient() @@ -197,66 +200,33 @@ func (e *EtcdServiceRegistry) run() { } func (e *EtcdServiceRegistry) update() { - client, err := e.getClient() - if err != nil { - logger.Errorf("%s get etcd client failed: %v", - e.superSpec.Name(), err) - return - } - resp, err := client.Get(context.Background(), e.spec.Prefix, clientv3.WithPrefix()) + instances, err := e.ListAllServiceInstances() if err != nil { - logger.Errorf("%s pull services failed: %v", - e.superSpec.Name(), err) + logger.Errorf("list all service instances failed: %v", err) return } - serviceSpecs := make(map[string]*serviceregistry.ServiceSpec) - serviceInstancesNum := map[string]int{} - for _, kv := range resp.Kvs { - serviceInstanceSpec := &serviceregistry.ServiceInstanceSpec{} - err := yaml.Unmarshal(kv.Value, serviceInstanceSpec) - if err != nil { - logger.Errorf("%s: unmarshal %s to yaml failed: %v", - kv.Key, kv.Value, err) - continue - } - if err := serviceInstanceSpec.Validate(); err != nil { - logger.Errorf("%s is invalid: %v", kv.Value, err) - continue - } - - serviceName := serviceInstanceSpec.ServiceName - - serviceSpec, exists := serviceSpecs[serviceName] - if !exists { - serviceSpecs[serviceName] = &serviceregistry.ServiceSpec{ - RegistryName: e.Name(), - ServiceName: serviceName, - Instances: []*serviceregistry.ServiceInstanceSpec{serviceInstanceSpec}, - } - } else { - serviceSpec.Instances = append(serviceSpec.Instances, serviceInstanceSpec) - } - - serviceInstancesNum[serviceName]++ + instancesNum := make(map[string]int) + for _, instance := range instances { + instancesNum[instance.ServiceName]++ } var event *serviceregistry.RegistryEvent if !e.firstDone { e.firstDone = true event = &serviceregistry.RegistryEvent{ - RegistryName: e.Name(), - Replace: serviceSpecs, + SourceRegistryName: e.Name(), + Replace: instances, } } else { - event = serviceregistry.NewRegistryEventFromDiff(e.Name(), e.serviceSpecs, serviceSpecs) + event = serviceregistry.NewRegistryEventFromDiff(e.Name(), e.instances, instances) } e.notify <- event - e.serviceSpecs = serviceSpecs + e.instances = instances e.statusMutex.Lock() - e.serviceInstancesNum = serviceInstancesNum + e.instancesNum = instancesNum e.statusMutex.Unlock() } @@ -272,10 +242,10 @@ func (e *EtcdServiceRegistry) Status() *supervisor.Status { } e.statusMutex.Lock() - serviceInstancesNum := e.serviceInstancesNum + instancesNum := e.instancesNum e.statusMutex.Unlock() - s.ServiceInstancesNum = serviceInstancesNum + s.ServiceInstancesNum = instancesNum return &supervisor.Status{ ObjectStatus: s, @@ -300,20 +270,163 @@ func (e *EtcdServiceRegistry) Notify() <-chan *serviceregistry.RegistryEvent { return e.notify } -// ApplyServices applies service specs to etcd registry. -func (e *EtcdServiceRegistry) ApplyServices(serviceSpec []*serviceregistry.ServiceSpec) error { - // TODO +// ApplyServiceInstances applies service instances to the registry. +func (e *EtcdServiceRegistry) ApplyServiceInstances(instances map[string]*serviceregistry.ServiceInstanceSpec) error { + client, err := e.getClient() + if err != nil { + return fmt.Errorf("%s get etcd client failed: %v", + e.superSpec.Name(), err) + } + + ops := []clientv3.Op{} + for _, instance := range instances { + err := instance.Validate() + if err != nil { + return fmt.Errorf("%+v is invalid: %v", instance, err) + } + + buff, err := yaml.Marshal(instance) + if err != nil { + return fmt.Errorf("marshal %+v to yaml failed: %v", instance, err) + } + + key := e.serviceInstanceEtcdKey(instance) + ops = append(ops, clientv3.OpPut(key, string(buff))) + } + + _, err = client.Txn(context.Background()).Then(ops...).Commit() + if err != nil { + return err + } + return nil } -// GetService applies service specs to etcd registry. -func (e *EtcdServiceRegistry) GetService(serviceName string) (*serviceregistry.ServiceSpec, error) { - // TODO - return nil, nil +// DeleteServiceInstances applies service instances to the registry. +func (e *EtcdServiceRegistry) DeleteServiceInstances(instances map[string]*serviceregistry.ServiceInstanceSpec) error { + client, err := e.getClient() + if err != nil { + return fmt.Errorf("%s get etcd client failed: %v", + e.superSpec.Name(), err) + } + + ops := []clientv3.Op{} + for _, instance := range instances { + key := e.serviceInstanceEtcdKey(instance) + ops = append(ops, clientv3.OpDelete(key)) + } + + _, err = client.Txn(context.Background()).Then(ops...).Commit() + if err != nil { + return err + } + + return nil +} + +// GetServiceInstance get service instance from the registry. +func (e *EtcdServiceRegistry) GetServiceInstance(serviceName, instanceID string) (*serviceregistry.ServiceInstanceSpec, error) { + client, err := e.getClient() + if err != nil { + return nil, fmt.Errorf("%s get etcd client failed: %v", + e.superSpec.Name(), err) + } + + resp, err := client.Get(context.Background(), e.serviceInstanceEtcdKeyFromRaw(serviceName, instanceID)) + if err != nil { + return nil, err + } + + if len(resp.Kvs) == 0 { + return nil, fmt.Errorf("%s/%s not found", serviceName, instanceID) + } + + instance := &serviceregistry.ServiceInstanceSpec{} + err = yaml.Unmarshal(resp.Kvs[0].Value, instance) + if err != nil { + return nil, fmt.Errorf("unmarshal %s to yaml failed: %v", resp.Kvs[0].Value, err) + } + + err = instance.Validate() + if err != nil { + return nil, fmt.Errorf("%+v is invalid: %v", instance, err) + } + + return instance, nil +} + +// ListServiceInstances list service instances of one service from the registry. +func (e *EtcdServiceRegistry) ListServiceInstances(serviceName string) (map[string]*serviceregistry.ServiceInstanceSpec, error) { + client, err := e.getClient() + if err != nil { + return nil, fmt.Errorf("%s get etcd client failed: %v", + e.superSpec.Name(), err) + } + + resp, err := client.Get(context.Background(), e.serviceEtcdPrefix(serviceName), clientv3.WithPrefix()) + if err != nil { + return nil, err + } + + instances := make(map[string]*serviceregistry.ServiceInstanceSpec) + for _, kv := range resp.Kvs { + instance := &serviceregistry.ServiceInstanceSpec{} + err = yaml.Unmarshal(kv.Value, instance) + if err != nil { + return nil, fmt.Errorf("unmarshal %s to yaml failed: %v", kv.Value, err) + } + + err = instance.Validate() + if err != nil { + return nil, fmt.Errorf("%+v is invalid: %v", instance, err) + } + + instances[instance.Key()] = instance + } + + return instances, nil +} + +// ListAllServiceInstances list all service instances from the registry. +func (e *EtcdServiceRegistry) ListAllServiceInstances() (map[string]*serviceregistry.ServiceInstanceSpec, error) { + client, err := e.getClient() + if err != nil { + return nil, fmt.Errorf("%s get etcd client failed: %v", + e.superSpec.Name(), err) + } + + resp, err := client.Get(context.Background(), e.spec.Prefix, clientv3.WithPrefix()) + if err != nil { + return nil, err + } + + instances := make(map[string]*serviceregistry.ServiceInstanceSpec) + for _, kv := range resp.Kvs { + instance := &serviceregistry.ServiceInstanceSpec{} + err = yaml.Unmarshal(kv.Value, instance) + if err != nil { + return nil, fmt.Errorf("unmarshal %s to yaml failed: %v", kv.Value, err) + } + + err = instance.Validate() + if err != nil { + return nil, fmt.Errorf("%+v is invalid: %v", instance, err) + } + + instances[instance.Key()] = instance + } + + return instances, nil +} + +func (e *EtcdServiceRegistry) serviceEtcdPrefix(serviceName string) string { + return filepath.Join(e.spec.Prefix, serviceName) + "/" +} + +func (e *EtcdServiceRegistry) serviceInstanceEtcdKey(instance *serviceregistry.ServiceInstanceSpec) string { + return e.serviceInstanceEtcdKeyFromRaw(instance.ServiceName, instance.InstanceID) } -// ListServices lists service specs from etcd registry. -func (e *EtcdServiceRegistry) ListServices() ([]*serviceregistry.ServiceSpec, error) { - // TODO - return nil, nil +func (e *EtcdServiceRegistry) serviceInstanceEtcdKeyFromRaw(serviceName, instanceID string) string { + return filepath.Join(e.spec.Prefix, serviceName, instanceID) } diff --git a/pkg/object/eurekaserviceregistry/eurekaserviceregistry.go b/pkg/object/eurekaserviceregistry/eurekaserviceregistry.go index 666bbf134e..114d13ce31 100644 --- a/pkg/object/eurekaserviceregistry/eurekaserviceregistry.go +++ b/pkg/object/eurekaserviceregistry/eurekaserviceregistry.go @@ -18,6 +18,7 @@ package eurekaserviceregistry import ( + "fmt" "sync" "time" @@ -34,6 +35,9 @@ const ( // Kind is the kind of EurekaServiceRegistry. Kind = "EurekaServiceRegistry" + + // MetaKeyRegistryName is the key of service metadata. + MetaKeyRegistryName = "RegistryName" ) func init() { @@ -48,14 +52,14 @@ type ( serviceRegistry *serviceregistry.ServiceRegistry firstDone bool - serviceSpecs map[string]*serviceregistry.ServiceSpec + instances map[string]*serviceregistry.ServiceInstanceSpec notify chan *serviceregistry.RegistryEvent clientMutex sync.RWMutex client *eurekaapi.Client - statusMutex sync.Mutex - serviceInstancesNum map[string]int + statusMutex sync.Mutex + instancesNum map[string]int done chan struct{} } @@ -69,7 +73,7 @@ type ( // Status is the status of EurekaServiceRegistry. Status struct { Health string `yaml:"health"` - ServiceInstancesNum map[string]int `yaml:"serviceInstancesNum"` + ServiceInstancesNum map[string]int `yaml:"instancesNum"` } ) @@ -108,8 +112,9 @@ func (e *EurekaServiceRegistry) reload() { Instance().(*serviceregistry.ServiceRegistry) e.serviceRegistry.RegisterRegistry(e) e.notify = make(chan *serviceregistry.RegistryEvent, 10) + e.firstDone = false - e.serviceInstancesNum = make(map[string]int) + e.instancesNum = make(map[string]int) e.done = make(chan struct{}) _, err := e.getClient() @@ -180,78 +185,33 @@ func (e *EurekaServiceRegistry) run() { } func (e *EurekaServiceRegistry) update() { - client, err := e.getClient() + instances, err := e.ListAllServiceInstances() if err != nil { - logger.Errorf("%s get eureka client failed: %v", - e.superSpec.Name(), err) + logger.Errorf("list all service instances failed: %v", err) return } - apps, err := client.GetApplications() - if err != nil { - logger.Errorf("%s get services failed: %v", - e.superSpec.Name(), err) - return - } - - serviceSpecs := make(map[string]*serviceregistry.ServiceSpec) - serviceInstancesNum := map[string]int{} - for _, app := range apps.Applications { - for _, instance := range app.Instances { - var instanceSpecs []*serviceregistry.ServiceInstanceSpec - - baseServiceInstanceSpec := serviceregistry.ServiceInstanceSpec{ - ServiceName: app.Name, - Hostname: instance.HostName, - HostIP: instance.IpAddr, - Port: uint16(instance.Port.Port), - } - - if instance.Port != nil && instance.Port.Enabled { - plain := baseServiceInstanceSpec - instanceSpecs = append(instanceSpecs, &plain) - serviceInstancesNum[app.Name]++ - } - - if instance.SecurePort != nil && instance.SecurePort.Enabled { - secure := baseServiceInstanceSpec - secure.Scheme = "https" - instanceSpecs = append(instanceSpecs, &secure) - - serviceInstancesNum[app.Name]++ - } - - serviceName := app.Name - - serviceSpec, exists := serviceSpecs[serviceName] - if !exists { - serviceSpecs[serviceName] = &serviceregistry.ServiceSpec{ - RegistryName: e.Name(), - ServiceName: serviceName, - } - serviceSpecs[serviceName].Instances = append(serviceSpecs[serviceName].Instances, instanceSpecs...) - } else { - serviceSpec.Instances = append(serviceSpec.Instances, instanceSpecs...) - } - } + instancesNum := make(map[string]int) + for _, instance := range instances { + instancesNum[instance.ServiceName]++ } var event *serviceregistry.RegistryEvent if !e.firstDone { e.firstDone = true event = &serviceregistry.RegistryEvent{ - RegistryName: e.Name(), - Replace: serviceSpecs, + SourceRegistryName: e.Name(), + Replace: instances, } } else { - event = serviceregistry.NewRegistryEventFromDiff(e.Name(), e.serviceSpecs, serviceSpecs) + event = serviceregistry.NewRegistryEventFromDiff(e.Name(), e.instances, instances) } e.notify <- event - e.serviceSpecs = serviceSpecs + e.instances = instances e.statusMutex.Lock() - e.serviceInstancesNum = serviceInstancesNum + e.instancesNum = instancesNum e.statusMutex.Unlock() } @@ -267,10 +227,10 @@ func (e *EurekaServiceRegistry) Status() *supervisor.Status { } e.statusMutex.Lock() - serviceInstancesNum := e.serviceInstancesNum + instancesNum := e.instancesNum e.statusMutex.Unlock() - s.ServiceInstancesNum = serviceInstancesNum + s.ServiceInstancesNum = instancesNum return &supervisor.Status{ ObjectStatus: s, @@ -295,20 +255,178 @@ func (e *EurekaServiceRegistry) Notify() <-chan *serviceregistry.RegistryEvent { return e.notify } -// ApplyServices applies service specs to eureka registry. -func (e *EurekaServiceRegistry) ApplyServices(serviceSpec []*serviceregistry.ServiceSpec) error { - // TODO +// ApplyServiceInstances applies service instances to the registry. +func (e *EurekaServiceRegistry) ApplyServiceInstances(instances map[string]*serviceregistry.ServiceInstanceSpec) error { + client, err := e.getClient() + if err != nil { + return fmt.Errorf("%s get consul client failed: %v", + e.superSpec.Name(), err) + } + + for _, instance := range instances { + err := instance.Validate() + if err != nil { + return fmt.Errorf("%+v is invalid: %v", instance, err) + } + } + + for _, instance := range instances { + info := e.serviceInstanceToInstanceInfo(instance) + err = client.RegisterInstance(info.App, info) + if err != nil { + return err + } + } + + return nil +} + +// DeleteServiceInstances applies service instances to the registry. +func (e *EurekaServiceRegistry) DeleteServiceInstances(instances map[string]*serviceregistry.ServiceInstanceSpec) error { + client, err := e.getClient() + if err != nil { + return fmt.Errorf("%s get consul client failed: %v", + e.superSpec.Name(), err) + } + + for _, instance := range instances { + err := client.UnregisterInstance(instance.ServiceName, instance.InstanceID) + if err != nil { + return err + } + } + return nil } -// GetService applies service specs to eureka registry. -func (e *EurekaServiceRegistry) GetService(serviceName string) (*serviceregistry.ServiceSpec, error) { - // TODO - return nil, nil +// GetServiceInstance get service instance from the registry. +func (e *EurekaServiceRegistry) GetServiceInstance(serviceName, instanceID string) (*serviceregistry.ServiceInstanceSpec, error) { + instances, err := e.ListServiceInstances(serviceName) + if err != nil { + return nil, err + } + + for _, instance := range instances { + if instance.InstanceID == instanceID { + return instance, nil + } + } + + return nil, fmt.Errorf("%s/%s not found", serviceName, instanceID) } -// ListServices lists service specs from eureka registry. -func (e *EurekaServiceRegistry) ListServices() ([]*serviceregistry.ServiceSpec, error) { - // TODO - return nil, nil +// ListServiceInstances list service instances of one service from the registry. +func (e *EurekaServiceRegistry) ListServiceInstances(serviceName string) (map[string]*serviceregistry.ServiceInstanceSpec, error) { + client, err := e.getClient() + if err != nil { + return nil, fmt.Errorf("%s get consul client failed: %v", + e.superSpec.Name(), err) + } + + app, err := client.GetApplication(serviceName) + if err != nil { + return nil, err + } + + instances := make(map[string]*serviceregistry.ServiceInstanceSpec) + for _, info := range app.Instances { + for _, serviceInstance := range e.instanceInfoToServiceInstances(&info) { + err := serviceInstance.Validate() + if err != nil { + return nil, fmt.Errorf("%+v is invalid: %v", serviceInstance, err) + } + instances[serviceInstance.Key()] = serviceInstance + } + } + + return instances, nil +} + +// ListAllServiceInstances list all service instances from the registry. +func (e *EurekaServiceRegistry) ListAllServiceInstances() (map[string]*serviceregistry.ServiceInstanceSpec, error) { + client, err := e.getClient() + if err != nil { + return nil, fmt.Errorf("%s get consul client failed: %v", + e.superSpec.Name(), err) + } + + apps, err := client.GetApplications() + if err != nil { + return nil, err + } + + instances := make(map[string]*serviceregistry.ServiceInstanceSpec) + for _, app := range apps.Applications { + for _, info := range app.Instances { + for _, serviceInstance := range e.instanceInfoToServiceInstances(&info) { + err := serviceInstance.Validate() + if err != nil { + return nil, fmt.Errorf("%+v is invalid: %v", serviceInstance, err) + } + instances[serviceInstance.Key()] = serviceInstance + } + } + } + + return instances, nil +} + +func (e *EurekaServiceRegistry) instanceInfoToServiceInstances(info *eurekaapi.InstanceInfo) []*serviceregistry.ServiceInstanceSpec { + var instances []*serviceregistry.ServiceInstanceSpec + + registryName := e.Name() + if info.Metadata != nil && info.Metadata.Map != nil && + info.Metadata.Map[MetaKeyRegistryName] != "" { + registryName = info.Metadata.Map[MetaKeyRegistryName] + } + + baseServiceInstanceSpec := serviceregistry.ServiceInstanceSpec{ + RegistryName: registryName, + ServiceName: info.App, + InstanceID: info.InstanceID, + Hostname: info.HostName, + HostIP: info.IpAddr, + } + + if info.Port != nil && info.Port.Enabled { + plain := baseServiceInstanceSpec + instances = append(instances, &plain) + } + + if info.SecurePort != nil && info.SecurePort.Enabled { + secure := baseServiceInstanceSpec + secure.Scheme = "https" + instances = append(instances, &secure) + } + + return instances +} + +func (e *EurekaServiceRegistry) serviceInstanceToInstanceInfo(serviceInstance *serviceregistry.ServiceInstanceSpec) *eurekaapi.InstanceInfo { + info := &eurekaapi.InstanceInfo{ + Metadata: &eurekaapi.MetaData{ + Map: map[string]string{ + MetaKeyRegistryName: serviceInstance.RegistryName, + }, + }, + App: serviceInstance.ServiceName, + InstanceID: serviceInstance.InstanceID, + HostName: serviceInstance.Hostname, + IpAddr: serviceInstance.HostIP, + } + + switch serviceInstance.Scheme { + case "", "http": + info.Port = &eurekaapi.Port{ + Port: int(serviceInstance.Port), + Enabled: true, + } + case "https": + info.SecurePort = &eurekaapi.Port{ + Port: int(serviceInstance.Port), + Enabled: true, + } + } + + return info } diff --git a/pkg/object/meshcontroller/registrycenter/registry.go b/pkg/object/meshcontroller/registrycenter/registry.go index 1c84416767..b74872a8aa 100644 --- a/pkg/object/meshcontroller/registrycenter/registry.go +++ b/pkg/object/meshcontroller/registrycenter/registry.go @@ -151,7 +151,7 @@ func (rcs *Server) register(ins *spec.ServiceInstanceSpec, ingressReady ReadyFun // level triggered, loop until it success tryTimes++ if !ingressReady() || !egressReady() { - logger.Infof("ingress ready: %d egress ready: %d", ingressReady(), egressReady()) + logger.Infof("ingress ready: %v egress ready: %v", ingressReady(), egressReady()) return } diff --git a/pkg/object/nacosserviceregistry/nacosserviceregistry.go b/pkg/object/nacosserviceregistry/nacosserviceregistry.go new file mode 100644 index 0000000000..d980c24516 --- /dev/null +++ b/pkg/object/nacosserviceregistry/nacosserviceregistry.go @@ -0,0 +1,507 @@ +/* + * Copyright (c) 2017, MegaEase + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nacosserviceregistry + +import ( + "fmt" + "path/filepath" + "sync" + "time" + + "github.com/megaease/easegress/pkg/logger" + "github.com/megaease/easegress/pkg/object/serviceregistry" + "github.com/megaease/easegress/pkg/supervisor" + "github.com/megaease/easegress/pkg/v" + + "github.com/nacos-group/nacos-sdk-go/clients" + "github.com/nacos-group/nacos-sdk-go/clients/naming_client" + "github.com/nacos-group/nacos-sdk-go/common/constant" + "github.com/nacos-group/nacos-sdk-go/model" + "github.com/nacos-group/nacos-sdk-go/vo" +) + +const ( + // Category is the category of NacosServiceRegistry. + Category = supervisor.CategoryBusinessController + + // Kind is the kind of NacosServiceRegistry. + Kind = "NacosServiceRegistry" + + // MetaKeyRegistryName is the key of service registry name. + MetaKeyRegistryName = "RegistryName" + + // MetaKeyInstanceID is the key of service instance ID. + MetaKeyInstanceID = "InstanceID" +) + +func init() { + supervisor.Register(&NacosServiceRegistry{}) +} + +type ( + // NacosServiceRegistry is Object NacosServiceRegistry. + NacosServiceRegistry struct { + superSpec *supervisor.Spec + spec *Spec + + serviceRegistry *serviceregistry.ServiceRegistry + firstDone bool + instances map[string]*serviceregistry.ServiceInstanceSpec + notify chan *serviceregistry.RegistryEvent + + clientMutex sync.RWMutex + client naming_client.INamingClient + + statusMutex sync.Mutex + instancesNum map[string]int + + done chan struct{} + } + + // Spec describes the NacosServiceRegistry. + Spec struct { + SyncInterval string `yaml:"syncInterval" jsonschema:"required,format=duration"` + Namespace string `yaml:"namespace" jsonschema:"omitempty"` + Username string `yaml:"username"` + Password string `yaml:"password"` + Servers []*ServerSpec `yaml:"servers"` + } + + // ServerSpec is the server config of Nacos. + ServerSpec struct { + Scheme string `yaml:"scheme" jsonschema:"omitempty,enum=http,enum=https"` + ContextPath string `yaml:"contextPath" jsonschema:"omitempty"` + IPAddr string `yaml:"ipAddr" jsonschema:"required"` + Port uint16 `yaml:"port" jsonschema:"required"` + } + + // Status is the status of NacosServiceRegistry. + Status struct { + Health string `yaml:"health"` + ServiceInstancesNum map[string]int `yaml:"instancesNum"` + } +) + +var _ v.Validator = Spec{} + +// Validate validates Spec itself. +func (spec Spec) Validate() error { + if len(spec.Servers) == 0 { + return fmt.Errorf("zero server config") + } + + return nil +} + +// Category returns the category of NacosServiceRegistry. +func (n *NacosServiceRegistry) Category() supervisor.ObjectCategory { + return Category +} + +// Kind returns the kind of NacosServiceRegistry. +func (n *NacosServiceRegistry) Kind() string { + return Kind +} + +// DefaultSpec returns the default spec of NacosServiceRegistry. +func (n *NacosServiceRegistry) DefaultSpec() interface{} { + return &Spec{ + SyncInterval: "10s", + } +} + +// Init initilizes NacosServiceRegistry. +func (n *NacosServiceRegistry) Init(superSpec *supervisor.Spec) { + n.superSpec, n.spec = superSpec, superSpec.ObjectSpec().(*Spec) + n.reload() +} + +// Inherit inherits previous generation of NacosServiceRegistry. +func (n *NacosServiceRegistry) Inherit(superSpec *supervisor.Spec, previousGeneration supervisor.Object) { + previousGeneration.Close() + n.Init(superSpec) +} + +func (n *NacosServiceRegistry) reload() { + n.serviceRegistry = n.superSpec.Super().MustGetSystemController(serviceregistry.Kind). + Instance().(*serviceregistry.ServiceRegistry) + n.serviceRegistry.RegisterRegistry(n) + n.notify = make(chan *serviceregistry.RegistryEvent, 10) + n.firstDone = false + + n.instancesNum = map[string]int{} + n.done = make(chan struct{}) + + _, err := n.getClient() + if err != nil { + logger.Errorf("%s get nacos client failed: %v", n.superSpec.Name(), err) + } + + go n.run() +} + +func (n *NacosServiceRegistry) getClient() (naming_client.INamingClient, error) { + n.clientMutex.RLock() + if n.client != nil { + client := n.client + n.clientMutex.RUnlock() + return client, nil + } + n.clientMutex.RUnlock() + + return n.buildClient() +} + +func (n *NacosServiceRegistry) buildClient() (naming_client.INamingClient, error) { + n.clientMutex.Lock() + defer n.clientMutex.Unlock() + + // DCL + if n.client != nil { + return n.client, nil + } + + logDir := filepath.Join(n.superSpec.Super().Options().HomeDir, n.superSpec.Name(), "log") + cacheDir := filepath.Join(n.superSpec.Super().Options().HomeDir, n.superSpec.Name(), "cache") + clientConfig := constant.ClientConfig{ + Username: n.spec.Username, + Password: n.spec.Password, + + TimeoutMs: 5000, + NotLoadCacheAtStart: true, + LogDir: logDir, + CacheDir: cacheDir, + RotateTime: "1h", + MaxAge: 3, + LogLevel: "info", + } + + serverConfigs := []constant.ServerConfig{} + for _, serverSpec := range n.spec.Servers { + serverConfigs = append(serverConfigs, constant.ServerConfig{ + IpAddr: serverSpec.IPAddr, + ContextPath: serverSpec.ContextPath, + Port: uint64(serverSpec.Port), + Scheme: serverSpec.Scheme, + }) + } + + client, err := clients.NewNamingClient( + vo.NacosClientParam{ + ClientConfig: &clientConfig, + ServerConfigs: serverConfigs, + }, + ) + if err != nil { + return nil, err + } + + n.client = client + + return client, nil +} + +func (n *NacosServiceRegistry) closeClient() { + n.clientMutex.Lock() + defer n.clientMutex.Unlock() + + if n.client == nil { + return + } + + n.client = nil +} + +func (n *NacosServiceRegistry) run() { + syncInterval, err := time.ParseDuration(n.spec.SyncInterval) + if err != nil { + logger.Errorf("BUG: parse duration %s failed: %v", + n.spec.SyncInterval, err) + return + } + + n.update() + + for { + select { + case <-n.done: + return + case <-time.After(syncInterval): + n.update() + } + } +} + +func (n *NacosServiceRegistry) update() { + instances, err := n.ListAllServiceInstances() + if err != nil { + logger.Errorf("list all service instances failed: %v", err) + return + } + + instancesNum := make(map[string]int) + for _, instance := range instances { + instancesNum[instance.ServiceName]++ + } + + var event *serviceregistry.RegistryEvent + if !n.firstDone { + n.firstDone = true + event = &serviceregistry.RegistryEvent{ + SourceRegistryName: n.Name(), + Replace: instances, + } + } else { + event = serviceregistry.NewRegistryEventFromDiff(n.Name(), n.instances, instances) + } + + n.notify <- event + n.instances = instances + + n.statusMutex.Lock() + n.instancesNum = instancesNum + n.statusMutex.Unlock() +} + +// Status returns status of NacosServiceRegister. +func (n *NacosServiceRegistry) Status() *supervisor.Status { + s := &Status{} + + _, err := n.getClient() + if err != nil { + s.Health = err.Error() + } else { + s.Health = "ready" + } + + n.statusMutex.Lock() + serversNum := n.instancesNum + n.statusMutex.Unlock() + + s.ServiceInstancesNum = serversNum + + return &supervisor.Status{ + ObjectStatus: s, + } +} + +// Close closes NacosServiceRegistry. +func (n *NacosServiceRegistry) Close() { + n.serviceRegistry.DeregisterRegistry(n.Name()) + + n.closeClient() + close(n.done) +} + +// Name returns name. +func (n *NacosServiceRegistry) Name() string { + return n.superSpec.Name() +} + +// Notify returns notify channel. +func (n *NacosServiceRegistry) Notify() <-chan *serviceregistry.RegistryEvent { + return n.notify +} + +// ApplyServiceInstances applies service instances to the registry. +func (n *NacosServiceRegistry) ApplyServiceInstances(instances map[string]*serviceregistry.ServiceInstanceSpec) error { + client, err := n.getClient() + if err != nil { + return fmt.Errorf("%s get nacos client failed: %v", + n.superSpec.Name(), err) + } + + for _, instance := range instances { + err := instance.Validate() + if err != nil { + return fmt.Errorf("%+v is invalid: %v", instance, err) + } + } + + for _, instance := range instances { + registerInstance := n.serviceInstanceToRegisterInstance(instance) + _, err := client.RegisterInstance(*registerInstance) + if err != nil { + return err + } + } + + return nil +} + +// DeleteServiceInstances applies service instances to the registry. +func (n *NacosServiceRegistry) DeleteServiceInstances(instances map[string]*serviceregistry.ServiceInstanceSpec) error { + client, err := n.getClient() + if err != nil { + return fmt.Errorf("%s get nacos client failed: %v", + n.superSpec.Name(), err) + } + + for _, instance := range instances { + deregisterInstance := n.serviceInstanceToDeregisterInstance(instance) + _, err := client.DeregisterInstance(*deregisterInstance) + if err != nil { + return err + } + } + + return nil +} + +// GetServiceInstance get service instance from the registry. +func (n *NacosServiceRegistry) GetServiceInstance(serviceName, instanceID string) (*serviceregistry.ServiceInstanceSpec, error) { + instances, err := n.ListServiceInstances(serviceName) + if err != nil { + return nil, err + } + + for _, instance := range instances { + if instance.InstanceID == instanceID { + return instance, nil + } + } + + return nil, fmt.Errorf("%s/%s not found", serviceName, instanceID) +} + +// ListServiceInstances list service instances of one service from the registry. +func (n *NacosServiceRegistry) ListServiceInstances(serviceName string) (map[string]*serviceregistry.ServiceInstanceSpec, error) { + client, err := n.getClient() + if err != nil { + return nil, fmt.Errorf("%s get nacos client failed: %v", + n.superSpec.Name(), err) + } + + service, err := client.GetService(vo.GetServiceParam{ + ServiceName: serviceName, + }) + if err != nil { + return nil, err + } + + instances := make(map[string]*serviceregistry.ServiceInstanceSpec) + for _, nacosInstance := range service.Hosts { + serviceInstance := n.nacosInstanceToServiceInstance(&nacosInstance) + err := serviceInstance.Validate() + if err != nil { + return nil, fmt.Errorf("%+v is invalid: %v", serviceInstance, err) + } + + instances[serviceInstance.Key()] = serviceInstance + } + + return instances, nil +} + +// ListAllServiceInstances list all service instances from the registry. +func (n *NacosServiceRegistry) ListAllServiceInstances() (map[string]*serviceregistry.ServiceInstanceSpec, error) { + client, err := n.getClient() + if err != nil { + return nil, fmt.Errorf("%s get nacos client failed: %v", + n.superSpec.Name(), err) + } + + serviceNames := []string{} + var pageNo uint32 = 1 + var pageSize uint32 = 1000 + for { + services, err := client.GetAllServicesInfo(vo.GetAllServiceInfoParam{ + PageNo: pageNo, + PageSize: pageSize, + }) + if err != nil { + return nil, fmt.Errorf("%s pull services failed: %v", + n.superSpec.Name(), err) + } + + if len(services.Doms) == 0 { + break + } + + serviceNames = append(serviceNames, services.Doms...) + } + + instances := make(map[string]*serviceregistry.ServiceInstanceSpec) + for _, serviceName := range serviceNames { + service, err := client.GetService(vo.GetServiceParam{ + ServiceName: serviceName, + }) + if err != nil { + return nil, err + } + + for _, nacosInstance := range service.Hosts { + serviceInstance := n.nacosInstanceToServiceInstance(&nacosInstance) + err := serviceInstance.Validate() + if err != nil { + return nil, fmt.Errorf("%+v is invalid: %v", serviceInstance, err) + } + + instances[serviceInstance.Key()] = serviceInstance + } + } + + return instances, nil +} + +func (n *NacosServiceRegistry) serviceInstanceToRegisterInstance(instance *serviceregistry.ServiceInstanceSpec) *vo.RegisterInstanceParam { + return &vo.RegisterInstanceParam{ + Metadata: map[string]string{ + MetaKeyRegistryName: instance.RegistryName, + MetaKeyInstanceID: instance.InstanceID, + }, + ServiceName: instance.ServiceName, + + Ip: instance.HostIP, + Port: uint64(instance.Port), + Weight: float64(instance.Weight), + Enable: true, + Healthy: true, + } +} + +func (n *NacosServiceRegistry) serviceInstanceToDeregisterInstance(instance *serviceregistry.ServiceInstanceSpec) *vo.DeregisterInstanceParam { + return &vo.DeregisterInstanceParam{ + ServiceName: instance.ServiceName, + Ip: instance.HostIP, + Port: uint64(instance.Port), + } +} + +func (n *NacosServiceRegistry) nacosInstanceToServiceInstance(nacosInstance *model.Instance) *serviceregistry.ServiceInstanceSpec { + instanceID := nacosInstance.Metadata[MetaKeyInstanceID] + if instanceID == "" { + instanceID = nacosInstance.InstanceId + } + + registryName := nacosInstance.Metadata[MetaKeyRegistryName] + if registryName == "" { + registryName = n.Name() + } + + instance := &serviceregistry.ServiceInstanceSpec{ + RegistryName: registryName, + ServiceName: nacosInstance.ServiceName, + InstanceID: instanceID, + HostIP: nacosInstance.Ip, + Port: uint16(nacosInstance.Port), + Weight: int(nacosInstance.Weight), + } + + return instance +} diff --git a/pkg/object/serviceregistry/service.go b/pkg/object/serviceregistry/service.go index ddb3c17245..3ddbd89911 100644 --- a/pkg/object/serviceregistry/service.go +++ b/pkg/object/serviceregistry/service.go @@ -23,19 +23,15 @@ import ( ) type ( - // ServiceSpec is the unified service spec in Easegress. - ServiceSpec struct { - RegistryName string - ServiceName string - Instances []*ServiceInstanceSpec - } - - // ServiceInstanceSpec is the unified service instance spec in Easegress. + // ServiceInstanceSpec is the service instance spec in Easegress. ServiceInstanceSpec struct { + // InstanceID is required. + InstanceID string `yaml:"name"` // RegistryName is required. RegistryName string `yaml:"registryName"` // ServiceName is required. ServiceName string `yaml:"serviceName"` + // Scheme is optional if Port is not empty. Scheme string `yaml:"scheme"` // Hostname is optional if HostIP is not empty. @@ -51,20 +47,6 @@ type ( } ) -// DeepCopy deep copies ServiceSpec. -func (s *ServiceSpec) DeepCopy() *ServiceSpec { - copy := &ServiceSpec{ - RegistryName: s.RegistryName, - ServiceName: s.ServiceName, - } - - for _, instance := range s.Instances { - copy.Instances = append(copy.Instances, instance.DeepCopy()) - } - - return copy -} - // DeepCopy deep copies ServiceInstanceSpec. func (s *ServiceInstanceSpec) DeepCopy() *ServiceInstanceSpec { copy := *s @@ -94,6 +76,11 @@ func (s *ServiceInstanceSpec) Validate() error { return nil } +// Key returns the unique key for the service instance. +func (s *ServiceInstanceSpec) Key() string { + return fmt.Sprintf("%s/%s/%s", s.RegistryName, s.ServiceName, s.InstanceID) +} + // URL returns the url of the server. func (s *ServiceInstanceSpec) URL() string { scheme := s.Scheme @@ -117,28 +104,29 @@ func (s *ServiceInstanceSpec) URL() string { } // NewRegistryEventFromDiff creates a registry event from diff old and new specs. -// It only generates Apply and Delete excluding Replace. -// We recommend external drivers use event.Replace in first time, then use this utiliy -// to generate next events. -func NewRegistryEventFromDiff(registryName string, oldSpecs, newSpecs map[string]*ServiceSpec) *RegistryEvent { +// It only uses Apply and Delete excluding Replace. +// External drivers should use event.Replace in first time, then use this utiliy to generate next events. +// registryName is only assigned to the event, the registry name of service instance spec won't change. +func NewRegistryEventFromDiff(registryName string, oldSpecs, newSpecs map[string]*ServiceInstanceSpec) *RegistryEvent { if oldSpecs == nil { - oldSpecs = make(map[string]*ServiceSpec) + oldSpecs = make(map[string]*ServiceInstanceSpec) } if newSpecs == nil { - newSpecs = make(map[string]*ServiceSpec) + newSpecs = make(map[string]*ServiceInstanceSpec) } event := &RegistryEvent{ - Delete: make(map[string]*ServiceSpec), - Apply: make(map[string]*ServiceSpec), + SourceRegistryName: registryName, + + Delete: make(map[string]*ServiceInstanceSpec), + Apply: make(map[string]*ServiceInstanceSpec), } for _, oldSpec := range oldSpecs { _, exists := newSpecs[oldSpec.ServiceName] if !exists { copy := oldSpec.DeepCopy() - copy.RegistryName = registryName event.Delete[oldSpec.ServiceName] = copy } } @@ -147,7 +135,6 @@ func NewRegistryEventFromDiff(registryName string, oldSpecs, newSpecs map[string oldSpec, exists := oldSpecs[newSpec.ServiceName] if exists && !reflect.DeepEqual(oldSpec, newSpec) { copy := newSpec.DeepCopy() - copy.RegistryName = registryName event.Apply[newSpec.ServiceName] = copy } } diff --git a/pkg/object/serviceregistry/serviceregistry.go b/pkg/object/serviceregistry/serviceregistry.go index e0c236bd6c..2583411e97 100644 --- a/pkg/object/serviceregistry/serviceregistry.go +++ b/pkg/object/serviceregistry/serviceregistry.go @@ -90,9 +90,15 @@ type ( Notify() <-chan *RegistryEvent // Operations to the registry. - ApplyServices(serviceSpec []*ServiceSpec) error - GetService(serviceName string) (*ServiceSpec, error) - ListServices() ([]*ServiceSpec, error) + // The key of maps is key of service instance. + ApplyServiceInstances(serviceInstances map[string]*ServiceInstanceSpec) error + DeleteServiceInstances(serviceInstances map[string]*ServiceInstanceSpec) error + // GetServiceInstance must return error if not found. + GetServiceInstance(serviceName, instanceID string) (*ServiceInstanceSpec, error) + // ListServiceInstances could return zero elements of instances with nil error. + ListServiceInstances(serviceName string) (map[string]*ServiceInstanceSpec, error) + // ListServiceInstances could return zero elements of instances with nil error. + ListAllServiceInstances() (map[string]*ServiceInstanceSpec, error) } ) @@ -143,7 +149,7 @@ func (sr *ServiceRegistry) watchRegistry(bucket *registryBucket) { case <-bucket.done: return case event := <-bucket.registry.Notify(): - event.RegistryName = bucket.registry.Name() + event.SourceRegistryName = bucket.registry.Name() sr.handleRegistryEvent(event) } @@ -154,9 +160,13 @@ func (sr *ServiceRegistry) handleRegistryEvent(event *RegistryEvent) { sr.mutex.Lock() defer sr.mutex.Unlock() - bucket, exists := sr.registryBuckets[event.RegistryName] - if !exists { - logger.Errorf("BUG: registry bucket %s not found", event.RegistryName) + sr._handleRegistryEvent(event) +} + +func (sr *ServiceRegistry) _handleRegistryEvent(event *RegistryEvent) { + bucket, exists := sr.registryBuckets[event.SourceRegistryName] + if !exists || !bucket.registered { + logger.Errorf("BUG: registry bucket %s not found", event.SourceRegistryName) return } @@ -164,36 +174,38 @@ func (sr *ServiceRegistry) handleRegistryEvent(event *RegistryEvent) { watcher.(*registryWatcher).EventChan() <- event.DeepCopy() } + // Quickly return for performance. + if len(bucket.serviceBuckets) == 0 { + return + } + + applyOrDeleteServices := make(map[string]struct{}) + for _, instance := range event.Apply { + applyOrDeleteServices[instance.ServiceName] = struct{}{} + } + for _, instance := range event.Delete { + applyOrDeleteServices[instance.ServiceName] = struct{}{} + } + for serviceName, serviceBucket := range bucket.serviceBuckets { - replace, replaceExists := event.Replace[serviceName] - apply, applyExists := event.Apply[serviceName] - del, delExists := event.Delete[serviceName] - - if replaceExists { - for _, watcher := range serviceBucket.serviceWatchers { - watcher.(*serviceWatcher).EventChan() <- &ServiceEvent{ - Apply: replace.DeepCopy(), - } - } + _, applyOrDelete := applyOrDeleteServices[serviceName] + if !event.UseReplace && !applyOrDelete { continue } - if applyExists { - for _, watcher := range serviceBucket.serviceWatchers { - watcher.(*serviceWatcher).EventChan() <- &ServiceEvent{ - Apply: apply.DeepCopy(), - } - } + instances, err := bucket.registry.ListServiceInstances(serviceName) + if err != nil { + logger.Errorf("list service instances of %s/%s failed: %v", + event.SourceRegistryName, serviceName, err) continue } - if delExists { - for _, watcher := range serviceBucket.serviceWatchers { - watcher.(*serviceWatcher).EventChan() <- &ServiceEvent{ - Delete: del.DeepCopy(), - } + for _, watcher := range serviceBucket.serviceWatchers { + watcher.(*serviceWatcher).EventChan() <- &ServiceEvent{ + RegistryName: event.SourceRegistryName, + ServiceName: serviceName, + Instances: instances, } - continue } } } @@ -208,6 +220,12 @@ func (sr *ServiceRegistry) DeregisterRegistry(registryName string) error { return fmt.Errorf("%s not found", registryName) } + cleanEvent := &RegistryEvent{ + SourceRegistryName: registryName, + UseReplace: true, + } + sr._handleRegistryEvent(cleanEvent) + bucket.registered = false close(bucket.done) @@ -218,8 +236,8 @@ func (sr *ServiceRegistry) DeregisterRegistry(registryName string) error { return nil } -// ApplyServices applies the services to the registry with change RegistryName of all specs. -func (sr *ServiceRegistry) ApplyServices(registryName string, serviceSpecs []*ServiceSpec) error { +// ApplyServiceInstances applies the services to the registry with change RegistryName of all specs. +func (sr *ServiceRegistry) ApplyServiceInstances(registryName string, serviceInstances map[string]*ServiceInstanceSpec) error { sr.mutex.Lock() defer sr.mutex.Unlock() @@ -228,15 +246,28 @@ func (sr *ServiceRegistry) ApplyServices(registryName string, serviceSpecs []*Se return fmt.Errorf("%s not found", registryName) } - for _, spec := range serviceSpecs { - spec.RegistryName = registryName + for _, instance := range serviceInstances { + instance.RegistryName = registryName + } + + return bucket.registry.ApplyServiceInstances(serviceInstances) +} + +// GetServiceInstance get service instance. +func (sr *ServiceRegistry) GetServiceInstance(registryName, serviceName, instanceID string) (*ServiceInstanceSpec, error) { + sr.mutex.Lock() + defer sr.mutex.Unlock() + + bucket, exists := sr.registryBuckets[registryName] + if !exists || !bucket.registered { + return nil, fmt.Errorf("%s not found", registryName) } - return bucket.registry.ApplyServices(serviceSpecs) + return bucket.registry.GetServiceInstance(serviceName, instanceID) } -// GetService gets the service of the registry. -func (sr *ServiceRegistry) GetService(registryName, serviceName string) (*ServiceSpec, error) { +// ListServiceInstances service instances of one service. +func (sr *ServiceRegistry) ListServiceInstances(registryName, serviceName string) (map[string]*ServiceInstanceSpec, error) { sr.mutex.Lock() defer sr.mutex.Unlock() @@ -245,11 +276,11 @@ func (sr *ServiceRegistry) GetService(registryName, serviceName string) (*Servic return nil, fmt.Errorf("%s not found", registryName) } - return bucket.registry.GetService(serviceName) + return bucket.registry.ListServiceInstances(serviceName) } -// ListServices lists all services of the registry. -func (sr *ServiceRegistry) ListServices(registryName string) ([]*ServiceSpec, error) { +// ListAllServiceInstances service instances of all services. +func (sr *ServiceRegistry) ListAllServiceInstances(registryName string) (map[string]*ServiceInstanceSpec, error) { sr.mutex.Lock() defer sr.mutex.Unlock() @@ -258,7 +289,7 @@ func (sr *ServiceRegistry) ListServices(registryName string) ([]*ServiceSpec, er return nil, fmt.Errorf("%s not found", registryName) } - return bucket.registry.ListServices() + return bucket.registry.ListAllServiceInstances() } // Category returns the category of ServiceRegistry. diff --git a/pkg/object/serviceregistry/watcher.go b/pkg/object/serviceregistry/watcher.go index 5d8ca8bfac..f8abdb1b67 100644 --- a/pkg/object/serviceregistry/watcher.go +++ b/pkg/object/serviceregistry/watcher.go @@ -25,29 +25,29 @@ import ( type ( // ServiceEvent is the event of service. - // Only one of Apply and Delete should be filled. + // It concludes complete instances of the service. ServiceEvent struct { - // Apply creates or updates the service. - Apply *ServiceSpec - - // Delete has optional service instances. - Delete *ServiceSpec + RegistryName string + ServiceName string + Instances map[string]*ServiceInstanceSpec } // RegistryEvent is the event of service registry. - // Only one of Init, Delete and Apply should be filled. + // If UseReplace is true, the event handler should use Replace field even it is empty. RegistryEvent struct { - RegistryName string + // SourceRegistryName is the registry which send the event, + // the RegistryName of specs may not be the same with it. + SourceRegistryName string + UseReplace bool - // Replace replaces all services of the registry. - Replace map[string]*ServiceSpec + // Replace replaces all service instances of the registry. + Replace map[string]*ServiceInstanceSpec - // Apply creates or updates services of the registry. - Apply map[string]*ServiceSpec + // Apply creates or updates service instances of the registry. + Apply map[string]*ServiceInstanceSpec - // Delete deletes services of the registry. - // The spec of element has optional service instances. - Delete map[string]*ServiceSpec + // Delete deletes service instaces of the registry. + Delete map[string]*ServiceInstanceSpec } // ServiceWatcher is the watcher of service. @@ -57,9 +57,6 @@ type ( RegistryName() string ServiceName() string - // Exists returns if the service exists. - Exists() bool - Watch() <-chan *ServiceEvent Stop() @@ -137,7 +134,6 @@ func (sr *ServiceRegistry) NewServiceWatcher(registryName, serviceName string) S registryName: registryName, serviceName: serviceName, eventChan: make(chan *ServiceEvent, 10), - existsFn: sr.serviceExistsFn(registryName, serviceName), stopFn: sr.serviceWacherStopFn(registryName, serviceName, id), } @@ -184,13 +180,6 @@ func (sr *ServiceRegistry) registryWacherStopFn(registryName, watcherID string) } } -func (sr *ServiceRegistry) serviceExistsFn(registryName, serviceName string) func() bool { - return func() bool { - _, err := sr.GetService(registryName, serviceName) - return err == nil - } -} - func (sr *ServiceRegistry) serviceWacherStopFn(registryName, serviceName, watcherID string) func() { return func() { sr.mutex.Lock() @@ -275,14 +264,16 @@ func (w *registryWatcher) Stop() { // DeepCopy deep copies ServiceEvent. func (e *ServiceEvent) DeepCopy() *ServiceEvent { - copy := &ServiceEvent{} - - if e.Apply != nil { - copy.Apply = e.Apply.DeepCopy() + copy := &ServiceEvent{ + RegistryName: e.RegistryName, + ServiceName: e.ServiceName, } - if e.Delete != nil { - copy.Delete = e.Delete.DeepCopy() + if e.Instances != nil { + copy.Instances = make(map[string]*ServiceInstanceSpec) + for k, v := range e.Instances { + copy.Instances[k] = v.DeepCopy() + } } return copy @@ -291,25 +282,26 @@ func (e *ServiceEvent) DeepCopy() *ServiceEvent { // DeepCopy deep copies RegistryEvent. func (e *RegistryEvent) DeepCopy() *RegistryEvent { copy := &RegistryEvent{ - RegistryName: e.RegistryName, + SourceRegistryName: e.SourceRegistryName, + Replace: e.Replace, } if e.Replace != nil { - copy.Replace = make(map[string]*ServiceSpec) + copy.Replace = make(map[string]*ServiceInstanceSpec) for k, v := range e.Replace { copy.Replace[k] = v.DeepCopy() } } if e.Apply != nil { - copy.Apply = make(map[string]*ServiceSpec) + copy.Apply = make(map[string]*ServiceInstanceSpec) for k, v := range e.Apply { copy.Apply[k] = v.DeepCopy() } } if e.Delete != nil { - copy.Delete = make(map[string]*ServiceSpec) + copy.Delete = make(map[string]*ServiceInstanceSpec) for k, v := range e.Delete { copy.Delete[k] = v.DeepCopy() } diff --git a/pkg/object/zookeeperserviceregistry/zookeeperserviceregistry.go b/pkg/object/zookeeperserviceregistry/zookeeperserviceregistry.go index 516b183a6b..a219d6fbac 100644 --- a/pkg/object/zookeeperserviceregistry/zookeeperserviceregistry.go +++ b/pkg/object/zookeeperserviceregistry/zookeeperserviceregistry.go @@ -18,16 +18,17 @@ package zookeeperserviceregistry import ( - "encoding/json" "fmt" + "path/filepath" "sync" "time" - zookeeper "github.com/go-zookeeper/zk" - "github.com/megaease/easegress/pkg/logger" "github.com/megaease/easegress/pkg/object/serviceregistry" "github.com/megaease/easegress/pkg/supervisor" + + zookeeper "github.com/go-zookeeper/zk" + "gopkg.in/yaml.v2" ) const ( @@ -50,14 +51,14 @@ type ( serviceRegistry *serviceregistry.ServiceRegistry firstDone bool - serviceSpecs map[string]*serviceregistry.ServiceSpec + instances map[string]*serviceregistry.ServiceInstanceSpec notify chan *serviceregistry.RegistryEvent connMutex sync.RWMutex conn *zookeeper.Conn - statusMutex sync.Mutex - serviceInstancesNum map[string]int + statusMutex sync.Mutex + instancesNum map[string]int done chan struct{} } @@ -114,11 +115,12 @@ func (zk *ZookeeperServiceRegistry) reload() { Instance().(*serviceregistry.ServiceRegistry) zk.serviceRegistry.RegisterRegistry(zk) zk.notify = make(chan *serviceregistry.RegistryEvent, 10) + zk.firstDone = false - zk.serviceInstancesNum = make(map[string]int) + zk.instancesNum = make(map[string]int) zk.done = make(chan struct{}) - _, err := zk.getConn() + _, err := zk.getClient() if err != nil { logger.Errorf("%s get zookeeper conn failed: %v", zk.superSpec.Name(), err) } @@ -126,7 +128,7 @@ func (zk *ZookeeperServiceRegistry) reload() { go zk.run() } -func (zk *ZookeeperServiceRegistry) getConn() (*zookeeper.Conn, error) { +func (zk *ZookeeperServiceRegistry) getClient() (*zookeeper.Conn, error) { zk.connMutex.RLock() if zk.conn != nil { conn := zk.conn @@ -135,10 +137,10 @@ func (zk *ZookeeperServiceRegistry) getConn() (*zookeeper.Conn, error) { } zk.connMutex.RUnlock() - return zk.buildConn() + return zk.buildClient() } -func (zk *ZookeeperServiceRegistry) buildConn() (*zookeeper.Conn, error) { +func (zk *ZookeeperServiceRegistry) buildClient() (*zookeeper.Conn, error) { zk.connMutex.Lock() defer zk.connMutex.Unlock() @@ -208,79 +210,33 @@ func (zk *ZookeeperServiceRegistry) run() { } func (zk *ZookeeperServiceRegistry) update() { - conn, err := zk.getConn() + instances, err := zk.ListAllServiceInstances() if err != nil { - logger.Errorf("%s get zookeeper conn failed: %v", - zk.superSpec.Name(), err) + logger.Errorf("list all service instances failed: %v", err) return } - childs, _, err := conn.Children(zk.spec.Prefix) - if err != nil { - logger.Errorf("%s get path: %s children failed: %v", zk.superSpec.Name(), zk.spec.Prefix, err) - return - } - - serviceSpecs := make(map[string]*serviceregistry.ServiceSpec) - serviceInstancesNum := map[string]int{} - for _, child := range childs { - fullPath := zk.spec.Prefix + "/" + child - data, _, err := conn.Get(fullPath) - if err != nil { - if err == zookeeper.ErrNoNode { - continue - } - - logger.Errorf("%s get child path %s failed: %v", zk.superSpec.Name(), fullPath, err) - return - } - - serviceInstanceSpec := &serviceregistry.ServiceInstanceSpec{} - // Note: zookeeper allows store custom format into one path, so we choose to store - // serviceregistry.Server JSON format directly. - err = json.Unmarshal(data, serviceInstanceSpec) - if err != nil { - logger.Errorf("%s unmarshal fullpath %s to json failed: %v", zk.superSpec.Name(), fullPath, err) - return - } - - if err := serviceInstanceSpec.Validate(); err != nil { - logger.Errorf("%s is invalid: %v", data, err) - continue - } - - serviceName := serviceInstanceSpec.ServiceName - - serviceSpec, exists := serviceSpecs[serviceName] - if !exists { - serviceSpecs[serviceName] = &serviceregistry.ServiceSpec{ - RegistryName: zk.Name(), - ServiceName: serviceName, - Instances: []*serviceregistry.ServiceInstanceSpec{serviceInstanceSpec}, - } - } else { - serviceSpec.Instances = append(serviceSpec.Instances, serviceInstanceSpec) - } - - serviceInstancesNum[fullPath]++ + instancesNum := make(map[string]int) + for _, instance := range instances { + instancesNum[instance.ServiceName]++ } var event *serviceregistry.RegistryEvent if !zk.firstDone { zk.firstDone = true event = &serviceregistry.RegistryEvent{ - RegistryName: zk.Name(), - Replace: serviceSpecs, + SourceRegistryName: zk.Name(), + Replace: instances, } } else { - event = serviceregistry.NewRegistryEventFromDiff(zk.Name(), zk.serviceSpecs, serviceSpecs) + event = serviceregistry.NewRegistryEventFromDiff(zk.Name(), zk.instances, instances) } zk.notify <- event - zk.serviceSpecs = serviceSpecs + zk.instances = instances zk.statusMutex.Lock() - zk.serviceInstancesNum = serviceInstancesNum + zk.instancesNum = instancesNum zk.statusMutex.Unlock() } @@ -288,7 +244,7 @@ func (zk *ZookeeperServiceRegistry) update() { func (zk *ZookeeperServiceRegistry) Status() *supervisor.Status { s := &Status{} - _, err := zk.getConn() + _, err := zk.getClient() if err != nil { s.Health = err.Error() } else { @@ -296,7 +252,7 @@ func (zk *ZookeeperServiceRegistry) Status() *supervisor.Status { } zk.statusMutex.Lock() - s.ServiceInstancesNum = zk.serviceInstancesNum + s.ServiceInstancesNum = zk.instancesNum zk.statusMutex.Unlock() return &supervisor.Status{ @@ -322,20 +278,160 @@ func (zk *ZookeeperServiceRegistry) Notify() <-chan *serviceregistry.RegistryEve return zk.notify } -// ApplyServices applies service specs to zookeeper registry. -func (zk *ZookeeperServiceRegistry) ApplyServices(serviceSpec []*serviceregistry.ServiceSpec) error { - // TODO +// ApplyServiceInstances applies service instances to the registry. +func (zk *ZookeeperServiceRegistry) ApplyServiceInstances(serviceInstances map[string]*serviceregistry.ServiceInstanceSpec) error { + client, err := zk.getClient() + if err != nil { + return fmt.Errorf("%s get consul client failed: %v", + zk.superSpec.Name(), err) + } + + for _, instance := range serviceInstances { + err := instance.Validate() + if err != nil { + return fmt.Errorf("%+v is invalid: %v", instance, err) + } + } + + for _, instance := range serviceInstances { + buff, err := yaml.Marshal(instance) + if err != nil { + return fmt.Errorf("marshal %+v to yaml failed: %v", instance, err) + } + + path := zk.serviceInstanceZookeeperPath(instance) + _, err = client.Set(path, buff, 0) + if err != nil { + return err + } + } + + return nil +} + +// DeleteServiceInstances applies service instances to the registry. +func (zk *ZookeeperServiceRegistry) DeleteServiceInstances(serviceInstances map[string]*serviceregistry.ServiceInstanceSpec) error { + client, err := zk.getClient() + if err != nil { + return fmt.Errorf("%s get consul client failed: %v", + zk.superSpec.Name(), err) + } + + for _, instance := range serviceInstances { + path := zk.serviceInstanceZookeeperPath(instance) + err := client.Delete(path, 0) + if err != nil { + return err + } + } + return nil } -// GetService applies service specs to zookeeper registry. -func (zk *ZookeeperServiceRegistry) GetService(serviceName string) (*serviceregistry.ServiceSpec, error) { - // TODO - return nil, nil +// GetServiceInstance get service instance from the registry. +func (zk *ZookeeperServiceRegistry) GetServiceInstance(serviceName, instanceID string) (*serviceregistry.ServiceInstanceSpec, error) { + serviceInstances, err := zk.ListServiceInstances(serviceName) + if err != nil { + return nil, err + } + + for _, instance := range serviceInstances { + if instance.InstanceID == instanceID { + return instance, nil + } + } + + return nil, fmt.Errorf("%s/%s not found", serviceName, instanceID) +} + +// ListServiceInstances list service instances of one service from the registry. +func (zk *ZookeeperServiceRegistry) ListServiceInstances(serviceName string) (map[string]*serviceregistry.ServiceInstanceSpec, error) { + client, err := zk.getClient() + if err != nil { + return nil, fmt.Errorf("%s get consul client failed: %v", + zk.superSpec.Name(), err) + } + + childs, _, err := client.Children(zk.serviceZookeeperPrefix(serviceName)) + if err != nil { + return nil, fmt.Errorf("%s get path: %s children failed: %v", zk.superSpec.Name(), zk.spec.Prefix, err) + } + + instances := make(map[string]*serviceregistry.ServiceInstanceSpec) + for _, child := range childs { + fullPath := zk.fullPathOfChild(child) + data, _, err := client.Get(fullPath) + if err != nil { + return nil, fmt.Errorf("%s get child path %s failed: %v", zk.superSpec.Name(), fullPath, err) + } + + instance := &serviceregistry.ServiceInstanceSpec{} + err = yaml.Unmarshal(data, instance) + if err != nil { + return nil, fmt.Errorf("%s unmarshal fullpath %s to yaml failed: %v", zk.superSpec.Name(), fullPath, err) + } + + err = instance.Validate() + if err != nil { + return nil, fmt.Errorf("%s is invalid: %v", data, err) + } + + instances[instance.Key()] = instance + } + + return instances, nil +} + +// ListAllServiceInstances list all service instances from the registry. +func (zk *ZookeeperServiceRegistry) ListAllServiceInstances() (map[string]*serviceregistry.ServiceInstanceSpec, error) { + client, err := zk.getClient() + if err != nil { + return nil, fmt.Errorf("%s get consul client failed: %v", + zk.superSpec.Name(), err) + } + + childs, _, err := client.Children(zk.spec.Prefix) + if err != nil { + return nil, fmt.Errorf("%s get path: %s children failed: %v", zk.superSpec.Name(), zk.spec.Prefix, err) + } + + instances := make(map[string]*serviceregistry.ServiceInstanceSpec) + for _, child := range childs { + fullPath := zk.fullPathOfChild(child) + data, _, err := client.Get(fullPath) + if err != nil { + return nil, fmt.Errorf("%s get child path %s failed: %v", zk.superSpec.Name(), fullPath, err) + } + + instance := &serviceregistry.ServiceInstanceSpec{} + err = yaml.Unmarshal(data, instance) + if err != nil { + return nil, fmt.Errorf("%s unmarshal fullpath %s to yaml failed: %v", zk.superSpec.Name(), fullPath, err) + } + + err = instance.Validate() + if err != nil { + return nil, fmt.Errorf("%s is invalid: %v", data, err) + } + + instances[instance.Key()] = instance + } + + return instances, nil +} + +func (zk *ZookeeperServiceRegistry) fullPathOfChild(childPath string) string { + return filepath.Join(zk.spec.Prefix, childPath) +} + +func (zk *ZookeeperServiceRegistry) serviceZookeeperPrefix(serviceName string) string { + return filepath.Join(zk.spec.Prefix, serviceName) + "/" +} + +func (zk *ZookeeperServiceRegistry) serviceInstanceZookeeperPath(instance *serviceregistry.ServiceInstanceSpec) string { + return zk.serviceInstanceZookeeperPathFromRaw(instance.ServiceName, instance.InstanceID) } -// ListServices lists service specs from zookeeper registry. -func (zk *ZookeeperServiceRegistry) ListServices() ([]*serviceregistry.ServiceSpec, error) { - // TODO - return nil, nil +func (zk *ZookeeperServiceRegistry) serviceInstanceZookeeperPathFromRaw(serviceName, instanceID string) string { + return filepath.Join(zk.spec.Prefix, serviceName, instanceID) } diff --git a/pkg/registry/registry.go b/pkg/registry/registry.go index f3f52539cf..6b6f61f630 100644 --- a/pkg/registry/registry.go +++ b/pkg/registry/registry.go @@ -46,6 +46,7 @@ import ( _ "github.com/megaease/easegress/pkg/object/httpserver" _ "github.com/megaease/easegress/pkg/object/ingresscontroller" _ "github.com/megaease/easegress/pkg/object/meshcontroller" + _ "github.com/megaease/easegress/pkg/object/nacosserviceregistry" _ "github.com/megaease/easegress/pkg/object/rawconfigtrafficcontroller" _ "github.com/megaease/easegress/pkg/object/trafficcontroller" _ "github.com/megaease/easegress/pkg/object/websocketserver"