Skip to content

Commit

Permalink
[doc]: Add NacosServiceRegistry
Browse files Browse the repository at this point in the history
  • Loading branch information
xxx7xxxx committed Aug 24, 2021
1 parent 587520a commit eff8730
Show file tree
Hide file tree
Showing 12 changed files with 161 additions and 39 deletions.
48 changes: 35 additions & 13 deletions doc/controllers.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

# Controllers

- [Controllers](#controllers)
Expand All @@ -17,6 +16,7 @@
- [EtcdServiceRegistry](#etcdserviceregistry)
- [EurekaServiceRegistry](#eurekaserviceregistry)
- [ZookeeperServiceRegistry](#zookeeperserviceregistry)
- [NacosServiceRegistry](#nacosserviceregistry)
- [Common Types](#common-types)
- [tracing.Spec](#tracingspec)
- [zipkin.Spec](#zipkinspec)
Expand All @@ -27,6 +27,7 @@
- [httppipeline.Flow](#httppipelineflow)
- [httppipeline.Filter](#httppipelinefilter)
- [easemonitormetrics.Kafka](#easemonitormetricskafka)
- [nacos.ServerSpec](#nacosserverspec)

As the [architecture diagram](./architecture.png) shows, the controller is the core entity to control kinds of working. There are two kinds of controllers overall:

Expand Down Expand Up @@ -151,18 +152,18 @@ ingressClass: easegress
httpServer:
port: 8080
https: false
keepAlive: true
keepAliveTimeout: 60s
keepAlive: true
keepAliveTimeout: 60s
maxConnections: 10240
```
| Name | Type | Description | Required |
| ------------ | -------- | ------------------------------------------------------------------------- | --------------------- |
| kubeConfig | string | Path of the Kubernetes configuration file. | No |
| masterURL | string | The address of the Kubernetes API server. | No |
| namespaces | []string | An array of Kubernetes namespaces which the IngressController needs to watch, all namespaces are watched if left empty. | No |
| ingressClass | string | The IngressController only handles `Ingresses` with `ingressClassName` set to the value of this option. | No (default: easegress) |
| httpServer | [httpserver.Spec](#httpserver) | Basic configuration for the shared HTTP traffic gate. The routing rules will be generated dynamically according to Kubernetes ingresses and should not be specified here. | Yes |
| Name | Type | Description | Required |
| ------------ | ------------------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ----------------------- |
| kubeConfig | string | Path of the Kubernetes configuration file. | No |
| masterURL | string | The address of the Kubernetes API server. | No |
| namespaces | []string | An array of Kubernetes namespaces which the IngressController needs to watch, all namespaces are watched if left empty. | No |
| ingressClass | string | The IngressController only handles `Ingresses` with `ingressClassName` set to the value of this option. | No (default: easegress) |
| httpServer | [httpserver.Spec](#httpserver) | Basic configuration for the shared HTTP traffic gate. The routing rules will be generated dynamically according to Kubernetes ingresses and should not be specified here. | Yes |

**Note**: IngressController uses `kubeConfig` and `masterURL` to connect to Kubernetes, at least one of them must be specified when deployed outside of a Kubernetes cluster, and both are optional when deployed inside a cluster.

Expand Down Expand Up @@ -262,6 +263,18 @@ syncInterval: 10s
| Prefix | string | Prefix of services | Yes (default: /) |
| syncInterval | string | Interval to synchronize data | Yes (default: 10s) |

### NacosServiceRegistry

NacosServiceRegistry supports service discovery for Nacos as backend. The config looks like:

| Name | Type | Description | Required |
| ------------ | ------------------------------------- | ---------------------------- | ------------------ |
| servers | [][nacosServerSpec](#nacosserverspec) | Servers of Nacos | Yes |
| syncInterval | string | Interval to synchronize data | Yes (default: 10s) |
| namespace | string | The namespace of Nacos | No |
| username | string | The username of client | No |
| password | string | The password of client | No |

## Common Types

### tracing.Spec
Expand Down Expand Up @@ -324,9 +337,9 @@ There must be at least one of `values` and `regexp`.

### httppipeline.Flow

| Name | Type | Description | Required |
| ------ | ----------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -------- |
| filter | string | The filter name | Yes |
| Name | Type | Description | Required |
| ------ | ----------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -------- |
| filter | string | The filter name | Yes |
| jumpIf | map[string]string | Jump to another filter conditionally, the key is the result of the current filter, the value is the jumping filter name. `END` is the built-in value for the ending of the pipeline | No |

### httppipeline.Filter
Expand All @@ -345,3 +358,12 @@ The self-defining specification of each filter references to [filters](./filters
| ------- | -------- | ---------------- | ----------------------------- |
| brokers | []string | Broker addresses | Yes (default: localhost:9092) |
| topic | string | Produce topic | Yes |

### nacos.ServerSpec

| Name | Type | Description | Required |
| ----------- | ------ | -------------------------------------------- | -------- |
| ipAddr | string | The ip address | Yes |
| port | uint16 | The port | Yes |
| scheme | string | The scheme of protocol (support http, https) | No |
| contextPath | string | The context path | No |
7 changes: 7 additions & 0 deletions example/config/server-001-instance-003.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
registryName: etcd-service-registry-example
serviceName: service-001
instanceID: intance-003
scheme: http
hostIP: 127.0.0.1
port: 9093
tags: [v3]
5 changes: 5 additions & 0 deletions example/config/server-002-instance-001.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
registryName: mesh-controller-example
serviceName: service-002
instanceID: instance-001
ip: 127.0.0.1
port: 9094
27 changes: 21 additions & 6 deletions pkg/object/meshcontroller/master/registrysyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ func (rs *registrySyncer) handleEvent(event *serviceregistry.RegistryEvent) {
}()

if event.UseReplace {
event.Replace = rs.filterExternalInstances(event.Replace, rs.externalRegistryName())

oldInstances := rs.service.ListAllServiceInstanceSpecs()
for _, oldInstance := range oldInstances {
if oldInstance.RegistryName != rs.externalRegistryName() {
Expand All @@ -106,18 +108,22 @@ func (rs *registrySyncer) handleEvent(event *serviceregistry.RegistryEvent) {
ServiceName: oldInstance.ServiceName,
InstanceID: oldInstance.InstanceID,
}
_, existed := event.Replace[instance.Key()]
if !existed {
_, exists := event.Replace[instance.Key()]
if !exists {
rs.service.DeleteServiceInstanceSpec(oldInstance.ServiceName, oldInstance.InstanceID)
}
}

for _, instance := range event.Replace {
rs.service.PutServiceInstanceSpec(rs.externalToMeshInstance(instance))
}

return
}

event.Delete = rs.filterExternalInstances(event.Delete, rs.externalRegistryName())
event.Apply = rs.filterExternalInstances(event.Apply, rs.externalRegistryName())

for _, instance := range event.Delete {
rs.service.DeleteServiceInstanceSpec(instance.ServiceName, instance.InstanceID)
}
Expand All @@ -132,17 +138,26 @@ func (rs *registrySyncer) serviceInstanceSpecsFunc(meshInstances map[string]*spe
logger.Errorf("list all service instances of %s: %v", rs.spec.ExternalServiceRegistry, err)
return true
}

oldInstances = rs.filterExternalInstances(oldInstances, rs.meshRegistryName())

meshInstances = rs.filterMeshInstances(meshInstances, "", rs.meshRegistryName())
meshInstances = rs.filterMeshInstances(meshInstances, rs.meshRegistryName())
newInstances := rs.meshToExternalInstances(meshInstances)

event := serviceregistry.NewRegistryEventFromDiff(rs.meshRegistryName(), oldInstances, newInstances)

if len(event.Apply) != 0 {
rs.serviceRegistry.ApplyServiceInstances(rs.externalRegistryName(), event.Apply)
err := rs.serviceRegistry.ApplyServiceInstances(rs.externalRegistryName(), event.Apply)
if err != nil {
logger.Errorf("apply service instances failed: %v", err)
return true
}
}
if len(event.Delete) != 0 {
rs.serviceRegistry.DeleteServiceInstances(rs.externalRegistryName(), event.Delete)
err := rs.serviceRegistry.DeleteServiceInstances(rs.externalRegistryName(), event.Delete)
if err != nil {
logger.Errorf("delete service instances failed: %v", err)
}
}

return true
Expand Down Expand Up @@ -173,7 +188,7 @@ func (rs *registrySyncer) filterMeshInstances(instances map[string]*spec.Service

for _, instance := range instances {
if stringtool.StrInSlice(instance.RegistryName, registryNames) {
result[instance.InstanceID] = instance
result[instance.Key()] = instance
}
}

Expand Down
15 changes: 9 additions & 6 deletions pkg/object/meshcontroller/registrycenter/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type (
RegistryType string
registered bool

registryName string
serviceName string
instanceID string
IP string
Expand All @@ -68,10 +69,11 @@ type (
)

// NewRegistryCenterServer creates a initialized registry center server.
func NewRegistryCenterServer(registryType string, serviceName string, IP string, port int, instanceID string,
func NewRegistryCenterServer(registryType string, registryName, serviceName string, IP string, port int, instanceID string,
serviceLabels map[string]string, service *service.Service) *Server {
return &Server{
RegistryType: registryType,
registryName: registryName,
serviceName: serviceName,
service: service,
registered: false,
Expand Down Expand Up @@ -105,11 +107,12 @@ func (rcs *Server) Register(serviceSpec *spec.Service, ingressReady ReadyFunc, e
}

ins := &spec.ServiceInstanceSpec{
ServiceName: rcs.serviceName,
InstanceID: rcs.instanceID,
IP: rcs.IP,
Port: uint32(serviceSpec.Sidecar.IngressPort),
Labels: rcs.serviceLabels,
RegistryName: rcs.registryName,
ServiceName: rcs.serviceName,
InstanceID: rcs.instanceID,
IP: rcs.IP,
Port: uint32(serviceSpec.Sidecar.IngressPort),
Labels: rcs.serviceLabels,
}

go rcs.register(ins, ingressReady, egressReady)
Expand Down
9 changes: 7 additions & 2 deletions pkg/object/meshcontroller/spec/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,9 @@ type (
}

// ServiceInstanceSpec is the spec of service instance.
// FIXME: Use the unified struct: serviceregistry.ServiceInstanceSpec.
ServiceInstanceSpec struct {
// Backward compatibility: empty RegistryName means it is a mesh service.
RegistryName string
RegistryName string `yaml:"registryName" jsonschema:"required"`
// Provide by registry client
ServiceName string `yaml:"serviceName" jsonschema:"required"`
InstanceID string `yaml:"instanceID" jsonschema:"required"`
Expand Down Expand Up @@ -287,6 +287,11 @@ func (a Admin) Validate() error {
return nil
}

// Key returns the key of ServiceInstanceSpec.
func (s *ServiceInstanceSpec) Key() string {
return fmt.Sprintf("%s/%s/%s", s.RegistryName, s.ServiceName, s.InstanceID)
}

func newPipelineSpecBuilder(name string) *pipelineSpecBuilder {
return &pipelineSpecBuilder{
Kind: httppipeline.Kind,
Expand Down
2 changes: 1 addition & 1 deletion pkg/object/meshcontroller/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func New(superSpec *supervisor.Spec) *Worker {
store := storage.New(superSpec.Name(), super.Cluster())
_service := service.New(superSpec)
registryCenterServer := registrycenter.NewRegistryCenterServer(spec.RegistryType,
serviceName, applicationIP, applicationPort, instanceID, serviceLabels, _service)
superSpec.Name(), serviceName, applicationIP, applicationPort, instanceID, serviceLabels, _service)

inf := informer.NewInformer(store, serviceName)
ingressServer := NewIngressServer(superSpec, super, serviceName, inf)
Expand Down
6 changes: 3 additions & 3 deletions pkg/object/nacosserviceregistry/nacosserviceregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,11 @@ type (

// Spec describes the NacosServiceRegistry.
Spec struct {
Servers []*ServerSpec `yaml:"servers" jsonschema:"required"`
SyncInterval string `yaml:"syncInterval" jsonschema:"required,format=duration"`
Namespace string `yaml:"namespace" jsonschema:"omitempty"`
Username string `yaml:"username"`
Password string `yaml:"password"`
Servers []*ServerSpec `yaml:"servers"`
Username string `yaml:"username" jsonschema:"omitempty"`
Password string `yaml:"password" jsonschema:"omitempty"`
}

// ServerSpec is the server config of Nacos.
Expand Down
15 changes: 11 additions & 4 deletions pkg/object/serviceregistry/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,23 +133,30 @@ func NewRegistryEventFromDiff(registryName string, oldSpecs, newSpecs map[string

event := &RegistryEvent{
SourceRegistryName: registryName,

Delete: make(map[string]*ServiceInstanceSpec),
Apply: make(map[string]*ServiceInstanceSpec),
}

for _, oldSpec := range oldSpecs {
_, exists := newSpecs[oldSpec.Key()]
if !exists {
copy := oldSpec.DeepCopy()

if event.Delete == nil {
event.Delete = make(map[string]*ServiceInstanceSpec)
}

event.Delete[oldSpec.Key()] = copy
}
}

for _, newSpec := range newSpecs {
oldSpec, exists := oldSpecs[newSpec.Key()]
if exists && !reflect.DeepEqual(oldSpec, newSpec) {
if !exists || !reflect.DeepEqual(oldSpec, newSpec) {
copy := newSpec.DeepCopy()

if event.Apply == nil {
event.Apply = make(map[string]*ServiceInstanceSpec)
}

event.Apply[newSpec.Key()] = copy
}
}
Expand Down
59 changes: 59 additions & 0 deletions pkg/object/serviceregistry/service_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package serviceregistry

import (
"reflect"
"testing"
)

func TestNewRegistryEventFromDiff(t *testing.T) {
olds := []*ServiceInstanceSpec{
{
RegistryName: "registry-test1",
ServiceName: "service-test1",
InstanceID: "instance-test1",
},
}

news := []*ServiceInstanceSpec{
{
RegistryName: "registry-test1",
ServiceName: "service-test1",
InstanceID: "instance-test2",
},
}

oldSpecs := map[string]*ServiceInstanceSpec{}
for _, oldSpec := range olds {
oldSpecs[oldSpec.Key()] = oldSpec
}

newSpecs := map[string]*ServiceInstanceSpec{}
for _, newSpec := range news {
newSpecs[newSpec.Key()] = newSpec
}

wantEvent := &RegistryEvent{
SourceRegistryName: "registry-test1",
Delete: map[string]*ServiceInstanceSpec{
"registry-test1/service-test1/instance-test1": {
RegistryName: "registry-test1",
ServiceName: "service-test1",
InstanceID: "instance-test1",
},
},

Apply: map[string]*ServiceInstanceSpec{
"registry-test1/service-test1/instance-test2": {
RegistryName: "registry-test1",
ServiceName: "service-test1",
InstanceID: "instance-test2",
},
},
}

gotEvent := NewRegistryEventFromDiff("registry-test1", oldSpecs, newSpecs)

if !reflect.DeepEqual(wantEvent, gotEvent) {
t.Fatalf("registry event:\nwant %+v\ngot %+v", wantEvent, gotEvent)
}
}
4 changes: 0 additions & 4 deletions pkg/object/serviceregistry/serviceregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,10 +269,6 @@ func (sr *ServiceRegistry) _applyServiceInstances(registryName string, serviceIn
return fmt.Errorf("%s not found", registryName)
}

for _, instance := range serviceInstances {
instance.RegistryName = registryName
}

return bucket.registry.ApplyServiceInstances(serviceInstances)
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/object/serviceregistry/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
type (
// ServiceEvent is the event of service.
// It concludes complete instances of the service.
// NOTE: Changing inner fields needs to adapt to its methods DeepCopy, etc.
ServiceEvent struct {
// SourceRegistryName is the registry which caused the event,
// the RegistryName of specs may not be the same with it.
Expand All @@ -36,6 +37,7 @@ type (

// RegistryEvent is the event of service registry.
// If UseReplace is true, the event handler should use Replace field even it is empty.
// NOTE: Changing inner fields needs to adapt to its methods Empty, DeepCopy, Validate, etc.
RegistryEvent struct {
// SourceRegistryName is the registry which caused the event,
// the RegistryName of specs may not be the same with it.
Expand Down Expand Up @@ -311,6 +313,7 @@ func (e *ServiceEvent) DeepCopy() *ServiceEvent {
func (e *RegistryEvent) DeepCopy() *RegistryEvent {
copy := &RegistryEvent{
SourceRegistryName: e.SourceRegistryName,
UseReplace: e.UseReplace,
Replace: e.Replace,
}

Expand Down

0 comments on commit eff8730

Please sign in to comment.