Skip to content

Commit

Permalink
Adding k8s unit tests and some refactoring (#578)
Browse files Browse the repository at this point in the history
* Adding k8s unit tests and some refactoring

- Add tests relative to enrichment
- move k8s stuff into k8s package
- Rename "KubeData" as "Informers"

* Fix informers mock

* Split kubernetes and informers as 2 different packages
  • Loading branch information
jotak authored Jan 29, 2024
1 parent 4289f5a commit 5025f53
Show file tree
Hide file tree
Showing 9 changed files with 527 additions and 219 deletions.
4 changes: 1 addition & 3 deletions cmd/flowlogs-pipeline/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,7 @@ func TestTheMain(t *testing.T) {

func TestPipelineConfigSetup(t *testing.T) {
// Kube init mock
kdata := new(kubernetes.KubeDataMock)
kdata.On("InitFromConfig", "").Return(nil)
kubernetes.Data = kdata
kubernetes.MockInformers()

js := `{
"PipeLine": "[{\"name\":\"grpc\"},{\"follows\":\"grpc\",\"name\":\"enrich\"},{\"follows\":\"enrich\",\"name\":\"loki\"},{\"follows\":\"enrich\",\"name\":\"prometheus\"}]",
Expand Down
2 changes: 2 additions & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ Following is the supported API format for network transformations:
inputs: entry inputs fields
output: entry output field
infra_prefixes: Namespace prefixes that will be tagged as infra
kubernetes: Kubernetes rule specific configuration
add_zone: If true the rule will add the zone
kubeConfigPath: path to kubeconfig file (optional)
servicesFile: path to services file (optional, default: /etc/services)
protocolsFile: path to protocols file (optional, default: /etc/protocols)
Expand Down
161 changes: 161 additions & 0 deletions pkg/pipeline/transform/kubernetes/enrich.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package kubernetes

import (
"fmt"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
inf "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/informers"
"github.com/sirupsen/logrus"
)

var informers inf.InformersInterface = &inf.Informers{}

// For testing
func MockInformers() {
informers = inf.NewInformersMock()
}

func InitFromConfig(kubeConfigPath string) error {
return informers.InitFromConfig(kubeConfigPath)
}

func Enrich(outputEntry config.GenericMap, rule api.NetworkTransformRule) {
kubeInfo, err := informers.GetInfo(fmt.Sprintf("%s", outputEntry[rule.Input]))
if err != nil {
logrus.WithError(err).Tracef("can't find kubernetes info for IP %v", outputEntry[rule.Input])
return
}
if rule.Assignee != "otel" {
// NETOBSERV-666: avoid putting empty namespaces or Loki aggregation queries will
// differentiate between empty and nil namespaces.
if kubeInfo.Namespace != "" {
outputEntry[rule.Output+"_Namespace"] = kubeInfo.Namespace
}
outputEntry[rule.Output+"_Name"] = kubeInfo.Name
outputEntry[rule.Output+"_Type"] = kubeInfo.Type
outputEntry[rule.Output+"_OwnerName"] = kubeInfo.Owner.Name
outputEntry[rule.Output+"_OwnerType"] = kubeInfo.Owner.Type
if rule.Parameters != "" {
for labelKey, labelValue := range kubeInfo.Labels {
outputEntry[rule.Parameters+"_"+labelKey] = labelValue
}
}
if kubeInfo.HostIP != "" {
outputEntry[rule.Output+"_HostIP"] = kubeInfo.HostIP
if kubeInfo.HostName != "" {
outputEntry[rule.Output+"_HostName"] = kubeInfo.HostName
}
}
fillInK8sZone(outputEntry, rule, *kubeInfo, "_Zone")
} else {
// NOTE: Some of these fields are taken from opentelemetry specs.
// See https://opentelemetry.io/docs/specs/semconv/resource/k8s/
// Other fields (not specified in the specs) are named similarly
if kubeInfo.Namespace != "" {
outputEntry[rule.Output+"k8s.namespace.name"] = kubeInfo.Namespace
}
switch kubeInfo.Type {
case inf.TypeNode:
outputEntry[rule.Output+"k8s.node.name"] = kubeInfo.Name
outputEntry[rule.Output+"k8s.node.uid"] = kubeInfo.UID
case inf.TypePod:
outputEntry[rule.Output+"k8s.pod.name"] = kubeInfo.Name
outputEntry[rule.Output+"k8s.pod.uid"] = kubeInfo.UID
case inf.TypeService:
outputEntry[rule.Output+"k8s.service.name"] = kubeInfo.Name
outputEntry[rule.Output+"k8s.service.uid"] = kubeInfo.UID
}
outputEntry[rule.Output+"k8s.name"] = kubeInfo.Name
outputEntry[rule.Output+"k8s.type"] = kubeInfo.Type
outputEntry[rule.Output+"k8s.owner.name"] = kubeInfo.Owner.Name
outputEntry[rule.Output+"k8s.owner.type"] = kubeInfo.Owner.Type
if rule.Parameters != "" {
for labelKey, labelValue := range kubeInfo.Labels {
outputEntry[rule.Parameters+"."+labelKey] = labelValue
}
}
if kubeInfo.HostIP != "" {
outputEntry[rule.Output+"k8s.host.ip"] = kubeInfo.HostIP
if kubeInfo.HostName != "" {
outputEntry[rule.Output+"k8s.host.name"] = kubeInfo.HostName
}
}
fillInK8sZone(outputEntry, rule, *kubeInfo, "k8s.zone")
}
}

const nodeZoneLabelName = "topology.kubernetes.io/zone"

func fillInK8sZone(outputEntry config.GenericMap, rule api.NetworkTransformRule, kubeInfo inf.Info, zonePrefix string) {
if rule.Kubernetes == nil || !rule.Kubernetes.AddZone {
//Nothing to do
return
}
switch kubeInfo.Type {
case inf.TypeNode:
zone, ok := kubeInfo.Labels[nodeZoneLabelName]
if ok {
outputEntry[rule.Output+zonePrefix] = zone
}
return
case inf.TypePod:
nodeInfo, err := informers.GetNodeInfo(kubeInfo.HostName)
if err != nil {
logrus.WithError(err).Tracef("can't find nodes info for node %v", kubeInfo.HostName)
return
}
if nodeInfo != nil {
zone, ok := nodeInfo.Labels[nodeZoneLabelName]
if ok {
outputEntry[rule.Output+zonePrefix] = zone
}
}
return

case inf.TypeService:
//A service is not assigned to a dedicated zone, skipping
return
}
}

func EnrichLayer(outputEntry config.GenericMap, rule api.NetworkTransformRule) {
if rule.KubernetesInfra == nil {
logrus.Error("transformation rule: Missing Kubernetes Infra configuration ")
return
}
outputEntry[rule.KubernetesInfra.Output] = "infra"
for _, input := range rule.KubernetesInfra.Inputs {
if objectIsApp(fmt.Sprintf("%s", outputEntry[input]), rule.KubernetesInfra.InfraPrefix) {
outputEntry[rule.KubernetesInfra.Output] = "app"
return
}
}
}

const openshiftNamespacePrefix = "openshift-"
const openshiftPrefixLen = len(openshiftNamespacePrefix)

func objectIsApp(addr string, additionalInfraPrefix string) bool {
obj, err := informers.GetInfo(addr)
if err != nil {
logrus.WithError(err).Tracef("can't find kubernetes info for IP %s", addr)
return false
}
nsLen := len(obj.Namespace)
additionalPrefixLen := len(additionalInfraPrefix)
if nsLen == 0 {
return false
}
if nsLen >= openshiftPrefixLen && obj.Namespace[:openshiftPrefixLen] == openshiftNamespacePrefix {
return false
}
if nsLen >= additionalPrefixLen && obj.Namespace[:additionalPrefixLen] == additionalInfraPrefix {
return false
}
//Special case with openshift and kubernetes service in default namespace
if obj.Namespace == "default" && (obj.Name == "kubernetes" || obj.Name == "openshift") {
return false
}
return true
}
Loading

0 comments on commit 5025f53

Please sign in to comment.