Skip to content

Commit

Permalink
[mesh]: Support bi-directional service registry controllers
Browse files Browse the repository at this point in the history
  • Loading branch information
xxx7xxxx committed Aug 24, 2021
1 parent a80009e commit f75640c
Show file tree
Hide file tree
Showing 17 changed files with 1,463 additions and 725 deletions.
5 changes: 3 additions & 2 deletions pkg/filter/proxy/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/megaease/easegress/pkg/context"
"github.com/megaease/easegress/pkg/logger"
"github.com/megaease/easegress/pkg/supervisor"
"github.com/megaease/easegress/pkg/tracing"
"github.com/megaease/easegress/pkg/util/callbackreader"
"github.com/megaease/easegress/pkg/util/httpfilter"
Expand Down Expand Up @@ -96,7 +97,7 @@ func (s PoolSpec) Validate() error {
return nil
}

func newPool(spec *PoolSpec, tagPrefix string,
func newPool(super *supervisor.Supervisor, spec *PoolSpec, tagPrefix string,
writeResponse bool, failureCodes []int) *pool {

var filter *httpfilter.HTTPFilter
Expand All @@ -116,7 +117,7 @@ func newPool(spec *PoolSpec, tagPrefix string,
writeResponse: writeResponse,

filter: filter,
servers: newServers(spec),
servers: newServers(super, spec),
httpStat: httpstat.New(),
memoryCache: memoryCache,
}
Expand Down
11 changes: 7 additions & 4 deletions pkg/filter/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,9 @@ func (b *Proxy) Inherit(filterSpec *httppipeline.FilterSpec, previousGeneration
}

func (b *Proxy) reload() {
b.mainPool = newPool(b.spec.MainPool, "proxy#main",
super := b.filterSpec.Super()

b.mainPool = newPool(super, b.spec.MainPool, "proxy#main",
true /*writeResponse*/, b.spec.FailureCodes)

if b.spec.Fallback != nil {
Expand All @@ -205,13 +207,14 @@ func (b *Proxy) reload() {
if len(b.spec.CandidatePools) > 0 {
var candidatePools []*pool
for k := range b.spec.CandidatePools {
candidatePools = append(candidatePools, newPool(b.spec.CandidatePools[k], fmt.Sprintf("proxy#candidate#%d", k),
true, b.spec.FailureCodes))
candidatePools = append(candidatePools,
newPool(super, b.spec.CandidatePools[k], fmt.Sprintf("proxy#candidate#%d", k),
true, b.spec.FailureCodes))
}
b.candidatePools = candidatePools
}
if b.spec.MirrorPool != nil {
b.mirrorPool = newPool(b.spec.MirrorPool, "proxy#mirror",
b.mirrorPool = newPool(super, b.spec.MirrorPool, "proxy#mirror",
false /*writeResponse*/, b.spec.FailureCodes)
}

Expand Down
150 changes: 54 additions & 96 deletions pkg/filter/proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,11 @@ import (
"github.com/megaease/easegress/pkg/context"
"github.com/megaease/easegress/pkg/logger"
"github.com/megaease/easegress/pkg/object/serviceregistry"
"github.com/megaease/easegress/pkg/supervisor"
"github.com/megaease/easegress/pkg/util/hashtool"
"github.com/megaease/easegress/pkg/util/stringtool"
)

// make it mockable in test
var fnGetService atomic.Value

func init() {
rand.Seed(time.Now().UnixNano())
fnGetService.Store(func(serviceRegistry, serviceName string) (*serviceregistry.Service, error) {
return serviceregistry.Global.GetService(serviceRegistry, serviceName)
})
}

const (
// PolicyRoundRobin is the policy of round-robin.
PolicyRoundRobin = "roundRobin"
Expand All @@ -60,10 +51,11 @@ type (
servers struct {
poolSpec *PoolSpec

mutex sync.Mutex
service *serviceregistry.Service
static *staticServers
done chan struct{}
mutex sync.Mutex
serviceRegistry *serviceregistry.ServiceRegistry
serviceWatcher serviceregistry.ServiceWatcher
static *staticServers
done chan struct{}
}

staticServers struct {
Expand Down Expand Up @@ -100,81 +92,58 @@ func (lb LoadBalance) Validate() error {
return nil
}

func newServers(poolSpec *PoolSpec) *servers {
func newServers(super *supervisor.Supervisor, poolSpec *PoolSpec) *servers {
s := &servers{
poolSpec: poolSpec,
done: make(chan struct{}),
}

s.tryUpdateService()
s.useStaticServers()

if poolSpec.ServiceRegistry == "" || poolSpec.ServiceName == "" {
return s
}

s.serviceRegistry = super.MustGetSystemController(serviceregistry.Kind).
Instance().(*serviceregistry.ServiceRegistry)
s.serviceWatcher = s.serviceRegistry.NewServiceWatcher(poolSpec.ServiceRegistry, poolSpec.ServiceName)

go s.run()
go s.watchService()

return s
}

func (s *servers) run() {
if s.poolSpec.ServiceName == "" {
return
func (s *servers) watchService() {
serviceSpec, err := s.serviceRegistry.GetService(s.poolSpec.ServiceRegistry, s.poolSpec.ServiceName)
if err != nil {
logger.Warnf("ger service %s/%s failed: %v",
s.poolSpec.ServiceRegistry, s.poolSpec.ServiceName, err)
}

for {
service := s.mustUpdateService()
s.useService(serviceSpec)

// NOTE: The servers is closed.
if service == nil {
return
}

select {
// NOTE: Defensive programming.
case <-s.done:
return
case <-service.Updated():
logger.Infof("service %s updated, try to update",
s.poolSpec.ServiceName)
case <-service.Closed():
logger.Warnf("service %s closed: %s, try to get again",
s.poolSpec.ServiceName, service.CloseMessage())
}
}
}

// mustUpdateService blocks until getting the service or closed.
func (s *servers) mustUpdateService() *serviceregistry.Service {
for {
service, err := s.useService()
if err == nil {
return service
}
logger.Warnf("%v", err)
select {
case <-s.done:
return nil
case <-time.After(retryTimeout):
return
case event := <-s.serviceWatcher.Watch():
s.handleServiceEvent(event)
}
}
}

// tryUpdateService uses static servers if it failed to get service.
func (s *servers) tryUpdateService() {
if s.poolSpec.ServiceName == "" {
func (s *servers) handleServiceEvent(event *serviceregistry.ServiceEvent) {
if event.Delete != nil {
logger.Warnf("service %s delete, use static servers", s.poolSpec.ServiceName)
s.useStaticServers()
return
}

_, err := s.useService()
if err == nil {
err := s.useService(event.Apply)
if err != nil {
logger.Warnf("use service %s failed: %v", s.poolSpec.ServiceName, err)
return
}

logger.Errorf("%v", err)
if len(s.poolSpec.Servers) > 0 {
logger.Warnf("fallback to static severs")
s.useStaticServers()
} else {
logger.Warnf("no static server available either")
}
}

func (s *servers) useStaticServers() {
Expand All @@ -183,58 +152,43 @@ func (s *servers) useStaticServers() {
s.static = newStaticServers(s.poolSpec.Servers,
s.poolSpec.ServersTags,
s.poolSpec.LoadBalance)
s.service = nil
}

func getService(serviceRegistry, serviceName string) (*serviceregistry.Service, error) {
fn := fnGetService.Load().(func(serviceRegistry, serviceName string) (*serviceregistry.Service, error))
return fn(serviceRegistry, serviceName)
}

func (s *servers) useService() (*serviceregistry.Service, error) {
s.mutex.Lock()
defer s.mutex.Unlock()

service, err := getService(s.poolSpec.ServiceRegistry, s.poolSpec.ServiceName)
if err != nil {
return nil, fmt.Errorf("get service %s failed: %v", s.poolSpec.ServiceName, err)
}

var serversInput []*Server
servers := service.Servers()
for _, snapshotServer := range servers {
serversInput = append(serversInput, &Server{
URL: snapshotServer.URL(),
Tags: snapshotServer.Tags,
Weight: snapshotServer.Weight,
func (s *servers) useService(serviceSpec *serviceregistry.ServiceSpec) error {
var servers []*Server
for _, serviceInstance := range serviceSpec.Instances {
servers = append(servers, &Server{
URL: serviceInstance.URL(),
Tags: serviceInstance.Tags,
Weight: serviceInstance.Weight,
})
}
static := newStaticServers(serversInput, s.poolSpec.ServersTags, s.poolSpec.LoadBalance)
if len(servers) == 0 {
return fmt.Errorf("empty service instance")
}

s.static, s.service = static, service
s.mutex.Lock()
defer s.mutex.Unlock()
s.static = newStaticServers(servers, s.poolSpec.ServersTags, s.poolSpec.LoadBalance)

return service, nil
return nil
}

func (s *servers) snapshot() (*staticServers, *serviceregistry.Service) {
func (s *servers) snapshot() *staticServers {
s.mutex.Lock()
defer s.mutex.Unlock()

return s.static, s.service
return s.static
}

func (s *servers) len() int {
static, _ := s.snapshot()

if static == nil {
return 0
}
static := s.snapshot()

return static.len()
}

func (s *servers) next(ctx context.HTTPContext) (*Server, error) {
static, _ := s.snapshot()
static := s.snapshot()

if static.len() == 0 {
return nil, fmt.Errorf("no server available")
Expand All @@ -245,6 +199,10 @@ func (s *servers) next(ctx context.HTTPContext) (*Server, error) {

func (s *servers) close() {
close(s.done)

if s.serviceWatcher != nil {
s.serviceWatcher.Stop()
}
}

func newStaticServers(servers []*Server, tags []string, lb *LoadBalance) *staticServers {
Expand Down
Loading

0 comments on commit f75640c

Please sign in to comment.