diff --git a/api/v1alpha1/zz_generated.conversion.go b/api/v1alpha1/zz_generated.conversion.go
index adc762641..d387ffe17 100644
--- a/api/v1alpha1/zz_generated.conversion.go
+++ b/api/v1alpha1/zz_generated.conversion.go
@@ -659,6 +659,7 @@ func autoConvert_v1beta1_FlowCollectorFLP_To_v1alpha1_FlowCollectorFLP(in *v1bet
 	// WARNING: in.ConversationHeartbeatInterval requires manual conversion: does not exist in peer-type
 	// WARNING: in.ConversationEndTimeout requires manual conversion: does not exist in peer-type
 	// WARNING: in.ConversationTerminatingTimeout requires manual conversion: does not exist in peer-type
+	// WARNING: in.ClusterName requires manual conversion: does not exist in peer-type
 	if err := Convert_v1beta1_DebugConfig_To_v1alpha1_DebugConfig(&in.Debug, &out.Debug, s); err != nil {
 		return err
 	}
diff --git a/api/v1beta1/flowcollector_types.go b/api/v1beta1/flowcollector_types.go
index ed9088352..25247f202 100644
--- a/api/v1beta1/flowcollector_types.go
+++ b/api/v1beta1/flowcollector_types.go
@@ -448,6 +448,11 @@ type FlowCollectorFLP struct {
 	// `conversationTerminatingTimeout` is the time to wait from detected FIN flag to end a conversation. Only relevant for TCP flows.
 	ConversationTerminatingTimeout *metav1.Duration `json:"conversationTerminatingTimeout,omitempty"`
 
+	//+kubebuilder:default:=""
+	// +optional
+	// `clusterName` is the name of the cluster to appear in the flows data. This is useful in a multi-cluster context. When using OpenShift, leave empty to make it automatically determined.
+	ClusterName string `json:"clusterName,omitempty"`
+
 	// `debug` allows setting some aspects of the internal configuration of the flow processor.
 	// This section is aimed exclusively for debugging and fine-grained performance optimizations,
 	// such as GOGC and GOMAXPROCS env vars. Users setting its values do it at their own risk.
diff --git a/bundle/manifests/flows.netobserv.io_flowcollectors.yaml b/bundle/manifests/flows.netobserv.io_flowcollectors.yaml
index 284d27d4a..452ef7cec 100644
--- a/bundle/manifests/flows.netobserv.io_flowcollectors.yaml
+++ b/bundle/manifests/flows.netobserv.io_flowcollectors.yaml
@@ -3793,6 +3793,12 @@ spec:
                 and forwards them to the Loki persistence layer and/or any available
                 exporter.'
              properties:
+                clusterName:
+                  default: ""
+                  description: '`clusterName` is the name of the cluster to appear
+                    in the flows data. This is useful in a multi-cluster context.
+                    When using OpenShift, leave empty to make it automatically determined.'
+                  type: string
                 conversationEndTimeout:
                   default: 10s
                   description: '`conversationEndTimeout` is the time to wait after
diff --git a/bundle/manifests/netobserv-operator.clusterserviceversion.yaml b/bundle/manifests/netobserv-operator.clusterserviceversion.yaml
index f9bab3791..9f49ca4e3 100644
--- a/bundle/manifests/netobserv-operator.clusterserviceversion.yaml
+++ b/bundle/manifests/netobserv-operator.clusterserviceversion.yaml
@@ -516,6 +516,14 @@ spec:
          - patch
          - update
          - watch
+        - apiGroups:
+          - config.openshift.io
+          resources:
+          - clusterversions
+          verbs:
+          - get
+          - list
+          - watch
        - apiGroups:
          - console.openshift.io
          resources:
@@ -614,24 +622,15 @@ spec:
          - rbac.authorization.k8s.io
          resources:
          - clusterrolebindings
-          - rolebindings
-          verbs:
-          - create
-          - delete
-          - get
-          - list
-          - update
-          - watch
-        - apiGroups:
-          - rbac.authorization.k8s.io
-          resources:
          - clusterroles
+          - rolebindings
          - roles
          verbs:
          - create
          - delete
          - get
          - list
+          - update
          - watch
        - apiGroups:
          - security.openshift.io
diff --git a/config/crd/bases/flows.netobserv.io_flowcollectors.yaml b/config/crd/bases/flows.netobserv.io_flowcollectors.yaml
index 690c82e28..c7e0d39c7 100644
--- a/config/crd/bases/flows.netobserv.io_flowcollectors.yaml
+++ b/config/crd/bases/flows.netobserv.io_flowcollectors.yaml
@@ -3780,6 +3780,12 @@ spec:
                 and forwards them to the Loki persistence layer and/or any available
                 exporter.'
              properties:
+                clusterName:
+                  default: ""
+                  description: '`clusterName` is the name of the cluster to appear
+                    in the flows data. This is useful in a multi-cluster context.
+                    When using OpenShift, leave empty to make it automatically determined.'
+                  type: string
                 conversationEndTimeout:
                   default: 10s
                   description: '`conversationEndTimeout` is the time to wait after
diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml
index 000b593c2..912fb0f76 100644
--- a/config/rbac/role.yaml
+++ b/config/rbac/role.yaml
@@ -57,6 +57,14 @@ rules:
   - patch
   - update
   - watch
+- apiGroups:
+  - config.openshift.io
+  resources:
+  - clusterversions
+  verbs:
+  - get
+  - list
+  - watch
 - apiGroups:
   - console.openshift.io
   resources:
@@ -155,24 +163,15 @@ rules:
   - rbac.authorization.k8s.io
   resources:
   - clusterrolebindings
-  - rolebindings
-  verbs:
-  - create
-  - delete
-  - get
-  - list
-  - update
-  - watch
-- apiGroups:
-  - rbac.authorization.k8s.io
-  resources:
   - clusterroles
+  - rolebindings
   - roles
   verbs:
   - create
   - delete
   - get
   - list
+  - update
   - watch
 - apiGroups:
   - security.openshift.io
diff --git a/config/samples/flows_v1beta1_flowcollector.yaml b/config/samples/flows_v1beta1_flowcollector.yaml
index 1ad864b79..cbd3dce11 100644
--- a/config/samples/flows_v1beta1_flowcollector.yaml
+++ b/config/samples/flows_v1beta1_flowcollector.yaml
@@ -58,6 +58,8 @@ spec:
     conversationTerminatingTimeout: 5s
     conversationHeartbeatInterval: 30s
     conversationEndTimeout: 10s
+    # Append a unique cluster name to each record
+    # clusterName:
   kafka:
     address: "kafka-cluster-kafka-bootstrap.netobserv"
     topic: network-flows
diff --git a/controllers/flowcollector_controller.go b/controllers/flowcollector_controller.go
index 5ac064aa9..d9b794bc7 100644
--- a/controllers/flowcollector_controller.go
+++ b/controllers/flowcollector_controller.go
@@ -5,6 +5,8 @@ import (
 	"fmt"
 	"net"
 
+	"github.com/netobserv/network-observability-operator/controllers/globals"
+	configv1 "github.com/openshift/api/config/v1"
 	osv1alpha1 "github.com/openshift/api/console/v1alpha1"
 	securityv1 "github.com/openshift/api/security/v1"
 	appsv1 "k8s.io/api/apps/v1"
@@ -64,8 +66,7 @@ func NewFlowCollectorReconciler(client client.Client, scheme *runtime.Scheme, co
 //+kubebuilder:rbac:groups=apps,resources=deployments;daemonsets,verbs=get;list;watch;create;update;patch;delete
 //+kubebuilder:rbac:groups=core,resources=namespaces;services;serviceaccounts;configmaps;secrets,verbs=get;list;watch;create;update;patch;delete
 //+kubebuilder:rbac:groups=core,resources=endpoints,verbs=get;list;watch
-//+kubebuilder:rbac:groups=rbac.authorization.k8s.io,resources=clusterroles;roles,verbs=get;create;delete;watch;list
-//+kubebuilder:rbac:groups=rbac.authorization.k8s.io,resources=clusterrolebindings;rolebindings,verbs=get;list;create;delete;update;watch
+//+kubebuilder:rbac:groups=rbac.authorization.k8s.io,resources=clusterrolebindings;clusterroles;rolebindings;roles,verbs=get;list;create;delete;update;watch
 //+kubebuilder:rbac:groups=console.openshift.io,resources=consoleplugins,verbs=get;create;delete;update;patch;list;watch
 //+kubebuilder:rbac:groups=operator.openshift.io,resources=consoles,verbs=get;update;list;update;watch
 //+kubebuilder:rbac:groups=flows.netobserv.io,resources=flowcollectors,verbs=get;list;watch;create;update;patch;delete
@@ -75,6 +76,7 @@ func NewFlowCollectorReconciler(client client.Client, scheme *runtime.Scheme, co
 //+kubebuilder:rbac:groups=security.openshift.io,resources=securitycontextconstraints,verbs=list;create;update;watch
 //+kubebuilder:rbac:groups=apiregistration.k8s.io,resources=apiservices,verbs=list;get;watch
 //+kubebuilder:rbac:groups=monitoring.coreos.com,resources=servicemonitors;prometheusrules,verbs=get;create;delete;update;patch;list;watch
+//+kubebuilder:rbac:groups=config.openshift.io,resources=clusterversions,verbs=get;list;watch
 //+kubebuilder:rbac:urls="/metrics",verbs=get
 
 // Reconcile is part of the main kubernetes reconciliation loop which aims to
@@ -101,6 +103,18 @@ func (r *FlowCollectorReconciler) Reconcile(ctx context.Context, _ ctrl.Request)
 	var didChange, isInProgress bool
 	previousNamespace := desired.Status.Namespace
 
+	// obtain default cluster ID - api is specific to openshift
+	if r.permissions.Vendor(ctx) == discover.VendorOpenShift && globals.DefaultClusterID == "" {
+		cversion := &configv1.ClusterVersion{}
+		key := client.ObjectKey{Name: "version"}
+		if err := r.Client.Get(ctx, key, cversion); err != nil {
+			log.Error(err, "unable to obtain cluster ID")
+		} else {
+			globals.DefaultClusterID = cversion.Spec.ClusterID
+		}
+	}
+
 	reconcilersInfo := r.newCommonInfo(ctx, desired, ns, previousNamespace, func(b bool) { didChange = b }, func(b bool) { isInProgress = b })
 
 	err = r.reconcileOperator(ctx, &reconcilersInfo, desired)
diff --git a/controllers/flowcollector_controller_iso_test.go b/controllers/flowcollector_controller_iso_test.go
index 02140bad5..cb1ade07c 100644
--- a/controllers/flowcollector_controller_iso_test.go
+++ b/controllers/flowcollector_controller_iso_test.go
@@ -53,6 +53,7 @@ func flowCollectorIsoSpecs() {
 			ConversationHeartbeatInterval:  &metav1.Duration{Duration: time.Second},
 			ConversationEndTimeout:         &metav1.Duration{Duration: time.Second},
 			ConversationTerminatingTimeout: &metav1.Duration{Duration: time.Second},
+			ClusterName:                    "testCluster",
 			Debug:                          flowslatest.DebugConfig{},
 			LogTypes:                       &outputRecordTypes,
 			Metrics: flowslatest.FLPMetrics{
diff --git a/controllers/flowlogspipeline/flp_common_objects.go b/controllers/flowlogspipeline/flp_common_objects.go
index bad4e37d5..1ff343cc8 100644
--- a/controllers/flowlogspipeline/flp_common_objects.go
+++ b/controllers/flowlogspipeline/flp_common_objects.go
@@ -23,6 +23,7 @@ import (
 
 	flowslatest "github.com/netobserv/network-observability-operator/api/v1beta1"
 	"github.com/netobserv/network-observability-operator/controllers/constants"
+	"github.com/netobserv/network-observability-operator/controllers/globals"
 	"github.com/netobserv/network-observability-operator/controllers/reconcilers"
 	"github.com/netobserv/network-observability-operator/pkg/filters"
 	"github.com/netobserv/network-observability-operator/pkg/helper"
@@ -279,16 +280,112 @@ func (b *builder) addTransformStages(stage *config.PipelineBuilderStage) error {
 	lastStage := *stage
 	indexFields := constants.LokiIndexFields
 
-	// Filter-out unused fields?
-	if helper.PtrBool(b.desired.Processor.DropUnusedFields) {
-		if helper.UseIPFIX(b.desired) {
-			lastStage = lastStage.TransformFilter("filter", api.TransformFilter{
-				Rules: filters.GetOVSGoflowUnusedRules(),
-			})
+	lastStage = b.addTransformFilter(lastStage)
+
+	indexFields, lastStage = b.addConnectionTracking(indexFields, lastStage)
+
+	// enrich stage (transform) configuration
+	enrichedStage := lastStage.TransformNetwork("enrich", api.TransformNetwork{
+		Rules: api.NetworkTransformRules{{
+			Input:  "SrcAddr",
+			Output: "SrcK8S",
+			Type:   api.AddKubernetesRuleType,
+		}, {
+			Input:  "DstAddr",
+			Output: "DstK8S",
+			Type:   api.AddKubernetesRuleType,
+		}, {
+			Type: api.ReinterpretDirectionRuleType,
+		}},
+		DirectionInfo: api.NetworkTransformDirectionInfo{
+			ReporterIPField:    "AgentIP",
+			SrcHostField:       "SrcK8S_HostIP",
+			DstHostField:       "DstK8S_HostIP",
+			FlowDirectionField: "FlowDirection",
+			IfDirectionField:   "IfDirection",
+		},
+	})
+
+	// loki stage (write) configuration
+	if helper.UseLoki(b.desired) {
+		lokiWrite := api.WriteLoki{
+			Labels:         indexFields,
+			BatchSize:      int(b.desired.Loki.BatchSize),
+			BatchWait:      helper.UnstructuredDuration(b.desired.Loki.BatchWait),
+			MaxBackoff:     helper.UnstructuredDuration(b.desired.Loki.MaxBackoff),
+			MaxRetries:     int(helper.PtrInt32(b.desired.Loki.MaxRetries)),
+			MinBackoff:     helper.UnstructuredDuration(b.desired.Loki.MinBackoff),
+			StaticLabels:   model.LabelSet{},
+			Timeout:        helper.UnstructuredDuration(b.desired.Loki.Timeout),
+			URL:            b.desired.Loki.URL,
+			TimestampLabel: "TimeFlowEndMs",
+			TimestampScale: "1ms",
+			TenantID:       b.desired.Loki.TenantID,
 		}
-		// Else: nothing for eBPF at the moment
+
+		for k, v := range b.desired.Loki.StaticLabels {
+			lokiWrite.StaticLabels[model.LabelName(k)] = model.LabelValue(v)
+		}
+
+		var authorization *promConfig.Authorization
+		if helper.LokiUseHostToken(&b.desired.Loki) || helper.LokiForwardUserToken(&b.desired.Loki) {
+			b.volumes.AddToken(constants.FLPName)
+			authorization = &promConfig.Authorization{
+				Type:            "Bearer",
+				CredentialsFile: constants.TokensPath + constants.FLPName,
+			}
+		}
+
+		if b.desired.Loki.TLS.Enable {
+			if b.desired.Loki.TLS.InsecureSkipVerify {
+				lokiWrite.ClientConfig = &promConfig.HTTPClientConfig{
+					Authorization: authorization,
+					TLSConfig: promConfig.TLSConfig{
+						InsecureSkipVerify: true,
+					},
+				}
+			} else {
+				caPath := b.volumes.AddCACertificate(&b.desired.Loki.TLS, "loki-certs")
+				lokiWrite.ClientConfig = &promConfig.HTTPClientConfig{
+					Authorization: authorization,
+					TLSConfig: promConfig.TLSConfig{
+						CAFile: caPath,
+					},
+				}
+			}
+		} else {
+			lokiWrite.ClientConfig = &promConfig.HTTPClientConfig{
+				Authorization: authorization,
+			}
+		}
+		enrichedStage.WriteLoki("loki", lokiWrite)
+	}
+
+	// write on Stdout if logging trace enabled
+	if b.desired.Processor.LogLevel == "trace" {
+		enrichedStage.WriteStdout("stdout", api.WriteStdout{Format: "json"})
+	}
+
+	// obtain encode_prometheus stage from metrics_definitions
+	promMetrics, err := b.obtainMetricsConfiguration()
+	if err != nil {
+		return err
 	}
+	if len(promMetrics) > 0 {
+		// prometheus stage (encode) configuration
+		promEncode := api.PromEncode{
+			Prefix:  "netobserv_",
+			Metrics: promMetrics,
+		}
+		enrichedStage.EncodePrometheus("prometheus", promEncode)
+	}
+
+	b.addCustomExportStages(&enrichedStage)
+	return nil
+}
+
+func (b *builder) addConnectionTracking(indexFields []string, lastStage config.PipelineBuilderStage) ([]string, config.PipelineBuilderStage) {
 	outputFields := []api.OutputField{
 		{
 			Name:      "Bytes",
@@ -429,106 +526,43 @@ func (b *builder) addTransformStages(stage *config.PipelineBuilderStage) error {
 		})
 	}
 
+	return indexFields, lastStage
+}
 
-	// enrich stage (transform) configuration
-	enrichedStage := lastStage.TransformNetwork("enrich", api.TransformNetwork{
-		Rules: api.NetworkTransformRules{{
-			Input:  "SrcAddr",
-			Output: "SrcK8S",
-			Type:   api.AddKubernetesRuleType,
-		}, {
-			Input:  "DstAddr",
-			Output: "DstK8S",
-			Type:   api.AddKubernetesRuleType,
-		}, {
-			Type: api.ReinterpretDirectionRuleType,
-		}},
-		DirectionInfo: api.NetworkTransformDirectionInfo{
-			ReporterIPField:    "AgentIP",
-			SrcHostField:       "SrcK8S_HostIP",
-			DstHostField:       "DstK8S_HostIP",
-			FlowDirectionField: "FlowDirection",
-			IfDirectionField:   "IfDirection",
-		},
-	})
-
-	// loki stage (write) configuration
-	if helper.UseLoki(b.desired) {
-		lokiWrite := api.WriteLoki{
-			Labels:         indexFields,
-			BatchSize:      int(b.desired.Loki.BatchSize),
-			BatchWait:      helper.UnstructuredDuration(b.desired.Loki.BatchWait),
-			MaxBackoff:     helper.UnstructuredDuration(b.desired.Loki.MaxBackoff),
-			MaxRetries:     int(helper.PtrInt32(b.desired.Loki.MaxRetries)),
-			MinBackoff:     helper.UnstructuredDuration(b.desired.Loki.MinBackoff),
-			StaticLabels:   model.LabelSet{},
-			Timeout:        helper.UnstructuredDuration(b.desired.Loki.Timeout),
-			URL:            b.desired.Loki.URL,
-			TimestampLabel: "TimeFlowEndMs",
-			TimestampScale: "1ms",
-			TenantID:       b.desired.Loki.TenantID,
-		}
-
-		for k, v := range b.desired.Loki.StaticLabels {
-			lokiWrite.StaticLabels[model.LabelName(k)] = model.LabelValue(v)
-		}
-
-		var authorization *promConfig.Authorization
-		if helper.LokiUseHostToken(&b.desired.Loki) || helper.LokiForwardUserToken(&b.desired.Loki) {
-			b.volumes.AddToken(constants.FLPName)
-			authorization = &promConfig.Authorization{
-				Type:            "Bearer",
-				CredentialsFile: constants.TokensPath + constants.FLPName,
-			}
-		}
-
-		if b.desired.Loki.TLS.Enable {
-			if b.desired.Loki.TLS.InsecureSkipVerify {
-				lokiWrite.ClientConfig = &promConfig.HTTPClientConfig{
-					Authorization: authorization,
-					TLSConfig: promConfig.TLSConfig{
-						InsecureSkipVerify: true,
-					},
-				}
-			} else {
-				caPath := b.volumes.AddCACertificate(&b.desired.Loki.TLS, "loki-certs")
-				lokiWrite.ClientConfig = &promConfig.HTTPClientConfig{
-					Authorization: authorization,
-					TLSConfig: promConfig.TLSConfig{
-						CAFile: caPath,
-					},
-				}
-			}
-		} else {
-			lokiWrite.ClientConfig = &promConfig.HTTPClientConfig{
-				Authorization: authorization,
-			}
-		}
-		enrichedStage.WriteLoki("loki", lokiWrite)
-	}
+func (b *builder) addTransformFilter(lastStage config.PipelineBuilderStage) config.PipelineBuilderStage {
+	var clusterName string
+	transformFilterRules := []api.TransformFilterRule{}
 
-	// write on Stdout if logging trace enabled
-	if b.desired.Processor.LogLevel == "trace" {
-		enrichedStage.WriteStdout("stdout", api.WriteStdout{Format: "json"})
+	if b.desired.Processor.ClusterName != "" {
+		clusterName = b.desired.Processor.ClusterName
+	} else {
+		//take clustername from openshift
+		clusterName = string(globals.DefaultClusterID)
 	}
-
-	// obtain encode_prometheus stage from metrics_definitions
-	promMetrics, err := b.obtainMetricsConfiguration()
-	if err != nil {
-		return err
+	if clusterName != "" {
+		transformFilterRules = []api.TransformFilterRule{
+			{
+				Input: "K8S_ClusterName",
+				Type:  "add_field_if_doesnt_exist",
+				Value: clusterName,
+			},
+		}
 	}
-	if len(promMetrics) > 0 {
-		// prometheus stage (encode) configuration
-		promEncode := api.PromEncode{
-			Prefix:  "netobserv_",
-			Metrics: promMetrics,
+	// Filter-out unused fields?
+	if helper.PtrBool(b.desired.Processor.DropUnusedFields) {
+		if helper.UseIPFIX(b.desired) {
+			rules := filters.GetOVSGoflowUnusedRules()
+			transformFilterRules = append(transformFilterRules, rules...)
 		}
-		enrichedStage.EncodePrometheus("prometheus", promEncode)
+		// Else: nothing for eBPF at the moment
 	}
-
-	b.addCustomExportStages(&enrichedStage)
-	return nil
+	if len(transformFilterRules) > 0 {
+		lastStage = lastStage.TransformFilter("filter", api.TransformFilter{
+			Rules: transformFilterRules,
+		})
+	}
+	return lastStage
 }
 
 func (b *builder) addCustomExportStages(enrichedStage *config.PipelineBuilderStage) {
diff --git a/controllers/globals/globals.go b/controllers/globals/globals.go
new file mode 100644
index 000000000..995c74c71
--- /dev/null
+++ b/controllers/globals/globals.go
@@ -0,0 +1,6 @@
+// Package globals defines some variables that are shared across multiple packages
+package globals
+
+import "github.com/openshift/api/config/v1"
+
+var DefaultClusterID v1.ClusterID
diff --git a/docs/FlowCollector.md b/docs/FlowCollector.md
index 41f4b42fd..a5606a1fa 100644
--- a/docs/FlowCollector.md
+++ b/docs/FlowCollector.md
@@ -6754,6 +6754,15 @@ TLS client configuration for Loki URL.
         </tr>
     </thead>
     <tbody><tr>
+        <td><b>clusterName</b></td>
+        <td>string</td>
+        <td>
+          `clusterName` is the name of the cluster to appear in the flows data. This is useful in a multi-cluster context. When using OpenShift, leave empty to make it automatically determined.<br/>
+          <br/>
+            <i>Default</i>: <br/>
+        </td>
+        <td>false</td>
+      </tr><tr>
         <td><b>conversationEndTimeout</b></td>
         <td>string</td>
         <td>
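
Example (an editor's sketch, not part of the patch): with this change, the new field is set under spec.processor on the FlowCollector resource. The resource name and cluster name below are illustrative placeholders; on OpenShift the field can be left empty, in which case the operator falls back to the cluster ID it reads from the "version" ClusterVersion resource during reconciliation.

apiVersion: flows.netobserv.io/v1beta1
kind: FlowCollector
metadata:
  name: cluster
spec:
  processor:
    # Tag each flow record with this name; leave empty on OpenShift
    # to have it determined automatically from the cluster ID.
    clusterName: staging-eu-1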
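
Given that spec, the addTransformFilter change above would emit a transform_filter rule roughly equivalent to the following (a YAML rendering of api.TransformFilterRule for illustration; the generated flowlogs-pipeline config embeds it as JSON). The rule only adds K8S_ClusterName when a record does not already carry that field:

- input: K8S_ClusterName
  type: add_field_if_doesnt_exist
  value: staging-eu-1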