diff --git a/go.mod b/go.mod index 3cc398eaac..ad2c199e3f 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/Shopify/sarama v1.29.1 github.com/alecthomas/jsonschema v0.0.0-20210526225647-edb03dcab7bc github.com/bytecodealliance/wasmtime-go v0.28.0 + github.com/davecgh/go-spew v1.1.1 github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c // indirect github.com/facebookgo/freeport v0.0.0-20150612182905-d4adf43b75b9 // indirect github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 // indirect diff --git a/pkg/filter/proxy/server_test.go b/pkg/filter/proxy/server_test.go index 8b1886b051..5a0a767ea1 100644 --- a/pkg/filter/proxy/server_test.go +++ b/pkg/filter/proxy/server_test.go @@ -314,25 +314,37 @@ func TestStaticServers(t *testing.T) { func TestDynamicService(t *testing.T) { loadBalance := &LoadBalance{Policy: PolicyRandom} + configServers := []*Server{ + { + URL: "http://127.0.0.1:8888", + Tags: []string{"static"}, + }, + } s := &servers{ poolSpec: &PoolSpec{ LoadBalance: loadBalance, + Servers: configServers, }, } + s.useService(nil) + if !reflect.DeepEqual(s.static.servers, configServers) { + t.Fatalf("static servers want %+v, got %+v", configServers, s.static.servers) + } + instances := []*serviceregistry.ServiceInstanceSpec{ { RegistryName: "registry-test1", ServiceName: "service-test1", InstanceID: "instance-test1", - HostIP: "127.0.0.1", + Address: "127.0.0.1", Port: 1111, }, { RegistryName: "registry-test1", ServiceName: "service-test1", InstanceID: "instance-test2", - HostIP: "127.0.0.1", + Address: "127.0.0.1", Port: 2222, }, } diff --git a/pkg/object/consulserviceregistry/client.go b/pkg/object/consulserviceregistry/client.go new file mode 100644 index 0000000000..99d6e8fe14 --- /dev/null +++ b/pkg/object/consulserviceregistry/client.go @@ -0,0 +1,64 @@ +package consulserviceregistry + +import ( + "fmt" + + "github.com/hashicorp/consul/api" +) + +type ( + // consulClient is the common client interface for consul. + consulClient interface { + ServiceRegister(registration *api.AgentServiceRegistration) error + ServiceDeregister(instanceID string) error + ListServiceInstances(serviceName string) ([]*api.CatalogService, error) + ListAllServiceInstances() ([]*api.CatalogService, error) + } + + consulAPIClient struct { + client *api.Client + } +) + +func newConsulAPIClient(client *api.Client) *consulAPIClient { + return &consulAPIClient{ + client: client, + } +} + +func (c *consulAPIClient) ServiceRegister(registration *api.AgentServiceRegistration) error { + return c.client.Agent().ServiceRegister(registration) +} + +func (c *consulAPIClient) ServiceDeregister(instanceID string) error { + return c.client.Agent().ServiceDeregister(instanceID) +} + +func (c *consulAPIClient) ListServiceInstances(serviceName string) ([]*api.CatalogService, error) { + resp, _, err := c.client.Catalog().Service(serviceName, "", &api.QueryOptions{}) + if err != nil { + return nil, err + } + return resp, nil +} + +func (c *consulAPIClient) ListAllServiceInstances() ([]*api.CatalogService, error) { + resp, _, err := c.client.Catalog().Services(&api.QueryOptions{}) + if err != nil { + return nil, fmt.Errorf("pull catalog services failed: %v", err) + } + + catalogServices := []*api.CatalogService{} + for serviceName := range resp { + services, _, err := c.client.Catalog().Service(serviceName, "", &api.QueryOptions{}) + if err != nil { + return nil, fmt.Errorf("pull catalog service %s failed: %v", serviceName, err) + } + + for _, service := range services { + catalogServices = append(catalogServices, service) + } + } + + return catalogServices, nil +} diff --git a/pkg/object/consulserviceregistry/consulserviceregistry.go b/pkg/object/consulserviceregistry/consulserviceregistry.go index 385f6565bf..712edd5d59 100644 --- a/pkg/object/consulserviceregistry/consulserviceregistry.go +++ b/pkg/object/consulserviceregistry/consulserviceregistry.go @@ -58,7 +58,7 @@ type ( notify chan *serviceregistry.RegistryEvent clientMutex sync.RWMutex - client *api.Client + client consulClient statusMutex sync.Mutex instancesNum map[string]int @@ -134,7 +134,7 @@ func (c *ConsulServiceRegistry) reload() { go c.run() } -func (c *ConsulServiceRegistry) getClient() (*api.Client, error) { +func (c *ConsulServiceRegistry) getClient() (consulClient, error) { c.clientMutex.RLock() if c.client != nil { client := c.client @@ -146,7 +146,7 @@ func (c *ConsulServiceRegistry) getClient() (*api.Client, error) { return c.buildClient() } -func (c *ConsulServiceRegistry) buildClient() (*api.Client, error) { +func (c *ConsulServiceRegistry) buildClient() (consulClient, error) { c.clientMutex.Lock() defer c.clientMutex.Unlock() @@ -176,9 +176,9 @@ func (c *ConsulServiceRegistry) buildClient() (*api.Client, error) { return nil, err } - c.client = client + c.client = newConsulAPIClient(client) - return client, nil + return c.client, nil } func (c *ConsulServiceRegistry) closeClient() { @@ -305,7 +305,7 @@ func (c *ConsulServiceRegistry) ApplyServiceInstances(instances map[string]*serv for _, instance := range instances { registration := c.serviceInstanceToRegistration(instance) - err = client.Agent().ServiceRegister(registration) + err = client.ServiceRegister(registration) if err != nil { return err } @@ -323,7 +323,7 @@ func (c *ConsulServiceRegistry) DeleteServiceInstances(instances map[string]*ser } for _, instance := range instances { - err := client.Agent().ServiceDeregister(instance.InstanceID) + err := client.ServiceDeregister(instance.InstanceID) if err != nil { return err } @@ -356,7 +356,7 @@ func (c *ConsulServiceRegistry) ListServiceInstances(serviceName string) (map[st c.superSpec.Name(), err) } - catalogServices, _, err := client.Catalog().Service(serviceName, "", &api.QueryOptions{}) + catalogServices, err := client.ListServiceInstances(serviceName) if err != nil { return nil, err @@ -384,28 +384,20 @@ func (c *ConsulServiceRegistry) ListAllServiceInstances() (map[string]*servicere c.superSpec.Name(), err) } - resp, _, err := client.Catalog().Services(&api.QueryOptions{}) + catalogServices, err := client.ListAllServiceInstances() if err != nil { return nil, fmt.Errorf("%s pull catalog services failed: %v", c.superSpec.Name(), err) } instances := make(map[string]*serviceregistry.ServiceInstanceSpec) - for serviceName := range resp { - services, _, err := client.Catalog().Service(serviceName, "", &api.QueryOptions{}) - if err != nil { - return nil, fmt.Errorf("%s pull catalog service %s failed: %v", - c.superSpec.Name(), serviceName, err) + for _, catalogService := range catalogServices { + serviceInstance := c.catalogServiceToServiceInstance(catalogService) + if err := serviceInstance.Validate(); err != nil { + return nil, fmt.Errorf("%+v is invalid: %v", serviceInstance, err) } - for _, service := range services { - serviceInstance := c.catalogServiceToServiceInstance(service) - if err := serviceInstance.Validate(); err != nil { - return nil, fmt.Errorf("%+v is invalid: %v", serviceInstance, err) - } - - instances[serviceInstance.Key()] = serviceInstance - } + instances[serviceInstance.Key()] = serviceInstance } return instances, nil @@ -413,11 +405,12 @@ func (c *ConsulServiceRegistry) ListAllServiceInstances() (map[string]*servicere func (c *ConsulServiceRegistry) serviceInstanceToRegistration(serviceInstance *serviceregistry.ServiceInstanceSpec) *api.AgentServiceRegistration { return &api.AgentServiceRegistration{ - Kind: api.ServiceKindTypical, - ID: serviceInstance.InstanceID, - Name: serviceInstance.ServiceName, - Tags: serviceInstance.Tags, - Port: int(serviceInstance.Port), + Kind: api.ServiceKindTypical, + ID: serviceInstance.InstanceID, + Name: serviceInstance.ServiceName, + Tags: serviceInstance.Tags, + Port: int(serviceInstance.Port), + Address: serviceInstance.Address, Meta: map[string]string{ MetaKeyRegistryName: serviceInstance.RegistryName, }, @@ -431,17 +424,12 @@ func (c *ConsulServiceRegistry) catalogServiceToServiceInstance(catalogService * registryName = catalogService.ServiceMeta[MetaKeyRegistryName] } - hostIP := catalogService.ServiceAddress - if hostIP == "" { - hostIP = catalogService.Address - } - return &serviceregistry.ServiceInstanceSpec{ RegistryName: registryName, ServiceName: catalogService.ServiceName, InstanceID: catalogService.ServiceID, Port: uint16(catalogService.ServicePort), Tags: catalogService.ServiceTags, - HostIP: hostIP, + Address: catalogService.Address, } } diff --git a/pkg/object/consulserviceregistry/consulserviceregistry_test.go b/pkg/object/consulserviceregistry/consulserviceregistry_test.go new file mode 100644 index 0000000000..f7f2ccf401 --- /dev/null +++ b/pkg/object/consulserviceregistry/consulserviceregistry_test.go @@ -0,0 +1,169 @@ +package consulserviceregistry + +import ( + "fmt" + "reflect" + "strings" + "testing" + + "github.com/megaease/easegress/pkg/object/serviceregistry" + + "github.com/davecgh/go-spew/spew" + "github.com/hashicorp/consul/api" +) + +type ( + mockClient struct { + client *api.Client + instances map[string]*api.AgentServiceRegistration + } +) + +func newMockClient() *mockClient { + return &mockClient{ + instances: make(map[string]*api.AgentServiceRegistration), + } +} + +func (c *mockClient) ServiceRegister(registration *api.AgentServiceRegistration) error { + c.instances[registration.ID] = registration + return nil +} + +func (c *mockClient) ServiceDeregister(instanceID string) error { + _, exists := c.instances[instanceID] + if !exists { + return fmt.Errorf("%s not found", instanceID) + } + + delete(c.instances, instanceID) + + return nil +} + +func (c *mockClient) ListServiceInstances(serviceName string) ([]*api.CatalogService, error) { + result := []*api.CatalogService{} + + for _, instance := range c.instances { + if instance.Name != serviceName { + continue + } + result = append(result, c.agentToCatalogService(instance)) + } + + return result, nil +} + +func (c *mockClient) ListAllServiceInstances() ([]*api.CatalogService, error) { + result := []*api.CatalogService{} + + for _, instance := range c.instances { + result = append(result, c.agentToCatalogService(instance)) + } + + return result, nil +} + +func (c *mockClient) agentToCatalogService(registration *api.AgentServiceRegistration) *api.CatalogService { + return &api.CatalogService{ + ServiceID: registration.ID, + ServiceName: registration.Name, + ServiceAddress: registration.Address, + ServiceTaggedAddresses: registration.TaggedAddresses, + ServiceTags: registration.Tags, + ServiceMeta: registration.Meta, + ServicePort: registration.Port, + ServiceWeights: api.Weights(*registration.Weights), + ServiceProxy: registration.Proxy, + Namespace: registration.Namespace, + } +} + +func TestApplyServiceInstances(t *testing.T) { + mock := newMockClient() + c := &ConsulServiceRegistry{ + client: mock, + } + + data := []*serviceregistry.ServiceInstanceSpec{ + { + RegistryName: "consul", + ServiceName: "service1", + InstanceID: "instance1", + Address: "localhost", + Port: 18880, + }, + { + RegistryName: "consul", + ServiceName: "service1", + InstanceID: "instance2", + Address: "127.0.0.1", + Port: 18881, + }, + { + RegistryName: "consul", + ServiceName: "service2", + InstanceID: "instance3", + Address: "localhost", + Port: 18883, + }, + } + + instances := map[string]*serviceregistry.ServiceInstanceSpec{} + for _, instance := range data { + instances[instance.Key()] = instance + } + + wantInstances := map[string]*api.AgentServiceRegistration{ + "instance1": { + Name: "service1", + ID: "instance1", + Address: "localhost", + Port: 18880, + Meta: map[string]string{ + MetaKeyRegistryName: "consul", + }, + }, + "instance2": { + Name: "service1", + ID: "instance2", + Address: "127.0.0.1", + Port: 18881, + Meta: map[string]string{ + MetaKeyRegistryName: "consul", + }, + }, + "instance3": { + Name: "service2", + ID: "instance3", + Address: "localhost", + Port: 18883, + Meta: map[string]string{ + MetaKeyRegistryName: "consul", + }, + }, + } + + err := c.ApplyServiceInstances(instances) + if err != nil { + t.Fatalf("apply failed: %v", err) + } + + if !reflect.DeepEqual(wantInstances, mock.instances) { + t.Fatalf("want: %+v, got: %+v", + spew.Sdump(wantInstances), spew.Sdump(mock.instances)) + } + + err = c.ApplyServiceInstances(map[string]*serviceregistry.ServiceInstanceSpec{ + "consul/service1/instance1": { + RegistryName: "consul", + ServiceName: "service1", + InstanceID: "instance1", + Address: "localhost", + }, + }) + + if strings.Index(err.Error(), "port is empty") < 0 { + t.Fatalf("want empty port error, got %v", err) + } +} diff --git a/pkg/object/eurekaserviceregistry/eurekaserviceregistry.go b/pkg/object/eurekaserviceregistry/eurekaserviceregistry.go index e562a6f689..117e202ca9 100644 --- a/pkg/object/eurekaserviceregistry/eurekaserviceregistry.go +++ b/pkg/object/eurekaserviceregistry/eurekaserviceregistry.go @@ -386,12 +386,16 @@ func (e *EurekaServiceRegistry) instanceInfoToServiceInstances(info *eurekaapi.I registryName = info.Metadata.Map[MetaKeyRegistryName] } + address := info.IpAddr + if address == "" { + address = info.HostName + } + baseServiceInstanceSpec := serviceregistry.ServiceInstanceSpec{ RegistryName: registryName, ServiceName: info.App, InstanceID: info.InstanceID, - Hostname: info.HostName, - HostIP: info.IpAddr, + Address: address, } if info.Port != nil && info.Port.Enabled { @@ -417,8 +421,7 @@ func (e *EurekaServiceRegistry) serviceInstanceToInstanceInfo(serviceInstance *s }, App: serviceInstance.ServiceName, InstanceID: serviceInstance.InstanceID, - HostName: serviceInstance.Hostname, - IpAddr: serviceInstance.HostIP, + IpAddr: serviceInstance.Address, } switch serviceInstance.Scheme { diff --git a/pkg/object/meshcontroller/master/registrysyncer.go b/pkg/object/meshcontroller/master/registrysyncer.go index f67cb4184f..335f6d6aa0 100644 --- a/pkg/object/meshcontroller/master/registrysyncer.go +++ b/pkg/object/meshcontroller/master/registrysyncer.go @@ -217,8 +217,8 @@ func (rs *registrySyncer) meshToExternalInstances(instances map[string]*spec.Ser ServiceName: instance.ServiceName, InstanceID: instance.InstanceID, - HostIP: instance.IP, - Port: uint16(instance.Port), + Address: instance.IP, + Port: uint16(instance.Port), } result[externalInstance.Key()] = externalInstance } @@ -231,7 +231,7 @@ func (rs *registrySyncer) externalToMeshInstance(instance *serviceregistry.Servi RegistryName: instance.RegistryName, ServiceName: instance.ServiceName, InstanceID: instance.InstanceID, - IP: instance.HostIP, + IP: instance.Address, Port: uint32(instance.Port), } } diff --git a/pkg/object/nacosserviceregistry/nacosserviceregistry.go b/pkg/object/nacosserviceregistry/nacosserviceregistry.go index b2d2a43876..804877e75f 100644 --- a/pkg/object/nacosserviceregistry/nacosserviceregistry.go +++ b/pkg/object/nacosserviceregistry/nacosserviceregistry.go @@ -473,7 +473,7 @@ func (n *NacosServiceRegistry) serviceInstanceToRegisterInstance(instance *servi }, ServiceName: instance.ServiceName, - Ip: instance.HostIP, + Ip: instance.Address, Port: uint64(instance.Port), Weight: float64(instance.Weight), Enable: true, @@ -484,7 +484,7 @@ func (n *NacosServiceRegistry) serviceInstanceToRegisterInstance(instance *servi func (n *NacosServiceRegistry) serviceInstanceToDeregisterInstance(instance *serviceregistry.ServiceInstanceSpec) *vo.DeregisterInstanceParam { return &vo.DeregisterInstanceParam{ ServiceName: instance.ServiceName, - Ip: instance.HostIP, + Ip: instance.Address, Port: uint64(instance.Port), } } @@ -504,7 +504,7 @@ func (n *NacosServiceRegistry) nacosInstanceToServiceInstance(nacosInstance *mod RegistryName: registryName, ServiceName: nacosInstance.ServiceName, InstanceID: instanceID, - HostIP: nacosInstance.Ip, + Address: nacosInstance.Ip, Port: uint16(nacosInstance.Port), Weight: int(nacosInstance.Weight), } diff --git a/pkg/object/serviceregistry/service.go b/pkg/object/serviceregistry/service.go index b585f055f9..87ac41956f 100644 --- a/pkg/object/serviceregistry/service.go +++ b/pkg/object/serviceregistry/service.go @@ -32,14 +32,13 @@ type ( // InstanceID is required. InstanceID string `yaml:"instanceID"` - // Scheme is optional if Port is not empty. - Scheme string `yaml:"scheme"` - // Hostname is optional if HostIP is not empty. - Hostname string `yaml:"hostname"` - // HostIP is optional if Hostname is not empty. - HostIP string `yaml:"hostIP"` - // Port is optional if Scheme is not empty + // Address is required. + Address string `yaml:"address"` + // Port is required. Port uint16 `yaml:"port"` + + // Scheme is optional. + Scheme string `yaml:"scheme"` // Tags is optional. Tags []string `yaml:"tags"` // Weight is optional. @@ -74,12 +73,12 @@ func (s *ServiceInstanceSpec) Validate() error { return fmt.Errorf("instanceID is empty") } - if s.Hostname == "" && s.HostIP == "" { - return fmt.Errorf("both hostname and hostIP are empty") + if s.Address == "" { + return fmt.Errorf("address is empty") } - if s.Scheme == "" && s.Port == 0 { - return fmt.Errorf("both scheme and port are empty") + if s.Port == 0 { + return fmt.Errorf("port is empty") } switch s.Scheme { @@ -103,19 +102,7 @@ func (s *ServiceInstanceSpec) URL() string { scheme = "http" } - var host string - if s.Hostname != "" { - host = s.Hostname - } else { - host = s.HostIP - } - - var port string - if s.Port != 0 { - port = fmt.Sprintf("%d", s.Port) - } - - return fmt.Sprintf("%s://%s:%s", scheme, host, port) + return fmt.Sprintf("%s://%s:%d", scheme, s.Address, s.Port) } // NewRegistryEventFromDiff creates a registry event from diff old and new specs.