From eff87302122aaec381a4ebbc2c3124bff23c98e6 Mon Sep 17 00:00:00 2001 From: Yun Long Date: Thu, 19 Aug 2021 23:54:19 +0800 Subject: [PATCH] [doc]: Add NacosServiceRegistry --- doc/controllers.md | 48 +++++++++++---- example/config/server-001-instance-003.yaml | 7 +++ example/config/server-002-instance-001.yaml | 5 ++ .../meshcontroller/master/registrysyncer.go | 27 +++++++-- .../meshcontroller/registrycenter/registry.go | 15 +++-- pkg/object/meshcontroller/spec/spec.go | 9 ++- pkg/object/meshcontroller/worker/worker.go | 2 +- .../nacosserviceregistry.go | 6 +- pkg/object/serviceregistry/service.go | 15 +++-- pkg/object/serviceregistry/service_test.go | 59 +++++++++++++++++++ pkg/object/serviceregistry/serviceregistry.go | 4 -- pkg/object/serviceregistry/watcher.go | 3 + 12 files changed, 161 insertions(+), 39 deletions(-) create mode 100644 example/config/server-001-instance-003.yaml create mode 100644 example/config/server-002-instance-001.yaml create mode 100644 pkg/object/serviceregistry/service_test.go diff --git a/doc/controllers.md b/doc/controllers.md index 1da5b81c21..a0a109574a 100644 --- a/doc/controllers.md +++ b/doc/controllers.md @@ -1,4 +1,3 @@ - # Controllers - [Controllers](#controllers) @@ -17,6 +16,7 @@ - [EtcdServiceRegistry](#etcdserviceregistry) - [EurekaServiceRegistry](#eurekaserviceregistry) - [ZookeeperServiceRegistry](#zookeeperserviceregistry) + - [NacosServiceRegistry](#nacosserviceregistry) - [Common Types](#common-types) - [tracing.Spec](#tracingspec) - [zipkin.Spec](#zipkinspec) @@ -27,6 +27,7 @@ - [httppipeline.Flow](#httppipelineflow) - [httppipeline.Filter](#httppipelinefilter) - [easemonitormetrics.Kafka](#easemonitormetricskafka) + - [nacos.ServerSpec](#nacosserverspec) As the [architecture diagram](./architecture.png) shows, the controller is the core entity to control kinds of working. There are two kinds of controllers overall: @@ -151,18 +152,18 @@ ingressClass: easegress httpServer: port: 8080 https: false - keepAlive: true - keepAliveTimeout: 60s + keepAlive: true + keepAliveTimeout: 60s maxConnections: 10240 ``` -| Name | Type | Description | Required | -| ------------ | -------- | ------------------------------------------------------------------------- | --------------------- | -| kubeConfig | string | Path of the Kubernetes configuration file. | No | -| masterURL | string | The address of the Kubernetes API server. | No | -| namespaces | []string | An array of Kubernetes namespaces which the IngressController needs to watch, all namespaces are watched if left empty. | No | -| ingressClass | string | The IngressController only handles `Ingresses` with `ingressClassName` set to the value of this option. | No (default: easegress) | -| httpServer | [httpserver.Spec](#httpserver) | Basic configuration for the shared HTTP traffic gate. The routing rules will be generated dynamically according to Kubernetes ingresses and should not be specified here. | Yes | +| Name | Type | Description | Required | +| ------------ | ------------------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ----------------------- | +| kubeConfig | string | Path of the Kubernetes configuration file. | No | +| masterURL | string | The address of the Kubernetes API server. | No | +| namespaces | []string | An array of Kubernetes namespaces which the IngressController needs to watch, all namespaces are watched if left empty. | No | +| ingressClass | string | The IngressController only handles `Ingresses` with `ingressClassName` set to the value of this option. | No (default: easegress) | +| httpServer | [httpserver.Spec](#httpserver) | Basic configuration for the shared HTTP traffic gate. The routing rules will be generated dynamically according to Kubernetes ingresses and should not be specified here. | Yes | **Note**: IngressController uses `kubeConfig` and `masterURL` to connect to Kubernetes, at least one of them must be specified when deployed outside of a Kubernetes cluster, and both are optional when deployed inside a cluster. @@ -262,6 +263,18 @@ syncInterval: 10s | Prefix | string | Prefix of services | Yes (default: /) | | syncInterval | string | Interval to synchronize data | Yes (default: 10s) | +### NacosServiceRegistry + +NacosServiceRegistry supports service discovery for Nacos as backend. The config looks like: + +| Name | Type | Description | Required | +| ------------ | ------------------------------------- | ---------------------------- | ------------------ | +| servers | [][nacosServerSpec](#nacosserverspec) | Servers of Nacos | Yes | +| syncInterval | string | Interval to synchronize data | Yes (default: 10s) | +| namespace | string | The namespace of Nacos | No | +| username | string | The username of client | No | +| password | string | The password of client | No | + ## Common Types ### tracing.Spec @@ -324,9 +337,9 @@ There must be at least one of `values` and `regexp`. ### httppipeline.Flow -| Name | Type | Description | Required | -| ------ | ----------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -------- | -| filter | string | The filter name | Yes | +| Name | Type | Description | Required | +| ------ | ----------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -------- | +| filter | string | The filter name | Yes | | jumpIf | map[string]string | Jump to another filter conditionally, the key is the result of the current filter, the value is the jumping filter name. `END` is the built-in value for the ending of the pipeline | No | ### httppipeline.Filter @@ -345,3 +358,12 @@ The self-defining specification of each filter references to [filters](./filters | ------- | -------- | ---------------- | ----------------------------- | | brokers | []string | Broker addresses | Yes (default: localhost:9092) | | topic | string | Produce topic | Yes | + +### nacos.ServerSpec + +| Name | Type | Description | Required | +| ----------- | ------ | -------------------------------------------- | -------- | +| ipAddr | string | The ip address | Yes | +| port | uint16 | The port | Yes | +| scheme | string | The scheme of protocol (support http, https) | No | +| contextPath | string | The context path | No | diff --git a/example/config/server-001-instance-003.yaml b/example/config/server-001-instance-003.yaml new file mode 100644 index 0000000000..0d4484867e --- /dev/null +++ b/example/config/server-001-instance-003.yaml @@ -0,0 +1,7 @@ +registryName: etcd-service-registry-example +serviceName: service-001 +instanceID: intance-003 +scheme: http +hostIP: 127.0.0.1 +port: 9093 +tags: [v3] diff --git a/example/config/server-002-instance-001.yaml b/example/config/server-002-instance-001.yaml new file mode 100644 index 0000000000..99f1ff902a --- /dev/null +++ b/example/config/server-002-instance-001.yaml @@ -0,0 +1,5 @@ +registryName: mesh-controller-example +serviceName: service-002 +instanceID: instance-001 +ip: 127.0.0.1 +port: 9094 diff --git a/pkg/object/meshcontroller/master/registrysyncer.go b/pkg/object/meshcontroller/master/registrysyncer.go index 590f8de54e..b1d42104a9 100644 --- a/pkg/object/meshcontroller/master/registrysyncer.go +++ b/pkg/object/meshcontroller/master/registrysyncer.go @@ -95,6 +95,8 @@ func (rs *registrySyncer) handleEvent(event *serviceregistry.RegistryEvent) { }() if event.UseReplace { + event.Replace = rs.filterExternalInstances(event.Replace, rs.externalRegistryName()) + oldInstances := rs.service.ListAllServiceInstanceSpecs() for _, oldInstance := range oldInstances { if oldInstance.RegistryName != rs.externalRegistryName() { @@ -106,8 +108,8 @@ func (rs *registrySyncer) handleEvent(event *serviceregistry.RegistryEvent) { ServiceName: oldInstance.ServiceName, InstanceID: oldInstance.InstanceID, } - _, existed := event.Replace[instance.Key()] - if !existed { + _, exists := event.Replace[instance.Key()] + if !exists { rs.service.DeleteServiceInstanceSpec(oldInstance.ServiceName, oldInstance.InstanceID) } } @@ -115,9 +117,13 @@ func (rs *registrySyncer) handleEvent(event *serviceregistry.RegistryEvent) { for _, instance := range event.Replace { rs.service.PutServiceInstanceSpec(rs.externalToMeshInstance(instance)) } + return } + event.Delete = rs.filterExternalInstances(event.Delete, rs.externalRegistryName()) + event.Apply = rs.filterExternalInstances(event.Apply, rs.externalRegistryName()) + for _, instance := range event.Delete { rs.service.DeleteServiceInstanceSpec(instance.ServiceName, instance.InstanceID) } @@ -132,17 +138,26 @@ func (rs *registrySyncer) serviceInstanceSpecsFunc(meshInstances map[string]*spe logger.Errorf("list all service instances of %s: %v", rs.spec.ExternalServiceRegistry, err) return true } + oldInstances = rs.filterExternalInstances(oldInstances, rs.meshRegistryName()) - meshInstances = rs.filterMeshInstances(meshInstances, "", rs.meshRegistryName()) + meshInstances = rs.filterMeshInstances(meshInstances, rs.meshRegistryName()) newInstances := rs.meshToExternalInstances(meshInstances) event := serviceregistry.NewRegistryEventFromDiff(rs.meshRegistryName(), oldInstances, newInstances) + if len(event.Apply) != 0 { - rs.serviceRegistry.ApplyServiceInstances(rs.externalRegistryName(), event.Apply) + err := rs.serviceRegistry.ApplyServiceInstances(rs.externalRegistryName(), event.Apply) + if err != nil { + logger.Errorf("apply service instances failed: %v", err) + return true + } } if len(event.Delete) != 0 { - rs.serviceRegistry.DeleteServiceInstances(rs.externalRegistryName(), event.Delete) + err := rs.serviceRegistry.DeleteServiceInstances(rs.externalRegistryName(), event.Delete) + if err != nil { + logger.Errorf("delete service instances failed: %v", err) + } } return true @@ -173,7 +188,7 @@ func (rs *registrySyncer) filterMeshInstances(instances map[string]*spec.Service for _, instance := range instances { if stringtool.StrInSlice(instance.RegistryName, registryNames) { - result[instance.InstanceID] = instance + result[instance.Key()] = instance } } diff --git a/pkg/object/meshcontroller/registrycenter/registry.go b/pkg/object/meshcontroller/registrycenter/registry.go index b74872a8aa..91d679b778 100644 --- a/pkg/object/meshcontroller/registrycenter/registry.go +++ b/pkg/object/meshcontroller/registrycenter/registry.go @@ -50,6 +50,7 @@ type ( RegistryType string registered bool + registryName string serviceName string instanceID string IP string @@ -68,10 +69,11 @@ type ( ) // NewRegistryCenterServer creates a initialized registry center server. -func NewRegistryCenterServer(registryType string, serviceName string, IP string, port int, instanceID string, +func NewRegistryCenterServer(registryType string, registryName, serviceName string, IP string, port int, instanceID string, serviceLabels map[string]string, service *service.Service) *Server { return &Server{ RegistryType: registryType, + registryName: registryName, serviceName: serviceName, service: service, registered: false, @@ -105,11 +107,12 @@ func (rcs *Server) Register(serviceSpec *spec.Service, ingressReady ReadyFunc, e } ins := &spec.ServiceInstanceSpec{ - ServiceName: rcs.serviceName, - InstanceID: rcs.instanceID, - IP: rcs.IP, - Port: uint32(serviceSpec.Sidecar.IngressPort), - Labels: rcs.serviceLabels, + RegistryName: rcs.registryName, + ServiceName: rcs.serviceName, + InstanceID: rcs.instanceID, + IP: rcs.IP, + Port: uint32(serviceSpec.Sidecar.IngressPort), + Labels: rcs.serviceLabels, } go rcs.register(ins, ingressReady, egressReady) diff --git a/pkg/object/meshcontroller/spec/spec.go b/pkg/object/meshcontroller/spec/spec.go index d1cfe2fd20..2905992fd5 100644 --- a/pkg/object/meshcontroller/spec/spec.go +++ b/pkg/object/meshcontroller/spec/spec.go @@ -224,9 +224,9 @@ type ( } // ServiceInstanceSpec is the spec of service instance. + // FIXME: Use the unified struct: serviceregistry.ServiceInstanceSpec. ServiceInstanceSpec struct { - // Backward compatibility: empty RegistryName means it is a mesh service. - RegistryName string + RegistryName string `yaml:"registryName" jsonschema:"required"` // Provide by registry client ServiceName string `yaml:"serviceName" jsonschema:"required"` InstanceID string `yaml:"instanceID" jsonschema:"required"` @@ -287,6 +287,11 @@ func (a Admin) Validate() error { return nil } +// Key returns the key of ServiceInstanceSpec. +func (s *ServiceInstanceSpec) Key() string { + return fmt.Sprintf("%s/%s/%s", s.RegistryName, s.ServiceName, s.InstanceID) +} + func newPipelineSpecBuilder(name string) *pipelineSpecBuilder { return &pipelineSpecBuilder{ Kind: httppipeline.Kind, diff --git a/pkg/object/meshcontroller/worker/worker.go b/pkg/object/meshcontroller/worker/worker.go index f7e526292a..1a88c4dd38 100644 --- a/pkg/object/meshcontroller/worker/worker.go +++ b/pkg/object/meshcontroller/worker/worker.go @@ -117,7 +117,7 @@ func New(superSpec *supervisor.Spec) *Worker { store := storage.New(superSpec.Name(), super.Cluster()) _service := service.New(superSpec) registryCenterServer := registrycenter.NewRegistryCenterServer(spec.RegistryType, - serviceName, applicationIP, applicationPort, instanceID, serviceLabels, _service) + superSpec.Name(), serviceName, applicationIP, applicationPort, instanceID, serviceLabels, _service) inf := informer.NewInformer(store, serviceName) ingressServer := NewIngressServer(superSpec, super, serviceName, inf) diff --git a/pkg/object/nacosserviceregistry/nacosserviceregistry.go b/pkg/object/nacosserviceregistry/nacosserviceregistry.go index ee8ab8f20b..b2d2a43876 100644 --- a/pkg/object/nacosserviceregistry/nacosserviceregistry.go +++ b/pkg/object/nacosserviceregistry/nacosserviceregistry.go @@ -75,11 +75,11 @@ type ( // Spec describes the NacosServiceRegistry. Spec struct { + Servers []*ServerSpec `yaml:"servers" jsonschema:"required"` SyncInterval string `yaml:"syncInterval" jsonschema:"required,format=duration"` Namespace string `yaml:"namespace" jsonschema:"omitempty"` - Username string `yaml:"username"` - Password string `yaml:"password"` - Servers []*ServerSpec `yaml:"servers"` + Username string `yaml:"username" jsonschema:"omitempty"` + Password string `yaml:"password" jsonschema:"omitempty"` } // ServerSpec is the server config of Nacos. diff --git a/pkg/object/serviceregistry/service.go b/pkg/object/serviceregistry/service.go index ef9685b85b..b585f055f9 100644 --- a/pkg/object/serviceregistry/service.go +++ b/pkg/object/serviceregistry/service.go @@ -133,23 +133,30 @@ func NewRegistryEventFromDiff(registryName string, oldSpecs, newSpecs map[string event := &RegistryEvent{ SourceRegistryName: registryName, - - Delete: make(map[string]*ServiceInstanceSpec), - Apply: make(map[string]*ServiceInstanceSpec), } for _, oldSpec := range oldSpecs { _, exists := newSpecs[oldSpec.Key()] if !exists { copy := oldSpec.DeepCopy() + + if event.Delete == nil { + event.Delete = make(map[string]*ServiceInstanceSpec) + } + event.Delete[oldSpec.Key()] = copy } } for _, newSpec := range newSpecs { oldSpec, exists := oldSpecs[newSpec.Key()] - if exists && !reflect.DeepEqual(oldSpec, newSpec) { + if !exists || !reflect.DeepEqual(oldSpec, newSpec) { copy := newSpec.DeepCopy() + + if event.Apply == nil { + event.Apply = make(map[string]*ServiceInstanceSpec) + } + event.Apply[newSpec.Key()] = copy } } diff --git a/pkg/object/serviceregistry/service_test.go b/pkg/object/serviceregistry/service_test.go new file mode 100644 index 0000000000..ff283fca24 --- /dev/null +++ b/pkg/object/serviceregistry/service_test.go @@ -0,0 +1,59 @@ +package serviceregistry + +import ( + "reflect" + "testing" +) + +func TestNewRegistryEventFromDiff(t *testing.T) { + olds := []*ServiceInstanceSpec{ + { + RegistryName: "registry-test1", + ServiceName: "service-test1", + InstanceID: "instance-test1", + }, + } + + news := []*ServiceInstanceSpec{ + { + RegistryName: "registry-test1", + ServiceName: "service-test1", + InstanceID: "instance-test2", + }, + } + + oldSpecs := map[string]*ServiceInstanceSpec{} + for _, oldSpec := range olds { + oldSpecs[oldSpec.Key()] = oldSpec + } + + newSpecs := map[string]*ServiceInstanceSpec{} + for _, newSpec := range news { + newSpecs[newSpec.Key()] = newSpec + } + + wantEvent := &RegistryEvent{ + SourceRegistryName: "registry-test1", + Delete: map[string]*ServiceInstanceSpec{ + "registry-test1/service-test1/instance-test1": { + RegistryName: "registry-test1", + ServiceName: "service-test1", + InstanceID: "instance-test1", + }, + }, + + Apply: map[string]*ServiceInstanceSpec{ + "registry-test1/service-test1/instance-test2": { + RegistryName: "registry-test1", + ServiceName: "service-test1", + InstanceID: "instance-test2", + }, + }, + } + + gotEvent := NewRegistryEventFromDiff("registry-test1", oldSpecs, newSpecs) + + if !reflect.DeepEqual(wantEvent, gotEvent) { + t.Fatalf("registry event:\nwant %+v\ngot %+v", wantEvent, gotEvent) + } +} diff --git a/pkg/object/serviceregistry/serviceregistry.go b/pkg/object/serviceregistry/serviceregistry.go index 1451376495..cfc8f078c0 100644 --- a/pkg/object/serviceregistry/serviceregistry.go +++ b/pkg/object/serviceregistry/serviceregistry.go @@ -269,10 +269,6 @@ func (sr *ServiceRegistry) _applyServiceInstances(registryName string, serviceIn return fmt.Errorf("%s not found", registryName) } - for _, instance := range serviceInstances { - instance.RegistryName = registryName - } - return bucket.registry.ApplyServiceInstances(serviceInstances) } diff --git a/pkg/object/serviceregistry/watcher.go b/pkg/object/serviceregistry/watcher.go index 440f517860..3233cb9abe 100644 --- a/pkg/object/serviceregistry/watcher.go +++ b/pkg/object/serviceregistry/watcher.go @@ -27,6 +27,7 @@ import ( type ( // ServiceEvent is the event of service. // It concludes complete instances of the service. + // NOTE: Changing inner fields needs to adapt to its methods DeepCopy, etc. ServiceEvent struct { // SourceRegistryName is the registry which caused the event, // the RegistryName of specs may not be the same with it. @@ -36,6 +37,7 @@ type ( // RegistryEvent is the event of service registry. // If UseReplace is true, the event handler should use Replace field even it is empty. + // NOTE: Changing inner fields needs to adapt to its methods Empty, DeepCopy, Validate, etc. RegistryEvent struct { // SourceRegistryName is the registry which caused the event, // the RegistryName of specs may not be the same with it. @@ -311,6 +313,7 @@ func (e *ServiceEvent) DeepCopy() *ServiceEvent { func (e *RegistryEvent) DeepCopy() *RegistryEvent { copy := &RegistryEvent{ SourceRegistryName: e.SourceRegistryName, + UseReplace: e.UseReplace, Replace: e.Replace, }