From 5025f535b14b16a3d0e430bdeb4ea7c8ae965c22 Mon Sep 17 00:00:00 2001 From: Joel Takvorian Date: Mon, 29 Jan 2024 15:50:51 +0100 Subject: [PATCH] Adding k8s unit tests and some refactoring (#578) * Adding k8s unit tests and some refactoring - Add tests relative to enrichment - move k8s stuff into k8s package - Rename "KubeData" as "Informers" * Fix informers mock * Split kubernetes and informers as 2 different packages --- cmd/flowlogs-pipeline/main_test.go | 4 +- docs/api.md | 2 + pkg/pipeline/transform/kubernetes/enrich.go | 161 ++++++++++ .../transform/kubernetes/enrich_test.go | 290 ++++++++++++++++++ .../informers-mock.go} | 51 ++- .../{kubernetes.go => informers/informers.go} | 43 +-- .../informers_test.go} | 4 +- pkg/pipeline/transform/transform_network.go | 146 +-------- .../transform/transform_network_test.go | 45 --- 9 files changed, 527 insertions(+), 219 deletions(-) create mode 100644 pkg/pipeline/transform/kubernetes/enrich.go create mode 100644 pkg/pipeline/transform/kubernetes/enrich_test.go rename pkg/pipeline/transform/kubernetes/{kubernetes-mock.go => informers/informers-mock.go} (72%) rename pkg/pipeline/transform/kubernetes/{kubernetes.go => informers/informers.go} (89%) rename pkg/pipeline/transform/kubernetes/{kubernetes_test.go => informers/informers_test.go} (98%) diff --git a/cmd/flowlogs-pipeline/main_test.go b/cmd/flowlogs-pipeline/main_test.go index 73319175c..fca40f8e9 100644 --- a/cmd/flowlogs-pipeline/main_test.go +++ b/cmd/flowlogs-pipeline/main_test.go @@ -46,9 +46,7 @@ func TestTheMain(t *testing.T) { func TestPipelineConfigSetup(t *testing.T) { // Kube init mock - kdata := new(kubernetes.KubeDataMock) - kdata.On("InitFromConfig", "").Return(nil) - kubernetes.Data = kdata + kubernetes.MockInformers() js := `{ "PipeLine": "[{\"name\":\"grpc\"},{\"follows\":\"grpc\",\"name\":\"enrich\"},{\"follows\":\"enrich\",\"name\":\"loki\"},{\"follows\":\"enrich\",\"name\":\"prometheus\"}]", diff --git a/docs/api.md b/docs/api.md index e8e503d73..7911efcc3 100644 --- a/docs/api.md +++ b/docs/api.md @@ -187,6 +187,8 @@ Following is the supported API format for network transformations: inputs: entry inputs fields output: entry output field infra_prefixes: Namespace prefixes that will be tagged as infra + kubernetes: Kubernetes rule specific configuration + add_zone: If true the rule will add the zone kubeConfigPath: path to kubeconfig file (optional) servicesFile: path to services file (optional, default: /etc/services) protocolsFile: path to protocols file (optional, default: /etc/protocols) diff --git a/pkg/pipeline/transform/kubernetes/enrich.go b/pkg/pipeline/transform/kubernetes/enrich.go new file mode 100644 index 000000000..7e220c462 --- /dev/null +++ b/pkg/pipeline/transform/kubernetes/enrich.go @@ -0,0 +1,161 @@ +package kubernetes + +import ( + "fmt" + + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/config" + inf "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/informers" + "github.com/sirupsen/logrus" +) + +var informers inf.InformersInterface = &inf.Informers{} + +// For testing +func MockInformers() { + informers = inf.NewInformersMock() +} + +func InitFromConfig(kubeConfigPath string) error { + return informers.InitFromConfig(kubeConfigPath) +} + +func Enrich(outputEntry config.GenericMap, rule api.NetworkTransformRule) { + kubeInfo, err := informers.GetInfo(fmt.Sprintf("%s", outputEntry[rule.Input])) + if err != nil { + logrus.WithError(err).Tracef("can't find kubernetes info for IP %v", outputEntry[rule.Input]) + return + } + if rule.Assignee != "otel" { + // NETOBSERV-666: avoid putting empty namespaces or Loki aggregation queries will + // differentiate between empty and nil namespaces. + if kubeInfo.Namespace != "" { + outputEntry[rule.Output+"_Namespace"] = kubeInfo.Namespace + } + outputEntry[rule.Output+"_Name"] = kubeInfo.Name + outputEntry[rule.Output+"_Type"] = kubeInfo.Type + outputEntry[rule.Output+"_OwnerName"] = kubeInfo.Owner.Name + outputEntry[rule.Output+"_OwnerType"] = kubeInfo.Owner.Type + if rule.Parameters != "" { + for labelKey, labelValue := range kubeInfo.Labels { + outputEntry[rule.Parameters+"_"+labelKey] = labelValue + } + } + if kubeInfo.HostIP != "" { + outputEntry[rule.Output+"_HostIP"] = kubeInfo.HostIP + if kubeInfo.HostName != "" { + outputEntry[rule.Output+"_HostName"] = kubeInfo.HostName + } + } + fillInK8sZone(outputEntry, rule, *kubeInfo, "_Zone") + } else { + // NOTE: Some of these fields are taken from opentelemetry specs. + // See https://opentelemetry.io/docs/specs/semconv/resource/k8s/ + // Other fields (not specified in the specs) are named similarly + if kubeInfo.Namespace != "" { + outputEntry[rule.Output+"k8s.namespace.name"] = kubeInfo.Namespace + } + switch kubeInfo.Type { + case inf.TypeNode: + outputEntry[rule.Output+"k8s.node.name"] = kubeInfo.Name + outputEntry[rule.Output+"k8s.node.uid"] = kubeInfo.UID + case inf.TypePod: + outputEntry[rule.Output+"k8s.pod.name"] = kubeInfo.Name + outputEntry[rule.Output+"k8s.pod.uid"] = kubeInfo.UID + case inf.TypeService: + outputEntry[rule.Output+"k8s.service.name"] = kubeInfo.Name + outputEntry[rule.Output+"k8s.service.uid"] = kubeInfo.UID + } + outputEntry[rule.Output+"k8s.name"] = kubeInfo.Name + outputEntry[rule.Output+"k8s.type"] = kubeInfo.Type + outputEntry[rule.Output+"k8s.owner.name"] = kubeInfo.Owner.Name + outputEntry[rule.Output+"k8s.owner.type"] = kubeInfo.Owner.Type + if rule.Parameters != "" { + for labelKey, labelValue := range kubeInfo.Labels { + outputEntry[rule.Parameters+"."+labelKey] = labelValue + } + } + if kubeInfo.HostIP != "" { + outputEntry[rule.Output+"k8s.host.ip"] = kubeInfo.HostIP + if kubeInfo.HostName != "" { + outputEntry[rule.Output+"k8s.host.name"] = kubeInfo.HostName + } + } + fillInK8sZone(outputEntry, rule, *kubeInfo, "k8s.zone") + } +} + +const nodeZoneLabelName = "topology.kubernetes.io/zone" + +func fillInK8sZone(outputEntry config.GenericMap, rule api.NetworkTransformRule, kubeInfo inf.Info, zonePrefix string) { + if rule.Kubernetes == nil || !rule.Kubernetes.AddZone { + //Nothing to do + return + } + switch kubeInfo.Type { + case inf.TypeNode: + zone, ok := kubeInfo.Labels[nodeZoneLabelName] + if ok { + outputEntry[rule.Output+zonePrefix] = zone + } + return + case inf.TypePod: + nodeInfo, err := informers.GetNodeInfo(kubeInfo.HostName) + if err != nil { + logrus.WithError(err).Tracef("can't find nodes info for node %v", kubeInfo.HostName) + return + } + if nodeInfo != nil { + zone, ok := nodeInfo.Labels[nodeZoneLabelName] + if ok { + outputEntry[rule.Output+zonePrefix] = zone + } + } + return + + case inf.TypeService: + //A service is not assigned to a dedicated zone, skipping + return + } +} + +func EnrichLayer(outputEntry config.GenericMap, rule api.NetworkTransformRule) { + if rule.KubernetesInfra == nil { + logrus.Error("transformation rule: Missing Kubernetes Infra configuration ") + return + } + outputEntry[rule.KubernetesInfra.Output] = "infra" + for _, input := range rule.KubernetesInfra.Inputs { + if objectIsApp(fmt.Sprintf("%s", outputEntry[input]), rule.KubernetesInfra.InfraPrefix) { + outputEntry[rule.KubernetesInfra.Output] = "app" + return + } + } +} + +const openshiftNamespacePrefix = "openshift-" +const openshiftPrefixLen = len(openshiftNamespacePrefix) + +func objectIsApp(addr string, additionalInfraPrefix string) bool { + obj, err := informers.GetInfo(addr) + if err != nil { + logrus.WithError(err).Tracef("can't find kubernetes info for IP %s", addr) + return false + } + nsLen := len(obj.Namespace) + additionalPrefixLen := len(additionalInfraPrefix) + if nsLen == 0 { + return false + } + if nsLen >= openshiftPrefixLen && obj.Namespace[:openshiftPrefixLen] == openshiftNamespacePrefix { + return false + } + if nsLen >= additionalPrefixLen && obj.Namespace[:additionalPrefixLen] == additionalInfraPrefix { + return false + } + //Special case with openshift and kubernetes service in default namespace + if obj.Namespace == "default" && (obj.Name == "kubernetes" || obj.Name == "openshift") { + return false + } + return true +} diff --git a/pkg/pipeline/transform/kubernetes/enrich_test.go b/pkg/pipeline/transform/kubernetes/enrich_test.go new file mode 100644 index 000000000..a3e7a4658 --- /dev/null +++ b/pkg/pipeline/transform/kubernetes/enrich_test.go @@ -0,0 +1,290 @@ +package kubernetes + +import ( + "testing" + + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/config" + inf "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/informers" + "github.com/stretchr/testify/assert" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" +) + +var info = map[string]*inf.Info{ + "1.2.3.4": nil, + "10.0.0.1": { + ObjectMeta: v1.ObjectMeta{ + Name: "pod-1", + Namespace: "ns-1", + }, + Type: "Pod", + HostName: "host-1", + HostIP: "100.0.0.1", + }, + "10.0.0.2": { + ObjectMeta: v1.ObjectMeta{ + Name: "pod-2", + Namespace: "ns-2", + }, + Type: "Pod", + HostName: "host-2", + HostIP: "100.0.0.2", + }, + "20.0.0.1": { + ObjectMeta: v1.ObjectMeta{ + Name: "service-1", + Namespace: "ns-1", + }, + Type: "Service", + }, +} + +var nodes = map[string]*inf.Info{ + "host-1": { + ObjectMeta: v1.ObjectMeta{ + Name: "host-1", + Labels: map[string]string{ + nodeZoneLabelName: "us-east-1a", + }, + }, + Type: "Node", + }, + "host-2": { + ObjectMeta: v1.ObjectMeta{ + Name: "host-2", + Labels: map[string]string{ + nodeZoneLabelName: "us-east-1b", + }, + }, + Type: "Node", + }, +} + +var rules = api.NetworkTransformRules{ + { + Type: api.OpAddKubernetes, + Input: "SrcAddr", + Output: "SrcK8s", + Kubernetes: &api.K8sRule{ + AddZone: true, + }, + }, + { + Type: api.OpAddKubernetes, + Input: "DstAddr", + Output: "DstK8s", + Kubernetes: &api.K8sRule{ + AddZone: true, + }, + }, +} + +func TestEnrich(t *testing.T) { + informers = inf.SetupStubs(info, nodes) + + // Pod to unknown + entry := config.GenericMap{ + "SrcAddr": "10.0.0.1", // pod-1 + "DstAddr": "42.42.42.42", // unknown + } + for _, r := range rules { + Enrich(entry, r) + } + assert.Equal(t, config.GenericMap{ + "DstAddr": "42.42.42.42", + "SrcAddr": "10.0.0.1", + "SrcK8s_HostIP": "100.0.0.1", + "SrcK8s_HostName": "host-1", + "SrcK8s_Name": "pod-1", + "SrcK8s_Namespace": "ns-1", + "SrcK8s_OwnerName": "", + "SrcK8s_OwnerType": "", + "SrcK8s_Type": "Pod", + "SrcK8s_Zone": "us-east-1a", + }, entry) + + // Pod to pod + entry = config.GenericMap{ + "SrcAddr": "10.0.0.1", // pod-1 + "DstAddr": "10.0.0.2", // pod-2 + } + for _, r := range rules { + Enrich(entry, r) + } + assert.Equal(t, config.GenericMap{ + "DstAddr": "10.0.0.2", + "DstK8s_HostIP": "100.0.0.2", + "DstK8s_HostName": "host-2", + "DstK8s_Name": "pod-2", + "DstK8s_Namespace": "ns-2", + "DstK8s_OwnerName": "", + "DstK8s_OwnerType": "", + "DstK8s_Type": "Pod", + "DstK8s_Zone": "us-east-1b", + "SrcAddr": "10.0.0.1", + "SrcK8s_HostIP": "100.0.0.1", + "SrcK8s_HostName": "host-1", + "SrcK8s_Name": "pod-1", + "SrcK8s_Namespace": "ns-1", + "SrcK8s_OwnerName": "", + "SrcK8s_OwnerType": "", + "SrcK8s_Type": "Pod", + "SrcK8s_Zone": "us-east-1a", + }, entry) + + // Pod to service + entry = config.GenericMap{ + "SrcAddr": "10.0.0.2", // pod-2 + "DstAddr": "20.0.0.1", // service-1 + } + for _, r := range rules { + Enrich(entry, r) + } + assert.Equal(t, config.GenericMap{ + "DstAddr": "20.0.0.1", + "DstK8s_Name": "service-1", + "DstK8s_Namespace": "ns-1", + "DstK8s_OwnerName": "", + "DstK8s_OwnerType": "", + "DstK8s_Type": "Service", + "SrcAddr": "10.0.0.2", + "SrcK8s_HostIP": "100.0.0.2", + "SrcK8s_HostName": "host-2", + "SrcK8s_Name": "pod-2", + "SrcK8s_Namespace": "ns-2", + "SrcK8s_OwnerName": "", + "SrcK8s_OwnerType": "", + "SrcK8s_Type": "Pod", + "SrcK8s_Zone": "us-east-1b", + }, entry) +} + +var otelRules = api.NetworkTransformRules{ + { + Type: api.OpAddKubernetes, + Input: "source.ip", + Output: "source.", + Assignee: "otel", + Kubernetes: &api.K8sRule{ + AddZone: true, + }, + }, + { + Type: api.OpAddKubernetes, + Input: "destination.ip", + Output: "destination.", + Assignee: "otel", + Kubernetes: &api.K8sRule{ + AddZone: true, + }, + }, +} + +func TestEnrich_Otel(t *testing.T) { + informers = inf.SetupStubs(info, nodes) + + // Pod to unknown + entry := config.GenericMap{ + "source.ip": "10.0.0.1", // pod-1 + "destination.ip": "42.42.42.42", // unknown + } + for _, r := range otelRules { + Enrich(entry, r) + } + assert.Equal(t, config.GenericMap{ + "destination.ip": "42.42.42.42", + "source.ip": "10.0.0.1", + "source.k8s.host.ip": "100.0.0.1", + "source.k8s.host.name": "host-1", + "source.k8s.name": "pod-1", + "source.k8s.namespace.name": "ns-1", + "source.k8s.pod.name": "pod-1", + "source.k8s.pod.uid": types.UID(""), + "source.k8s.owner.name": "", + "source.k8s.owner.type": "", + "source.k8s.type": "Pod", + "source.k8s.zone": "us-east-1a", + }, entry) + + // Pod to pod + entry = config.GenericMap{ + "source.ip": "10.0.0.1", // pod-1 + "destination.ip": "10.0.0.2", // pod-2 + } + for _, r := range otelRules { + Enrich(entry, r) + } + assert.Equal(t, config.GenericMap{ + "destination.ip": "10.0.0.2", + "destination.k8s.host.ip": "100.0.0.2", + "destination.k8s.host.name": "host-2", + "destination.k8s.name": "pod-2", + "destination.k8s.namespace.name": "ns-2", + "destination.k8s.pod.name": "pod-2", + "destination.k8s.pod.uid": types.UID(""), + "destination.k8s.owner.name": "", + "destination.k8s.owner.type": "", + "destination.k8s.type": "Pod", + "destination.k8s.zone": "us-east-1b", + "source.ip": "10.0.0.1", + "source.k8s.host.ip": "100.0.0.1", + "source.k8s.host.name": "host-1", + "source.k8s.name": "pod-1", + "source.k8s.namespace.name": "ns-1", + "source.k8s.pod.name": "pod-1", + "source.k8s.pod.uid": types.UID(""), + "source.k8s.owner.name": "", + "source.k8s.owner.type": "", + "source.k8s.type": "Pod", + "source.k8s.zone": "us-east-1a", + }, entry) + + // Pod to service + entry = config.GenericMap{ + "source.ip": "10.0.0.2", // pod-2 + "destination.ip": "20.0.0.1", // service-1 + } + for _, r := range otelRules { + Enrich(entry, r) + } + assert.Equal(t, config.GenericMap{ + "destination.ip": "20.0.0.1", + "destination.k8s.name": "service-1", + "destination.k8s.namespace.name": "ns-1", + "destination.k8s.service.name": "service-1", + "destination.k8s.service.uid": types.UID(""), + "destination.k8s.owner.name": "", + "destination.k8s.owner.type": "", + "destination.k8s.type": "Service", + "source.ip": "10.0.0.2", + "source.k8s.host.ip": "100.0.0.2", + "source.k8s.host.name": "host-2", + "source.k8s.name": "pod-2", + "source.k8s.namespace.name": "ns-2", + "source.k8s.pod.name": "pod-2", + "source.k8s.pod.uid": types.UID(""), + "source.k8s.owner.name": "", + "source.k8s.owner.type": "", + "source.k8s.type": "Pod", + "source.k8s.zone": "us-east-1b", + }, entry) +} + +func TestEnrich_EmptyNamespace(t *testing.T) { + informers = inf.SetupStubs(info, nodes) + + // We need to check that, whether it returns NotFound or just an empty namespace, + // there is no map entry for that namespace (an empty-valued map entry is not valid) + entry := config.GenericMap{ + "SrcAddr": "1.2.3.4", // would return an empty namespace + "DstAddr": "3.2.1.0", // would return NotFound + } + + for _, r := range rules { + Enrich(entry, r) + } + + assert.NotContains(t, entry, "SrcK8s_Namespace") + assert.NotContains(t, entry, "DstK8s_Namespace") +} diff --git a/pkg/pipeline/transform/kubernetes/kubernetes-mock.go b/pkg/pipeline/transform/kubernetes/informers/informers-mock.go similarity index 72% rename from pkg/pipeline/transform/kubernetes/kubernetes-mock.go rename to pkg/pipeline/transform/kubernetes/informers/informers-mock.go index 4d72405ce..9d6307308 100644 --- a/pkg/pipeline/transform/kubernetes/kubernetes-mock.go +++ b/pkg/pipeline/transform/kubernetes/informers/informers-mock.go @@ -1,17 +1,25 @@ -package kubernetes +package informers import ( + "errors" + "github.com/stretchr/testify/mock" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/cache" ) -type KubeDataMock struct { +type InformersMock struct { mock.Mock - kubeDataInterface + InformersInterface +} + +func NewInformersMock() *InformersMock { + inf := new(InformersMock) + inf.On("InitFromConfig", mock.Anything).Return(nil) + return inf } -func (o *KubeDataMock) InitFromConfig(kubeConfigPath string) error { +func (o *InformersMock) InitFromConfig(kubeConfigPath string) error { args := o.Called(kubeConfigPath) return args.Error(0) } @@ -94,7 +102,7 @@ func (m *IndexerMock) FallbackNotFound() { m.On("ByIndex", IndexIP, mock.Anything).Return([]interface{}{}, nil) } -func SetupIndexerMocks(kd *KubeData) (pods, nodes, svc, rs *IndexerMock) { +func SetupIndexerMocks(kd *Informers) (pods, nodes, svc, rs *IndexerMock) { // pods informer pods = &IndexerMock{} pim := InformerMock{} @@ -117,3 +125,36 @@ func SetupIndexerMocks(kd *KubeData) (pods, nodes, svc, rs *IndexerMock) { kd.replicaSets = &rim return } + +type FakeInformers struct { + InformersInterface + info map[string]*Info + nodes map[string]*Info +} + +func SetupStubs(info map[string]*Info, nodes map[string]*Info) *FakeInformers { + return &FakeInformers{ + info: info, + nodes: nodes, + } +} + +func (f *FakeInformers) InitFromConfig(_ string) error { + return nil +} + +func (f *FakeInformers) GetInfo(n string) (*Info, error) { + i := f.info[n] + if i != nil { + return i, nil + } + return nil, errors.New("notFound") +} + +func (f *FakeInformers) GetNodeInfo(n string) (*Info, error) { + i := f.nodes[n] + if i != nil { + return i, nil + } + return nil, errors.New("notFound") +} diff --git a/pkg/pipeline/transform/kubernetes/kubernetes.go b/pkg/pipeline/transform/kubernetes/informers/informers.go similarity index 89% rename from pkg/pipeline/transform/kubernetes/kubernetes.go rename to pkg/pipeline/transform/kubernetes/informers/informers.go index 1e93e644c..344db71d1 100644 --- a/pkg/pipeline/transform/kubernetes/kubernetes.go +++ b/pkg/pipeline/transform/kubernetes/informers/informers.go @@ -15,7 +15,7 @@ * */ -package kubernetes +package informers import ( "fmt" @@ -25,12 +25,12 @@ import ( "time" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/cni" + "github.com/sirupsen/logrus" - log "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/client-go/informers" + inf "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/metadata" "k8s.io/client-go/metadata/metadatainformer" @@ -39,8 +39,6 @@ import ( "k8s.io/client-go/tools/clientcmd" ) -var Data kubeDataInterface = &KubeData{} - const ( kubeConfigEnvVariable = "KUBECONFIG" syncTime = 10 * time.Minute @@ -50,13 +48,16 @@ const ( TypeService = "Service" ) -type kubeDataInterface interface { +var log = logrus.WithField("component", "transform.Network.Kubernetes") + +type InformersInterface interface { GetInfo(string) (*Info, error) GetNodeInfo(string) (*Info, error) InitFromConfig(string) error } -type KubeData struct { +type Informers struct { + InformersInterface // pods, nodes and services cache the different object types as *Info pointers pods cache.SharedIndexInformer nodes cache.SharedIndexInformer @@ -93,7 +94,7 @@ var commonIndexers = map[string]cache.IndexFunc{ }, } -func (k *KubeData) GetInfo(ip string) (*Info, error) { +func (k *Informers) GetInfo(ip string) (*Info, error) { if info, ok := k.fetchInformers(ip); ok { // Owner data might be discovered after the owned, so we fetch it // at the last moment @@ -106,7 +107,7 @@ func (k *KubeData) GetInfo(ip string) (*Info, error) { return nil, fmt.Errorf("informers can't find IP %s", ip) } -func (k *KubeData) fetchInformers(ip string) (*Info, bool) { +func (k *Informers) fetchInformers(ip string) (*Info, bool) { if info, ok := infoForIP(k.pods.GetIndexer(), ip); ok { // it might happen that the Host is discovered after the Pod if info.HostName == "" { @@ -135,7 +136,7 @@ func infoForIP(idx cache.Indexer, ip string) (*Info, bool) { return objs[0].(*Info), true } -func (k *KubeData) GetNodeInfo(name string) (*Info, error) { +func (k *Informers) GetNodeInfo(name string) (*Info, error) { item, ok, err := k.nodes.GetIndexer().GetByKey(name) if err != nil { return nil, err @@ -145,7 +146,7 @@ func (k *KubeData) GetNodeInfo(name string) (*Info, error) { return nil, nil } -func (k *KubeData) getOwner(info *Info) Owner { +func (k *Informers) getOwner(info *Info) Owner { if len(info.OwnerReferences) != 0 { ownerReference := info.OwnerReferences[0] if ownerReference.Kind != "ReplicaSet" { @@ -176,7 +177,7 @@ func (k *KubeData) getOwner(info *Info) Owner { } } -func (k *KubeData) getHostName(hostIP string) string { +func (k *Informers) getHostName(hostIP string) string { if hostIP != "" { if info, ok := infoForIP(k.nodes.GetIndexer(), hostIP); ok { return info.Name @@ -185,7 +186,7 @@ func (k *KubeData) getHostName(hostIP string) string { return "" } -func (k *KubeData) initNodeInformer(informerFactory informers.SharedInformerFactory) error { +func (k *Informers) initNodeInformer(informerFactory inf.SharedInformerFactory) error { nodes := informerFactory.Core().V1().Nodes().Informer() // Transform any *v1.Node instance into a *Info instance to save space // in the informer's cache @@ -232,7 +233,7 @@ func (k *KubeData) initNodeInformer(informerFactory informers.SharedInformerFact return nil } -func (k *KubeData) initPodInformer(informerFactory informers.SharedInformerFactory) error { +func (k *Informers) initPodInformer(informerFactory inf.SharedInformerFactory) error { pods := informerFactory.Core().V1().Pods().Informer() // Transform any *v1.Pod instance into a *Info instance to save space // in the informer's cache @@ -271,7 +272,7 @@ func (k *KubeData) initPodInformer(informerFactory informers.SharedInformerFacto return nil } -func (k *KubeData) initServiceInformer(informerFactory informers.SharedInformerFactory) error { +func (k *Informers) initServiceInformer(informerFactory inf.SharedInformerFactory) error { services := informerFactory.Core().V1().Services().Informer() // Transform any *v1.Service instance into a *Info instance to save space // in the informer's cache @@ -307,7 +308,7 @@ func (k *KubeData) initServiceInformer(informerFactory informers.SharedInformerF return nil } -func (k *KubeData) initReplicaSetInformer(informerFactory metadatainformer.SharedInformerFactory) error { +func (k *Informers) initReplicaSetInformer(informerFactory metadatainformer.SharedInformerFactory) error { k.replicaSets = informerFactory.ForResource( schema.GroupVersionResource{ Group: "apps", @@ -332,12 +333,12 @@ func (k *KubeData) initReplicaSetInformer(informerFactory metadatainformer.Share return nil } -func (k *KubeData) InitFromConfig(kubeConfigPath string) error { +func (k *Informers) InitFromConfig(kubeConfigPath string) error { // Initialization variables k.stopChan = make(chan struct{}) k.mdStopChan = make(chan struct{}) - config, err := LoadConfig(kubeConfigPath) + config, err := loadConfig(kubeConfigPath) if err != nil { return err } @@ -360,7 +361,7 @@ func (k *KubeData) InitFromConfig(kubeConfigPath string) error { return nil } -func LoadConfig(kubeConfigPath string) (*rest.Config, error) { +func loadConfig(kubeConfigPath string) (*rest.Config, error) { // if no config path is provided, load it from the env variable if kubeConfigPath == "" { kubeConfigPath = os.Getenv(kubeConfigEnvVariable) @@ -387,8 +388,8 @@ func LoadConfig(kubeConfigPath string) (*rest.Config, error) { return config, nil } -func (k *KubeData) initInformers(client kubernetes.Interface, metaClient metadata.Interface) error { - informerFactory := informers.NewSharedInformerFactory(client, syncTime) +func (k *Informers) initInformers(client kubernetes.Interface, metaClient metadata.Interface) error { + informerFactory := inf.NewSharedInformerFactory(client, syncTime) metadataInformerFactory := metadatainformer.NewSharedInformerFactory(metaClient, syncTime) err := k.initNodeInformer(informerFactory) if err != nil { diff --git a/pkg/pipeline/transform/kubernetes/kubernetes_test.go b/pkg/pipeline/transform/kubernetes/informers/informers_test.go similarity index 98% rename from pkg/pipeline/transform/kubernetes/kubernetes_test.go rename to pkg/pipeline/transform/kubernetes/informers/informers_test.go index 2def5970c..005897554 100644 --- a/pkg/pipeline/transform/kubernetes/kubernetes_test.go +++ b/pkg/pipeline/transform/kubernetes/informers/informers_test.go @@ -15,7 +15,7 @@ * */ -package kubernetes +package informers import ( "testing" @@ -25,7 +25,7 @@ import ( ) func TestGetInfo(t *testing.T) { - kubeData := KubeData{} + kubeData := Informers{} pidx, hidx, sidx, ridx := SetupIndexerMocks(&kubeData) pidx.MockPod("1.2.3.4", "pod1", "podNamespace", "10.0.0.1", nil) pidx.MockPod("1.2.3.5", "pod2", "podNamespace", "10.0.0.1", &Owner{Name: "rs1", Type: "ReplicaSet"}) diff --git a/pkg/pipeline/transform/transform_network.go b/pkg/pipeline/transform/transform_network.go index 597a8b9e5..f6c4ea8ee 100644 --- a/pkg/pipeline/transform/transform_network.go +++ b/pkg/pipeline/transform/transform_network.go @@ -98,9 +98,9 @@ func (n *Network) Transform(inputEntry config.GenericMap) (config.GenericMap, bo } outputEntry[rule.Output] = serviceName case api.OpAddKubernetes: - fillInK8s(outputEntry, rule) + kubernetes.Enrich(outputEntry, rule) case api.OpAddKubernetesInfra: - fillInK8sInfra(outputEntry, rule) + kubernetes.EnrichLayer(outputEntry, rule) case api.OpReinterpretDirection: reinterpretDirection(outputEntry, &n.DirectionInfo) case api.OpAddIPCategory: @@ -121,146 +121,6 @@ func (n *Network) Transform(inputEntry config.GenericMap) (config.GenericMap, bo return outputEntry, true } -func fillInK8s(outputEntry config.GenericMap, rule api.NetworkTransformRule) { - kubeInfo, err := kubernetes.Data.GetInfo(fmt.Sprintf("%s", outputEntry[rule.Input])) - if err != nil { - logrus.WithError(err).Tracef("can't find kubernetes info for IP %v", outputEntry[rule.Input]) - return - } - if rule.Assignee != "otel" { - // NETOBSERV-666: avoid putting empty namespaces or Loki aggregation queries will - // differentiate between empty and nil namespaces. - if kubeInfo.Namespace != "" { - outputEntry[rule.Output+"_Namespace"] = kubeInfo.Namespace - } - outputEntry[rule.Output+"_Name"] = kubeInfo.Name - outputEntry[rule.Output+"_Type"] = kubeInfo.Type - outputEntry[rule.Output+"_OwnerName"] = kubeInfo.Owner.Name - outputEntry[rule.Output+"_OwnerType"] = kubeInfo.Owner.Type - if rule.Parameters != "" { - for labelKey, labelValue := range kubeInfo.Labels { - outputEntry[rule.Parameters+"_"+labelKey] = labelValue - } - } - if kubeInfo.HostIP != "" { - outputEntry[rule.Output+"_HostIP"] = kubeInfo.HostIP - if kubeInfo.HostName != "" { - outputEntry[rule.Output+"_HostName"] = kubeInfo.HostName - } - } - fillInK8sZone(outputEntry, rule, *kubeInfo, "_Zone") - } else { - // NOTE: Some of these fields are taken from opentelemetry specs. - // See https://opentelemetry.io/docs/specs/semconv/resource/k8s/ - // Other fields (not specified in the specs) are named similarly - if kubeInfo.Namespace != "" { - outputEntry[rule.Output+"k8s.namespace.name"] = kubeInfo.Namespace - } - switch kubeInfo.Type { - case kubernetes.TypeNode: - outputEntry[rule.Output+"k8s.node.name"] = kubeInfo.Name - outputEntry[rule.Output+"k8s.node.uid"] = kubeInfo.UID - case kubernetes.TypePod: - outputEntry[rule.Output+"k8s.pod.name"] = kubeInfo.Name - outputEntry[rule.Output+"k8s.pod.uid"] = kubeInfo.UID - case kubernetes.TypeService: - outputEntry[rule.Output+"k8s.service.name"] = kubeInfo.Name - outputEntry[rule.Output+"k8s.service.uid"] = kubeInfo.UID - } - outputEntry[rule.Output+"k8s.name"] = kubeInfo.Name - outputEntry[rule.Output+"k8s.type"] = kubeInfo.Type - outputEntry[rule.Output+"k8s.owner.name"] = kubeInfo.Owner.Name - outputEntry[rule.Output+"k8s.owner.type"] = kubeInfo.Owner.Type - if rule.Parameters != "" { - for labelKey, labelValue := range kubeInfo.Labels { - outputEntry[rule.Parameters+"."+labelKey] = labelValue - } - } - if kubeInfo.HostIP != "" { - outputEntry[rule.Output+"k8s.host.ip"] = kubeInfo.HostIP - if kubeInfo.HostName != "" { - outputEntry[rule.Output+"k8s.host.name"] = kubeInfo.HostName - } - } - fillInK8sZone(outputEntry, rule, *kubeInfo, "k8s.zone") - } -} - -const nodeZoneLabelName = "topology.kubernetes.io/zone" - -func fillInK8sZone(outputEntry config.GenericMap, rule api.NetworkTransformRule, kubeInfo kubernetes.Info, zonePrefix string) { - if rule.Kubernetes == nil || !rule.Kubernetes.AddZone { - //Nothing to do - return - } - switch kubeInfo.Type { - case kubernetes.TypeNode: - zone, ok := kubeInfo.Labels[nodeZoneLabelName] - if ok { - outputEntry[rule.Output+zonePrefix] = zone - } - return - case kubernetes.TypePod: - nodeInfo, err := kubernetes.Data.GetNodeInfo(kubeInfo.HostName) - if err != nil { - logrus.WithError(err).Tracef("can't find nodes info for node %v", kubeInfo.HostName) - return - } - if nodeInfo != nil { - zone, ok := nodeInfo.Labels[nodeZoneLabelName] - if ok { - outputEntry[rule.Output+zonePrefix] = zone - } - } - return - - case kubernetes.TypeService: - //A service is not assigned to a dedicated zone, skipping - return - } -} - -func fillInK8sInfra(outputEntry config.GenericMap, rule api.NetworkTransformRule) { - if rule.KubernetesInfra == nil { - logrus.Error("transformation rule: Missing Kubernetes Infra configuration ") - return - } - outputEntry[rule.KubernetesInfra.Output] = "infra" - for _, input := range rule.KubernetesInfra.Inputs { - if objectIsApp(fmt.Sprintf("%s", outputEntry[input]), rule.KubernetesInfra.InfraPrefix) { - outputEntry[rule.KubernetesInfra.Output] = "app" - return - } - } -} - -const openshiftNamespacePrefix = "openshift-" -const openshiftPrefixLen = len(openshiftNamespacePrefix) - -func objectIsApp(addr string, additionalInfraPrefix string) bool { - obj, err := kubernetes.Data.GetInfo(addr) - if err != nil { - logrus.WithError(err).Tracef("can't find kubernetes info for IP %s", addr) - return false - } - nsLen := len(obj.Namespace) - additionalPrefixLen := len(additionalInfraPrefix) - if nsLen == 0 { - return false - } - if nsLen >= openshiftPrefixLen && obj.Namespace[:openshiftPrefixLen] == openshiftNamespacePrefix { - return false - } - if nsLen >= additionalPrefixLen && obj.Namespace[:additionalPrefixLen] == additionalInfraPrefix { - return false - } - //Special case with openshift and kubernetes service in default namespace - if obj.Namespace == "default" && (obj.Name == "kubernetes" || obj.Name == "openshift") { - return false - } - return true -} - func (n *Network) categorizeIP(ip net.IP) string { if ip != nil { for _, subnetCat := range n.categories { @@ -313,7 +173,7 @@ func NewTransformNetwork(params config.StageParam) (Transformer, error) { } if needToInitKubeData { - err := kubernetes.Data.InitFromConfig(jsonNetworkTransform.KubeConfigPath) + err := kubernetes.InitFromConfig(jsonNetworkTransform.KubeConfigPath) if err != nil { return nil, err } diff --git a/pkg/pipeline/transform/transform_network_test.go b/pkg/pipeline/transform/transform_network_test.go index 8b23577b1..58a2409f0 100644 --- a/pkg/pipeline/transform/transform_network_test.go +++ b/pkg/pipeline/transform/transform_network_test.go @@ -18,18 +18,15 @@ package transform import ( - "errors" "os" "path" "testing" "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" - "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/location" netdb "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/netdb" "github.com/netobserv/flowlogs-pipeline/pkg/test" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -204,48 +201,6 @@ func InitNewTransformNetwork(t *testing.T, configFile string) Transformer { return newTransform } -func TestTransform_K8sEmptyNamespace(t *testing.T) { - kubernetes.Data = &fakeKubeData{} - nt := Network{ - TransformNetwork: api.TransformNetwork{ - Rules: api.NetworkTransformRules{{ - Type: api.OpAddKubernetes, - Input: "SrcAddr", - Output: "SrcK8s", - }, { - Type: api.OpAddKubernetes, - Input: "DstAddr", - Output: "DstK8s", - }}, - }, - } - // We need to check that, whether it returns NotFound or just an empty namespace, - // there is no map entry for that namespace (an empty-valued map entry is not valid) - out, _ := nt.Transform(config.GenericMap{ - "SrcAddr": "1.2.3.4", // would return an empty namespace - "DstAddr": "3.2.1.0", // would return NotFound - }) - assert.NotContains(t, out, "SrcK8s_Namespace") - assert.NotContains(t, out, "DstK8s_Namespace") -} - -type fakeKubeData struct{} - -func (d *fakeKubeData) InitFromConfig(_ string) error { - return nil -} -func (*fakeKubeData) GetInfo(n string) (*kubernetes.Info, error) { - // If found, returns an empty info (empty namespace) - if n == "1.2.3.4" { - return &kubernetes.Info{}, nil - } - return nil, errors.New("notFound") -} - -func (*fakeKubeData) GetNodeInfo(n string) (*kubernetes.Info, error) { - return nil, nil -} - func Test_Categorize(t *testing.T) { entry := config.GenericMap{ "addr1": "10.1.2.3",