Skip to content

Commit

Permalink
Feature/v1.9.4 batch admission patch v4 (#8)
Browse files Browse the repository at this point in the history
* add infinite loop fix
* fix batching
  • Loading branch information
rchshld authored Mar 5, 2024
1 parent 9667e94 commit 6a7bc0c
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 80 deletions.
113 changes: 76 additions & 37 deletions internal/ingress/controller/admission_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ type Namespace string
type Name string

const (
batchDelay = 5 * time.Second
batchersCount = 3
batchDelay = 4 * time.Second
)

type AdmissionBatcher struct {
ingresses []*networking.Ingress
errorChannels []chan error
ingresses map[Namespace][]*networking.Ingress
isOnBatching map[Namespace]bool
errorChannels map[Namespace][]chan error
isWorking bool
isWorkingMU *sync.Mutex
workerMU *sync.Mutex
Expand All @@ -33,8 +33,9 @@ type AdmissionBatcher struct {

func NewAdmissionBatcher() AdmissionBatcher {
return AdmissionBatcher{
ingresses: nil,
errorChannels: nil,
ingresses: map[Namespace][]*networking.Ingress{},
isOnBatching: map[Namespace]bool{},
errorChannels: map[Namespace][]chan error{},
isWorking: true,
isWorkingMU: &sync.Mutex{},
workerMU: &sync.Mutex{},
Expand All @@ -44,41 +45,37 @@ func NewAdmissionBatcher() AdmissionBatcher {

func (n *NGINXController) StartAdmissionBatcher() {
n.admissionBatcher.setWork(true)
for i := 0; i < batchersCount; i++ {
go n.BatchConsumerRoutine(i)
}
go n.BatchConsumerRoutine()
}

func (n *NGINXController) StopAdmissionBatcher() {
n.admissionBatcher.setWork(false)
n.admissionBatcher.consumerWG.Wait()
}

func (n *NGINXController) BatchConsumerRoutine(i int) {
func (n *NGINXController) BatchConsumerRoutine() {
n.admissionBatcher.consumerWG.Add(1)
defer n.admissionBatcher.consumerWG.Done()
klog.Infof("Admission batcher routine %d started", i)

for n.admissionBatcher.isWork() {
n.admissionBatcher.workerMU.Lock()
if !n.admissionBatcher.hasNewIngresses() {
n.admissionBatcher.workerMU.Unlock()
ns, ok := n.admissionBatcher.hasNewIngresses()
if !ok {
time.Sleep(100 * time.Millisecond)
continue
}

klog.Info("Start waiting for batch, namespace: ", ns)
go n.BatchConsumerRoutine()
time.Sleep(batchDelay)
newIngresses, errorChannels := n.admissionBatcher.fetchNewBatch()
n.admissionBatcher.workerMU.Unlock()
newIngresses, errorChannels := n.admissionBatcher.fetchNewBatch(ns)

if len(newIngresses) > 0 {
err := n.validateNewIngresses(newIngresses)
for _, erCh := range errorChannels {
erCh <- err
}
}
return
}
klog.Infof("Admission batcher routine %d finished", i)
}

func groupByNamespacesAndNames(ingresses []*networking.Ingress) map[Namespace]map[Name]struct{} {
Expand Down Expand Up @@ -136,24 +133,47 @@ func (n *NGINXController) validateNewIngresses(newIngresses []*networking.Ingres

klog.Info("New ingresses with annotations appended for ", ingsListStr)

startTest := time.Now().UnixNano() / 1000000
start := time.Now()
_, _, newIngCfg := n.getConfiguration(ings)

_, servers, newIngCfg := n.getConfiguration(ings)
klog.Info("Configuration generated in ", time.Now().Sub(start).Seconds(), " seconds for ", ingsListStr)

Check failure on line 140 in internal/ingress/controller/admission_batcher.go

View workflow job for this annotation

GitHub Actions / lint

S1012: should use `time.Since` instead of `time.Now().Sub` (gosimple)

for _, newIngress := range newIngresses {
err := checkOverlap(newIngress, servers)
if err != nil {
return errors.Wrapf(err, "error while validating overlap for ingress %s/%s", newIngress.Namespace, newIngress.Name)
}
}

start = time.Now()
template, err := n.generateTemplate(cfg, *newIngCfg)
if err != nil {
return errors.Wrap(err, "error while validating batch of ingresses")
return errors.Wrapf(err, "error while generating template for ingresses %s", ingsListStr)
}
klog.Info("Generated nginx template in ", time.Now().Sub(start).Seconds(), " seconds for ", ingsListStr)

Check failure on line 154 in internal/ingress/controller/admission_batcher.go

View workflow job for this annotation

GitHub Actions / lint

S1012: should use `time.Since` instead of `time.Now().Sub` (gosimple)

start = time.Now()
err = n.testTemplate(template)
if err != nil {
return errors.Wrap(err, "error while validating batch of ingresses")
return errors.Wrapf(err, "error while testing template for of ingresses %s", ingsListStr)
}
klog.Info("Tested nginx template in ", time.Now().Sub(start).Seconds(), " seconds for ", ingsListStr)

Check failure on line 161 in internal/ingress/controller/admission_batcher.go

View workflow job for this annotation

GitHub Actions / lint

S1012: should use `time.Since` instead of `time.Now().Sub` (gosimple)

endCheck := time.Now().UnixNano() / 1000000

testedSize := len(ings)
confSize := len(template)
n.metricCollector.SetAdmissionMetrics(
float64(testedSize),
float64(endCheck-startTest)/1000,
float64(len(ings)),
//can't calculate content properly because of batching

Check failure on line 171 in internal/ingress/controller/admission_batcher.go

View workflow job for this annotation

GitHub Actions / lint

commentFormatting: put a space between `//` and comment text (gocritic)
0,
float64(confSize),
//can't calculate content properly because of batching

Check failure on line 174 in internal/ingress/controller/admission_batcher.go

View workflow job for this annotation

GitHub Actions / lint

commentFormatting: put a space between `//` and comment text (gocritic)
0,
)
return nil
}

Expand All @@ -171,46 +191,65 @@ func (ab *AdmissionBatcher) isWork() bool {
return ab.isWorking
}

func (ab *AdmissionBatcher) hasNewIngresses() bool {
func (ab *AdmissionBatcher) hasNewIngresses() (Namespace, bool) {
ab.isWorkingMU.Lock()
defer ab.isWorkingMU.Unlock()

return len(ab.ingresses) != 0
for ns, b := range ab.isOnBatching {
if !b {
ab.isOnBatching[ns] = true
return ns, true
}
}
return "", false
}

func (ab *AdmissionBatcher) fetchNewBatch() (ings []*networking.Ingress, errorChannels []chan error) {
func (ab *AdmissionBatcher) fetchNewBatch(namespace Namespace) (ings []*networking.Ingress, errorChannels []chan error) {
ab.isWorkingMU.Lock()
defer ab.isWorkingMU.Unlock()

if len(ab.ingresses) == 0 {
return nil, nil
}

ings = ab.ingresses
errorChannels = ab.errorChannels
ings = ab.ingresses[namespace]
errorChannels = ab.errorChannels[namespace]

var sb strings.Builder
sb.WriteString("Fetched new batch of ingresses: ")
sb.WriteString(fmt.Sprint("Fetched new batch of ingresses for ns: ", namespace))
for _, ing := range ings {
sb.WriteString(fmt.Sprintf("%s/%s ", ing.Namespace, ing.Name))
sb.WriteString(fmt.Sprintf(" %s", ing.Name))
}
klog.Info(sb.String())

ab.errorChannels = nil
ab.ingresses = nil
delete(ab.errorChannels, namespace)
delete(ab.ingresses, namespace)
delete(ab.isOnBatching, namespace)
return ings, errorChannels
}

func (ab *AdmissionBatcher) ValidateIngress(ing *networking.Ingress) error {
errCh := ab.addIngressToHandle(ing)
klog.Info(
"Ingress ",
fmt.Sprintf("%v/%v", ing.Namespace, ing.Name),
" submitted for batch validation, waiting for verdict...",
)
return <-errCh
}

func (ab *AdmissionBatcher) addIngressToHandle(ing *networking.Ingress) chan error {
ab.isWorkingMU.Lock()
defer ab.isWorkingMU.Unlock()

ab.ingresses = append(ab.ingresses, ing)
ns := Namespace(ing.Namespace)

errCh := make(chan error)
ab.errorChannels = append(ab.errorChannels, errCh)

ab.isWorkingMU.Unlock()
if _, ok := ab.isOnBatching[ns]; !ok {
ab.isOnBatching[ns] = false
}
ab.ingresses[ns] = append(ab.ingresses[ns], ing)

klog.Info("Ingress ", fmt.Sprintf("%v/%v", ing.Namespace, ing.Name), " submitted for batch validation, waiting for verdict...")
return <-errCh
errCh := make(chan error)
ab.errorChannels[ns] = append(ab.errorChannels[ns], errCh)
return errCh
}
47 changes: 5 additions & 42 deletions internal/ingress/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"k8s.io/ingress-nginx/internal/ingress/annotations/proxy"
ngx_config "k8s.io/ingress-nginx/internal/ingress/controller/config"
"k8s.io/ingress-nginx/internal/ingress/controller/ingressclass"
"k8s.io/ingress-nginx/internal/ingress/controller/store"
"k8s.io/ingress-nginx/internal/ingress/errors"
"k8s.io/ingress-nginx/internal/ingress/inspector"
"k8s.io/ingress-nginx/internal/ingress/metric/collectors"
Expand Down Expand Up @@ -308,7 +307,7 @@ func (n *NGINXController) CheckWarning(ing *networking.Ingress) ([]string, error
// CheckIngress returns an error in case the provided ingress, when added
// to the current configuration, generates an invalid configuration
func (n *NGINXController) CheckIngress(ing *networking.Ingress) error {
startCheck := time.Now().UnixNano() / 1000000
//startCheck := time.Now().UnixNano() / 1000000

Check failure on line 310 in internal/ingress/controller/controller.go

View workflow job for this annotation

GitHub Actions / lint

commentFormatting: put a space between `//` and comment text (gocritic)

if ing == nil {
// no ingress to add, no state change
Expand Down Expand Up @@ -340,7 +339,7 @@ func (n *NGINXController) CheckIngress(ing *networking.Ingress) error {
if n.cfg.DisableCatchAll && ing.Spec.DefaultBackend != nil {
return fmt.Errorf("this deployment is trying to create a catch-all ingress while DisableCatchAll flag is set to true. Remove '.spec.defaultBackend' or set DisableCatchAll flag to false")
}
startRender := time.Now().UnixNano() / 1000000
//startRender := time.Now().UnixNano() / 1000000
cfg := n.store.GetBackendConfiguration()
cfg.Resolver = n.resolver

Expand Down Expand Up @@ -383,50 +382,14 @@ func (n *NGINXController) CheckIngress(ing *networking.Ingress) error {

k8s.SetDefaultNGINXPathType(ing)

allIngresses := n.store.ListIngresses()

filter := func(toCheck *ingress.Ingress) bool {
return toCheck.ObjectMeta.Namespace == ing.ObjectMeta.Namespace &&
toCheck.ObjectMeta.Name == ing.ObjectMeta.Name
}
ings := store.FilterIngresses(allIngresses, filter)
parsed, err := annotations.NewAnnotationExtractor(n.store).Extract(ing)
if err != nil {
n.metricCollector.IncCheckErrorCount(ing.ObjectMeta.Namespace, ing.Name)
return err
}
ings = append(ings, &ingress.Ingress{
Ingress: *ing,
ParsedAnnotations: parsed,
})
startTest := time.Now().UnixNano() / 1000000
_, servers, _ := n.getConfiguration(ings)

err = checkOverlap(ing, servers)
if err != nil {
n.metricCollector.IncCheckErrorCount(ing.ObjectMeta.Namespace, ing.Name)
return err
}
testedSize := len(ings)

klog.Info("starting validation of ingress ", fmt.Sprintf("%v/%v", ing.Namespace, ing.Name))
err = n.admissionBatcher.ValidateIngress(ing)
err := n.admissionBatcher.ValidateIngress(ing)
if err != nil {
n.metricCollector.IncCheckErrorCount(ing.ObjectMeta.Namespace, ing.Name)
return err
}

n.metricCollector.IncCheckCount(ing.ObjectMeta.Namespace, ing.Name)
endCheck := time.Now().UnixNano() / 1000000
n.metricCollector.SetAdmissionMetrics(
float64(testedSize),
float64(endCheck-startTest)/1000,
float64(len(ings)),
float64(startTest-startRender)/1000,
//can't calculate content properly because of batching
float64(0),
float64(endCheck-startCheck)/1000,
)
return nil
}

Expand Down Expand Up @@ -601,7 +564,7 @@ func (n *NGINXController) getDefaultUpstream() *ingress.Backend {
func (n *NGINXController) getConfiguration(ingresses []*ingress.Ingress) (sets.Set[string], []*ingress.Server, *ingress.Configuration) {
start := time.Now()
upstreams, servers := n.getBackendServers(ingresses)
klog.V(3).Infof("Got backend servers in %f seconds for %d ingresses", time.Now().Sub(start).Seconds(), len(ingresses))
klog.Infof("Got backend servers in %f seconds for %d ingresses", time.Now().Sub(start).Seconds(), len(ingresses))

var passUpstreams []*ingress.SSLPassthroughBackend

Expand Down Expand Up @@ -652,7 +615,7 @@ func (n *NGINXController) getConfiguration(ingresses []*ingress.Ingress) (sets.S
}
}

klog.V(3).Infof("Collected info about passupstreams in %f seconds for %d ingresses", time.Now().Sub(start).Seconds(), len(ingresses))
klog.Infof("Collected info about passupstreams in %f seconds for %d ingresses", time.Now().Sub(start).Seconds(), len(ingresses))

return hosts, servers, &ingress.Configuration{
Backends: upstreams,
Expand Down
2 changes: 1 addition & 1 deletion joom-build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ export REGISTRY="jfrog.joom.it/docker-registry/joom-ingress-nginx"

export BASE_TAG
BASE_TAG=$(cat TAG)
export TAG="${BASE_TAG}-batching-patch"
export TAG="${BASE_TAG}-batching-patch-$(date -u +%d%m%y-%H%M%S)"

export ARCH=amd64

Expand Down

0 comments on commit 6a7bc0c

Please sign in to comment.