Skip to content

Commit

Permalink
[mesh]: Add leader judgement for syncing
Browse files Browse the repository at this point in the history
  • Loading branch information
xxx7xxxx committed Aug 24, 2021
1 parent 4c467d3 commit ab8c9b3
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 0 deletions.
9 changes: 9 additions & 0 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,15 @@ func New(opt *option.Options) (Cluster, error) {
return c, nil
}

func (c *cluster) IsLeader() bool {
server, err := c.getServer()
if err != nil {
return false
}

return server.Server.Leader() == server.Server.ID()
}

// requestContext returns context with request timeout,
// please use it immediately in case of incorrect timeout.
func (c *cluster) requestContext() context.Context {
Expand Down
2 changes: 2 additions & 0 deletions pkg/cluster/cluster_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
type (
// Cluster is the open cluster interface.
Cluster interface {
IsLeader() bool

Layout() *Layout

Get(key string) (*string, error)
Expand Down
13 changes: 13 additions & 0 deletions pkg/object/meshcontroller/master/registrysyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ func (rs *registrySyncer) run() {
case <-rs.done:
return
case event := <-rs.registryWatcher.Watch():
if !rs.needSync() {
continue
}

rs.handleEvent(event)
}
}
Expand Down Expand Up @@ -133,6 +137,10 @@ func (rs *registrySyncer) handleEvent(event *serviceregistry.RegistryEvent) {
}

func (rs *registrySyncer) serviceInstanceSpecsFunc(meshInstances map[string]*spec.ServiceInstanceSpec) bool {
if !rs.needSync() {
return true
}

oldInstances, err := rs.serviceRegistry.ListAllServiceInstances(rs.spec.ExternalServiceRegistry)
if err != nil {
logger.Errorf("list all service instances of %s: %v", rs.spec.ExternalServiceRegistry, err)
Expand Down Expand Up @@ -163,6 +171,11 @@ func (rs *registrySyncer) serviceInstanceSpecsFunc(meshInstances map[string]*spe
return true
}

func (rs *registrySyncer) needSync() bool {
// NOTE: Only need one member in the cluster to do sync.
return rs.superSpec.Super().Cluster().IsLeader()
}

func (rs *registrySyncer) meshRegistryName() string {
return rs.superSpec.Name()
}
Expand Down

0 comments on commit ab8c9b3

Please sign in to comment.