diff --git a/internal/ingress/controller/admission_batcher.go b/internal/ingress/controller/admission_batcher.go index bd30b76790..c676673010 100644 --- a/internal/ingress/controller/admission_batcher.go +++ b/internal/ingress/controller/admission_batcher.go @@ -18,29 +18,17 @@ type Namespace string type Name string const ( - // time of batch collecting - admissionDelaySeconds = 3 - admissionDelay = admissionDelaySeconds * time.Second - - // amount of concurrent batch consumers - batchConsumerCount = 30 / admissionDelaySeconds + batchDelay = 5 * time.Second + batchersCount = 3 ) type AdmissionBatcher struct { ingresses []*networking.Ingress errorChannels []chan error - - // flag for consumer goroutine indicating whether it should keep processing or not - isWorking bool - - // mutex protecting queues access - mu *sync.Mutex - - // wait group to monitor consumer goroutine lifetime - consumerWG sync.WaitGroup - - // when was last not empty batch consumed by some worker for validation - lastBatchConsumedTime time.Time + isWorking bool + isWorkingMU *sync.Mutex + workerMU *sync.Mutex + consumerWG sync.WaitGroup } func NewAdmissionBatcher() AdmissionBatcher { @@ -48,51 +36,48 @@ func NewAdmissionBatcher() AdmissionBatcher { ingresses: nil, errorChannels: nil, isWorking: true, - mu: &sync.Mutex{}, + isWorkingMU: &sync.Mutex{}, + workerMU: &sync.Mutex{}, consumerWG: sync.WaitGroup{}, } } func (n *NGINXController) StartAdmissionBatcher() { - for i := 0; i < batchConsumerCount; i++ { - go n.AdmissionBatcherConsumerRoutine() + n.admissionBatcher.setWork(true) + for i := 0; i < batchersCount; i++ { + go n.BatchConsumerRoutine(i) } } func (n *NGINXController) StopAdmissionBatcher() { - n.admissionBatcher.mu.Lock() - defer n.admissionBatcher.mu.Unlock() - - n.admissionBatcher.isWorking = false + n.admissionBatcher.setWork(false) + n.admissionBatcher.consumerWG.Wait() } -// AdmissionBatcherConsumerRoutine is started during ingress-controller startup phase -// And it should stop during ingress-controller's graceful shutdown -func (n *NGINXController) AdmissionBatcherConsumerRoutine() { +func (n *NGINXController) BatchConsumerRoutine(i int) { n.admissionBatcher.consumerWG.Add(1) defer n.admissionBatcher.consumerWG.Done() + klog.Infof("Admission batcher routine %d started", i) - klog.Info("Admission batcher routine started") - - // prevent races on isWorking field - n.admissionBatcher.mu.Lock() - for n.admissionBatcher.isWorking { - timeSinceLastBatchPull := time.Now().Sub(n.admissionBatcher.lastBatchConsumedTime) - n.admissionBatcher.mu.Unlock() + for n.admissionBatcher.isWork() { + n.admissionBatcher.workerMU.Lock() + if !n.admissionBatcher.hasNewIngresses() { + n.admissionBatcher.workerMU.Unlock() + continue + } - time.Sleep(max(time.Duration(0), admissionDelay-timeSinceLastBatchPull)) + time.Sleep(batchDelay) newIngresses, errorChannels := n.admissionBatcher.fetchNewBatch() - if len(newIngresses) != 0 { + n.admissionBatcher.workerMU.Unlock() + + if len(newIngresses) > 0 { err := n.validateNewIngresses(newIngresses) for _, erCh := range errorChannels { erCh <- err } } - - n.admissionBatcher.mu.Lock() } - - klog.Info("Admission batcher routine finished") + klog.Infof("Admission batcher routine %d finished", i) } func groupByNamespacesAndNames(ingresses []*networking.Ingress) map[Namespace]map[Name]struct{} { @@ -147,12 +132,11 @@ func (n *NGINXController) validateNewIngresses(newIngresses []*networking.Ingres ParsedAnnotations: ann, }) } - //debug + klog.Info("New ingresses with annotations appended for ", ingsListStr) start := time.Now() _, _, newIngCfg := n.getConfiguration(ings) - //debug klog.Info("Configuration generated in ", time.Now().Sub(start).Seconds(), " seconds for ", ingsListStr) start = time.Now() @@ -160,7 +144,6 @@ func (n *NGINXController) validateNewIngresses(newIngresses []*networking.Ingres if err != nil { return errors.Wrap(err, "error while validating batch of ingresses") } - //debug klog.Info("Generated nginx template in ", time.Now().Sub(start).Seconds(), " seconds for ", ingsListStr) start = time.Now() @@ -168,15 +151,35 @@ func (n *NGINXController) validateNewIngresses(newIngresses []*networking.Ingres if err != nil { return errors.Wrap(err, "error while validating batch of ingresses") } - //debug klog.Info("Tested nginx template in ", time.Now().Sub(start).Seconds(), " seconds for ", ingsListStr) return nil } +func (ab *AdmissionBatcher) setWork(status bool) { + ab.isWorkingMU.Lock() + defer ab.isWorkingMU.Unlock() + + ab.isWorking = status +} + +func (ab *AdmissionBatcher) isWork() bool { + ab.isWorkingMU.Lock() + defer ab.isWorkingMU.Unlock() + + return ab.isWorking +} + +func (ab *AdmissionBatcher) hasNewIngresses() bool { + ab.isWorkingMU.Lock() + defer ab.isWorkingMU.Unlock() + + return len(ab.ingresses) != 0 +} + func (ab *AdmissionBatcher) fetchNewBatch() (ings []*networking.Ingress, errorChannels []chan error) { - ab.mu.Lock() - defer ab.mu.Unlock() + ab.isWorkingMU.Lock() + defer ab.isWorkingMU.Unlock() if len(ab.ingresses) == 0 { return nil, nil @@ -185,7 +188,6 @@ func (ab *AdmissionBatcher) fetchNewBatch() (ings []*networking.Ingress, errorCh ings = ab.ingresses errorChannels = ab.errorChannels - // debug var sb strings.Builder sb.WriteString("Fetched new batch of ingresses: ") for _, ing := range ings { @@ -195,30 +197,19 @@ func (ab *AdmissionBatcher) fetchNewBatch() (ings []*networking.Ingress, errorCh ab.errorChannels = nil ab.ingresses = nil - - ab.lastBatchConsumedTime = time.Now() - return ings, errorChannels } func (ab *AdmissionBatcher) ValidateIngress(ing *networking.Ingress) error { - ab.mu.Lock() + ab.isWorkingMU.Lock() ab.ingresses = append(ab.ingresses, ing) errCh := make(chan error) ab.errorChannels = append(ab.errorChannels, errCh) - ab.mu.Unlock() + ab.isWorkingMU.Unlock() - // debug klog.Info("Ingress ", fmt.Sprintf("%v/%v", ing.Namespace, ing.Name), " submitted for batch validation, waiting for verdict...") return <-errCh } - -func max(d1, d2 time.Duration) time.Duration { - if d1 < d2 { - return d2 - } - return d1 -} diff --git a/internal/ingress/controller/controller.go b/internal/ingress/controller/controller.go index 3fcc0135b9..c56c266263 100644 --- a/internal/ingress/controller/controller.go +++ b/internal/ingress/controller/controller.go @@ -599,15 +599,9 @@ func (n *NGINXController) getDefaultUpstream() *ingress.Backend { // getConfiguration returns the configuration matching the standard kubernetes ingress func (n *NGINXController) getConfiguration(ingresses []*ingress.Ingress) (sets.Set[string], []*ingress.Server, *ingress.Configuration) { - var ingsListSB strings.Builder - for _, ing := range ingresses { - ingsListSB.WriteString(fmt.Sprintf("%v/%v ", ing.Namespace, ing.Name)) - } - ingsListStr := ingsListSB.String() - start := time.Now() upstreams, servers := n.getBackendServers(ingresses) - klog.Info("Got backend servers in ", time.Now().Sub(start).Seconds(), " seconds for ", ingsListStr) + klog.V(3).Infof("Got backend servers in %f seconds for %d ingresses", time.Now().Sub(start).Seconds(), len(ingresses)) var passUpstreams []*ingress.SSLPassthroughBackend @@ -658,7 +652,7 @@ func (n *NGINXController) getConfiguration(ingresses []*ingress.Ingress) (sets.S } } - klog.Info("Collected info about passupstreams in ", time.Now().Sub(start).Seconds(), " seconds for ", ingsListStr) + klog.V(3).Infof("Collected info about passupstreams in %f seconds for %d ingresses", time.Now().Sub(start).Seconds(), len(ingresses)) return hosts, servers, &ingress.Configuration{ Backends: upstreams, @@ -1216,7 +1210,8 @@ func (n *NGINXController) serviceEndpoints(svcKey, backendPort string) ([]ingres servicePort.Name == backendPort { endps := getEndpointsFromSlices(svc, &servicePort, apiv1.ProtocolTCP, zone, n.store.GetServiceEndpointsSlices) if len(endps) == 0 { - klog.Warningf("Service %q does not have any active Endpoint.", svcKey) + //move to verbose info, cause of noisy false positive due to disabled dev stands + klog.V(3).Infof("Service %q does not have any active Endpoint.", svcKey) } upstreams = append(upstreams, endps...)