From 587520a48f9f69a26cb8eb89f83aa13baab81579 Mon Sep 17 00:00:00 2001 From: Yun Long Date: Thu, 19 Aug 2021 01:03:25 +0800 Subject: [PATCH] [mesh]: Add more checking --- .../config/etcd-service-registry-example.yaml | 4 +- example/config/http-pipeline-example.yaml | 2 + example/config/mesh-controller-example.yaml | 3 +- example/config/server-001-instance-001.yaml | 7 +++ example/config/server-001-instance-002.yaml | 7 +++ pkg/cluster/cluster.go | 15 ++---- pkg/filter/proxy/pool.go | 6 +++ pkg/filter/proxy/server.go | 26 +++++++--- .../consulserviceregistry.go | 8 +++- .../etcdserviceregistry.go | 24 ++++++---- .../eurekaserviceregistry.go | 8 +++- pkg/object/meshcontroller/master/master.go | 14 ++++++ .../nacosserviceregistry.go | 8 +++- pkg/object/serviceregistry/service.go | 25 ++++++++-- pkg/object/serviceregistry/serviceregistry.go | 30 ++++++++++-- pkg/object/serviceregistry/watcher.go | 47 +++++++++++++++++-- .../zookeeperserviceregistry.go | 10 +++- pkg/util/contexttool/contexttool.go | 16 +++++++ 18 files changed, 210 insertions(+), 50 deletions(-) create mode 100644 example/config/server-001-instance-001.yaml create mode 100644 example/config/server-001-instance-002.yaml create mode 100644 pkg/util/contexttool/contexttool.go diff --git a/example/config/etcd-service-registry-example.yaml b/example/config/etcd-service-registry-example.yaml index 785f869821..ee2f4cf67e 100644 --- a/example/config/etcd-service-registry-example.yaml +++ b/example/config/etcd-service-registry-example.yaml @@ -1,5 +1,5 @@ kind: EtcdServiceRegistry name: etcd-service-registry-example -endpoints: ['127.0.0.1:12379'] +endpoints: ['127.0.0.1:2379'] prefix: "/services/" -cacheTimeout: 10s +cacheTimeout: 5s diff --git a/example/config/http-pipeline-example.yaml b/example/config/http-pipeline-example.yaml index 933b2190b9..d3ce81f638 100644 --- a/example/config/http-pipeline-example.yaml +++ b/example/config/http-pipeline-example.yaml @@ -101,6 +101,8 @@ filters: X-Filter: exact: candidate mainPool: + serviceRegistry: etcd-service-registry-example + serviceName: service-001 serversTags: ["v2"] servers: - url: http://127.0.0.1:9095 diff --git a/example/config/mesh-controller-example.yaml b/example/config/mesh-controller-example.yaml index f38d37fc50..49c58dec3a 100644 --- a/example/config/mesh-controller-example.yaml +++ b/example/config/mesh-controller-example.yaml @@ -1,6 +1,5 @@ name: mesh-controller-example kind: MeshController -specUpdateInterval: 10s heartbeatInterval: 5s registryType: consul -serviceName: service-001 +externalServiceRegistry: etcd-service-registry-example diff --git a/example/config/server-001-instance-001.yaml b/example/config/server-001-instance-001.yaml new file mode 100644 index 0000000000..b2a53b908f --- /dev/null +++ b/example/config/server-001-instance-001.yaml @@ -0,0 +1,7 @@ +registryName: etcd-service-registry-example +serviceName: service-001 +instanceID: intance-001 +scheme: http +hostIP: 127.0.0.1 +port: 9091 +tags: [v1] diff --git a/example/config/server-001-instance-002.yaml b/example/config/server-001-instance-002.yaml new file mode 100644 index 0000000000..f6f307dc16 --- /dev/null +++ b/example/config/server-001-instance-002.yaml @@ -0,0 +1,7 @@ +registryName: etcd-service-registry-example +serviceName: service-001 +instanceID: intance-002 +scheme: +hostIP: 127.0.0.1 +port: 9092 +tags: [v2] diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 3bc0f49d40..1af0ca2ead 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -33,6 +33,7 @@ import ( "github.com/megaease/easegress/pkg/logger" "github.com/megaease/easegress/pkg/option" + "github.com/megaease/easegress/pkg/util/contexttool" ) const ( @@ -165,23 +166,13 @@ func New(opt *option.Options) (Cluster, error) { // requestContext returns context with request timeout, // please use it immediately in case of incorrect timeout. func (c *cluster) requestContext() context.Context { - ctx, cancel := context.WithTimeout(context.Background(), c.requestTimeout) - go func() { - time.Sleep(c.requestTimeout) - cancel() - }() - return ctx + return contexttool.TimeoutContext(c.requestTimeout) } // longRequestContext takes 3 times longer than requestContext. func (c *cluster) longRequestContext() context.Context { requestTimeout := 3 * c.requestTimeout - ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) - go func() { - time.Sleep(requestTimeout) - cancel() - }() - return ctx + return contexttool.TimeoutContext(requestTimeout) } func (c *cluster) run() { diff --git a/pkg/filter/proxy/pool.go b/pkg/filter/proxy/pool.go index 04c1bcfc90..965cd06000 100644 --- a/pkg/filter/proxy/pool.go +++ b/pkg/filter/proxy/pool.go @@ -129,6 +129,12 @@ func (p *pool) status() *PoolStatus { } func (p *pool) handle(ctx context.HTTPContext, reqBody io.Reader) string { + // FIXME: When the request matched mirror handler, there will be + // data race warning. Because after proxy handled the request, + // the HTTPPipeline and mux of HTTPServer will call ctx.AddTag() too. + // Even the the situation is not a bug, but the cautious data race + // detector of golang will warn it. + // Add a method ctx.AddTagWithLock()? addTag := func(subPrefix, msg string) { tag := stringtool.Cat(p.tagPrefix, "#", subPrefix, ": ", msg) ctx.Lock() diff --git a/pkg/filter/proxy/server.go b/pkg/filter/proxy/server.go index df8179420c..974faadbc2 100644 --- a/pkg/filter/proxy/server.go +++ b/pkg/filter/proxy/server.go @@ -106,18 +106,18 @@ func newServers(super *supervisor.Supervisor, poolSpec *PoolSpec) *servers { return s } + s.serviceRegistry = s.super.MustGetSystemController(serviceregistry.Kind). + Instance().(*serviceregistry.ServiceRegistry) + + s.tryUseService() + s.serviceWatcher = s.serviceRegistry.NewServiceWatcher(s.poolSpec.ServiceRegistry, s.poolSpec.ServiceName) + go s.watchService() return s } func (s *servers) watchService() { - s.tryUseService() - - s.serviceRegistry = s.super.MustGetSystemController(serviceregistry.Kind). - Instance().(*serviceregistry.ServiceRegistry) - s.serviceWatcher = s.serviceRegistry.NewServiceWatcher(s.poolSpec.ServiceRegistry, s.poolSpec.ServiceName) - for { select { case <-s.done: @@ -134,12 +134,14 @@ func (s *servers) handleEvent(event *serviceregistry.ServiceEvent) { func (s *servers) tryUseService() { serviceInstanceSpecs, err := s.serviceRegistry.ListServiceInstances(s.poolSpec.ServiceRegistry, s.poolSpec.ServiceName) + if err != nil { logger.Errorf("get service %s/%s failed: %v", s.poolSpec.ServiceRegistry, s.poolSpec.ServiceName, err) s.useStaticServers() return } + s.useService(serviceInstanceSpecs) } @@ -159,9 +161,19 @@ func (s *servers) useService(serviceInstanceSpecs map[string]*serviceregistry.Se return } + staticServers := newStaticServers(servers, s.poolSpec.ServersTags, s.poolSpec.LoadBalance) + if staticServers.len() == 0 { + logger.Errorf("%s/%s: no service instance satisfy tags: %v", + s.poolSpec.ServiceRegistry, s.poolSpec.ServiceName, s.poolSpec.ServersTags) + s.useStaticServers() + } + + logger.Infof("use dynamic service: %s/%s", s.poolSpec.ServiceRegistry, s.poolSpec.ServiceName) + s.mutex.Lock() defer s.mutex.Unlock() - s.static = newStaticServers(servers, s.poolSpec.ServersTags, s.poolSpec.LoadBalance) + s.static = staticServers + } func (s *servers) useStaticServers() { diff --git a/pkg/object/consulserviceregistry/consulserviceregistry.go b/pkg/object/consulserviceregistry/consulserviceregistry.go index 0ed33a3259..385f6565bf 100644 --- a/pkg/object/consulserviceregistry/consulserviceregistry.go +++ b/pkg/object/consulserviceregistry/consulserviceregistry.go @@ -118,7 +118,6 @@ func (c *ConsulServiceRegistry) Inherit(superSpec *supervisor.Spec, previousGene func (c *ConsulServiceRegistry) reload() { c.serviceRegistry = c.superSpec.Super().MustGetSystemController(serviceregistry.Kind). Instance().(*serviceregistry.ServiceRegistry) - c.serviceRegistry.RegisterRegistry(c) c.notify = make(chan *serviceregistry.RegistryEvent, 10) c.firstDone = false @@ -130,6 +129,8 @@ func (c *ConsulServiceRegistry) reload() { logger.Errorf("%s get consul client failed: %v", c.superSpec.Name(), err) } + c.serviceRegistry.RegisterRegistry(c) + go c.run() } @@ -228,12 +229,17 @@ func (c *ConsulServiceRegistry) update() { c.firstDone = true event = &serviceregistry.RegistryEvent{ SourceRegistryName: c.Name(), + UseReplace: true, Replace: instances, } } else { event = serviceregistry.NewRegistryEventFromDiff(c.Name(), c.instances, instances) } + if event.Empty() { + return + } + c.notify <- event c.instances = instances diff --git a/pkg/object/etcdserviceregistry/etcdserviceregistry.go b/pkg/object/etcdserviceregistry/etcdserviceregistry.go index c58e8e5dcb..c23a7a4761 100644 --- a/pkg/object/etcdserviceregistry/etcdserviceregistry.go +++ b/pkg/object/etcdserviceregistry/etcdserviceregistry.go @@ -18,7 +18,6 @@ package eserviceregistry import ( - "context" "fmt" "path/filepath" "sync" @@ -27,6 +26,7 @@ import ( "github.com/megaease/easegress/pkg/logger" "github.com/megaease/easegress/pkg/object/serviceregistry" "github.com/megaease/easegress/pkg/supervisor" + "github.com/megaease/easegress/pkg/util/contexttool" clientv3 "go.etcd.io/etcd/client/v3" "gopkg.in/yaml.v2" @@ -38,6 +38,8 @@ const ( // Kind is the kind of EtcdServiceRegistry. Kind = "EtcdServiceRegistry" + + requestTimeout = 5 * time.Second ) func init() { @@ -92,7 +94,7 @@ func (e *EtcdServiceRegistry) Kind() string { func (e *EtcdServiceRegistry) DefaultSpec() interface{} { return &Spec{ Prefix: "/services/", - CacheTimeout: "60s", + CacheTimeout: "10s", } } @@ -111,7 +113,6 @@ func (e *EtcdServiceRegistry) Inherit(superSpec *supervisor.Spec, previousGenera func (e *EtcdServiceRegistry) reload() { e.serviceRegistry = e.superSpec.Super().MustGetSystemController(serviceregistry.Kind). Instance().(*serviceregistry.ServiceRegistry) - e.serviceRegistry.RegisterRegistry(e) e.firstDone = false e.notify = make(chan *serviceregistry.RegistryEvent, 10) @@ -123,6 +124,8 @@ func (e *EtcdServiceRegistry) reload() { logger.Errorf("%s get etcd client failed: %v", e.superSpec.Name(), err) } + e.serviceRegistry.RegisterRegistry(e) + go e.run() } @@ -216,12 +219,17 @@ func (e *EtcdServiceRegistry) update() { e.firstDone = true event = &serviceregistry.RegistryEvent{ SourceRegistryName: e.Name(), + UseReplace: true, Replace: instances, } } else { event = serviceregistry.NewRegistryEventFromDiff(e.Name(), e.instances, instances) } + if event.Empty() { + return + } + e.notify <- event e.instances = instances @@ -294,7 +302,7 @@ func (e *EtcdServiceRegistry) ApplyServiceInstances(instances map[string]*servic ops = append(ops, clientv3.OpPut(key, string(buff))) } - _, err = client.Txn(context.Background()).Then(ops...).Commit() + _, err = client.Txn(contexttool.TimeoutContext(requestTimeout)).Then(ops...).Commit() if err != nil { return err } @@ -316,7 +324,7 @@ func (e *EtcdServiceRegistry) DeleteServiceInstances(instances map[string]*servi ops = append(ops, clientv3.OpDelete(key)) } - _, err = client.Txn(context.Background()).Then(ops...).Commit() + _, err = client.Txn(contexttool.TimeoutContext(requestTimeout)).Then(ops...).Commit() if err != nil { return err } @@ -332,7 +340,7 @@ func (e *EtcdServiceRegistry) GetServiceInstance(serviceName, instanceID string) e.superSpec.Name(), err) } - resp, err := client.Get(context.Background(), e.serviceInstanceEtcdKeyFromRaw(serviceName, instanceID)) + resp, err := client.Get(contexttool.TimeoutContext(requestTimeout), e.serviceInstanceEtcdKeyFromRaw(serviceName, instanceID)) if err != nil { return nil, err } @@ -363,7 +371,7 @@ func (e *EtcdServiceRegistry) ListServiceInstances(serviceName string) (map[stri e.superSpec.Name(), err) } - resp, err := client.Get(context.Background(), e.serviceEtcdPrefix(serviceName), clientv3.WithPrefix()) + resp, err := client.Get(contexttool.TimeoutContext(requestTimeout), e.serviceEtcdPrefix(serviceName), clientv3.WithPrefix()) if err != nil { return nil, err } @@ -395,7 +403,7 @@ func (e *EtcdServiceRegistry) ListAllServiceInstances() (map[string]*serviceregi e.superSpec.Name(), err) } - resp, err := client.Get(context.Background(), e.spec.Prefix, clientv3.WithPrefix()) + resp, err := client.Get(contexttool.TimeoutContext(requestTimeout), e.spec.Prefix, clientv3.WithPrefix()) if err != nil { return nil, err } diff --git a/pkg/object/eurekaserviceregistry/eurekaserviceregistry.go b/pkg/object/eurekaserviceregistry/eurekaserviceregistry.go index 114d13ce31..e562a6f689 100644 --- a/pkg/object/eurekaserviceregistry/eurekaserviceregistry.go +++ b/pkg/object/eurekaserviceregistry/eurekaserviceregistry.go @@ -110,7 +110,6 @@ func (e *EurekaServiceRegistry) Inherit(superSpec *supervisor.Spec, previousGene func (e *EurekaServiceRegistry) reload() { e.serviceRegistry = e.superSpec.Super().MustGetSystemController(serviceregistry.Kind). Instance().(*serviceregistry.ServiceRegistry) - e.serviceRegistry.RegisterRegistry(e) e.notify = make(chan *serviceregistry.RegistryEvent, 10) e.firstDone = false @@ -122,6 +121,8 @@ func (e *EurekaServiceRegistry) reload() { logger.Errorf("%s get eureka client failed: %v", e.superSpec.Name(), err) } + e.serviceRegistry.RegisterRegistry(e) + go e.run() } @@ -201,12 +202,17 @@ func (e *EurekaServiceRegistry) update() { e.firstDone = true event = &serviceregistry.RegistryEvent{ SourceRegistryName: e.Name(), + UseReplace: true, Replace: instances, } } else { event = serviceregistry.NewRegistryEventFromDiff(e.Name(), e.instances, instances) } + if event.Empty() { + return + } + e.notify <- event e.instances = instances diff --git a/pkg/object/meshcontroller/master/master.go b/pkg/object/meshcontroller/master/master.go index aa3b543985..d2df474eb3 100644 --- a/pkg/object/meshcontroller/master/master.go +++ b/pkg/object/meshcontroller/master/master.go @@ -128,6 +128,10 @@ func (m *Master) scanInstances() (failedInstances []*spec.ServiceInstanceSpec, now := time.Now() for _, _spec := range specs { + if !m.isMeshRegistryName(_spec.RegistryName) { + continue + } + var status *spec.ServiceInstanceStatus for _, s := range statuses { if s.ServiceName == _spec.ServiceName && s.InstanceID == _spec.InstanceID { @@ -186,6 +190,16 @@ func (m *Master) cleanDeadInstances() { } } +func (m *Master) isMeshRegistryName(registryName string) bool { + // NOTE: Empty registry name means it is a internal mesh service by default. + switch registryName { + case "", m.superSpec.Name(): + return true + default: + return false + } +} + func (m *Master) handleRebornInstances(rebornInstances []*spec.ServiceInstanceSpec) { m.updateInstanceStatus(rebornInstances, spec.ServiceStatusUp) } diff --git a/pkg/object/nacosserviceregistry/nacosserviceregistry.go b/pkg/object/nacosserviceregistry/nacosserviceregistry.go index d980c24516..ee8ab8f20b 100644 --- a/pkg/object/nacosserviceregistry/nacosserviceregistry.go +++ b/pkg/object/nacosserviceregistry/nacosserviceregistry.go @@ -140,7 +140,6 @@ func (n *NacosServiceRegistry) Inherit(superSpec *supervisor.Spec, previousGener func (n *NacosServiceRegistry) reload() { n.serviceRegistry = n.superSpec.Super().MustGetSystemController(serviceregistry.Kind). Instance().(*serviceregistry.ServiceRegistry) - n.serviceRegistry.RegisterRegistry(n) n.notify = make(chan *serviceregistry.RegistryEvent, 10) n.firstDone = false @@ -152,6 +151,8 @@ func (n *NacosServiceRegistry) reload() { logger.Errorf("%s get nacos client failed: %v", n.superSpec.Name(), err) } + n.serviceRegistry.RegisterRegistry(n) + go n.run() } @@ -264,12 +265,17 @@ func (n *NacosServiceRegistry) update() { n.firstDone = true event = &serviceregistry.RegistryEvent{ SourceRegistryName: n.Name(), + UseReplace: true, Replace: instances, } } else { event = serviceregistry.NewRegistryEventFromDiff(n.Name(), n.instances, instances) } + if event.Empty() { + return + } + n.notify <- event n.instances = instances diff --git a/pkg/object/serviceregistry/service.go b/pkg/object/serviceregistry/service.go index e279dd728a..ef9685b85b 100644 --- a/pkg/object/serviceregistry/service.go +++ b/pkg/object/serviceregistry/service.go @@ -30,7 +30,7 @@ type ( // ServiceName is required. ServiceName string `yaml:"serviceName"` // InstanceID is required. - InstanceID string `yaml:"name"` + InstanceID string `yaml:"instanceID"` // Scheme is optional if Port is not empty. Scheme string `yaml:"scheme"` @@ -50,15 +50,30 @@ type ( // DeepCopy deep copies ServiceInstanceSpec. func (s *ServiceInstanceSpec) DeepCopy() *ServiceInstanceSpec { copy := *s + + if s.Tags != nil { + for _, tag := range s.Tags { + copy.Tags = append(copy.Tags, tag) + } + } + return © } // Validate validates itself. func (s *ServiceInstanceSpec) Validate() error { + if s.RegistryName == "" { + return fmt.Errorf("registryName is empty") + } + if s.ServiceName == "" { return fmt.Errorf("serviceName is empty") } + if s.InstanceID == "" { + return fmt.Errorf("instanceID is empty") + } + if s.Hostname == "" && s.HostIP == "" { return fmt.Errorf("both hostname and hostIP are empty") } @@ -124,18 +139,18 @@ func NewRegistryEventFromDiff(registryName string, oldSpecs, newSpecs map[string } for _, oldSpec := range oldSpecs { - _, exists := newSpecs[oldSpec.ServiceName] + _, exists := newSpecs[oldSpec.Key()] if !exists { copy := oldSpec.DeepCopy() - event.Delete[oldSpec.ServiceName] = copy + event.Delete[oldSpec.Key()] = copy } } for _, newSpec := range newSpecs { - oldSpec, exists := oldSpecs[newSpec.ServiceName] + oldSpec, exists := oldSpecs[newSpec.Key()] if exists && !reflect.DeepEqual(oldSpec, newSpec) { copy := newSpec.DeepCopy() - event.Apply[newSpec.ServiceName] = copy + event.Apply[newSpec.Key()] = copy } } diff --git a/pkg/object/serviceregistry/serviceregistry.go b/pkg/object/serviceregistry/serviceregistry.go index ba7597d45d..1451376495 100644 --- a/pkg/object/serviceregistry/serviceregistry.go +++ b/pkg/object/serviceregistry/serviceregistry.go @@ -46,7 +46,7 @@ type ( superSpec *supervisor.Spec spec *Spec - mutex sync.RWMutex + mutex sync.Mutex // The key is registry name. registryBuckets map[string]*registryBucket @@ -121,6 +121,10 @@ func newServiceBucket() *serviceBucket { } } +func (b *serviceBucket) needClean() bool { + return len(b.serviceWatchers) == 0 +} + // RegisterRegistry registers the registry and watch it. func (sr *ServiceRegistry) RegisterRegistry(registry Registry) error { sr.mutex.Lock() @@ -138,17 +142,32 @@ func (sr *ServiceRegistry) RegisterRegistry(registry Registry) error { bucket.registered, bucket.registry = true, registry - go sr.watchRegistry(bucket) + // NOTE: There will be data race warning, if it calls Notify within watchRegistry, + // even it won't cause bug. So we move it out here. + go sr.watchRegistry(registry.Notify(), bucket) return nil } -func (sr *ServiceRegistry) watchRegistry(bucket *registryBucket) { +func (sr *ServiceRegistry) watchRegistry(notify <-chan *RegistryEvent, bucket *registryBucket) { for { select { case <-bucket.done: return - case event := <-bucket.registry.Notify(): + case event := <-notify: + // Defensive programming for driver not to judge it. + if event.Empty() { + continue + } + + err := event.Validate() + if err != nil { + logger.Errorf("registry event from %v is invalid: %v", + bucket.registry.Name(), err) + continue + } + + // Defensive programming for driver not to fulfill the field. event.SourceRegistryName = bucket.registry.Name() sr.handleRegistryEvent(event) @@ -223,9 +242,10 @@ func (sr *ServiceRegistry) DeregisterRegistry(registryName string) error { SourceRegistryName: registryName, UseReplace: true, } + sr._handleRegistryEvent(cleanEvent) - bucket.registered = false + bucket.registered, bucket.registry = false, nil close(bucket.done) if bucket.needClean() { diff --git a/pkg/object/serviceregistry/watcher.go b/pkg/object/serviceregistry/watcher.go index 6953ac102d..440f517860 100644 --- a/pkg/object/serviceregistry/watcher.go +++ b/pkg/object/serviceregistry/watcher.go @@ -214,7 +214,16 @@ func (sr *ServiceRegistry) serviceWacherStopFn(registryName, serviceName, watche return } - delete(bucket.serviceBuckets, watcherID) + serviceBucket, exists := bucket.serviceBuckets[serviceName] + if !exists { + return + } + + delete(serviceBucket.serviceWatchers, watcherID) + + if serviceBucket.needClean() { + delete(bucket.serviceBuckets, serviceName) + } if bucket.needClean() { delete(sr.registryBuckets, registryName) @@ -226,10 +235,6 @@ func (sr *ServiceRegistry) generateID() string { return uuid.NewString() } -func (sr *ServiceRegistry) serviceWatchersKey(registryName, serviceName string) string { - return fmt.Sprintf("%s/%s", registryName, serviceName) -} - func (w *serviceWatcher) ID() string { return w.id } @@ -332,3 +337,35 @@ func (e *RegistryEvent) DeepCopy() *RegistryEvent { return copy } + +// Empty returns if the event contains nothing to handle. +func (e *RegistryEvent) Empty() bool { + return !e.UseReplace && len(e.Replace) == 0 && + len(e.Delete) == 0 && len(e.Apply) == 0 +} + +// Validate validates RegistryEvent. +func (e *RegistryEvent) Validate() error { + for k, v := range e.Replace { + err := v.Validate() + if err != nil { + return fmt.Errorf("replace element %v is invalid: %v", k, err) + } + } + + for k, v := range e.Delete { + err := v.Validate() + if err != nil { + return fmt.Errorf("delete element %v is invalid: %v", k, err) + } + } + + for k, v := range e.Apply { + err := v.Validate() + if err != nil { + return fmt.Errorf("apply element %v is invalid: %v", k, err) + } + } + + return nil +} diff --git a/pkg/object/zookeeperserviceregistry/zookeeperserviceregistry.go b/pkg/object/zookeeperserviceregistry/zookeeperserviceregistry.go index a219d6fbac..7ee7316b7d 100644 --- a/pkg/object/zookeeperserviceregistry/zookeeperserviceregistry.go +++ b/pkg/object/zookeeperserviceregistry/zookeeperserviceregistry.go @@ -37,6 +37,8 @@ const ( // Kind is the kind of ZookeeperServiceRegistry. Kind = "ZookeeperServiceRegistry" + + requestTimeout = 5 * time.Second ) func init() { @@ -113,7 +115,6 @@ func (zk *ZookeeperServiceRegistry) Inherit(superSpec *supervisor.Spec, previous func (zk *ZookeeperServiceRegistry) reload() { zk.serviceRegistry = zk.superSpec.Super().MustGetSystemController(serviceregistry.Kind). Instance().(*serviceregistry.ServiceRegistry) - zk.serviceRegistry.RegisterRegistry(zk) zk.notify = make(chan *serviceregistry.RegistryEvent, 10) zk.firstDone = false @@ -125,6 +126,8 @@ func (zk *ZookeeperServiceRegistry) reload() { logger.Errorf("%s get zookeeper conn failed: %v", zk.superSpec.Name(), err) } + zk.serviceRegistry.RegisterRegistry(zk) + go zk.run() } @@ -226,12 +229,17 @@ func (zk *ZookeeperServiceRegistry) update() { zk.firstDone = true event = &serviceregistry.RegistryEvent{ SourceRegistryName: zk.Name(), + UseReplace: true, Replace: instances, } } else { event = serviceregistry.NewRegistryEventFromDiff(zk.Name(), zk.instances, instances) } + if event.Empty() { + return + } + zk.notify <- event zk.instances = instances diff --git a/pkg/util/contexttool/contexttool.go b/pkg/util/contexttool/contexttool.go new file mode 100644 index 0000000000..2daad9cb48 --- /dev/null +++ b/pkg/util/contexttool/contexttool.go @@ -0,0 +1,16 @@ +package contexttool + +import ( + "context" + "time" +) + +// TimeoutContext wraps standard timeout context by calling cancel function automatically. +func TimeoutContext(timeout time.Duration) context.Context { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + go func() { + time.Sleep(timeout) + cancel() + }() + return ctx +}