From 2051cf5f3973e24bc4c681b95326d302aede63c1 Mon Sep 17 00:00:00 2001 From: Joel Takvorian Date: Thu, 19 May 2022 16:47:40 +0200 Subject: [PATCH 1/2] NETOBSERV-341 configure netflow v5 separately (breaking change if using v5) Fixes #200 --- README.md | 2 +- docs/api.md | 5 ++- pkg/api/api.go | 2 +- pkg/api/ingest_collector.go | 3 +- pkg/pipeline/ingest/ingest_collector.go | 53 ++++++++++++++----------- 5 files changed, 36 insertions(+), 29 deletions(-) diff --git a/README.md b/README.md index 1cec31441..a95170856 100644 --- a/README.md +++ b/README.md @@ -55,7 +55,7 @@ Flags: ``` -> Note: for API details refer to [docs/api.md](docs/api.md). +> Note: for API details refer to [docs/api.md](docs/api.md). > ## Configuration generation diff --git a/docs/api.md b/docs/api.md index 7b6a177dd..465898d2d 100644 --- a/docs/api.md +++ b/docs/api.md @@ -39,12 +39,13 @@ Following is the supported API format for kafka encode: batchSize: limit on how many messages will be buffered before being sent to a partition ## Ingest collector API -Following is the supported API format for the netflow collector: +Following is the supported API format for the NetFlow / IPFIX collector:
  collector:
          hostName: the hostname to listen on
-         port: the port number to listen on
+         port: the port number to listen on, for IPFIX/NetFlow v9. Omit or set to 0 to disable IPFIX/NetFlow v9 ingestion
+         portLegacy: the port number to listen on, for legacy NetFlow v5. Omit or set to 0 to disable NetFlow v5 ingestion
          batchMaxLen: the number of accumulated flows before being forwarded for processing
 
## Ingest Kafka API diff --git a/pkg/api/api.go b/pkg/api/api.go index bffbce4cc..2c4078833 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -26,7 +26,7 @@ const TagEnum = "enum" type API struct { PromEncode PromEncode `yaml:"prom" doc:"## Prometheus encode API\nFollowing is the supported API format for prometheus encode:\n"` KafkaEncode EncodeKafka `yaml:"kafka" doc:"## Kafka encode API\nFollowing is the supported API format for kafka encode:\n"` - IngestCollector IngestCollector `yaml:"collector" doc:"## Ingest collector API\nFollowing is the supported API format for the netflow collector:\n"` + IngestCollector IngestCollector `yaml:"collector" doc:"## Ingest collector API\nFollowing is the supported API format for the NetFlow / IPFIX collector:\n"` IngestKafka IngestKafka `yaml:"kafka" doc:"## Ingest Kafka API\nFollowing is the supported API format for the kafka ingest:\n"` IngestGRPCProto IngestGRPCProto `yaml:"grpc" doc:"## Ingest GRPC from Network Observability eBPF Agent\nFollowing is the supported API format for the Network Observability eBPF ingest:\n"` DecodeAws DecodeAws `yaml:"aws" doc:"## Aws ingest API\nFollowing is the supported API format for Aws flow entries:\n"` diff --git a/pkg/api/ingest_collector.go b/pkg/api/ingest_collector.go index 8c05a6cd3..2dceaa6f0 100644 --- a/pkg/api/ingest_collector.go +++ b/pkg/api/ingest_collector.go @@ -19,6 +19,7 @@ package api type IngestCollector struct { HostName string `yaml:"hostName" doc:"the hostname to listen on"` - Port int `yaml:"port" doc:"the port number to listen on"` + Port int `yaml:"port" doc:"the port number to listen on, for IPFIX/NetFlow v9. Omit or set to 0 to disable IPFIX/NetFlow v9 ingestion"` + PortLegacy int `yaml:"portLegacy" doc:"the port number to listen on, for legacy NetFlow v5. Omit or set to 0 to disable NetFlow v5 ingestion"` BatchMaxLen int `yaml:"batchMaxLen" doc:"the number of accumulated flows before being forwarded for processing"` } diff --git a/pkg/pipeline/ingest/ingest_collector.go b/pkg/pipeline/ingest/ingest_collector.go index f7dbfcf3c..a526fc578 100644 --- a/pkg/pipeline/ingest/ingest_collector.go +++ b/pkg/pipeline/ingest/ingest_collector.go @@ -48,6 +48,7 @@ const ( type ingestCollector struct { hostname string port int + portLegacy int in chan map[string]interface{} batchFlushTime time.Duration batchMaxLength int @@ -124,31 +125,33 @@ func (ingestC *ingestCollector) initCollectorListener(ctx context.Context) { if err != nil { log.Fatal(err) } - go func() { - sNF := &utils.StateNetFlow{ - Format: formatter, - Transport: transporter, - Logger: log.StandardLogger(), - } - - log.Infof("listening for netflow on host %s, port = %d", ingestC.hostname, ingestC.port) - err = sNF.FlowRoutine(1, ingestC.hostname, ingestC.port, false) - log.Fatal(err) - - }() + if ingestC.port > 0 { + go func() { + sNF := &utils.StateNetFlow{ + Format: formatter, + Transport: transporter, + Logger: log.StandardLogger(), + } - go func() { - sLegacyNF := &utils.StateNFLegacy{ - Format: formatter, - Transport: transporter, - Logger: log.StandardLogger(), - } + log.Infof("listening for netflow on host %s, port = %d", ingestC.hostname, ingestC.port) + err = sNF.FlowRoutine(1, ingestC.hostname, ingestC.port, false) + log.Fatal(err) + }() + } - log.Infof("listening for legacy netflow on host %s, port = %d", ingestC.hostname, ingestC.port+1) - err = sLegacyNF.FlowRoutine(1, ingestC.hostname, ingestC.port+1, false) - log.Fatal(err) - }() + if ingestC.portLegacy > 0 { + go func() { + sLegacyNF := &utils.StateNFLegacy{ + Format: formatter, + Transport: transporter, + Logger: log.StandardLogger(), + } + log.Infof("listening for legacy netflow on host %s, port = %d", ingestC.hostname, ingestC.portLegacy) + err = sLegacyNF.FlowRoutine(1, ingestC.hostname, ingestC.portLegacy, false) + log.Fatal(err) + }() + } } func (ingestC *ingestCollector) processLogLines(out chan<- []interface{}) { @@ -193,12 +196,13 @@ func NewIngestCollector(params config.StageParam) (Ingester, error) { if jsonIngestCollector.HostName == "" { return nil, fmt.Errorf("ingest hostname not specified") } - if jsonIngestCollector.Port == 0 { - return nil, fmt.Errorf("ingest port not specified") + if jsonIngestCollector.Port == 0 && jsonIngestCollector.PortLegacy == 0 { + return nil, fmt.Errorf("no ingest port specified") } log.Infof("hostname = %s", jsonIngestCollector.HostName) log.Infof("port = %d", jsonIngestCollector.Port) + log.Infof("portLegacy = %d", jsonIngestCollector.PortLegacy) bml := defaultBatchMaxLength if jsonIngestCollector.BatchMaxLen != 0 { @@ -208,6 +212,7 @@ func NewIngestCollector(params config.StageParam) (Ingester, error) { return &ingestCollector{ hostname: jsonIngestCollector.HostName, port: jsonIngestCollector.Port, + portLegacy: jsonIngestCollector.PortLegacy, exitChan: pUtils.ExitChannel(), batchFlushTime: defaultBatchFlushTime, batchMaxLength: bml, From 75083a00dc1789d15b4e9ab27f1eaf516052daf1 Mon Sep 17 00:00:00 2001 From: Joel Takvorian Date: Mon, 30 May 2022 11:48:24 +0200 Subject: [PATCH 2/2] Update confgen for legacy port --- contrib/kubernetes/flowlogs-pipeline.conf.yaml | 1 + hack/deploy-and-monitor-k8s-network-workload.sh | 1 + network_definitions/config.yaml | 1 + pkg/confgen/flowlogs2metrics_config.go | 5 +++-- pkg/test/e2e/pipline/flp-config.yaml | 1 + 5 files changed, 7 insertions(+), 2 deletions(-) diff --git a/contrib/kubernetes/flowlogs-pipeline.conf.yaml b/contrib/kubernetes/flowlogs-pipeline.conf.yaml index 5be3f62cd..dfe537fca 100644 --- a/contrib/kubernetes/flowlogs-pipeline.conf.yaml +++ b/contrib/kubernetes/flowlogs-pipeline.conf.yaml @@ -5,6 +5,7 @@ parameters: collector: hostname: 0.0.0.0 port: 2055 + portLegacy: 2056 type: collector name: ingest_collector - decode: diff --git a/hack/deploy-and-monitor-k8s-network-workload.sh b/hack/deploy-and-monitor-k8s-network-workload.sh index c3479b99a..61f8f88d8 100755 --- a/hack/deploy-and-monitor-k8s-network-workload.sh +++ b/hack/deploy-and-monitor-k8s-network-workload.sh @@ -29,6 +29,7 @@ pipeline: collector: hostname: 0.0.0.0 port: 2055 + portLegacy: 2056 type: collector decode: type: json diff --git a/network_definitions/config.yaml b/network_definitions/config.yaml index 2db867364..f7c528503 100644 --- a/network_definitions/config.yaml +++ b/network_definitions/config.yaml @@ -6,6 +6,7 @@ description: ingest: collector: port: 2055 + portLegacy: 2056 hostName: 0.0.0.0 transform: generic: diff --git a/pkg/confgen/flowlogs2metrics_config.go b/pkg/confgen/flowlogs2metrics_config.go index 8b9e94877..8c2d8e112 100644 --- a/pkg/confgen/flowlogs2metrics_config.go +++ b/pkg/confgen/flowlogs2metrics_config.go @@ -53,8 +53,9 @@ func (cg *ConfGen) generateFlowlogs2PipelineConfig(fileName string) error { "ingest": map[string]interface{}{ "type": "collector", "collector": map[string]interface{}{ - "port": cg.config.Ingest.Collector.Port, - "hostname": cg.config.Ingest.Collector.HostName, + "port": cg.config.Ingest.Collector.Port, + "portLegacy": cg.config.Ingest.Collector.PortLegacy, + "hostname": cg.config.Ingest.Collector.HostName, }, }, }, diff --git a/pkg/test/e2e/pipline/flp-config.yaml b/pkg/test/e2e/pipline/flp-config.yaml index 322e86aa2..6e4ec89cb 100644 --- a/pkg/test/e2e/pipline/flp-config.yaml +++ b/pkg/test/e2e/pipline/flp-config.yaml @@ -10,6 +10,7 @@ data: collector: hostname: 0.0.0.0 port: 2055 + portLegacy: 2056 type: collector name: ingest_collector - decode: