Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NETOBSERV-341 configure netflow v5 separately (breaking change if using v5) #207

Merged
merged 2 commits into from
May 30, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ Flags:
```
<!---END-AUTO-flowlogs-pipeline_help--->

> Note: for API details refer to [docs/api.md](docs/api.md).
> Note: for API details refer to [docs/api.md](docs/api.md).
>
## Configuration generation

Expand Down
5 changes: 3 additions & 2 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@ Following is the supported API format for kafka encode:
batchSize: limit on how many messages will be buffered before being sent to a partition
</pre>
## Ingest collector API
Following is the supported API format for the netflow collector:
Following is the supported API format for the NetFlow / IPFIX collector:

<pre>
collector:
hostName: the hostname to listen on
port: the port number to listen on
port: the port number to listen on, for IPFIX/NetFlow v9. Omit or set to 0 to disable IPFIX/NetFlow v9 ingestion
portLegacy: the port number to listen on, for legacy NetFlow v5. Omit or set to 0 to disable NetFlow v5 ingestion
batchMaxLen: the number of accumulated flows before being forwarded for processing
</pre>
## Ingest Kafka API
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const TagEnum = "enum"
type API struct {
PromEncode PromEncode `yaml:"prom" doc:"## Prometheus encode API\nFollowing is the supported API format for prometheus encode:\n"`
KafkaEncode EncodeKafka `yaml:"kafka" doc:"## Kafka encode API\nFollowing is the supported API format for kafka encode:\n"`
IngestCollector IngestCollector `yaml:"collector" doc:"## Ingest collector API\nFollowing is the supported API format for the netflow collector:\n"`
IngestCollector IngestCollector `yaml:"collector" doc:"## Ingest collector API\nFollowing is the supported API format for the NetFlow / IPFIX collector:\n"`
IngestKafka IngestKafka `yaml:"kafka" doc:"## Ingest Kafka API\nFollowing is the supported API format for the kafka ingest:\n"`
IngestGRPCProto IngestGRPCProto `yaml:"grpc" doc:"## Ingest GRPC from Network Observability eBPF Agent\nFollowing is the supported API format for the Network Observability eBPF ingest:\n"`
DecodeAws DecodeAws `yaml:"aws" doc:"## Aws ingest API\nFollowing is the supported API format for Aws flow entries:\n"`
Expand Down
3 changes: 2 additions & 1 deletion pkg/api/ingest_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package api

type IngestCollector struct {
HostName string `yaml:"hostName" doc:"the hostname to listen on"`
Port int `yaml:"port" doc:"the port number to listen on"`
Port int `yaml:"port" doc:"the port number to listen on, for IPFIX/NetFlow v9. Omit or set to 0 to disable IPFIX/NetFlow v9 ingestion"`
PortLegacy int `yaml:"portLegacy" doc:"the port number to listen on, for legacy NetFlow v5. Omit or set to 0 to disable NetFlow v5 ingestion"`
BatchMaxLen int `yaml:"batchMaxLen" doc:"the number of accumulated flows before being forwarded for processing"`
}
53 changes: 29 additions & 24 deletions pkg/pipeline/ingest/ingest_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ const (
type ingestCollector struct {
hostname string
port int
portLegacy int
in chan map[string]interface{}
batchFlushTime time.Duration
batchMaxLength int
Expand Down Expand Up @@ -124,31 +125,33 @@ func (ingestC *ingestCollector) initCollectorListener(ctx context.Context) {
if err != nil {
log.Fatal(err)
}
go func() {
sNF := &utils.StateNetFlow{
Format: formatter,
Transport: transporter,
Logger: log.StandardLogger(),
}

log.Infof("listening for netflow on host %s, port = %d", ingestC.hostname, ingestC.port)
err = sNF.FlowRoutine(1, ingestC.hostname, ingestC.port, false)
log.Fatal(err)

}()
if ingestC.port > 0 {
go func() {
sNF := &utils.StateNetFlow{
Format: formatter,
Transport: transporter,
Logger: log.StandardLogger(),
}

go func() {
sLegacyNF := &utils.StateNFLegacy{
Format: formatter,
Transport: transporter,
Logger: log.StandardLogger(),
}
log.Infof("listening for netflow on host %s, port = %d", ingestC.hostname, ingestC.port)
err = sNF.FlowRoutine(1, ingestC.hostname, ingestC.port, false)
log.Fatal(err)
}()
}

log.Infof("listening for legacy netflow on host %s, port = %d", ingestC.hostname, ingestC.port+1)
err = sLegacyNF.FlowRoutine(1, ingestC.hostname, ingestC.port+1, false)
log.Fatal(err)
}()
if ingestC.portLegacy > 0 {
go func() {
sLegacyNF := &utils.StateNFLegacy{
Format: formatter,
Transport: transporter,
Logger: log.StandardLogger(),
}

log.Infof("listening for legacy netflow on host %s, port = %d", ingestC.hostname, ingestC.portLegacy)
err = sLegacyNF.FlowRoutine(1, ingestC.hostname, ingestC.portLegacy, false)
log.Fatal(err)
}()
}
}

func (ingestC *ingestCollector) processLogLines(out chan<- []interface{}) {
Expand Down Expand Up @@ -193,12 +196,13 @@ func NewIngestCollector(params config.StageParam) (Ingester, error) {
if jsonIngestCollector.HostName == "" {
return nil, fmt.Errorf("ingest hostname not specified")
}
if jsonIngestCollector.Port == 0 {
return nil, fmt.Errorf("ingest port not specified")
if jsonIngestCollector.Port == 0 && jsonIngestCollector.PortLegacy == 0 {
return nil, fmt.Errorf("no ingest port specified")
}

log.Infof("hostname = %s", jsonIngestCollector.HostName)
log.Infof("port = %d", jsonIngestCollector.Port)
log.Infof("portLegacy = %d", jsonIngestCollector.PortLegacy)

bml := defaultBatchMaxLength
if jsonIngestCollector.BatchMaxLen != 0 {
Expand All @@ -208,6 +212,7 @@ func NewIngestCollector(params config.StageParam) (Ingester, error) {
return &ingestCollector{
hostname: jsonIngestCollector.HostName,
port: jsonIngestCollector.Port,
portLegacy: jsonIngestCollector.PortLegacy,
exitChan: pUtils.ExitChannel(),
batchFlushTime: defaultBatchFlushTime,
batchMaxLength: bml,
Expand Down