Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor LoadBalancerPool to use cloud listing snapshotter #460

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/backends/backends.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,8 @@ func (b *Backends) List() ([]interface{}, error) {
return nil, err
}
var ret []interface{}
for _, _ = range backends {
ret = append(ret, true)
for _, x := range backends {
ret = append(ret, x)
}
return ret, nil
}
24 changes: 13 additions & 11 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func NewLoadBalancerController(
hasSynced: ctx.HasSynced,
nodes: NewNodeController(ctx, instancePool),
instancePool: instancePool,
l7Pool: loadbalancers.NewLoadBalancerPool(ctx.Cloud, ctx.ClusterNamer),
l7Pool: loadbalancers.NewLoadBalancerPool(ctx.Cloud, ctx.ClusterNamer, true),
backendSyncer: backends.NewBackendSyncer(backendPool, healthChecker, ctx.ClusterNamer, ctx.BackendConfigEnabled),
negLinker: backends.NewNEGLinker(backendPool, ctx.Cloud, ctx.ClusterNamer),
igLinker: backends.NewInstanceGroupLinker(instancePool, backendPool, ctx.ClusterNamer),
Expand Down Expand Up @@ -369,9 +369,12 @@ func (lbc *LoadBalancerController) SyncLoadBalancer(state interface{}) error {
}

// Create higher-level LB resources.
if err := lbc.l7Pool.Sync(lb); err != nil {
l7, err := lbc.l7Pool.Sync(lb)
if err != nil {
return err
}
syncState.l7 = l7

return nil
}

Expand Down Expand Up @@ -400,16 +403,12 @@ func (lbc *LoadBalancerController) PostProcess(state interface{}) error {

// Get the loadbalancer and update the ingress status.
ing := syncState.ing
k, err := utils.KeyFunc(ing)
if err != nil {
return fmt.Errorf("cannot get key for Ingress %s/%s: %v", ing.Namespace, ing.Name, err)
}

l7, err := lbc.l7Pool.Get(k)
if err != nil {
return fmt.Errorf("unable to get loadbalancer: %v", err)
if syncState.l7 == nil {
return fmt.Errorf("sync state does not contain L7 spec")
}
if err := lbc.updateIngressStatus(l7, ing); err != nil {

if err := lbc.updateIngressStatus(syncState.l7, ing); err != nil {
return fmt.Errorf("update ingress status error: %v", err)
}
return nil
Expand Down Expand Up @@ -449,7 +448,10 @@ func (lbc *LoadBalancerController) sync(key string) error {

// Bootstrap state for GCP sync.
urlMap, errs := lbc.Translator.TranslateIngress(ing, lbc.ctx.DefaultBackendSvcPortID)
syncState := &syncState{urlMap, ing}
syncState := &syncState{
urlMap: urlMap,
ing: ing,
}
if errs != nil {
return fmt.Errorf("error while evaluating the ingress spec: %v", utils.JoinErrs(errs))
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func newLoadBalancerController() *LoadBalancerController {
lbc := NewLoadBalancerController(ctx, stopCh)
// TODO(rramkumar): Fix this so we don't have to override with our fake
lbc.instancePool = instances.NewNodePool(instances.NewFakeInstanceGroups(sets.NewString(), namer), namer)
lbc.l7Pool = loadbalancers.NewLoadBalancerPool(loadbalancers.NewFakeLoadBalancers(clusterUID, namer), namer)
lbc.l7Pool = loadbalancers.NewLoadBalancerPool(loadbalancers.NewFakeLoadBalancers(clusterUID, namer), namer, false)
lbc.instancePool.Init(&instances.FakeZoneLister{Zones: []string{"zone-a"}})

lbc.hasSynced = func() bool { return true }
Expand Down
2 changes: 2 additions & 0 deletions pkg/controller/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package controller

import (
"k8s.io/ingress-gce/pkg/loadbalancers"
"k8s.io/ingress-gce/pkg/utils"

extensions "k8s.io/api/extensions/v1beta1"
Expand All @@ -31,5 +32,6 @@ type gcState struct {
// syncState is used by the controller to maintain state for routines that sync GCP resources of an Ingress.
type syncState struct {
urlMap *utils.GCEURLMap
l7 *loadbalancers.L7
ing *extensions.Ingress
}
6 changes: 4 additions & 2 deletions pkg/loadbalancers/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,12 @@ type LoadBalancers interface {

// LoadBalancerPool is an interface to manage the cloud resources associated
// with a gce loadbalancer.
// TODO(rramkumar): Break up this interface into 2: Pool & Syncer.
type LoadBalancerPool interface {
Get(name string) (*L7, error)
// Note: Get is currrently only used for testing.
Get(name string) bool
Delete(name string) error
Sync(ri *L7RuntimeInfo) error
Sync(ri *L7RuntimeInfo) (*L7, error)
GC(names []string) error
Shutdown() error
}
93 changes: 42 additions & 51 deletions pkg/loadbalancers/l7.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,68 +176,59 @@ func (l *L7) GetIP() string {
return ""
}

// Cleanup deletes resources specific to this l7 in the right order.
// Cleanup deletes resources specific to this l7 in the right order given a base name.
// forwarding rule -> target proxy -> url map
// This leaves backends and health checks, which are shared across loadbalancers.
func (l *L7) Cleanup() error {
if l.fw != nil {
glog.V(2).Infof("Deleting global forwarding rule %v", l.fw.Name)
if err := utils.IgnoreHTTPNotFound(l.cloud.DeleteGlobalForwardingRule(l.fw.Name)); err != nil {
return err
}
l.fw = nil
func Cleanup(name string, cloud LoadBalancers, namer *utils.Namer) error {
fwName := namer.ForwardingRule(name, "HTTP")
glog.V(2).Infof("Deleting global forwarding rule %v", fwName)
if err := utils.IgnoreHTTPNotFound(cloud.DeleteGlobalForwardingRule(fwName)); err != nil {
return err
}
if l.fws != nil {
glog.V(2).Infof("Deleting global forwarding rule %v", l.fws.Name)
if err := utils.IgnoreHTTPNotFound(l.cloud.DeleteGlobalForwardingRule(l.fws.Name)); err != nil {
return err
}
l.fws = nil
fwsName := namer.ForwardingRule(name, "HTTPS")
glog.V(2).Infof("Deleting global forwarding rule %v", fwsName)
if err := utils.IgnoreHTTPNotFound(cloud.DeleteGlobalForwardingRule(fwsName)); err != nil {
return err
}
if l.ip != nil {
glog.V(2).Infof("Deleting static IP %v(%v)", l.ip.Name, l.ip.Address)
if err := utils.IgnoreHTTPNotFound(l.cloud.DeleteGlobalAddress(l.ip.Name)); err != nil {
return err
}
l.ip = nil

staticIPName := fwName
glog.V(2).Infof("Deleting static IP %v", staticIPName)
if err := utils.IgnoreHTTPNotFound(cloud.DeleteGlobalAddress(staticIPName)); err != nil {
return err
}
if l.tps != nil {
glog.V(2).Infof("Deleting target https proxy %v", l.tps.Name)
if err := utils.IgnoreHTTPNotFound(l.cloud.DeleteTargetHttpsProxy(l.tps.Name)); err != nil {
return err
}
l.tps = nil

tpsName := namer.TargetProxy(name, "HTTPS")
glog.V(2).Infof("Deleting target https proxy %v", tpsName)
if err := utils.IgnoreHTTPNotFound(cloud.DeleteTargetHttpsProxy(tpsName)); err != nil {
return err
}
// Delete the SSL cert if it is from a secret, not referencing a pre-created GCE cert.
if len(l.sslCerts) != 0 && l.runtimeInfo.TLSName == "" {
var certErr error
for _, cert := range l.sslCerts {
glog.V(2).Infof("Deleting sslcert %s", cert.Name)
if err := utils.IgnoreHTTPNotFound(l.cloud.DeleteSslCertificate(cert.Name)); err != nil {
glog.Errorf("Old cert delete failed - %v", err)
certErr = err
}

}
l.sslCerts = nil
if certErr != nil {
return certErr
}
certs, err := cloud.ListSslCertificates()
if err != nil {
return err
}
if l.tp != nil {
glog.V(2).Infof("Deleting target http proxy %v", l.tp.Name)
if err := utils.IgnoreHTTPNotFound(l.cloud.DeleteTargetHttpProxy(l.tp.Name)); err != nil {
return err
for _, c := range certs {
// Delete the SSL cert if it is from a secret, not referencing a pre-created GCE cert.
if namer.IsCertUsedForLB(name, c.Name) {
glog.V(2).Infof("Deleting sslcert %s", c.Name)
if err := utils.IgnoreHTTPNotFound(cloud.DeleteSslCertificate(c.Name)); err != nil {
return err
}
}
l.tp = nil
}
if l.um != nil {
glog.V(2).Infof("Deleting url map %v", l.um.Name)
if err := utils.IgnoreHTTPNotFound(l.cloud.DeleteUrlMap(l.um.Name)); err != nil {
return err
}
l.um = nil

tpName := namer.TargetProxy(name, "HTTP")
glog.V(2).Infof("Deleting target http proxy %v", tpName)
if err := utils.IgnoreHTTPNotFound(cloud.DeleteTargetHttpProxy(tpName)); err != nil {
return err
}

umName := namer.UrlMap(name)
glog.V(2).Infof("Deleting url map %v", umName)
if err := utils.IgnoreHTTPNotFound(cloud.DeleteUrlMap(umName)); err != nil {
return err
}

return nil
}

Expand Down
90 changes: 53 additions & 37 deletions pkg/loadbalancers/l7s.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ package loadbalancers

import (
"fmt"
"reflect"
"time"

"github.com/golang/glog"
compute "google.golang.org/api/compute/v1"

"k8s.io/apimachinery/pkg/util/sets"

Expand All @@ -43,39 +44,45 @@ func (l *L7s) Namer() *utils.Namer {
// NewLoadBalancerPool returns a new loadbalancer pool.
// - cloud: implements LoadBalancers. Used to sync L7 loadbalancer resources
// with the cloud.
func NewLoadBalancerPool(cloud LoadBalancers, namer *utils.Namer) LoadBalancerPool {
return &L7s{cloud, storage.NewInMemoryPool(), namer}
func NewLoadBalancerPool(cloud LoadBalancers, namer *utils.Namer, resyncWithCloud bool) LoadBalancerPool {
l7Pool := &L7s{
cloud: cloud,
namer: namer,
}
if !resyncWithCloud {
l7Pool.snapshotter = storage.NewInMemoryPool()
}
keyFunc := func(i interface{}) (string, error) {
um := i.(*compute.UrlMap)
if !namer.NameBelongsToCluster(um.Name) {
return "", fmt.Errorf("unrecognized name %v", um.Name)
}
// Scrub out the UrlMap prefix of the name to get the base LB name.
return namer.ScrubUrlMapPrefix(um.Name), nil
}
l7Pool.snapshotter = storage.NewCloudListingPool("loadbalancers", keyFunc, l7Pool, 30*time.Second)
return l7Pool
}

// Get returns the loadbalancer by name.
func (l *L7s) Get(name string) (*L7, error) {
// Get implements LoadBalancerPool.
// Note: This is currently only used for testing.
func (l *L7s) Get(name string) bool {
name = l.namer.LoadBalancer(name)
lb, exists := l.snapshotter.Get(name)
if !exists {
return nil, fmt.Errorf("loadbalancer %v not in pool", name)
}
return lb.(*L7), nil
_, exists := l.snapshotter.Get(name)
return exists
}

// Sync a load balancer with the given runtime info from the controller.
func (l *L7s) Sync(ri *L7RuntimeInfo) error {
// Sync implements LoadBalancerPool.
func (l *L7s) Sync(ri *L7RuntimeInfo) (*L7, error) {
name := l.namer.LoadBalancer(ri.Name)

lb, _ := l.Get(name)
if lb == nil {
glog.V(3).Infof("Creating l7 %v", name)
lb = &L7{
runtimeInfo: ri,
Name: l.namer.LoadBalancer(ri.Name),
cloud: l.cloud,
namer: l.namer,
}
} else {
if !reflect.DeepEqual(lb.runtimeInfo, ri) {
glog.V(3).Infof("LB %v runtime info changed, old %+v new %+v", lb.Name, lb.runtimeInfo, ri)
lb.runtimeInfo = ri
}
glog.V(3).Infof("Sync: LB %s", name)
lb := &L7{
runtimeInfo: ri,
Name: l.namer.LoadBalancer(ri.Name),
cloud: l.cloud,
namer: l.namer,
}

// Add the lb to the pool, in case we create an UrlMap but run out
// of quota in creating the ForwardingRule we still need to cleanup
// the UrlMap during GC.
Expand All @@ -86,28 +93,24 @@ func (l *L7s) Sync(ri *L7RuntimeInfo) error {
// make it exist we need to create a collection of gce resources, done
// through the edge hop.
if err := lb.edgeHop(); err != nil {
return err
return lb, err
}

return nil
return lb, nil
}

// Delete deletes a load balancer by name.
// Delete implements LoadBalancerPool.
func (l *L7s) Delete(name string) error {
name = l.namer.LoadBalancer(name)
lb, err := l.Get(name)
if err != nil {
return err
}
glog.V(3).Infof("Deleting lb %v", name)
if err := lb.Cleanup(); err != nil {
if err := Cleanup(name, l.cloud, l.namer); err != nil {
return err
}
l.snapshotter.Delete(name)
return nil
}

// GC garbage collects loadbalancers not in the input list.
// GC implements LoadBalancerPool.
func (l *L7s) GC(names []string) error {
glog.V(4).Infof("GC(%v)", names)

Expand All @@ -131,11 +134,24 @@ func (l *L7s) GC(names []string) error {
return nil
}

// Shutdown logs whether or not the pool is empty.
// Shutdown implemented LoadBalancerPool.
func (l *L7s) Shutdown() error {
if err := l.GC([]string{}); err != nil {
return err
}
glog.V(2).Infof("Loadbalancer pool shutdown.")
return nil
}

// List lists all loadbalancers via listing all URLMap's.
func (l *L7s) List() ([]interface{}, error) {
urlMaps, err := l.cloud.ListUrlMaps()
if err != nil {
return nil, err
}
var ret []interface{}
for _, x := range urlMaps {
ret = append(ret, x)
}
return ret, nil
}
Loading