Commit

add new batch logic (#6)
rchshld authored Dec 29, 2023
1 parent 9c8f497 commit 7491d5b
Showing 2 changed files with 56 additions and 70 deletions.
113 changes: 52 additions & 61 deletions in internal/ingress/controller/admission_batcher.go
@@ -18,81 +18,66 @@ type Namespace string
 type Name string
 
 const (
-	// time of batch collecting
-	admissionDelaySeconds = 3
-	admissionDelay        = admissionDelaySeconds * time.Second
-
-	// amount of concurrent batch consumers
-	batchConsumerCount = 30 / admissionDelaySeconds
+	batchDelay    = 5 * time.Second
+	batchersCount = 3
 )
 
 type AdmissionBatcher struct {
 	ingresses     []*networking.Ingress
 	errorChannels []chan error
 
-	// flag for consumer goroutine indicating whether it should keep processing or not
-	isWorking bool
-
-	// mutex protecting queues access
-	mu *sync.Mutex
-
-	// wait group to monitor consumer goroutine lifetime
-	consumerWG sync.WaitGroup
-
-	// when was last not empty batch consumed by some worker for validation
-	lastBatchConsumedTime time.Time
+	isWorking   bool
+	isWorkingMU *sync.Mutex
+	workerMU    *sync.Mutex
+	consumerWG  sync.WaitGroup
 }
 
 func NewAdmissionBatcher() AdmissionBatcher {
 	return AdmissionBatcher{
 		ingresses:     nil,
 		errorChannels: nil,
 		isWorking:     true,
-		mu:            &sync.Mutex{},
+		isWorkingMU:   &sync.Mutex{},
+		workerMU:      &sync.Mutex{},
 		consumerWG:    sync.WaitGroup{},
 	}
}
 
 func (n *NGINXController) StartAdmissionBatcher() {
-	for i := 0; i < batchConsumerCount; i++ {
-		go n.AdmissionBatcherConsumerRoutine()
+	n.admissionBatcher.setWork(true)
+	for i := 0; i < batchersCount; i++ {
+		go n.BatchConsumerRoutine(i)
 	}
 }
 
 func (n *NGINXController) StopAdmissionBatcher() {
-	n.admissionBatcher.mu.Lock()
-	defer n.admissionBatcher.mu.Unlock()
-
-	n.admissionBatcher.isWorking = false
+	n.admissionBatcher.setWork(false)
 	n.admissionBatcher.consumerWG.Wait()
 }
 
 // AdmissionBatcherConsumerRoutine is started during ingress-controller startup phase
 // And it should stop during ingress-controller's graceful shutdown
-func (n *NGINXController) AdmissionBatcherConsumerRoutine() {
+func (n *NGINXController) BatchConsumerRoutine(i int) {
 	n.admissionBatcher.consumerWG.Add(1)
 	defer n.admissionBatcher.consumerWG.Done()
+	klog.Infof("Admission batcher routine %d started", i)
 
-	klog.Info("Admission batcher routine started")
-
-	// prevent races on isWorking field
-	n.admissionBatcher.mu.Lock()
-	for n.admissionBatcher.isWorking {
-		timeSinceLastBatchPull := time.Now().Sub(n.admissionBatcher.lastBatchConsumedTime)
-		n.admissionBatcher.mu.Unlock()
+	for n.admissionBatcher.isWork() {
+		n.admissionBatcher.workerMU.Lock()
+		if !n.admissionBatcher.hasNewIngresses() {
+			n.admissionBatcher.workerMU.Unlock()
+			continue
+		}
 
-		time.Sleep(max(time.Duration(0), admissionDelay-timeSinceLastBatchPull))
+		time.Sleep(batchDelay)
 		newIngresses, errorChannels := n.admissionBatcher.fetchNewBatch()
-		if len(newIngresses) != 0 {
+		n.admissionBatcher.workerMU.Unlock()
+
+		if len(newIngresses) > 0 {
 			err := n.validateNewIngresses(newIngresses)
 			for _, erCh := range errorChannels {
 				erCh <- err
 			}
 		}
-
-		n.admissionBatcher.mu.Lock()
 	}
 
-	klog.Info("Admission batcher routine finished")
+	klog.Infof("Admission batcher routine %d finished", i)
 }
 
 func groupByNamespacesAndNames(ingresses []*networking.Ingress) map[Namespace]map[Name]struct{} {
@@ -147,36 +132,54 @@ func (n *NGINXController) validateNewIngresses(newIngresses []*networking.Ingres
 			ParsedAnnotations: ann,
 		})
 	}
-	//debug
+
 	klog.Info("New ingresses with annotations appended for ", ingsListStr)
 
 	start := time.Now()
 	_, _, newIngCfg := n.getConfiguration(ings)
-	//debug
 	klog.Info("Configuration generated in ", time.Now().Sub(start).Seconds(), " seconds for ", ingsListStr)
 
 	start = time.Now()
 	template, err := n.generateTemplate(cfg, *newIngCfg)
 	if err != nil {
 		return errors.Wrap(err, "error while validating batch of ingresses")
 	}
-	//debug
 	klog.Info("Generated nginx template in ", time.Now().Sub(start).Seconds(), " seconds for ", ingsListStr)
 
 	start = time.Now()
 	err = n.testTemplate(template)
 	if err != nil {
 		return errors.Wrap(err, "error while validating batch of ingresses")
 	}
-	//debug
 	klog.Info("Tested nginx template in ", time.Now().Sub(start).Seconds(), " seconds for ", ingsListStr)
 
 	return nil
 }
 
+func (ab *AdmissionBatcher) setWork(status bool) {
+	ab.isWorkingMU.Lock()
+	defer ab.isWorkingMU.Unlock()
+
+	ab.isWorking = status
+}
+
+func (ab *AdmissionBatcher) isWork() bool {
+	ab.isWorkingMU.Lock()
+	defer ab.isWorkingMU.Unlock()
+
+	return ab.isWorking
+}
+
+func (ab *AdmissionBatcher) hasNewIngresses() bool {
+	ab.isWorkingMU.Lock()
+	defer ab.isWorkingMU.Unlock()
+
+	return len(ab.ingresses) != 0
+}
+
 func (ab *AdmissionBatcher) fetchNewBatch() (ings []*networking.Ingress, errorChannels []chan error) {
-	ab.mu.Lock()
-	defer ab.mu.Unlock()
+	ab.isWorkingMU.Lock()
+	defer ab.isWorkingMU.Unlock()
 
 	if len(ab.ingresses) == 0 {
 		return nil, nil
@@ -185,7 +188,6 @@ func (ab *AdmissionBatcher) fetchNewBatch() (ings []*networking.Ingress, errorCh
 	ings = ab.ingresses
 	errorChannels = ab.errorChannels
 
-	// debug
 	var sb strings.Builder
 	sb.WriteString("Fetched new batch of ingresses: ")
 	for _, ing := range ings {
@@ -195,30 +197,19 @@
 
 	ab.errorChannels = nil
 	ab.ingresses = nil
 
-	ab.lastBatchConsumedTime = time.Now()
-
 	return ings, errorChannels
 }
 
 func (ab *AdmissionBatcher) ValidateIngress(ing *networking.Ingress) error {
-	ab.mu.Lock()
+	ab.isWorkingMU.Lock()
 
 	ab.ingresses = append(ab.ingresses, ing)
 
 	errCh := make(chan error)
 	ab.errorChannels = append(ab.errorChannels, errCh)
 
-	ab.mu.Unlock()
+	ab.isWorkingMU.Unlock()
-
-	// debug
 	klog.Info("Ingress ", fmt.Sprintf("%v/%v", ing.Namespace, ing.Name), " submitted for batch validation, waiting for verdict...")
 	return <-errCh
 }
-
-func max(d1, d2 time.Duration) time.Duration {
-	if d1 < d2 {
-		return d2
-	}
-	return d1
-}
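
What this file now implements is a batch-and-fan-out admission pattern: concurrent webhook requests enqueue their ingresses, a consumer goroutine sleeps for batchDelay so a batch can accumulate, validates the whole batch with a single configuration build and nginx template test, and fans the shared verdict back out over each caller's error channel. The sketch below restates the pattern in isolation; it is an illustration, not code from this commit, and batcher, submit, fetch, and consume are hypothetical stand-ins for AdmissionBatcher, ValidateIngress, fetchNewBatch, and BatchConsumerRoutine, with a shortened delay.

package main

import (
	"fmt"
	"sync"
	"time"
)

// batcher mirrors AdmissionBatcher: requests queue up under a mutex,
// and each waiter blocks on its own error channel.
type batcher struct {
	mu       sync.Mutex
	items    []string
	errChans []chan error
}

// submit mirrors ValidateIngress: enqueue, then block until a consumer
// validates the whole batch and fans the verdict back out.
func (b *batcher) submit(item string) error {
	b.mu.Lock()
	errCh := make(chan error)
	b.items = append(b.items, item)
	b.errChans = append(b.errChans, errCh)
	b.mu.Unlock()
	return <-errCh
}

// fetch mirrors fetchNewBatch: swap both queues out under the lock.
func (b *batcher) fetch() ([]string, []chan error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	items, chans := b.items, b.errChans
	b.items, b.errChans = nil, nil
	return items, chans
}

// consume mirrors BatchConsumerRoutine: wait so a batch can accumulate,
// validate it once, and send the shared verdict to every waiter.
func (b *batcher) consume(validate func([]string) error) {
	for {
		time.Sleep(100 * time.Millisecond) // stands in for batchDelay
		items, chans := b.fetch()
		if len(items) == 0 {
			continue
		}
		err := validate(items)
		for _, ch := range chans {
			ch <- err
		}
	}
}

func main() {
	b := &batcher{}
	go b.consume(func(batch []string) error {
		fmt.Println("validating batch:", batch) // one validation for N requests
		return nil
	})

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			_ = b.submit(fmt.Sprintf("ns/ingress-%d", i))
		}(i)
	}
	wg.Wait()
}

The trade-off is visible in the sketch: N concurrent requests cost one validation pass instead of N, and each request pays at most one extra delay interval of latency.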
13 changes: 4 additions & 9 deletions in internal/ingress/controller/controller.go
@@ -599,15 +599,9 @@ func (n *NGINXController) getDefaultUpstream() *ingress.Backend {
 
 // getConfiguration returns the configuration matching the standard kubernetes ingress
 func (n *NGINXController) getConfiguration(ingresses []*ingress.Ingress) (sets.Set[string], []*ingress.Server, *ingress.Configuration) {
-	var ingsListSB strings.Builder
-	for _, ing := range ingresses {
-		ingsListSB.WriteString(fmt.Sprintf("%v/%v ", ing.Namespace, ing.Name))
-	}
-	ingsListStr := ingsListSB.String()
-
 	start := time.Now()
 	upstreams, servers := n.getBackendServers(ingresses)
-	klog.Info("Got backend servers in ", time.Now().Sub(start).Seconds(), " seconds for ", ingsListStr)
+	klog.V(3).Infof("Got backend servers in %f seconds for %d ingresses", time.Now().Sub(start).Seconds(), len(ingresses))
 
 	var passUpstreams []*ingress.SSLPassthroughBackend
 
@@ -658,7 +652,7 @@ func (n *NGINXController) getConfiguration(ingresses []*ingress.Ingress) (sets.S
 		}
 	}
 
-	klog.Info("Collected info about passupstreams in ", time.Now().Sub(start).Seconds(), " seconds for ", ingsListStr)
+	klog.V(3).Infof("Collected info about passupstreams in %f seconds for %d ingresses", time.Now().Sub(start).Seconds(), len(ingresses))
 
 	return hosts, servers, &ingress.Configuration{
 		Backends: upstreams,
@@ -1216,7 +1210,8 @@ func (n *NGINXController) serviceEndpoints(svcKey, backendPort string) ([]ingres
 			servicePort.Name == backendPort {
 			endps := getEndpointsFromSlices(svc, &servicePort, apiv1.ProtocolTCP, zone, n.store.GetServiceEndpointsSlices)
 			if len(endps) == 0 {
-				klog.Warningf("Service %q does not have any active Endpoint.", svcKey)
+				// moved to verbose info because of noisy false positives due to disabled dev stands
+				klog.V(3).Infof("Service %q does not have any active Endpoint.", svcKey)
 			}
 
 			upstreams = append(upstreams, endps...)
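
The controller.go changes demote per-reconcile log lines from the default level (and one Warning) to verbosity 3, so they are emitted only when the controller runs with -v=3 or higher. A minimal standalone sketch of how klog gates such calls (assumed flag wiring, not code from this repo):

package main

import (
	"flag"

	"k8s.io/klog/v2"
)

func main() {
	// klog.V(3) messages are dropped unless verbosity is raised, e.g. -v=3.
	klog.InitFlags(nil)
	flag.Parse()

	klog.Info("always printed")
	klog.V(3).Infof("printed only with -v=%d or higher", 3)
	klog.Flush()
}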
