From 21591e4fdaabb1e53e48e67d738fc049ac20361a Mon Sep 17 00:00:00 2001 From: Joel Takvorian Date: Mon, 20 Nov 2023 11:54:18 +0100 Subject: [PATCH] Refactor FLP builder (#494) Make it easier to build a FLP pipeline outside of the FLP reconciler. This will be useful for NETOBSERV-627 - Create a new file flp_pipeline_builder.go that contains the pipeline building code, extracted from flp_common_objects.go - Make some of these functions exported, for external usage - Some API enhancements such as builder.NewGRPCPipeline for consumer friendlyness --- .../flowlogspipeline/flp_common_objects.go | 482 +++--------------- .../flowlogspipeline/flp_ingest_objects.go | 28 +- .../flowlogspipeline/flp_monolith_objects.go | 30 +- .../flp_monolith_reconciler.go | 2 +- .../flowlogspipeline/flp_pipeline_builder.go | 421 +++++++++++++++ controllers/flowlogspipeline/flp_test.go | 132 ++--- .../flowlogspipeline/flp_transfo_objects.go | 39 +- .../flp_transfo_reconciler.go | 2 +- 8 files changed, 586 insertions(+), 550 deletions(-) create mode 100644 controllers/flowlogspipeline/flp_pipeline_builder.go diff --git a/controllers/flowlogspipeline/flp_common_objects.go b/controllers/flowlogspipeline/flp_common_objects.go index 3216c6f2c..47be4b9e7 100644 --- a/controllers/flowlogspipeline/flp_common_objects.go +++ b/controllers/flowlogspipeline/flp_common_objects.go @@ -5,13 +5,10 @@ import ( "fmt" "hash/fnv" "strconv" - "time" "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" - promConfig "github.com/prometheus/common/config" - "github.com/prometheus/common/model" corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -19,29 +16,22 @@ import ( flowslatest "github.com/netobserv/network-observability-operator/api/v1beta2" "github.com/netobserv/network-observability-operator/controllers/constants" - "github.com/netobserv/network-observability-operator/controllers/globals" "github.com/netobserv/network-observability-operator/controllers/reconcilers" - "github.com/netobserv/network-observability-operator/pkg/filters" "github.com/netobserv/network-observability-operator/pkg/helper" - "github.com/netobserv/network-observability-operator/pkg/metrics" "github.com/netobserv/network-observability-operator/pkg/volumes" ) const ( - configVolume = "config-volume" - configPath = "/etc/flowlogs-pipeline" - configFile = "config.json" - lokiToken = "loki-token" - healthServiceName = "health" - prometheusServiceName = "prometheus" - profilePortName = "pprof" - healthTimeoutSeconds = 5 - livenessPeriodSeconds = 10 - startupFailureThreshold = 5 - startupPeriodSeconds = 10 - conntrackTerminatingTimeout = 5 * time.Second - conntrackEndTimeout = 10 * time.Second - conntrackHeartbeatInterval = 30 * time.Second + configVolume = "config-volume" + configPath = "/etc/flowlogs-pipeline" + configFile = "config.json" + healthServiceName = "health" + prometheusServiceName = "prometheus" + profilePortName = "pprof" + healthTimeoutSeconds = 5 + livenessPeriodSeconds = 10 + startupFailureThreshold = 5 + startupPeriodSeconds = 10 ) type ConfKind string @@ -58,7 +48,7 @@ var FlpConfSuffix = map[ConfKind]string{ ConfKafkaTransformer: "-transformer", } -type builder struct { +type Builder struct { info *reconcilers.Instance labels map[string]string selector map[string]string @@ -67,9 +57,12 @@ type builder struct { confKind ConfKind volumes volumes.Builder loki 
*helper.LokiConfig + pipeline *PipelineBuilder } -func newBuilder(info *reconcilers.Instance, desired *flowslatest.FlowCollectorSpec, ck ConfKind) (builder, error) { +type builder = Builder + +func NewBuilder(info *reconcilers.Instance, desired *flowslatest.FlowCollectorSpec, ck ConfKind) (Builder, error) { version := helper.ExtractVersion(info.Image) name := name(ck) var promTLS *flowslatest.CertificateReference @@ -115,6 +108,43 @@ func (b *builder) promServiceName() string { return promServiceName(b.confKin func (b *builder) configMapName() string { return configMapName(b.confKind) } func (b *builder) serviceMonitorName() string { return serviceMonitorName(b.confKind) } func (b *builder) prometheusRuleName() string { return prometheusRuleName(b.confKind) } +func (b *builder) Pipeline() *PipelineBuilder { return b.pipeline } + +func (b *builder) NewIPFIXPipeline() PipelineBuilder { + return b.initPipeline(config.NewCollectorPipeline("ipfix", api.IngestCollector{ + Port: int(b.desired.Processor.Port), + HostName: "0.0.0.0", + })) +} + +func (b *builder) NewGRPCPipeline() PipelineBuilder { + return b.initPipeline(config.NewGRPCPipeline("grpc", api.IngestGRPCProto{ + Port: int(b.desired.Processor.Port), + })) +} + +func (b *builder) NewKafkaPipeline() PipelineBuilder { + decoder := api.Decoder{Type: "protobuf"} + if helper.UseIPFIX(b.desired) { + decoder = api.Decoder{Type: "json"} + } + return b.initPipeline(config.NewKafkaPipeline("kafka-read", api.IngestKafka{ + Brokers: []string{b.desired.Kafka.Address}, + Topic: b.desired.Kafka.Topic, + GroupId: b.name(), // Without groupid, each message is delivered to each consumers + Decoder: decoder, + TLS: getKafkaTLS(&b.desired.Kafka.TLS, "kafka-cert", &b.volumes), + SASL: getKafkaSASL(&b.desired.Kafka.SASL, "kafka-ingest", &b.volumes), + PullQueueCapacity: b.desired.Processor.KafkaConsumerQueueCapacity, + PullMaxBytes: b.desired.Processor.KafkaConsumerBatchSize, + })) +} + +func (b *builder) initPipeline(ingest config.PipelineBuilderStage) PipelineBuilder { + pipeline := newPipelineBuilder(b.desired, b.info.Loki, &b.volumes, &ingest) + b.pipeline = &pipeline + return pipeline +} func (b *builder) portProtocol() corev1.Protocol { if helper.UseEBPF(b.desired) { @@ -235,380 +265,31 @@ func (b *builder) podTemplate(hasHostPort, hostNetwork bool, annotations map[str } } -func (b *builder) addTransformStages(stage *config.PipelineBuilderStage) error { - lastStage := *stage - indexFields := constants.LokiIndexFields - - lastStage = b.addTransformFilter(lastStage) - - indexFields, lastStage = b.addConnectionTracking(indexFields, lastStage) - - // enrich stage (transform) configuration - enrichedStage := lastStage.TransformNetwork("enrich", api.TransformNetwork{ - Rules: api.NetworkTransformRules{{ - Input: "SrcAddr", - Output: "SrcK8S", - Type: api.AddKubernetesRuleType, - }, { - Input: "DstAddr", - Output: "DstK8S", - Type: api.AddKubernetesRuleType, - }, { - Type: api.ReinterpretDirectionRuleType, - }}, - DirectionInfo: api.NetworkTransformDirectionInfo{ - ReporterIPField: "AgentIP", - SrcHostField: "SrcK8S_HostIP", - DstHostField: "DstK8S_HostIP", - FlowDirectionField: "FlowDirection", - IfDirectionField: "IfDirection", - }, - }) - - // loki stage (write) configuration - if helper.UseLoki(b.desired) { - lokiWrite := api.WriteLoki{ - Labels: indexFields, - BatchSize: int(b.loki.BatchSize), - BatchWait: helper.UnstructuredDuration(b.loki.BatchWait), - MaxBackoff: helper.UnstructuredDuration(b.loki.MaxBackoff), - MaxRetries: 
int(helper.PtrInt32(b.loki.MaxRetries)), - MinBackoff: helper.UnstructuredDuration(b.loki.MinBackoff), - StaticLabels: model.LabelSet{}, - Timeout: helper.UnstructuredDuration(b.loki.Timeout), - URL: b.loki.IngesterURL, - TimestampLabel: "TimeFlowEndMs", - TimestampScale: "1ms", - TenantID: b.loki.TenantID, - } - - for k, v := range b.desired.Loki.StaticLabels { - lokiWrite.StaticLabels[model.LabelName(k)] = model.LabelValue(v) - } - - var authorization *promConfig.Authorization - if b.loki.UseHostToken() || b.loki.UseForwardToken() { - b.volumes.AddToken(constants.FLPName) - authorization = &promConfig.Authorization{ - Type: "Bearer", - CredentialsFile: constants.TokensPath + constants.FLPName, - } - } - - if b.loki.TLS.Enable { - if b.loki.TLS.InsecureSkipVerify { - lokiWrite.ClientConfig = &promConfig.HTTPClientConfig{ - Authorization: authorization, - TLSConfig: promConfig.TLSConfig{ - InsecureSkipVerify: true, - }, - } - } else { - caPath := b.volumes.AddCACertificate(&b.loki.TLS, "loki-certs") - lokiWrite.ClientConfig = &promConfig.HTTPClientConfig{ - Authorization: authorization, - TLSConfig: promConfig.TLSConfig{ - CAFile: caPath, - }, - } - } - } else { - lokiWrite.ClientConfig = &promConfig.HTTPClientConfig{ - Authorization: authorization, - } - } - enrichedStage.WriteLoki("loki", lokiWrite) - } - - // write on Stdout if logging trace enabled - if b.desired.Processor.LogLevel == "trace" { - enrichedStage.WriteStdout("stdout", api.WriteStdout{Format: "json"}) - } - - // obtain encode_prometheus stage from metrics_definitions - names := helper.GetIncludeList(&b.desired.Processor.Metrics) - promMetrics := metrics.GetDefinitions(names) - - if len(promMetrics) > 0 { - // prometheus stage (encode) configuration - promEncode := api.PromEncode{ - Prefix: "netobserv_", - Metrics: promMetrics, - } - enrichedStage.EncodePrometheus("prometheus", promEncode) +// returns a configmap with a digest of its configuration contents, which will be used to +// detect any configuration change +func (b *builder) ConfigMap() (*corev1.ConfigMap, string, error) { + configStr, err := b.GetJSONConfig() + if err != nil { + return nil, "", err } - b.addCustomExportStages(&enrichedStage) - return nil -} - -func (b *builder) addConnectionTracking(indexFields []string, lastStage config.PipelineBuilderStage) ([]string, config.PipelineBuilderStage) { - outputFields := []api.OutputField{ - { - Name: "Bytes", - Operation: "sum", - }, - { - Name: "Bytes", - Operation: "sum", - SplitAB: true, - }, - { - Name: "Packets", - Operation: "sum", - }, - { - Name: "Packets", - Operation: "sum", - SplitAB: true, - }, - { - Name: "numFlowLogs", - Operation: "count", - }, - { - Name: "TimeFlowStartMs", - Operation: "min", - ReportMissing: true, - }, - { - Name: "TimeFlowEndMs", - Operation: "max", - ReportMissing: true, - }, - { - Name: "FlowDirection", - Operation: "first", - ReportMissing: true, - }, - { - Name: "IfDirection", - Operation: "first", - ReportMissing: true, + configMap := corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: b.configMapName(), + Namespace: b.info.Namespace, + Labels: b.labels, }, - { - Name: "AgentIP", - Operation: "first", - ReportMissing: true, + Data: map[string]string{ + configFile: configStr, }, } - - if helper.IsPktDropEnabled(&b.desired.Agent.EBPF) { - outputPktDropFields := []api.OutputField{ - { - Name: "PktDropBytes", - Operation: "sum", - }, - { - Name: "PktDropBytes", - Operation: "sum", - SplitAB: true, - }, - { - Name: "PktDropPackets", - Operation: "sum", - }, - { - Name: 
"PktDropPackets", - Operation: "sum", - SplitAB: true, - }, - { - Name: "PktDropLatestState", - Operation: "last", - }, - { - Name: "PktDropLatestDropCause", - Operation: "last", - }, - } - outputFields = append(outputFields, outputPktDropFields...) - } - - if helper.IsDNSTrackingEnabled(&b.desired.Agent.EBPF) { - outDNSTrackingFields := []api.OutputField{ - { - Name: "DnsFlagsResponseCode", - Operation: "last", - }, - { - Name: "DnsLatencyMs", - Operation: "max", - }, - } - outputFields = append(outputFields, outDNSTrackingFields...) - } - - if helper.IsFlowRTTEnabled(&b.desired.Agent.EBPF) { - outputFields = append(outputFields, api.OutputField{ - Name: "MaxTimeFlowRttNs", - Operation: "max", - Input: "TimeFlowRttNs", - }) - } - - // Connection tracking stage (only if LogTypes is not FLOWS) - if b.desired.Processor.LogTypes != nil && *b.desired.Processor.LogTypes != flowslatest.LogTypeFlows { - indexFields = append(indexFields, constants.LokiConnectionIndexFields...) - outputRecordTypes := helper.GetRecordTypes(&b.desired.Processor) - - terminatingTimeout := conntrackTerminatingTimeout - if b.desired.Processor.ConversationTerminatingTimeout != nil { - terminatingTimeout = b.desired.Processor.ConversationTerminatingTimeout.Duration - } - - endTimeout := conntrackEndTimeout - if b.desired.Processor.ConversationEndTimeout != nil { - endTimeout = b.desired.Processor.ConversationEndTimeout.Duration - } - - heartbeatInterval := conntrackHeartbeatInterval - if b.desired.Processor.ConversationHeartbeatInterval != nil { - heartbeatInterval = b.desired.Processor.ConversationHeartbeatInterval.Duration - } - - lastStage = lastStage.ConnTrack("extract_conntrack", api.ConnTrack{ - KeyDefinition: api.KeyDefinition{ - FieldGroups: []api.FieldGroup{ - {Name: "src", Fields: []string{"SrcAddr", "SrcPort"}}, - {Name: "dst", Fields: []string{"DstAddr", "DstPort"}}, - {Name: "common", Fields: []string{"Proto"}}, - }, - Hash: api.ConnTrackHash{ - FieldGroupRefs: []string{ - "common", - }, - FieldGroupARef: "src", - FieldGroupBRef: "dst", - }, - }, - OutputRecordTypes: outputRecordTypes, - OutputFields: outputFields, - Scheduling: []api.ConnTrackSchedulingGroup{ - { - Selector: nil, // Default group. Match all flowlogs - HeartbeatInterval: api.Duration{Duration: heartbeatInterval}, - EndConnectionTimeout: api.Duration{Duration: endTimeout}, - TerminatingTimeout: api.Duration{Duration: terminatingTimeout}, - }, - }, - TCPFlags: api.ConnTrackTCPFlags{ - FieldName: "Flags", - DetectEndConnection: true, - SwapAB: true, - }, - }) - } - return indexFields, lastStage -} - -func (b *builder) addTransformFilter(lastStage config.PipelineBuilderStage) config.PipelineBuilderStage { - var clusterName string - transformFilterRules := []api.TransformFilterRule{} - - if b.desired.Processor.ClusterName != "" { - clusterName = b.desired.Processor.ClusterName - } else { - //take clustername from openshift - clusterName = string(globals.DefaultClusterID) - } - if clusterName != "" { - transformFilterRules = []api.TransformFilterRule{ - { - Input: "K8S_ClusterName", - Type: "add_field_if_doesnt_exist", - Value: clusterName, - }, - } - } - - // Filter-out unused fields? - if helper.PtrBool(b.desired.Processor.DropUnusedFields) { - if helper.UseIPFIX(b.desired) { - rules := filters.GetOVSGoflowUnusedRules() - transformFilterRules = append(transformFilterRules, rules...) 
- } - // Else: nothing for eBPF at the moment - } - if len(transformFilterRules) > 0 { - lastStage = lastStage.TransformFilter("filter", api.TransformFilter{ - Rules: transformFilterRules, - }) - } - return lastStage -} - -func (b *builder) addCustomExportStages(enrichedStage *config.PipelineBuilderStage) { - for i, exporter := range b.desired.Exporters { - if exporter.Type == flowslatest.KafkaExporter { - b.createKafkaWriteStage(fmt.Sprintf("kafka-export-%d", i), &exporter.Kafka, enrichedStage) - } - if exporter.Type == flowslatest.IpfixExporter { - createIPFIXWriteStage(fmt.Sprintf("IPFIX-export-%d", i), &exporter.IPFIX, enrichedStage) - } - } -} - -func (b *builder) createKafkaWriteStage(name string, spec *flowslatest.FlowCollectorKafka, fromStage *config.PipelineBuilderStage) config.PipelineBuilderStage { - return fromStage.EncodeKafka(name, api.EncodeKafka{ - Address: spec.Address, - Topic: spec.Topic, - TLS: b.getKafkaTLS(&spec.TLS, name), - SASL: b.getKafkaSASL(&spec.SASL, name), - }) -} - -func createIPFIXWriteStage(name string, spec *flowslatest.FlowCollectorIPFIXReceiver, fromStage *config.PipelineBuilderStage) config.PipelineBuilderStage { - return fromStage.WriteIpfix(name, api.WriteIpfix{ - TargetHost: spec.TargetHost, - TargetPort: spec.TargetPort, - Transport: getIPFIXTransport(spec.Transport), - EnterpriseID: 2, - }) -} - -func (b *builder) getKafkaTLS(tls *flowslatest.ClientTLS, volumeName string) *api.ClientTLS { - if tls.Enable { - caPath, userCertPath, userKeyPath := b.volumes.AddMutualTLSCertificates(tls, volumeName) - return &api.ClientTLS{ - InsecureSkipVerify: tls.InsecureSkipVerify, - CACertPath: caPath, - UserCertPath: userCertPath, - UserKeyPath: userKeyPath, - } - } - return nil -} - -func (b *builder) getKafkaSASL(sasl *flowslatest.SASLConfig, volumePrefix string) *api.SASLConfig { - if !helper.UseSASL(sasl) { - return nil - } - t := "plain" - if sasl.Type == flowslatest.SASLScramSHA512 { - t = "scramSHA512" - } - idPath := b.volumes.AddVolume(&sasl.ClientIDReference, volumePrefix+"-sasl-id") - secretPath := b.volumes.AddVolume(&sasl.ClientSecretReference, volumePrefix+"-sasl-secret") - return &api.SASLConfig{ - Type: t, - ClientIDPath: idPath, - ClientSecretPath: secretPath, - } -} - -func getIPFIXTransport(transport string) string { - switch transport { - case "UDP": - return "udp" - default: - return "tcp" //always fallback on tcp - } + hasher := fnv.New64a() + _, _ = hasher.Write([]byte(configStr)) + digest := strconv.FormatUint(hasher.Sum64(), 36) + return &configMap, digest, nil } -// returns a configmap with a digest of its configuration contents, which will be used to -// detect any configuration change -func (b *builder) configMap(stages []config.Stage, parameters []config.StageParam) (*corev1.ConfigMap, string, error) { +func (b *builder) GetJSONConfig() (string, error) { metricsSettings := config.MetricsSettings{ Port: int(b.desired.Processor.Metrics.Server.Port), Prefix: "netobserv_", @@ -628,8 +309,8 @@ func (b *builder) configMap(stages []config.Stage, parameters []config.StagePara "health": map[string]interface{}{ "port": b.desired.Processor.HealthPort, }, - "pipeline": stages, - "parameters": parameters, + "pipeline": b.pipeline.GetStages(), + "parameters": b.pipeline.GetStageParams(), "metricsSettings": metricsSettings, } if b.desired.Processor.ProfilePort > 0 { @@ -640,24 +321,9 @@ func (b *builder) configMap(stages []config.Stage, parameters []config.StagePara bs, err := json.Marshal(config) if err != nil { - return nil, "", err + return "", 
err } - configStr := string(bs) - - configMap := corev1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: b.configMapName(), - Namespace: b.info.Namespace, - Labels: b.labels, - }, - Data: map[string]string{ - configFile: configStr, - }, - } - hasher := fnv.New64a() - _, _ = hasher.Write([]byte(configStr)) - digest := strconv.FormatUint(hasher.Sum64(), 36) - return &configMap, digest, nil + return string(bs), nil } func (b *builder) promService() *corev1.Service { diff --git a/controllers/flowlogspipeline/flp_ingest_objects.go b/controllers/flowlogspipeline/flp_ingest_objects.go index a8c3cec8f..834f34160 100644 --- a/controllers/flowlogspipeline/flp_ingest_objects.go +++ b/controllers/flowlogspipeline/flp_ingest_objects.go @@ -6,8 +6,6 @@ import ( rbacv1 "k8s.io/api/rbac/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "github.com/netobserv/flowlogs-pipeline/pkg/api" - "github.com/netobserv/flowlogs-pipeline/pkg/config" flowslatest "github.com/netobserv/network-observability-operator/api/v1beta2" "github.com/netobserv/network-observability-operator/controllers/reconcilers" "github.com/netobserv/network-observability-operator/pkg/helper" @@ -18,7 +16,7 @@ type ingestBuilder struct { } func newIngestBuilder(info *reconcilers.Instance, desired *flowslatest.FlowCollectorSpec) (ingestBuilder, error) { - gen, err := newBuilder(info, desired, ConfKafkaIngester) + gen, err := NewBuilder(info, desired, ConfKafkaIngester) return ingestBuilder{ generic: gen, }, err @@ -42,31 +40,17 @@ func (b *ingestBuilder) daemonSet(annotations map[string]string) *appsv1.DaemonS } func (b *ingestBuilder) configMap() (*corev1.ConfigMap, string, error) { - stages, params, err := b.buildPipelineConfig() - if err != nil { - return nil, "", err - } - return b.generic.configMap(stages, params) -} - -func (b *ingestBuilder) buildPipelineConfig() ([]config.Stage, []config.StageParam, error) { - var pipeline config.PipelineBuilderStage + var pipeline PipelineBuilder if helper.UseIPFIX(b.generic.desired) { // IPFIX collector - pipeline = config.NewCollectorPipeline("ipfix", api.IngestCollector{ - Port: int(b.generic.desired.Processor.Port), - HostName: "0.0.0.0", - }) + pipeline = b.generic.NewIPFIXPipeline() } else { // GRPC collector (eBPF agent) - pipeline = config.NewGRPCPipeline("grpc", api.IngestGRPCProto{ - Port: int(b.generic.desired.Processor.Port), - }) + pipeline = b.generic.NewGRPCPipeline() } - pipeline = b.generic.createKafkaWriteStage("kafka-write", &b.generic.desired.Kafka, &pipeline) - - return pipeline.GetStages(), pipeline.GetStageParams(), nil + pipeline.AddKafkaWriteStage("kafka-write", &b.generic.desired.Kafka) + return b.generic.ConfigMap() } func (b *ingestBuilder) promService() *corev1.Service { diff --git a/controllers/flowlogspipeline/flp_monolith_objects.go b/controllers/flowlogspipeline/flp_monolith_objects.go index e846d1e07..4f0bab0b9 100644 --- a/controllers/flowlogspipeline/flp_monolith_objects.go +++ b/controllers/flowlogspipeline/flp_monolith_objects.go @@ -6,8 +6,6 @@ import ( rbacv1 "k8s.io/api/rbac/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "github.com/netobserv/flowlogs-pipeline/pkg/api" - "github.com/netobserv/flowlogs-pipeline/pkg/config" flowslatest "github.com/netobserv/network-observability-operator/api/v1beta2" "github.com/netobserv/network-observability-operator/controllers/reconcilers" "github.com/netobserv/network-observability-operator/pkg/helper" @@ -18,7 +16,7 @@ type monolithBuilder struct { } func newMonolithBuilder(info *reconcilers.Instance, desired 
*flowslatest.FlowCollectorSpec) (monolithBuilder, error) { - gen, err := newBuilder(info, desired, ConfMonolith) + gen, err := NewBuilder(info, desired, ConfMonolith) return monolithBuilder{ generic: gen, }, err @@ -42,34 +40,20 @@ func (b *monolithBuilder) daemonSet(annotations map[string]string) *appsv1.Daemo } func (b *monolithBuilder) configMap() (*corev1.ConfigMap, string, error) { - stages, params, err := b.buildPipelineConfig() - if err != nil { - return nil, "", err - } - pipelineConfigMap, digest, err := b.generic.configMap(stages, params) - return pipelineConfigMap, digest, err -} - -func (b *monolithBuilder) buildPipelineConfig() ([]config.Stage, []config.StageParam, error) { - var pipeline config.PipelineBuilderStage + var pipeline PipelineBuilder if helper.UseIPFIX(b.generic.desired) { // IPFIX collector - pipeline = config.NewCollectorPipeline("ipfix", api.IngestCollector{ - Port: int(b.generic.desired.Processor.Port), - HostName: "0.0.0.0", - }) + pipeline = b.generic.NewIPFIXPipeline() } else { // GRPC collector (eBPF agent) - pipeline = config.NewGRPCPipeline("grpc", api.IngestGRPCProto{ - Port: int(b.generic.desired.Processor.Port), - }) + pipeline = b.generic.NewGRPCPipeline() } - err := b.generic.addTransformStages(&pipeline) + err := pipeline.AddProcessorStages() if err != nil { - return nil, nil, err + return nil, "", err } - return pipeline.GetStages(), pipeline.GetStageParams(), nil + return b.generic.ConfigMap() } func (b *monolithBuilder) promService() *corev1.Service { diff --git a/controllers/flowlogspipeline/flp_monolith_reconciler.go b/controllers/flowlogspipeline/flp_monolith_reconciler.go index 5c0a987c9..a3e370a35 100644 --- a/controllers/flowlogspipeline/flp_monolith_reconciler.go +++ b/controllers/flowlogspipeline/flp_monolith_reconciler.go @@ -182,7 +182,7 @@ func (r *flpMonolithReconciler) reconcilePermissions(ctx context.Context, builde if err := r.ReconcileClusterRole(ctx, cr); err != nil { return err } - cr = buildClusterRoleTransformer() + cr = BuildClusterRoleTransformer() if err := r.ReconcileClusterRole(ctx, cr); err != nil { return err } diff --git a/controllers/flowlogspipeline/flp_pipeline_builder.go b/controllers/flowlogspipeline/flp_pipeline_builder.go new file mode 100644 index 000000000..bb842191f --- /dev/null +++ b/controllers/flowlogspipeline/flp_pipeline_builder.go @@ -0,0 +1,421 @@ +package flowlogspipeline + +import ( + "fmt" + "time" + + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/config" + promConfig "github.com/prometheus/common/config" + "github.com/prometheus/common/model" + + flowslatest "github.com/netobserv/network-observability-operator/api/v1beta2" + "github.com/netobserv/network-observability-operator/controllers/constants" + "github.com/netobserv/network-observability-operator/controllers/globals" + "github.com/netobserv/network-observability-operator/pkg/filters" + "github.com/netobserv/network-observability-operator/pkg/helper" + "github.com/netobserv/network-observability-operator/pkg/metrics" + "github.com/netobserv/network-observability-operator/pkg/volumes" +) + +const ( + conntrackTerminatingTimeout = 5 * time.Second + conntrackEndTimeout = 10 * time.Second + conntrackHeartbeatInterval = 30 * time.Second +) + +type PipelineBuilder struct { + *config.PipelineBuilderStage + desired *flowslatest.FlowCollectorSpec + volumes *volumes.Builder + loki *helper.LokiConfig +} + +func newPipelineBuilder( + desired *flowslatest.FlowCollectorSpec, + loki *helper.LokiConfig, + 
volumes *volumes.Builder, + pipeline *config.PipelineBuilderStage, +) PipelineBuilder { + return PipelineBuilder{ + PipelineBuilderStage: pipeline, + desired: desired, + loki: loki, + volumes: volumes, + } +} + +func (b *PipelineBuilder) AddProcessorStages() error { + lastStage := *b.PipelineBuilderStage + indexFields := constants.LokiIndexFields + + lastStage = b.addTransformFilter(lastStage) + + indexFields, lastStage = b.addConnectionTracking(indexFields, lastStage) + + // enrich stage (transform) configuration + enrichedStage := lastStage.TransformNetwork("enrich", api.TransformNetwork{ + Rules: api.NetworkTransformRules{{ + Input: "SrcAddr", + Output: "SrcK8S", + Type: api.AddKubernetesRuleType, + }, { + Input: "DstAddr", + Output: "DstK8S", + Type: api.AddKubernetesRuleType, + }, { + Type: api.ReinterpretDirectionRuleType, + }}, + DirectionInfo: api.NetworkTransformDirectionInfo{ + ReporterIPField: "AgentIP", + SrcHostField: "SrcK8S_HostIP", + DstHostField: "DstK8S_HostIP", + FlowDirectionField: "FlowDirection", + IfDirectionField: "IfDirection", + }, + }) + + // loki stage (write) configuration + if helper.UseLoki(b.desired) { + lokiWrite := api.WriteLoki{ + Labels: indexFields, + BatchSize: int(b.loki.BatchSize), + BatchWait: helper.UnstructuredDuration(b.loki.BatchWait), + MaxBackoff: helper.UnstructuredDuration(b.loki.MaxBackoff), + MaxRetries: int(helper.PtrInt32(b.loki.MaxRetries)), + MinBackoff: helper.UnstructuredDuration(b.loki.MinBackoff), + StaticLabels: model.LabelSet{}, + Timeout: helper.UnstructuredDuration(b.loki.Timeout), + URL: b.loki.IngesterURL, + TimestampLabel: "TimeFlowEndMs", + TimestampScale: "1ms", + TenantID: b.loki.TenantID, + } + + for k, v := range b.desired.Loki.StaticLabels { + lokiWrite.StaticLabels[model.LabelName(k)] = model.LabelValue(v) + } + + var authorization *promConfig.Authorization + if b.loki.UseHostToken() || b.loki.UseForwardToken() { + b.volumes.AddToken(constants.FLPName) + authorization = &promConfig.Authorization{ + Type: "Bearer", + CredentialsFile: constants.TokensPath + constants.FLPName, + } + } + + if b.loki.TLS.Enable { + if b.loki.TLS.InsecureSkipVerify { + lokiWrite.ClientConfig = &promConfig.HTTPClientConfig{ + Authorization: authorization, + TLSConfig: promConfig.TLSConfig{ + InsecureSkipVerify: true, + }, + } + } else { + caPath := b.volumes.AddCACertificate(&b.loki.TLS, "loki-certs") + lokiWrite.ClientConfig = &promConfig.HTTPClientConfig{ + Authorization: authorization, + TLSConfig: promConfig.TLSConfig{ + CAFile: caPath, + }, + } + } + } else { + lokiWrite.ClientConfig = &promConfig.HTTPClientConfig{ + Authorization: authorization, + } + } + enrichedStage.WriteLoki("loki", lokiWrite) + } + + // write on Stdout if logging trace enabled + if b.desired.Processor.LogLevel == "trace" { + enrichedStage.WriteStdout("stdout", api.WriteStdout{Format: "json"}) + } + + // obtain encode_prometheus stage from metrics_definitions + names := helper.GetIncludeList(&b.desired.Processor.Metrics) + promMetrics := metrics.GetDefinitions(names) + + if len(promMetrics) > 0 { + // prometheus stage (encode) configuration + promEncode := api.PromEncode{ + Prefix: "netobserv_", + Metrics: promMetrics, + } + enrichedStage.EncodePrometheus("prometheus", promEncode) + } + + b.addCustomExportStages(&enrichedStage) + return nil +} + +func (b *PipelineBuilder) addConnectionTracking(indexFields []string, lastStage config.PipelineBuilderStage) ([]string, config.PipelineBuilderStage) { + outputFields := []api.OutputField{ + { + Name: "Bytes", + Operation: 
"sum", + }, + { + Name: "Bytes", + Operation: "sum", + SplitAB: true, + }, + { + Name: "Packets", + Operation: "sum", + }, + { + Name: "Packets", + Operation: "sum", + SplitAB: true, + }, + { + Name: "numFlowLogs", + Operation: "count", + }, + { + Name: "TimeFlowStartMs", + Operation: "min", + ReportMissing: true, + }, + { + Name: "TimeFlowEndMs", + Operation: "max", + ReportMissing: true, + }, + { + Name: "FlowDirection", + Operation: "first", + ReportMissing: true, + }, + { + Name: "IfDirection", + Operation: "first", + ReportMissing: true, + }, + { + Name: "AgentIP", + Operation: "first", + ReportMissing: true, + }, + } + + if helper.IsPktDropEnabled(&b.desired.Agent.EBPF) { + outputPktDropFields := []api.OutputField{ + { + Name: "PktDropBytes", + Operation: "sum", + }, + { + Name: "PktDropBytes", + Operation: "sum", + SplitAB: true, + }, + { + Name: "PktDropPackets", + Operation: "sum", + }, + { + Name: "PktDropPackets", + Operation: "sum", + SplitAB: true, + }, + { + Name: "PktDropLatestState", + Operation: "last", + }, + { + Name: "PktDropLatestDropCause", + Operation: "last", + }, + } + outputFields = append(outputFields, outputPktDropFields...) + } + + if helper.IsDNSTrackingEnabled(&b.desired.Agent.EBPF) { + outDNSTrackingFields := []api.OutputField{ + { + Name: "DnsFlagsResponseCode", + Operation: "last", + }, + { + Name: "DnsLatencyMs", + Operation: "max", + }, + } + outputFields = append(outputFields, outDNSTrackingFields...) + } + + if helper.IsFlowRTTEnabled(&b.desired.Agent.EBPF) { + outputFields = append(outputFields, api.OutputField{ + Name: "MaxTimeFlowRttNs", + Operation: "max", + Input: "TimeFlowRttNs", + }) + } + + // Connection tracking stage (only if LogTypes is not FLOWS) + if b.desired.Processor.LogTypes != nil && *b.desired.Processor.LogTypes != flowslatest.LogTypeFlows { + indexFields = append(indexFields, constants.LokiConnectionIndexFields...) + outputRecordTypes := helper.GetRecordTypes(&b.desired.Processor) + + terminatingTimeout := conntrackTerminatingTimeout + if b.desired.Processor.ConversationTerminatingTimeout != nil { + terminatingTimeout = b.desired.Processor.ConversationTerminatingTimeout.Duration + } + + endTimeout := conntrackEndTimeout + if b.desired.Processor.ConversationEndTimeout != nil { + endTimeout = b.desired.Processor.ConversationEndTimeout.Duration + } + + heartbeatInterval := conntrackHeartbeatInterval + if b.desired.Processor.ConversationHeartbeatInterval != nil { + heartbeatInterval = b.desired.Processor.ConversationHeartbeatInterval.Duration + } + + lastStage = lastStage.ConnTrack("extract_conntrack", api.ConnTrack{ + KeyDefinition: api.KeyDefinition{ + FieldGroups: []api.FieldGroup{ + {Name: "src", Fields: []string{"SrcAddr", "SrcPort"}}, + {Name: "dst", Fields: []string{"DstAddr", "DstPort"}}, + {Name: "common", Fields: []string{"Proto"}}, + }, + Hash: api.ConnTrackHash{ + FieldGroupRefs: []string{ + "common", + }, + FieldGroupARef: "src", + FieldGroupBRef: "dst", + }, + }, + OutputRecordTypes: outputRecordTypes, + OutputFields: outputFields, + Scheduling: []api.ConnTrackSchedulingGroup{ + { + Selector: nil, // Default group. 
Match all flowlogs + HeartbeatInterval: api.Duration{Duration: heartbeatInterval}, + EndConnectionTimeout: api.Duration{Duration: endTimeout}, + TerminatingTimeout: api.Duration{Duration: terminatingTimeout}, + }, + }, + TCPFlags: api.ConnTrackTCPFlags{ + FieldName: "Flags", + DetectEndConnection: true, + SwapAB: true, + }, + }) + } + return indexFields, lastStage +} + +func (b *PipelineBuilder) addTransformFilter(lastStage config.PipelineBuilderStage) config.PipelineBuilderStage { + var clusterName string + transformFilterRules := []api.TransformFilterRule{} + + if b.desired.Processor.ClusterName != "" { + clusterName = b.desired.Processor.ClusterName + } else { + //take clustername from openshift + clusterName = string(globals.DefaultClusterID) + } + if clusterName != "" { + transformFilterRules = []api.TransformFilterRule{ + { + Input: "K8S_ClusterName", + Type: "add_field_if_doesnt_exist", + Value: clusterName, + }, + } + } + + // Filter-out unused fields? + if helper.PtrBool(b.desired.Processor.DropUnusedFields) { + if helper.UseIPFIX(b.desired) { + rules := filters.GetOVSGoflowUnusedRules() + transformFilterRules = append(transformFilterRules, rules...) + } + // Else: nothing for eBPF at the moment + } + if len(transformFilterRules) > 0 { + lastStage = lastStage.TransformFilter("filter", api.TransformFilter{ + Rules: transformFilterRules, + }) + } + return lastStage +} + +func (b *PipelineBuilder) addCustomExportStages(enrichedStage *config.PipelineBuilderStage) { + for i, exporter := range b.desired.Exporters { + if exporter.Type == flowslatest.KafkaExporter { + b.createKafkaWriteStage(fmt.Sprintf("kafka-export-%d", i), &exporter.Kafka, enrichedStage) + } + if exporter.Type == flowslatest.IpfixExporter { + createIPFIXWriteStage(fmt.Sprintf("IPFIX-export-%d", i), &exporter.IPFIX, enrichedStage) + } + } +} + +func (b *PipelineBuilder) createKafkaWriteStage(name string, spec *flowslatest.FlowCollectorKafka, fromStage *config.PipelineBuilderStage) config.PipelineBuilderStage { + return fromStage.EncodeKafka(name, api.EncodeKafka{ + Address: spec.Address, + Topic: spec.Topic, + TLS: getKafkaTLS(&spec.TLS, name, b.volumes), + SASL: getKafkaSASL(&spec.SASL, name, b.volumes), + }) +} + +func (b *PipelineBuilder) AddKafkaWriteStage(name string, spec *flowslatest.FlowCollectorKafka) config.PipelineBuilderStage { + return b.createKafkaWriteStage(name, spec, b.PipelineBuilderStage) +} + +func createIPFIXWriteStage(name string, spec *flowslatest.FlowCollectorIPFIXReceiver, fromStage *config.PipelineBuilderStage) config.PipelineBuilderStage { + return fromStage.WriteIpfix(name, api.WriteIpfix{ + TargetHost: spec.TargetHost, + TargetPort: spec.TargetPort, + Transport: getIPFIXTransport(spec.Transport), + EnterpriseID: 2, + }) +} + +func getIPFIXTransport(transport string) string { + switch transport { + case "UDP": + return "udp" + default: + return "tcp" //always fallback on tcp + } +} + +func getKafkaTLS(tls *flowslatest.ClientTLS, volumeName string, volumes *volumes.Builder) *api.ClientTLS { + if tls.Enable { + caPath, userCertPath, userKeyPath := volumes.AddMutualTLSCertificates(tls, volumeName) + return &api.ClientTLS{ + InsecureSkipVerify: tls.InsecureSkipVerify, + CACertPath: caPath, + UserCertPath: userCertPath, + UserKeyPath: userKeyPath, + } + } + return nil +} + +func getKafkaSASL(sasl *flowslatest.SASLConfig, volumePrefix string, volumes *volumes.Builder) *api.SASLConfig { + if !helper.UseSASL(sasl) { + return nil + } + t := "plain" + if sasl.Type == flowslatest.SASLScramSHA512 { + t = 
"scramSHA512" + } + idPath := volumes.AddVolume(&sasl.ClientIDReference, volumePrefix+"-sasl-id") + secretPath := volumes.AddVolume(&sasl.ClientSecretReference, volumePrefix+"-sasl-secret") + return &api.SASLConfig{ + Type: t, + ClientIDPath: idPath, + ClientSecretPath: secretPath, + } +} diff --git a/controllers/flowlogspipeline/flp_test.go b/controllers/flowlogspipeline/flp_test.go index 8d61e552d..297485869 100644 --- a/controllers/flowlogspipeline/flp_test.go +++ b/controllers/flowlogspipeline/flp_test.go @@ -808,23 +808,25 @@ func TestLabels(t *testing.T) { } // This function validate that each stage has its matching parameter -func validatePipelineConfig(stages []config.Stage, parameters []config.StageParam) bool { - for _, stage := range stages { - if stage.Name == "" { - return false - } +func validatePipelineConfig(t *testing.T, cm *corev1.ConfigMap) (*config.ConfigFileStruct, string) { + var cfs config.ConfigFileStruct + err := json.Unmarshal([]byte(cm.Data[configFile]), &cfs) + assert.NoError(t, err) + + for _, stage := range cfs.Pipeline { + assert.NotEmpty(t, stage.Name) exist := false - for _, parameter := range parameters { + for _, parameter := range cfs.Parameters { if stage.Name == parameter.Name { exist = true break } } - if !exist { - return false - } + assert.True(t, exist, "stage params not found", stage.Name) } - return true + b, err := json.Marshal(cfs.Pipeline) + assert.NoError(t, err) + return &cfs, string(b) } func TestPipelineConfig(t *testing.T) { @@ -835,29 +837,35 @@ func TestPipelineConfig(t *testing.T) { cfg := getConfig() cfg.Processor.LogLevel = "info" b := monoBuilder(ns, &cfg) - stages, parameters, err := b.buildPipelineConfig() + cm, _, err := b.configMap() assert.NoError(err) - assert.True(validatePipelineConfig(stages, parameters)) - jsonStages, _ := json.Marshal(stages) - assert.Equal(`[{"name":"ipfix"},{"name":"extract_conntrack","follows":"ipfix"},{"name":"enrich","follows":"extract_conntrack"},{"name":"loki","follows":"enrich"},{"name":"prometheus","follows":"enrich"}]`, string(jsonStages)) + _, pipeline := validatePipelineConfig(t, cm) + assert.Equal( + `[{"name":"ipfix"},{"name":"extract_conntrack","follows":"ipfix"},{"name":"enrich","follows":"extract_conntrack"},{"name":"loki","follows":"enrich"},{"name":"prometheus","follows":"enrich"}]`, + pipeline, + ) // Kafka Ingester cfg.DeploymentModel = flowslatest.DeploymentModelKafka info := reconcilers.Common{Namespace: ns} bi, _ := newIngestBuilder(info.NewInstance(image), &cfg) - stages, parameters, err = bi.buildPipelineConfig() + cm, _, err = bi.configMap() assert.NoError(err) - assert.True(validatePipelineConfig(stages, parameters)) - jsonStages, _ = json.Marshal(stages) - assert.Equal(`[{"name":"ipfix"},{"name":"kafka-write","follows":"ipfix"}]`, string(jsonStages)) + _, pipeline = validatePipelineConfig(t, cm) + assert.Equal( + `[{"name":"ipfix"},{"name":"kafka-write","follows":"ipfix"}]`, + pipeline, + ) // Kafka Transformer bt := transfBuilder(ns, &cfg) - stages, parameters, err = bt.buildPipelineConfig() + cm, _, err = bt.configMap() assert.NoError(err) - assert.True(validatePipelineConfig(stages, parameters)) - jsonStages, _ = json.Marshal(stages) - assert.Equal(`[{"name":"kafka-read"},{"name":"extract_conntrack","follows":"kafka-read"},{"name":"enrich","follows":"extract_conntrack"},{"name":"loki","follows":"enrich"},{"name":"prometheus","follows":"enrich"}]`, string(jsonStages)) + _, pipeline = validatePipelineConfig(t, cm) + assert.Equal( + 
`[{"name":"kafka-read"},{"name":"extract_conntrack","follows":"kafka-read"},{"name":"enrich","follows":"extract_conntrack"},{"name":"loki","follows":"enrich"},{"name":"prometheus","follows":"enrich"}]`, + pipeline, + ) } func TestPipelineConfigDropUnused(t *testing.T) { @@ -869,13 +877,15 @@ func TestPipelineConfigDropUnused(t *testing.T) { cfg.Processor.LogLevel = "info" cfg.Processor.DropUnusedFields = ptr.To(true) b := monoBuilder(ns, &cfg) - stages, parameters, err := b.buildPipelineConfig() + cm, _, err := b.configMap() assert.NoError(err) - assert.True(validatePipelineConfig(stages, parameters)) - jsonStages, _ := json.Marshal(stages) - assert.Equal(`[{"name":"ipfix"},{"name":"filter","follows":"ipfix"},{"name":"extract_conntrack","follows":"filter"},{"name":"enrich","follows":"extract_conntrack"},{"name":"loki","follows":"enrich"},{"name":"prometheus","follows":"enrich"}]`, string(jsonStages)) + cfs, pipeline := validatePipelineConfig(t, cm) + assert.Equal( + `[{"name":"ipfix"},{"name":"filter","follows":"ipfix"},{"name":"extract_conntrack","follows":"filter"},{"name":"enrich","follows":"extract_conntrack"},{"name":"loki","follows":"enrich"},{"name":"prometheus","follows":"enrich"}]`, + pipeline, + ) - jsonParams, _ := json.Marshal(parameters[1].Transform.Filter) + jsonParams, _ := json.Marshal(cfs.Parameters[1].Transform.Filter) assert.Contains(string(jsonParams), `{"input":"CustomBytes1","type":"remove_field"}`) assert.Contains(string(jsonParams), `{"input":"CustomInteger5","type":"remove_field"}`) assert.Contains(string(jsonParams), `{"input":"MPLS1Label","type":"remove_field"}`) @@ -887,11 +897,13 @@ func TestPipelineTraceStage(t *testing.T) { cfg := getConfig() b := monoBuilder("namespace", &cfg) - stages, parameters, err := b.buildPipelineConfig() + cm, _, err := b.configMap() assert.NoError(err) - assert.True(validatePipelineConfig(stages, parameters)) - jsonStages, _ := json.Marshal(stages) - assert.Equal(`[{"name":"ipfix"},{"name":"extract_conntrack","follows":"ipfix"},{"name":"enrich","follows":"extract_conntrack"},{"name":"loki","follows":"enrich"},{"name":"stdout","follows":"enrich"},{"name":"prometheus","follows":"enrich"}]`, string(jsonStages)) + _, pipeline := validatePipelineConfig(t, cm) + assert.Equal( + `[{"name":"ipfix"},{"name":"extract_conntrack","follows":"ipfix"},{"name":"enrich","follows":"extract_conntrack"},{"name":"loki","follows":"enrich"},{"name":"stdout","follows":"enrich"},{"name":"prometheus","follows":"enrich"}]`, + pipeline, + ) } func getSortedMetricsNames(m []api.PromMetricsItem) []string { @@ -909,12 +921,10 @@ func TestMergeMetricsConfiguration_Default(t *testing.T) { cfg := getConfig() b := monoBuilder("namespace", &cfg) - stages, parameters, err := b.buildPipelineConfig() + cm, _, err := b.configMap() assert.NoError(err) - assert.True(validatePipelineConfig(stages, parameters)) - jsonStages, _ := json.Marshal(stages) - assert.Equal(`[{"name":"ipfix"},{"name":"extract_conntrack","follows":"ipfix"},{"name":"enrich","follows":"extract_conntrack"},{"name":"loki","follows":"enrich"},{"name":"stdout","follows":"enrich"},{"name":"prometheus","follows":"enrich"}]`, string(jsonStages)) - names := getSortedMetricsNames(parameters[5].Encode.Prom.Metrics) + cfs, _ := validatePipelineConfig(t, cm) + names := getSortedMetricsNames(cfs.Parameters[5].Encode.Prom.Metrics) assert.Equal([]string{ "namespace_drop_packets_total", "namespace_flows_total", @@ -922,7 +932,7 @@ func TestMergeMetricsConfiguration_Default(t *testing.T) { "node_ingress_bytes_total", 
"workload_ingress_bytes_total", }, names) - assert.Equal("netobserv_", parameters[5].Encode.Prom.Prefix) + assert.Equal("netobserv_", cfs.Parameters[5].Encode.Prom.Prefix) } func TestMergeMetricsConfiguration_WithList(t *testing.T) { @@ -932,16 +942,14 @@ func TestMergeMetricsConfiguration_WithList(t *testing.T) { cfg.Processor.Metrics.IncludeList = &[]string{"namespace_egress_bytes_total", "namespace_ingress_bytes_total"} b := monoBuilder("namespace", &cfg) - stages, parameters, err := b.buildPipelineConfig() + cm, _, err := b.configMap() assert.NoError(err) - assert.True(validatePipelineConfig(stages, parameters)) - jsonStages, _ := json.Marshal(stages) - assert.Equal(`[{"name":"ipfix"},{"name":"extract_conntrack","follows":"ipfix"},{"name":"enrich","follows":"extract_conntrack"},{"name":"loki","follows":"enrich"},{"name":"stdout","follows":"enrich"},{"name":"prometheus","follows":"enrich"}]`, string(jsonStages)) - names := getSortedMetricsNames(parameters[5].Encode.Prom.Metrics) + cfs, _ := validatePipelineConfig(t, cm) + names := getSortedMetricsNames(cfs.Parameters[5].Encode.Prom.Metrics) assert.Len(names, 2) assert.Equal("namespace_egress_bytes_total", names[0]) assert.Equal("namespace_ingress_bytes_total", names[1]) - assert.Equal("netobserv_", parameters[5].Encode.Prom.Prefix) + assert.Equal("netobserv_", cfs.Parameters[5].Encode.Prom.Prefix) } func TestMergeMetricsConfiguration_EmptyList(t *testing.T) { @@ -951,12 +959,10 @@ func TestMergeMetricsConfiguration_EmptyList(t *testing.T) { cfg.Processor.Metrics.IncludeList = &[]string{} b := monoBuilder("namespace", &cfg) - stages, parameters, err := b.buildPipelineConfig() + cm, _, err := b.configMap() assert.NoError(err) - assert.True(validatePipelineConfig(stages, parameters)) - jsonStages, _ := json.Marshal(stages) - assert.Equal(`[{"name":"ipfix"},{"name":"extract_conntrack","follows":"ipfix"},{"name":"enrich","follows":"extract_conntrack"},{"name":"loki","follows":"enrich"},{"name":"stdout","follows":"enrich"}]`, string(jsonStages)) - assert.Len(parameters, 5) + cfs, _ := validatePipelineConfig(t, cm) + assert.Len(cfs.Parameters, 5) } func TestPipelineWithExporter(t *testing.T) { @@ -978,18 +984,20 @@ func TestPipelineWithExporter(t *testing.T) { }) b := monoBuilder("namespace", &cfg) - stages, parameters, err := b.buildPipelineConfig() + cm, _, err := b.configMap() assert.NoError(err) - assert.True(validatePipelineConfig(stages, parameters)) - jsonStages, _ := json.Marshal(stages) - assert.Equal(`[{"name":"ipfix"},{"name":"extract_conntrack","follows":"ipfix"},{"name":"enrich","follows":"extract_conntrack"},{"name":"loki","follows":"enrich"},{"name":"stdout","follows":"enrich"},{"name":"prometheus","follows":"enrich"},{"name":"kafka-export-0","follows":"enrich"},{"name":"IPFIX-export-1","follows":"enrich"}]`, string(jsonStages)) + cfs, pipeline := validatePipelineConfig(t, cm) + assert.Equal( + `[{"name":"ipfix"},{"name":"extract_conntrack","follows":"ipfix"},{"name":"enrich","follows":"extract_conntrack"},{"name":"loki","follows":"enrich"},{"name":"stdout","follows":"enrich"},{"name":"prometheus","follows":"enrich"},{"name":"kafka-export-0","follows":"enrich"},{"name":"IPFIX-export-1","follows":"enrich"}]`, + pipeline, + ) - assert.Equal("kafka-test", parameters[6].Encode.Kafka.Address) - assert.Equal("topic-test", parameters[6].Encode.Kafka.Topic) + assert.Equal("kafka-test", cfs.Parameters[6].Encode.Kafka.Address) + assert.Equal("topic-test", cfs.Parameters[6].Encode.Kafka.Topic) - assert.Equal("ipfix-receiver-test", 
parameters[7].Write.Ipfix.TargetHost) - assert.Equal(9999, parameters[7].Write.Ipfix.TargetPort) - assert.Equal("tcp", parameters[7].Write.Ipfix.Transport) + assert.Equal("ipfix-receiver-test", cfs.Parameters[7].Write.Ipfix.TargetHost) + assert.Equal(9999, cfs.Parameters[7].Write.Ipfix.TargetPort) + assert.Equal("tcp", cfs.Parameters[7].Write.Ipfix.Transport) } func TestPipelineWithoutLoki(t *testing.T) { @@ -999,9 +1007,11 @@ func TestPipelineWithoutLoki(t *testing.T) { cfg.Loki.Enable = ptr.To(false) b := monoBuilder("namespace", &cfg) - stages, parameters, err := b.buildPipelineConfig() + cm, _, err := b.configMap() assert.NoError(err) - assert.True(validatePipelineConfig(stages, parameters)) - jsonStages, _ := json.Marshal(stages) - assert.Equal(`[{"name":"ipfix"},{"name":"extract_conntrack","follows":"ipfix"},{"name":"enrich","follows":"extract_conntrack"},{"name":"stdout","follows":"enrich"},{"name":"prometheus","follows":"enrich"}]`, string(jsonStages)) + _, pipeline := validatePipelineConfig(t, cm) + assert.Equal( + `[{"name":"ipfix"},{"name":"extract_conntrack","follows":"ipfix"},{"name":"enrich","follows":"extract_conntrack"},{"name":"stdout","follows":"enrich"},{"name":"prometheus","follows":"enrich"}]`, + pipeline, + ) } diff --git a/controllers/flowlogspipeline/flp_transfo_objects.go b/controllers/flowlogspipeline/flp_transfo_objects.go index 7cab884c6..d576d2c52 100644 --- a/controllers/flowlogspipeline/flp_transfo_objects.go +++ b/controllers/flowlogspipeline/flp_transfo_objects.go @@ -7,11 +7,8 @@ import ( rbacv1 "k8s.io/api/rbac/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "github.com/netobserv/flowlogs-pipeline/pkg/api" - "github.com/netobserv/flowlogs-pipeline/pkg/config" flowslatest "github.com/netobserv/network-observability-operator/api/v1beta2" "github.com/netobserv/network-observability-operator/controllers/reconcilers" - "github.com/netobserv/network-observability-operator/pkg/helper" ) type transfoBuilder struct { @@ -19,7 +16,7 @@ type transfoBuilder struct { } func newTransfoBuilder(info *reconcilers.Instance, desired *flowslatest.FlowCollectorSpec) (transfoBuilder, error) { - gen, err := newBuilder(info, desired, ConfKafkaTransformer) + gen, err := NewBuilder(info, desired, ConfKafkaTransformer) return transfoBuilder{ generic: gen, }, err @@ -44,38 +41,12 @@ func (b *transfoBuilder) deployment(annotations map[string]string) *appsv1.Deplo } func (b *transfoBuilder) configMap() (*corev1.ConfigMap, string, error) { - stages, params, err := b.buildPipelineConfig() + pipeline := b.generic.NewKafkaPipeline() + err := pipeline.AddProcessorStages() if err != nil { return nil, "", err } - configMap, digest, err := b.generic.configMap(stages, params) - return configMap, digest, err -} - -func (b *transfoBuilder) buildPipelineConfig() ([]config.Stage, []config.StageParam, error) { - // TODO in a later optimization patch: set ingester <-> transformer communication also via protobuf - // For now, we leave this communication via JSON and just setup protobuf ingestion when - // the transformer is communicating directly via eBPF agent - decoder := api.Decoder{Type: "protobuf"} - if helper.UseIPFIX(b.generic.desired) { - decoder = api.Decoder{Type: "json"} - } - pipeline := config.NewKafkaPipeline("kafka-read", api.IngestKafka{ - Brokers: []string{b.generic.desired.Kafka.Address}, - Topic: b.generic.desired.Kafka.Topic, - GroupId: b.generic.name(), // Without groupid, each message is delivered to each consumers - Decoder: decoder, - TLS: 
b.generic.getKafkaTLS(&b.generic.desired.Kafka.TLS, "kafka-cert"), - SASL: b.generic.getKafkaSASL(&b.generic.desired.Kafka.SASL, "kafka-ingest"), - PullQueueCapacity: b.generic.desired.Processor.KafkaConsumerQueueCapacity, - PullMaxBytes: b.generic.desired.Processor.KafkaConsumerBatchSize, - }) - - err := b.generic.addTransformStages(&pipeline) - if err != nil { - return nil, nil, err - } - return pipeline.GetStages(), pipeline.GetStageParams(), nil + return b.generic.ConfigMap() } func (b *transfoBuilder) promService() *corev1.Service { @@ -107,7 +78,7 @@ func (b *transfoBuilder) autoScaler() *ascv2.HorizontalPodAutoscaler { //+kubebuilder:rbac:groups=autoscaling,resources=horizontalpodautoscalers,verbs=create;delete;patch;update;get;watch;list //+kubebuilder:rbac:groups=core,resources=pods;services;nodes,verbs=get;list;watch -func buildClusterRoleTransformer() *rbacv1.ClusterRole { +func BuildClusterRoleTransformer() *rbacv1.ClusterRole { return &rbacv1.ClusterRole{ ObjectMeta: metav1.ObjectMeta{ Name: name(ConfKafkaTransformer), diff --git a/controllers/flowlogspipeline/flp_transfo_reconciler.go b/controllers/flowlogspipeline/flp_transfo_reconciler.go index 45c036c8b..e29c201d5 100644 --- a/controllers/flowlogspipeline/flp_transfo_reconciler.go +++ b/controllers/flowlogspipeline/flp_transfo_reconciler.go @@ -203,7 +203,7 @@ func (r *flpTransformerReconciler) reconcilePermissions(ctx context.Context, bui return r.CreateOwned(ctx, builder.serviceAccount()) } // We only configure name, update is not needed for now - cr := buildClusterRoleTransformer() + cr := BuildClusterRoleTransformer() if err := r.ReconcileClusterRole(ctx, cr); err != nil { return err }
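
Below is a minimal sketch (not part of this patch) of how an external consumer could use the refactored builder, which is the stated motivation for this change (NETOBSERV-627). The package name and the buildMonolithConfig helper are hypothetical illustrations; the NewBuilder, NewGRPCPipeline, AddProcessorStages and GetJSONConfig calls mirror the exported API introduced above, following the same call sequence the monolith builder uses internally.

    package example

    import (
        flowslatest "github.com/netobserv/network-observability-operator/api/v1beta2"
        "github.com/netobserv/network-observability-operator/controllers/flowlogspipeline"
        "github.com/netobserv/network-observability-operator/controllers/reconcilers"
    )

    // buildMonolithConfig is a hypothetical helper that builds the FLP JSON configuration
    // for a monolith (ingest + transform) deployment, outside of the FLP reconciler.
    func buildMonolithConfig(info *reconcilers.Instance, desired *flowslatest.FlowCollectorSpec) (string, error) {
        // Create the generic builder for the monolith flavor.
        b, err := flowlogspipeline.NewBuilder(info, desired, flowlogspipeline.ConfMonolith)
        if err != nil {
            return "", err
        }
        // Start the pipeline from the eBPF agent gRPC ingest stage.
        pipeline := b.NewGRPCPipeline()
        // Append the transform, conntrack, Loki, Prometheus and custom export stages.
        if err := pipeline.AddProcessorStages(); err != nil {
            return "", err
        }
        // Serialize the resulting pipeline as the flowlogs-pipeline JSON config.
        return b.GetJSONConfig()
    }

An ingester-style consumer would instead call b.NewIPFIXPipeline() or b.NewGRPCPipeline() followed by pipeline.AddKafkaWriteStage(...), as the refactored flp_ingest_objects.go does.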