Skip to content

Commit

Permalink
[mesh]: Complete mesh registry syncer
Browse files Browse the repository at this point in the history
  • Loading branch information
xxx7xxxx committed Aug 15, 2021
1 parent 74a5fd2 commit ae4adcc
Show file tree
Hide file tree
Showing 8 changed files with 242 additions and 160 deletions.
125 changes: 0 additions & 125 deletions pkg/filter/proxy/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,128 +310,3 @@ func TestStaticServers(t *testing.T) {
}
}
}

// TODO: Mock supervisor to test dynamic server.

// func TestServers(t *testing.T) {
// servers := []*Server{
// {
// URL: "http://127.0.0.1:9090",
// Tags: []string{"d1", "v1", "green"},
// Weight: 1,
// },
// {
// URL: "http://127.0.0.1:9091",
// Tags: []string{"v1", "d1", "green"},
// Weight: 2,
// },
// {
// URL: "http://127.0.0.1:9092",
// Tags: []string{"green", "d1", "v1"},
// Weight: 3,
// },
// {
// URL: "http://127.0.0.1:9093",
// Tags: []string{"v1"},
// Weight: 4,
// },
// {
// URL: "http://127.0.0.1:9094",
// Tags: []string{"v1", "v3"},
// Weight: 5,
// },
// }
// ps := &PoolSpec{
// ServersTags: []string{},
// ServiceRegistry: "service registry",
// ServiceName: "service name",
// Servers: []*Server{},
// }
//
// ctx := &contexttest.MockedHTTPContext{}
//
// s := newServers(ps)
// if s.len() != 0 {
// t.Errorf("servers.len() should be 0")
// }
// s.close()
//
// ps = &PoolSpec{
// ServersTags: []string{},
// ServiceRegistry: "service registry",
// ServiceName: "service name",
// Servers: servers,
// }
// s = newServers(ps)
// if s.len() != len(servers) {
// t.Errorf("servers.len() is not %d", len(servers))
// }
// for i := 0; i < len(servers); i++ {
// if svr, e := s.next(ctx); e != nil || svr != servers[i] {
// t.Errorf("ss.next() returns unexpected server")
// }
// }
// s.close()
//
// ps = &PoolSpec{
// ServersTags: []string{},
// ServiceRegistry: "service registry",
// Servers: []*Server{},
// ServiceName: "testservice",
// }
// fnGetService.Store(func(serviceRegistry, serviceName string) (*serviceregistry.Service, error) {
// return nil, fmt.Errorf("dummy error")
// })
// s = newServers(ps)
// if s.len() != 0 {
// t.Errorf("servers.len() should be 0")
// }
// s.close()
//
// svcservers := []*serviceregistry.Server{
// {
// ServiceName: "testservice",
// Hostname: "server1",
// HostIP: "192.168.1.1",
// Port: 80,
// },
// {
// ServiceName: "testservice",
// Hostname: "server2",
// HostIP: "192.168.1.2",
// Port: 80,
// },
// {
// ServiceName: "testservice",
// Hostname: "server3",
// HostIP: "192.168.1.3",
// Port: 80,
// },
// }
// service, _ := serviceregistry.NewService("testservice", svcservers)
//
// fnGetService.Store(func(serviceRegistry, serviceName string) (*serviceregistry.Service, error) {
// return service, nil
// })
// s = newServers(ps)
// if s.len() != len(service.Servers()) {
// t.Errorf("servers.len() is not %d", len(service.Servers()))
// }
//
// svcservers = append(svcservers, &serviceregistry.Server{
// ServiceName: "testservice",
// Hostname: "server4",
// HostIP: "192.168.1.4",
// Port: 80,
// })
// service.Update(svcservers)
//
// time.Sleep(100 * time.Millisecond)
// if s.len() != len(service.Servers()) {
// t.Errorf("servers.len() is not %d", len(service.Servers()))
// }
//
// service.Close("close")
// s.close()
// }
//
10 changes: 6 additions & 4 deletions pkg/object/meshcontroller/master/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ type (
spec *spec.Admin
maxHeartbeatTimeout time.Duration

store storage.Storage
service *service.Service
registrySyncer *registrySyncer
store storage.Storage
service *service.Service

done chan struct{}
}
Expand All @@ -59,8 +60,9 @@ func New(superSpec *supervisor.Spec) *Master {
superSpec: superSpec,
spec: adminSpec,

store: store,
service: service.New(superSpec),
store: store,
service: service.New(superSpec),
registrySyncer: newRegistrySyncer(superSpec),

done: make(chan struct{}),
}
Expand Down
178 changes: 159 additions & 19 deletions pkg/object/meshcontroller/master/registrysyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,42 +20,57 @@ package master
import (
"time"

"github.com/megaease/easegress/pkg/logger"
"github.com/megaease/easegress/pkg/object/meshcontroller/informer"
"github.com/megaease/easegress/pkg/object/meshcontroller/service"
"github.com/megaease/easegress/pkg/object/meshcontroller/spec"
"github.com/megaease/easegress/pkg/object/meshcontroller/storage"
"github.com/megaease/easegress/pkg/object/serviceregistry"
"github.com/megaease/easegress/pkg/supervisor"
"github.com/megaease/easegress/pkg/util/stringtool"
)

const (
syncInterval = 10 * time.Second
)

type (
registrySyncer struct {
superSpec *supervisor.Spec
spec *spec.Admin
syncInterval time.Duration
superSpec *supervisor.Spec
spec *spec.Admin

service *service.Service
externalInstances map[string]*serviceregistry.ServiceInstanceSpec
service *service.Service
informer informer.Informer
serviceRegistry *serviceregistry.ServiceRegistry
registryWatcher serviceregistry.RegistryWatcher

done chan struct{}
}

// ExternalRegistry is the interface of external service registry.
ExternalRegistry interface {
SyncFromExternal()
SyncToExternal()
ListExternalServices()
Notify() <-chan struct{}
}
)

func newRegistrySyncer(superSpec *supervisor.Spec, registryController string) *registrySyncer {
func newRegistrySyncer(superSpec *supervisor.Spec) *registrySyncer {
spec := superSpec.ObjectSpec().(*spec.Admin)

rs := &registrySyncer{
superSpec: superSpec,
spec: spec,
// syncInterval: syncInteral,
service: service.New(superSpec),
done: make(chan struct{}),

done: make(chan struct{}),
}

if spec.ExternalServiceRegistry == "" {
return rs
}

store := storage.New(superSpec.Name(), superSpec.Super().Cluster())
rs.service = service.New(superSpec)
rs.informer = informer.NewInformer(store, "")
rs.informer.OnAllServiceInstanceSpecs(rs.serviceInstanceSpecsFunc)

rs.serviceRegistry = superSpec.Super().MustGetSystemController(serviceregistry.Kind).Instance().(*serviceregistry.ServiceRegistry)
rs.registryWatcher = rs.serviceRegistry.NewRegistryWatcher(spec.ExternalServiceRegistry)

go rs.run()

return rs
Expand All @@ -66,12 +81,137 @@ func (rs *registrySyncer) run() {
select {
case <-rs.done:
return
case <-time.After(rs.syncInterval):
rs.sync()
case event := <-rs.registryWatcher.Watch():
rs.handleEvent(event)
}
}
}

func (rs *registrySyncer) handleEvent(event *serviceregistry.RegistryEvent) {
defer func() {
if r := recover(); r != nil {
logger.Errorf("recover from %v", r)
}
}()

if event.UseReplace {
oldInstances := rs.service.ListAllServiceInstanceSpecs()
for _, oldInstance := range oldInstances {
if oldInstance.RegistryName != rs.externalRegistryName() {
continue
}

instance := &serviceregistry.ServiceInstanceSpec{
RegistryName: oldInstance.RegistryName,
ServiceName: oldInstance.ServiceName,
InstanceID: oldInstance.InstanceID,
}
_, existed := event.Replace[instance.Key()]
if !existed {
rs.service.DeleteServiceInstanceSpec(oldInstance.ServiceName, oldInstance.InstanceID)
}
}

for _, instance := range event.Replace {
rs.service.PutServiceInstanceSpec(rs.externalToMeshInstance(instance))
}
return
}

for _, instance := range event.Delete {
rs.service.DeleteServiceInstanceSpec(instance.ServiceName, instance.InstanceID)
}
for _, instance := range event.Apply {
rs.service.PutServiceInstanceSpec(rs.externalToMeshInstance(instance))
}
}

func (rs *registrySyncer) serviceInstanceSpecsFunc(meshInstances map[string]*spec.ServiceInstanceSpec) bool {
oldInstances, err := rs.serviceRegistry.ListAllServiceInstances(rs.spec.ExternalServiceRegistry)
if err != nil {
logger.Errorf("list all service instances of %s: %v", rs.spec.ExternalServiceRegistry, err)
return true
}
oldInstances = rs.filterExternalInstances(oldInstances, rs.meshRegistryName())

meshInstances = rs.filterMeshInstances(meshInstances, "", rs.meshRegistryName())
newInstances := rs.meshToExternalInstances(meshInstances)

event := serviceregistry.NewRegistryEventFromDiff(rs.meshRegistryName(), oldInstances, newInstances)
if len(event.Apply) != 0 {
rs.serviceRegistry.ApplyServiceInstances(rs.externalRegistryName(), event.Apply)
}
if len(event.Delete) != 0 {
rs.serviceRegistry.DeleteServiceInstances(rs.externalRegistryName(), event.Delete)
}

return true
}

func (rs *registrySyncer) meshRegistryName() string {
return rs.superSpec.Name()
}

func (rs *registrySyncer) externalRegistryName() string {
return rs.spec.ExternalServiceRegistry
}

func (rs *registrySyncer) filterExternalInstances(instances map[string]*serviceregistry.ServiceInstanceSpec, registryNames ...string) map[string]*serviceregistry.ServiceInstanceSpec {
result := make(map[string]*serviceregistry.ServiceInstanceSpec)

for _, instance := range instances {
if stringtool.StrInSlice(instance.RegistryName, registryNames) {
result[instance.Key()] = instance
}
}

return result
}

func (rs *registrySyncer) filterMeshInstances(instances map[string]*spec.ServiceInstanceSpec, registryNames ...string) map[string]*spec.ServiceInstanceSpec {
result := make(map[string]*spec.ServiceInstanceSpec)

for _, instance := range instances {
if stringtool.StrInSlice(instance.RegistryName, registryNames) {
result[instance.InstanceID] = instance
}
}

return result
}

func (rs *registrySyncer) meshToExternalInstances(instances map[string]*spec.ServiceInstanceSpec) map[string]*serviceregistry.ServiceInstanceSpec {
result := make(map[string]*serviceregistry.ServiceInstanceSpec)

for _, instance := range instances {
externalInstance := &serviceregistry.ServiceInstanceSpec{
RegistryName: instance.RegistryName,
ServiceName: instance.ServiceName,
InstanceID: instance.InstanceID,

HostIP: instance.IP,
Port: uint16(instance.Port),
}
result[externalInstance.Key()] = externalInstance
}

return result
}

func (rs *registrySyncer) sync() {
func (rs *registrySyncer) externalToMeshInstance(instance *serviceregistry.ServiceInstanceSpec) *spec.ServiceInstanceSpec {
return &spec.ServiceInstanceSpec{
RegistryName: instance.RegistryName,
ServiceName: instance.ServiceName,
InstanceID: instance.InstanceID,
IP: instance.HostIP,
Port: uint32(instance.Port),
}
}

func (rs *registrySyncer) close() {
close(rs.done)

if rs.informer != nil {
rs.informer.Close()
}
}
8 changes: 8 additions & 0 deletions pkg/object/meshcontroller/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,14 @@ func (s *Service) PutServiceInstanceSpec(_spec *spec.ServiceInstanceSpec) {
}
}

// DeleteServiceInstanceSpec deletes the service instance spec.
func (s *Service) DeleteServiceInstanceSpec(serviceName, instanceID string) {
err := s.store.Delete(layout.ServiceInstanceSpecKey(serviceName, instanceID))
if err != nil {
api.ClusterPanic(err)
}
}

// ListTenantSpecs lists tenant specs
func (s *Service) ListTenantSpecs() []*spec.Tenant {
tenants := []*spec.Tenant{}
Expand Down
Loading

0 comments on commit ae4adcc

Please sign in to comment.