Use new FLP API to build pipeline #110

Merged · 2 commits · Jun 2, 2022
251 changes: 69 additions & 182 deletions controllers/flowlogspipeline/flp_objects.go
@@ -1,11 +1,14 @@
package flowlogspipeline

import (
"encoding/json"
"fmt"
"hash/fnv"
"strconv"

"gopkg.in/yaml.v2"
"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/prometheus/common/model"
appsv1 "k8s.io/api/apps/v1"
ascv2 "k8s.io/api/autoscaling/v2beta2"
corev1 "k8s.io/api/core/v1"
@@ -21,7 +24,7 @@ import (
const configMapName = "flowlogs-pipeline-config"
const configVolume = "config-volume"
const configPath = "/etc/flowlogs-pipeline"
const configFile = "config.yaml"
const configFile = "config.json"

const (
healthServiceName = "health"
@@ -207,205 +210,89 @@ func (b *builder) podTemplate(hostNetwork bool, configDigest string) corev1.PodT
}
}

func (b *builder) getIngestConfig() ([]map[string]string, []map[string]interface{}) {
ingestStages := []map[string]string{
{"name": "ingest"},
{"name": "decode",
"follows": "ingest",
},
}
var ingest, decoder map[string]interface{}

if b.confKind == ConfKafkaTransformer {
ingest = map[string]interface{}{
"name": "ingest",
"ingest": map[string]interface{}{
"type": "kafka",
"kafka": map[string]interface{}{
"brokers": []string{b.desiredKafka.Address},
"topic": b.desiredKafka.Topic,
"groupid": b.confKind, // Without groupid, each message is delivered to each consumers
},
},
}
decoder = map[string]interface{}{
"name": "decode",
"decode": map[string]interface{}{
"type": "json",
},
}
} else if b.portProtocol == corev1.ProtocolUDP {
// UDP Port: IPFIX collector with JSON decoder
ingest = map[string]interface{}{
"name": "ingest",
"ingest": map[string]interface{}{
"type": "collector",
"collector": map[string]interface{}{
"port": b.desired.Port,
"hostname": "0.0.0.0",
},
},
}
decoder = map[string]interface{}{
"name": "decode",
"decode": map[string]interface{}{
"type": "json",
},
}
} else {
// TCP Port: GRPC collector (eBPF agent) with Protobuf decoder
ingest = map[string]interface{}{
"name": "ingest",
"ingest": map[string]interface{}{
"type": "grpc",
"grpc": map[string]interface{}{
"port": b.desired.Port,
},
},
}
decoder = map[string]interface{}{
"name": "decode",
"decode": map[string]interface{}{
"type": "protobuf",
},
}
}

return ingestStages, []map[string]interface{}{ingest, decoder}
}

func (b *builder) getTransformConfig() ([]map[string]string, []map[string]interface{}) {
transformStages := []map[string]string{
{"name": "enrich",
"follows": "decode",
},
{"name": "loki",
"follows": "enrich",
},
{"name": "aggregate",
"follows": "enrich",
},
{"name": "prometheus",
"follows": "aggregate",
},
}
var enrich, loki, aggregate, prometheus map[string]interface{}
func (b *builder) addTransformStages(lastStage *config.PipelineBuilderStage) {
// enrich stage (transform) configuration
enrichedStage := lastStage.TransformNetwork("enrich", api.TransformNetwork{
Rules: api.NetworkTransformRules{{
Input: "SrcAddr",
Output: "SrcK8S",
Type: api.AddKubernetesRuleType,
}, {
Input: "DstAddr",
Output: "DstK8S",
Type: api.AddKubernetesRuleType,
}},
})

// loki stage (write) configuration
lokiWrite := map[string]interface{}{
"type": "loki",
"labels": constants.LokiIndexFields,
lokiWrite := api.WriteLoki{
Labels: constants.LokiIndexFields,
}

if b.desiredLoki != nil {
lokiWrite["batchSize"] = b.desiredLoki.BatchSize
lokiWrite["batchWait"] = b.desiredLoki.BatchWait.ToUnstructured()
lokiWrite["maxBackoff"] = b.desiredLoki.MaxBackoff.ToUnstructured()
lokiWrite["maxRetries"] = b.desiredLoki.MaxRetries
lokiWrite["minBackoff"] = b.desiredLoki.MinBackoff.ToUnstructured()
lokiWrite["staticLabels"] = b.desiredLoki.StaticLabels
lokiWrite["timeout"] = b.desiredLoki.Timeout.ToUnstructured()
lokiWrite["url"] = b.desiredLoki.URL
lokiWrite["timestampLabel"] = "TimeFlowEndMs"
lokiWrite["timestampScale"] = "1ms"
}

loki = map[string]interface{}{"name": "loki",
"write": map[string]interface{}{
"type": "loki",
"loki": lokiWrite,
},
}

// enrich stage (transform) configuration
enrich = map[string]interface{}{
"name": "enrich",
"transform": map[string]interface{}{
"type": "network",
"network": map[string]interface{}{
"rules": []map[string]interface{}{
{
"input": "SrcAddr",
"output": "SrcK8S",
"type": "add_kubernetes",
},
{
"input": "DstAddr",
"output": "DstK8S",
"type": "add_kubernetes",
},
},
},
},
lokiWrite.BatchSize = int(b.desiredLoki.BatchSize)
lokiWrite.BatchWait = b.desiredLoki.BatchWait.ToUnstructured().(string)
lokiWrite.MaxBackoff = b.desiredLoki.MaxBackoff.ToUnstructured().(string)
lokiWrite.MaxRetries = int(b.desiredLoki.MaxRetries)
lokiWrite.MinBackoff = b.desiredLoki.MinBackoff.ToUnstructured().(string)
lokiWrite.StaticLabels = model.LabelSet{}
for k, v := range b.desiredLoki.StaticLabels {
lokiWrite.StaticLabels[model.LabelName(k)] = model.LabelValue(v)
}
lokiWrite.Timeout = b.desiredLoki.Timeout.ToUnstructured().(string)
lokiWrite.URL = b.desiredLoki.URL
lokiWrite.TimestampLabel = "TimeFlowEndMs"
lokiWrite.TimestampScale = "1ms"
}
enrichedStage.WriteLoki("loki", lokiWrite)

// prometheus stage (encode) configuration
prometheus = map[string]interface{}{
"name": "prometheus",
"encode": map[string]interface{}{
"type": "prom",
"prom": map[string]interface{}{
"port": b.desired.PrometheusPort,
"prefix": "flp_",
},
},
}

// aggregate stage (extract) configuration
aggregate = map[string]interface{}{"name": "aggregate",
"extract": map[string]interface{}{
"type": "aggregates",
},
}
return transformStages, []map[string]interface{}{enrich, loki, aggregate, prometheus}
agg := enrichedStage.Aggregate("aggregate", []api.AggregateDefinition{})
agg.EncodePrometheus("prometheus", api.PromEncode{
Port: int(b.desired.PrometheusPort),
Prefix: "flp_",
})
}
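
Note how the new builder expresses the fan-out that the removed transformStages maps spelled out with "follows" entries: WriteLoki and Aggregate are both chained off the same enrichedStage, so "loki" and "aggregate" each follow "enrich". Below is a minimal standalone sketch of that forking pattern, assuming config.Stage exposes Name and Follows fields matching the old map keys; the port, rule, and Loki URL are placeholder values, not taken from this PR.

package main

import (
	"fmt"

	"github.com/netobserv/flowlogs-pipeline/pkg/api"
	"github.com/netobserv/flowlogs-pipeline/pkg/config"
)

func main() {
	// Collector ingest + JSON decode, as in the UDP branch of this PR.
	pipeline := config.NewCollectorPipeline("ingest", api.IngestCollector{
		Port:     2055, // placeholder port
		HostName: "0.0.0.0",
	})
	decoded := pipeline.DecodeJSON("decode")

	enriched := decoded.TransformNetwork("enrich", api.TransformNetwork{
		Rules: api.NetworkTransformRules{
			{Input: "SrcAddr", Output: "SrcK8S", Type: api.AddKubernetesRuleType},
		},
	})

	// Two calls on the same stage fork the graph: both "loki" and
	// "aggregate" get follows="enrich" instead of forming a chain.
	enriched.WriteLoki("loki", api.WriteLoki{URL: "http://loki:3100/"}) // placeholder URL
	enriched.Aggregate("aggregate", []api.AggregateDefinition{}).
		EncodePrometheus("prometheus", api.PromEncode{Port: 9090, Prefix: "flp_"})

	// The builder stages share one underlying pipeline, so GetStages on
	// any stage returns the whole graph.
	for _, s := range pipeline.GetStages() {
		fmt.Printf("%s follows %q\n", s.Name, s.Follows)
	}
}
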

func (b *builder) getKafkaConfig() ([]map[string]string, []map[string]interface{}) {
kafkaStages := []map[string]string{
{"name": "kafka",
"follows": "decode",
},
}
kafka := map[string]interface{}{
"name": "kafka",
"encode": map[string]interface{}{
"type": "kafka",
"kafka": map[string]interface{}{
"address": b.desiredKafka.Address,
"topic": b.desiredKafka.Topic,
},
},
func (b *builder) buildPipelineConfig() ([]config.Stage, []config.StageParam) {
var pipeline config.PipelineBuilderStage
if b.confKind == ConfKafkaTransformer {
pipeline = config.NewKafkaPipeline("ingest", api.IngestKafka{
Brokers: []string{b.desiredKafka.Address},
Topic: b.desiredKafka.Topic,
GroupId: b.confKind, // Without groupid, each message is delivered to every consumer
})
pipeline = pipeline.DecodeJSON("decode")
} else if b.portProtocol == corev1.ProtocolUDP {
// UDP Port: IPFIX collector with JSON decoder
pipeline = config.NewCollectorPipeline("ingest", api.IngestCollector{
Port: int(b.desired.Port),
HostName: "0.0.0.0",
})
pipeline = pipeline.DecodeJSON("decode")
} else {
// TCP Port: GRPC collector (eBPF agent) with Protobuf decoder
pipeline = config.NewGRPCPipeline("ingest", api.IngestGRPCProto{
Port: int(b.desired.Port),
})
pipeline = pipeline.DecodeProtobuf("decode")
}

return kafkaStages, []map[string]interface{}{kafka}
}

func (b *builder) getPipelinesConfig() ([]map[string]string, []map[string]interface{}) {
stages := []map[string]string{}
parameters := []map[string]interface{}{}

ingestStages, ingestParameters := b.getIngestConfig()
stages = append(stages, ingestStages...)
parameters = append(parameters, ingestParameters...)

if b.confKind == ConfKafkaIngester {
kafkaStages, kafkaParameters := b.getKafkaConfig()
stages = append(stages, kafkaStages...)
parameters = append(parameters, kafkaParameters...)

pipeline = pipeline.EncodeKafka("kafka", api.EncodeKafka{
Address: b.desiredKafka.Address,
Topic: b.desiredKafka.Topic,
})
} else {
transformStages, transformParameters := b.getTransformConfig()
stages = append(stages, transformStages...)
parameters = append(parameters, transformParameters...)
b.addTransformStages(&pipeline)
}
return stages, parameters
return pipeline.GetStages(), pipeline.GetStageParams()
}
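
For comparison with the removed getKafkaConfig, here is a standalone sketch of what the Kafka-ingester branch builds: gRPC in, protobuf decode, Kafka out, mirroring the stage/parameter pairs the old maps produced. The broker address and topic are placeholders, and the exact JSON keys printed depend on the api package's struct tags, so treat the output as indicative.

package main

import (
	"encoding/json"
	"fmt"

	"github.com/netobserv/flowlogs-pipeline/pkg/api"
	"github.com/netobserv/flowlogs-pipeline/pkg/config"
)

func main() {
	pipeline := config.NewGRPCPipeline("ingest", api.IngestGRPCProto{Port: 2055}) // placeholder port
	pipeline = pipeline.DecodeProtobuf("decode")
	pipeline = pipeline.EncodeKafka("kafka", api.EncodeKafka{
		Address: "kafka:9092",    // placeholder broker
		Topic:   "network-flows", // placeholder topic
	})

	// Errors ignored for brevity in this sketch.
	stages, _ := json.Marshal(pipeline.GetStages())
	params, _ := json.Marshal(pipeline.GetStageParams())
	fmt.Println(string(stages)) // e.g. [{"name":"ingest"},{"name":"decode","follows":"ingest"},...]
	fmt.Println(string(params))
}
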

// returns a configmap with a digest of its configuration contents, which will be used to
// detect any configuration change
func (b *builder) configMap() (*corev1.ConfigMap, string) {

stages, parameters := b.getPipelinesConfig()
stages, parameters := b.buildPipelineConfig()

config := map[string]interface{}{
"log-level": b.desired.LogLevel,
@@ -417,7 +304,7 @@ func (b *builder) configMap() (*corev1.ConfigMap, string) {
}

configStr := "{}"
bs, err := yaml.Marshal(config)
bs, err := json.Marshal(config)
if err == nil {
configStr = string(bs)
}
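
The hash/fnv and strconv imports added at the top of the file support the digest that the function comment describes: the marshaled JSON is hashed so that any configuration change yields a new annotation value and triggers a pod rollout. A sketch of that pattern follows; the helper name, the FNV-64a variant, and the base-36 encoding are assumptions for illustration, not code from this PR.

package main

import (
	"encoding/json"
	"fmt"
	"hash/fnv"
	"strconv"
)

// configDigest is a hypothetical helper: marshal the config, hash the
// bytes, and return both so the hash can be stored as a pod-template
// annotation alongside the ConfigMap contents.
func configDigest(cfg map[string]interface{}) (string, string, error) {
	bs, err := json.Marshal(cfg)
	if err != nil {
		return "{}", "", err
	}
	h := fnv.New64a()
	_, _ = h.Write(bs) // fnv hashers never return an error on Write
	return string(bs), strconv.FormatUint(h.Sum64(), 36), nil
}

func main() {
	cfgStr, digest, err := configDigest(map[string]interface{}{
		"log-level": "info",
	})
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	fmt.Println(cfgStr, digest)
}
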