diff --git a/internal/ingress/controller/admission_batcher.go b/internal/ingress/controller/admission_batcher.go index 4dac4a46ec..1fdc2c2e03 100644 --- a/internal/ingress/controller/admission_batcher.go +++ b/internal/ingress/controller/admission_batcher.go @@ -18,13 +18,13 @@ type Namespace string type Name string const ( - batchDelay = 5 * time.Second - batchersCount = 3 + batchDelay = 4 * time.Second ) type AdmissionBatcher struct { - ingresses []*networking.Ingress - errorChannels []chan error + ingresses map[Namespace][]*networking.Ingress + isOnBatching map[Namespace]bool + errorChannels map[Namespace][]chan error isWorking bool isWorkingMU *sync.Mutex workerMU *sync.Mutex @@ -33,8 +33,9 @@ type AdmissionBatcher struct { func NewAdmissionBatcher() AdmissionBatcher { return AdmissionBatcher{ - ingresses: nil, - errorChannels: nil, + ingresses: map[Namespace][]*networking.Ingress{}, + isOnBatching: map[Namespace]bool{}, + errorChannels: map[Namespace][]chan error{}, isWorking: true, isWorkingMU: &sync.Mutex{}, workerMU: &sync.Mutex{}, @@ -44,9 +45,7 @@ func NewAdmissionBatcher() AdmissionBatcher { func (n *NGINXController) StartAdmissionBatcher() { n.admissionBatcher.setWork(true) - for i := 0; i < batchersCount; i++ { - go n.BatchConsumerRoutine(i) - } + go n.BatchConsumerRoutine() } func (n *NGINXController) StopAdmissionBatcher() { @@ -54,22 +53,20 @@ func (n *NGINXController) StopAdmissionBatcher() { n.admissionBatcher.consumerWG.Wait() } -func (n *NGINXController) BatchConsumerRoutine(i int) { +func (n *NGINXController) BatchConsumerRoutine() { n.admissionBatcher.consumerWG.Add(1) defer n.admissionBatcher.consumerWG.Done() - klog.Infof("Admission batcher routine %d started", i) for n.admissionBatcher.isWork() { - n.admissionBatcher.workerMU.Lock() - if !n.admissionBatcher.hasNewIngresses() { - n.admissionBatcher.workerMU.Unlock() + ns, ok := n.admissionBatcher.hasNewIngresses() + if !ok { time.Sleep(100 * time.Millisecond) continue } - + klog.Info("Start waiting for batch, namespace: ", ns) + go n.BatchConsumerRoutine() time.Sleep(batchDelay) - newIngresses, errorChannels := n.admissionBatcher.fetchNewBatch() - n.admissionBatcher.workerMU.Unlock() + newIngresses, errorChannels := n.admissionBatcher.fetchNewBatch(ns) if len(newIngresses) > 0 { err := n.validateNewIngresses(newIngresses) @@ -77,8 +74,8 @@ func (n *NGINXController) BatchConsumerRoutine(i int) { erCh <- err } } + return } - klog.Infof("Admission batcher routine %d finished", i) } func groupByNamespacesAndNames(ingresses []*networking.Ingress) map[Namespace]map[Name]struct{} { @@ -136,24 +133,47 @@ func (n *NGINXController) validateNewIngresses(newIngresses []*networking.Ingres klog.Info("New ingresses with annotations appended for ", ingsListStr) + startTest := time.Now().UnixNano() / 1000000 start := time.Now() - _, _, newIngCfg := n.getConfiguration(ings) + + _, servers, newIngCfg := n.getConfiguration(ings) klog.Info("Configuration generated in ", time.Now().Sub(start).Seconds(), " seconds for ", ingsListStr) + for _, newIngress := range newIngresses { + err := checkOverlap(newIngress, servers) + if err != nil { + return errors.Wrapf(err, "error while validating overlap for ingress %s/%s", newIngress.Namespace, newIngress.Name) + } + } + start = time.Now() template, err := n.generateTemplate(cfg, *newIngCfg) if err != nil { - return errors.Wrap(err, "error while validating batch of ingresses") + return errors.Wrapf(err, "error while generating template for ingresses %s", ingsListStr) } klog.Info("Generated nginx template in ", time.Now().Sub(start).Seconds(), " seconds for ", ingsListStr) start = time.Now() err = n.testTemplate(template) if err != nil { - return errors.Wrap(err, "error while validating batch of ingresses") + return errors.Wrapf(err, "error while testing template for of ingresses %s", ingsListStr) } klog.Info("Tested nginx template in ", time.Now().Sub(start).Seconds(), " seconds for ", ingsListStr) + endCheck := time.Now().UnixNano() / 1000000 + + testedSize := len(ings) + confSize := len(template) + n.metricCollector.SetAdmissionMetrics( + float64(testedSize), + float64(endCheck-startTest)/1000, + float64(len(ings)), + //can't calculate content properly because of batching + 0, + float64(confSize), + //can't calculate content properly because of batching + 0, + ) return nil } @@ -171,14 +191,20 @@ func (ab *AdmissionBatcher) isWork() bool { return ab.isWorking } -func (ab *AdmissionBatcher) hasNewIngresses() bool { +func (ab *AdmissionBatcher) hasNewIngresses() (Namespace, bool) { ab.isWorkingMU.Lock() defer ab.isWorkingMU.Unlock() - return len(ab.ingresses) != 0 + for ns, b := range ab.isOnBatching { + if !b { + ab.isOnBatching[ns] = true + return ns, true + } + } + return "", false } -func (ab *AdmissionBatcher) fetchNewBatch() (ings []*networking.Ingress, errorChannels []chan error) { +func (ab *AdmissionBatcher) fetchNewBatch(namespace Namespace) (ings []*networking.Ingress, errorChannels []chan error) { ab.isWorkingMU.Lock() defer ab.isWorkingMU.Unlock() @@ -186,31 +212,44 @@ func (ab *AdmissionBatcher) fetchNewBatch() (ings []*networking.Ingress, errorCh return nil, nil } - ings = ab.ingresses - errorChannels = ab.errorChannels + ings = ab.ingresses[namespace] + errorChannels = ab.errorChannels[namespace] var sb strings.Builder - sb.WriteString("Fetched new batch of ingresses: ") + sb.WriteString(fmt.Sprint("Fetched new batch of ingresses for ns: ", namespace)) for _, ing := range ings { - sb.WriteString(fmt.Sprintf("%s/%s ", ing.Namespace, ing.Name)) + sb.WriteString(fmt.Sprintf(" %s", ing.Name)) } klog.Info(sb.String()) - ab.errorChannels = nil - ab.ingresses = nil + delete(ab.errorChannels, namespace) + delete(ab.ingresses, namespace) + delete(ab.isOnBatching, namespace) return ings, errorChannels } func (ab *AdmissionBatcher) ValidateIngress(ing *networking.Ingress) error { + errCh := ab.addIngressToHandle(ing) + klog.Info( + "Ingress ", + fmt.Sprintf("%v/%v", ing.Namespace, ing.Name), + " submitted for batch validation, waiting for verdict...", + ) + return <-errCh +} + +func (ab *AdmissionBatcher) addIngressToHandle(ing *networking.Ingress) chan error { ab.isWorkingMU.Lock() + defer ab.isWorkingMU.Unlock() - ab.ingresses = append(ab.ingresses, ing) + ns := Namespace(ing.Namespace) - errCh := make(chan error) - ab.errorChannels = append(ab.errorChannels, errCh) - - ab.isWorkingMU.Unlock() + if _, ok := ab.isOnBatching[ns]; !ok { + ab.isOnBatching[ns] = false + } + ab.ingresses[ns] = append(ab.ingresses[ns], ing) - klog.Info("Ingress ", fmt.Sprintf("%v/%v", ing.Namespace, ing.Name), " submitted for batch validation, waiting for verdict...") - return <-errCh + errCh := make(chan error) + ab.errorChannels[ns] = append(ab.errorChannels[ns], errCh) + return errCh } diff --git a/internal/ingress/controller/controller.go b/internal/ingress/controller/controller.go index c56c266263..58160059a5 100644 --- a/internal/ingress/controller/controller.go +++ b/internal/ingress/controller/controller.go @@ -39,7 +39,6 @@ import ( "k8s.io/ingress-nginx/internal/ingress/annotations/proxy" ngx_config "k8s.io/ingress-nginx/internal/ingress/controller/config" "k8s.io/ingress-nginx/internal/ingress/controller/ingressclass" - "k8s.io/ingress-nginx/internal/ingress/controller/store" "k8s.io/ingress-nginx/internal/ingress/errors" "k8s.io/ingress-nginx/internal/ingress/inspector" "k8s.io/ingress-nginx/internal/ingress/metric/collectors" @@ -308,7 +307,7 @@ func (n *NGINXController) CheckWarning(ing *networking.Ingress) ([]string, error // CheckIngress returns an error in case the provided ingress, when added // to the current configuration, generates an invalid configuration func (n *NGINXController) CheckIngress(ing *networking.Ingress) error { - startCheck := time.Now().UnixNano() / 1000000 + //startCheck := time.Now().UnixNano() / 1000000 if ing == nil { // no ingress to add, no state change @@ -340,7 +339,7 @@ func (n *NGINXController) CheckIngress(ing *networking.Ingress) error { if n.cfg.DisableCatchAll && ing.Spec.DefaultBackend != nil { return fmt.Errorf("this deployment is trying to create a catch-all ingress while DisableCatchAll flag is set to true. Remove '.spec.defaultBackend' or set DisableCatchAll flag to false") } - startRender := time.Now().UnixNano() / 1000000 + //startRender := time.Now().UnixNano() / 1000000 cfg := n.store.GetBackendConfiguration() cfg.Resolver = n.resolver @@ -383,50 +382,14 @@ func (n *NGINXController) CheckIngress(ing *networking.Ingress) error { k8s.SetDefaultNGINXPathType(ing) - allIngresses := n.store.ListIngresses() - - filter := func(toCheck *ingress.Ingress) bool { - return toCheck.ObjectMeta.Namespace == ing.ObjectMeta.Namespace && - toCheck.ObjectMeta.Name == ing.ObjectMeta.Name - } - ings := store.FilterIngresses(allIngresses, filter) - parsed, err := annotations.NewAnnotationExtractor(n.store).Extract(ing) - if err != nil { - n.metricCollector.IncCheckErrorCount(ing.ObjectMeta.Namespace, ing.Name) - return err - } - ings = append(ings, &ingress.Ingress{ - Ingress: *ing, - ParsedAnnotations: parsed, - }) - startTest := time.Now().UnixNano() / 1000000 - _, servers, _ := n.getConfiguration(ings) - - err = checkOverlap(ing, servers) - if err != nil { - n.metricCollector.IncCheckErrorCount(ing.ObjectMeta.Namespace, ing.Name) - return err - } - testedSize := len(ings) - klog.Info("starting validation of ingress ", fmt.Sprintf("%v/%v", ing.Namespace, ing.Name)) - err = n.admissionBatcher.ValidateIngress(ing) + err := n.admissionBatcher.ValidateIngress(ing) if err != nil { n.metricCollector.IncCheckErrorCount(ing.ObjectMeta.Namespace, ing.Name) return err } n.metricCollector.IncCheckCount(ing.ObjectMeta.Namespace, ing.Name) - endCheck := time.Now().UnixNano() / 1000000 - n.metricCollector.SetAdmissionMetrics( - float64(testedSize), - float64(endCheck-startTest)/1000, - float64(len(ings)), - float64(startTest-startRender)/1000, - //can't calculate content properly because of batching - float64(0), - float64(endCheck-startCheck)/1000, - ) return nil } @@ -601,7 +564,7 @@ func (n *NGINXController) getDefaultUpstream() *ingress.Backend { func (n *NGINXController) getConfiguration(ingresses []*ingress.Ingress) (sets.Set[string], []*ingress.Server, *ingress.Configuration) { start := time.Now() upstreams, servers := n.getBackendServers(ingresses) - klog.V(3).Infof("Got backend servers in %f seconds for %d ingresses", time.Now().Sub(start).Seconds(), len(ingresses)) + klog.Infof("Got backend servers in %f seconds for %d ingresses", time.Now().Sub(start).Seconds(), len(ingresses)) var passUpstreams []*ingress.SSLPassthroughBackend @@ -652,7 +615,7 @@ func (n *NGINXController) getConfiguration(ingresses []*ingress.Ingress) (sets.S } } - klog.V(3).Infof("Collected info about passupstreams in %f seconds for %d ingresses", time.Now().Sub(start).Seconds(), len(ingresses)) + klog.Infof("Collected info about passupstreams in %f seconds for %d ingresses", time.Now().Sub(start).Seconds(), len(ingresses)) return hosts, servers, &ingress.Configuration{ Backends: upstreams, diff --git a/joom-build.sh b/joom-build.sh index 9d6583b444..a4f0391cec 100755 --- a/joom-build.sh +++ b/joom-build.sh @@ -5,7 +5,7 @@ export REGISTRY="jfrog.joom.it/docker-registry/joom-ingress-nginx" export BASE_TAG BASE_TAG=$(cat TAG) -export TAG="${BASE_TAG}-batching-patch" +export TAG="${BASE_TAG}-batching-patch-$(date -u +%d%m%y-%H%M%S)" export ARCH=amd64