Skip to content

Commit

Permalink
[test]: Add ConsulServiceRegistry unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
xxx7xxxx committed Aug 24, 2021
1 parent 05ebb8a commit e17e648
Show file tree
Hide file tree
Showing 9 changed files with 293 additions and 69 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/Shopify/sarama v1.29.1
github.com/alecthomas/jsonschema v0.0.0-20210526225647-edb03dcab7bc
github.com/bytecodealliance/wasmtime-go v0.28.0
github.com/davecgh/go-spew v1.1.1
github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c // indirect
github.com/facebookgo/freeport v0.0.0-20150612182905-d4adf43b75b9 // indirect
github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 // indirect
Expand Down
16 changes: 14 additions & 2 deletions pkg/filter/proxy/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,25 +314,37 @@ func TestStaticServers(t *testing.T) {

func TestDynamicService(t *testing.T) {
loadBalance := &LoadBalance{Policy: PolicyRandom}
configServers := []*Server{
{
URL: "http://127.0.0.1:8888",
Tags: []string{"static"},
},
}
s := &servers{
poolSpec: &PoolSpec{
LoadBalance: loadBalance,
Servers: configServers,
},
}

s.useService(nil)
if !reflect.DeepEqual(s.static.servers, configServers) {
t.Fatalf("static servers want %+v, got %+v", configServers, s.static.servers)
}

instances := []*serviceregistry.ServiceInstanceSpec{
{
RegistryName: "registry-test1",
ServiceName: "service-test1",
InstanceID: "instance-test1",
HostIP: "127.0.0.1",
Address: "127.0.0.1",
Port: 1111,
},
{
RegistryName: "registry-test1",
ServiceName: "service-test1",
InstanceID: "instance-test2",
HostIP: "127.0.0.1",
Address: "127.0.0.1",
Port: 2222,
},
}
Expand Down
64 changes: 64 additions & 0 deletions pkg/object/consulserviceregistry/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package consulserviceregistry

import (
"fmt"

"github.com/hashicorp/consul/api"
)

type (
// consulClient is the common client interface for consul.
consulClient interface {
ServiceRegister(registration *api.AgentServiceRegistration) error
ServiceDeregister(instanceID string) error
ListServiceInstances(serviceName string) ([]*api.CatalogService, error)
ListAllServiceInstances() ([]*api.CatalogService, error)
}

consulAPIClient struct {
client *api.Client
}
)

func newConsulAPIClient(client *api.Client) *consulAPIClient {
return &consulAPIClient{
client: client,
}
}

func (c *consulAPIClient) ServiceRegister(registration *api.AgentServiceRegistration) error {
return c.client.Agent().ServiceRegister(registration)
}

func (c *consulAPIClient) ServiceDeregister(instanceID string) error {
return c.client.Agent().ServiceDeregister(instanceID)
}

func (c *consulAPIClient) ListServiceInstances(serviceName string) ([]*api.CatalogService, error) {
resp, _, err := c.client.Catalog().Service(serviceName, "", &api.QueryOptions{})
if err != nil {
return nil, err
}
return resp, nil
}

func (c *consulAPIClient) ListAllServiceInstances() ([]*api.CatalogService, error) {
resp, _, err := c.client.Catalog().Services(&api.QueryOptions{})
if err != nil {
return nil, fmt.Errorf("pull catalog services failed: %v", err)
}

catalogServices := []*api.CatalogService{}
for serviceName := range resp {
services, _, err := c.client.Catalog().Service(serviceName, "", &api.QueryOptions{})
if err != nil {
return nil, fmt.Errorf("pull catalog service %s failed: %v", serviceName, err)
}

for _, service := range services {
catalogServices = append(catalogServices, service)
}
}

return catalogServices, nil
}
54 changes: 21 additions & 33 deletions pkg/object/consulserviceregistry/consulserviceregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type (
notify chan *serviceregistry.RegistryEvent

clientMutex sync.RWMutex
client *api.Client
client consulClient

statusMutex sync.Mutex
instancesNum map[string]int
Expand Down Expand Up @@ -134,7 +134,7 @@ func (c *ConsulServiceRegistry) reload() {
go c.run()
}

func (c *ConsulServiceRegistry) getClient() (*api.Client, error) {
func (c *ConsulServiceRegistry) getClient() (consulClient, error) {
c.clientMutex.RLock()
if c.client != nil {
client := c.client
Expand All @@ -146,7 +146,7 @@ func (c *ConsulServiceRegistry) getClient() (*api.Client, error) {
return c.buildClient()
}

func (c *ConsulServiceRegistry) buildClient() (*api.Client, error) {
func (c *ConsulServiceRegistry) buildClient() (consulClient, error) {
c.clientMutex.Lock()
defer c.clientMutex.Unlock()

Expand Down Expand Up @@ -176,9 +176,9 @@ func (c *ConsulServiceRegistry) buildClient() (*api.Client, error) {
return nil, err
}

c.client = client
c.client = newConsulAPIClient(client)

return client, nil
return c.client, nil
}

func (c *ConsulServiceRegistry) closeClient() {
Expand Down Expand Up @@ -305,7 +305,7 @@ func (c *ConsulServiceRegistry) ApplyServiceInstances(instances map[string]*serv

for _, instance := range instances {
registration := c.serviceInstanceToRegistration(instance)
err = client.Agent().ServiceRegister(registration)
err = client.ServiceRegister(registration)
if err != nil {
return err
}
Expand All @@ -323,7 +323,7 @@ func (c *ConsulServiceRegistry) DeleteServiceInstances(instances map[string]*ser
}

for _, instance := range instances {
err := client.Agent().ServiceDeregister(instance.InstanceID)
err := client.ServiceDeregister(instance.InstanceID)
if err != nil {
return err
}
Expand Down Expand Up @@ -356,7 +356,7 @@ func (c *ConsulServiceRegistry) ListServiceInstances(serviceName string) (map[st
c.superSpec.Name(), err)
}

catalogServices, _, err := client.Catalog().Service(serviceName, "", &api.QueryOptions{})
catalogServices, err := client.ListServiceInstances(serviceName)

if err != nil {
return nil, err
Expand Down Expand Up @@ -384,40 +384,33 @@ func (c *ConsulServiceRegistry) ListAllServiceInstances() (map[string]*servicere
c.superSpec.Name(), err)
}

resp, _, err := client.Catalog().Services(&api.QueryOptions{})
catalogServices, err := client.ListAllServiceInstances()
if err != nil {
return nil, fmt.Errorf("%s pull catalog services failed: %v",
c.superSpec.Name(), err)
}

instances := make(map[string]*serviceregistry.ServiceInstanceSpec)
for serviceName := range resp {
services, _, err := client.Catalog().Service(serviceName, "", &api.QueryOptions{})
if err != nil {
return nil, fmt.Errorf("%s pull catalog service %s failed: %v",
c.superSpec.Name(), serviceName, err)
for _, catalogService := range catalogServices {
serviceInstance := c.catalogServiceToServiceInstance(catalogService)
if err := serviceInstance.Validate(); err != nil {
return nil, fmt.Errorf("%+v is invalid: %v", serviceInstance, err)
}

for _, service := range services {
serviceInstance := c.catalogServiceToServiceInstance(service)
if err := serviceInstance.Validate(); err != nil {
return nil, fmt.Errorf("%+v is invalid: %v", serviceInstance, err)
}

instances[serviceInstance.Key()] = serviceInstance
}
instances[serviceInstance.Key()] = serviceInstance
}

return instances, nil
}

func (c *ConsulServiceRegistry) serviceInstanceToRegistration(serviceInstance *serviceregistry.ServiceInstanceSpec) *api.AgentServiceRegistration {
return &api.AgentServiceRegistration{
Kind: api.ServiceKindTypical,
ID: serviceInstance.InstanceID,
Name: serviceInstance.ServiceName,
Tags: serviceInstance.Tags,
Port: int(serviceInstance.Port),
Kind: api.ServiceKindTypical,
ID: serviceInstance.InstanceID,
Name: serviceInstance.ServiceName,
Tags: serviceInstance.Tags,
Port: int(serviceInstance.Port),
Address: serviceInstance.Address,
Meta: map[string]string{
MetaKeyRegistryName: serviceInstance.RegistryName,
},
Expand All @@ -431,17 +424,12 @@ func (c *ConsulServiceRegistry) catalogServiceToServiceInstance(catalogService *
registryName = catalogService.ServiceMeta[MetaKeyRegistryName]
}

hostIP := catalogService.ServiceAddress
if hostIP == "" {
hostIP = catalogService.Address
}

return &serviceregistry.ServiceInstanceSpec{
RegistryName: registryName,
ServiceName: catalogService.ServiceName,
InstanceID: catalogService.ServiceID,
Port: uint16(catalogService.ServicePort),
Tags: catalogService.ServiceTags,
HostIP: hostIP,
Address: catalogService.Address,
}
}
Loading

0 comments on commit e17e648

Please sign in to comment.