NETOBSERV-255: FLP multiple deployments #78

Merged · 9 commits · May 17, 2022
21 changes: 21 additions & 0 deletions api/v1alpha1/flowcollector_types.go
@@ -69,6 +69,10 @@ type FlowCollectorSpec struct {
// Loki contains settings related to the loki client
Loki FlowCollectorLoki `json:"loki,omitempty"`

// Kafka configuration; if empty, the operator will deploy an all-in-one FLP
// +optional
Kafka FlowCollectorKafka `json:"kafka,omitempty"`

// ConsolePlugin contains settings related to the console dynamic plugin
ConsolePlugin FlowCollectorConsolePlugin `json:"consolePlugin,omitempty"`

@@ -162,6 +166,23 @@ type FlowCollectorEBPF struct {
Privileged bool `json:"privileged,omitempty"`
}

// FlowCollectorKafka defines the desired Kafka config of FlowCollector
type FlowCollectorKafka struct {
// Important: Run "make generate" to regenerate code after modifying this file

//+kubebuilder:default:=false
// Should this feature be enabled
Enable bool `json:"enable,omitempty"`

//+kubebuilder:default:=""
// Address of the Kafka server
Address string `json:"address"`

//+kubebuilder:default:=""
// Kafka topic to use
Topic string `json:"topic"`
}

// FlowCollectorFLP defines the desired flowlogs-pipeline state of FlowCollector
type FlowCollectorFLP struct {
// Important: Run "make generate" to regenerate code after modifying this file
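For illustration only (not part of this PR's diff): a minimal sketch of populating the new Kafka block from Go, much like the controller test further down does. The module import path and the address/topic values are assumptions.

```go
// Illustration only: setting the new Kafka fields on a FlowCollector spec.
// The import path, broker address, and topic name are placeholders.
package main

import (
	"fmt"

	flowsv1alpha1 "github.com/netobserv/network-observability-operator/api/v1alpha1"
)

func main() {
	spec := flowsv1alpha1.FlowCollectorSpec{
		Kafka: flowsv1alpha1.FlowCollectorKafka{
			Enable:  true,
			Address: "kafka-bootstrap.netobserv:9092", // placeholder broker address
			Topic:   "network-flows",                  // placeholder topic
		},
	}
	fmt.Printf("kafka: enable=%v address=%s topic=%s\n",
		spec.Kafka.Enable, spec.Kafka.Address, spec.Kafka.Topic)
}
```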
16 changes: 16 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

20 changes: 20 additions & 0 deletions config/crd/bases/flows.netobserv.io_flowcollectors.yaml
@@ -1451,6 +1451,26 @@ spec:
minimum: 0
type: integer
type: object
kafka:
description: Kafka configuration; if empty, the operator will deploy an all-in-one FLP
properties:
address:
default: ""
description: Address of the Kafka server
type: string
enable:
default: false
description: Should this feature be enabled
type: boolean
topic:
default: ""
description: Kafka topic to use
type: string
required:
- address
- topic
type: object
Contributor:

For the case of using FLP as the ingress stage ... we will have multiple FLP instances consuming from the same topic. We need to make sure we have some control over how Kafka distributes data across those FLP instances; going forward this becomes very important once we implement the stateful connection-tracking capabilities. There might be multiple different configurations depending on load, number of consumers, etc. ... those might need to be exposed and made configurable for users.

Contributor Author:

Yes, the Kafka configuration block is meant to change, but I would prefer to change it again once we have connection tracking, so we can test the different configurations and choose what we need to expose.
Would you be fine with that approach?

Contributor:

Agreed, it doesn't have to be in this PR ... maybe just open a ticket somewhere so we remember to improve that later.
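
To illustrate the distribution concern raised above: when several FLP replicas join the same Kafka consumer group, the broker splits the topic's partitions among them. The sketch below is illustrative only — it is not FLP code, and the broker address, group ID, and topic name are placeholders — it simply shows the consumer-group mechanism using the segmentio/kafka-go client.

```go
// Illustrative consumer-group sketch (not FLP code): run several copies of
// this process with the same GroupID and Kafka distributes the topic's
// partitions among them, which is what governs how flows are spread across
// multiple FLP instances.
package main

import (
	"context"
	"log"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"}, // placeholder broker
		GroupID: "flp-transformers",         // all replicas share this group
		Topic:   "network-flows",            // placeholder topic
	})
	defer r.Close()

	for {
		m, err := r.ReadMessage(context.Background())
		if err != nil {
			log.Fatal(err)
		}
		log.Printf("partition=%d offset=%d payload=%d bytes", m.Partition, m.Offset, len(m.Value))
	}
}
```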

loki:
description: Loki contains settings related to the loki client
properties:
2 changes: 1 addition & 1 deletion controllers/flowcollector_controller.go
@@ -142,7 +142,7 @@ func (r *FlowCollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request)
desired.Spec.ClusterNetworkOperator.Namespace,
ovsFlowsConfigMapName,
r.lookupIP)
if err := ovsConfigController.Reconcile(ctx, desired); err != nil {
if err := ovsConfigController.Reconcile(ctx, desired, gfReconciler.GetServiceName(desired.Spec.Kafka)); err != nil {
return ctrl.Result{},
fmt.Errorf("failed to reconcile ovs-flows-config ConfigMap: %w", err)
}
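As a reading aid for the call change above, here is a hypothetical sketch of what a helper like GetServiceName could resolve to: the Kafka ingester service when the pipeline is split, otherwise the all-in-one flowlogs-pipeline service. This is not the PR's actual implementation; the name suffixes are assumptions based on the test expectations further down.

```go
// Hypothetical sketch only — not the code added by this PR.
package main

import "fmt"

const flpName = "flowlogs-pipeline" // mirrors constants.FLPName

// serviceNameFor picks the service the flow exporters should target:
// the Kafka ingester when the pipeline is split, else the all-in-one FLP.
func serviceNameFor(kafkaEnabled bool) string {
	if kafkaEnabled {
		return flpName + "-ingester" // assumed suffix, matching the test expectations below
	}
	return flpName
}

func main() {
	fmt.Println(serviceNameFor(true))  // flowlogs-pipeline-ingester
	fmt.Println(serviceNameFor(false)) // flowlogs-pipeline
}
```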
76 changes: 76 additions & 0 deletions controllers/flowcollector_controller_test.go
@@ -47,6 +47,14 @@ func flowCollectorControllerSpecs() {
Name: constants.FLPName,
Namespace: otherNamespace,
}
gfKeyKafkaIngester := types.NamespacedName{
Name: constants.FLPName + flowlogspipeline.FlpConfSuffix[flowlogspipeline.ConfKafkaIngester],
Namespace: operatorNamespace,
}
gfKeyKafkaTransformer := types.NamespacedName{
Name: constants.FLPName + flowlogspipeline.FlpConfSuffix[flowlogspipeline.ConfKafkaTransformer],
Namespace: operatorNamespace,
}
cpKey1 := types.NamespacedName{
Name: "network-observability-plugin",
Namespace: operatorNamespace,
@@ -375,6 +383,74 @@ func flowCollectorControllerSpecs() {
})
})

Context("Changing kafka config", func() {
It("Should update kafka config successfully", func() {
Eventually(func() error {
fc := flowsv1alpha1.FlowCollector{}
if err := k8sClient.Get(ctx, crKey, &fc); err != nil {
return err
}
fc.Spec.Kafka = flowsv1alpha1.FlowCollectorKafka{Enable: true, Address: "localhost:9092", Topic: "FLP"}
return k8sClient.Update(ctx, &fc)
}).Should(Succeed())
})

It("Should deploy kafka ingester and transformer", func() {
By("Expecting ingester daemonset to be created")
Eventually(func() interface{} {
return k8sClient.Get(ctx, gfKeyKafkaIngester, &appsv1.DaemonSet{})
}, timeout, interval).Should(Succeed())

By("Expecting transformer deployment to be created")
Eventually(func() interface{} {
return k8sClient.Get(ctx, gfKeyKafkaTransformer, &appsv1.Deployment{})
}, timeout, interval).Should(Succeed())

By("Not Expecting transformer service to be created")
Eventually(func() interface{} {
return k8sClient.Get(ctx, gfKeyKafkaTransformer, &v1.Service{})
}, timeout, interval).Should(MatchError(`services "flowlogs-pipeline-transformer" not found`))
})

It("Should delete previous flp deployment", func() {
By("Expecting deployment to be deleted")
Eventually(func() interface{} {
return k8sClient.Get(ctx, gfKey1, &appsv1.DaemonSet{})
}, timeout, interval).Should(MatchError(`daemonsets.apps "flowlogs-pipeline" not found`))
})

It("Should remove kafka config successfully", func() {
Eventually(func() error {
fc := flowsv1alpha1.FlowCollector{}
if err := k8sClient.Get(ctx, crKey, &fc); err != nil {
return err
}
fc.Spec.Kafka.Enable = false
return k8sClient.Update(ctx, &fc)
}).Should(Succeed())
})

It("Should deploy single flp again", func() {
By("Expecting daemonset to be created")
Eventually(func() interface{} {
return k8sClient.Get(ctx, gfKey1, &appsv1.DaemonSet{})
}, timeout, interval).Should(Succeed())
})

It("Should delete kafka ingester and transformer", func() {
By("Expecting ingester daemonset to be deleted")
Eventually(func() interface{} {
return k8sClient.Get(ctx, gfKeyKafkaIngester, &appsv1.DaemonSet{})
}, timeout, interval).Should(MatchError(`daemonsets.apps "flowlogs-pipeline-ingester" not found`))

By("Expecting transformer deployment to be deleted")
Eventually(func() interface{} {
return k8sClient.Get(ctx, gfKeyKafkaTransformer, &appsv1.Deployment{})
}, timeout, interval).Should(MatchError(`deployments.apps "flowlogs-pipeline-transformer" not found`))
})

})

Context("Changing namespace", func() {
It("Should update namespace successfully", func() {
Eventually(func() error {