
NETOBSERV-256: Kafka config #107

Merged

Changes from 3 commits
16 changes: 15 additions & 1 deletion .mk/development.mk
@@ -22,6 +22,21 @@ undeploy-loki: ## Undeploy loki.
curl -S -L https://raw.githubusercontent.com/netobserv/documents/main/examples/zero-click-loki/1-storage.yaml | kubectl --ignore-not-found=true delete -f - || true
-pkill --oldest --full "3100:3100"

.PHONY: deploy-kafka
deploy-kafka:
@echo -e "\n==> Deploy kafka"
kubectl create namespace $(NAMESPACE) --dry-run=client -o yaml | kubectl apply -f -
kubectl create -f "https://strimzi.io/install/latest?namespace="$(NAMESPACE) -n $(NAMESPACE)
kubectl create -f "https://raw.githubusercontent.com/netobserv/documents/main/examples/kafka-cluster.yaml" -n $(NAMESPACE)
kubectl create -f "https://raw.githubusercontent.com/netobserv/documents/main/examples/kafka-topic.yaml" -n $(NAMESPACE)
Comment on lines +29 to +31

Contributor:
Since we mix both strimzi.io and netobserv/documents sources, we should pin the version here instead of using latest.

Contributor Author:
Strimzi only provides this endpoint for its latest version.

The other option is to track this file in one of our repositories, but the file would have to be changed depending on the targeted namespace.

Since we only use this as a quick setup for development for now, and since the file is quite big (13k lines), would you be fine with keeping latest for now? If at some point we experience issues, it will still be possible to freeze the version then.

Contributor:
Well, if there is no easy solution, let's keep this as is. Thanks!
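(Editor's note: should pinning become necessary later, one possible approach — a sketch, not part of this PR, and it assumes Strimzi keeps attaching a versioned strimzi-cluster-operator-<version>.yaml bundle to its GitHub releases — is to fetch a fixed release and rewrite the namespace before applying:

STRIMZI_VERSION ?= 0.28.0

.PHONY: deploy-kafka-pinned
deploy-kafka-pinned: ## Deploy kafka with a pinned Strimzi version (sketch).
	curl -s -L "https://github.com/strimzi/strimzi-kafka-operator/releases/download/$(STRIMZI_VERSION)/strimzi-cluster-operator-$(STRIMZI_VERSION).yaml" \
		| sed "s/namespace: .*/namespace: $(NAMESPACE)/" \
		| kubectl apply -f - -n $(NAMESPACE)

The sed step stands in for the namespace templating that the strimzi.io/install endpoint otherwise performs.)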


.PHONY: undeploy-kafka
undeploy-kafka: ## Undeploy loki.
Contributor:
Change the comment from "Undeploy loki" to "Undeploy kafka".

Contributor Author:
Done, thanks!

@echo -e "\n==> Undeploy kafka"
kubectl delete -f "https://raw.githubusercontent.com/netobserv/documents/main/examples/kafka-topic.yaml" -n $(NAMESPACE)
kubectl delete -f "https://raw.githubusercontent.com/netobserv/documents/main/examples/kafka-cluster.yaml" -n $(NAMESPACE)
kubectl delete -f "https://strimzi.io/install/latest?namespace="$(NAMESPACE) -n $(NAMESPACE)

.PHONY: deploy-grafana
deploy-grafana: ## Deploy grafana.
@echo -e "\n==> Deploy grafana"
@@ -48,4 +63,3 @@ deploy-all: manifests generate fmt lint deploy-loki deploy-grafana install deplo

.PHONY: undeploy-all
undeploy-all: undeploy-loki undeploy-grafana uninstall undeploy-sample-cr

4 changes: 4 additions & 0 deletions config/samples/flows_v1alpha1_flowcollector.yaml
@@ -30,6 +30,10 @@ spec:
enableKubeProbes: true
healthPort: 8080
prometheusPort: 9090
kafka:
enable: false
address: "kafka-cluster-kafka-bootstrap"
topic: "flp-topic"
Member:
As this CR will define the default values in OLM, I'd use a topic name that better describes what it is for. Maybe "netflows"?

Member:
Actually, network-flows would be better.

Contributor Author:
Done, thanks!

loki:
url: 'http://loki:3100/'
batchWait: 1s
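(Editor's note: taken together with the review above, a user enabling Kafka in the CR would end up with something like the following — a sketch; the address assumes the bootstrap service created by the kafka-cluster.yaml example used in the Makefile above:

  kafka:
    enable: true
    address: "kafka-cluster-kafka-bootstrap"
    topic: "network-flows"
)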
187 changes: 132 additions & 55 deletions controllers/flowlogspipeline/flp_objects.go
@@ -55,11 +55,13 @@ type builder struct {
portProtocol corev1.Protocol
desired *flowsv1alpha1.FlowCollectorFLP
desiredLoki *flowsv1alpha1.FlowCollectorLoki
desiredKafka *flowsv1alpha1.FlowCollectorKafka
confKind string
confKindSuffix string
useOpenShiftSCC bool
}

func newBuilder(ns string, portProtocol corev1.Protocol, desired *flowsv1alpha1.FlowCollectorFLP, desiredLoki *flowsv1alpha1.FlowCollectorLoki, confKind string, useOpenShiftSCC bool) builder {
func newBuilder(ns string, portProtocol corev1.Protocol, desired *flowsv1alpha1.FlowCollectorFLP, desiredLoki *flowsv1alpha1.FlowCollectorLoki, desiredKafka *flowsv1alpha1.FlowCollectorKafka, confKind string, useOpenShiftSCC bool) builder {
version := helper.ExtractVersion(desired.Image)
return builder{
namespace: ns,
@@ -72,7 +74,9 @@ func newBuilder(ns string, portProtocol corev1.Protocol, desired *flowsv1alpha1.
},
desired: desired,
desiredLoki: desiredLoki,
desiredKafka: desiredKafka,
portProtocol: portProtocol,
confKind: confKind,
confKindSuffix: FlpConfSuffix[confKind],
useOpenShiftSCC: useOpenShiftSCC,
}
@@ -114,7 +118,7 @@ func (b *builder) daemonSet(configDigest string) *appsv1.DaemonSet {
func (b *builder) podTemplate(hostNetwork bool, configDigest string) corev1.PodTemplateSpec {
var ports []corev1.ContainerPort
var tolerations []corev1.Toleration
if b.desired.Kind == constants.DaemonSetKind {
if b.desired.Kind == constants.DaemonSetKind && b.confKind != ConfKafkaTransformer {
ports = []corev1.ContainerPort{{
Name: constants.FLPPortName + b.confKindSuffix,
HostPort: b.desired.Port,
@@ -203,39 +207,34 @@ func (b *builder) podTemplate(hostNetwork bool, configDigest string) corev1.PodT
}
}

// returns a configmap with a digest of its configuration contents, which will be used to
// detect any configuration change
func (b *builder) configMap() (*corev1.ConfigMap, string) {
var ingest, decoder, enrich, loki, aggregate, prometheus map[string]interface{}

// loki stage (write) configuration
lokiWrite := map[string]interface{}{
"type": "loki",
"labels": constants.LokiIndexFields,
}

if b.desiredLoki != nil {
lokiWrite["batchSize"] = b.desiredLoki.BatchSize
lokiWrite["batchWait"] = b.desiredLoki.BatchWait.ToUnstructured()
lokiWrite["maxBackoff"] = b.desiredLoki.MaxBackoff.ToUnstructured()
lokiWrite["maxRetries"] = b.desiredLoki.MaxRetries
lokiWrite["minBackoff"] = b.desiredLoki.MinBackoff.ToUnstructured()
lokiWrite["staticLabels"] = b.desiredLoki.StaticLabels
lokiWrite["timeout"] = b.desiredLoki.Timeout.ToUnstructured()
lokiWrite["url"] = b.desiredLoki.URL
lokiWrite["timestampLabel"] = "TimeFlowEndMs"
lokiWrite["timestampScale"] = "1ms"
}

loki = map[string]interface{}{"name": "loki",
"write": map[string]interface{}{
"type": "loki",
"loki": lokiWrite,
func (b *builder) getIngestConfig() ([]map[string]string, []map[string]interface{}) {
ingestStages := []map[string]string{
{"name": "ingest"},
{"name": "decode",
"follows": "ingest",
},
}
var ingest, decoder map[string]interface{}

// ingest stage (ingest) configuration
if b.portProtocol == corev1.ProtocolUDP {
if b.confKind == ConfKafkaTransformer {
ingest = map[string]interface{}{
"name": "ingest",
"ingest": map[string]interface{}{
"type": "kafka",
"kafka": map[string]interface{}{
"brokers": []string{b.desiredKafka.Address},
"topic": b.desiredKafka.Topic,
"groupid": b.confKind, // Without groupid, each message is delivered to each consumers
},
},
}
decoder = map[string]interface{}{
"name": "decode",
"decode": map[string]interface{}{
"type": "json",
},
}
} else if b.portProtocol == corev1.ProtocolUDP {
// UDP Port: IPFIX collector with JSON decoder
ingest = map[string]interface{}{
"name": "ingest",
Expand Down Expand Up @@ -272,8 +271,55 @@ func (b *builder) configMap() (*corev1.ConfigMap, string) {
}
}

return ingestStages, []map[string]interface{}{ingest, decoder}
}

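// getTransformConfig returns the stages and parameters that enrich flows and
// export them to Loki and, via the aggregate stage, to Prometheus.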
func (b *builder) getTransformConfig() ([]map[string]string, []map[string]interface{}) {
transformStages := []map[string]string{
{"name": "enrich",
"follows": "decode",
},
{"name": "loki",
"follows": "enrich",
},
{"name": "aggregate",
"follows": "enrich",
},
{"name": "prometheus",
"follows": "aggregate",
},
}
var enrich, loki, aggregate, prometheus map[string]interface{}

// loki stage (write) configuration
lokiWrite := map[string]interface{}{
"type": "loki",
"labels": constants.LokiIndexFields,
}

if b.desiredLoki != nil {
lokiWrite["batchSize"] = b.desiredLoki.BatchSize
lokiWrite["batchWait"] = b.desiredLoki.BatchWait.ToUnstructured()
lokiWrite["maxBackoff"] = b.desiredLoki.MaxBackoff.ToUnstructured()
lokiWrite["maxRetries"] = b.desiredLoki.MaxRetries
lokiWrite["minBackoff"] = b.desiredLoki.MinBackoff.ToUnstructured()
lokiWrite["staticLabels"] = b.desiredLoki.StaticLabels
lokiWrite["timeout"] = b.desiredLoki.Timeout.ToUnstructured()
lokiWrite["url"] = b.desiredLoki.URL
lokiWrite["timestampLabel"] = "TimeFlowEndMs"
lokiWrite["timestampScale"] = "1ms"
}

loki = map[string]interface{}{"name": "loki",
"write": map[string]interface{}{
"type": "loki",
"loki": lokiWrite,
},
}

// enrich stage (transform) configuration
enrich = map[string]interface{}{"name": "enrich",
enrich = map[string]interface{}{
"name": "enrich",
"transform": map[string]interface{}{
"type": "network",
"network": map[string]interface{}{
@@ -294,7 +340,8 @@ func (b *builder) configMap() (*corev1.ConfigMap, string) {
}

// prometheus stage (encode) configuration
prometheus = map[string]interface{}{"name": "prometheus",
prometheus = map[string]interface{}{
"name": "prometheus",
"encode": map[string]interface{}{
"type": "prom",
"prom": map[string]interface{}{
@@ -310,33 +357,63 @@ func (b *builder) configMap() (*corev1.ConfigMap, string) {
"type": "aggregates",
},
}
return transformStages, []map[string]interface{}{enrich, loki, aggregate, prometheus}
}

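// getKafkaConfig returns the stage and parameters that encode flows to Kafka;
// it is used when running as the ingester.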
func (b *builder) getKafkaConfig() ([]map[string]string, []map[string]interface{}) {
kafkaStages := []map[string]string{
{"name": "kafka",
"follows": "decode",
},
}
kafka := map[string]interface{}{
"name": "kafka",
"encode": map[string]interface{}{
"type": "kafka",
"kafka": map[string]interface{}{
"address": b.desiredKafka.Address,
"topic": b.desiredKafka.Topic,
},
},
}

return kafkaStages, []map[string]interface{}{kafka}
}

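// getPipelinesConfig assembles the full pipeline: the ingest stages, followed
// either by the Kafka encode stage (ingester) or by the transform stages.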
func (b *builder) getPipelinesConfig() ([]map[string]string, []map[string]interface{}) {
stages := []map[string]string{}
parameters := []map[string]interface{}{}

ingestStages, ingestParameters := b.getIngestConfig()
stages = append(stages, ingestStages...)
parameters = append(parameters, ingestParameters...)

if b.confKind == ConfKafkaIngester {
kafkaStages, kafkaParameters := b.getKafkaConfig()
stages = append(stages, kafkaStages...)
parameters = append(parameters, kafkaParameters...)

} else {
transformStages, transformParameters := b.getTransformConfig()
stages = append(stages, transformStages...)
parameters = append(parameters, transformParameters...)
}
return stages, parameters
}

// returns a configmap with a digest of its configuration contents, which will be used to
// detect any configuration change
func (b *builder) configMap() (*corev1.ConfigMap, string) {

stages, parameters := b.getPipelinesConfig()

config := map[string]interface{}{
"log-level": b.desired.LogLevel,
"health": map[string]interface{}{
"port": b.desired.HealthPort,
},
"pipeline": []map[string]string{
{"name": "ingest"},
{"name": "decode",
"follows": "ingest",
},
{"name": "enrich",
"follows": "decode",
},
{"name": "loki",
"follows": "enrich",
},
{"name": "aggregate",
"follows": "enrich",
},
{"name": "prometheus",
"follows": "aggregate",
},
},
"parameters": []map[string]interface{}{
ingest, decoder, enrich, loki, aggregate, prometheus,
},
"pipeline": stages,
"parameters": parameters,
}

configStr := "{}"
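(Editor's note: to make the two modes concrete, the stage lists above should serialize to roughly the following "pipeline" sections — a sketch derived from the code, not a captured ConfigMap. The ingester collects on the port, decodes, then encodes to Kafka:

{"pipeline": [{"name": "ingest"},
              {"name": "decode", "follows": "ingest"},
              {"name": "kafka", "follows": "decode"}]}

while in the transformer the ingest parameters switch to a Kafka consumer and the transform chain follows:

{"pipeline": [{"name": "ingest"},
              {"name": "decode", "follows": "ingest"},
              {"name": "enrich", "follows": "decode"},
              {"name": "loki", "follows": "enrich"},
              {"name": "aggregate", "follows": "enrich"},
              {"name": "prometheus", "follows": "aggregate"}]}
)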
6 changes: 5 additions & 1 deletion controllers/flowlogspipeline/flp_reconciler.go
@@ -151,6 +151,7 @@ func checkDeployNeeded(kafka flowsv1alpha1.FlowCollectorKafka, confKind string)
func (r *singleDeploymentReconciler) Reconcile(ctx context.Context, desired *flowsv1alpha1.FlowCollector) error {
desiredFLP := &desired.Spec.FlowlogsPipeline
desiredLoki := &desired.Spec.Loki
desiredKafka := &desired.Spec.Kafka
err := validateDesired(desiredFLP)
if err != nil {
return err
@@ -175,7 +176,7 @@ func (r *singleDeploymentReconciler) Reconcile(ctx context.Context, desired *flo
if desired.Spec.Agent == flowsv1alpha1.AgentEBPF {
portProtocol = corev1.ProtocolTCP
}
builder := newBuilder(r.nobjMngr.Namespace, portProtocol, desiredFLP, desiredLoki, r.confKind, r.useOpenShiftSCC)
builder := newBuilder(r.nobjMngr.Namespace, portProtocol, desiredFLP, desiredLoki, desiredKafka, r.confKind, r.useOpenShiftSCC)
newCM, configDigest := builder.configMap()
if !r.nobjMngr.Exists(r.owned.configMap) {
if err := r.CreateOwned(ctx, newCM); err != nil {
@@ -191,6 +192,9 @@ func (r *singleDeploymentReconciler) Reconcile(ctx context.Context, desired *flo
return err
}

if r.confKind == ConfKafkaTransformer {
return r.reconcileAsDeployment(ctx, desiredFLP, &builder, configDigest)
}
switch desiredFLP.Kind {
case constants.DeploymentKind:
return r.reconcileAsDeployment(ctx, desiredFLP, &builder, configDigest)