Skip to content

Commit

Permalink
[mesh]: Add more checking
Browse files Browse the repository at this point in the history
  • Loading branch information
xxx7xxxx committed Aug 24, 2021
1 parent 2683d88 commit 587520a
Show file tree
Hide file tree
Showing 18 changed files with 210 additions and 50 deletions.
4 changes: 2 additions & 2 deletions example/config/etcd-service-registry-example.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
kind: EtcdServiceRegistry
name: etcd-service-registry-example
endpoints: ['127.0.0.1:12379']
endpoints: ['127.0.0.1:2379']
prefix: "/services/"
cacheTimeout: 10s
cacheTimeout: 5s
2 changes: 2 additions & 0 deletions example/config/http-pipeline-example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ filters:
X-Filter:
exact: candidate
mainPool:
serviceRegistry: etcd-service-registry-example
serviceName: service-001
serversTags: ["v2"]
servers:
- url: http://127.0.0.1:9095
Expand Down
3 changes: 1 addition & 2 deletions example/config/mesh-controller-example.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
name: mesh-controller-example
kind: MeshController
specUpdateInterval: 10s
heartbeatInterval: 5s
registryType: consul
serviceName: service-001
externalServiceRegistry: etcd-service-registry-example
7 changes: 7 additions & 0 deletions example/config/server-001-instance-001.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
registryName: etcd-service-registry-example
serviceName: service-001
instanceID: intance-001
scheme: http
hostIP: 127.0.0.1
port: 9091
tags: [v1]
7 changes: 7 additions & 0 deletions example/config/server-001-instance-002.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
registryName: etcd-service-registry-example
serviceName: service-001
instanceID: intance-002
scheme:
hostIP: 127.0.0.1
port: 9092
tags: [v2]
15 changes: 3 additions & 12 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (

"github.com/megaease/easegress/pkg/logger"
"github.com/megaease/easegress/pkg/option"
"github.com/megaease/easegress/pkg/util/contexttool"
)

const (
Expand Down Expand Up @@ -165,23 +166,13 @@ func New(opt *option.Options) (Cluster, error) {
// requestContext returns context with request timeout,
// please use it immediately in case of incorrect timeout.
func (c *cluster) requestContext() context.Context {
ctx, cancel := context.WithTimeout(context.Background(), c.requestTimeout)
go func() {
time.Sleep(c.requestTimeout)
cancel()
}()
return ctx
return contexttool.TimeoutContext(c.requestTimeout)
}

// longRequestContext takes 3 times longer than requestContext.
func (c *cluster) longRequestContext() context.Context {
requestTimeout := 3 * c.requestTimeout
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
go func() {
time.Sleep(requestTimeout)
cancel()
}()
return ctx
return contexttool.TimeoutContext(requestTimeout)
}

func (c *cluster) run() {
Expand Down
6 changes: 6 additions & 0 deletions pkg/filter/proxy/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,12 @@ func (p *pool) status() *PoolStatus {
}

func (p *pool) handle(ctx context.HTTPContext, reqBody io.Reader) string {
// FIXME: When the request matched mirror handler, there will be
// data race warning. Because after proxy handled the request,
// the HTTPPipeline and mux of HTTPServer will call ctx.AddTag() too.
// Even the the situation is not a bug, but the cautious data race
// detector of golang will warn it.
// Add a method ctx.AddTagWithLock()?
addTag := func(subPrefix, msg string) {
tag := stringtool.Cat(p.tagPrefix, "#", subPrefix, ": ", msg)
ctx.Lock()
Expand Down
26 changes: 19 additions & 7 deletions pkg/filter/proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,18 +106,18 @@ func newServers(super *supervisor.Supervisor, poolSpec *PoolSpec) *servers {
return s
}

s.serviceRegistry = s.super.MustGetSystemController(serviceregistry.Kind).
Instance().(*serviceregistry.ServiceRegistry)

s.tryUseService()
s.serviceWatcher = s.serviceRegistry.NewServiceWatcher(s.poolSpec.ServiceRegistry, s.poolSpec.ServiceName)

go s.watchService()

return s
}

func (s *servers) watchService() {
s.tryUseService()

s.serviceRegistry = s.super.MustGetSystemController(serviceregistry.Kind).
Instance().(*serviceregistry.ServiceRegistry)
s.serviceWatcher = s.serviceRegistry.NewServiceWatcher(s.poolSpec.ServiceRegistry, s.poolSpec.ServiceName)

for {
select {
case <-s.done:
Expand All @@ -134,12 +134,14 @@ func (s *servers) handleEvent(event *serviceregistry.ServiceEvent) {

func (s *servers) tryUseService() {
serviceInstanceSpecs, err := s.serviceRegistry.ListServiceInstances(s.poolSpec.ServiceRegistry, s.poolSpec.ServiceName)

if err != nil {
logger.Errorf("get service %s/%s failed: %v",
s.poolSpec.ServiceRegistry, s.poolSpec.ServiceName, err)
s.useStaticServers()
return
}

s.useService(serviceInstanceSpecs)
}

Expand All @@ -159,9 +161,19 @@ func (s *servers) useService(serviceInstanceSpecs map[string]*serviceregistry.Se
return
}

staticServers := newStaticServers(servers, s.poolSpec.ServersTags, s.poolSpec.LoadBalance)
if staticServers.len() == 0 {
logger.Errorf("%s/%s: no service instance satisfy tags: %v",
s.poolSpec.ServiceRegistry, s.poolSpec.ServiceName, s.poolSpec.ServersTags)
s.useStaticServers()
}

logger.Infof("use dynamic service: %s/%s", s.poolSpec.ServiceRegistry, s.poolSpec.ServiceName)

s.mutex.Lock()
defer s.mutex.Unlock()
s.static = newStaticServers(servers, s.poolSpec.ServersTags, s.poolSpec.LoadBalance)
s.static = staticServers

}

func (s *servers) useStaticServers() {
Expand Down
8 changes: 7 additions & 1 deletion pkg/object/consulserviceregistry/consulserviceregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ func (c *ConsulServiceRegistry) Inherit(superSpec *supervisor.Spec, previousGene
func (c *ConsulServiceRegistry) reload() {
c.serviceRegistry = c.superSpec.Super().MustGetSystemController(serviceregistry.Kind).
Instance().(*serviceregistry.ServiceRegistry)
c.serviceRegistry.RegisterRegistry(c)
c.notify = make(chan *serviceregistry.RegistryEvent, 10)
c.firstDone = false

Expand All @@ -130,6 +129,8 @@ func (c *ConsulServiceRegistry) reload() {
logger.Errorf("%s get consul client failed: %v", c.superSpec.Name(), err)
}

c.serviceRegistry.RegisterRegistry(c)

go c.run()
}

Expand Down Expand Up @@ -228,12 +229,17 @@ func (c *ConsulServiceRegistry) update() {
c.firstDone = true
event = &serviceregistry.RegistryEvent{
SourceRegistryName: c.Name(),
UseReplace: true,
Replace: instances,
}
} else {
event = serviceregistry.NewRegistryEventFromDiff(c.Name(), c.instances, instances)
}

if event.Empty() {
return
}

c.notify <- event
c.instances = instances

Expand Down
24 changes: 16 additions & 8 deletions pkg/object/etcdserviceregistry/etcdserviceregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package eserviceregistry

import (
"context"
"fmt"
"path/filepath"
"sync"
Expand All @@ -27,6 +26,7 @@ import (
"github.com/megaease/easegress/pkg/logger"
"github.com/megaease/easegress/pkg/object/serviceregistry"
"github.com/megaease/easegress/pkg/supervisor"
"github.com/megaease/easegress/pkg/util/contexttool"

clientv3 "go.etcd.io/etcd/client/v3"
"gopkg.in/yaml.v2"
Expand All @@ -38,6 +38,8 @@ const (

// Kind is the kind of EtcdServiceRegistry.
Kind = "EtcdServiceRegistry"

requestTimeout = 5 * time.Second
)

func init() {
Expand Down Expand Up @@ -92,7 +94,7 @@ func (e *EtcdServiceRegistry) Kind() string {
func (e *EtcdServiceRegistry) DefaultSpec() interface{} {
return &Spec{
Prefix: "/services/",
CacheTimeout: "60s",
CacheTimeout: "10s",
}
}

Expand All @@ -111,7 +113,6 @@ func (e *EtcdServiceRegistry) Inherit(superSpec *supervisor.Spec, previousGenera
func (e *EtcdServiceRegistry) reload() {
e.serviceRegistry = e.superSpec.Super().MustGetSystemController(serviceregistry.Kind).
Instance().(*serviceregistry.ServiceRegistry)
e.serviceRegistry.RegisterRegistry(e)
e.firstDone = false
e.notify = make(chan *serviceregistry.RegistryEvent, 10)

Expand All @@ -123,6 +124,8 @@ func (e *EtcdServiceRegistry) reload() {
logger.Errorf("%s get etcd client failed: %v", e.superSpec.Name(), err)
}

e.serviceRegistry.RegisterRegistry(e)

go e.run()
}

Expand Down Expand Up @@ -216,12 +219,17 @@ func (e *EtcdServiceRegistry) update() {
e.firstDone = true
event = &serviceregistry.RegistryEvent{
SourceRegistryName: e.Name(),
UseReplace: true,
Replace: instances,
}
} else {
event = serviceregistry.NewRegistryEventFromDiff(e.Name(), e.instances, instances)
}

if event.Empty() {
return
}

e.notify <- event
e.instances = instances

Expand Down Expand Up @@ -294,7 +302,7 @@ func (e *EtcdServiceRegistry) ApplyServiceInstances(instances map[string]*servic
ops = append(ops, clientv3.OpPut(key, string(buff)))
}

_, err = client.Txn(context.Background()).Then(ops...).Commit()
_, err = client.Txn(contexttool.TimeoutContext(requestTimeout)).Then(ops...).Commit()
if err != nil {
return err
}
Expand All @@ -316,7 +324,7 @@ func (e *EtcdServiceRegistry) DeleteServiceInstances(instances map[string]*servi
ops = append(ops, clientv3.OpDelete(key))
}

_, err = client.Txn(context.Background()).Then(ops...).Commit()
_, err = client.Txn(contexttool.TimeoutContext(requestTimeout)).Then(ops...).Commit()
if err != nil {
return err
}
Expand All @@ -332,7 +340,7 @@ func (e *EtcdServiceRegistry) GetServiceInstance(serviceName, instanceID string)
e.superSpec.Name(), err)
}

resp, err := client.Get(context.Background(), e.serviceInstanceEtcdKeyFromRaw(serviceName, instanceID))
resp, err := client.Get(contexttool.TimeoutContext(requestTimeout), e.serviceInstanceEtcdKeyFromRaw(serviceName, instanceID))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -363,7 +371,7 @@ func (e *EtcdServiceRegistry) ListServiceInstances(serviceName string) (map[stri
e.superSpec.Name(), err)
}

resp, err := client.Get(context.Background(), e.serviceEtcdPrefix(serviceName), clientv3.WithPrefix())
resp, err := client.Get(contexttool.TimeoutContext(requestTimeout), e.serviceEtcdPrefix(serviceName), clientv3.WithPrefix())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -395,7 +403,7 @@ func (e *EtcdServiceRegistry) ListAllServiceInstances() (map[string]*serviceregi
e.superSpec.Name(), err)
}

resp, err := client.Get(context.Background(), e.spec.Prefix, clientv3.WithPrefix())
resp, err := client.Get(contexttool.TimeoutContext(requestTimeout), e.spec.Prefix, clientv3.WithPrefix())
if err != nil {
return nil, err
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/object/eurekaserviceregistry/eurekaserviceregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ func (e *EurekaServiceRegistry) Inherit(superSpec *supervisor.Spec, previousGene
func (e *EurekaServiceRegistry) reload() {
e.serviceRegistry = e.superSpec.Super().MustGetSystemController(serviceregistry.Kind).
Instance().(*serviceregistry.ServiceRegistry)
e.serviceRegistry.RegisterRegistry(e)
e.notify = make(chan *serviceregistry.RegistryEvent, 10)
e.firstDone = false

Expand All @@ -122,6 +121,8 @@ func (e *EurekaServiceRegistry) reload() {
logger.Errorf("%s get eureka client failed: %v", e.superSpec.Name(), err)
}

e.serviceRegistry.RegisterRegistry(e)

go e.run()
}

Expand Down Expand Up @@ -201,12 +202,17 @@ func (e *EurekaServiceRegistry) update() {
e.firstDone = true
event = &serviceregistry.RegistryEvent{
SourceRegistryName: e.Name(),
UseReplace: true,
Replace: instances,
}
} else {
event = serviceregistry.NewRegistryEventFromDiff(e.Name(), e.instances, instances)
}

if event.Empty() {
return
}

e.notify <- event
e.instances = instances

Expand Down
14 changes: 14 additions & 0 deletions pkg/object/meshcontroller/master/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ func (m *Master) scanInstances() (failedInstances []*spec.ServiceInstanceSpec,

now := time.Now()
for _, _spec := range specs {
if !m.isMeshRegistryName(_spec.RegistryName) {
continue
}

var status *spec.ServiceInstanceStatus
for _, s := range statuses {
if s.ServiceName == _spec.ServiceName && s.InstanceID == _spec.InstanceID {
Expand Down Expand Up @@ -186,6 +190,16 @@ func (m *Master) cleanDeadInstances() {
}
}

func (m *Master) isMeshRegistryName(registryName string) bool {
// NOTE: Empty registry name means it is a internal mesh service by default.
switch registryName {
case "", m.superSpec.Name():
return true
default:
return false
}
}

func (m *Master) handleRebornInstances(rebornInstances []*spec.ServiceInstanceSpec) {
m.updateInstanceStatus(rebornInstances, spec.ServiceStatusUp)
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/object/nacosserviceregistry/nacosserviceregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ func (n *NacosServiceRegistry) Inherit(superSpec *supervisor.Spec, previousGener
func (n *NacosServiceRegistry) reload() {
n.serviceRegistry = n.superSpec.Super().MustGetSystemController(serviceregistry.Kind).
Instance().(*serviceregistry.ServiceRegistry)
n.serviceRegistry.RegisterRegistry(n)
n.notify = make(chan *serviceregistry.RegistryEvent, 10)
n.firstDone = false

Expand All @@ -152,6 +151,8 @@ func (n *NacosServiceRegistry) reload() {
logger.Errorf("%s get nacos client failed: %v", n.superSpec.Name(), err)
}

n.serviceRegistry.RegisterRegistry(n)

go n.run()
}

Expand Down Expand Up @@ -264,12 +265,17 @@ func (n *NacosServiceRegistry) update() {
n.firstDone = true
event = &serviceregistry.RegistryEvent{
SourceRegistryName: n.Name(),
UseReplace: true,
Replace: instances,
}
} else {
event = serviceregistry.NewRegistryEventFromDiff(n.Name(), n.instances, instances)
}

if event.Empty() {
return
}

n.notify <- event
n.instances = instances

Expand Down
Loading

0 comments on commit 587520a

Please sign in to comment.