From 9a68fb5e247b42412071fa3e373de33011df2f59 Mon Sep 17 00:00:00 2001 From: Yun Long Date: Mon, 16 Aug 2021 00:12:05 +0800 Subject: [PATCH] [mesh]: Complete mesh registry syncer --- pkg/filter/proxy/server_test.go | 125 ------------ pkg/object/meshcontroller/master/master.go | 10 +- .../meshcontroller/master/registrysyncer.go | 178 ++++++++++++++++-- pkg/object/meshcontroller/service/service.go | 8 + pkg/object/meshcontroller/spec/spec.go | 4 +- pkg/object/serviceregistry/service.go | 4 +- pkg/object/serviceregistry/serviceregistry.go | 38 +++- pkg/object/serviceregistry/watcher.go | 35 +++- 8 files changed, 242 insertions(+), 160 deletions(-) diff --git a/pkg/filter/proxy/server_test.go b/pkg/filter/proxy/server_test.go index 4cc25f0b5a..072d7afab8 100644 --- a/pkg/filter/proxy/server_test.go +++ b/pkg/filter/proxy/server_test.go @@ -310,128 +310,3 @@ func TestStaticServers(t *testing.T) { } } } - -// TODO: Mock supervisor to test dynamic server. - -// func TestServers(t *testing.T) { -// servers := []*Server{ -// { -// URL: "http://127.0.0.1:9090", -// Tags: []string{"d1", "v1", "green"}, -// Weight: 1, -// }, -// { -// URL: "http://127.0.0.1:9091", -// Tags: []string{"v1", "d1", "green"}, -// Weight: 2, -// }, -// { -// URL: "http://127.0.0.1:9092", -// Tags: []string{"green", "d1", "v1"}, -// Weight: 3, -// }, -// { -// URL: "http://127.0.0.1:9093", -// Tags: []string{"v1"}, -// Weight: 4, -// }, -// { -// URL: "http://127.0.0.1:9094", -// Tags: []string{"v1", "v3"}, -// Weight: 5, -// }, -// } -// ps := &PoolSpec{ -// ServersTags: []string{}, -// ServiceRegistry: "service registry", -// ServiceName: "service name", -// Servers: []*Server{}, -// } -// -// ctx := &contexttest.MockedHTTPContext{} -// -// s := newServers(ps) -// if s.len() != 0 { -// t.Errorf("servers.len() should be 0") -// } -// s.close() -// -// ps = &PoolSpec{ -// ServersTags: []string{}, -// ServiceRegistry: "service registry", -// ServiceName: "service name", -// Servers: servers, -// } -// s = newServers(ps) -// if s.len() != len(servers) { -// t.Errorf("servers.len() is not %d", len(servers)) -// } -// for i := 0; i < len(servers); i++ { -// if svr, e := s.next(ctx); e != nil || svr != servers[i] { -// t.Errorf("ss.next() returns unexpected server") -// } -// } -// s.close() -// -// ps = &PoolSpec{ -// ServersTags: []string{}, -// ServiceRegistry: "service registry", -// Servers: []*Server{}, -// ServiceName: "testservice", -// } -// fnGetService.Store(func(serviceRegistry, serviceName string) (*serviceregistry.Service, error) { -// return nil, fmt.Errorf("dummy error") -// }) -// s = newServers(ps) -// if s.len() != 0 { -// t.Errorf("servers.len() should be 0") -// } -// s.close() -// -// svcservers := []*serviceregistry.Server{ -// { -// ServiceName: "testservice", -// Hostname: "server1", -// HostIP: "192.168.1.1", -// Port: 80, -// }, -// { -// ServiceName: "testservice", -// Hostname: "server2", -// HostIP: "192.168.1.2", -// Port: 80, -// }, -// { -// ServiceName: "testservice", -// Hostname: "server3", -// HostIP: "192.168.1.3", -// Port: 80, -// }, -// } -// service, _ := serviceregistry.NewService("testservice", svcservers) -// -// fnGetService.Store(func(serviceRegistry, serviceName string) (*serviceregistry.Service, error) { -// return service, nil -// }) -// s = newServers(ps) -// if s.len() != len(service.Servers()) { -// t.Errorf("servers.len() is not %d", len(service.Servers())) -// } -// -// svcservers = append(svcservers, &serviceregistry.Server{ -// ServiceName: "testservice", -// Hostname: "server4", -// HostIP: "192.168.1.4", -// Port: 80, -// }) -// service.Update(svcservers) -// -// time.Sleep(100 * time.Millisecond) -// if s.len() != len(service.Servers()) { -// t.Errorf("servers.len() is not %d", len(service.Servers())) -// } -// -// service.Close("close") -// s.close() -// } -// diff --git a/pkg/object/meshcontroller/master/master.go b/pkg/object/meshcontroller/master/master.go index dd1ebd328e..aa3b543985 100644 --- a/pkg/object/meshcontroller/master/master.go +++ b/pkg/object/meshcontroller/master/master.go @@ -45,8 +45,9 @@ type ( spec *spec.Admin maxHeartbeatTimeout time.Duration - store storage.Storage - service *service.Service + registrySyncer *registrySyncer + store storage.Storage + service *service.Service done chan struct{} } @@ -64,8 +65,9 @@ func New(superSpec *supervisor.Spec) *Master { superSpec: superSpec, spec: adminSpec, - store: store, - service: service.New(superSpec), + store: store, + service: service.New(superSpec), + registrySyncer: newRegistrySyncer(superSpec), done: make(chan struct{}), } diff --git a/pkg/object/meshcontroller/master/registrysyncer.go b/pkg/object/meshcontroller/master/registrysyncer.go index e6e7d2f835..590f8de54e 100644 --- a/pkg/object/meshcontroller/master/registrysyncer.go +++ b/pkg/object/meshcontroller/master/registrysyncer.go @@ -20,42 +20,57 @@ package master import ( "time" + "github.com/megaease/easegress/pkg/logger" + "github.com/megaease/easegress/pkg/object/meshcontroller/informer" "github.com/megaease/easegress/pkg/object/meshcontroller/service" "github.com/megaease/easegress/pkg/object/meshcontroller/spec" + "github.com/megaease/easegress/pkg/object/meshcontroller/storage" + "github.com/megaease/easegress/pkg/object/serviceregistry" "github.com/megaease/easegress/pkg/supervisor" + "github.com/megaease/easegress/pkg/util/stringtool" +) + +const ( + syncInterval = 10 * time.Second ) type ( registrySyncer struct { - superSpec *supervisor.Spec - spec *spec.Admin - syncInterval time.Duration + superSpec *supervisor.Spec + spec *spec.Admin - service *service.Service + externalInstances map[string]*serviceregistry.ServiceInstanceSpec + service *service.Service + informer informer.Informer + serviceRegistry *serviceregistry.ServiceRegistry + registryWatcher serviceregistry.RegistryWatcher done chan struct{} } - - // ExternalRegistry is the interface of external service registry. - ExternalRegistry interface { - SyncFromExternal() - SyncToExternal() - ListExternalServices() - Notify() <-chan struct{} - } ) -func newRegistrySyncer(superSpec *supervisor.Spec, registryController string) *registrySyncer { +func newRegistrySyncer(superSpec *supervisor.Spec) *registrySyncer { spec := superSpec.ObjectSpec().(*spec.Admin) rs := ®istrySyncer{ superSpec: superSpec, spec: spec, - // syncInterval: syncInteral, - service: service.New(superSpec), - done: make(chan struct{}), + + done: make(chan struct{}), + } + + if spec.ExternalServiceRegistry == "" { + return rs } + store := storage.New(superSpec.Name(), superSpec.Super().Cluster()) + rs.service = service.New(superSpec) + rs.informer = informer.NewInformer(store, "") + rs.informer.OnAllServiceInstanceSpecs(rs.serviceInstanceSpecsFunc) + + rs.serviceRegistry = superSpec.Super().MustGetSystemController(serviceregistry.Kind).Instance().(*serviceregistry.ServiceRegistry) + rs.registryWatcher = rs.serviceRegistry.NewRegistryWatcher(spec.ExternalServiceRegistry) + go rs.run() return rs @@ -66,12 +81,137 @@ func (rs *registrySyncer) run() { select { case <-rs.done: return - case <-time.After(rs.syncInterval): - rs.sync() + case event := <-rs.registryWatcher.Watch(): + rs.handleEvent(event) + } + } +} + +func (rs *registrySyncer) handleEvent(event *serviceregistry.RegistryEvent) { + defer func() { + if r := recover(); r != nil { + logger.Errorf("recover from %v", r) + } + }() + + if event.UseReplace { + oldInstances := rs.service.ListAllServiceInstanceSpecs() + for _, oldInstance := range oldInstances { + if oldInstance.RegistryName != rs.externalRegistryName() { + continue + } + + instance := &serviceregistry.ServiceInstanceSpec{ + RegistryName: oldInstance.RegistryName, + ServiceName: oldInstance.ServiceName, + InstanceID: oldInstance.InstanceID, + } + _, existed := event.Replace[instance.Key()] + if !existed { + rs.service.DeleteServiceInstanceSpec(oldInstance.ServiceName, oldInstance.InstanceID) + } + } + + for _, instance := range event.Replace { + rs.service.PutServiceInstanceSpec(rs.externalToMeshInstance(instance)) + } + return + } + + for _, instance := range event.Delete { + rs.service.DeleteServiceInstanceSpec(instance.ServiceName, instance.InstanceID) + } + for _, instance := range event.Apply { + rs.service.PutServiceInstanceSpec(rs.externalToMeshInstance(instance)) + } +} + +func (rs *registrySyncer) serviceInstanceSpecsFunc(meshInstances map[string]*spec.ServiceInstanceSpec) bool { + oldInstances, err := rs.serviceRegistry.ListAllServiceInstances(rs.spec.ExternalServiceRegistry) + if err != nil { + logger.Errorf("list all service instances of %s: %v", rs.spec.ExternalServiceRegistry, err) + return true + } + oldInstances = rs.filterExternalInstances(oldInstances, rs.meshRegistryName()) + + meshInstances = rs.filterMeshInstances(meshInstances, "", rs.meshRegistryName()) + newInstances := rs.meshToExternalInstances(meshInstances) + + event := serviceregistry.NewRegistryEventFromDiff(rs.meshRegistryName(), oldInstances, newInstances) + if len(event.Apply) != 0 { + rs.serviceRegistry.ApplyServiceInstances(rs.externalRegistryName(), event.Apply) + } + if len(event.Delete) != 0 { + rs.serviceRegistry.DeleteServiceInstances(rs.externalRegistryName(), event.Delete) + } + + return true +} + +func (rs *registrySyncer) meshRegistryName() string { + return rs.superSpec.Name() +} + +func (rs *registrySyncer) externalRegistryName() string { + return rs.spec.ExternalServiceRegistry +} + +func (rs *registrySyncer) filterExternalInstances(instances map[string]*serviceregistry.ServiceInstanceSpec, registryNames ...string) map[string]*serviceregistry.ServiceInstanceSpec { + result := make(map[string]*serviceregistry.ServiceInstanceSpec) + + for _, instance := range instances { + if stringtool.StrInSlice(instance.RegistryName, registryNames) { + result[instance.Key()] = instance + } + } + + return result +} + +func (rs *registrySyncer) filterMeshInstances(instances map[string]*spec.ServiceInstanceSpec, registryNames ...string) map[string]*spec.ServiceInstanceSpec { + result := make(map[string]*spec.ServiceInstanceSpec) + + for _, instance := range instances { + if stringtool.StrInSlice(instance.RegistryName, registryNames) { + result[instance.InstanceID] = instance + } + } + + return result +} + +func (rs *registrySyncer) meshToExternalInstances(instances map[string]*spec.ServiceInstanceSpec) map[string]*serviceregistry.ServiceInstanceSpec { + result := make(map[string]*serviceregistry.ServiceInstanceSpec) + + for _, instance := range instances { + externalInstance := &serviceregistry.ServiceInstanceSpec{ + RegistryName: instance.RegistryName, + ServiceName: instance.ServiceName, + InstanceID: instance.InstanceID, + + HostIP: instance.IP, + Port: uint16(instance.Port), } + result[externalInstance.Key()] = externalInstance } + + return result } -func (rs *registrySyncer) sync() { +func (rs *registrySyncer) externalToMeshInstance(instance *serviceregistry.ServiceInstanceSpec) *spec.ServiceInstanceSpec { + return &spec.ServiceInstanceSpec{ + RegistryName: instance.RegistryName, + ServiceName: instance.ServiceName, + InstanceID: instance.InstanceID, + IP: instance.HostIP, + Port: uint32(instance.Port), + } +} + +func (rs *registrySyncer) close() { + close(rs.done) + if rs.informer != nil { + rs.informer.Close() + } } diff --git a/pkg/object/meshcontroller/service/service.go b/pkg/object/meshcontroller/service/service.go index 4544017c43..d179acd776 100644 --- a/pkg/object/meshcontroller/service/service.go +++ b/pkg/object/meshcontroller/service/service.go @@ -322,6 +322,14 @@ func (s *Service) PutServiceInstanceSpec(_spec *spec.ServiceInstanceSpec) { } } +// DeleteServiceInstanceSpec deletes the service instance spec. +func (s *Service) DeleteServiceInstanceSpec(serviceName, instanceID string) { + err := s.store.Delete(layout.ServiceInstanceSpecKey(serviceName, instanceID)) + if err != nil { + api.ClusterPanic(err) + } +} + // ListTenantSpecs lists tenant specs func (s *Service) ListTenantSpecs() []*spec.Tenant { tenants := []*spec.Tenant{} diff --git a/pkg/object/meshcontroller/spec/spec.go b/pkg/object/meshcontroller/spec/spec.go index 0bf263bda3..d1cfe2fd20 100644 --- a/pkg/object/meshcontroller/spec/spec.go +++ b/pkg/object/meshcontroller/spec/spec.go @@ -90,7 +90,7 @@ type ( // IngressPort is the port for http server in mesh ingress IngressPort int `yaml:"ingressPort" jsonschema:"required"` - ServiceRegistry string `yaml:"serviceRegistry" jsonschema:"omitempty"` + ExternalServiceRegistry string `yaml:"externalServiceRegistry" jsonschema:"omitempty"` } // Service contains the information of service. @@ -225,6 +225,8 @@ type ( // ServiceInstanceSpec is the spec of service instance. ServiceInstanceSpec struct { + // Backward compatibility: empty RegistryName means it is a mesh service. + RegistryName string // Provide by registry client ServiceName string `yaml:"serviceName" jsonschema:"required"` InstanceID string `yaml:"instanceID" jsonschema:"required"` diff --git a/pkg/object/serviceregistry/service.go b/pkg/object/serviceregistry/service.go index 3ddbd89911..e279dd728a 100644 --- a/pkg/object/serviceregistry/service.go +++ b/pkg/object/serviceregistry/service.go @@ -25,12 +25,12 @@ import ( type ( // ServiceInstanceSpec is the service instance spec in Easegress. ServiceInstanceSpec struct { - // InstanceID is required. - InstanceID string `yaml:"name"` // RegistryName is required. RegistryName string `yaml:"registryName"` // ServiceName is required. ServiceName string `yaml:"serviceName"` + // InstanceID is required. + InstanceID string `yaml:"name"` // Scheme is optional if Port is not empty. Scheme string `yaml:"scheme"` diff --git a/pkg/object/serviceregistry/serviceregistry.go b/pkg/object/serviceregistry/serviceregistry.go index 2583411e97..ba7597d45d 100644 --- a/pkg/object/serviceregistry/serviceregistry.go +++ b/pkg/object/serviceregistry/serviceregistry.go @@ -202,9 +202,8 @@ func (sr *ServiceRegistry) _handleRegistryEvent(event *RegistryEvent) { for _, watcher := range serviceBucket.serviceWatchers { watcher.(*serviceWatcher).EventChan() <- &ServiceEvent{ - RegistryName: event.SourceRegistryName, - ServiceName: serviceName, - Instances: instances, + SourceRegistryName: event.SourceRegistryName, + Instances: instances, } } } @@ -241,6 +240,10 @@ func (sr *ServiceRegistry) ApplyServiceInstances(registryName string, serviceIns sr.mutex.Lock() defer sr.mutex.Unlock() + return sr._applyServiceInstances(registryName, serviceInstances) +} + +func (sr *ServiceRegistry) _applyServiceInstances(registryName string, serviceInstances map[string]*ServiceInstanceSpec) error { bucket, exists := sr.registryBuckets[registryName] if !exists || !bucket.registered { return fmt.Errorf("%s not found", registryName) @@ -258,6 +261,10 @@ func (sr *ServiceRegistry) GetServiceInstance(registryName, serviceName, instanc sr.mutex.Lock() defer sr.mutex.Unlock() + return sr._getServiceInstance(registryName, serviceName, instanceID) +} + +func (sr *ServiceRegistry) _getServiceInstance(registryName, serviceName, instanceID string) (*ServiceInstanceSpec, error) { bucket, exists := sr.registryBuckets[registryName] if !exists || !bucket.registered { return nil, fmt.Errorf("%s not found", registryName) @@ -271,6 +278,10 @@ func (sr *ServiceRegistry) ListServiceInstances(registryName, serviceName string sr.mutex.Lock() defer sr.mutex.Unlock() + return sr._listServiceInstances(registryName, serviceName) +} + +func (sr *ServiceRegistry) _listServiceInstances(registryName, serviceName string) (map[string]*ServiceInstanceSpec, error) { bucket, exists := sr.registryBuckets[registryName] if !exists || !bucket.registered { return nil, fmt.Errorf("%s not found", registryName) @@ -279,11 +290,32 @@ func (sr *ServiceRegistry) ListServiceInstances(registryName, serviceName string return bucket.registry.ListServiceInstances(serviceName) } +// DeleteServiceInstances service instances of one service. +func (sr *ServiceRegistry) DeleteServiceInstances(registryName string, serviceInstances map[string]*ServiceInstanceSpec) error { + sr.mutex.Lock() + defer sr.mutex.Unlock() + + return sr._deleteServiceInstances(registryName, serviceInstances) +} + +func (sr *ServiceRegistry) _deleteServiceInstances(registryName string, serviceInstances map[string]*ServiceInstanceSpec) error { + bucket, exists := sr.registryBuckets[registryName] + if !exists || !bucket.registered { + return fmt.Errorf("%s not found", registryName) + } + + return bucket.registry.DeleteServiceInstances(serviceInstances) +} + // ListAllServiceInstances service instances of all services. func (sr *ServiceRegistry) ListAllServiceInstances(registryName string) (map[string]*ServiceInstanceSpec, error) { sr.mutex.Lock() defer sr.mutex.Unlock() + return sr._listAllServiceInstances(registryName) +} + +func (sr *ServiceRegistry) _listAllServiceInstances(registryName string) (map[string]*ServiceInstanceSpec, error) { bucket, exists := sr.registryBuckets[registryName] if !exists || !bucket.registered { return nil, fmt.Errorf("%s not found", registryName) diff --git a/pkg/object/serviceregistry/watcher.go b/pkg/object/serviceregistry/watcher.go index f8abdb1b67..6953ac102d 100644 --- a/pkg/object/serviceregistry/watcher.go +++ b/pkg/object/serviceregistry/watcher.go @@ -21,21 +21,23 @@ import ( "fmt" "github.com/google/uuid" + "github.com/megaease/easegress/pkg/logger" ) type ( // ServiceEvent is the event of service. // It concludes complete instances of the service. ServiceEvent struct { - RegistryName string - ServiceName string - Instances map[string]*ServiceInstanceSpec + // SourceRegistryName is the registry which caused the event, + // the RegistryName of specs may not be the same with it. + SourceRegistryName string + Instances map[string]*ServiceInstanceSpec } // RegistryEvent is the event of service registry. // If UseReplace is true, the event handler should use Replace field even it is empty. RegistryEvent struct { - // SourceRegistryName is the registry which send the event, + // SourceRegistryName is the registry which caused the event, // the RegistryName of specs may not be the same with it. SourceRegistryName string UseReplace bool @@ -119,6 +121,17 @@ func (sr *ServiceRegistry) NewRegistryWatcher(registryName string) RegistryWatch sr.registryBuckets[registryName].registryWatchers[id] = watcher + instances, err := sr._listAllServiceInstances(registryName) + if err != nil { + logger.Warnf("watch registry %s: list service instances failed: %v", registryName, err) + } else { + watcher.EventChan() <- &RegistryEvent{ + SourceRegistryName: registryName, + UseReplace: true, + Replace: instances, + } + } + return watcher } @@ -148,6 +161,17 @@ func (sr *ServiceRegistry) NewServiceWatcher(registryName, serviceName string) S } sr.registryBuckets[registryName].serviceBuckets[serviceName].serviceWatchers[id] = watcher + instances, err := sr._listServiceInstances(registryName, serviceName) + if err != nil { + logger.Warnf("watch service %s/%s: list service instances failed: %v", + registryName, serviceName, err) + } else { + watcher.EventChan() <- &ServiceEvent{ + SourceRegistryName: registryName, + Instances: instances, + } + } + return watcher } @@ -265,8 +289,7 @@ func (w *registryWatcher) Stop() { // DeepCopy deep copies ServiceEvent. func (e *ServiceEvent) DeepCopy() *ServiceEvent { copy := &ServiceEvent{ - RegistryName: e.RegistryName, - ServiceName: e.ServiceName, + SourceRegistryName: e.SourceRegistryName, } if e.Instances != nil {