Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat:load balancer strategy mode and add consistent hash #373

Merged
merged 8 commits into from
Mar 25, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions pkg/adapter/springcloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,7 @@ func (a *CloudAdapter) fetchCompareAndSet() {
// first remove the router for removed cluster
for _, c := range oldStore.Config {
if !newStore.HasCluster(c.Name) {
delete := &model.Router{ID: c.Name}
rm.DeleteRouter(delete)
rm.DeleteRouter(&model.Router{ID: c.Name})
}
}
// second set cluster
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (z *zkAppListener) watch() {
if err != nil {
failTimes++
logger.Infof("watching (path{%s}) = error{%v}", z.servicesPath, err)
if err == gzk.ErrNilNode {
if err == zk.ErrNoNode {
logger.Errorf("watching (path{%s}) got errNilNode,so exit listen", z.servicesPath)
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,10 @@ func (asl *applicationServiceListener) WatchAndHandle() {
if err != nil {
failTimes++
logger.Infof("watching (path{%s}) = error{%v}", asl.servicePath, err)
if err == gzk.ErrNilChildren {
if err == zk.ErrNoChildrenForEphemerals {
return
}
if err == gzk.ErrNilNode {
if err == zk.ErrNoNode {
logger.Errorf("watching (path{%s}) got errNilNode,so exit listen", asl.servicePath)
return
}
Expand Down
9 changes: 4 additions & 5 deletions pkg/adapter/xds/cds.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,19 +141,18 @@ func (c *CdsManager) makeCluster(cluster *xdspb.Cluster) *model.Cluster {
TypeStr: cluster.TypeStr,
Type: c.makeClusterType(cluster),
EdsClusterConfig: c.makeEdsClusterConfig(cluster.EdsClusterConfig),
LbStr: cluster.LbStr,
Lb: c.makeLoadBalancePolicy(cluster),
LbStr: c.makeLoadBalancePolicy(cluster.LbStr),
HealthChecks: c.makeHealthChecks(cluster.HealthChecks),
Endpoints: c.makeEndpoints(cluster.Endpoints),
}
}

func (c *CdsManager) makeLoadBalancePolicy(cluster *xdspb.Cluster) model.LbPolicy {
return model.LbPolicy(model.LbPolicyValue[cluster.LbStr])
func (c *CdsManager) makeLoadBalancePolicy(lb string) model.LbPolicyType {
return model.LbPolicyTypeValue[lb]
}

func (c *CdsManager) makeClusterType(cluster *xdspb.Cluster) model.DiscoveryType {
return model.DiscoveryType(model.DiscoveryTypeValue[cluster.TypeStr])
return model.DiscoveryTypeValue[cluster.TypeStr]
}

func (c *CdsManager) makeEndpoints(endpoint *xdspb.Endpoint) []*model.Endpoint {
Expand Down
36 changes: 36 additions & 0 deletions pkg/cluster/loadbalancer/load_balancer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package loadbalancer

import (
"github.com/apache/dubbo-go-pixiu/pkg/model"
)

type LoadBalancer interface {
Handler(c *model.Cluster) *model.Endpoint
}

// LoadBalancerStrategy load balancer strategy mode
var LoadBalancerStrategy = map[model.LbPolicyType]LoadBalancer{}

func RegisterLoadBalancer(name model.LbPolicyType, balancer LoadBalancer) {
if _, ok := LoadBalancerStrategy[name]; ok {
panic("load balancer register fail " + name)
}
LoadBalancerStrategy[name] = balancer
}
37 changes: 37 additions & 0 deletions pkg/cluster/loadbalancer/rand/load_balancer_rand.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package rand

import (
"math/rand"
)

import (
"github.com/apache/dubbo-go-pixiu/pkg/cluster/loadbalancer"
"github.com/apache/dubbo-go-pixiu/pkg/model"
)

func init() {
loadbalancer.RegisterLoadBalancer(model.LoadBalancerRand, Rand{})
}

type Rand struct{}

func (Rand) Handler(c *model.Cluster) *model.Endpoint {
return c.Endpoints[rand.Intn(len(c.Endpoints))]
}
39 changes: 39 additions & 0 deletions pkg/cluster/loadbalancer/roundrobin/round_robin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package roundrobin

import (
"github.com/apache/dubbo-go-pixiu/pkg/cluster/loadbalancer"
"github.com/apache/dubbo-go-pixiu/pkg/model"
)

func init() {
loadbalancer.RegisterLoadBalancer(model.LoadBalancerRoundRobin, RoundRobin{})
}

type RoundRobin struct{}

func (RoundRobin) Handler(c *model.Cluster) *model.Endpoint {
lens := len(c.Endpoints)
if c.PrePickEndpointIndex >= lens {
c.PrePickEndpointIndex = 0
}
e := c.Endpoints[c.PrePickEndpointIndex]
c.PrePickEndpointIndex = (c.PrePickEndpointIndex + 1) % lens
return e
}
19 changes: 2 additions & 17 deletions pkg/config/config_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,21 +150,6 @@ func GetLoadBalance(cfg *model.Bootstrap) (err error) {
logger.Error("Bootstrap configuration is null")
return err
}
var lbPolicy int32
for _, c := range cfg.StaticResources.Clusters {
flag := true
if c.TypeStr != "" {
if t, ok := model.LbPolicyValue[c.LbStr]; ok {
lbPolicy = t
flag = false
}
}
if flag {
c.LbStr = constant.DefaultLoadBalanceType
lbPolicy = model.LbPolicyValue[c.LbStr]
}
c.Lb = model.LbPolicy(lbPolicy)
}
return nil
}

Expand All @@ -173,7 +158,7 @@ func GetDiscoveryType(cfg *model.Bootstrap) (err error) {
logger.Error("Bootstrap configuration is null")
return err
}
var discoveryType int32
var discoveryType model.DiscoveryType
for _, c := range cfg.StaticResources.Clusters {
flag := true
if c.TypeStr != "" {
Expand All @@ -186,7 +171,7 @@ func GetDiscoveryType(cfg *model.Bootstrap) (err error) {
c.TypeStr = constant.DefaultDiscoveryType
discoveryType = model.DiscoveryTypeValue[c.TypeStr]
}
c.Type = model.DiscoveryType(discoveryType)
c.Type = discoveryType
}
return nil
}
70 changes: 19 additions & 51 deletions pkg/model/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,8 @@

package model

import (
"math/rand"
)

const (
Static DiscoveryType = 0 + iota
Static DiscoveryType = iota
StrictDNS
LogicalDns
EDS
Expand All @@ -31,21 +27,21 @@ const (

var (
// DiscoveryTypeName
DiscoveryTypeName = map[int32]string{
0: "Static",
1: "StrictDNS",
2: "LogicalDns",
3: "EDS",
4: "OriginalDst",
DiscoveryTypeName = map[DiscoveryType]string{
Static: "Static",
StrictDNS: "StrictDNS",
LogicalDns: "LogicalDns",
EDS: "EDS",
OriginalDst: "OriginalDst",
}

// DiscoveryTypeValue
DiscoveryTypeValue = map[string]int32{
"Static": 0,
"StrictDNS": 1,
"LogicalDns": 2,
"EDS": 3,
"OriginalDst": 4,
DiscoveryTypeValue = map[string]DiscoveryType{
"Static": Static,
"StrictDNS": StrictDNS,
"LogicalDns": LogicalDns,
"EDS": EDS,
"OriginalDst": OriginalDst,
}
)

Expand All @@ -56,11 +52,10 @@ type (
TypeStr string `yaml:"type" json:"type"` // Type the cluster discovery type string value
Type DiscoveryType `yaml:"-" json:"-"` // Type the cluster discovery type
EdsClusterConfig EdsClusterConfig `yaml:"eds_cluster_config" json:"eds_cluster_config" mapstructure:"eds_cluster_config"`
LbStr string `yaml:"lb_policy" json:"lb_policy"` // Lb the cluster select node used loadBalance policy
Lb LbPolicy `yaml:",omitempty" json:",omitempty"` // Lb the cluster select node used loadBalance policy
LbStr LbPolicyType `yaml:"lb_policy" json:"lb_policy"` // Lb the cluster select node used loadBalance policy
HealthChecks []HealthCheck `yaml:"health_checks" json:"health_checks"`
Endpoints []*Endpoint `yaml:"endpoints" json:"endpoints"`
prePickEndpointIndex int
PrePickEndpointIndex int
}

// EdsClusterConfig todo remove un-used EdsClusterConfig
Expand All @@ -83,36 +78,9 @@ type (

// Endpoint
Endpoint struct {
ID string `yaml:"ID" json:"ID"` // ID indicate one endpoint
Name string `yaml:"name" json:"name"` // Name the cluster unique name
Address SocketAddress `yaml:"socket_address" json:"socket_address" mapstructure:"socket_address"`
// extra info such as label or other meta data
Metadata map[string]string `yaml:"meta" json:"meta"`
ID string `yaml:"ID" json:"ID"` // ID indicate one endpoint
Name string `yaml:"name" json:"name"` // Name the cluster unique name
Address SocketAddress `yaml:"socket_address" json:"socket_address" mapstructure:"socket_address"` // Address socket address
Metadata map[string]string `yaml:"meta" json:"meta"` // Metadata extra info such as label or other meta data
}
)

func (c *Cluster) PickOneEndpoint() *Endpoint {
// TODO: add lb strategy abstraction
if c.Endpoints == nil || len(c.Endpoints) == 0 {
return nil
}

if len(c.Endpoints) == 1 {
return c.Endpoints[0]
}

if c.Lb == Rand {
return c.Endpoints[rand.Intn(len(c.Endpoints))]
} else if c.Lb == RoundRobin {

lens := len(c.Endpoints)
if c.prePickEndpointIndex >= lens {
c.prePickEndpointIndex = 0
}
e := c.Endpoints[c.prePickEndpointIndex]
c.prePickEndpointIndex = (c.prePickEndpointIndex + 1) % lens
return e
} else {
return c.Endpoints[rand.Intn(len(c.Endpoints))]
}
}
23 changes: 9 additions & 14 deletions pkg/model/lb.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,17 @@

package model

// LbPolicy the load balance policy enum
type LbPolicy int32
// LbPolicyType the load balance policy enum
type LbPolicyType string

const (
RoundRobin LbPolicy = 0
Rand LbPolicy = 3
LoadBalancerRand LbPolicyType = "Rand"
LoadBalancerRoundRobin LbPolicyType = "RoundRobin"
LoadBalanceConsistentHashing LbPolicyType = "ConsistentHashing"
)

// LbPolicyName key int32 for LbPolicy, value string
var LbPolicyName = map[int32]string{
0: "RoundRobin",
3: "Rand",
}

// LbPolicyValue key string, value int32 for LbPolicy
var LbPolicyValue = map[string]int32{
"RoundRobin": 0,
"Rand": 3,
var LbPolicyTypeValue = map[string]LbPolicyType{
"Rand": LoadBalancerRand,
"RoundRobin": LoadBalancerRoundRobin,
"ConsistentHashing": LoadBalanceConsistentHashing,
}
22 changes: 19 additions & 3 deletions pkg/server/cluster_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
)

import (
"github.com/apache/dubbo-go-pixiu/pkg/cluster/loadbalancer"
"github.com/apache/dubbo-go-pixiu/pkg/common/yaml"
"github.com/apache/dubbo-go-pixiu/pkg/logger"
"github.com/apache/dubbo-go-pixiu/pkg/model"
Expand Down Expand Up @@ -120,13 +121,28 @@ func (cm *ClusterManager) PickEndpoint(clusterName string) *model.Endpoint {

for _, cluster := range cm.store.Config {
if cluster.Name == clusterName {
// according to lb to choose one endpoint, now only random
return cluster.PickOneEndpoint()
return pickOneEndpoint(cluster)
}
}
return nil
}

func pickOneEndpoint(c *model.Cluster) *model.Endpoint {
if c.Endpoints == nil || len(c.Endpoints) == 0 {
return nil
}

if len(c.Endpoints) == 1 {
return c.Endpoints[0]
}

loadBalancer, ok := loadbalancer.LoadBalancerStrategy[c.LbStr]
if ok {
return loadBalancer.Handler(c)
}
return loadbalancer.LoadBalancerStrategy[model.LoadBalancerRand].Handler(c)
}

func (cm *ClusterManager) RemoveCluster(namesToDel []string) {
cm.rw.Lock()
defer cm.rw.Unlock()
Expand Down Expand Up @@ -194,7 +210,7 @@ func (s *ClusterStore) SetEndpoint(clusterName string, endpoint *model.Endpoint)
}

// cluster create
c := &model.Cluster{Name: clusterName, Lb: model.RoundRobin, Endpoints: []*model.Endpoint{endpoint}}
c := &model.Cluster{Name: clusterName, LbStr: model.LoadBalancerRoundRobin, Endpoints: []*model.Endpoint{endpoint}}
// not call AddCluster, because lock is not reenter
s.Config = append(s.Config, c)
}
Expand Down
Loading