diff --git a/cmd/webhook/main.go b/cmd/webhook/main.go index c73903fa..12803ff0 100644 --- a/cmd/webhook/main.go +++ b/cmd/webhook/main.go @@ -115,7 +115,7 @@ func run(ctx context.Context, cfg *rest.Config, options *config.Options) error { webhookServer := server.NewWebhookServer(ctx, cfg, name, options) if err := webhookServer.RegisterValidators(ippool.NewIPPoolValidator(poolCache), - loadbalancer.NewValidator(vmiCache)); err != nil { + loadbalancer.NewValidator()); err != nil { return fmt.Errorf("failed to register ip pool validator: %w", err) } diff --git a/pkg/controller/ippool/controller.go b/pkg/controller/ippool/controller.go index a537d2c7..5126181c 100644 --- a/pkg/controller/ippool/controller.go +++ b/pkg/controller/ippool/controller.go @@ -81,11 +81,8 @@ func (h *Handler) OnRemove(_ string, ipPool *lbv1.IPPool) (*lbv1.IPPool, error) if ipPool == nil { return nil, nil } - - logrus.Debugf("IP Pool %s has been deleted", ipPool.Name) - + logrus.Infof("IP Pool %s is deleted", ipPool.Name) h.allocatorMap.Delete(ipPool.Name) - return ipPool, nil } diff --git a/pkg/controller/loadbalancer/controller.go b/pkg/controller/loadbalancer/controller.go index bdcb6d30..c5c91fe5 100644 --- a/pkg/controller/loadbalancer/controller.go +++ b/pkg/controller/loadbalancer/controller.go @@ -2,6 +2,7 @@ package loadbalancer import ( "context" + "errors" "fmt" "net" "reflect" @@ -29,12 +30,17 @@ const ( AnnotationKeyProject = lb.GroupName + "/project" AnnotationKeyNamespace = lb.GroupName + "/namespace" AnnotationKeyCluster = lb.GroupName + "/cluster" +) - defaultWaitIPTimeout = time.Second * 5 +var ( + errNoMatchedIPPool = errors.New("no matched IPPool") + errNoAvailableIP = errors.New("no available IP") + errNoRunningBackendServer = errors.New("no running backend servers") + errAllBackendServersNotHealthy = errors.New("running backend servers are not probed as healthy") ) type Handler struct { - lbClient ctllbv1.LoadBalancerClient + lbController ctllbv1.LoadBalancerController ipPoolCache ctllbv1.IPPoolCache nadCache ctlcniv1.NetworkAttachmentDefinitionCache serviceClient ctlcorev1.ServiceClient @@ -49,7 +55,7 @@ type Handler struct { } func Register(ctx context.Context, management *config.Management) error { - lbs := management.LbFactory.Loadbalancer().V1beta1().LoadBalancer() + lbc := management.LbFactory.Loadbalancer().V1beta1().LoadBalancer() pools := management.LbFactory.Loadbalancer().V1beta1().IPPool() nads := management.CniFactory.K8s().V1().NetworkAttachmentDefinition() services := management.CoreFactory.Core().V1().Service() @@ -57,7 +63,7 @@ func Register(ctx context.Context, management *config.Management) error { vmis := management.KubevirtFactory.Kubevirt().V1().VirtualMachineInstance() handler := &Handler{ - lbClient: lbs, + lbController: lbc, ipPoolCache: pools.Cache(), nadCache: nads.Cache(), serviceClient: services, @@ -71,8 +77,14 @@ func Register(ctx context.Context, management *config.Management) error { lbManager: management.LBManager, } - lbs.OnChange(ctx, controllerName, handler.OnChange) - lbs.OnRemove(ctx, controllerName, handler.OnRemove) + // NOTE: register the health check hander BEFORE the controller starts working + // no mutex is used to protect + if err := handler.lbManager.RegisterHealthCheckHandler(handler.HealthCheckNotify); err != nil { + return err + } + + lbc.OnChange(ctx, controllerName, handler.OnChange) + lbc.OnRemove(ctx, controllerName, handler.OnRemove) return nil } @@ -81,43 +93,118 @@ func (h *Handler) OnChange(_ string, lb *lbv1.LoadBalancer) 
(*lbv1.LoadBalancer, if lb == nil || lb.DeletionTimestamp != nil || lb.APIVersion != lbv1.SchemeGroupVersion.String() { return nil, nil } - logrus.Debugf("load balancer %s/%s has been changed, spec: %+v, apiVersion: %s", lb.Namespace, lb.Name, lb.Spec, lb.APIVersion) + logrus.Debugf("lb %s/%s is changed, spec: %+v, apiVersion: %s", lb.Namespace, lb.Name, lb.Spec, lb.APIVersion) lbCopy := lb.DeepCopy() - allocatedAddress, err := h.allocateIP(lb) - if err != nil { - err = fmt.Errorf("allocate ip for lb %s/%s failed, error: %w", lb.Namespace, lb.Name, err) - return h.updateStatus(lbCopy, lb, err) - } - if allocatedAddress != nil { - lbCopy.Status.AllocatedAddress = *allocatedAddress + + // 1. ensure lb get an address + if lb, err := h.ensureAllocatedAddress(lbCopy, lb); err != nil { + return h.handleError(lbCopy, lb, err) } + + // 2. ensure lb's implementation when it is VM type // The workload type defaults to VM if not specified to be compatible with previous versions if lb.Spec.WorkloadType == lbv1.VM || lb.Spec.WorkloadType == "" { - if err = h.ensureVMLoadBalancer(lbCopy); err != nil { - return h.updateStatus(lbCopy, lb, err) + if lb, err := h.ensureVMLoadBalancer(lbCopy, lb); err != nil { + return h.handleError(lbCopy, lb, err) } } + // move lb to Ready return h.updateStatus(lbCopy, lb, nil) } -func (h *Handler) ensureVMLoadBalancer(lb *lbv1.LoadBalancer) error { +func (h *Handler) OnRemove(_ string, lb *lbv1.LoadBalancer) (*lbv1.LoadBalancer, error) { + if lb == nil { + return nil, nil + } + logrus.Infof("lb %s/%s is deleted, address %s, allocatedIP %s", lb.Namespace, lb.Name, lb.Status.Address, lb.Status.AllocatedAddress.IP) + + if lb.Spec.IPAM == lbv1.Pool && lb.Status.AllocatedAddress.IPPool != "" { + if err := h.releaseIP(lb); err != nil { + logrus.Infof("lb %s/%s fail to release ip %s, error: %s", lb.Namespace, lb.Name, lb.Status.AllocatedAddress.IP, err.Error()) + return nil, fmt.Errorf("fail to release ip %s, error: %w", lb.Status.AllocatedAddress.IP, err) + } + logrus.Debugf("lb %s/%s release ip %s", lb.Namespace, lb.Name, lb.Status.AllocatedAddress.IP) + } + + if lb.Spec.WorkloadType == lbv1.VM || lb.Spec.WorkloadType == "" { + if err := h.lbManager.DeleteLoadBalancer(lb); err != nil { + logrus.Infof("lb %s/%s fail to delete service, error: %s", lb.Namespace, lb.Name, err.Error()) + return nil, fmt.Errorf("fail to delete service, error: %w", err) + } + logrus.Debugf("lb %s/%s delete service", lb.Namespace, lb.Name) + } + + return lb, nil +} + +func (h *Handler) handleError(lbCopy, lb *lbv1.LoadBalancer, err error) (*lbv1.LoadBalancer, error) { + // handle customized error + if errors.Is(err, errNoMatchedIPPool) || errors.Is(err, errNoAvailableIP) || errors.Is(err, lbpkg.ErrWaitExternalIP) { + h.lbController.EnqueueAfter(lb.Namespace, lb.Name, 1*time.Second) + return h.updateStatusNotReturnError(lbCopy, lb, err) + } else if errors.Is(err, errNoRunningBackendServer) || errors.Is(err, errAllBackendServersNotHealthy) { + // stop reconciler, wait vmi controller Enqueue() lb / health check go thread Enqueue() + return h.updateStatusNotReturnError(lbCopy, lb, err) + } + return h.updateStatus(lbCopy, lb, err) +} + +func (h *Handler) ensureAllocatedAddress(lbCopy, lb *lbv1.LoadBalancer) (*lbv1.LoadBalancer, error) { + if lb.Spec.IPAM == lbv1.DHCP { + return h.ensureAllocatedAddressDHCP(lbCopy, lb) + } + return h.ensureAllocatedAddressPool(lbCopy, lb) +} + +func (h *Handler) ensureVMLoadBalancer(lbCopy, lb *lbv1.LoadBalancer) (*lbv1.LoadBalancer, error) { if err := 
h.lbManager.EnsureLoadBalancer(lb); err != nil { - return fmt.Errorf("ensure load balancer %s/%s failed, error: %w", lb.Namespace, lb.Name, err) + return lb, err } - ip, err := h.waitServiceExternalIP(lb.Namespace, lb.Name) + + ip, err := h.lbManager.EnsureLoadBalancerServiceIP(lb) if err != nil { - return fmt.Errorf("wait service %s/%s external ip failed, error: %w", lb.Namespace, lb.Name, err) + lbCopy.Status.Address = "" + return lb, err } - lb.Status.Address = ip - servers, err := h.getBackendServers(lb) + + lbCopy.Status.Address = ip + servers, err := h.lbManager.EnsureBackendServers(lb) if err != nil { - return fmt.Errorf("get backend servers of lb %s/%s failed, error: %w", lb.Namespace, lb.Name, err) + return lb, err + } + lbCopy.Status.BackendServers = getServerAddress(servers) + if len(lbCopy.Status.BackendServers) == 0 { + return lb, errNoRunningBackendServer } - lb.Status.BackendServers = servers - return nil + if lb.Spec.HealthCheck != nil && lb.Spec.HealthCheck.Port != 0 { + count, err := h.lbManager.GetProbeReadyBackendServerCount(lb) + if err != nil { + return lb, err + } + + logrus.Debugf("lb %s/%s active probe count %v", lb.Namespace, lb.Name, count) + if count == 0 { + return lb, fmt.Errorf("%w total:%v, healthy:0", errAllBackendServersNotHealthy, len(lbCopy.Status.BackendServers)) + } + } + + return lb, nil +} + +func getServerAddress(servers []lbpkg.BackendServer) []string { + if len(servers) == 0 { + return nil + } + address := make([]string, 0, len(servers)) + for _, server := range servers { + if addr, ok := server.GetAddress(); ok { + address = append(address, addr) + } + } + return address } func (h *Handler) updateStatus(lbCopy, lb *lbv1.LoadBalancer, err error) (*lbv1.LoadBalancer, error) { @@ -129,116 +216,98 @@ func (h *Handler) updateStatus(lbCopy, lb *lbv1.LoadBalancer, err error) (*lbv1. 
lbv1.LoadBalancerReady.Message(lbCopy, "") } - // status didn't change, don't update it + // don't update when no change happens if reflect.DeepEqual(lbCopy.Status, lb.Status) { return lbCopy, err } - - updatedLb, updatedErr := h.lbClient.Update(lbCopy) + updatedLb, updatedErr := h.lbController.Update(lbCopy) if updatedErr != nil { - return nil, fmt.Errorf("update lb %s/%s status failed, error: %w", lb.Namespace, lb.Name, updatedErr) + return nil, fmt.Errorf("fail to update status, error: %w", updatedErr) } return updatedLb, err } -func (h *Handler) getBackendServers(lb *lbv1.LoadBalancer) ([]string, error) { - backendServers, err := h.lbManager.GetBackendServers(lb) - if err != nil { - return nil, err - } +// do not return error to wrangler framework, avoid endless error message +// caller decides where to add Enqueue +func (h *Handler) updateStatusNotReturnError(lbCopy, lb *lbv1.LoadBalancer, err error) (*lbv1.LoadBalancer, error) { + // set status to False + lbv1.LoadBalancerReady.False(lbCopy) + lbv1.LoadBalancerReady.Message(lbCopy, err.Error()) - servers := make([]string, 0, len(backendServers)) - for _, server := range backendServers { - addr, ok := server.GetAddress() - if ok { - servers = append(servers, addr) - } - } - - return servers, nil -} - -func (h *Handler) OnRemove(_ string, lb *lbv1.LoadBalancer) (*lbv1.LoadBalancer, error) { - if lb == nil { - return nil, nil + // don't update when no change happens + if reflect.DeepEqual(lbCopy.Status, lb.Status) { + return lb, nil } - logrus.Debugf("load balancer %s/%s has been deleted", lb.Namespace, lb.Name) - - if lb.Spec.IPAM == lbv1.Pool && lb.Status.AllocatedAddress.IPPool != "" { - if err := h.releaseIP(lb); err != nil { - return nil, fmt.Errorf("release ip of lb %s/%s failed, error: %w", lb.Namespace, lb.Name, err) - } + updatedLb, updatedErr := h.lbController.Update(lbCopy) + if updatedErr != nil { + return nil, fmt.Errorf("fail to update status with original error %w, new error: %w", err, updatedErr) } + logrus.Infof("lb %s/%s is set to not ready, error: %s", lb.Namespace, lb.Name, err.Error()) + return updatedLb, nil +} - if lb.Spec.WorkloadType == lbv1.VM { - if err := h.lbManager.DeleteLoadBalancer(lb); err != nil { - return nil, fmt.Errorf("delete lb %s/%s failed, error: %w", lb.Namespace, lb.Name, err) +func (h *Handler) ensureAllocatedAddressDHCP(lbCopy, lb *lbv1.LoadBalancer) (*lbv1.LoadBalancer, error) { + if lb.Status.AllocatedAddress.IP != utils.Address4AskDHCP { + lbCopy.Status.AllocatedAddress = lbv1.AllocatedAddress{ + IP: utils.Address4AskDHCP, } + return lb, nil } return lb, nil } -func (h *Handler) allocateIP(lb *lbv1.LoadBalancer) (*lbv1.AllocatedAddress, error) { - allocated := lb.Status.AllocatedAddress +func (h *Handler) ensureAllocatedAddressPool(lbCopy, lb *lbv1.LoadBalancer) (*lbv1.LoadBalancer, error) { + // lb's ip pool changes, release the previous allocated IP + if lb.Spec.IPPool != "" && lb.Status.AllocatedAddress.IPPool != "" && lb.Status.AllocatedAddress.IPPool != lb.Spec.IPPool { + logrus.Infof("lb %s/%s release ip %s to pool %s", lb.Namespace, lb.Name, lb.Status.AllocatedAddress.IP, lb.Status.AllocatedAddress.IPPool) + if err := h.releaseIP(lb); err != nil { + logrus.Warnf("lb %s/%s fail to release ip %s to pool %s, error: %s", lb.Namespace, lb.Name, lb.Status.AllocatedAddress.IP, lb.Spec.IPPool, err.Error()) + return lb, fmt.Errorf("fail to release ip %s to pool %s, error: %w", lb.Status.AllocatedAddress.IP, lb.Spec.IPPool, err) + } - if lb.Spec.IPAM == lbv1.DHCP { - return 
h.allocatedIPFromDHCP(&allocated, lb) + lbCopy.Status.AllocatedAddress = lbv1.AllocatedAddress{} + return lb, nil } - return h.allocatedIPFromPool(&allocated, lb) -} - -func (h *Handler) allocatedIPFromDHCP(allocated *lbv1.AllocatedAddress, lb *lbv1.LoadBalancer) (*lbv1.AllocatedAddress, error) { - var err error - // release the IP if the lb has applied an IP - if allocated.IPPool != "" { - if err = h.releaseIP(lb); err != nil { - return nil, err + // allocate or re-allocate IP + if lb.Status.AllocatedAddress.IPPool == "" { + ip, err := h.allocateIPFromPool(lb) + if err != nil { + logrus.Debugf("lb %s/%s fail to allocate from pool %s", lb.Namespace, lb.Name, err.Error()) + return lb, err } - } - if allocated.IP != utils.Address4AskDHCP { - return &lbv1.AllocatedAddress{ - IP: utils.Address4AskDHCP, - }, nil + lbCopy.Status.AllocatedAddress = *ip + logrus.Infof("lb %s/%s allocate ip %s from pool %s", lb.Namespace, lb.Name, ip.IP, ip.IPPool) + return lb, nil } - return nil, nil + return lb, nil } -func (h *Handler) allocatedIPFromPool(allocated *lbv1.AllocatedAddress, lb *lbv1.LoadBalancer) (*lbv1.AllocatedAddress, error) { - var err error +func (h *Handler) allocateIPFromPool(lb *lbv1.LoadBalancer) (*lbv1.AllocatedAddress, error) { pool := lb.Spec.IPPool if pool == "" { // match an IP pool automatically if not specified - pool, err = h.selectIPPool(lb) + pool, err := h.selectIPPool(lb) if err != nil { - return nil, fmt.Errorf("fail to select the pool for lb %s/%s, error: %w", lb.Namespace, lb.Name, err) - } - } - // release the IP from other IP pool - if allocated.IPPool != "" && allocated.IPPool != pool { - if err = h.releaseIP(lb); err != nil { return nil, err } - } - if allocated.IPPool != pool { return h.requestIP(lb, pool) } - return nil, nil + return h.requestIP(lb, pool) } func (h *Handler) requestIP(lb *lbv1.LoadBalancer, pool string) (*lbv1.AllocatedAddress, error) { - // get allocator allocator := h.allocatorMap.Get(pool) if allocator == nil { - return nil, fmt.Errorf("could not get the allocator %s", pool) + return nil, fmt.Errorf("fail to get allocator %s", pool) } - // get IP + // the ip is booked on pool when successfully Get() ipConfig, err := allocator.Get(fmt.Sprintf("%s/%s", lb.Namespace, lb.Name)) if err != nil { return nil, err @@ -264,42 +333,26 @@ func (h *Handler) selectIPPool(lb *lbv1.LoadBalancer) (string, error) { } pool, err := ipam.NewSelector(h.ipPoolCache).Select(r) if err != nil { - return "", fmt.Errorf("select IP pool failed, error: %w", err) + return "", fmt.Errorf("%w with selector, error: %w", errNoMatchedIPPool, err) } if pool == nil { - return "", fmt.Errorf("no matching IP pool with requirement %+v", r) + return "", fmt.Errorf("%w with requirement %+v", errNoMatchedIPPool, r) } return pool.Name, nil } func (h *Handler) releaseIP(lb *lbv1.LoadBalancer) error { + // if pool is not ready, just fail and wait a := h.allocatorMap.Get(lb.Status.AllocatedAddress.IPPool) if a == nil { - return fmt.Errorf("could not get the allocator %s", lb.Status.AllocatedAddress.IPPool) + return fmt.Errorf("fail to get allocator %s", lb.Status.AllocatedAddress.IPPool) } return a.Release(fmt.Sprintf("%s/%s", lb.Namespace, lb.Name), "") } -func (h *Handler) waitServiceExternalIP(namespace, name string) (string, error) { - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - tick := ticker.C - timeout := time.After(defaultWaitIPTimeout) - - for { - select { - case <-timeout: - return "", fmt.Errorf("timeout") - case <-tick: - svc, err := h.serviceCache.Get(namespace, 
name) - if err != nil { - logrus.Warnf("get service %s/%s failed, error: %v, continue...", namespace, name, err) - continue - } - if len(svc.Status.LoadBalancer.Ingress) > 0 { - return svc.Status.LoadBalancer.Ingress[0].IP, nil - } - } - } +// lb manager health check notify that the health of some VMs changed +func (h *Handler) HealthCheckNotify(namespace, name string) error { + h.lbController.Enqueue(namespace, name) + return nil } diff --git a/pkg/controller/vmi/controller.go b/pkg/controller/vmi/controller.go index 36f693d3..2698bb66 100644 --- a/pkg/controller/vmi/controller.go +++ b/pkg/controller/vmi/controller.go @@ -11,8 +11,6 @@ import ( lbv1 "github.com/harvester/harvester-load-balancer/pkg/apis/loadbalancer.harvesterhci.io/v1beta1" "github.com/harvester/harvester-load-balancer/pkg/config" ctllbv1 "github.com/harvester/harvester-load-balancer/pkg/generated/controllers/loadbalancer.harvesterhci.io/v1beta1" - lbpkg "github.com/harvester/harvester-load-balancer/pkg/lb" - "github.com/harvester/harvester-load-balancer/pkg/lb/servicelb" "github.com/harvester/harvester-load-balancer/pkg/utils" ) @@ -22,8 +20,6 @@ type Handler struct { lbController ctllbv1.LoadBalancerController lbClient ctllbv1.LoadBalancerClient lbCache ctllbv1.LoadBalancerCache - - lbManager lbpkg.Manager } func Register(ctx context.Context, management *config.Management) error { @@ -34,8 +30,6 @@ func Register(ctx context.Context, management *config.Management) error { lbController: lbs, lbClient: lbs, lbCache: lbs.Cache(), - - lbManager: management.LBManager, } vmis.OnChange(ctx, controllerName, handler.OnChange) @@ -48,77 +42,39 @@ func (h *Handler) OnChange(_ string, vmi *kubevirtv1.VirtualMachineInstance) (*k if vmi == nil || vmi.DeletionTimestamp != nil { return nil, nil } - logrus.Debugf("VMI %s/%s has been changed", vmi.Namespace, vmi.Name) - - lbs, err := h.lbCache.List(vmi.Namespace, labels.Everything()) - if err != nil { - return nil, fmt.Errorf("list load balancers failed, error: %w", err) - } - - for _, lb := range lbs { - // skip the cluster LB or the LB whose server selector is empty - if lb.Spec.WorkloadType == lbv1.Cluster || len(lb.Spec.BackendServerSelector) == 0 { - continue - } - - selector, err := utils.NewSelector(lb.Spec.BackendServerSelector) - if err != nil { - return nil, fmt.Errorf("parse selector %+v failed, error: %w", lb.Spec.BackendServerSelector, err) - } - // add or update the backend server to the matched load balancer - isChanged := false - if selector.Matches(labels.Set(vmi.Labels)) { - if isChanged, err = h.addServerToLB(vmi, lb); err != nil { - return nil, fmt.Errorf("add server %s/%s to lb %s/%s failed, error: %w", vmi.Namespace, vmi.Name, lb.Namespace, lb.Name, err) - } - } else { // remove the backend server from the unmatched load balancer - if isChanged, err = h.removeServerFromLB(vmi, lb); err != nil { - return nil, fmt.Errorf("remove server %s/%s from lb %s/%s failed, error: %w", vmi.Namespace, vmi.Name, lb.Namespace, lb.Name, err) - } - } - // update the load balancer status - if isChanged { - h.lbController.Enqueue(lb.Namespace, lb.Name) - } - } - - return vmi, nil + logrus.Debugf("VMI %s/%s is changed", vmi.Namespace, vmi.Name) + return h.notifyLoadBalancer(vmi) } func (h *Handler) OnRemove(_ string, vmi *kubevirtv1.VirtualMachineInstance) (*kubevirtv1.VirtualMachineInstance, error) { if vmi == nil { return nil, nil } + logrus.Debugf("VMI %s/%s is deleted", vmi.Namespace, vmi.Name) + return h.notifyLoadBalancer(vmi) +} - logrus.Debugf("VMI %s/%s has been deleted", 
vmi.Namespace, vmi.Name) - - lbs, err := h.lbCache.List("", labels.Everything()) +func (h *Handler) notifyLoadBalancer(vmi *kubevirtv1.VirtualMachineInstance) (*kubevirtv1.VirtualMachineInstance, error) { + lbs, err := h.lbCache.List(vmi.Namespace, labels.Everything()) if err != nil { - return nil, fmt.Errorf("list load balancers failed, error: %w", err) + return nil, fmt.Errorf("fail to list load balancers, error: %w", err) } for _, lb := range lbs { // skip the cluster LB or the LB whose server selector is empty - if lb.Spec.WorkloadType == lbv1.Cluster || len(lb.Spec.BackendServerSelector) == 0 { + if lb.DeletionTimestamp != nil || lb.Spec.WorkloadType == lbv1.Cluster || len(lb.Spec.BackendServerSelector) == 0 { continue } - // remove the backend server from the load balancers - if ok, err := h.removeServerFromLB(vmi, lb); err != nil { - return nil, fmt.Errorf("remove server %s/%s from lb %s/%s failed, error: %w", vmi.Namespace, vmi.Name, lb.Namespace, lb.Name, err) - } else if ok { + // notify LB + selector, err := utils.NewSelector(lb.Spec.BackendServerSelector) + if err != nil { + return nil, fmt.Errorf("fail to parse selector %+v, error: %w", lb.Spec.BackendServerSelector, err) + } + + if selector.Matches(labels.Set(vmi.Labels)) { + logrus.Debugf("VMI %s/%s notify lb %s/%s", vmi.Namespace, vmi.Name, lb.Namespace, lb.Name) h.lbController.Enqueue(lb.Namespace, lb.Name) } } - return vmi, nil } - -func (h *Handler) removeServerFromLB(vmi *kubevirtv1.VirtualMachineInstance, lb *lbv1.LoadBalancer) (bool, error) { - server := &servicelb.Server{VirtualMachineInstance: vmi} - return h.lbManager.RemoveBackendServers(lb, []lbpkg.BackendServer{server}) -} - -func (h *Handler) addServerToLB(vmi *kubevirtv1.VirtualMachineInstance, lb *lbv1.LoadBalancer) (bool, error) { - server := &servicelb.Server{VirtualMachineInstance: vmi} - return h.lbManager.AddBackendServers(lb, []lbpkg.BackendServer{server}) -} diff --git a/pkg/ipam/store/fake_store.go b/pkg/ipam/store/fake_store.go index 797c823b..e860cb68 100644 --- a/pkg/ipam/store/fake_store.go +++ b/pkg/ipam/store/fake_store.go @@ -69,12 +69,14 @@ func (f *FakeStore) Release(ip net.IP) error { } ipStr := ip.String() - if f.pool.Status.AllocatedHistory == nil { - f.pool.Status.AllocatedHistory = make(map[string]string) + if _, ok := f.pool.Status.Allocated[ipStr]; ok { + if f.pool.Status.AllocatedHistory == nil { + f.pool.Status.AllocatedHistory = make(map[string]string) + } + f.pool.Status.AllocatedHistory[ipStr] = f.pool.Status.Allocated[ipStr] + delete(f.pool.Status.Allocated, ipStr) + f.pool.Status.Available++ } - f.pool.Status.AllocatedHistory[ipStr] = f.pool.Status.Allocated[ipStr] - delete(f.pool.Status.Allocated, ipStr) - f.pool.Status.Available++ return nil } @@ -92,6 +94,7 @@ func (f *FakeStore) ReleaseByID(applicantID, _ string) error { f.pool.Status.AllocatedHistory[ip] = applicant delete(f.pool.Status.Allocated, ip) f.pool.Status.Available++ + break } } diff --git a/pkg/ipam/store/store.go b/pkg/ipam/store/store.go index ae21b32f..89606093 100644 --- a/pkg/ipam/store/store.go +++ b/pkg/ipam/store/store.go @@ -47,10 +47,15 @@ func (s *Store) Reserve(applicantID, _ string, ip net.IP, _ string) (bool, error ipStr := ip.String() - // return false if the ip has been allocated if ipPool.Status.Allocated != nil { - if _, ok := ipPool.Status.Allocated[ipStr]; ok { - return false, nil + if id, ok := ipPool.Status.Allocated[ipStr]; ok { + if id != applicantID { + // the ip has been allocated to other application + return false, nil + } + // pool 
allocated ip to lb, but lb failed to book it (e.g. failed to update status due to conflict) + // when lb allocates again, return success + return true, nil } } @@ -66,7 +71,7 @@ func (s *Store) Reserve(applicantID, _ string, ip net.IP, _ string) (bool, error ipPoolCopy.Status.Available-- if _, err := s.iPPoolClient.Update(ipPoolCopy); err != nil { - return false, fmt.Errorf("record %s into %s failed", ipStr, s.iPPoolName) + return false, fmt.Errorf("fail to reserve %s into %s", ipStr, s.iPPoolName) } return true, nil @@ -90,19 +95,25 @@ func (s *Store) Release(ip net.IP) error { return nil } - ipPoolCopy := ipPool.DeepCopy() + // tolerant duplicated release + // e.g. lb released ip but failed to update self, then release again + // still, need to check applicant ID + // luckily the host-local/backend/allocator only calls ReleaseByID ipStr := ip.String() + if _, ok := ipPool.Status.Allocated[ipStr]; ok { + ipPoolCopy := ipPool.DeepCopy() - if ipPool.Status.AllocatedHistory == nil { - ipPoolCopy.Status.AllocatedHistory = make(map[string]string) - } - ipPoolCopy.Status.AllocatedHistory[ipStr] = ipPool.Status.Allocated[ipStr] - delete(ipPoolCopy.Status.Allocated, ipStr) - ipPoolCopy.Status.Available++ + if ipPool.Status.AllocatedHistory == nil { + ipPoolCopy.Status.AllocatedHistory = make(map[string]string) + } + ipPoolCopy.Status.AllocatedHistory[ipStr] = ipPool.Status.Allocated[ipStr] + delete(ipPoolCopy.Status.Allocated, ipStr) + ipPoolCopy.Status.Available++ - _, err = s.iPPoolClient.Update(ipPoolCopy) - if err != nil { - return err + _, err = s.iPPoolClient.Update(ipPoolCopy) + if err != nil { + return err + } } return nil @@ -118,7 +129,11 @@ func (s *Store) ReleaseByID(applicantID, _ string) error { } ipPoolCopy := ipPool.DeepCopy() + found := false + // tolerant duplicated release + // e.g. lb released ip but failed to update self, then release again + // the host-local/backend/allocator only calls ReleaseByID for ip, applicant := range ipPool.Status.Allocated { if applicant == applicantID { if ipPool.Status.AllocatedHistory == nil { @@ -127,12 +142,16 @@ func (s *Store) ReleaseByID(applicantID, _ string) error { ipPoolCopy.Status.AllocatedHistory[ip] = applicant delete(ipPoolCopy.Status.Allocated, ip) ipPoolCopy.Status.Available++ + found = true + break } } - _, err = s.iPPoolClient.Update(ipPoolCopy) - if err != nil { - return err + if found { + _, err = s.iPPoolClient.Update(ipPoolCopy) + if err != nil { + return err + } } return nil @@ -144,7 +163,8 @@ func (s *Store) GetByID(applicantID, _ string) []net.IP { return nil } - ips := make([]net.IP, 0, 10) + // each ID can only have max 1 IP + ips := make([]net.IP, 0, 1) for ip, applicant := range ipPool.Status.Allocated { if applicantID == applicant { diff --git a/pkg/lb/interface.go b/pkg/lb/interface.go index 60606557..bfa7ab65 100644 --- a/pkg/lb/interface.go +++ b/pkg/lb/interface.go @@ -1,19 +1,34 @@ package lb import ( + "errors" + "k8s.io/apimachinery/pkg/types" lbv1 "github.com/harvester/harvester-load-balancer/pkg/apis/loadbalancer.harvesterhci.io/v1beta1" ) +type HealthCheckHandler func(namespace, name string) error + type Manager interface { + // Step 1. 
Ensure loadbalancer EnsureLoadBalancer(lb *lbv1.LoadBalancer) error DeleteLoadBalancer(lb *lbv1.LoadBalancer) error - GetBackendServers(lb *lbv1.LoadBalancer) ([]BackendServer, error) - // AddBackendServers returns true if backend servers are really added the LB, otherwise returns false - AddBackendServers(lb *lbv1.LoadBalancer, servers []BackendServer) (bool, error) - // RemoveBackendServers returns true if backend servers are really removed from the LB, otherwise returns false - RemoveBackendServers(lb *lbv1.LoadBalancer, servers []BackendServer) (bool, error) + + // Step 2. Ensure loadbalancer external IP + EnsureLoadBalancerServiceIP(lb *lbv1.LoadBalancer) (string, error) + + // Step 3. Ensure service backend servers + EnsureBackendServers(lb *lbv1.LoadBalancer) ([]BackendServer, error) + + ListBackendServers(lb *lbv1.LoadBalancer) ([]BackendServer, error) + + // return the count of endpoints which are probed as Ready + // if probe is disabled, then return the count of all endpoints + GetProbeReadyBackendServerCount(lb *lbv1.LoadBalancer) (int, error) + + // register a handler to get which lb is happending changes per health check + RegisterHealthCheckHandler(handler HealthCheckHandler) error } type BackendServer interface { @@ -22,3 +37,7 @@ type BackendServer interface { GetName() string GetAddress() (string, bool) } + +var ( + ErrWaitExternalIP = errors.New("service is waiting for external IP") +) diff --git a/pkg/lb/servicelb/manager.go b/pkg/lb/servicelb/manager.go index 1d6cbddf..ce73034f 100644 --- a/pkg/lb/servicelb/manager.go +++ b/pkg/lb/servicelb/manager.go @@ -3,6 +3,8 @@ package servicelb import ( "context" "fmt" + "reflect" + "slices" "strconv" "strings" "time" @@ -41,6 +43,7 @@ type Manager struct { endpointSliceClient ctldiscoveryv1.EndpointSliceClient endpointSliceCache ctldiscoveryv1.EndpointSliceCache vmiCache ctlkubevirtv1.VirtualMachineInstanceCache + healthHandler pkglb.HealthCheckHandler *prober.Manager } @@ -61,223 +64,365 @@ func NewManager(ctx context.Context, serviceClient ctlCorev1.ServiceClient, serv return m } -func (m *Manager) updateHealthCondition(uid string, isHealthy bool) error { - logrus.Infof("uid: %s, isHealthy: %t", uid, isHealthy) - ns, name, server, err := unMarshalUID(uid) +func (m *Manager) updateHealthCondition(uid, address string, isHealthy bool) error { + ns, name, err := unMarshalUID(uid) if err != nil { return err } eps, err := m.endpointSliceCache.Get(ns, name) if err != nil && !errors.IsNotFound(err) { - return fmt.Errorf("get cache of endpointslice %s/%s failed, error: %w", ns, name, err) + return fmt.Errorf("fail to get endpointslice %s/%s, error: %w", ns, name, err) } else if errors.IsNotFound(err) { - logrus.Warnf("endpointSlice %s/%s not found", ns, name) + logrus.Warnf("endpointSlice %s/%s is not found", ns, name) return nil } - for i, ep := range eps.Endpoints { - if len(ep.Addresses) != 1 { - return fmt.Errorf("the length of addresses is not 1, endpoint: %+v", ep) + ip, _, err := unMarshalPorberAddress(address) + if err != nil { + return err + } + + for i := range eps.Endpoints { + if len(eps.Endpoints[i].Addresses) != 1 { + return fmt.Errorf("the length of lb %s endpoint addresses is %v, endpoint: %+v", uid, len(eps.Endpoints[i].Addresses), eps.Endpoints[i]) } - if ep.Addresses[0] == server { - epsCopy := eps.DeepCopy() - epsCopy.Endpoints[i].Conditions = discoveryv1.EndpointConditions{Ready: &isHealthy} - if _, err := m.endpointSliceClient.Update(epsCopy); err != nil { - return fmt.Errorf("update status of endpointslice %s/%s 
failed, error: %w", ns, name, err) + if eps.Endpoints[i].Addresses[0] == ip { + // only update the Ready condition when necessary + if needUpdateEndpointConditions(&eps.Endpoints[i].Conditions, isHealthy) { + // notify controller that some endpoint conditions change ( success <---> fail) + // otherweise, the controller needs to watch all endpointslice object to know the health probe result on time + // or the controller actively loop Enqueue all lbs which enables health check + if m.healthHandler != nil { + if err := m.healthHandler(ns, name); err != nil { + return fmt.Errorf("fail to notify lb %s, error: %w", uid, err) + } + } + epsCopy := eps.DeepCopy() + updateEndpointConditions(&epsCopy.Endpoints[i].Conditions, isHealthy) + logrus.Infof("update condition of lb %s endpoint ip %s to %t", uid, ip, isHealthy) + if _, err := m.endpointSliceClient.Update(epsCopy); err != nil { + return fmt.Errorf("fail to update condition of lb %s endpoint ip %s to %t, error: %w", uid, ip, isHealthy, err) + } } + break } } return nil } -func (m *Manager) EnsureLoadBalancer(lb *lbv1.LoadBalancer) error { - if err := m.ensureService(lb); err != nil { - return fmt.Errorf("ensure service failed, error: %w", err) - } +func isEndpointConditionsReady(ec *discoveryv1.EndpointConditions) bool { + return ec.Ready != nil && *ec.Ready +} - eps, err := m.ensureEndpointSlice(lb) - if err != nil { - return fmt.Errorf("ensure endpointslice failed, error: %w", err) +func needUpdateEndpointConditions(ec *discoveryv1.EndpointConditions, ready bool) bool { + return ec.Ready == nil || *ec.Ready != ready +} + +func updateEndpointConditions(ec *discoveryv1.EndpointConditions, ready bool) { + if ec.Ready == nil { + v := ready + ec.Ready = &v + } else if *ec.Ready != ready { + *ec.Ready = ready } +} - m.ensureProbes(lb, eps) +func (m *Manager) updateAllConditions(lb *lbv1.LoadBalancer, eps *discoveryv1.EndpointSlice, isHealthy bool) error { + epsCopy := eps.DeepCopy() + updated := false + for i := range epsCopy.Endpoints { + if !isDummyEndpoint(&epsCopy.Endpoints[i]) && needUpdateEndpointConditions(&epsCopy.Endpoints[i].Conditions, isHealthy) { + updateEndpointConditions(&epsCopy.Endpoints[i].Conditions, isHealthy) + updated = true + } + } + if updated { + logrus.Infof("update all conditions of lb %s/%s endpoints to %t", lb.Namespace, lb.Name, isHealthy) + if _, err := m.endpointSliceClient.Update(epsCopy); err != nil { + return fmt.Errorf("fail to update all conditions of lb %s/%s endpoints to %t error: %w", lb.Namespace, lb.Name, isHealthy, err) + } + } return nil } -func (m *Manager) DeleteLoadBalancer(lb *lbv1.LoadBalancer) error { +// if probe is disabled, then return the endpint count +func (m *Manager) GetProbeReadyBackendServerCount(lb *lbv1.LoadBalancer) (int, error) { eps, err := m.endpointSliceCache.Get(lb.Namespace, lb.Name) if err != nil && !errors.IsNotFound(err) { - return err + return 0, err } else if errors.IsNotFound(err) { - logrus.Warnf("endpointSlice %s/%s not found", lb.Namespace, lb.Name) - return nil + logrus.Warnf("lb %s/%s endpointSlice is not found", lb.Namespace, lb.Name) + return 0, err + } + + count := 0 + // if use `for _, ep := range eps.Endpoints` + // get: G601: Implicit memory aliasing in for loop. 
(gosec) + for i := range eps.Endpoints { + if !isDummyEndpoint(&eps.Endpoints[i]) && isEndpointConditionsReady(&eps.Endpoints[i].Conditions) { + count++ + } } - m.removeProbes(lb, eps) + return count, nil +} +func (m *Manager) EnsureLoadBalancer(lb *lbv1.LoadBalancer) error { + return m.ensureService(lb) +} + +// call only once before controller starts looping on OnChange ... +func (m *Manager) RegisterHealthCheckHandler(handler pkglb.HealthCheckHandler) error { + if m.healthHandler != nil { + return fmt.Errorf("health check handler can only be registered once") + } + m.healthHandler = handler return nil } -func (m *Manager) GetBackendServers(lb *lbv1.LoadBalancer) ([]pkglb.BackendServer, error) { - eps, err := m.endpointSliceCache.Get(lb.Namespace, lb.Name) +func (m *Manager) DeleteLoadBalancer(lb *lbv1.LoadBalancer) error { + _, err := m.endpointSliceCache.Get(lb.Namespace, lb.Name) if err != nil && !errors.IsNotFound(err) { - return nil, err + return err } else if errors.IsNotFound(err) { - logrus.Warnf("endpointSlice %s/%s not found", lb.Namespace, lb.Name) - return []pkglb.BackendServer{}, nil + logrus.Infof("delete lb %s/%s endpointSlice is not found", lb.Namespace, lb.Name) + // no harm to call the following removeLBProbers } - servers := make([]pkglb.BackendServer, 0, len(eps.Endpoints)) - for _, ep := range eps.Endpoints { - vmi, err := m.vmiCache.Get(ep.TargetRef.Namespace, ep.TargetRef.Name) - if err != nil { - return nil, err - } + _, err = m.removeLBProbers(lb) + return err +} - servers = append(servers, &Server{vmi}) +// get the qualified backend servers of one LB +func (m *Manager) getServiceBackendServers(lb *lbv1.LoadBalancer) ([]pkglb.BackendServer, error) { + // get related vmis + selector, err := utils.NewSelector(lb.Spec.BackendServerSelector) + if err != nil { + return nil, fmt.Errorf("fail to new selector, error: %w", err) + } + vmis, err := m.vmiCache.List(lb.Namespace, selector) + if err != nil { + return nil, fmt.Errorf("fail to list vmi per selector, error: %w", err) } + servers := make([]pkglb.BackendServer, 0, len(vmis)) + + // skip being-deleted vmi, no-address vmi + for _, vmi := range vmis { + if vmi.DeletionTimestamp != nil { + continue + } + server := &Server{VirtualMachineInstance: vmi} + if _, ok := server.GetAddress(); ok { + servers = append(servers, server) + } + } return servers, nil } -func (m *Manager) AddBackendServers(lb *lbv1.LoadBalancer, servers []pkglb.BackendServer) (bool, error) { - eps, err := m.endpointSliceCache.Get(lb.Namespace, lb.Name) - if err != nil && !errors.IsNotFound(err) { - return false, err - } else if errors.IsNotFound(err) { - logrus.Warnf("endpointSlice %s/%s not found", lb.Namespace, lb.Name) - return false, nil +func (m *Manager) EnsureBackendServers(lb *lbv1.LoadBalancer) ([]pkglb.BackendServer, error) { + // ensure service is existing + if svc, err := m.getService(lb); err != nil { + return nil, err + } else if svc == nil { + return nil, fmt.Errorf("service is not existing, ensure it first") } - var epsCopy *discoveryv1.EndpointSlice - endpoints := make([]*discoveryv1.Endpoint, 0, len(servers)) - for _, server := range servers { - if flag, _, err := isExisting(eps, server); err != nil || flag { - continue + var eps *discoveryv1.EndpointSlice + eps, err := m.endpointSliceCache.Get(lb.Namespace, lb.Name) + if err != nil { + if !errors.IsNotFound(err) { + return nil, fmt.Errorf("fail to get endpointslice, error: %w", err) } + eps = nil + } - address, ok := server.GetAddress() - if !ok { - continue - } + servers, err := 
m.getServiceBackendServers(lb) + if err != nil { + return nil, err + } - endpoint := discoveryv1.Endpoint{ - Addresses: []string{address}, - TargetRef: &corev1.ObjectReference{ - Namespace: server.GetNamespace(), - Name: server.GetName(), - UID: server.GetUID(), - }, - Conditions: discoveryv1.EndpointConditions{ - Ready: pointer.Bool(true), - }, + epsNew, err := m.constructEndpointSliceFromBackendServers(eps, lb, servers) + if err != nil { + return nil, err + } + + // create a new one + if eps == nil { + // it is ok to do not check IsAlreadyExists, reconciler will pass + eps, err = m.endpointSliceClient.Create(epsNew) + if err != nil { + return nil, fmt.Errorf("fail to create endpointslice, error: %w", err) } - if epsCopy == nil { - epsCopy = eps.DeepCopy() + } else { + if !reflect.DeepEqual(eps, epsNew) { + logrus.Debugf("update endpointslice %s/%s", lb.Namespace, lb.Name) + eps, err = m.endpointSliceClient.Update(epsNew) + if err != nil { + return nil, fmt.Errorf("fail to update endpointslice, error: %w", err) + } } - epsCopy.Endpoints = append(epsCopy.Endpoints, endpoint) - endpoints = append(endpoints, &endpoint) } - if epsCopy != nil { - if _, err := m.endpointSliceClient.Update(epsCopy); err != nil { - return false, err - } + // always ensure probs + if err := m.ensureProbes(lb, eps); err != nil { + return nil, fmt.Errorf("fail to ensure probs, error: %w", err) } - if lb.Spec.HealthCheck != nil && lb.Spec.HealthCheck.Port != 0 { - for _, endpoint := range endpoints { - m.addOneProbe(lb, endpoint) - } + // always ensure dummy endpoint + if err := m.ensureDummyEndpoint(lb, eps); err != nil { + return nil, fmt.Errorf("fail to ensure dummy endpointslice, error: %w", err) } - return epsCopy != nil, nil + return servers, nil } -func (m *Manager) RemoveBackendServers(lb *lbv1.LoadBalancer, servers []pkglb.BackendServer) (bool, error) { - eps, err := m.endpointSliceCache.Get(lb.Namespace, lb.Name) - if err != nil && !errors.IsNotFound(err) { - return false, err - } else if errors.IsNotFound(err) { - logrus.Warnf("endpointSlice %s/%s not found", lb.Namespace, lb.Name) - return false, nil +func (m *Manager) EnsureLoadBalancerServiceIP(lb *lbv1.LoadBalancer) (string, error) { + // ensure service is existing + svc, err := m.getService(lb) + if err != nil { + return "", err } - - indexes := make([]int, 0, len(servers)) - for _, server := range servers { - flag, index, err := isExisting(eps, server) - if err != nil || !flag { - continue - } - indexes = append(indexes, index) + if svc == nil { + return "", fmt.Errorf("service is not existing, ensure it first") } - - if len(indexes) == 0 { - return false, nil + // kube-vip will update this field, wait if not existing + if len(svc.Status.LoadBalancer.Ingress) > 0 { + return svc.Status.LoadBalancer.Ingress[0].IP, nil } + // no ip, wait + return "", pkglb.ErrWaitExternalIP +} - epsCopy := eps.DeepCopy() - epsCopy.Endpoints = make([]discoveryv1.Endpoint, 0, len(eps.Endpoints)) - preIndex := -1 - for _, index := range indexes { - epsCopy.Endpoints = append(epsCopy.Endpoints, eps.Endpoints[preIndex+1:index]...) - preIndex = index - } - epsCopy.Endpoints = append(epsCopy.Endpoints, eps.Endpoints[preIndex+1:]...) 
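
The per-server AddBackendServers/RemoveBackendServers path being removed here is replaced by a declarative reconcile: updateAllProbers keeps probers whose options are unchanged, removes stale ones, and adds newly observed ones by diffing the active map against the target map. Below is a minimal, self-contained sketch of that map-diff pattern; the names (probeCfg, reconcile) are illustrative and not taken from the repository.

package main

import "fmt"

// probeCfg stands in for the prober options keyed by endpoint address.
type probeCfg struct {
	Address string
	Period  int
}

// reconcile compares the running (active) set against the desired (target) set:
// entries present in both with identical options are kept, entries missing from
// the target or with changed options are removed, and whatever remains in the
// target is added. Note: it consumes (mutates) the target map, which is fine
// when the caller passes throwaway copies, as updateAllProbers does.
func reconcile(active, target map[string]probeCfg) (remove, add []probeCfg) {
	for addr, ap := range active {
		if tp, ok := target[addr]; ok && ap == tp {
			// unchanged: keep the running prober, drop it from the target set
			delete(target, addr)
			continue
		}
		// gone from the target, or options changed: stop the running prober;
		// a changed entry stays in target and is re-added below
		remove = append(remove, ap)
	}
	for _, tp := range target {
		add = append(add, tp)
	}
	return remove, add
}

func main() {
	active := map[string]probeCfg{
		"10.0.0.1:80": {"10.0.0.1:80", 1},
		"10.0.0.2:80": {"10.0.0.2:80", 1},
	}
	target := map[string]probeCfg{
		"10.0.0.2:80": {"10.0.0.2:80", 2}, // changed period -> removed then re-added
		"10.0.0.3:80": {"10.0.0.3:80", 1}, // new -> added
	}
	remove, add := reconcile(active, target)
	fmt.Println("remove:", remove)
	fmt.Println("add:", add)
}
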
- if _, err = m.endpointSliceClient.Update(epsCopy); err != nil { - return false, err +func (m *Manager) ListBackendServers(lb *lbv1.LoadBalancer) ([]pkglb.BackendServer, error) { + return m.getServiceBackendServers(lb) +} + +func (m *Manager) ensureProbes(lb *lbv1.LoadBalancer, eps *discoveryv1.EndpointSlice) error { + // disabled + if lb.Spec.HealthCheck == nil || lb.Spec.HealthCheck.Port == 0 { + if _, err := m.removeLBProbers(lb); err != nil { + return err + } + // user may disable the healthy checker e.g. it is not working as expected + // then set all endpoints to be Ready thus they can continue to work + if err := m.updateAllConditions(lb, eps, true); err != nil { + return err + } + return nil } - if lb.Spec.HealthCheck != nil && lb.Spec.HealthCheck.Port != 0 { - for _, i := range indexes { - m.removeOneProbe(lb, &eps.Endpoints[i]) + uid := marshalUID(lb.Namespace, lb.Name) + targetProbers := make(map[string]prober.HealthOption) + // indexing to skip G601 in go v121 + for i := range eps.Endpoints { + if len(eps.Endpoints[i].Addresses) == 0 || isDummyEndpoint(&eps.Endpoints[i]) { + continue } + targetProbers[marshalPorberAddress(lb, &eps.Endpoints[i])] = m.generateOneProber(lb, &eps.Endpoints[i]) } - return true, nil + // get a copy of data for safe operation + activeProbers, _ := m.GetWorkerHealthOptionMap(uid) + + return m.updateAllProbers(uid, activeProbers, targetProbers) } -// Check whether the server is existing in the endpointSlice -func isExisting(eps *discoveryv1.EndpointSlice, server pkglb.BackendServer) (bool, int, error) { - address, ok := server.GetAddress() - if !ok { - return false, 0, fmt.Errorf("could not get address of server %s/%s", server.GetNamespace(), server.GetName()) +// according to the activeProbers and targetProbers +// +// keep unchanged +// remove out-dated +// add newly observed +func (m *Manager) updateAllProbers(uid string, activeProbers, targetProbers map[string]prober.HealthOption) error { + for _, ap := range activeProbers { + if tp, ok := targetProbers[ap.Address]; ok { + // find in both + if !ap.Equal(tp) { + // changed + // remove active one + if _, err := m.RemoveWorker(uid, ap.Address); err != nil { + return err + } + // add target one + if err := m.AddWorker(uid, tp.Address, tp); err != nil { + return err + } + } + // replaced or equal; then delete it from both maps + delete(activeProbers, ap.Address) + delete(targetProbers, tp.Address) + } + // for those not found in the targetProbers, will be processed in next lines } - for i, ep := range eps.Endpoints { - if len(ep.Addresses) != 1 { - return false, 0, fmt.Errorf("the length of addresses is not 1, endpoint: %+v", ep) + // remove all remainings of activeProbers + for _, ap := range activeProbers { + logrus.Debugf("-probe %s %s", uid, ap.Address) + if _, err := m.RemoveWorker(uid, ap.Address); err != nil { + return err } + } - if ep.TargetRef != nil && server.GetUID() == ep.TargetRef.UID && address == ep.Addresses[0] { - return true, i, nil + // add all remainings of targetProbers + for _, tp := range targetProbers { + logrus.Debugf("+probe %s %s", uid, tp.Address) + if err := m.AddWorker(uid, tp.Address, tp); err != nil { + return err } } - return false, 0, nil + return nil } -func (m *Manager) ensureProbes(lb *lbv1.LoadBalancer, eps *discoveryv1.EndpointSlice) { - if lb.Spec.HealthCheck == nil || lb.Spec.HealthCheck.Port == 0 { - m.removeProbes(lb, eps) - return +// without at least one Ready (dummy) endpoint, the service may route traffic to local host +func (m *Manager) 
ensureDummyEndpoint(lb *lbv1.LoadBalancer, eps *discoveryv1.EndpointSlice) error { + dummyCount := 0 + activeCount := 0 + // if use `for _, ep := range eps.Endpoints` + // get: G601: Implicit memory aliasing in for loop. (gosec) + for i := range eps.Endpoints { + if isDummyEndpoint(&eps.Endpoints[i]) { + dummyCount++ + } else if isEndpointConditionsReady(&eps.Endpoints[i].Conditions) { + activeCount++ + } } - for i := range eps.Endpoints { - m.addOneProbe(lb, &eps.Endpoints[i]) + // add the dummy endpoint + if activeCount == 0 && dummyCount == 0 { + epsCopy := eps.DeepCopy() + epsCopy.Endpoints = appendDummyEndpoint(epsCopy.Endpoints, lb) + if _, err := m.endpointSliceClient.Update(epsCopy); err != nil { + return fmt.Errorf("fail to append dummy endpoint to lb %v endpoint, error: %w", lb.Name, err) + } + return nil } -} -func (m *Manager) addOneProbe(lb *lbv1.LoadBalancer, ep *discoveryv1.Endpoint) { - if len(ep.Addresses) == 0 { - return + // remove the dummy endpoint + if activeCount > 0 && dummyCount > 0 { + epsCopy := eps.DeepCopy() + epsCopy.Endpoints = slices.DeleteFunc(epsCopy.Endpoints, func(ep discoveryv1.Endpoint) bool { + return ep.TargetRef.UID == dummyEndpointID + }) + if _, err := m.endpointSliceClient.Update(epsCopy); err != nil { + return fmt.Errorf("fail to remove dummy endpoint from lb %v endpoint, error: %w", lb.Name, err) + } + return nil } + return nil +} + +func (m *Manager) removeLBProbers(lb *lbv1.LoadBalancer) (int, error) { + return m.RemoveWorkersByUid(marshalUID(lb.Namespace, lb.Name)) +} + +func (m *Manager) generateOneProber(lb *lbv1.LoadBalancer, ep *discoveryv1.Endpoint) prober.HealthOption { option := prober.HealthOption{ - Address: ep.Addresses[0] + ":" + strconv.Itoa(int(lb.Spec.HealthCheck.Port)), + Address: marshalPorberAddress(lb, ep), InitialCondition: true, } if lb.Spec.HealthCheck.SuccessThreshold == 0 { @@ -303,54 +448,70 @@ func (m *Manager) addOneProbe(lb *lbv1.LoadBalancer, ep *discoveryv1.Endpoint) { if ep.Conditions.Ready != nil { option.InitialCondition = *ep.Conditions.Ready } - - uid := marshalUID(lb.Namespace, lb.Name, ep.Addresses[0]) - - m.AddWorker(uid, option) + return option } -func (m *Manager) removeProbes(lb *lbv1.LoadBalancer, eps *discoveryv1.EndpointSlice) { - for i := range eps.Endpoints { - m.removeOneProbe(lb, &eps.Endpoints[i]) - } +// the prober.Manager is managed per uid +// lb namespace/name is a qualified group uid +func marshalUID(ns, name string) (uid string) { + uid = ns + "/" + name + return } -func (m *Manager) removeOneProbe(lb *lbv1.LoadBalancer, ep *discoveryv1.Endpoint) { - if len(ep.Addresses) == 0 { +func unMarshalUID(uid string) (namespace, name string, err error) { + fields := strings.Split(uid, "/") + if len(fields) != 2 { + err = fmt.Errorf("invalid uid %s", uid) return } - uid := marshalUID(lb.Namespace, lb.Name, ep.Addresses[0]) - m.RemoveWorker(uid) + namespace, name = fields[0], fields[1] + return } -func unMarshalUID(uid string) (namespace, name, server string, err error) { - fields := strings.Split(uid, "/") - if len(fields) != 3 { - err = fmt.Errorf("invalid uid %s", uid) +func marshalPorberAddress(lb *lbv1.LoadBalancer, ep *discoveryv1.Endpoint) string { + return ep.Addresses[0] + ":" + strconv.Itoa(int(lb.Spec.HealthCheck.Port)) +} + +// probe address is like: 10.52.0.214:80 +func unMarshalPorberAddress(address string) (ip, port string, err error) { + fields := strings.Split(address, ":") + if len(fields) != 2 { + err = fmt.Errorf("invalid probe address %s", address) return } - namespace, name, 
server = fields[0], fields[1], fields[2] + ip, port = fields[0], fields[1] return } -func marshalUID(ns, name, server string) (uid string) { - uid = ns + "/" + name + "/" + server - return +func (m *Manager) getService(lb *lbv1.LoadBalancer) (*corev1.Service, error) { + svc, err := m.serviceCache.Get(lb.Namespace, lb.Name) + if err != nil { + if !errors.IsNotFound(err) { + return nil, fmt.Errorf("fail to get service, error: %w", err) + } + return nil, nil + } + return svc, nil } func (m *Manager) ensureService(lb *lbv1.LoadBalancer) error { - svc, err := m.serviceCache.Get(lb.Namespace, lb.Name) - if err != nil && !errors.IsNotFound(err) { - return fmt.Errorf("get cache of service %s/%s failed, error: %w", lb.Namespace, lb.Name, err) - } else if err == nil { - svc = constructService(svc, lb) - if _, err := m.serviceClient.Update(svc.DeepCopy()); err != nil { - return fmt.Errorf("update service %s/%s failed, error: %w", lb.Namespace, lb.Name, err) + svc, err := m.getService(lb) + if err != nil { + return err + } + + if svc != nil { + svcCopy := svc.DeepCopy() + svcCopy = constructService(svcCopy, lb) + if !reflect.DeepEqual(svc, svcCopy) { + if _, err := m.serviceClient.Update(svcCopy); err != nil { + return fmt.Errorf("fail to update service, error: %w", err) + } } } else { svc = constructService(nil, lb) - if _, err := m.serviceClient.Create(svc); err != nil { - return fmt.Errorf("create service %s/%s failed, error: %w", lb.Namespace, lb.Name, err) + if _, err := m.serviceClient.Create(svc); err != nil && !errors.IsAlreadyExists(err) { + return fmt.Errorf("fail to create service, error: %w", err) } } @@ -398,32 +559,30 @@ func constructService(cur *corev1.Service, lb *lbv1.LoadBalancer) *corev1.Servic return svc } -func (m *Manager) ensureEndpointSlice(lb *lbv1.LoadBalancer) (*discoveryv1.EndpointSlice, error) { - eps, err := m.endpointSliceCache.Get(lb.Namespace, lb.Name) - if err != nil && !errors.IsNotFound(err) { - return nil, fmt.Errorf("get cache of endpointslice %s/%s failed, error: %w", lb.Namespace, lb.Name, err) - } else if err == nil { - eps, err = m.constructEndpointSlice(eps, lb) - if err != nil { - return nil, err - } - if eps, err = m.endpointSliceClient.Update(eps); err != nil { - return nil, fmt.Errorf("update endpointslice %s/%s failed, error: %w", lb.Namespace, lb.Name, err) - } - } else { - eps, err = m.constructEndpointSlice(nil, lb) - if err != nil { - return nil, err - } - if eps, err = m.endpointSliceClient.Create(eps); err != nil { - return nil, fmt.Errorf("create endpointslice %s/%s failed, error: %w", lb.Namespace, lb.Name, err) - } - } +const dummyEndpointIPv4Address = "10.52.0.255" +const dummyEndpointID = "dummy347-546a-4642-9da6-5608endpoint" + +func appendDummyEndpoint(eps []discoveryv1.Endpoint, lb *lbv1.LoadBalancer) []discoveryv1.Endpoint { + endpoint := discoveryv1.Endpoint{ + Addresses: []string{dummyEndpointIPv4Address}, + TargetRef: &corev1.ObjectReference{ + Namespace: lb.Namespace, + Name: lb.Name, + UID: dummyEndpointID, + }, + Conditions: discoveryv1.EndpointConditions{ + Ready: pointer.Bool(true), + }, + } + eps = append(eps, endpoint) + return eps +} - return eps, nil +func isDummyEndpoint(ep *discoveryv1.Endpoint) bool { + return ep.TargetRef.UID == dummyEndpointID } -func (m *Manager) constructEndpointSlice(cur *discoveryv1.EndpointSlice, lb *lbv1.LoadBalancer) (*discoveryv1.EndpointSlice, error) { +func (m *Manager) constructEndpointSliceFromBackendServers(cur *discoveryv1.EndpointSlice, lb *lbv1.LoadBalancer, servers []pkglb.BackendServer) 
(*discoveryv1.EndpointSlice, error) { eps := &discoveryv1.EndpointSlice{} if cur != nil { eps = cur.DeepCopy() @@ -457,14 +616,11 @@ func (m *Manager) constructEndpointSlice(cur *discoveryv1.EndpointSlice, lb *lbv // It's necessary to reserve the condition of old endpoints. var endpoints []discoveryv1.Endpoint - var flag bool - servers, err := m.matchBackendServers(lb) - if err != nil { - return nil, err - } + var existing bool for _, server := range servers { - flag = false + existing = false + // already checked when getting servers, but keep to take care of history data address, ok := server.GetAddress() if !ok { continue @@ -475,14 +631,14 @@ func (m *Manager) constructEndpointSlice(cur *discoveryv1.EndpointSlice, lb *lbv } if ep.TargetRef != nil && server.GetUID() == ep.TargetRef.UID && address == ep.Addresses[0] { - flag = true + existing = true // add the existing endpoint endpoints = append(endpoints, ep) break } } // add the non-existing endpoint - if !flag { + if !existing { endpoint := discoveryv1.Endpoint{ Addresses: []string{address}, TargetRef: &corev1.ObjectReference{ @@ -497,33 +653,13 @@ func (m *Manager) constructEndpointSlice(cur *discoveryv1.EndpointSlice, lb *lbv endpoints = append(endpoints, endpoint) } } + // a dummy endpoint avoids the LB traffic is routed to other services/local host accidentally + if len(endpoints) == 0 { + endpoints = appendDummyEndpoint(endpoints, lb) + } eps.Endpoints = endpoints - logrus.Debugln("constructEndpointSlice: ", eps) + logrus.Debugln("constructEndpointSliceFromBackendServers: ", eps) return eps, nil } - -// Find the VMIs which match the selector -func (m *Manager) matchBackendServers(lb *lbv1.LoadBalancer) ([]pkglb.BackendServer, error) { - if len(lb.Spec.BackendServerSelector) == 0 { - return []pkglb.BackendServer{}, nil - } - selector, err := utils.NewSelector(lb.Spec.BackendServerSelector) - if err != nil { - return nil, err - } - // limit the lb to the same namespace of the workload - vmis, err := m.vmiCache.List(lb.Namespace, selector) - if err != nil { - return nil, err - } - - servers := make([]pkglb.BackendServer, len(vmis)) - - for i, vmi := range vmis { - servers[i] = &Server{vmi} - } - - return servers, nil -} diff --git a/pkg/prober/health.go b/pkg/prober/health.go index 203c4aae..ca23bd42 100644 --- a/pkg/prober/health.go +++ b/pkg/prober/health.go @@ -12,6 +12,12 @@ type HealthOption struct { } type healthCondition struct { - workerUID string - isHealth bool + uid string + address string + isHealthy bool +} + +func (ho *HealthOption) Equal(h HealthOption) bool { + return ho.Address == h.Address && ho.SuccessThreshold == h.SuccessThreshold && ho.FailureThreshold == h.FailureThreshold && + ho.Timeout == h.Timeout && ho.Period == h.Period } diff --git a/pkg/prober/manager.go b/pkg/prober/manager.go index ece6b4b7..01bf6c89 100644 --- a/pkg/prober/manager.go +++ b/pkg/prober/manager.go @@ -7,10 +7,12 @@ import ( "github.com/sirupsen/logrus" ) -type updateCondition func(workerKey string, isHealthy bool) error +type updateCondition func(uid, address string, isHealthy bool) error + +type WorkerMap map[string]*Worker type Manager struct { - workers map[string]*Worker + workers map[string]WorkerMap workerLock sync.RWMutex conditionChan chan healthCondition tcpProber *tcpProber @@ -18,7 +20,7 @@ type Manager struct { func NewManager(ctx context.Context, handler updateCondition) *Manager { m := &Manager{ - workers: make(map[string]*Worker), + workers: make(map[string]WorkerMap), workerLock: sync.RWMutex{}, conditionChan: 
make(chan healthCondition), tcpProber: newTCPProber(ctx), @@ -26,8 +28,8 @@ func NewManager(ctx context.Context, handler updateCondition) *Manager { go func() { for cond := range m.conditionChan { - if err := handler(cond.workerUID, cond.isHealth); err != nil { - logrus.Errorf("update status failed, key: %s, condition: %t", cond.workerUID, cond.isHealth) + if err := handler(cond.uid, cond.address, cond.isHealthy); err != nil { + logrus.Errorf("prober update status to manager failed, uid:%s, address: %s, condition: %t", cond.uid, cond.address, cond.isHealthy) } } }() @@ -35,54 +37,81 @@ func NewManager(ctx context.Context, handler updateCondition) *Manager { return m } -func (m *Manager) GetWorker(uid string) (*Worker, bool) { +func (m *Manager) GetWorkerHealthOptionMap(uid string) (map[string]HealthOption, error) { m.workerLock.RLock() defer m.workerLock.RUnlock() - if w, ok := m.workers[uid]; ok { - return w, true + if wm, ok := m.workers[uid]; ok { + // copy out + prob := make(map[string]HealthOption) + for _, w := range wm { + prob[w.Address] = w.HealthOption + } + return prob, nil } - return nil, false + return nil, nil } -func (m *Manager) ListWorkers() map[string]*Worker { - return m.workers -} +func (m *Manager) AddWorker(uid string, address string, option HealthOption) error { + m.workerLock.Lock() + defer m.workerLock.Unlock() -func (m *Manager) AddWorker(uid string, option HealthOption) { - w, existed := m.GetWorker(uid) - if existed { - if isChanged(option, w) { - m.RemoveWorker(uid) - } else { - return - } + wm, ok := m.workers[uid] + if !ok { + wm = make(WorkerMap) + m.workers[uid] = wm } - logrus.Infof("add worker, uid: %s, option: %+v", uid, option) - w = newWorker(uid, m.tcpProber, option, m.conditionChan) - m.workerLock.Lock() - defer m.workerLock.Unlock() - m.workers[uid] = w + // stop if duplicated + if w, ok := wm[address]; ok { + logrus.Infof("porber worker already exists, uid %s, address %s, will stop it", uid, address) + w.stop() + } + w := newWorker(uid, m.tcpProber, option, m.conditionChan) + wm[address] = w + go w.run() + + logrus.Infof("add porber worker, uid: %s, address: %s, option: %+v", uid, address, option) + return nil } -func (m *Manager) RemoveWorker(uid string) { - w, existed := m.GetWorker(uid) - if !existed { - return - } - logrus.Infof("remove worker, uid: %s", uid) - w.stop() +func (m *Manager) RemoveWorker(uid, address string) (int, error) { m.workerLock.Lock() defer m.workerLock.Unlock() - delete(m.workers, uid) + + cnt := 0 + if wm, ok := m.workers[uid]; ok { + if w, ok := wm[address]; ok { + w.stop() + delete(wm, address) + cnt = 1 + } + } + + if cnt > 0 { + logrus.Infof("remove porber worker, uid: %s, address: %s", uid, address) + } + + return cnt, nil } -func isChanged(o HealthOption, w *Worker) bool { - if o.Address == w.address && o.Timeout == w.timeout && o.Period == w.Period && - o.SuccessThreshold == w.successThreshold && o.FailureThreshold == w.failureThreshold { - return false +func (m *Manager) RemoveWorkersByUid(uid string) (int, error) { + m.workerLock.Lock() + defer m.workerLock.Unlock() + + cnt := 0 + if wm, ok := m.workers[uid]; ok { + cnt = len(wm) + for k, w := range wm { + w.stop() + delete(wm, k) + } + delete(m.workers, uid) + } + + if cnt > 0 { + logrus.Infof("remove %d porber workers from uid: %s", cnt, uid) } - return true + return cnt, nil } diff --git a/pkg/prober/manager_test.go b/pkg/prober/manager_test.go index 9a8bc2eb..f9fd4101 100644 --- a/pkg/prober/manager_test.go +++ b/pkg/prober/manager_test.go @@ -18,43 +18,57 
@@ const ( ) func TestManager(t *testing.T) { - mng.AddWorker(healthyCase, HealthOption{ + if err := mng.AddWorker(healthyCase, healthyAddress, HealthOption{ Address: healthyAddress, SuccessThreshold: 1, FailureThreshold: 3, Timeout: time.Second, Period: time.Second, InitialCondition: false, - }) + }); err != nil { + t.Errorf("case: %s, add worker failed %s", healthyCase, err.Error()) + } time.Sleep(time.Second * 2) - if len(mng.workers) != 1 { - t.Errorf("case; %s, add worker failed", healthyCase) + if len(mng.workers) == 0 { + t.Errorf("case: %s, add worker failed", healthyCase) + } + if len(mng.workers[healthyCase]) == 0 { + t.Errorf("case: %s, add worker address %s failed", healthyCase, healthyAddress) + } + if _, ok := mng.workers[healthyCase][healthyAddress]; !ok { + t.Errorf("case: %s, workers map is wrong", healthyCase) } - if !mng.workers[healthyCase].condition { + if !mng.workers[healthyCase][healthyAddress].condition { t.Errorf("it should be able to connect %s", healthyAddress) } - mng.RemoveWorker(healthyCase) - if len(mng.workers) != 0 { + if _, err := mng.RemoveWorker(healthyCase, healthyAddress); err != nil { + t.Errorf("case: %s, remove worker failed %s", healthyCase, err.Error()) + } + if len(mng.workers[healthyCase]) != 0 { t.Errorf("case: %s, remove worker failed", healthyCase) } - mng.AddWorker(unhealthyCase, HealthOption{ + if err := mng.AddWorker(unhealthyCase, unhealthyAddress, HealthOption{ Address: unhealthyAddress, SuccessThreshold: 1, FailureThreshold: 2, Timeout: time.Second, Period: time.Second, InitialCondition: true, - }) + }); err != nil { + t.Errorf("case: %s, add worker failed %s", unhealthyCase, err.Error()) + } + if len(mng.workers[unhealthyCase]) != 1 { + t.Errorf("case: %s, add worker failed, len=%d", unhealthyCase, len(mng.workers[unhealthyCase])) + } time.Sleep(time.Second * 5) - if mng.workers[unhealthyCase].condition { + if mng.workers[unhealthyCase][unhealthyAddress].condition { t.Errorf("it should not be able to connect %s", unhealthyAddress) } - if len(mng.workers) != 1 { - t t.Errorf("case: %s, Add worker failed", unhealthyCase) + if _, err := mng.RemoveWorker(unhealthyCase, unhealthyAddress); err != nil { + t.Errorf("case: %s, remove worker failed %s", unhealthyCase, err.Error()) } - mng.RemoveWorker(unhealthyCase) - if len(mng.workers) != 0 { + if len(mng.workers[unhealthyCase]) != 0 { t.Errorf("case: %s, remove worker failed", unhealthyCase) } } @@ -67,7 +81,7 @@ func TestMain(m *testing.M) { os.Exit(code) } -func printCondition(uid string, isHealthy bool) error { - fmt.Printf("health check result, uid: %s, isHealthy: %t\n", uid, isHealthy) +func printCondition(uid, address string, isHealthy bool) error { + fmt.Printf("health check result, uid: %s, address: %s, isHealthy: %t\n", uid, address, isHealthy) return nil } diff --git a/pkg/prober/worker.go b/pkg/prober/worker.go index ce45cb7c..118fa94e 100644 --- a/pkg/prober/worker.go +++ b/pkg/prober/worker.go @@ -7,34 +7,30 @@ import ( ) type Worker struct { - tcpProber Prober - uid string - address string - successThreshold uint - successCounter uint - failureThreshold uint - failureCounter uint - timeout time.Duration - Period time.Duration - condition bool - conditionChan chan healthCondition - stopCh chan struct{} + HealthOption + uid string + tcpProber Prober + successCounter uint + failureCounter uint + condition bool + conditionChan chan healthCondition + stopCh chan struct{} + logFailure bool + logSuccess bool } func newWorker(uid string, tcpProber Prober, option HealthOption,
conditionChan chan healthCondition) *Worker { return &Worker{ - tcpProber: tcpProber, - uid: uid, - address: option.Address, - successThreshold: option.SuccessThreshold, - successCounter: 0, - failureThreshold: option.FailureThreshold, - failureCounter: 0, - timeout: option.Timeout, - Period: option.Period, - condition: option.InitialCondition, - conditionChan: conditionChan, - stopCh: make(chan struct{}), + tcpProber: tcpProber, + uid: uid, + HealthOption: option, + successCounter: 0, + failureCounter: 0, + conditionChan: conditionChan, + stopCh: make(chan struct{}), + condition: option.InitialCondition, + logFailure: true, + logSuccess: true, } } @@ -58,35 +54,49 @@ func (w *Worker) stop() { // probe only supports TCP func (w *Worker) probe() error { - return w.tcpProber.Probe(w.address, w.timeout) + return w.tcpProber.Probe(w.Address, w.Timeout) } func (w *Worker) doProbe() { + // failure case if err := w.probe(); err != nil { - logrus.Infof("probe error, %s, address: %s, timeout: %v", err.Error(), w.address, w.timeout) w.successCounter = 0 w.failureCounter++ - } else { - logrus.Infof("probe successful, address: %s, timeout: %v", w.address, w.timeout) - w.failureCounter = 0 - w.successCounter++ - } - if w.successCounter == w.successThreshold { - if !w.condition { - w.condition = true - w.conditionChan <- healthCondition{ - workerUID: w.uid, - isHealth: w.condition, + w.logSuccess = true + if w.failureCounter >= w.FailureThreshold { + // for continuous failure, only log error once in the controller life-cycle + if w.logFailure { + logrus.Infof("probe error uid:%s, address: %s, timeout: %v, error: %s", w.uid, w.Address, w.Timeout, err.Error()) + w.logFailure = false } - } - } - if w.failureCounter == w.failureThreshold { - if w.condition { + // notify anyway, the receiver may fail when processing w.condition = false w.conditionChan <- healthCondition{ - workerUID: w.uid, - isHealth: w.condition, + uid: w.uid, + address: w.Address, + isHealthy: w.condition, } + w.failureCounter = 0 } + return + } + + // successful case + w.failureCounter = 0 + w.successCounter++ + w.logFailure = true + if w.successCounter >= w.SuccessThreshold { + if w.logSuccess { + logrus.Infof("probe successful, uid:%s, address: %s, timeout: %v", w.uid, w.Address, w.Timeout) + w.logSuccess = false + } + // notify anyway, the receiver may fail when processing + w.condition = true + w.conditionChan <- healthCondition{ + uid: w.uid, + address: w.Address, + isHealthy: w.condition, + } + w.successCounter = 0 } } diff --git a/pkg/webhook/loadbalancer/mutator.go b/pkg/webhook/loadbalancer/mutator.go index b8f4168b..ad95c035 100644 --- a/pkg/webhook/loadbalancer/mutator.go +++ b/pkg/webhook/loadbalancer/mutator.go @@ -41,7 +41,20 @@ func NewMutator(namespaceCache ctlcorev1.NamespaceCache, func (m *mutator) Create(_ *admission.Request, newObj runtime.Object) (admission.Patch, error) { lb := newObj.(*lbv1.LoadBalancer) - return m.getAnnotationsPatch(lb) + ap, err := m.getAnnotationsPatch(lb) + if err != nil { + return nil, err + } + + hcp, err := m.getHealthyCheckPatch(lb) + if err != nil { + return nil, err + } + + if len(ap) == 0 { + return hcp, nil + } + return append(ap, hcp...), nil } func (m *mutator) Update(_ *admission.Request, _, newObj runtime.Object) (admission.Patch, error) { @@ -51,7 +64,62 @@ func (m *mutator) Update(_ *admission.Request, _, newObj runtime.Object) (admiss return nil, nil } - return m.getAnnotationsPatch(lb) + ap, err := m.getAnnotationsPatch(lb) + if err != nil { + return nil, err + } + + hcp, err 
:= m.getHealthyCheckPatch(lb) + if err != nil { + return nil, err + } + + if len(ap) == 0 { + return hcp, nil + } + return append(ap, hcp...), nil +} + +// those fields are not checked in the past, necessary to overwrite them to at least 1 +func (m *mutator) getHealthyCheckPatch(lb *lbv1.LoadBalancer) (admission.Patch, error) { + if lb.Spec.HealthCheck == nil || lb.Spec.HealthCheck.Port == 0 { + return nil, nil + } + + hc := *lb.Spec.HealthCheck + patched := false + + if hc.SuccessThreshold == 0 { + hc.SuccessThreshold = 2 + patched = true + } + + if hc.FailureThreshold == 0 { + hc.FailureThreshold = 2 + patched = true + } + + if hc.PeriodSeconds == 0 { + hc.PeriodSeconds = 1 + patched = true + } + + if hc.TimeoutSeconds == 0 { + hc.TimeoutSeconds = 1 + patched = true + } + + if patched { + return []admission.PatchOp{ + { + Op: admission.PatchOpReplace, + Path: "/spec/healthCheck", + Value: hc, + }, + }, nil + } + + return nil, nil } func (m *mutator) getAnnotationsPatch(lb *lbv1.LoadBalancer) (admission.Patch, error) { diff --git a/pkg/webhook/loadbalancer/mutator_test.go b/pkg/webhook/loadbalancer/mutator_test.go index 980d2299..d9754c6e 100644 --- a/pkg/webhook/loadbalancer/mutator_test.go +++ b/pkg/webhook/loadbalancer/mutator_test.go @@ -8,6 +8,10 @@ import ( harvesterfakeclients "github.com/harvester/harvester/pkg/util/fakeclients" corefake "k8s.io/client-go/kubernetes/fake" + lbv1 "github.com/harvester/harvester-load-balancer/pkg/apis/loadbalancer.harvesterhci.io/v1beta1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/harvester/harvester-load-balancer/pkg/utils" ) @@ -39,6 +43,67 @@ func TestFindProject(t *testing.T) { }, } + testsHealthCheckMutatored := []struct { + name string + lb *lbv1.LoadBalancer + wantErr bool + opsLen int + }{ + { + name: "health check mutatored case", + lb: &lbv1.LoadBalancer{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "test", + }, + Spec: lbv1.LoadBalancerSpec{ + Listeners: []lbv1.Listener{ + {Name: "a", BackendPort: 80, Protocol: corev1.ProtocolTCP}, + {Name: "b", BackendPort: 32, Protocol: corev1.ProtocolUDP}, + }, + HealthCheck: &lbv1.HealthCheck{Port: 80, SuccessThreshold: 0, FailureThreshold: 1, PeriodSeconds: 1, TimeoutSeconds: 1}, + }, + }, + wantErr: false, + opsLen: 2, + }, + { + name: "health check right case: valid parameters", + lb: &lbv1.LoadBalancer{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "test", + }, + Spec: lbv1.LoadBalancerSpec{ + Listeners: []lbv1.Listener{ + {Name: "a", BackendPort: 80, Protocol: corev1.ProtocolTCP}, + {Name: "b", BackendPort: 32, Protocol: corev1.ProtocolUDP}, + }, + HealthCheck: &lbv1.HealthCheck{Port: 80, SuccessThreshold: 1, FailureThreshold: 1, PeriodSeconds: 1, TimeoutSeconds: 1}, + }, + }, + wantErr: false, + opsLen: 1, + }, + { + name: "health check right case: no health check", + lb: &lbv1.LoadBalancer{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "test", + }, + Spec: lbv1.LoadBalancerSpec{ + Listeners: []lbv1.Listener{ + {Name: "a", BackendPort: 80, Protocol: corev1.ProtocolTCP}, + {Name: "b", BackendPort: 32, Protocol: corev1.ProtocolUDP}, + }, + }, + }, + wantErr: false, + opsLen: 1, + }, + } + for _, test := range tests { if project, err := m.findProject(test.namespace); err != nil { t.Error(err) @@ -46,6 +111,23 @@ func TestFindProject(t *testing.T) { t.Errorf("want project %s through namespace %s, got %s", test.wantProject, test.namespace, project) } } + + for _, test := range 
testsHealthCheckMutatored { + if pt, err := m.Create(nil, test.lb); (err != nil) != test.wantErr { + t.Error(err) + } else if len(pt) != test.opsLen { + // return 2 ops + // [{Op:replace Path:/metadata/annotations Value:map[loadbalancer.harvesterhci.io/namespace:default loadbalancer.harvesterhci.io/network: loadbalancer.harvesterhci.io/project:local/p-abcde]} + // {Op:replace Path:/spec/healthCheck Value:{Port:80 SuccessThreshold:2 FailureThreshold:1 PeriodSeconds:1 TimeoutSeconds:1}}] + t.Errorf("create test %v return patchOps len %v != %v, %+v", test.name, len(pt), test.opsLen, pt) + } + + if pt, err := m.Update(nil, nil, test.lb); (err != nil) != test.wantErr { + t.Error(err) + } else if len(pt) != test.opsLen { + t.Errorf("update test %v return patchOps len %v != %v, %+v", test.name, len(pt), test.opsLen, pt) + } + } } // TestFindNetwork tests the function findNetwork diff --git a/pkg/webhook/loadbalancer/validator.go b/pkg/webhook/loadbalancer/validator.go index b91c10a3..cb115a50 100644 --- a/pkg/webhook/loadbalancer/validator.go +++ b/pkg/webhook/loadbalancer/validator.go @@ -3,26 +3,22 @@ package loadbalancer import ( "fmt" - ctlkubevirtv1 "github.com/harvester/harvester/pkg/generated/controllers/kubevirt.io/v1" "github.com/harvester/webhook/pkg/server/admission" admissionregv1 "k8s.io/api/admissionregistration/v1" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" lbv1 "github.com/harvester/harvester-load-balancer/pkg/apis/loadbalancer.harvesterhci.io/v1beta1" - "github.com/harvester/harvester-load-balancer/pkg/utils" ) type validator struct { admission.DefaultValidator - vmiCache ctlkubevirtv1.VirtualMachineInstanceCache } var _ admission.Validator = &validator{} -func NewValidator(vmiCache ctlkubevirtv1.VirtualMachineInstanceCache) admission.Validator { - return &validator{ - vmiCache: vmiCache, - } +func NewValidator() admission.Validator { + return &validator{} } func (v *validator) Create(_ *admission.Request, newObj runtime.Object) error { @@ -32,12 +28,8 @@ func (v *validator) Create(_ *admission.Request, newObj runtime.Object) error { return fmt.Errorf("create loadbalancer %s/%s failed: %w", lb.Namespace, lb.Name, err) } - ok, err := v.matchAtLeastOneVmi(lb) - if err != nil { - return fmt.Errorf("create loadbalancer %s/%s failed: %w", lb.Namespace, lb.Name, err) - } - if !ok { - return fmt.Errorf("create loadbalancer %s/%s failed: no virtual machine instance matched", lb.Namespace, lb.Name) + if err := checkHealthyCheck(lb); err != nil { + return fmt.Errorf("create loadbalancer %s/%s failed with healthyCheck: %w", lb.Namespace, lb.Name, err) } return nil @@ -54,12 +46,8 @@ func (v *validator) Update(_ *admission.Request, oldObj, newObj runtime.Object) return fmt.Errorf("update loadbalancer %s/%s failed: %w", lb.Namespace, lb.Name, err) } - ok, err := v.matchAtLeastOneVmi(lb) - if err != nil { - return fmt.Errorf("update loadbalancer %s/%s failed: %w", lb.Namespace, lb.Name, err) - } - if !ok { - return fmt.Errorf("update loadbalancer %s/%s failed: no virtual machine instance matched", lb.Namespace, lb.Name) + if err := checkHealthyCheck(lb); err != nil { + return fmt.Errorf("update loadbalancer %s/%s failed with healthyCheck: %w", lb.Namespace, lb.Name, err) } return nil @@ -79,22 +67,13 @@ func (v *validator) Resource() admission.Resource { } } -func (v *validator) matchAtLeastOneVmi(lb *lbv1.LoadBalancer) (bool, error) { - selector, err := utils.NewSelector(lb.Spec.BackendServerSelector) - if err != nil { - return false, err - } - - vmis, err := 
v.vmiCache.List(lb.Namespace, selector) - if err != nil { - return false, err - } - - return len(vmis) > 0, nil -} +const maxPort = 65535 func checkListeners(lb *lbv1.LoadBalancer) error { nameMap, portMap, backendMap := map[string]bool{}, map[int32]int{}, map[int32]int{} + if len(lb.Spec.Listeners) == 0 { + return fmt.Errorf("the loadbalancer needs to have at least one listener") + } for i, listener := range lb.Spec.Listeners { // check listener name if _, ok := nameMap[listener.Name]; ok { @@ -117,5 +96,53 @@ func checkListeners(lb *lbv1.LoadBalancer) error { backendMap[listener.BackendPort] = i } + for _, listener := range lb.Spec.Listeners { + // check listener port and backend port ranges + if listener.Port > maxPort { + return fmt.Errorf("listener port %v must be <= %v", listener.Port, maxPort) + } else if listener.Port < 1 { + return fmt.Errorf("listener port %v must be >= 1", listener.Port) + } + if listener.BackendPort > maxPort { + return fmt.Errorf("listener backend port %v must be <= %v", listener.BackendPort, maxPort) + } else if listener.BackendPort < 1 { + return fmt.Errorf("listener backend port %v must be >= 1", listener.BackendPort) + } + } + + return nil +} + +func checkHealthyCheck(lb *lbv1.LoadBalancer) error { + if lb.Spec.HealthCheck != nil && lb.Spec.HealthCheck.Port != 0 { + wrongProtocol := false + for _, listener := range lb.Spec.Listeners { + // check listener port and protocol, only TCP is supported now + if uint(listener.BackendPort) == lb.Spec.HealthCheck.Port { + if listener.Protocol == corev1.ProtocolTCP { + if lb.Spec.HealthCheck.SuccessThreshold == 0 { + return fmt.Errorf("healthcheck SuccessThreshold should be > 0") + } + if lb.Spec.HealthCheck.FailureThreshold == 0 { + return fmt.Errorf("healthcheck FailureThreshold should be > 0") + } + if lb.Spec.HealthCheck.PeriodSeconds == 0 { + return fmt.Errorf("healthcheck PeriodSeconds should be > 0") + } + if lb.Spec.HealthCheck.TimeoutSeconds == 0 { + return fmt.Errorf("healthcheck TimeoutSeconds should be > 0") + } + return nil + } + // not the expected TCP + wrongProtocol = true + } + } + if wrongProtocol { + return fmt.Errorf("healthcheck port %v can only be a TCP backend port", lb.Spec.HealthCheck.Port) + } + return fmt.Errorf("healthcheck port %v is not in listener backend port list", lb.Spec.HealthCheck.Port) + } + return nil } diff --git a/pkg/webhook/loadbalancer/validator_test.go b/pkg/webhook/loadbalancer/validator_test.go index e487c4d0..aa4ac70b 100644 --- a/pkg/webhook/loadbalancer/validator_test.go +++ b/pkg/webhook/loadbalancer/validator_test.go @@ -4,6 +4,7 @@ import ( "testing" lbv1 "github.com/harvester/harvester-load-balancer/pkg/apis/loadbalancer.harvesterhci.io/v1beta1" + corev1 "k8s.io/api/core/v1" ) func TestCheckListeners(t *testing.T) { @@ -48,6 +49,54 @@ func TestCheckListeners(t *testing.T) { }, wantErr: true, }, + { + name: "port < 1", + lb: &lbv1.LoadBalancer{ + Spec: lbv1.LoadBalancerSpec{ + Listeners: []lbv1.Listener{ + {Name: "a", Port: -1}, + {Name: "b", Port: 80}, + }, + }, + }, + wantErr: true, + }, + { + name: "port > 65535", + lb: &lbv1.LoadBalancer{ + Spec: lbv1.LoadBalancerSpec{ + Listeners: []lbv1.Listener{ + {Name: "a", Port: 80}, + {Name: "b", Port: 65536}, + }, + }, + }, + wantErr: true, + }, + { + name: "backend port < 1", + lb: &lbv1.LoadBalancer{ + Spec: lbv1.LoadBalancerSpec{ + Listeners: []lbv1.Listener{ + {Name: "a", BackendPort: 0}, + {Name: "b", BackendPort: 80}, + }, + }, + }, + wantErr: true, + }, + { + name: "backend port > 65535", + lb: &lbv1.LoadBalancer{ + Spec: lbv1.LoadBalancerSpec{ + Listeners: []lbv1.Listener{ +
{Name: "a", BackendPort: 65536}, + {Name: "b", BackendPort: 80}, + }, + }, + }, + wantErr: true, + }, { name: "right case", lb: &lbv1.LoadBalancer{ @@ -62,9 +111,113 @@ func TestCheckListeners(t *testing.T) { }, } + testsHealtyCheck := []struct { + name string + lb *lbv1.LoadBalancer + wantErr bool + }{ + { + name: "health check port is not in backend port list", + lb: &lbv1.LoadBalancer{ + Spec: lbv1.LoadBalancerSpec{ + Listeners: []lbv1.Listener{ + {Name: "a", BackendPort: 80, Protocol: corev1.ProtocolTCP}, + {Name: "b", BackendPort: 32, Protocol: corev1.ProtocolUDP}, + }, + HealthCheck: &lbv1.HealthCheck{Port: 99}, + }, + }, + wantErr: true, + }, + { + name: "health check protocol is not expected tcp", + lb: &lbv1.LoadBalancer{ + Spec: lbv1.LoadBalancerSpec{ + Listeners: []lbv1.Listener{ + {Name: "a", BackendPort: 80, Protocol: corev1.ProtocolTCP}, + {Name: "b", BackendPort: 32, Protocol: corev1.ProtocolUDP}, + }, + HealthCheck: &lbv1.HealthCheck{Port: 32}, + }, + }, + wantErr: true, + }, + { + name: "health check parameter SuccessThreshold is error", + lb: &lbv1.LoadBalancer{ + Spec: lbv1.LoadBalancerSpec{ + Listeners: []lbv1.Listener{ + {Name: "a", BackendPort: 80, Protocol: corev1.ProtocolTCP}, + {Name: "b", BackendPort: 32, Protocol: corev1.ProtocolUDP}, + }, + HealthCheck: &lbv1.HealthCheck{Port: 80, SuccessThreshold: 0}, + }, + }, + wantErr: true, + }, + { + name: "health check parameter FailureThreshold is error", + lb: &lbv1.LoadBalancer{ + Spec: lbv1.LoadBalancerSpec{ + Listeners: []lbv1.Listener{ + {Name: "a", BackendPort: 80, Protocol: corev1.ProtocolTCP}, + {Name: "b", BackendPort: 32, Protocol: corev1.ProtocolUDP}, + }, + HealthCheck: &lbv1.HealthCheck{Port: 80, FailureThreshold: 0}, + }, + }, + wantErr: true, + }, + { + name: "health check parameter PeriodSeconds is error", + lb: &lbv1.LoadBalancer{ + Spec: lbv1.LoadBalancerSpec{ + Listeners: []lbv1.Listener{ + {Name: "a", BackendPort: 80, Protocol: corev1.ProtocolTCP}, + {Name: "b", BackendPort: 32, Protocol: corev1.ProtocolUDP}, + }, + HealthCheck: &lbv1.HealthCheck{Port: 80, PeriodSeconds: 0}, + }, + }, + wantErr: true, + }, + { + name: "health check parameter TimeoutSeconds is error", + lb: &lbv1.LoadBalancer{ + Spec: lbv1.LoadBalancerSpec{ + Listeners: []lbv1.Listener{ + {Name: "a", BackendPort: 80, Protocol: corev1.ProtocolTCP}, + {Name: "b", BackendPort: 32, Protocol: corev1.ProtocolUDP}, + }, + HealthCheck: &lbv1.HealthCheck{Port: 80, TimeoutSeconds: 0}, + }, + }, + wantErr: true, + }, + { + name: "health check right case", + lb: &lbv1.LoadBalancer{ + Spec: lbv1.LoadBalancerSpec{ + Listeners: []lbv1.Listener{ + {Name: "a", BackendPort: 80, Protocol: corev1.ProtocolTCP}, + {Name: "b", BackendPort: 32, Protocol: corev1.ProtocolUDP}, + }, + HealthCheck: &lbv1.HealthCheck{Port: 80, SuccessThreshold: 1, FailureThreshold: 1, PeriodSeconds: 1, TimeoutSeconds: 1}, + }, + }, + wantErr: false, + }, + } + for _, tt := range tests { if err := checkListeners(tt.lb); (err != nil) != tt.wantErr { - t.Errorf("%q. checkPorts() error = %v, wantErr %v", tt.name, err, tt.wantErr) + t.Errorf("%q. checkListeners() error = %v, wantErr %v", tt.name, err, tt.wantErr) + } + } + + for _, tt := range testsHealtyCheck { + if err := checkHealthyCheck(tt.lb); (err != nil) != tt.wantErr { + t.Errorf("%q. checkHealthyCheck() error = %v, wantErr %v", tt.name, err, tt.wantErr) } } }