Skip to content

Commit

Permalink
NETOBSERV-255: FLP multiple deployments (#78)
Browse files Browse the repository at this point in the history
* Refactoring to prepare kafka integration

* Added kafka config in the CR

* Moved kafka config in flowcollector spec

* Renamed singleFLPReconciler to singleDeploymentreconciler for clarity

* Added kafka enable flag instead of using pointer value

* Ingestor to ingester correction

* Kafka suffix changes

* Refactoring to split cluster role in two (ingestor and transformer)

* flp-ingestor and flp-single now use the same service
  • Loading branch information
OlivierCazade authored May 17, 2022
1 parent c054b6c commit 3189c39
Show file tree
Hide file tree
Showing 11 changed files with 531 additions and 160 deletions.
21 changes: 21 additions & 0 deletions api/v1alpha1/flowcollector_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ type FlowCollectorSpec struct {
// Loki contains settings related to the loki client
Loki FlowCollectorLoki `json:"loki,omitempty"`

// Kafka configurations, if empty the operator will deploy a all-in-one FLP
// +optional
Kafka FlowCollectorKafka `json:"kafka,omitempty"`

// ConsolePlugin contains settings related to the console dynamic plugin
ConsolePlugin FlowCollectorConsolePlugin `json:"consolePlugin,omitempty"`

Expand Down Expand Up @@ -162,6 +166,23 @@ type FlowCollectorEBPF struct {
Privileged bool `json:"privileged,omitempty"`
}

// FlowCollectorKafka defines the desired Kafka config of FlowCollector
type FlowCollectorKafka struct {
// Important: Run "make generate" to regenerate code after modifying this file

//+kubebuilder:default:=false
// Should this feature be enabled
Enable bool `json:"enable,omitempty"`

//+kubebuilder:default:=""
// Address of the kafka server
Address string `json:"address"`

//+kubebuilder:default:=""
// Kafka topic to use
Topic string `json:"topic"`
}

// FlowCollectorFLP defines the desired flowlogs-pipeline state of FlowCollector
type FlowCollectorFLP struct {
// Important: Run "make generate" to regenerate code after modifying this file
Expand Down
16 changes: 16 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions config/crd/bases/flows.netobserv.io_flowcollectors.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1451,6 +1451,26 @@ spec:
minimum: 0
type: integer
type: object
kafka:
description: Kafka configurations, if empty the operator will deploy
a all-in-one FLP
properties:
address:
default: ""
description: Address of the kafka server
type: string
enable:
default: false
description: Should this feature be enabled
type: boolean
topic:
default: ""
description: Kafka topic to use
type: string
required:
- address
- topic
type: object
loki:
description: Loki contains settings related to the loki client
properties:
Expand Down
2 changes: 1 addition & 1 deletion controllers/flowcollector_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (r *FlowCollectorReconciler) Reconcile(ctx context.Context, req ctrl.Reques
desired.Spec.ClusterNetworkOperator.Namespace,
ovsFlowsConfigMapName,
r.lookupIP)
if err := ovsConfigController.Reconcile(ctx, desired); err != nil {
if err := ovsConfigController.Reconcile(ctx, desired, gfReconciler.GetServiceName(desired.Spec.Kafka)); err != nil {
return ctrl.Result{},
fmt.Errorf("failed to reconcile ovs-flows-config ConfigMap: %w", err)
}
Expand Down
76 changes: 76 additions & 0 deletions controllers/flowcollector_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ func flowCollectorControllerSpecs() {
Name: constants.FLPName,
Namespace: otherNamespace,
}
gfKeyKafkaIngester := types.NamespacedName{
Name: constants.FLPName + flowlogspipeline.FlpConfSuffix[flowlogspipeline.ConfKafkaIngester],
Namespace: operatorNamespace,
}
gfKeyKafkaTransformer := types.NamespacedName{
Name: constants.FLPName + flowlogspipeline.FlpConfSuffix[flowlogspipeline.ConfKafkaTransformer],
Namespace: operatorNamespace,
}
cpKey1 := types.NamespacedName{
Name: "network-observability-plugin",
Namespace: operatorNamespace,
Expand Down Expand Up @@ -375,6 +383,74 @@ func flowCollectorControllerSpecs() {
})
})

Context("Changing kafka config", func() {
It("Should update kafka config successfully", func() {
Eventually(func() error {
fc := flowsv1alpha1.FlowCollector{}
if err := k8sClient.Get(ctx, crKey, &fc); err != nil {
return err
}
fc.Spec.Kafka = flowsv1alpha1.FlowCollectorKafka{Enable: true, Address: "loaclhost:9092", Topic: "FLP"}
return k8sClient.Update(ctx, &fc)
}).Should(Succeed())
})

It("Should deploy kafka ingester and transformer", func() {
By("Expecting ingester daemonset to be created")
Eventually(func() interface{} {
return k8sClient.Get(ctx, gfKeyKafkaIngester, &appsv1.DaemonSet{})
}, timeout, interval).Should(Succeed())

By("Expecting transformer deployment to be created")
Eventually(func() interface{} {
return k8sClient.Get(ctx, gfKeyKafkaTransformer, &appsv1.Deployment{})
}, timeout, interval).Should(Succeed())

By("Not Expecting transformer service to be created")
Eventually(func() interface{} {
return k8sClient.Get(ctx, gfKeyKafkaTransformer, &v1.Service{})
}, timeout, interval).Should(MatchError(`services "flowlogs-pipeline-transformer" not found`))
})

It("Should delete previous flp deployment", func() {
By("Expecting deployment to be deleted")
Eventually(func() interface{} {
return k8sClient.Get(ctx, gfKey1, &appsv1.DaemonSet{})
}, timeout, interval).Should(MatchError(`daemonsets.apps "flowlogs-pipeline" not found`))
})

It("Should remove kafka config successfully", func() {
Eventually(func() error {
fc := flowsv1alpha1.FlowCollector{}
if err := k8sClient.Get(ctx, crKey, &fc); err != nil {
return err
}
fc.Spec.Kafka.Enable = false
return k8sClient.Update(ctx, &fc)
}).Should(Succeed())
})

It("Should deploy single flp again", func() {
By("Expecting daemonset to be created")
Eventually(func() interface{} {
return k8sClient.Get(ctx, gfKey1, &appsv1.DaemonSet{})
}, timeout, interval).Should(Succeed())
})

It("Should delete kafka ingester and transformer", func() {
By("Expecting ingester daemonset to be deleted")
Eventually(func() interface{} {
return k8sClient.Get(ctx, gfKeyKafkaIngester, &appsv1.DaemonSet{})
}, timeout, interval).Should(MatchError(`daemonsets.apps "flowlogs-pipeline-ingester" not found`))

By("Expecting transformer deployment to be deleted")
Eventually(func() interface{} {
return k8sClient.Get(ctx, gfKeyKafkaTransformer, &appsv1.Deployment{})
}, timeout, interval).Should(MatchError(`deployments.apps "flowlogs-pipeline-transformer" not found`))
})

})

Context("Changing namespace", func() {
It("Should update namespace successfully", func() {
Eventually(func() error {
Expand Down
Loading

0 comments on commit 3189c39

Please sign in to comment.