Skip to content

Commit

Permalink
Merge branch 'main' into codecov-matrix
Browse files Browse the repository at this point in the history
  • Loading branch information
jotak authored Mar 16, 2023
2 parents 15527f7 + 725e26f commit 9fb94e9
Show file tree
Hide file tree
Showing 15 changed files with 1,053 additions and 167 deletions.
30 changes: 13 additions & 17 deletions controllers/consoleplugin/consoleplugin_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,9 @@ func NewReconciler(cl reconcilers.ClientHelper, ns, prevNS, imageName string, av
return CPReconciler{ClientHelper: cl, nobjMngr: nobjMngr, owned: owned, image: imageName, availableAPIs: availableAPIs}
}

// InitStaticResources inits some "static" / one-shot resources, usually not subject to reconciliation
func (r *CPReconciler) InitStaticResources(ctx context.Context) error {
cr := buildClusterRole()
return r.ReconcileClusterRole(ctx, cr)
}

// PrepareNamespaceChange cleans up old namespace and restore the relevant "static" resources
func (r *CPReconciler) PrepareNamespaceChange(ctx context.Context) error {
// Switching namespace => delete everything in the previous namespace
// CleanupNamespace cleans up old namespace
func (r *CPReconciler) CleanupNamespace(ctx context.Context) {
r.nobjMngr.CleanupPreviousNamespace(ctx)
cr := buildClusterRole()
return r.ReconcileClusterRole(ctx, cr)
}

// Reconcile is the reconciler entry point to reconcile the current plugin state with the desired configuration
Expand Down Expand Up @@ -147,6 +138,11 @@ func (r *CPReconciler) reconcilePermissions(ctx context.Context, builder *builde
return r.CreateOwned(ctx, builder.serviceAccount())
} // update not needed for now

cr := buildClusterRole()
if err := r.ReconcileClusterRole(ctx, cr); err != nil {
return err
}

desired := builder.clusterRoleBinding()
if err := r.ReconcileClusterRoleBinding(ctx, desired); err != nil {
return err
Expand Down Expand Up @@ -227,18 +223,18 @@ func (r *CPReconciler) reconcileService(ctx context.Context, builder builder, de
if err := r.CreateOwned(ctx, newSVC); err != nil {
return err
}
if r.availableAPIs.HasSvcMonitor() {
serviceMonitor := builder.serviceMonitor()
if err := r.CreateOwned(ctx, serviceMonitor); err != nil {
return err
}
}
} else if serviceNeedsUpdate(r.owned.service, &desired.ConsolePlugin, &report) {
newSVC := builder.service(r.owned.service)
if err := r.UpdateOwned(ctx, r.owned.service, newSVC); err != nil {
return err
}
}
if r.availableAPIs.HasSvcMonitor() {
serviceMonitor := builder.serviceMonitor()
if err := reconcilers.GenericReconcile(ctx, r.nobjMngr, &r.ClientHelper, r.owned.serviceMonitor, serviceMonitor, &report, helper.ServiceMonitorChanged); err != nil {
return err
}
}
return nil
}

Expand Down
27 changes: 4 additions & 23 deletions controllers/flowcollector_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,31 +202,12 @@ func (r *FlowCollectorReconciler) handleNamespaceChanged(
cpReconciler *consoleplugin.CPReconciler,
) error {
log := log.FromContext(ctx)
if oldNS == "" {
// First install: create one-shot resources
log.Info("FlowCollector first install: creating initial resources")
err := flpReconciler.InitStaticResources(ctx)
if err != nil {
return err
}
if r.availableAPIs.HasConsolePlugin() {
err := cpReconciler.InitStaticResources(ctx)
if err != nil {
return err
}
}
} else {
if oldNS != "" {
// Namespace updated, clean up previous namespace
log.Info("FlowCollector namespace change detected: cleaning up previous namespace and preparing next one", "old namespace", oldNS, "new namepace", newNS)
err := flpReconciler.PrepareNamespaceChange(ctx)
if err != nil {
return err
}
log.Info("FlowCollector namespace change detected: cleaning up previous namespace", "old namespace", oldNS, "new namepace", newNS)
flpReconciler.CleanupNamespace(ctx)
if r.availableAPIs.HasConsolePlugin() {
err := cpReconciler.PrepareNamespaceChange(ctx)
if err != nil {
return err
}
cpReconciler.CleanupNamespace(ctx)
}
}

Expand Down
31 changes: 31 additions & 0 deletions controllers/flowcollector_controller_console_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
operatorsv1 "github.com/openshift/api/operator/v1"
monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
appsv1 "k8s.io/api/apps/v1"
ascv2 "k8s.io/api/autoscaling/v2"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -172,6 +173,36 @@ func flowCollectorConsolePluginSpecs() {
return svc.Spec.Ports[0].Port
}, timeout, interval).Should(Equal(int32(9099)))
})

It("Should create desired objects when they're not found (e.g. case of an operator upgrade)", func() {
sm := monitoringv1.ServiceMonitor{}

By("Expecting ServiceMonitor to exist")
Eventually(func() interface{} {
return k8sClient.Get(ctx, types.NamespacedName{
Name: "netobserv-plugin",
Namespace: cpNamespace,
}, &sm)
}, timeout, interval).Should(Succeed())

// Manually delete ServiceMonitor
By("Deleting ServiceMonitor")
Eventually(func() error {
return k8sClient.Delete(ctx, &sm)
}, timeout, interval).Should(Succeed())

// Do a dummy change that will trigger reconcile, and make sure SM is created again
UpdateCR(crKey, func(fc *flowslatest.FlowCollector) {
fc.Spec.Processor.LogLevel = "info"
})
By("Expecting ServiceMonitor to exist")
Eventually(func() interface{} {
return k8sClient.Get(ctx, types.NamespacedName{
Name: "netobserv-plugin",
Namespace: cpNamespace,
}, &sm)
}, timeout, interval).Should(Succeed())
})
})

Context("Configuring the Loki URL", func() {
Expand Down
68 changes: 67 additions & 1 deletion controllers/flowcollector_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
appsv1 "k8s.io/api/apps/v1"
ascv2 "k8s.io/api/autoscaling/v2"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -339,6 +340,71 @@ func flowCollectorControllerSpecs() {
return ofc.Data["sampling"]
}, timeout, interval).Should(Equal("1"))
})

It("Should create desired objects when they're not found (e.g. case of an operator upgrade)", func() {
psvc := v1.Service{}
sm := monitoringv1.ServiceMonitor{}
pr := monitoringv1.PrometheusRule{}
By("Expecting prometheus service to exist")
Eventually(func() interface{} {
return k8sClient.Get(ctx, types.NamespacedName{
Name: "flowlogs-pipeline-prom",
Namespace: operatorNamespace,
}, &psvc)
}, timeout, interval).Should(Succeed())

By("Expecting ServiceMonitor to exist")
Eventually(func() interface{} {
return k8sClient.Get(ctx, types.NamespacedName{
Name: "flowlogs-pipeline-monitor",
Namespace: operatorNamespace,
}, &sm)
}, timeout, interval).Should(Succeed())

By("Expecting PrometheusRule to exist")
Eventually(func() interface{} {
return k8sClient.Get(ctx, types.NamespacedName{
Name: "flowlogs-pipeline-alert",
Namespace: operatorNamespace,
}, &pr)
}, timeout, interval).Should(Succeed())

// Manually delete ServiceMonitor
By("Deleting ServiceMonitor")
Eventually(func() error {
return k8sClient.Delete(ctx, &sm)
}, timeout, interval).Should(Succeed())

// Do a dummy change that will trigger reconcile, and make sure SM is created again
UpdateCR(crKey, func(fc *flowslatest.FlowCollector) {
fc.Spec.Processor.LogLevel = "info"
})
By("Expecting ServiceMonitor to exist")
Eventually(func() interface{} {
return k8sClient.Get(ctx, types.NamespacedName{
Name: "flowlogs-pipeline-monitor",
Namespace: operatorNamespace,
}, &sm)
}, timeout, interval).Should(Succeed())

// Manually delete Rule
By("Deleting prom rule")
Eventually(func() error {
return k8sClient.Delete(ctx, &pr)
}, timeout, interval).Should(Succeed())

// Do a dummy change that will trigger reconcile, and make sure Rule is created again
UpdateCR(crKey, func(fc *flowslatest.FlowCollector) {
fc.Spec.Processor.LogLevel = "debug"
})
By("Expecting PrometheusRule to exist")
Eventually(func() interface{} {
return k8sClient.Get(ctx, types.NamespacedName{
Name: "flowlogs-pipeline-alert",
Namespace: operatorNamespace,
}, &pr)
}, timeout, interval).Should(Succeed())
})
})

Context("With Kafka", func() {
Expand Down Expand Up @@ -739,7 +805,7 @@ func UpdateCR(key types.NamespacedName, updater func(*flowslatest.FlowCollector)
Eventually(func() error {
updater(cr)
return k8sClient.Update(ctx, cr)
}).Should(Succeed())
}, timeout, interval).Should(Succeed())
}

func checkDigestUpdate(oldDigest *string, annots map[string]string) error {
Expand Down
53 changes: 21 additions & 32 deletions controllers/flowlogspipeline/flp_ingest_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

flowslatest "github.com/netobserv/network-observability-operator/api/v1beta1"
"github.com/netobserv/network-observability-operator/controllers/constants"
"github.com/netobserv/network-observability-operator/controllers/reconcilers"
"github.com/netobserv/network-observability-operator/pkg/helper"
)

Expand Down Expand Up @@ -50,6 +51,8 @@ func newIngesterReconciler(info *reconcilersCommonInfo) *flpIngesterReconciler {
info.nobjMngr.AddManagedObject(configMapName(ConfKafkaIngester), owned.configMap)
if info.availableAPIs.HasSvcMonitor() {
info.nobjMngr.AddManagedObject(serviceMonitorName(ConfKafkaIngester), owned.serviceMonitor)
}
if info.availableAPIs.HasPromRule() {
info.nobjMngr.AddManagedObject(prometheusRuleName(ConfKafkaIngester), owned.prometheusRule)
}

Expand All @@ -64,18 +67,9 @@ func (r *flpIngesterReconciler) context(ctx context.Context) context.Context {
return log.IntoContext(ctx, l)
}

// initStaticResources inits some "static" / one-shot resources, usually not subject to reconciliation
func (r *flpIngesterReconciler) initStaticResources(ctx context.Context) error {
cr := buildClusterRoleIngester(r.useOpenShiftSCC)
return r.ReconcileClusterRole(ctx, cr)
}

// PrepareNamespaceChange cleans up old namespace and restore the relevant "static" resources
func (r *flpIngesterReconciler) prepareNamespaceChange(ctx context.Context) error {
// Switching namespace => delete everything in the previous namespace
// cleanupNamespace cleans up old namespace
func (r *flpIngesterReconciler) cleanupNamespace(ctx context.Context) {
r.nobjMngr.CleanupPreviousNamespace(ctx)
cr := buildClusterRoleIngester(r.useOpenShiftSCC)
return r.ReconcileClusterRole(ctx, cr)
}

func (r *flpIngesterReconciler) reconcile(ctx context.Context, desired *flowslatest.FlowCollector) error {
Expand Down Expand Up @@ -126,34 +120,24 @@ func (r *flpIngesterReconciler) reconcilePrometheusService(ctx context.Context,
if err := r.CreateOwned(ctx, builder.newPromService()); err != nil {
return err
}
if r.availableAPIs.HasSvcMonitor() {
if err := r.CreateOwned(ctx, builder.generic.serviceMonitor()); err != nil {
return err
}
if err := r.CreateOwned(ctx, builder.generic.prometheusRule()); err != nil {
} else {
newSVC := builder.fromPromService(r.owned.promService)
if helper.ServiceChanged(r.owned.promService, newSVC, &report) {
if err := r.UpdateOwned(ctx, r.owned.promService, newSVC); err != nil {
return err
}
}
return nil
}
newSVC := builder.fromPromService(r.owned.promService)
if helper.ServiceChanged(r.owned.promService, newSVC, &report) {
if err := r.UpdateOwned(ctx, r.owned.promService, newSVC); err != nil {
if r.availableAPIs.HasSvcMonitor() {
serviceMonitor := builder.generic.serviceMonitor()
if err := reconcilers.GenericReconcile(ctx, r.nobjMngr, &r.ClientHelper, r.owned.serviceMonitor, serviceMonitor, &report, helper.ServiceMonitorChanged); err != nil {
return err
}
}
if r.availableAPIs.HasSvcMonitor() {
newMonitorSvc := builder.generic.serviceMonitor()
if helper.ServiceMonitorChanged(r.owned.serviceMonitor, newMonitorSvc) {
if err := r.UpdateOwned(ctx, r.owned.serviceMonitor, newMonitorSvc); err != nil {
return err
}
}
newPromRules := builder.generic.prometheusRule()
if helper.PrometheusRuleChanged(r.owned.prometheusRule, newPromRules) {
if err := r.UpdateOwned(ctx, r.owned.prometheusRule, newPromRules); err != nil {
return err
}
if r.availableAPIs.HasPromRule() {
promRules := builder.generic.prometheusRule()
if err := reconcilers.GenericReconcile(ctx, r.nobjMngr, &r.ClientHelper, r.owned.prometheusRule, promRules, &report, helper.PrometheusRuleChanged); err != nil {
return err
}
}
return nil
Expand Down Expand Up @@ -183,6 +167,11 @@ func (r *flpIngesterReconciler) reconcilePermissions(ctx context.Context, builde
return r.CreateOwned(ctx, builder.serviceAccount())
} // We only configure name, update is not needed for now

cr := buildClusterRoleIngester(r.useOpenShiftSCC)
if err := r.ReconcileClusterRole(ctx, cr); err != nil {
return err
}

desired := builder.clusterRoleBinding()
if err := r.ClientHelper.ReconcileClusterRoleBinding(ctx, desired); err != nil {
return err
Expand Down
Loading

0 comments on commit 9fb94e9

Please sign in to comment.