diff --git a/.mk/development.mk b/.mk/development.mk
index 14842aa00..7fb310edd 100644
--- a/.mk/development.mk
+++ b/.mk/development.mk
@@ -22,6 +22,21 @@ undeploy-loki: ## Undeploy loki.
 	curl -S -L https://raw.githubusercontent.com/netobserv/documents/main/examples/zero-click-loki/1-storage.yaml | kubectl --ignore-not-found=true  delete -f - || true
 	-pkill --oldest --full "3100:3100"
 
+.PHONY: deploy-kafka
+deploy-kafka: ## Deploy kafka.
+	@echo -e "\n==> Deploy kafka"
+	kubectl create namespace $(NAMESPACE) --dry-run=client -o yaml | kubectl apply -f -
+	kubectl create -f "https://strimzi.io/install/latest?namespace="$(NAMESPACE) -n $(NAMESPACE)
+	kubectl create -f "https://raw.githubusercontent.com/netobserv/documents/main/examples/kafka-cluster.yaml" -n $(NAMESPACE)
+	kubectl create -f "https://raw.githubusercontent.com/netobserv/documents/main/examples/kafka-topic.yaml" -n $(NAMESPACE)
+
+.PHONY: undeploy-kafka
+undeploy-kafka: ## Undeploy kafka.
+	@echo -e "\n==> Undeploy kafka"
+	kubectl delete -f "https://raw.githubusercontent.com/netobserv/documents/main/examples/kafka-topic.yaml" -n $(NAMESPACE)
+	kubectl delete -f "https://raw.githubusercontent.com/netobserv/documents/main/examples/kafka-cluster.yaml" -n $(NAMESPACE)
+	kubectl delete -f "https://strimzi.io/install/latest?namespace="$(NAMESPACE) -n $(NAMESPACE)
+
 .PHONY: deploy-grafana
 deploy-grafana: ## Deploy grafana.
 	@echo -e "\n==> Deploy grafana"
@@ -48,4 +63,3 @@ deploy-all: manifests generate fmt lint deploy-loki deploy-grafana install deplo
 
 .PHONY: undeploy-all
 undeploy-all: undeploy-loki undeploy-grafana uninstall undeploy-sample-cr
-
diff --git a/config/samples/flows_v1alpha1_flowcollector.yaml b/config/samples/flows_v1alpha1_flowcollector.yaml
index ddd16908f..ee8afad33 100644
--- a/config/samples/flows_v1alpha1_flowcollector.yaml
+++ b/config/samples/flows_v1alpha1_flowcollector.yaml
@@ -30,6 +30,10 @@ spec:
     enableKubeProbes: true
     healthPort: 8080
     prometheusPort: 9090
+  kafka:
+    enable: false
+    address: "kafka-cluster-kafka-bootstrap"
+    topic: "network-flows"
   loki:
     url: 'http://loki:3100/'
     batchWait: 1s
diff --git a/controllers/flowlogspipeline/flp_objects.go b/controllers/flowlogspipeline/flp_objects.go
index cb5fa9cf1..a36a81bae 100644
--- a/controllers/flowlogspipeline/flp_objects.go
+++ b/controllers/flowlogspipeline/flp_objects.go
@@ -55,11 +55,13 @@ type builder struct {
 	portProtocol    corev1.Protocol
 	desired         *flowsv1alpha1.FlowCollectorFLP
 	desiredLoki     *flowsv1alpha1.FlowCollectorLoki
+	desiredKafka    *flowsv1alpha1.FlowCollectorKafka
+	confKind        string
 	confKindSuffix  string
 	useOpenShiftSCC bool
 }
 
-func newBuilder(ns string, portProtocol corev1.Protocol, desired *flowsv1alpha1.FlowCollectorFLP, desiredLoki *flowsv1alpha1.FlowCollectorLoki, confKind string, useOpenShiftSCC bool) builder {
+func newBuilder(ns string, portProtocol corev1.Protocol, desired *flowsv1alpha1.FlowCollectorFLP, desiredLoki *flowsv1alpha1.FlowCollectorLoki, desiredKafka *flowsv1alpha1.FlowCollectorKafka, confKind string, useOpenShiftSCC bool) builder {
 	version := helper.ExtractVersion(desired.Image)
 	return builder{
 		namespace: ns,
@@ -72,7 +74,9 @@ func newBuilder(ns string, portProtocol corev1.Protocol, desired *flowsv1alpha1.
 		},
 		desired:         desired,
 		desiredLoki:     desiredLoki,
+		desiredKafka:    desiredKafka,
 		portProtocol:    portProtocol,
+		confKind:        confKind,
 		confKindSuffix:  FlpConfSuffix[confKind],
 		useOpenShiftSCC: useOpenShiftSCC,
 	}
@@ -114,7 +118,7 @@ func (b *builder) daemonSet(configDigest string) *appsv1.DaemonSet {
 func (b *builder) podTemplate(hostNetwork bool, configDigest string) corev1.PodTemplateSpec {
 	var ports []corev1.ContainerPort
 	var tolerations []corev1.Toleration
-	if b.desired.Kind == constants.DaemonSetKind {
+	if b.desired.Kind == constants.DaemonSetKind && b.confKind != ConfKafkaTransformer {
 		ports = []corev1.ContainerPort{{
 			Name:          constants.FLPPortName + b.confKindSuffix,
 			HostPort:      b.desired.Port,
@@ -203,39 +207,34 @@ func (b *builder) podTemplate(hostNetwork bool, configDigest string) corev1.PodT
 	}
 }
 
-// returns a configmap with a digest of its configuration contents, which will be used to
-// detect any configuration change
-func (b *builder) configMap() (*corev1.ConfigMap, string) {
-	var ingest, decoder, enrich, loki, aggregate, prometheus map[string]interface{}
-
-	// loki stage (write) configuration
-	lokiWrite := map[string]interface{}{
-		"type":   "loki",
-		"labels": constants.LokiIndexFields,
-	}
-
-	if b.desiredLoki != nil {
-		lokiWrite["batchSize"] = b.desiredLoki.BatchSize
-		lokiWrite["batchWait"] = b.desiredLoki.BatchWait.ToUnstructured()
-		lokiWrite["maxBackoff"] = b.desiredLoki.MaxBackoff.ToUnstructured()
-		lokiWrite["maxRetries"] = b.desiredLoki.MaxRetries
-		lokiWrite["minBackoff"] = b.desiredLoki.MinBackoff.ToUnstructured()
-		lokiWrite["staticLabels"] = b.desiredLoki.StaticLabels
-		lokiWrite["timeout"] = b.desiredLoki.Timeout.ToUnstructured()
-		lokiWrite["url"] = b.desiredLoki.URL
-		lokiWrite["timestampLabel"] = "TimeFlowEndMs"
-		lokiWrite["timestampScale"] = "1ms"
-	}
-
-	loki = map[string]interface{}{"name": "loki",
-		"write": map[string]interface{}{
-			"type": "loki",
-			"loki": lokiWrite,
+func (b *builder) getIngestConfig() ([]map[string]string, []map[string]interface{}) {
+	ingestStages := []map[string]string{
+		{"name": "ingest"},
+		{"name": "decode",
+			"follows": "ingest",
 		},
 	}
+	var ingest, decoder map[string]interface{}
 
-	// ingest stage (ingest) configuration
-	if b.portProtocol == corev1.ProtocolUDP {
+	if b.confKind == ConfKafkaTransformer {
+		ingest = map[string]interface{}{
+			"name": "ingest",
+			"ingest": map[string]interface{}{
+				"type": "kafka",
+				"kafka": map[string]interface{}{
+					"brokers": []string{b.desiredKafka.Address},
+					"topic":   b.desiredKafka.Topic,
+					"groupid": b.confKind, // Without groupid, each message is delivered to all consumers
+				},
+			},
+		}
+		decoder = map[string]interface{}{
+			"name": "decode",
+			"decode": map[string]interface{}{
+				"type": "json",
+			},
+		}
+	} else if b.portProtocol == corev1.ProtocolUDP {
 		// UDP Port: IPFIX collector with JSON decoder
 		ingest = map[string]interface{}{
 			"name": "ingest",
@@ -272,8 +271,55 @@ func (b *builder) configMap() (*corev1.ConfigMap, string) {
 		}
 	}
 
+	return ingestStages, []map[string]interface{}{ingest, decoder}
+}
+
+func (b *builder) getTransformConfig() ([]map[string]string, []map[string]interface{}) {
+	transformStages := []map[string]string{
+		{"name": "enrich",
+			"follows": "decode",
+		},
+		{"name": "loki",
+			"follows": "enrich",
+		},
+		{"name": "aggregate",
+			"follows": "enrich",
+		},
+		{"name": "prometheus",
+			"follows": "aggregate",
+		},
+	}
+	var enrich, loki, aggregate, prometheus map[string]interface{}
+
+	// loki stage (write) configuration
+	lokiWrite := map[string]interface{}{
+		"type":   "loki",
+		"labels": constants.LokiIndexFields,
+	}
+
+	if b.desiredLoki != nil {
+		lokiWrite["batchSize"] = b.desiredLoki.BatchSize
+		lokiWrite["batchWait"] = b.desiredLoki.BatchWait.ToUnstructured()
+		lokiWrite["maxBackoff"] = b.desiredLoki.MaxBackoff.ToUnstructured()
+		lokiWrite["maxRetries"] = b.desiredLoki.MaxRetries
+		lokiWrite["minBackoff"] = b.desiredLoki.MinBackoff.ToUnstructured()
+		lokiWrite["staticLabels"] = b.desiredLoki.StaticLabels
+		lokiWrite["timeout"] = b.desiredLoki.Timeout.ToUnstructured()
+		lokiWrite["url"] = b.desiredLoki.URL
+		lokiWrite["timestampLabel"] = "TimeFlowEndMs"
+		lokiWrite["timestampScale"] = "1ms"
+	}
+
+	loki = map[string]interface{}{"name": "loki",
+		"write": map[string]interface{}{
+			"type": "loki",
+			"loki": lokiWrite,
+		},
+	}
+
 	// enrich stage (transform) configuration
-	enrich = map[string]interface{}{"name": "enrich",
+	enrich = map[string]interface{}{
+		"name": "enrich",
 		"transform": map[string]interface{}{
 			"type": "network",
 			"network": map[string]interface{}{
@@ -294,7 +340,8 @@ func (b *builder) configMap() (*corev1.ConfigMap, string) {
 	}
 
 	// prometheus stage (encode) configuration
-	prometheus = map[string]interface{}{"name": "prometheus",
+	prometheus = map[string]interface{}{
+		"name": "prometheus",
 		"encode": map[string]interface{}{
 			"type": "prom",
 			"prom": map[string]interface{}{
@@ -310,33 +357,63 @@ func (b *builder) configMap() (*corev1.ConfigMap, string) {
 			"type": "aggregates",
 		},
 	}
+	return transformStages, []map[string]interface{}{enrich, loki, aggregate, prometheus}
+}
+
+func (b *builder) getKafkaConfig() ([]map[string]string, []map[string]interface{}) {
+	kafkaStages := []map[string]string{
+		{"name": "kafka",
+			"follows": "decode",
+		},
+	}
+	kafka := map[string]interface{}{
+		"name": "kafka",
+		"encode": map[string]interface{}{
+			"type": "kafka",
+			"kafka": map[string]interface{}{
+				"address": b.desiredKafka.Address,
+				"topic":   b.desiredKafka.Topic,
+			},
+		},
+	}
+
+	return kafkaStages, []map[string]interface{}{kafka}
+}
+
+func (b *builder) getPipelinesConfig() ([]map[string]string, []map[string]interface{}) {
+	stages := []map[string]string{}
+	parameters := []map[string]interface{}{}
+
+	ingestStages, ingestParameters := b.getIngestConfig()
+	stages = append(stages, ingestStages...)
+	parameters = append(parameters, ingestParameters...)
+
+	if b.confKind == ConfKafkaIngester {
+		kafkaStages, kafkaParameters := b.getKafkaConfig()
+		stages = append(stages, kafkaStages...)
+		parameters = append(parameters, kafkaParameters...)
+
+	} else {
+		transformStages, transformParameters := b.getTransformConfig()
+		stages = append(stages, transformStages...)
+		parameters = append(parameters, transformParameters...)
+	}
+	return stages, parameters
+}
+
+// returns a configmap with a digest of its configuration contents, which will be used to
+// detect any configuration change
+func (b *builder) configMap() (*corev1.ConfigMap, string) {
+
+	stages, parameters := b.getPipelinesConfig()
 
 	config := map[string]interface{}{
 		"log-level": b.desired.LogLevel,
 		"health": map[string]interface{}{
 			"port": b.desired.HealthPort,
 		},
-		"pipeline": []map[string]string{
-			{"name": "ingest"},
-			{"name": "decode",
-				"follows": "ingest",
-			},
-			{"name": "enrich",
-				"follows": "decode",
-			},
-			{"name": "loki",
-				"follows": "enrich",
-			},
-			{"name": "aggregate",
-				"follows": "enrich",
-			},
-			{"name": "prometheus",
-				"follows": "aggregate",
-			},
-		},
-		"parameters": []map[string]interface{}{
-			ingest, decoder, enrich, loki, aggregate, prometheus,
-		},
+		"pipeline":   stages,
+		"parameters": parameters,
 	}
 
 	configStr := "{}"
diff --git a/controllers/flowlogspipeline/flp_reconciler.go b/controllers/flowlogspipeline/flp_reconciler.go
index a7379382a..e184ac2e3 100644
--- a/controllers/flowlogspipeline/flp_reconciler.go
+++ b/controllers/flowlogspipeline/flp_reconciler.go
@@ -151,6 +151,7 @@ func checkDeployNeeded(kafka flowsv1alpha1.FlowCollectorKafka, confKind string)
 func (r *singleDeploymentReconciler) Reconcile(ctx context.Context, desired *flowsv1alpha1.FlowCollector) error {
 	desiredFLP := &desired.Spec.FlowlogsPipeline
 	desiredLoki := &desired.Spec.Loki
+	desiredKafka := &desired.Spec.Kafka
 	err := validateDesired(desiredFLP)
 	if err != nil {
 		return err
@@ -175,7 +176,7 @@ func (r *singleDeploymentReconciler) Reconcile(ctx context.Context, desired *flo
 	if desired.Spec.Agent == flowsv1alpha1.AgentEBPF {
 		portProtocol = corev1.ProtocolTCP
 	}
-	builder := newBuilder(r.nobjMngr.Namespace, portProtocol, desiredFLP, desiredLoki, r.confKind, r.useOpenShiftSCC)
+	builder := newBuilder(r.nobjMngr.Namespace, portProtocol, desiredFLP, desiredLoki, desiredKafka, r.confKind, r.useOpenShiftSCC)
 	newCM, configDigest := builder.configMap()
 	if !r.nobjMngr.Exists(r.owned.configMap) {
 		if err := r.CreateOwned(ctx, newCM); err != nil {
@@ -191,6 +192,9 @@ func (r *singleDeploymentReconciler) Reconcile(ctx context.Context, desired *flo
 		return err
 	}
 
+	if r.confKind == ConfKafkaTransformer {
+		return r.reconcileAsDeployment(ctx, desiredFLP, &builder, configDigest)
+	}
 	switch desiredFLP.Kind {
 	case constants.DeploymentKind:
 		return r.reconcileAsDeployment(ctx, desiredFLP, &builder, configDigest)
diff --git a/controllers/flowlogspipeline/flp_test.go b/controllers/flowlogspipeline/flp_test.go
index 9a067bc6d..7a1ea6d42 100644
--- a/controllers/flowlogspipeline/flp_test.go
+++ b/controllers/flowlogspipeline/flp_test.go
@@ -102,6 +102,14 @@ func getLokiConfig() flowsv1alpha1.FlowCollectorLoki {
 	}
 }
 
+func getKafkaConfig() flowsv1alpha1.FlowCollectorKafka {
+	return flowsv1alpha1.FlowCollectorKafka{
+		Enable:  false,
+		Address: "kafka",
+		Topic:   "flp",
+	}
+}
+
 func getAutoScalerSpecs() (ascv2.HorizontalPodAutoscaler, flowsv1alpha1.FlowCollectorFLP) {
 	var autoScaler = ascv2.HorizontalPodAutoscaler{
 		ObjectMeta: metav1.ObjectMeta{
@@ -137,14 +145,15 @@ func TestDaemonSetNoChange(t *testing.T) {
 	ns := "namespace"
 	flp := getFLPConfig()
 	loki := getLokiConfig()
-	b := newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, ConfSingle, true)
+	kafka := getKafkaConfig()
+	b := newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, &kafka, ConfSingle, true)
 	_, digest := b.configMap()
 	first := b.daemonSet(digest)
 
 	// Check no change
 	flp = getFLPConfig()
 	loki = getLokiConfig()
-	b = newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, ConfSingle, true)
+	b = newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, &kafka, ConfSingle, true)
 	_, digest = b.configMap()
 
 	assert.False(daemonSetNeedsUpdate(first, &flp, digest, constants.FLPName+FlpConfSuffix[ConfSingle]))
@@ -157,13 +166,14 @@ func TestDaemonSetChanged(t *testing.T) {
 	ns := "namespace"
 	flp := getFLPConfig()
 	loki := getLokiConfig()
-	b := newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, ConfSingle, true)
+	kafka := getKafkaConfig()
+	b := newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, &kafka, ConfSingle, true)
 	_, digest := b.configMap()
 	first := b.daemonSet(digest)
 
 	// Check probes enabled change
 	flp.EnableKubeProbes = true
-	b = newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, ConfSingle, true)
+	b = newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, &kafka, ConfSingle, true)
 	_, digest = b.configMap()
 	second := b.daemonSet(digest)
 
@@ -171,7 +181,7 @@ func TestDaemonSetChanged(t *testing.T) {
 
 	// Check log level change
 	flp.LogLevel = "info"
-	b = newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, ConfSingle, true)
+	b = newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, &kafka, ConfSingle, true)
 	_, digest = b.configMap()
 	third := b.daemonSet(digest)
 
@@ -182,7 +192,7 @@ func TestDaemonSetChanged(t *testing.T) {
 		corev1.ResourceCPU:    resource.MustParse("500m"),
 		corev1.ResourceMemory: resource.MustParse("500Gi"),
 	}
-	b = newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, ConfSingle, true)
+	b = newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, &kafka, ConfSingle, true)
 	_, digest = b.configMap()
 	fourth := b.daemonSet(digest)
 
@@ -193,7 +203,7 @@ func TestDaemonSetChanged(t *testing.T) {
 		corev1.ResourceCPU:    resource.MustParse("1"),
 		corev1.ResourceMemory: resource.MustParse("512Mi"),
 	}
-	b = newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, ConfSingle, true)
+	b = newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, &kafka, ConfSingle, true)
 	_, digest = b.configMap()
 
 	assert.True(daemonSetNeedsUpdate(fourth, &flp, digest, constants.FLPName+FlpConfSuffix[ConfSingle]))
@@ -207,14 +217,15 @@ func TestDeploymentNoChange(t *testing.T) {
 	ns := "namespace"
 	flp := getFLPConfig()
 	loki := getLokiConfig()
-	b := newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, ConfSingle, true)
+	kafka := getKafkaConfig()
+	b := newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, &kafka, ConfSingle, true)
 	_, digest := b.configMap()
 	first := b.deployment(digest)
 
 	// Check no change
 	flp = getFLPConfig()
 	loki = getLokiConfig()
-	b = newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, ConfSingle, true)
+	b = newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, &kafka, ConfSingle, true)
 	_, digest = b.configMap()
 
 	assert.False(deploymentNeedsUpdate(first, &flp, digest, constants.FLPName+FlpConfSuffix[ConfSingle]))
@@ -227,13 +238,14 @@ func TestDeploymentChanged(t *testing.T) {
 	ns := "namespace"
 	flp := getFLPConfig()
 	loki := getLokiConfig()
-	b := newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, ConfSingle, true)
+	kafka := getKafkaConfig()
+	b := newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, &kafka, ConfSingle, true)
 	_, digest := b.configMap()
 	first := b.deployment(digest)
 
 	// Check probes enabled change
 	flp.EnableKubeProbes = true
-	b = newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, ConfSingle, true)
+	b = newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, &kafka, ConfSingle, true)
 	_, digest = b.configMap()
 	second := b.deployment(digest)
 
@@ -241,7 +253,7 @@ func TestDeploymentChanged(t *testing.T) {
 
 	// Check log level change
 	flp.LogLevel = "info"
-	b = newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, ConfSingle, true)
+	b = newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, &kafka, ConfSingle, true)
 	_, digest = b.configMap()
 	third := b.deployment(digest)
 
@@ -252,7 +264,7 @@ func TestDeploymentChanged(t *testing.T) {
 		corev1.ResourceCPU:    resource.MustParse("500m"),
 		corev1.ResourceMemory: resource.MustParse("500Gi"),
 	}
-	b = newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, ConfSingle, true)
+	b = newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, &kafka, ConfSingle, true)
 	_, digest = b.configMap()
 	fourth := b.deployment(digest)
 
@@ -263,7 +275,7 @@ func TestDeploymentChanged(t *testing.T) {
 		corev1.ResourceCPU:    resource.MustParse("1"),
 		corev1.ResourceMemory: resource.MustParse("512Mi"),
 	}
-	b = newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, ConfSingle, true)
+	b = newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, &kafka, ConfSingle, true)
 	_, digest = b.configMap()
 	fifth := b.deployment(digest)
 
@@ -273,7 +285,7 @@ func TestDeploymentChanged(t *testing.T) {
 	// Check replicas didn't change because HPA is used
 	flp2 := flp
 	flp2.Replicas = 5
-	b = newBuilder(ns, corev1.ProtocolUDP, &flp2, &loki, ConfSingle, true)
+	b = newBuilder(ns, corev1.ProtocolUDP, &flp2, &loki, &kafka, ConfSingle, true)
 	_, digest = b.configMap()
 
 	assert.False(deploymentNeedsUpdate(fifth, &flp2, digest, constants.FLPName+FlpConfSuffix[ConfSingle]))
@@ -286,14 +298,15 @@ func TestDeploymentChangedReplicasNoHPA(t *testing.T) {
 	ns := "namespace"
 	flp := getFLPConfigNoHPA()
 	loki := getLokiConfig()
-	b := newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, ConfSingle, true)
+	kafka := getKafkaConfig()
+	b := newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, &kafka, ConfSingle, true)
 	_, digest := b.configMap()
 	first := b.deployment(digest)
 
 	// Check replicas changed (need to copy flp, as Spec.Replicas stores a pointer)
 	flp2 := flp
 	flp2.Replicas = 5
-	b = newBuilder(ns, corev1.ProtocolUDP, &flp2, &loki, ConfSingle, true)
+	b = newBuilder(ns, corev1.ProtocolUDP, &flp2, &loki, &kafka, ConfSingle, true)
 	_, digest = b.configMap()
 
 	assert.True(deploymentNeedsUpdate(first, &flp2, digest, constants.FLPName+FlpConfSuffix[ConfSingle]))
@@ -306,7 +319,8 @@ func TestServiceNoChange(t *testing.T) {
 	ns := "namespace"
 	flp := getFLPConfig()
 	loki := getLokiConfig()
-	b := newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, ConfSingle, true)
+	kafka := getKafkaConfig()
+	b := newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, &kafka, ConfSingle, true)
 	first := b.service(nil)
 
 	// Check no change
@@ -322,19 +336,20 @@ func TestServiceChanged(t *testing.T) {
 	ns := "namespace"
 	flp := getFLPConfig()
 	loki := getLokiConfig()
-	b := newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, ConfSingle, true)
+	kafka := getKafkaConfig()
+	b := newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, &kafka, ConfSingle, true)
 	first := b.service(nil)
 
 	// Check port changed
 	flp.Port = 9999
-	b = newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, ConfSingle, true)
+	b = newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, &kafka, ConfSingle, true)
 	second := b.service(first)
 
 	assert.True(serviceNeedsUpdate(first, second))
 
 	// Make sure non-service settings doesn't trigger service update
 	flp.LogLevel = "error"
-	b = newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, ConfSingle, true)
+	b = newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, &kafka, ConfSingle, true)
 	third := b.service(first)
 
 	assert.False(serviceNeedsUpdate(second, third))
@@ -346,7 +361,8 @@ func TestConfigMapShouldDeserializeAsYAML(t *testing.T) {
 	ns := "namespace"
 	flp := getFLPConfig()
 	loki := getLokiConfig()
-	b := newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, ConfSingle, true)
+	kafka := getKafkaConfig()
+	b := newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, &kafka, ConfSingle, true)
 	cm, digest := b.configMap()
 	assert.NotEmpty(t, digest)
 
@@ -414,7 +430,8 @@ func TestLabels(t *testing.T) {
 	assert := assert.New(t)
 
 	flpk := getFLPConfig()
-	builder := newBuilder("ns", corev1.ProtocolUDP, &flpk, nil, ConfSingle, true)
+	kafka := getKafkaConfig()
+	builder := newBuilder("ns", corev1.ProtocolUDP, &flpk, nil, &kafka, ConfSingle, true)
 
 	// Deployment
 	depl := builder.deployment("digest")
@@ -466,3 +483,49 @@ func TestDeployNeeded(t *testing.T) {
 	assert.NoError(err)
 
 }
+
+
+// validatePipelineConfig checks that every stage has a matching parameter entry.
+	for _, stage := range stages {
+		stageName, exist := stage["name"]
+		if !exist {
+			return false
+		}
+		exist = false
+		for _, parameter := range parameters {
+			if stageName == parameter["name"] {
+				exist = true
+				break
+			}
+		}
+		if !exist {
+			return false
+		}
+	}
+	return true
+}
+
+func TestPipelineConfig(t *testing.T) {
+	assert := assert.New(t)
+
+	// Single config
+	ns := "namespace"
+	flp := getFLPConfig()
+	loki := getLokiConfig()
+	kafka := getKafkaConfig()
+	b := newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, &kafka, ConfSingle, true)
+	stages, parameters := b.getPipelinesConfig()
+	assert.True(validatePipelineConfig(stages, parameters))
+
+	// Kafka Ingester
+	kafka.Enable = true
+	b = newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, &kafka, ConfKafkaIngester, true)
+	stages, parameters = b.getPipelinesConfig()
+	assert.True(validatePipelineConfig(stages, parameters))
+
+	// Kafka Transformer
+	b = newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, &kafka, ConfKafkaTransformer, true)
+	stages, parameters = b.getPipelinesConfig()
+	assert.True(validatePipelineConfig(stages, parameters))
+
+}