Skip to content

Commit

Permalink
NETOBSERV-256: Kafka config (#107)
Browse files Browse the repository at this point in the history
* Added flp kafka configuration when kafka is enabled

* Added Makefile target to deploy and undeploy kafka

* Added example of kafka configuration

* Fixed wrong comment

* Changed kafka topic name in flowcollector example
  • Loading branch information
OlivierCazade authored May 31, 2022
1 parent 23c7912 commit 0994ae6
Show file tree
Hide file tree
Showing 5 changed files with 242 additions and 80 deletions.
16 changes: 15 additions & 1 deletion .mk/development.mk
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,21 @@ undeploy-loki: ## Undeploy loki.
curl -S -L https://raw.githubusercontent.com/netobserv/documents/main/examples/zero-click-loki/1-storage.yaml | kubectl --ignore-not-found=true delete -f - || true
-pkill --oldest --full "3100:3100"

.PHONY: deploy-kafka
deploy-kafka:
@echo -e "\n==> Deploy kafka"
kubectl create namespace $(NAMESPACE) --dry-run=client -o yaml | kubectl apply -f -
kubectl create -f "https://strimzi.io/install/latest?namespace="$(NAMESPACE) -n $(NAMESPACE)
kubectl create -f "https://raw.githubusercontent.com/netobserv/documents/main/examples/kafka-cluster.yaml" -n $(NAMESPACE)
kubectl create -f "https://raw.githubusercontent.com/netobserv/documents/main/examples/kafka-topic.yaml" -n $(NAMESPACE)

.PHONY: undeploy-kafka
undeploy-kafka: ## Undeploy kafka.
@echo -e "\n==> Undeploy kafka"
kubectl delete -f "https://raw.githubusercontent.com/netobserv/documents/main/examples/kafka-topic.yaml" -n $(NAMESPACE)
kubectl delete -f "https://raw.githubusercontent.com/netobserv/documents/main/examples/kafka-cluster.yaml" -n $(NAMESPACE)
kubectl delete -f "https://strimzi.io/install/latest?namespace="$(NAMESPACE) -n $(NAMESPACE)

.PHONY: deploy-grafana
deploy-grafana: ## Deploy grafana.
@echo -e "\n==> Deploy grafana"
Expand All @@ -48,4 +63,3 @@ deploy-all: manifests generate fmt lint deploy-loki deploy-grafana install deplo

.PHONY: undeploy-all
undeploy-all: undeploy-loki undeploy-grafana uninstall undeploy-sample-cr

4 changes: 4 additions & 0 deletions config/samples/flows_v1alpha1_flowcollector.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ spec:
enableKubeProbes: true
healthPort: 8080
prometheusPort: 9090
kafka:
enable: false
address: "kafka-cluster-kafka-bootstrap"
topic: "network-flows"
loki:
url: 'http://loki:3100/'
batchWait: 1s
Expand Down
187 changes: 132 additions & 55 deletions controllers/flowlogspipeline/flp_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,13 @@ type builder struct {
portProtocol corev1.Protocol
desired *flowsv1alpha1.FlowCollectorFLP
desiredLoki *flowsv1alpha1.FlowCollectorLoki
desiredKafka *flowsv1alpha1.FlowCollectorKafka
confKind string
confKindSuffix string
useOpenShiftSCC bool
}

func newBuilder(ns string, portProtocol corev1.Protocol, desired *flowsv1alpha1.FlowCollectorFLP, desiredLoki *flowsv1alpha1.FlowCollectorLoki, confKind string, useOpenShiftSCC bool) builder {
func newBuilder(ns string, portProtocol corev1.Protocol, desired *flowsv1alpha1.FlowCollectorFLP, desiredLoki *flowsv1alpha1.FlowCollectorLoki, desiredKafka *flowsv1alpha1.FlowCollectorKafka, confKind string, useOpenShiftSCC bool) builder {
version := helper.ExtractVersion(desired.Image)
return builder{
namespace: ns,
Expand All @@ -72,7 +74,9 @@ func newBuilder(ns string, portProtocol corev1.Protocol, desired *flowsv1alpha1.
},
desired: desired,
desiredLoki: desiredLoki,
desiredKafka: desiredKafka,
portProtocol: portProtocol,
confKind: confKind,
confKindSuffix: FlpConfSuffix[confKind],
useOpenShiftSCC: useOpenShiftSCC,
}
Expand Down Expand Up @@ -114,7 +118,7 @@ func (b *builder) daemonSet(configDigest string) *appsv1.DaemonSet {
func (b *builder) podTemplate(hostNetwork bool, configDigest string) corev1.PodTemplateSpec {
var ports []corev1.ContainerPort
var tolerations []corev1.Toleration
if b.desired.Kind == constants.DaemonSetKind {
if b.desired.Kind == constants.DaemonSetKind && b.confKind != ConfKafkaTransformer {
ports = []corev1.ContainerPort{{
Name: constants.FLPPortName + b.confKindSuffix,
HostPort: b.desired.Port,
Expand Down Expand Up @@ -203,39 +207,34 @@ func (b *builder) podTemplate(hostNetwork bool, configDigest string) corev1.PodT
}
}

// returns a configmap with a digest of its configuration contents, which will be used to
// detect any configuration change
func (b *builder) configMap() (*corev1.ConfigMap, string) {
var ingest, decoder, enrich, loki, aggregate, prometheus map[string]interface{}

// loki stage (write) configuration
lokiWrite := map[string]interface{}{
"type": "loki",
"labels": constants.LokiIndexFields,
}

if b.desiredLoki != nil {
lokiWrite["batchSize"] = b.desiredLoki.BatchSize
lokiWrite["batchWait"] = b.desiredLoki.BatchWait.ToUnstructured()
lokiWrite["maxBackoff"] = b.desiredLoki.MaxBackoff.ToUnstructured()
lokiWrite["maxRetries"] = b.desiredLoki.MaxRetries
lokiWrite["minBackoff"] = b.desiredLoki.MinBackoff.ToUnstructured()
lokiWrite["staticLabels"] = b.desiredLoki.StaticLabels
lokiWrite["timeout"] = b.desiredLoki.Timeout.ToUnstructured()
lokiWrite["url"] = b.desiredLoki.URL
lokiWrite["timestampLabel"] = "TimeFlowEndMs"
lokiWrite["timestampScale"] = "1ms"
}

loki = map[string]interface{}{"name": "loki",
"write": map[string]interface{}{
"type": "loki",
"loki": lokiWrite,
func (b *builder) getIngestConfig() ([]map[string]string, []map[string]interface{}) {
ingestStages := []map[string]string{
{"name": "ingest"},
{"name": "decode",
"follows": "ingest",
},
}
var ingest, decoder map[string]interface{}

// ingest stage (ingest) configuration
if b.portProtocol == corev1.ProtocolUDP {
if b.confKind == ConfKafkaTransformer {
ingest = map[string]interface{}{
"name": "ingest",
"ingest": map[string]interface{}{
"type": "kafka",
"kafka": map[string]interface{}{
"brokers": []string{b.desiredKafka.Address},
"topic": b.desiredKafka.Topic,
"groupid": b.confKind, // Without groupid, each message is delivered to each consumers
},
},
}
decoder = map[string]interface{}{
"name": "decode",
"decode": map[string]interface{}{
"type": "json",
},
}
} else if b.portProtocol == corev1.ProtocolUDP {
// UDP Port: IPFIX collector with JSON decoder
ingest = map[string]interface{}{
"name": "ingest",
Expand Down Expand Up @@ -272,8 +271,55 @@ func (b *builder) configMap() (*corev1.ConfigMap, string) {
}
}

return ingestStages, []map[string]interface{}{ingest, decoder}
}

func (b *builder) getTransformConfig() ([]map[string]string, []map[string]interface{}) {
transformStages := []map[string]string{
{"name": "enrich",
"follows": "decode",
},
{"name": "loki",
"follows": "enrich",
},
{"name": "aggregate",
"follows": "enrich",
},
{"name": "prometheus",
"follows": "aggregate",
},
}
var enrich, loki, aggregate, prometheus map[string]interface{}

// loki stage (write) configuration
lokiWrite := map[string]interface{}{
"type": "loki",
"labels": constants.LokiIndexFields,
}

if b.desiredLoki != nil {
lokiWrite["batchSize"] = b.desiredLoki.BatchSize
lokiWrite["batchWait"] = b.desiredLoki.BatchWait.ToUnstructured()
lokiWrite["maxBackoff"] = b.desiredLoki.MaxBackoff.ToUnstructured()
lokiWrite["maxRetries"] = b.desiredLoki.MaxRetries
lokiWrite["minBackoff"] = b.desiredLoki.MinBackoff.ToUnstructured()
lokiWrite["staticLabels"] = b.desiredLoki.StaticLabels
lokiWrite["timeout"] = b.desiredLoki.Timeout.ToUnstructured()
lokiWrite["url"] = b.desiredLoki.URL
lokiWrite["timestampLabel"] = "TimeFlowEndMs"
lokiWrite["timestampScale"] = "1ms"
}

loki = map[string]interface{}{"name": "loki",
"write": map[string]interface{}{
"type": "loki",
"loki": lokiWrite,
},
}

// enrich stage (transform) configuration
enrich = map[string]interface{}{"name": "enrich",
enrich = map[string]interface{}{
"name": "enrich",
"transform": map[string]interface{}{
"type": "network",
"network": map[string]interface{}{
Expand All @@ -294,7 +340,8 @@ func (b *builder) configMap() (*corev1.ConfigMap, string) {
}

// prometheus stage (encode) configuration
prometheus = map[string]interface{}{"name": "prometheus",
prometheus = map[string]interface{}{
"name": "prometheus",
"encode": map[string]interface{}{
"type": "prom",
"prom": map[string]interface{}{
Expand All @@ -310,33 +357,63 @@ func (b *builder) configMap() (*corev1.ConfigMap, string) {
"type": "aggregates",
},
}
return transformStages, []map[string]interface{}{enrich, loki, aggregate, prometheus}
}

func (b *builder) getKafkaConfig() ([]map[string]string, []map[string]interface{}) {
kafkaStages := []map[string]string{
{"name": "kafka",
"follows": "decode",
},
}
kafka := map[string]interface{}{
"name": "kafka",
"encode": map[string]interface{}{
"type": "kafka",
"kafka": map[string]interface{}{
"address": b.desiredKafka.Address,
"topic": b.desiredKafka.Topic,
},
},
}

return kafkaStages, []map[string]interface{}{kafka}
}

func (b *builder) getPipelinesConfig() ([]map[string]string, []map[string]interface{}) {
stages := []map[string]string{}
parameters := []map[string]interface{}{}

ingestStages, ingestParameters := b.getIngestConfig()
stages = append(stages, ingestStages...)
parameters = append(parameters, ingestParameters...)

if b.confKind == ConfKafkaIngester {
kafkaStages, kafkaParameters := b.getKafkaConfig()
stages = append(stages, kafkaStages...)
parameters = append(parameters, kafkaParameters...)

} else {
transformStages, transformParameters := b.getTransformConfig()
stages = append(stages, transformStages...)
parameters = append(parameters, transformParameters...)
}
return stages, parameters
}

// returns a configmap with a digest of its configuration contents, which will be used to
// detect any configuration change
func (b *builder) configMap() (*corev1.ConfigMap, string) {

stages, parameters := b.getPipelinesConfig()

config := map[string]interface{}{
"log-level": b.desired.LogLevel,
"health": map[string]interface{}{
"port": b.desired.HealthPort,
},
"pipeline": []map[string]string{
{"name": "ingest"},
{"name": "decode",
"follows": "ingest",
},
{"name": "enrich",
"follows": "decode",
},
{"name": "loki",
"follows": "enrich",
},
{"name": "aggregate",
"follows": "enrich",
},
{"name": "prometheus",
"follows": "aggregate",
},
},
"parameters": []map[string]interface{}{
ingest, decoder, enrich, loki, aggregate, prometheus,
},
"pipeline": stages,
"parameters": parameters,
}

configStr := "{}"
Expand Down
6 changes: 5 additions & 1 deletion controllers/flowlogspipeline/flp_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ func checkDeployNeeded(kafka flowsv1alpha1.FlowCollectorKafka, confKind string)
func (r *singleDeploymentReconciler) Reconcile(ctx context.Context, desired *flowsv1alpha1.FlowCollector) error {
desiredFLP := &desired.Spec.FlowlogsPipeline
desiredLoki := &desired.Spec.Loki
desiredKafka := &desired.Spec.Kafka
err := validateDesired(desiredFLP)
if err != nil {
return err
Expand All @@ -175,7 +176,7 @@ func (r *singleDeploymentReconciler) Reconcile(ctx context.Context, desired *flo
if desired.Spec.Agent == flowsv1alpha1.AgentEBPF {
portProtocol = corev1.ProtocolTCP
}
builder := newBuilder(r.nobjMngr.Namespace, portProtocol, desiredFLP, desiredLoki, r.confKind, r.useOpenShiftSCC)
builder := newBuilder(r.nobjMngr.Namespace, portProtocol, desiredFLP, desiredLoki, desiredKafka, r.confKind, r.useOpenShiftSCC)
newCM, configDigest := builder.configMap()
if !r.nobjMngr.Exists(r.owned.configMap) {
if err := r.CreateOwned(ctx, newCM); err != nil {
Expand All @@ -191,6 +192,9 @@ func (r *singleDeploymentReconciler) Reconcile(ctx context.Context, desired *flo
return err
}

if r.confKind == ConfKafkaTransformer {
return r.reconcileAsDeployment(ctx, desiredFLP, &builder, configDigest)
}
switch desiredFLP.Kind {
case constants.DeploymentKind:
return r.reconcileAsDeployment(ctx, desiredFLP, &builder, configDigest)
Expand Down
Loading

0 comments on commit 0994ae6

Please sign in to comment.