Skip to content

Commit

Permalink
Merge pull request netobserv#120 from mariomac/netobserv-flows
Browse files Browse the repository at this point in the history
Integrate components from Goflow-Kube
  • Loading branch information
Mario Macias authored Mar 8, 2022
2 parents 57d4010 + 29b7dd1 commit e7919b2
Show file tree
Hide file tree
Showing 12 changed files with 434 additions and 65 deletions.
1 change: 1 addition & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ Following is the supported API format for the netflow collector:
collector:
hostName: the hostname to listen on
port: the port number to listen on
batchMaxLen: the number of accumulated flows before being forwarded for processing
</pre>
## Ingest Kafka API
Following is the supported API format for the kafka ingest:
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.17
require (
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible
github.com/go-kit/kit v0.12.0
github.com/golang/snappy v0.0.3
github.com/heptiolabs/healthcheck v0.0.0-20211123025425-613501dd5deb
github.com/ip2location/ip2location-go/v9 v9.2.0
github.com/json-iterator/go v1.1.12
Expand All @@ -20,6 +21,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.10.1
github.com/stretchr/testify v1.7.0
github.com/vmware/go-ipfix v0.5.12
golang.org/x/net v0.0.0-20220225172249-27dd8689420f
google.golang.org/protobuf v1.27.1
gopkg.in/yaml.v2 v2.4.0
Expand All @@ -39,7 +41,6 @@ require (
github.com/go-logr/logr v1.2.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.3 // indirect
github.com/google/go-cmp v0.5.6 // indirect
github.com/google/gofuzz v1.1.0 // indirect
github.com/googleapis/gnostic v0.5.5 // indirect
Expand All @@ -50,7 +51,7 @@ require (
github.com/klauspost/compress v1.13.6 // indirect
github.com/libp2p/go-reuseport v0.0.2 // indirect
github.com/magiconair/properties v1.8.5 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
Expand Down
81 changes: 79 additions & 2 deletions go.sum

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions pkg/api/ingest_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package api

type IngestCollector struct {
HostName string `yaml:"hostName" doc:"the hostname to listen on"`
Port int `yaml:"port" doc:"the port number to listen on"`
HostName string `yaml:"hostName" doc:"the hostname to listen on"`
Port int `yaml:"port" doc:"the port number to listen on"`
BatchMaxLen int `yaml:"batchMaxLen" doc:"the number of accumulated flows before being forwarded for processing"`
}
59 changes: 38 additions & 21 deletions pkg/pipeline/ingest/ingest_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@ import (
"net"
"time"

ms "github.com/mitchellh/mapstructure"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
pUtils "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"

ms "github.com/mitchellh/mapstructure"
goflowFormat "github.com/netsampler/goflow2/format"
goflowCommonFormat "github.com/netsampler/goflow2/format/common"
_ "github.com/netsampler/goflow2/format/protobuf"
Expand All @@ -38,14 +37,19 @@ import (
"google.golang.org/protobuf/proto"
)

const channelSize = 1000
const batchMaxTimeInMilliSecs = 1000
// Tuning defaults for the collector ingester.
const (
	// channelSize is the buffer size of the channel receiving decoded flow records.
	channelSize = 1000
	// defaultBatchFlushTime is the maximum time an incomplete batch is held
	// before being forwarded downstream.
	defaultBatchFlushTime = time.Second
	// defaultBatchMaxLength is the number of accumulated flows that triggers
	// forwarding a batch; it can be overridden via the batchMaxLen config option.
	defaultBatchMaxLength = 500
)

type ingestCollector struct {
hostname string
port int
in chan map[string]interface{}
exitChan chan bool
hostname string
port int
in chan map[string]interface{}
batchFlushTime time.Duration
batchMaxLength int
exitChan chan bool
}

// TransportWrapper is an implementation of the goflow2 transport interface
Expand Down Expand Up @@ -104,11 +108,11 @@ func (ingestC *ingestCollector) Ingest(out chan<- []interface{}) {

func (ingestC *ingestCollector) initCollectorListener(ctx context.Context) {
transporter := NewWrapper(ingestC.in)
formatter, err := goflowFormat.FindFormat(ctx, "pb")
if err != nil {
log.Fatal(err)
}
go func() {
formatter, err := goflowFormat.FindFormat(ctx, "pb")
if err != nil {
log.Fatal(err)
}
sNF := &utils.StateNetFlow{
Format: formatter,
Transport: transporter,
Expand All @@ -122,10 +126,6 @@ func (ingestC *ingestCollector) initCollectorListener(ctx context.Context) {
}()

go func() {
formatter, err := goflowFormat.FindFormat(ctx, "pb")
if err != nil {
log.Fatal(err)
}
sLegacyNF := &utils.StateNFLegacy{
Format: formatter,
Transport: transporter,
Expand All @@ -141,21 +141,31 @@ func (ingestC *ingestCollector) initCollectorListener(ctx context.Context) {

func (ingestC *ingestCollector) processLogLines(out chan<- []interface{}) {
var records []interface{}
// Maximum batch time for each batch
flushRecords := time.NewTicker(ingestC.batchFlushTime)
defer flushRecords.Stop()
for {
select {
case <-ingestC.exitChan:
log.Debugf("exiting ingestCollector because of signal")
return
case record := <-ingestC.in:
// TODO: for efficiency, consider forwarding directly as map,
// as this is reverted back from string to map in later pipeline stages
recordAsBytes, _ := json.Marshal(record)
records = append(records, string(recordAsBytes))
case <-time.After(time.Millisecond * batchMaxTimeInMilliSecs): // Maximum batch time for each batch
if len(records) >= ingestC.batchMaxLength {
log.Debugf("ingestCollector sending %d entries", len(records))
out <- records
records = []interface{}{}
}
case <-flushRecords.C:
// Process batch of records (if not empty)
if len(records) > 0 {
log.Debugf("ingestCollector sending %d entries", len(records))
out <- records
records = []interface{}{}
}
records = []interface{}{}
}
}
}
Expand All @@ -177,9 +187,16 @@ func NewIngestCollector(params config.StageParam) (Ingester, error) {
ch := make(chan bool, 1)
pUtils.RegisterExitChannel(ch)

bml := defaultBatchMaxLength
if jsonIngestCollector.BatchMaxLen != 0 {
bml = jsonIngestCollector.BatchMaxLen
}

return &ingestCollector{
hostname: jsonIngestCollector.HostName,
port: jsonIngestCollector.Port,
exitChan: ch,
hostname: jsonIngestCollector.HostName,
port: jsonIngestCollector.Port,
exitChan: ch,
batchFlushTime: defaultBatchFlushTime,
batchMaxLength: bml,
}, nil
}
62 changes: 62 additions & 0 deletions pkg/pipeline/ingest/ingest_collector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package ingest

import (
"encoding/json"
"testing"
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// timeout bounds how long waitForFlows keeps retrying before failing the test.
const timeout = 5 * time.Second

// TestIngest verifies the end-to-end ingest path: an ingestCollector listening
// on a local UDP port receives IPFIX traffic from a test client and forwards
// the decoded flows, JSON-encoded as strings, to the output channel.
func TestIngest(t *testing.T) {
	collectorPort, err := test.UDPPort()
	require.NoError(t, err)
	// Use a short batchFlushTime so the batch is flushed quickly during the test.
	ic := &ingestCollector{
		hostname: "0.0.0.0",
		port: collectorPort,
		batchFlushTime: 10 * time.Millisecond,
		exitChan: make(chan bool),
	}
	forwarded := make(chan []interface{})
	// NOTE(review): the channel is deliberately left open; the collector
	// goroutine may still attempt to send after the test body returns.

	// GIVEN an IPFIX collector Ingester
	go ic.Ingest(forwarded)

	client, err := test.NewIPFIXClient(collectorPort)
	require.NoError(t, err)

	// Retry sending until the collector is actually listening and forwards data.
	received := waitForFlows(t, client, forwarded)
	require.NotEmpty(t, received)
	// Forwarded entries are JSON-encoded flow records as strings.
	require.IsType(t, "string", received[0])
	flow := map[string]interface{}{}
	require.NoError(t, json.Unmarshal([]byte(received[0].(string)), &flow))
	// These values must match what the test client sent below in waitForFlows.
	assert.EqualValues(t, 12345678, flow["TimeFlowStart"])
	assert.EqualValues(t, 12345678, flow["TimeFlowEnd"])
	assert.Equal(t, "1.2.3.4", flow["SrcAddr"])
}

// The IPFIX client might send information before the Ingester is actually listening,
// so we might need to repeat the submission until the ingest starts forwarding logs
func waitForFlows(t *testing.T, client *test.IPFIXClient, forwarded chan []interface{}) []interface{} {
var start = time.Now()
for {
if client.SendTemplate() == nil &&
client.SendFlow(12345678, "1.2.3.4") == nil {
select {
case received := <-forwarded:
return received
default:
// nothing yet received
}
}
if time.Since(start) > timeout {
require.Fail(t, "error waiting for ingester to forward received data")
}
time.After(50 * time.Millisecond)
}
}
6 changes: 5 additions & 1 deletion pkg/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ func NewPipeline() (*Pipeline, error) {
configParams := config.Parameters
log.Debugf("configParams = %v ", configParams)

return newBuilder(configParams, stages).build()
build := newBuilder(configParams, stages)
if err := build.readStages(); err != nil {
return nil, err
}
return build.build()
}

func (p *Pipeline) Run() {
Expand Down
17 changes: 7 additions & 10 deletions pkg/pipeline/pipeline_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,8 @@ func (b *builder) readStages() error {
}

// reads the configured Go stages and connects between them
// readStages must be invoked before this
func (b *builder) build() (*Pipeline, error) {
if err := b.readStages(); err != nil {
return nil, err
}

for _, connection := range b.configStages {
if connection.Name == "" || connection.Follows == "" {
// ignore entries that do not represent a connection
Expand All @@ -104,7 +101,7 @@ func (b *builder) build() (*Pipeline, error) {
if !ok {
return nil, fmt.Errorf("unknown pipeline stage: %s", connection.Name)
}
dstNode, err := b.getStageNode(dstEntry)
dstNode, err := b.getStageNode(dstEntry, connection.Name)
if err != nil {
return nil, err
}
Expand All @@ -118,14 +115,14 @@ func (b *builder) build() (*Pipeline, error) {
if !ok {
return nil, fmt.Errorf("unknown pipeline stage: %s", connection.Follows)
}
srcNode, err := b.getStageNode(srcEntry)
srcNode, err := b.getStageNode(srcEntry, connection.Follows)
if err != nil {
return nil, err
}
src, ok := srcNode.(node.Sender)
if !ok {
return nil, fmt.Errorf("stage %q of type %q can't send data",
connection.Name, dstEntry.stageType)
connection.Follows, srcEntry.stageType)
}
log.Debugf("connecting stages: %s --> %s", connection.Follows, connection.Name)

Expand Down Expand Up @@ -158,8 +155,8 @@ func (b *builder) build() (*Pipeline, error) {
}, nil
}

func (b *builder) getStageNode(pe *pipelineEntry) (interface{}, error) {
if stg, ok := b.createdStages[pe.configStage.Name]; ok {
func (b *builder) getStageNode(pe *pipelineEntry, stageID string) (interface{}, error) {
if stg, ok := b.createdStages[stageID]; ok {
return stg, nil
}
var stage interface{}
Expand Down Expand Up @@ -205,7 +202,7 @@ func (b *builder) getStageNode(pe *pipelineEntry) (interface{}, error) {
default:
return nil, fmt.Errorf("invalid stage type: %s", pe.stageType)
}
b.createdStages[pe.configStage.Name] = stage
b.createdStages[stageID] = stage
return stage, nil
}

Expand Down
57 changes: 31 additions & 26 deletions pkg/pipeline/transform/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"net"
"os"
"path"
"time"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -202,36 +203,13 @@ func (k *KubeData) NewReplicaSetInformer(informerFactory informers.SharedInforme
}

func (k *KubeData) InitFromConfig(kubeConfigPath string) error {
var config *rest.Config
var err error

// Initialization variables
k.stopChan = make(chan struct{})
k.ipInformers = map[string]cache.SharedIndexInformer{}

if kubeConfigPath != "" {
config, err = clientcmd.BuildConfigFromFlags("", kubeConfigPath)
if err != nil {
return fmt.Errorf("can't build config from %s", kubeConfigPath)
}
} else {
kubeConfigPath = os.Getenv(kubeConfigEnvVariable)
if kubeConfigPath != "" {
config, err = clientcmd.BuildConfigFromFlags("", kubeConfigPath)
if err != nil {
return fmt.Errorf("can't build config from %s", kubeConfigPath)
}
} else {
homeDir, _ := os.UserHomeDir()
config, err = clientcmd.BuildConfigFromFlags("", homeDir+"/.kube/config")
if err != nil {
// creates the in-cluster config
config, err = rest.InClusterConfig()
if err != nil {
return fmt.Errorf("can't access kubenetes. Tried using config from: config parameter, %s env, homedir and InClusterConfig", kubeConfigEnvVariable)
}
}
}
config, err := LoadConfig(kubeConfigPath)
if err != nil {
return err
}

kubeClient, err := kubernetes.NewForConfig(config)
Expand All @@ -247,6 +225,33 @@ func (k *KubeData) InitFromConfig(kubeConfigPath string) error {
return nil
}

// LoadConfig resolves a Kubernetes REST client configuration, trying in order:
// the explicit kubeConfigPath argument, the path given by the
// kubeConfigEnvVariable environment variable, the default $HOME/.kube/config
// file, and finally the in-cluster configuration. It returns an error only if
// all sources fail.
func LoadConfig(kubeConfigPath string) (*rest.Config, error) {
	// if no config path is provided, load it from the env variable
	if kubeConfigPath == "" {
		kubeConfigPath = os.Getenv(kubeConfigEnvVariable)
	}
	// otherwise, load it from the $HOME/.kube/config file
	if kubeConfigPath == "" {
		homeDir, err := os.UserHomeDir()
		if err != nil {
			return nil, fmt.Errorf("can't get user home dir: %w", err)
		}
		kubeConfigPath = path.Join(homeDir, ".kube", "config")
	}
	config, err := clientcmd.BuildConfigFromFlags("", kubeConfigPath)
	if err == nil {
		return config, nil
	}
	// fallback: use in-cluster config
	config, err = rest.InClusterConfig()
	if err != nil {
		// typo fix: "kubenetes" -> "kubernetes" in the user-facing error message
		return nil, fmt.Errorf("can't access kubernetes. Tried using config from: "+
			"config parameter, %s env, homedir and InClusterConfig. Got: %w",
			kubeConfigEnvVariable, err)
	}
	return config, nil
}

func (k *KubeData) initInformers(client kubernetes.Interface) error {
informerFactory := informers.NewSharedInformerFactory(client, syncTime)
err := k.NewNodeInformer(informerFactory)
Expand Down
Loading

0 comments on commit e7919b2

Please sign in to comment.