From 1a76dc345a92945db018f4a22bc618b72199693c Mon Sep 17 00:00:00 2001 From: Julien Pinsonneau Date: Thu, 14 Dec 2023 13:20:42 +0100 Subject: [PATCH] read defaults from CRD --- Dockerfile | 1 + api/v1alpha1/flowcollector_types.go | 2 +- api/v1alpha1/flowcollector_webhook.go | 10 + api/v1alpha1/zz_generated.conversion.go | 31 +-- api/v1beta1/flowcollector_types.go | 6 +- api/v1beta1/flowcollector_webhook.go | 113 ++++++++--- api/v1beta1/zz_generated.conversion.go | 46 ++--- api/v1beta2/flowcollector_types.go | 40 ++-- api/v1beta2/zz_generated.deepcopy.go | 48 ++++- .../flows.netobserv.io_flowcollectors.yaml | 1 + .../flows.netobserv.io_flowcollectors.yaml | 1 + .../consoleplugin/consoleplugin_objects.go | 10 +- .../consoleplugin/consoleplugin_reconciler.go | 6 +- controllers/ebpf/agent_controller.go | 7 +- .../flowcollector_controller_console_test.go | 4 +- .../flowcollector_controller_ebpf_test.go | 6 +- .../flowcollector_controller_iso_test.go | 16 +- controllers/flowcollector_controller_test.go | 27 ++- controllers/flp/flp_common_objects.go | 27 +-- controllers/flp/flp_controller_test.go | 16 +- controllers/flp/flp_pipeline_builder.go | 40 ++-- controllers/flp/flp_test.go | 12 +- controllers/ovs/flowsconfig_cno_reconciler.go | 2 +- .../ovs/flowsconfig_ovnk_reconciler.go | 2 +- docs/FlowCollector.md | 1 + ...ned.flows.netobserv.io_flowcollectors.yaml | 1 + main.go | 13 ++ pkg/helper/crd.go | 180 ++++++++++++++++++ pkg/helper/flowcollector.go | 103 ++++++++++ pkg/helper/helpers_test.go | 61 ++++++ 30 files changed, 647 insertions(+), 186 deletions(-) create mode 100644 pkg/helper/crd.go diff --git a/Dockerfile b/Dockerfile index 61019331c..b0bae54e5 100644 --- a/Dockerfile +++ b/Dockerfile @@ -17,6 +17,7 @@ COPY main.go main.go COPY api/ api/ COPY controllers/ controllers/ COPY pkg/ pkg/ +COPY config/ config/ # Build RUN CGO_ENABLED=0 GOOS=linux GOARCH=$TARGETARCH GO111MODULE=on go build -ldflags "-X 'main.buildVersion=$BUILD_VERSION' -X 'main.buildDate=`date +%Y-%m-%d\ %H:%M`'" -mod vendor -a -o manager main.go diff --git a/api/v1alpha1/flowcollector_types.go b/api/v1alpha1/flowcollector_types.go index 1fe8984ad..2cebf1556 100644 --- a/api/v1alpha1/flowcollector_types.go +++ b/api/v1alpha1/flowcollector_types.go @@ -188,7 +188,7 @@ type FlowCollectorEBPF struct { // excludeInterfaces contains the interface names that will be excluded from flow tracing. // If an entry is enclosed by slashes (such as `/br-/`), it will match as regular expression, // otherwise it will be matched as a case-sensitive string. - //+kubebuilder:default=lo; + //+kubebuilder:default:=lo; ExcludeInterfaces []string `json:"excludeInterfaces,omitempty"` //+kubebuilder:validation:Enum=trace;debug;info;warn;error;fatal;panic diff --git a/api/v1alpha1/flowcollector_webhook.go b/api/v1alpha1/flowcollector_webhook.go index 07422a750..22af2b56e 100644 --- a/api/v1alpha1/flowcollector_webhook.go +++ b/api/v1alpha1/flowcollector_webhook.go @@ -382,3 +382,13 @@ func Convert_v1beta2_DebugProcessorConfig_To_v1alpha1_DebugConfig(in *v1beta2.De out.Env = in.Env return nil } + +// This function need to be manually created because conversion-gen not able to create it intentionally because +// we have new defined fields in v1beta2 not in v1alpha1 +// nolint:golint,stylecheck,revive +func Convert_v1alpha1_FlowCollectorEBPF_To_v1beta2_FlowCollectorEBPF(in *FlowCollectorEBPF, out *v1beta2.FlowCollectorEBPF, s apiconversion.Scope) error { + out.Debug = &v1beta2.DebugAgentConfig{ + Env: in.Debug.Env, + } + return autoConvert_v1alpha1_FlowCollectorEBPF_To_v1beta2_FlowCollectorEBPF(in, out, s) +} diff --git a/api/v1alpha1/zz_generated.conversion.go b/api/v1alpha1/zz_generated.conversion.go index 2ce437783..112a7fe20 100644 --- a/api/v1alpha1/zz_generated.conversion.go +++ b/api/v1alpha1/zz_generated.conversion.go @@ -98,11 +98,6 @@ func RegisterConversions(s *runtime.Scheme) error { }); err != nil { return err } - if err := s.AddGeneratedConversionFunc((*FlowCollectorEBPF)(nil), (*v1beta2.FlowCollectorEBPF)(nil), func(a, b interface{}, scope conversion.Scope) error { - return Convert_v1alpha1_FlowCollectorEBPF_To_v1beta2_FlowCollectorEBPF(a.(*FlowCollectorEBPF), b.(*v1beta2.FlowCollectorEBPF), scope) - }); err != nil { - return err - } if err := s.AddGeneratedConversionFunc((*FlowCollectorIPFIX)(nil), (*v1beta2.FlowCollectorIPFIX)(nil), func(a, b interface{}, scope conversion.Scope) error { return Convert_v1alpha1_FlowCollectorIPFIX_To_v1beta2_FlowCollectorIPFIX(a.(*FlowCollectorIPFIX), b.(*v1beta2.FlowCollectorIPFIX), scope) }); err != nil { @@ -208,6 +203,11 @@ func RegisterConversions(s *runtime.Scheme) error { }); err != nil { return err } + if err := s.AddConversionFunc((*FlowCollectorEBPF)(nil), (*v1beta2.FlowCollectorEBPF)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1alpha1_FlowCollectorEBPF_To_v1beta2_FlowCollectorEBPF(a.(*FlowCollectorEBPF), b.(*v1beta2.FlowCollectorEBPF), scope) + }); err != nil { + return err + } if err := s.AddConversionFunc((*FlowCollectorExporter)(nil), (*v1beta2.FlowCollectorExporter)(nil), func(a, b interface{}, scope conversion.Scope) error { return Convert_v1alpha1_FlowCollectorExporter_To_v1beta2_FlowCollectorExporter(a.(*FlowCollectorExporter), b.(*v1beta2.FlowCollectorExporter), scope) }); err != nil { @@ -565,17 +565,10 @@ func autoConvert_v1alpha1_FlowCollectorEBPF_To_v1beta2_FlowCollectorEBPF(in *Flo out.LogLevel = in.LogLevel out.Privileged = in.Privileged out.KafkaBatchSize = in.KafkaBatchSize - if err := Convert_v1alpha1_DebugConfig_To_v1beta2_DebugAgentConfig(&in.Debug, &out.Debug, s); err != nil { - return err - } + // WARNING: in.Debug requires manual conversion: inconvertible types (./api/v1alpha1.DebugConfig vs *github.com/netobserv/network-observability-operator/api/v1beta2.DebugAgentConfig) return nil } -// Convert_v1alpha1_FlowCollectorEBPF_To_v1beta2_FlowCollectorEBPF is an autogenerated conversion function. -func Convert_v1alpha1_FlowCollectorEBPF_To_v1beta2_FlowCollectorEBPF(in *FlowCollectorEBPF, out *v1beta2.FlowCollectorEBPF, s conversion.Scope) error { - return autoConvert_v1alpha1_FlowCollectorEBPF_To_v1beta2_FlowCollectorEBPF(in, out, s) -} - func autoConvert_v1beta2_FlowCollectorEBPF_To_v1alpha1_FlowCollectorEBPF(in *v1beta2.FlowCollectorEBPF, out *FlowCollectorEBPF, s conversion.Scope) error { out.ImagePullPolicy = in.ImagePullPolicy out.Resources = in.Resources @@ -587,9 +580,7 @@ func autoConvert_v1beta2_FlowCollectorEBPF_To_v1alpha1_FlowCollectorEBPF(in *v1b out.LogLevel = in.LogLevel out.Privileged = in.Privileged out.KafkaBatchSize = in.KafkaBatchSize - if err := Convert_v1beta2_DebugAgentConfig_To_v1alpha1_DebugConfig(&in.Debug, &out.Debug, s); err != nil { - return err - } + // WARNING: in.Debug requires manual conversion: inconvertible types (*github.com/netobserv/network-observability-operator/api/v1beta2.DebugAgentConfig vs ./api/v1alpha1.DebugConfig) // WARNING: in.Features requires manual conversion: does not exist in peer-type return nil } @@ -636,9 +627,7 @@ func autoConvert_v1alpha1_FlowCollectorFLP_To_v1beta2_FlowCollectorFLP(in *FlowC } out.KafkaConsumerQueueCapacity = in.KafkaConsumerQueueCapacity out.KafkaConsumerBatchSize = in.KafkaConsumerBatchSize - if err := Convert_v1alpha1_DebugConfig_To_v1beta2_DebugProcessorConfig(&in.Debug, &out.Debug, s); err != nil { - return err - } + // WARNING: in.Debug requires manual conversion: inconvertible types (./api/v1alpha1.DebugConfig vs *github.com/netobserv/network-observability-operator/api/v1beta2.DebugProcessorConfig) return nil } @@ -663,9 +652,7 @@ func autoConvert_v1beta2_FlowCollectorFLP_To_v1alpha1_FlowCollectorFLP(in *v1bet // WARNING: in.LogTypes requires manual conversion: does not exist in peer-type // WARNING: in.ClusterName requires manual conversion: does not exist in peer-type // WARNING: in.MultiClusterDeployment requires manual conversion: does not exist in peer-type - if err := Convert_v1beta2_DebugProcessorConfig_To_v1alpha1_DebugConfig(&in.Debug, &out.Debug, s); err != nil { - return err - } + // WARNING: in.Debug requires manual conversion: inconvertible types (*github.com/netobserv/network-observability-operator/api/v1beta2.DebugProcessorConfig vs ./api/v1alpha1.DebugConfig) return nil } diff --git a/api/v1beta1/flowcollector_types.go b/api/v1beta1/flowcollector_types.go index 4e893987a..d0227efea 100644 --- a/api/v1beta1/flowcollector_types.go +++ b/api/v1beta1/flowcollector_types.go @@ -203,7 +203,7 @@ type FlowCollectorEBPF struct { // `excludeInterfaces` contains the interface names that are excluded from flow tracing. // An entry enclosed by slashes, such as `/br-/`, is matched as a regular expression. // Otherwise it is matched as a case-sensitive string. - //+kubebuilder:default=lo; + //+kubebuilder:default:=lo; //+optional ExcludeInterfaces []string `json:"excludeInterfaces"` @@ -588,11 +588,11 @@ type FlowCollectorLoki struct { // A timeout of zero means no timeout. Timeout *metav1.Duration `json:"timeout,omitempty"` // Warning: keep as pointer, else default is ignored - //+kubebuilder:default="1s" + //+kubebuilder:default:="1s" // `minBackoff` is the initial backoff time for client connection between retries. MinBackoff *metav1.Duration `json:"minBackoff,omitempty"` // Warning: keep as pointer, else default is ignored - //+kubebuilder:default="5s" + //+kubebuilder:default:="5s" // `maxBackoff` is the maximum backoff time for client connection between retries. MaxBackoff *metav1.Duration `json:"maxBackoff,omitempty"` // Warning: keep as pointer, else default is ignored diff --git a/api/v1beta1/flowcollector_webhook.go b/api/v1beta1/flowcollector_webhook.go index 0a223403c..99bb6bc75 100644 --- a/api/v1beta1/flowcollector_webhook.go +++ b/api/v1beta1/flowcollector_webhook.go @@ -18,6 +18,7 @@ package v1beta1 import ( "fmt" + "reflect" "github.com/netobserv/network-observability-operator/api/v1beta2" utilconversion "github.com/netobserv/network-observability-operator/pkg/conversion" @@ -82,10 +83,24 @@ func Convert_v1beta1_FlowCollector_To_v1beta2_FlowCollector(in *FlowCollector, o out.Spec.Processor.LokiTimeout = in.Spec.Loki.Timeout out.Spec.Processor.LokiBatchWait = in.Spec.Loki.BatchWait out.Spec.Processor.LokiBatchSize = in.Spec.Loki.BatchSize - out.Spec.Processor.Debug.LokiMinBackoff = in.Spec.Loki.MinBackoff - out.Spec.Processor.Debug.LokiMaxBackoff = in.Spec.Loki.MaxBackoff - out.Spec.Processor.Debug.LokiMaxRetries = in.Spec.Loki.MaxRetries - out.Spec.Processor.Debug.LokiStaticLabels = in.Spec.Loki.StaticLabels + // fill cross object (Loki -> Processor) debug config + debugPath := helper.ProcessorDebugPath + out.Spec.Processor.Debug.LokiMinBackoff = helper.GetDebugDurationValue(debugPath, "lokiMinBackoff", in.Spec.Loki.MinBackoff) + out.Spec.Processor.Debug.LokiMaxBackoff = helper.GetDebugDurationValue(debugPath, "lokiMaxBackoff", in.Spec.Loki.MaxBackoff) + out.Spec.Processor.Debug.LokiMaxRetries = helper.GetDebugInt32Value(debugPath, "lokiMaxRetries", in.Spec.Loki.MaxRetries) + out.Spec.Processor.Debug.LokiStaticLabels = helper.GetDebugMapValue(debugPath, "lokiStaticLabels", &in.Spec.Loki.StaticLabels) + // clear Processor debug config if default + if reflect.DeepEqual(helper.GetDebugProcessorConfig(nil), helper.GetDebugProcessorConfig(out.Spec.Processor.Debug)) { + out.Spec.Processor.Debug = nil + } + // clear Agent debug config if default + if reflect.DeepEqual(helper.GetDebugAgentConfig(nil), helper.GetDebugAgentConfig(out.Spec.Agent.EBPF.Debug)) { + out.Spec.Agent.EBPF.Debug = nil + } + // clear Plugin debug config if default + if reflect.DeepEqual(helper.GetDebugPluginConfig(nil), helper.GetDebugPluginConfig(out.Spec.ConsolePlugin.Debug)) { + out.Spec.ConsolePlugin.Debug = nil + } return nil } @@ -98,10 +113,12 @@ func Convert_v1beta2_FlowCollector_To_v1beta1_FlowCollector(in *v1beta2.FlowColl out.Spec.Loki.Timeout = in.Spec.Processor.LokiTimeout out.Spec.Loki.BatchWait = in.Spec.Processor.LokiBatchWait out.Spec.Loki.BatchSize = in.Spec.Processor.LokiBatchSize - out.Spec.Loki.MinBackoff = in.Spec.Processor.Debug.LokiMinBackoff - out.Spec.Loki.MaxBackoff = in.Spec.Processor.Debug.LokiMaxBackoff - out.Spec.Loki.MaxRetries = in.Spec.Processor.Debug.LokiMaxRetries - out.Spec.Loki.StaticLabels = in.Spec.Processor.Debug.LokiStaticLabels + if in.Spec.Processor.Debug != nil { + out.Spec.Loki.MinBackoff = in.Spec.Processor.Debug.LokiMinBackoff + out.Spec.Loki.MaxBackoff = in.Spec.Processor.Debug.LokiMaxBackoff + out.Spec.Loki.MaxRetries = in.Spec.Processor.Debug.LokiMaxRetries + out.Spec.Loki.StaticLabels = helper.GetValueOrDefaultMapString(helper.ProcessorDebugPath, "lokiStaticLabels", in.Spec.Processor.Debug.LokiStaticLabels) + } return nil } @@ -150,19 +167,17 @@ func Convert_v1beta1_FlowCollectorLoki_To_v1beta2_FlowCollectorLoki(in *FlowColl return autoConvert_v1beta1_FlowCollectorLoki_To_v1beta2_FlowCollectorLoki(in, out, s) } -// This function need to be manually created because conversion-gen not able to create it intentionally because -// we have new defined fields in v1beta2 not in v1beta1 -// nolint:golint,stylecheck,revive -func Convert_v1beta2_FLPMetrics_To_v1beta1_FLPMetrics(in *v1beta2.FLPMetrics, out *FLPMetrics, s apiconversion.Scope) error { - return autoConvert_v1beta2_FLPMetrics_To_v1beta1_FLPMetrics(in, out, s) -} - // This function need to be manually created because conversion-gen not able to create it intentionally because // we have new defined fields in v1beta2 not in v1beta1 // nolint:golint,stylecheck,revive func Convert_v1beta1_FlowCollectorConsolePlugin_To_v1beta2_FlowCollectorConsolePlugin(in *FlowCollectorConsolePlugin, out *v1beta2.FlowCollectorConsolePlugin, s apiconversion.Scope) error { - out.Debug.Register = in.Register - out.Debug.Port = in.Port + debugPath := helper.PluginDebugPath + out.Debug = &v1beta2.DebugPluginConfig{ + Env: map[string]string{}, + Args: []string{}, + Register: helper.GetDebugBoolValue(debugPath, "register", in.Register), + Port: helper.GetDebugInt32Value(debugPath, "port", &in.Port), + } return autoConvert_v1beta1_FlowCollectorConsolePlugin_To_v1beta2_FlowCollectorConsolePlugin(in, out, s) } @@ -197,6 +212,13 @@ func Convert_v1beta1_FlowCollectorSpec_To_v1beta2_FlowCollectorSpec(in *FlowColl return nil } +// This function need to be manually created because conversion-gen not able to create it intentionally because +// we have new defined fields in v1beta2 not in v1beta1 +// nolint:golint,stylecheck,revive +func Convert_v1beta2_FLPMetrics_To_v1beta1_FLPMetrics(in *v1beta2.FLPMetrics, out *FLPMetrics, s apiconversion.Scope) error { + return autoConvert_v1beta2_FLPMetrics_To_v1beta1_FLPMetrics(in, out, s) +} + // This function need to be manually created because conversion-gen not able to create it intentionally because // we have camel case enum in v1beta2 which were uppercase in v1beta1 // nolint:golint,stylecheck,revive @@ -250,22 +272,28 @@ func Convert_v1beta1_FlowCollectorFLP_To_v1beta2_FlowCollectorFLP(in *FlowCollec logTypes := v1beta2.FLPLogTypes(utilconversion.UpperToPascal(*in.LogTypes)) out.LogTypes = &logTypes } - out.Debug.Port = in.Port - out.Debug.HealthPort = in.HealthPort - out.Debug.ProfilePort = in.ProfilePort - out.Debug.EnableKubeProbes = in.EnableKubeProbes - out.Debug.DropUnusedFields = in.DropUnusedFields - out.Debug.ConversationHeartbeatInterval = in.ConversationHeartbeatInterval - out.Debug.ConversationEndTimeout = in.ConversationEndTimeout - out.Debug.ConversationTerminatingTimeout = in.ConversationTerminatingTimeout + debugPath := helper.ProcessorDebugPath + out.Debug = &v1beta2.DebugProcessorConfig{ + Env: map[string]string{}, + Port: helper.GetDebugInt32Value(debugPath, "port", &in.Port), + HealthPort: helper.GetDebugInt32Value(debugPath, "healthPort", &in.HealthPort), + ProfilePort: helper.GetDebugInt32Value(debugPath, "profilePort", &in.ProfilePort), + EnableKubeProbes: helper.GetDebugBoolValue(debugPath, "enableKubeProbes", in.EnableKubeProbes), + DropUnusedFields: helper.GetDebugBoolValue(debugPath, "dropUnusedFields", in.DropUnusedFields), + ConversationHeartbeatInterval: helper.GetDebugDurationValue(debugPath, "conversationHeartbeatInterval", in.ConversationHeartbeatInterval), + ConversationEndTimeout: helper.GetDebugDurationValue(debugPath, "conversationEndTimeout", in.ConversationEndTimeout), + ConversationTerminatingTimeout: helper.GetDebugDurationValue(debugPath, "conversationTerminatingTimeout", in.ConversationTerminatingTimeout), + } return autoConvert_v1beta1_FlowCollectorFLP_To_v1beta2_FlowCollectorFLP(in, out, s) } // we have new defined fields in v1beta2 not in v1beta1 // nolint:golint,stylecheck,revive func Convert_v1beta2_FlowCollectorConsolePlugin_To_v1beta1_FlowCollectorConsolePlugin(in *v1beta2.FlowCollectorConsolePlugin, out *FlowCollectorConsolePlugin, s apiconversion.Scope) error { - out.Register = in.Debug.Register - out.Port = in.Debug.Port + if in.Debug != nil { + out.Register = in.Debug.Register + out.Port = helper.GetValueOrDefaultInt32(helper.PluginDebugPath, "port", in.Debug.Port) + } return autoConvert_v1beta2_FlowCollectorConsolePlugin_To_v1beta1_FlowCollectorConsolePlugin(in, out, s) } @@ -289,6 +317,17 @@ func Convert_v1beta2_FlowCollectorFLP_To_v1beta1_FlowCollectorFLP(in *v1beta2.Fl str := utilconversion.PascalToUpper(string(*in.LogTypes), '_') out.LogTypes = &str } + if in.Debug != nil { + debugPath := helper.ProcessorDebugPath + out.Port = helper.GetValueOrDefaultInt32(debugPath, "port", in.Debug.Port) + out.HealthPort = helper.GetValueOrDefaultInt32(debugPath, "healthPort", in.Debug.HealthPort) + out.ProfilePort = helper.GetValueOrDefaultInt32(debugPath, "profilePort", in.Debug.ProfilePort) + out.EnableKubeProbes = in.Debug.EnableKubeProbes + out.DropUnusedFields = in.Debug.DropUnusedFields + out.ConversationHeartbeatInterval = in.Debug.ConversationHeartbeatInterval + out.ConversationEndTimeout = in.Debug.ConversationEndTimeout + out.ConversationTerminatingTimeout = in.Debug.ConversationTerminatingTimeout + } return nil } @@ -400,3 +439,23 @@ func Convert_v1beta2_DebugProcessorConfig_To_v1beta1_DebugConfig(in *v1beta2.Deb out.Env = in.Env return nil } + +// This function need to be manually created because conversion-gen not able to create it intentionally because +// we have new defined fields in v1beta2 not in v1beta1 +// nolint:golint,stylecheck,revive +func Convert_v1beta1_FlowCollectorEBPF_To_v1beta2_FlowCollectorEBPF(in *FlowCollectorEBPF, out *v1beta2.FlowCollectorEBPF, s apiconversion.Scope) error { + out.Debug = &v1beta2.DebugAgentConfig{ + Env: in.Debug.Env, + } + return autoConvert_v1beta1_FlowCollectorEBPF_To_v1beta2_FlowCollectorEBPF(in, out, s) +} + +// This function need to be manually created because conversion-gen not able to create it intentionally because +// we have new defined fields in v1beta2 not in v1beta1 +// nolint:golint,stylecheck,revive +func Convert_v1beta2_FlowCollectorEBPF_To_v1beta1_FlowCollectorEBPF(in *v1beta2.FlowCollectorEBPF, out *FlowCollectorEBPF, s apiconversion.Scope) error { + if in.Debug != nil { + out.Debug.Env = in.Debug.Env + } + return autoConvert_v1beta2_FlowCollectorEBPF_To_v1beta1_FlowCollectorEBPF(in, out, s) +} diff --git a/api/v1beta1/zz_generated.conversion.go b/api/v1beta1/zz_generated.conversion.go index e07539898..03917feb4 100644 --- a/api/v1beta1/zz_generated.conversion.go +++ b/api/v1beta1/zz_generated.conversion.go @@ -88,16 +88,6 @@ func RegisterConversions(s *runtime.Scheme) error { }); err != nil { return err } - if err := s.AddGeneratedConversionFunc((*FlowCollectorEBPF)(nil), (*v1beta2.FlowCollectorEBPF)(nil), func(a, b interface{}, scope conversion.Scope) error { - return Convert_v1beta1_FlowCollectorEBPF_To_v1beta2_FlowCollectorEBPF(a.(*FlowCollectorEBPF), b.(*v1beta2.FlowCollectorEBPF), scope) - }); err != nil { - return err - } - if err := s.AddGeneratedConversionFunc((*v1beta2.FlowCollectorEBPF)(nil), (*FlowCollectorEBPF)(nil), func(a, b interface{}, scope conversion.Scope) error { - return Convert_v1beta2_FlowCollectorEBPF_To_v1beta1_FlowCollectorEBPF(a.(*v1beta2.FlowCollectorEBPF), b.(*FlowCollectorEBPF), scope) - }); err != nil { - return err - } if err := s.AddGeneratedConversionFunc((*FlowCollectorIPFIX)(nil), (*v1beta2.FlowCollectorIPFIX)(nil), func(a, b interface{}, scope conversion.Scope) error { return Convert_v1beta1_FlowCollectorIPFIX_To_v1beta2_FlowCollectorIPFIX(a.(*FlowCollectorIPFIX), b.(*v1beta2.FlowCollectorIPFIX), scope) }); err != nil { @@ -203,6 +193,11 @@ func RegisterConversions(s *runtime.Scheme) error { }); err != nil { return err } + if err := s.AddConversionFunc((*FlowCollectorEBPF)(nil), (*v1beta2.FlowCollectorEBPF)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1beta1_FlowCollectorEBPF_To_v1beta2_FlowCollectorEBPF(a.(*FlowCollectorEBPF), b.(*v1beta2.FlowCollectorEBPF), scope) + }); err != nil { + return err + } if err := s.AddConversionFunc((*FlowCollectorExporter)(nil), (*v1beta2.FlowCollectorExporter)(nil), func(a, b interface{}, scope conversion.Scope) error { return Convert_v1beta1_FlowCollectorExporter_To_v1beta2_FlowCollectorExporter(a.(*FlowCollectorExporter), b.(*v1beta2.FlowCollectorExporter), scope) }); err != nil { @@ -268,6 +263,11 @@ func RegisterConversions(s *runtime.Scheme) error { }); err != nil { return err } + if err := s.AddConversionFunc((*v1beta2.FlowCollectorEBPF)(nil), (*FlowCollectorEBPF)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1beta2_FlowCollectorEBPF_To_v1beta1_FlowCollectorEBPF(a.(*v1beta2.FlowCollectorEBPF), b.(*FlowCollectorEBPF), scope) + }); err != nil { + return err + } if err := s.AddConversionFunc((*v1beta2.FlowCollectorExporter)(nil), (*FlowCollectorExporter)(nil), func(a, b interface{}, scope conversion.Scope) error { return Convert_v1beta2_FlowCollectorExporter_To_v1beta1_FlowCollectorExporter(a.(*v1beta2.FlowCollectorExporter), b.(*FlowCollectorExporter), scope) }); err != nil { @@ -550,18 +550,11 @@ func autoConvert_v1beta1_FlowCollectorEBPF_To_v1beta2_FlowCollectorEBPF(in *Flow out.LogLevel = in.LogLevel out.Privileged = in.Privileged out.KafkaBatchSize = in.KafkaBatchSize - if err := Convert_v1beta1_DebugConfig_To_v1beta2_DebugAgentConfig(&in.Debug, &out.Debug, s); err != nil { - return err - } + // WARNING: in.Debug requires manual conversion: inconvertible types (./api/v1beta1.DebugConfig vs *github.com/netobserv/network-observability-operator/api/v1beta2.DebugAgentConfig) out.Features = *(*[]v1beta2.AgentFeature)(unsafe.Pointer(&in.Features)) return nil } -// Convert_v1beta1_FlowCollectorEBPF_To_v1beta2_FlowCollectorEBPF is an autogenerated conversion function. -func Convert_v1beta1_FlowCollectorEBPF_To_v1beta2_FlowCollectorEBPF(in *FlowCollectorEBPF, out *v1beta2.FlowCollectorEBPF, s conversion.Scope) error { - return autoConvert_v1beta1_FlowCollectorEBPF_To_v1beta2_FlowCollectorEBPF(in, out, s) -} - func autoConvert_v1beta2_FlowCollectorEBPF_To_v1beta1_FlowCollectorEBPF(in *v1beta2.FlowCollectorEBPF, out *FlowCollectorEBPF, s conversion.Scope) error { out.ImagePullPolicy = in.ImagePullPolicy out.Resources = in.Resources @@ -573,18 +566,11 @@ func autoConvert_v1beta2_FlowCollectorEBPF_To_v1beta1_FlowCollectorEBPF(in *v1be out.LogLevel = in.LogLevel out.Privileged = in.Privileged out.KafkaBatchSize = in.KafkaBatchSize - if err := Convert_v1beta2_DebugAgentConfig_To_v1beta1_DebugConfig(&in.Debug, &out.Debug, s); err != nil { - return err - } + // WARNING: in.Debug requires manual conversion: inconvertible types (*github.com/netobserv/network-observability-operator/api/v1beta2.DebugAgentConfig vs ./api/v1beta1.DebugConfig) out.Features = *(*[]AgentFeature)(unsafe.Pointer(&in.Features)) return nil } -// Convert_v1beta2_FlowCollectorEBPF_To_v1beta1_FlowCollectorEBPF is an autogenerated conversion function. -func Convert_v1beta2_FlowCollectorEBPF_To_v1beta1_FlowCollectorEBPF(in *v1beta2.FlowCollectorEBPF, out *FlowCollectorEBPF, s conversion.Scope) error { - return autoConvert_v1beta2_FlowCollectorEBPF_To_v1beta1_FlowCollectorEBPF(in, out, s) -} - func autoConvert_v1beta1_FlowCollectorExporter_To_v1beta2_FlowCollectorExporter(in *FlowCollectorExporter, out *v1beta2.FlowCollectorExporter, s conversion.Scope) error { out.Type = v1beta2.ExporterType(in.Type) if err := Convert_v1beta1_FlowCollectorKafka_To_v1beta2_FlowCollectorKafka(&in.Kafka, &out.Kafka, s); err != nil { @@ -631,9 +617,7 @@ func autoConvert_v1beta1_FlowCollectorFLP_To_v1beta2_FlowCollectorFLP(in *FlowCo // WARNING: in.ConversationTerminatingTimeout requires manual conversion: does not exist in peer-type out.ClusterName = in.ClusterName out.MultiClusterDeployment = (*bool)(unsafe.Pointer(in.MultiClusterDeployment)) - if err := Convert_v1beta1_DebugConfig_To_v1beta2_DebugProcessorConfig(&in.Debug, &out.Debug, s); err != nil { - return err - } + // WARNING: in.Debug requires manual conversion: inconvertible types (./api/v1beta1.DebugConfig vs *github.com/netobserv/network-observability-operator/api/v1beta2.DebugProcessorConfig) return nil } @@ -656,9 +640,7 @@ func autoConvert_v1beta2_FlowCollectorFLP_To_v1beta1_FlowCollectorFLP(in *v1beta out.LogTypes = (*string)(unsafe.Pointer(in.LogTypes)) out.ClusterName = in.ClusterName out.MultiClusterDeployment = (*bool)(unsafe.Pointer(in.MultiClusterDeployment)) - if err := Convert_v1beta2_DebugProcessorConfig_To_v1beta1_DebugConfig(&in.Debug, &out.Debug, s); err != nil { - return err - } + // WARNING: in.Debug requires manual conversion: inconvertible types (*github.com/netobserv/network-observability-operator/api/v1beta2.DebugProcessorConfig vs ./api/v1beta1.DebugConfig) return nil } diff --git a/api/v1beta2/flowcollector_types.go b/api/v1beta2/flowcollector_types.go index 6eb246b3d..163947393 100644 --- a/api/v1beta2/flowcollector_types.go +++ b/api/v1beta2/flowcollector_types.go @@ -210,7 +210,7 @@ type FlowCollectorEBPF struct { // `excludeInterfaces` contains the interface names that are excluded from flow tracing. // An entry enclosed by slashes, such as `/br-/`, is matched as a regular expression. // Otherwise it is matched as a case-sensitive string. - //+kubebuilder:default=lo; + //+kubebuilder:default:=lo; //+optional ExcludeInterfaces []string `json:"excludeInterfaces"` @@ -236,7 +236,7 @@ type FlowCollectorEBPF struct { // This section is aimed exclusively for debugging and fine-grained performance optimizations, // such as `GOGC` and `GOMAXPROCS` env vars. Users setting its values do it at their own risk. // +optional - Debug DebugAgentConfig `json:"debug,omitempty"` + Debug *DebugAgentConfig `json:"debug,omitempty"` // List of additional features to enable. They are all disabled by default. Enabling additional features might have performance impacts. Possible values are:
// - `PacketDrop`: enable the packets drop flows logging feature. This feature requires mounting @@ -463,7 +463,7 @@ type FlowCollectorFLP struct { // This section is aimed exclusively for debugging and fine-grained performance optimizations, // such as `GOGC` and `GOMAXPROCS` env vars. Users setting its values do it at their own risk. // +optional - Debug DebugProcessorConfig `json:"debug,omitempty"` + Debug *DebugProcessorConfig `json:"debug,omitempty"` } type HPAStatus string @@ -697,7 +697,7 @@ type FlowCollectorConsolePlugin struct { // This section is aimed exclusively for debugging and fine-grained performance optimizations, // such as `GOGC` and `GOMAXPROCS` env vars. Users setting its values do it at their own risk. // +optional - Debug DebugPluginConfig `json:"debug,omitempty"` + Debug *DebugPluginConfig `json:"debug,omitempty"` } // Configuration of the port to service name translation feature of the console plugin @@ -864,64 +864,72 @@ type DebugProcessorConfig struct { //+kubebuilder:validation:Minimum=1025 //+kubebuilder:validation:Maximum=65535 //+kubebuilder:default:=2055 + //+optional // Port of the flow collector (host port). // By convention, some values are forbidden. It must be greater than 1024 and different from // 4500, 4789 and 6081. - Port int32 `json:"port,omitempty"` + Port *int32 `json:"port,omitempty"` //+kubebuilder:validation:Minimum=1 //+kubebuilder:validation:Maximum=65535 //+kubebuilder:default:=8080 + //+optional // `healthPort` is a collector HTTP port in the Pod that exposes the health check API - HealthPort int32 `json:"healthPort,omitempty"` + HealthPort *int32 `json:"healthPort,omitempty"` //+kubebuilder:validation:Minimum=0 //+kubebuilder:validation:Maximum=65535 + //+kubebuilder:default:=6060 //+optional // `profilePort` allows setting up a Go pprof profiler listening to this port - ProfilePort int32 `json:"profilePort,omitempty"` + ProfilePort *int32 `json:"profilePort,omitempty"` //+kubebuilder:default:=true + //+optional // `enableKubeProbes` is a flag to enable or disable Kubernetes liveness and readiness probes EnableKubeProbes *bool `json:"enableKubeProbes,omitempty"` //+kubebuilder:default:=true + //+optional // `dropUnusedFields` allows, when set to `true`, to drop fields that are known to be unused by OVS, to save storage space. DropUnusedFields *bool `json:"dropUnusedFields,omitempty"` //+kubebuilder:default:="30s" - // +optional + //+optional // `conversationHeartbeatInterval` is the time to wait between "tick" events of a conversation ConversationHeartbeatInterval *metav1.Duration `json:"conversationHeartbeatInterval,omitempty"` //+kubebuilder:default:="10s" - // +optional + //+optional // `conversationEndTimeout` is the time to wait after a network flow is received, to consider the conversation ended. // This delay is ignored when a FIN packet is collected for TCP flows (see `conversationTerminatingTimeout` instead). ConversationEndTimeout *metav1.Duration `json:"conversationEndTimeout,omitempty"` //+kubebuilder:default:="5s" - // +optional + //+optional // `conversationTerminatingTimeout` is the time to wait from detected FIN flag to end a conversation. Only relevant for TCP flows. ConversationTerminatingTimeout *metav1.Duration `json:"conversationTerminatingTimeout,omitempty"` - //+kubebuilder:default="1s" + //+kubebuilder:default:="1s" + //+optional // `lokiMinBackoff` is the initial backoff time for loki client connection between retries. LokiMinBackoff *metav1.Duration `json:"lokiMinBackoff,omitempty"` // Warning: keep as pointer, else default is ignored - //+kubebuilder:default="5s" + //+kubebuilder:default:="5s" + //+optional // `lokiMaxBackoff` is the maximum backoff time for loki client connection between retries. LokiMaxBackoff *metav1.Duration `json:"lokiMaxBackoff,omitempty"` // Warning: keep as pointer, else default is ignored //+kubebuilder:validation:Minimum=0 //+kubebuilder:default:=2 + //+optional // `lokiMaxRetries` is the maximum number of retries for loki client connections. LokiMaxRetries *int32 `json:"lokiMaxRetries,omitempty"` //+kubebuilder:default:={"app":"netobserv-flowcollector"} - // +optional + //+optional // `lokiStaticLabels` is a map of common labels to set on each flow in loki storage. - LokiStaticLabels map[string]string `json:"lokiStaticLabels"` + LokiStaticLabels *map[string]string `json:"lokiStaticLabels,omitempty"` } // `DebugConfig` allows tweaking some aspects of the internal configuration of the console plugin. @@ -942,6 +950,7 @@ type DebugPluginConfig struct { Args []string `json:"args,omitempty"` //+kubebuilder:default:=true + //+optional // `register` allows, when set to `true`, to automatically register the provided console plugin with the OpenShift Console operator. // When set to `false`, you can still register it manually by editing console.operator.openshift.io/cluster with the following command: // `oc patch console.operator.openshift.io cluster --type='json' -p '[{"op": "add", "path": "/spec/plugins/-", "value": "netobserv-plugin"}]'` @@ -950,8 +959,9 @@ type DebugPluginConfig struct { //+kubebuilder:validation:Minimum=1 //+kubebuilder:validation:Maximum=65535 //+kubebuilder:default:=9001 + //+optional // `port` is the plugin service port. Do not use 9002, which is reserved for metrics. - Port int32 `json:"port,omitempty"` + Port *int32 `json:"port,omitempty"` } // Add more exporter types below diff --git a/api/v1beta2/zz_generated.deepcopy.go b/api/v1beta2/zz_generated.deepcopy.go index c80b1c02b..5d2b7b9c4 100644 --- a/api/v1beta2/zz_generated.deepcopy.go +++ b/api/v1beta2/zz_generated.deepcopy.go @@ -143,6 +143,11 @@ func (in *DebugPluginConfig) DeepCopyInto(out *DebugPluginConfig) { *out = new(bool) **out = **in } + if in.Port != nil { + in, out := &in.Port, &out.Port + *out = new(int32) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DebugPluginConfig. @@ -165,6 +170,21 @@ func (in *DebugProcessorConfig) DeepCopyInto(out *DebugProcessorConfig) { (*out)[key] = val } } + if in.Port != nil { + in, out := &in.Port, &out.Port + *out = new(int32) + **out = **in + } + if in.HealthPort != nil { + in, out := &in.HealthPort, &out.HealthPort + *out = new(int32) + **out = **in + } + if in.ProfilePort != nil { + in, out := &in.ProfilePort, &out.ProfilePort + *out = new(int32) + **out = **in + } if in.EnableKubeProbes != nil { in, out := &in.EnableKubeProbes, &out.EnableKubeProbes *out = new(bool) @@ -207,9 +227,13 @@ func (in *DebugProcessorConfig) DeepCopyInto(out *DebugProcessorConfig) { } if in.LokiStaticLabels != nil { in, out := &in.LokiStaticLabels, &out.LokiStaticLabels - *out = make(map[string]string, len(*in)) - for key, val := range *in { - (*out)[key] = val + *out = new(map[string]string) + if **in != nil { + in, out := *in, *out + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } } } } @@ -336,7 +360,11 @@ func (in *FlowCollectorConsolePlugin) DeepCopyInto(out *FlowCollectorConsolePlug (*in)[i].DeepCopyInto(&(*out)[i]) } } - in.Debug.DeepCopyInto(&out.Debug) + if in.Debug != nil { + in, out := &in.Debug, &out.Debug + *out = new(DebugPluginConfig) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FlowCollectorConsolePlugin. @@ -368,7 +396,11 @@ func (in *FlowCollectorEBPF) DeepCopyInto(out *FlowCollectorEBPF) { *out = make([]string, len(*in)) copy(*out, *in) } - in.Debug.DeepCopyInto(&out.Debug) + if in.Debug != nil { + in, out := &in.Debug, &out.Debug + *out = new(DebugAgentConfig) + (*in).DeepCopyInto(*out) + } if in.Features != nil { in, out := &in.Features, &out.Features *out = make([]AgentFeature, len(*in)) @@ -434,7 +466,11 @@ func (in *FlowCollectorFLP) DeepCopyInto(out *FlowCollectorFLP) { *out = new(bool) **out = **in } - in.Debug.DeepCopyInto(&out.Debug) + if in.Debug != nil { + in, out := &in.Debug, &out.Debug + *out = new(DebugProcessorConfig) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FlowCollectorFLP. diff --git a/bundle/manifests/flows.netobserv.io_flowcollectors.yaml b/bundle/manifests/flows.netobserv.io_flowcollectors.yaml index e13a2c143..90417d477 100644 --- a/bundle/manifests/flows.netobserv.io_flowcollectors.yaml +++ b/bundle/manifests/flows.netobserv.io_flowcollectors.yaml @@ -7143,6 +7143,7 @@ spec: minimum: 1025 type: integer profilePort: + default: 6060 description: '`profilePort` allows setting up a Go pprof profiler listening to this port' format: int32 diff --git a/config/crd/bases/flows.netobserv.io_flowcollectors.yaml b/config/crd/bases/flows.netobserv.io_flowcollectors.yaml index 2f9c4988c..7476d7348 100644 --- a/config/crd/bases/flows.netobserv.io_flowcollectors.yaml +++ b/config/crd/bases/flows.netobserv.io_flowcollectors.yaml @@ -7129,6 +7129,7 @@ spec: minimum: 1025 type: integer profilePort: + default: 6060 description: '`profilePort` allows setting up a Go pprof profiler listening to this port' format: int32 diff --git a/controllers/consoleplugin/consoleplugin_objects.go b/controllers/consoleplugin/consoleplugin_objects.go index 856a74e15..06a65dc6e 100644 --- a/controllers/consoleplugin/consoleplugin_objects.go +++ b/controllers/consoleplugin/consoleplugin_objects.go @@ -64,6 +64,7 @@ func newBuilder(ns, imageName string, desired *flowslatest.FlowCollectorSpec, lo } func (b *builder) consolePlugin() *osv1alpha1.ConsolePlugin { + debugConfig := helper.GetDebugPluginConfig(b.desired.ConsolePlugin.Debug) return &osv1alpha1.ConsolePlugin{ ObjectMeta: metav1.ObjectMeta{ Name: constants.PluginName, @@ -73,7 +74,7 @@ func (b *builder) consolePlugin() *osv1alpha1.ConsolePlugin { Service: osv1alpha1.ConsolePluginService{ Name: constants.PluginName, Namespace: b.namespace, - Port: b.desired.ConsolePlugin.Debug.Port, + Port: *debugConfig.Port, BasePath: "/", }, Proxy: []osv1alpha1.ConsolePluginProxy{{ @@ -83,7 +84,7 @@ func (b *builder) consolePlugin() *osv1alpha1.ConsolePlugin { Service: osv1alpha1.ConsolePluginProxyServiceConfig{ Name: constants.PluginName, Namespace: b.namespace, - Port: b.desired.ConsolePlugin.Debug.Port, + Port: *debugConfig.Port, }, }}, }, @@ -255,6 +256,7 @@ func (b *builder) autoScaler() *ascv2.HorizontalPodAutoscaler { } func (b *builder) mainService() *corev1.Service { + debugConfig := helper.GetDebugPluginConfig(b.desired.ConsolePlugin.Debug) return &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: constants.PluginName, @@ -267,12 +269,12 @@ func (b *builder) mainService() *corev1.Service { Spec: corev1.ServiceSpec{ Selector: b.selector, Ports: []corev1.ServicePort{{ - Port: b.desired.ConsolePlugin.Debug.Port, + Port: *debugConfig.Port, Protocol: corev1.ProtocolTCP, // Some Kubernetes versions might automatically set TargetPort to Port. We need to // explicitly set it here so the reconcile loop verifies that the owned service // is equal as the desired service - TargetPort: intstr.FromInt(int(b.desired.ConsolePlugin.Debug.Port)), + TargetPort: intstr.FromInt(int(*debugConfig.Port)), }}, }, } diff --git a/controllers/consoleplugin/consoleplugin_reconciler.go b/controllers/consoleplugin/consoleplugin_reconciler.go index 80eaca467..3384a80b7 100644 --- a/controllers/consoleplugin/consoleplugin_reconciler.go +++ b/controllers/consoleplugin/consoleplugin_reconciler.go @@ -119,7 +119,8 @@ func (r *CPReconciler) Reconcile(ctx context.Context, desired *flowslatest.FlowC func (r *CPReconciler) checkAutoPatch(ctx context.Context, desired *flowslatest.FlowCollector) error { console := operatorsv1.Console{} - reg := helper.UseConsolePlugin(&desired.Spec) && helper.PtrBool(desired.Spec.ConsolePlugin.Debug.Register) + debugConfig := helper.GetDebugPluginConfig(desired.Spec.ConsolePlugin.Debug) + reg := helper.UseConsolePlugin(&desired.Spec) && *debugConfig.Register if err := r.Client.Get(ctx, types.NamespacedName{Name: "cluster"}, &console); err != nil { // Console operator CR not found => warn but continue execution if reg { @@ -246,5 +247,6 @@ func (r *CPReconciler) reconcileHPA(ctx context.Context, builder *builder, desir } func pluginNeedsUpdate(plg *osv1alpha1.ConsolePlugin, desired *pluginSpec) bool { - return plg.Spec.Service.Port != desired.Debug.Port + debugConfig := helper.GetDebugPluginConfig(desired.Debug) + return plg.Spec.Service.Port != *debugConfig.Port } diff --git a/controllers/ebpf/agent_controller.go b/controllers/ebpf/agent_controller.go index f663a3731..9f0a02251 100644 --- a/controllers/ebpf/agent_controller.go +++ b/controllers/ebpf/agent_controller.go @@ -322,6 +322,7 @@ func (c *AgentController) envConfig(ctx context.Context, coll *flowslatest.FlowC } } else { config = append(config, corev1.EnvVar{Name: envExport, Value: exportGRPC}) + debugConfig := helper.GetDebugProcessorConfig(coll.Spec.Processor.Debug) // When flowlogs-pipeline is deployed as a daemonset, each agent must send // data to the pod that is deployed in the same host config = append(config, corev1.EnvVar{ @@ -334,7 +335,7 @@ func (c *AgentController) envConfig(ctx context.Context, coll *flowslatest.FlowC }, }, corev1.EnvVar{ Name: envFlowsTargetPort, - Value: strconv.Itoa(int(coll.Spec.Processor.Debug.Port)), + Value: strconv.Itoa(int(*debugConfig.Port)), }) } return config, nil @@ -462,7 +463,8 @@ func (c *AgentController) setEnvConfig(coll *flowslatest.FlowCollector) []corev1 dedupJustMark := dedupeJustMarkDefault // we need to sort env map to keep idempotency, // as equal maps could be iterated in different order - for _, pair := range helper.KeySorted(coll.Spec.Agent.EBPF.Debug.Env) { + debugConfig := helper.GetDebugAgentConfig(coll.Spec.Agent.EBPF.Debug) + for _, pair := range helper.KeySorted(debugConfig.Env) { k, v := pair[0], pair[1] if k == envDedupe { dedup = v @@ -472,6 +474,7 @@ func (c *AgentController) setEnvConfig(coll *flowslatest.FlowCollector) []corev1 config = append(config, corev1.EnvVar{Name: k, Value: v}) } } + config = append(config, corev1.EnvVar{Name: envDedupe, Value: dedup}) config = append(config, corev1.EnvVar{Name: envDedupeJustMark, Value: dedupJustMark}) diff --git a/controllers/flowcollector_controller_console_test.go b/controllers/flowcollector_controller_console_test.go index 0f1ab0f95..26cd769cf 100644 --- a/controllers/flowcollector_controller_console_test.go +++ b/controllers/flowcollector_controller_console_test.go @@ -143,7 +143,9 @@ func flowCollectorConsolePluginSpecs() { if err := k8sClient.Get(ctx, crKey, &fc); err != nil { return err } - fc.Spec.ConsolePlugin.Debug.Port = 9099 + fc.Spec.ConsolePlugin.Debug = &flowslatest.DebugPluginConfig{ + Port: ptr.To(int32(9099)), + } fc.Spec.ConsolePlugin.Replicas = ptr.To(int32(2)) fc.Spec.ConsolePlugin.Autoscaler.Status = flowslatest.HPAStatusDisabled return k8sClient.Update(ctx, &fc) diff --git a/controllers/flowcollector_controller_ebpf_test.go b/controllers/flowcollector_controller_ebpf_test.go index 9cd8ae767..3fffb22bb 100644 --- a/controllers/flowcollector_controller_ebpf_test.go +++ b/controllers/flowcollector_controller_ebpf_test.go @@ -58,8 +58,8 @@ func flowCollectorEBPFSpecs() { Processor: flowslatest.FlowCollectorFLP{ ImagePullPolicy: "Never", LogLevel: "error", - Debug: flowslatest.DebugProcessorConfig{ - Port: 9999, + Debug: &flowslatest.DebugProcessorConfig{ + Port: ptr.To(int32(9999)), }, }, Agent: flowslatest.FlowCollectorAgent{ @@ -71,7 +71,7 @@ func flowCollectorEBPFSpecs() { Interfaces: []string{"veth0", "/^br-/"}, ExcludeInterfaces: []string{"br-3", "lo"}, LogLevel: "trace", - Debug: flowslatest.DebugAgentConfig{ + Debug: &flowslatest.DebugAgentConfig{ Env: map[string]string{"GOGC": "400", "BUFFERS_LENGTH": "100"}, }, }, diff --git a/controllers/flowcollector_controller_iso_test.go b/controllers/flowcollector_controller_iso_test.go index f1261d31f..df8ab5f27 100644 --- a/controllers/flowcollector_controller_iso_test.go +++ b/controllers/flowcollector_controller_iso_test.go @@ -70,10 +70,10 @@ func flowCollectorIsoSpecs() { LokiBatchSize: 100, MultiClusterDeployment: ptr.To(true), ClusterName: "testCluster", - Debug: flowslatest.DebugProcessorConfig{ - Port: 12345, - HealthPort: 12346, - ProfilePort: 0, + Debug: &flowslatest.DebugProcessorConfig{ + Port: ptr.To(int32(12345)), + HealthPort: ptr.To(int32(12346)), + ProfilePort: ptr.To(int32(12347)), ConversationHeartbeatInterval: &metav1.Duration{Duration: time.Second}, ConversationEndTimeout: &metav1.Duration{Duration: time.Second}, ConversationTerminatingTimeout: &metav1.Duration{Duration: time.Second}, @@ -82,7 +82,7 @@ func flowCollectorIsoSpecs() { LokiMinBackoff: &metav1.Duration{Duration: time.Second}, LokiMaxBackoff: &metav1.Duration{Duration: time.Second}, LokiMaxRetries: &zero, - LokiStaticLabels: map[string]string{}, + LokiStaticLabels: &map[string]string{}, }, LogTypes: &outputRecordTypes, Metrics: flowslatest.FLPMetrics{ @@ -117,7 +117,7 @@ func flowCollectorIsoSpecs() { CacheActiveTimeout: "5s", CacheMaxFlows: 100, ImagePullPolicy: "Always", - Debug: flowslatest.DebugAgentConfig{}, + Debug: &flowslatest.DebugAgentConfig{}, LogLevel: "trace", Resources: v1.ResourceRequirements{Limits: nil, Requests: nil}, Interfaces: []string{}, @@ -131,9 +131,9 @@ func flowCollectorIsoSpecs() { Enable: ptr.To(true), Replicas: &zero, ImagePullPolicy: "Always", - Debug: flowslatest.DebugPluginConfig{ + Debug: &flowslatest.DebugPluginConfig{ Register: ptr.To(false), - Port: 12345, + Port: ptr.To(int32(12345)), }, Resources: v1.ResourceRequirements{Limits: nil, Requests: nil}, LogLevel: "trace", diff --git a/controllers/flowcollector_controller_test.go b/controllers/flowcollector_controller_test.go index 0d835260c..d2c4527d5 100644 --- a/controllers/flowcollector_controller_test.go +++ b/controllers/flowcollector_controller_test.go @@ -1,6 +1,9 @@ package controllers import ( + "log" + "os" + "path/filepath" "strings" "time" @@ -8,6 +11,7 @@ import ( . "github.com/onsi/gomega" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" kerr "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -18,6 +22,7 @@ import ( flowslatest "github.com/netobserv/network-observability-operator/api/v1beta2" "github.com/netobserv/network-observability-operator/controllers/constants" . "github.com/netobserv/network-observability-operator/controllers/controllerstest" + "github.com/netobserv/network-observability-operator/pkg/helper" "github.com/netobserv/network-observability-operator/pkg/test" ) @@ -37,6 +42,18 @@ var updateCR = func(key types.NamespacedName, updater func(*flowslatest.FlowColl // nolint:cyclop func flowCollectorControllerSpecs() { + crdPath, err := filepath.Abs("../config/crd/bases/flows.netobserv.io_flowcollectors.yaml") + if err != nil { + log.Fatalf("can't read CRD path %v", err) + } + crdBytes, err := os.ReadFile(crdPath) + if err != nil { + log.Fatalf("can't read CRD file %v", err) + } + err = helper.ParseCRD(crdBytes) + if err != nil { + log.Fatalf("can't parse CRD %v", err) + } const operatorNamespace = "main-namespace" const otherNamespace = "other-namespace" @@ -80,11 +97,11 @@ func flowCollectorControllerSpecs() { Processor: flowslatest.FlowCollectorFLP{ ImagePullPolicy: "Never", LogLevel: "error", - Debug: flowslatest.DebugProcessorConfig{ + Debug: &flowslatest.DebugProcessorConfig{ Env: map[string]string{ "GOGC": "200", }, - Port: 9999, + Port: ptr.To(int32(9999)), ConversationHeartbeatInterval: &metav1.Duration{ Duration: conntrackHeartbeatInterval, }, @@ -152,13 +169,13 @@ func flowCollectorControllerSpecs() { fc.Spec.Processor = flowslatest.FlowCollectorFLP{ ImagePullPolicy: "Never", LogLevel: "error", - Debug: flowslatest.DebugProcessorConfig{ + Debug: &flowslatest.DebugProcessorConfig{ Env: map[string]string{ // we'll test that env vars are sorted, to keep idempotency "GOMAXPROCS": "33", "GOGC": "400", }, - Port: 7891, + Port: ptr.To(int32(7891)), ConversationHeartbeatInterval: &metav1.Duration{ Duration: conntrackHeartbeatInterval, }, @@ -236,7 +253,7 @@ func flowCollectorControllerSpecs() { Context("Changing namespace", func() { It("Should update namespace successfully", func() { updateCR(crKey, func(fc *flowslatest.FlowCollector) { - fc.Spec.Processor.Debug.Port = 9999 + fc.Spec.Processor.Debug.Port = ptr.To(int32(9999)) fc.Spec.Namespace = otherNamespace }) }) diff --git a/controllers/flp/flp_common_objects.go b/controllers/flp/flp_common_objects.go index ccaf7c85d..20a5637d0 100644 --- a/controllers/flp/flp_common_objects.go +++ b/controllers/flp/flp_common_objects.go @@ -114,14 +114,14 @@ func (b *builder) Pipeline() *PipelineBuilder { return b.pipeline } func (b *builder) NewIPFIXPipeline() PipelineBuilder { return b.initPipeline(config.NewCollectorPipeline("ipfix", api.IngestCollector{ - Port: int(b.desired.Processor.Debug.Port), + Port: int(*helper.GetDebugProcessorConfig(b.desired.Processor.Debug).Port), HostName: "0.0.0.0", })) } func (b *builder) NewGRPCPipeline() PipelineBuilder { return b.initPipeline(config.NewGRPCPipeline("grpc", api.IngestGRPCProto{ - Port: int(b.desired.Processor.Debug.Port), + Port: int(*helper.GetDebugProcessorConfig(b.desired.Processor.Debug).Port), })) } @@ -156,13 +156,14 @@ func (b *builder) portProtocol() corev1.Protocol { } func (b *builder) podTemplate(hasHostPort, hostNetwork bool, annotations map[string]string) corev1.PodTemplateSpec { + debugConfig := helper.GetDebugProcessorConfig(b.desired.Processor.Debug) var ports []corev1.ContainerPort var tolerations []corev1.Toleration if hasHostPort { ports = []corev1.ContainerPort{{ Name: constants.FLPPortName, - HostPort: b.desired.Processor.Debug.Port, - ContainerPort: b.desired.Processor.Debug.Port, + HostPort: *debugConfig.Port, + ContainerPort: *debugConfig.Port, Protocol: b.portProtocol(), }} // This allows deploying an instance in the master node, the same technique used in the @@ -172,7 +173,7 @@ func (b *builder) podTemplate(hasHostPort, hostNetwork bool, annotations map[str ports = append(ports, corev1.ContainerPort{ Name: healthServiceName, - ContainerPort: b.desired.Processor.Debug.HealthPort, + ContainerPort: *debugConfig.HealthPort, }) ports = append(ports, corev1.ContainerPort{ @@ -180,10 +181,10 @@ func (b *builder) podTemplate(hasHostPort, hostNetwork bool, annotations map[str ContainerPort: b.desired.Processor.Metrics.Server.Port, }) - if b.desired.Processor.Debug.ProfilePort > 0 { + if debugConfig.ProfilePort != nil { ports = append(ports, corev1.ContainerPort{ Name: profilePortName, - ContainerPort: b.desired.Processor.Debug.ProfilePort, + ContainerPort: *debugConfig.ProfilePort, Protocol: corev1.ProtocolTCP, }) } @@ -204,9 +205,10 @@ func (b *builder) podTemplate(hasHostPort, hostNetwork bool, annotations map[str }}) var envs []corev1.EnvVar + debugConfig = helper.GetDebugProcessorConfig(b.desired.Processor.Debug) // we need to sort env map to keep idempotency, // as equal maps could be iterated in different order - for _, pair := range helper.KeySorted(b.desired.Processor.Debug.Env) { + for _, pair := range helper.KeySorted(debugConfig.Env) { envs = append(envs, corev1.EnvVar{Name: pair[0], Value: pair[1]}) } envs = append(envs, constants.EnvNoHTTP2) @@ -221,7 +223,7 @@ func (b *builder) podTemplate(hasHostPort, hostNetwork bool, annotations map[str Ports: ports, Env: envs, } - if helper.PtrBool(b.desired.Processor.Debug.EnableKubeProbes) { + if *debugConfig.EnableKubeProbes { container.LivenessProbe = &corev1.Probe{ ProbeHandler: corev1.ProbeHandler{ HTTPGet: &corev1.HTTPGetAction{ @@ -306,18 +308,19 @@ func (b *builder) GetJSONConfig() (string, error) { } } } + debugConfig := helper.GetDebugProcessorConfig(b.desired.Processor.Debug) config := map[string]interface{}{ "log-level": b.desired.Processor.LogLevel, "health": map[string]interface{}{ - "port": b.desired.Processor.Debug.HealthPort, + "port": *debugConfig.HealthPort, }, "pipeline": b.pipeline.GetStages(), "parameters": b.pipeline.GetStageParams(), "metricsSettings": metricsSettings, } - if b.desired.Processor.Debug.ProfilePort > 0 { + if debugConfig.ProfilePort != nil { config["profile"] = map[string]interface{}{ - "port": b.desired.Processor.Debug.ProfilePort, + "port": *debugConfig.ProfilePort, } } diff --git a/controllers/flp/flp_controller_test.go b/controllers/flp/flp_controller_test.go index 6e20afb33..782f98b01 100644 --- a/controllers/flp/flp_controller_test.go +++ b/controllers/flp/flp_controller_test.go @@ -2,6 +2,7 @@ package flp import ( "fmt" + "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -23,8 +24,11 @@ import ( ) const ( - timeout = test.Timeout - interval = test.Interval + timeout = test.Timeout + interval = test.Interval + conntrackEndTimeout = 10 * time.Second + conntrackTerminatingTimeout = 5 * time.Second + conntrackHeartbeatInterval = 30 * time.Second ) var ( @@ -85,7 +89,7 @@ func ControllerSpecs() { Processor: flowslatest.FlowCollectorFLP{ ImagePullPolicy: "Never", LogLevel: "error", - Debug: flowslatest.DebugProcessorConfig{ + Debug: &flowslatest.DebugProcessorConfig{ Env: map[string]string{ "GOGC": "200", }, @@ -175,13 +179,13 @@ func ControllerSpecs() { fc.Spec.Processor = flowslatest.FlowCollectorFLP{ ImagePullPolicy: "Never", LogLevel: "error", - Debug: flowslatest.DebugProcessorConfig{ + Debug: &flowslatest.DebugProcessorConfig{ Env: map[string]string{ // we'll test that env vars are sorted, to keep idempotency "GOMAXPROCS": "33", "GOGC": "400", }, - Port: 7891, + Port: ptr.To(int32(7891)), ConversationHeartbeatInterval: &metav1.Duration{ Duration: conntrackHeartbeatInterval, }, @@ -724,7 +728,7 @@ func ControllerSpecs() { Context("Changing namespace", func() { It("Should update namespace successfully", func() { updateCR(crKey, func(fc *flowslatest.FlowCollector) { - fc.Spec.Processor.Debug.Port = 9999 + fc.Spec.Processor.Debug.Port = ptr.To(int32(9999)) fc.Spec.Namespace = otherNamespace }) }) diff --git a/controllers/flp/flp_pipeline_builder.go b/controllers/flp/flp_pipeline_builder.go index 51b35fff3..f9caecbc5 100644 --- a/controllers/flp/flp_pipeline_builder.go +++ b/controllers/flp/flp_pipeline_builder.go @@ -2,7 +2,6 @@ package flp import ( "fmt" - "time" "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" @@ -18,10 +17,7 @@ import ( ) const ( - conntrackTerminatingTimeout = 5 * time.Second - conntrackEndTimeout = 10 * time.Second - conntrackHeartbeatInterval = 30 * time.Second - clusterNameLabelName = "K8S_ClusterName" + clusterNameLabelName = "K8S_ClusterName" ) type PipelineBuilder struct { @@ -83,14 +79,15 @@ func (b *PipelineBuilder) AddProcessorStages() error { }) // loki stage (write) configuration + debugConfig := helper.GetDebugProcessorConfig(b.desired.Processor.Debug) if helper.UseLoki(b.desired) { lokiWrite := api.WriteLoki{ Labels: indexFields, BatchSize: int(b.desired.Processor.LokiBatchSize), BatchWait: helper.UnstructuredDuration(b.desired.Processor.LokiBatchWait), - MaxBackoff: helper.UnstructuredDuration(b.desired.Processor.Debug.LokiMaxBackoff), - MaxRetries: int(helper.PtrInt32(b.desired.Processor.Debug.LokiMaxRetries)), - MinBackoff: helper.UnstructuredDuration(b.desired.Processor.Debug.LokiMinBackoff), + MaxBackoff: helper.UnstructuredDuration(debugConfig.LokiMaxBackoff), + MaxRetries: int(helper.PtrInt32(debugConfig.LokiMaxRetries)), + MinBackoff: helper.UnstructuredDuration(debugConfig.LokiMinBackoff), StaticLabels: model.LabelSet{}, Timeout: helper.UnstructuredDuration(b.desired.Processor.LokiTimeout), URL: b.loki.IngesterURL, @@ -99,7 +96,7 @@ func (b *PipelineBuilder) AddProcessorStages() error { TenantID: b.loki.TenantID, } - for k, v := range b.desired.Processor.Debug.LokiStaticLabels { + for k, v := range *debugConfig.LokiStaticLabels { lokiWrite.StaticLabels[model.LabelName(k)] = model.LabelValue(v) } @@ -268,22 +265,7 @@ func (b *PipelineBuilder) addConnectionTracking(indexFields []string, lastStage if b.desired.Processor.LogTypes != nil && *b.desired.Processor.LogTypes != flowslatest.LogTypeFlows { indexFields = append(indexFields, constants.LokiConnectionIndexFields...) outputRecordTypes := helper.GetRecordTypes(&b.desired.Processor) - - terminatingTimeout := conntrackTerminatingTimeout - if b.desired.Processor.Debug.ConversationTerminatingTimeout != nil { - terminatingTimeout = b.desired.Processor.Debug.ConversationTerminatingTimeout.Duration - } - - endTimeout := conntrackEndTimeout - if b.desired.Processor.Debug.ConversationEndTimeout != nil { - endTimeout = b.desired.Processor.Debug.ConversationEndTimeout.Duration - } - - heartbeatInterval := conntrackHeartbeatInterval - if b.desired.Processor.Debug.ConversationHeartbeatInterval != nil { - heartbeatInterval = b.desired.Processor.Debug.ConversationHeartbeatInterval.Duration - } - + debugConfig := helper.GetDebugProcessorConfig(b.desired.Processor.Debug) lastStage = lastStage.ConnTrack("extract_conntrack", api.ConnTrack{ KeyDefinition: api.KeyDefinition{ FieldGroups: []api.FieldGroup{ @@ -304,9 +286,9 @@ func (b *PipelineBuilder) addConnectionTracking(indexFields []string, lastStage Scheduling: []api.ConnTrackSchedulingGroup{ { Selector: nil, // Default group. Match all flowlogs - HeartbeatInterval: api.Duration{Duration: heartbeatInterval}, - EndConnectionTimeout: api.Duration{Duration: endTimeout}, - TerminatingTimeout: api.Duration{Duration: terminatingTimeout}, + HeartbeatInterval: api.Duration{Duration: debugConfig.ConversationHeartbeatInterval.Duration}, + EndConnectionTimeout: api.Duration{Duration: debugConfig.ConversationEndTimeout.Duration}, + TerminatingTimeout: api.Duration{Duration: debugConfig.ConversationTerminatingTimeout.Duration}, }, }, TCPFlags: api.ConnTrackTCPFlags{ @@ -342,7 +324,7 @@ func (b *PipelineBuilder) addTransformFilter(lastStage config.PipelineBuilderSta } // Filter-out unused fields? - if helper.PtrBool(b.desired.Processor.Debug.DropUnusedFields) { + if *helper.GetDebugProcessorConfig(b.desired.Processor.Debug).DropUnusedFields { if helper.UseIPFIX(b.desired) { rules := filters.GetOVSGoflowUnusedRules() transformFilterRules = append(transformFilterRules, rules...) diff --git a/controllers/flp/flp_test.go b/controllers/flp/flp_test.go index 734cec74c..201b7fba3 100644 --- a/controllers/flp/flp_test.go +++ b/controllers/flp/flp_test.go @@ -93,9 +93,9 @@ func getConfig(lokiMode ...string) flowslatest.FlowCollectorSpec { }}, }, LogTypes: &outputRecordTypes, - Debug: flowslatest.DebugProcessorConfig{ - Port: 2055, - HealthPort: 8080, + Debug: &flowslatest.DebugProcessorConfig{ + Port: ptr.To(int32(2055)), + HealthPort: ptr.To(int32(8080)), LokiMinBackoff: &metav1.Duration{ Duration: 1, }, @@ -103,7 +103,7 @@ func getConfig(lokiMode ...string) flowslatest.FlowCollectorSpec { Duration: 300, }, LokiMaxRetries: ptr.To(int32(10)), - LokiStaticLabels: map[string]string{"app": "netobserv-flowcollector"}, + LokiStaticLabels: &map[string]string{"app": "netobserv-flowcollector"}, }, }, Loki: getLoki(lokiMode...), @@ -638,7 +638,7 @@ func TestConfigMapShouldDeserializeAsJSONWithLokiManual(t *testing.T) { params := decoded.Parameters assert.Len(params, 6) - assert.Equal(cfg.Processor.Debug.Port, int32(params[0].Ingest.Collector.Port)) + assert.Equal(*cfg.Processor.Debug.Port, int32(params[0].Ingest.Collector.Port)) lokiCfg := params[3].Write.Loki assert.Equal(loki.Manual.IngesterURL, lokiCfg.URL) @@ -687,7 +687,7 @@ func TestConfigMapShouldDeserializeAsJSONWithLokiStack(t *testing.T) { params := decoded.Parameters assert.Len(params, 6) - assert.Equal(cfg.Processor.Debug.Port, int32(params[0].Ingest.Collector.Port)) + assert.Equal(*cfg.Processor.Debug.Port, int32(params[0].Ingest.Collector.Port)) lokiCfg := params[3].Write.Loki assert.Equal("https://lokistack-gateway-http.ls-namespace.svc:8080/api/logs/v1/network/", lokiCfg.URL) diff --git a/controllers/ovs/flowsconfig_cno_reconciler.go b/controllers/ovs/flowsconfig_cno_reconciler.go index 8158ef009..1d59b0198 100644 --- a/controllers/ovs/flowsconfig_cno_reconciler.go +++ b/controllers/ovs/flowsconfig_cno_reconciler.go @@ -106,7 +106,7 @@ func (c *FlowsConfigCNOController) desired( return &flowsConfig{ FlowCollectorIPFIX: *corrected, - NodePort: coll.Spec.Processor.Debug.Port, + NodePort: *helper.GetDebugProcessorConfig(coll.Spec.Processor.Debug).Port, } } diff --git a/controllers/ovs/flowsconfig_ovnk_reconciler.go b/controllers/ovs/flowsconfig_ovnk_reconciler.go index 8a9994f8d..a3b86ce5d 100644 --- a/controllers/ovs/flowsconfig_ovnk_reconciler.go +++ b/controllers/ovs/flowsconfig_ovnk_reconciler.go @@ -98,7 +98,7 @@ func (c *FlowsConfigOVNKController) desiredEnv(ctx context.Context, coll *flowsl return envs, nil } - envs["OVN_IPFIX_TARGETS"] = fmt.Sprintf(":%d", coll.Spec.Processor.Debug.Port) + envs["OVN_IPFIX_TARGETS"] = fmt.Sprintf(":%d", *helper.GetDebugProcessorConfig(coll.Spec.Processor.Debug).Port) return envs, nil } diff --git a/docs/FlowCollector.md b/docs/FlowCollector.md index 5c40ed7c5..48aaca61d 100644 --- a/docs/FlowCollector.md +++ b/docs/FlowCollector.md @@ -12821,6 +12821,7 @@ TLS client configuration for Loki URL. `profilePort` allows setting up a Go pprof profiler listening to this port

Format: int32
+ Default: 6060
Minimum: 0
Maximum: 65535
diff --git a/hack/cloned.flows.netobserv.io_flowcollectors.yaml b/hack/cloned.flows.netobserv.io_flowcollectors.yaml index 30a84ae59..98dfadf74 100644 --- a/hack/cloned.flows.netobserv.io_flowcollectors.yaml +++ b/hack/cloned.flows.netobserv.io_flowcollectors.yaml @@ -4958,6 +4958,7 @@ spec: minimum: 1025 type: integer profilePort: + default: 6060 description: '`profilePort` allows setting up a Go pprof profiler listening to this port' format: int32 maximum: 65535 diff --git a/main.go b/main.go index ffae60c02..f2c8dc9d3 100644 --- a/main.go +++ b/main.go @@ -19,12 +19,14 @@ package main import ( "context" "crypto/tls" + _ "embed" "flag" "fmt" _ "net/http/pprof" "os" "go.uber.org/zap/zapcore" + // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) // to ensure that exec-entrypoint and run can make use of them. osv1alpha1 "github.com/openshift/api/console/v1alpha1" @@ -34,9 +36,11 @@ import ( ascv2 "k8s.io/api/autoscaling/v2" corev1 "k8s.io/api/core/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" + _ "k8s.io/client-go/plugin/pkg/client/auth" apiregv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" ctrl "sigs.k8s.io/controller-runtime" @@ -51,6 +55,7 @@ import ( flowsv1beta2 "github.com/netobserv/network-observability-operator/api/v1beta2" "github.com/netobserv/network-observability-operator/controllers" "github.com/netobserv/network-observability-operator/controllers/constants" + "github.com/netobserv/network-observability-operator/pkg/helper" "github.com/netobserv/network-observability-operator/pkg/manager" //+kubebuilder:scaffold:imports ) @@ -64,6 +69,9 @@ var ( setupLog = ctrl.Log.WithName("setup") ) +//go:embed config/crd/bases/flows.netobserv.io_flowcollectors.yaml +var crdBytes []byte + func init() { utilruntime.Must(clientgoscheme.AddToScheme(scheme)) utilruntime.Must(flowsv1alpha1.AddToScheme(scheme)) @@ -123,6 +131,11 @@ func main() { os.Exit(1) } + if err := helper.ParseCRD(crdBytes); err != nil { + setupLog.Error(err, "unable to parse CRD") + os.Exit(1) + } + disableHTTP2 := func(c *tls.Config) { if enableHTTP2 { return diff --git a/pkg/helper/crd.go b/pkg/helper/crd.go new file mode 100644 index 000000000..0006cf0b7 --- /dev/null +++ b/pkg/helper/crd.go @@ -0,0 +1,180 @@ +package helper + +import ( + "encoding/json" + "fmt" + "log" + "regexp" + "strconv" + + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" + + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer" +) + +var ( + crd *apiextensionsv1.CustomResourceDefinition + quoteRegex = regexp.MustCompile(`^"(.*)"$`) + AgentDebugPath = []string{"spec", "agent", "debug"} + ProcessorDebugPath = []string{"spec", "processor", "debug"} + PluginDebugPath = []string{"spec", "consolePlugin", "debug"} +) + +func ParseCRD(bytes []byte) error { + crdScheme := runtime.NewScheme() + err := apiextensionsv1.AddToScheme(crdScheme) + if err != nil { + return err + } + err = apiextensionsv1beta1.AddToScheme(crdScheme) + if err != nil { + return err + } + crdCodecFactory := serializer.NewCodecFactory(crdScheme) + crdDeserializer := crdCodecFactory.UniversalDeserializer() + crdObj, _, err := crdDeserializer.Decode(bytes, nil, &apiextensionsv1.CustomResourceDefinition{}) + if err != nil { + return err + } + crd = crdObj.(*apiextensionsv1.CustomResourceDefinition) + + return nil +} + +func SetCRD(v *apiextensionsv1.CustomResourceDefinition) { + crd = v +} + +func GetDebugDurationValue(path []string, field string, value *v1.Duration) *v1.Duration { + if value != nil && !IsDefaultValue(path, field, value.Duration.String()) { + return value + } + return nil +} + +func GetDebugBoolValue(path []string, field string, value *bool) *bool { + if value != nil && !IsDefaultValue(path, field, *value) { + return value + } + return nil +} + +func GetDebugInt32Value(path []string, field string, value *int32) *int32 { + if value != nil && !IsDefaultValue(path, field, *value) { + return value + } + return nil +} + +func GetDebugMapValue(path []string, field string, value *map[string]string) *map[string]string { + bytes, _ := json.Marshal(value) + if !IsDefaultValue(path, field, string(bytes)) { + return value + } + return nil +} + +func IsDefaultValue(path []string, field string, value interface{}) bool { + defaultValueStr := GetFieldDefaultString(path, field) + switch value.(type) { + case string: + return value == defaultValueStr + case bool: + return fmt.Sprintf("%t", value) == defaultValueStr + case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64: + return fmt.Sprintf("%d", value) == defaultValueStr + case float32, float64: + return fmt.Sprintf("%f", value) == defaultValueStr + default: + return false + } +} + +func GetFieldDefaultString(path []string, field string) string { + bytes := GetFieldDefault(path, field) + if len(bytes) > 0 { + return quoteRegex.ReplaceAllString(string(bytes), `$1`) + } + return "" +} + +func GetFieldDefaultInt32(path []string, field string) int32 { + defaultValueStr := GetFieldDefaultString(path, field) + intVar, _ := strconv.ParseInt(defaultValueStr, 0, 32) + return int32(intVar) +} + +func GetFieldDefaultBool(path []string, field string) bool { + defaultValueStr := GetFieldDefaultString(path, field) + return defaultValueStr == "true" +} + +func GetFieldDefaultDuration(path []string, field string) v1.Duration { + duration := v1.Duration{} + _ = duration.UnmarshalJSON(GetFieldDefault(path, field)) + return duration +} + +func GetFieldDefaultMapString(path []string, field string) map[string]string { + bytes := GetFieldDefault(path, field) + if len(bytes) > 0 { + m := make(map[string]string) + _ = json.Unmarshal(bytes, &m) + return m + } + return map[string]string{} +} + +func GetFieldDefault(path []string, field string) []byte { + pathProperties := getPathProperties(path) + if fieldSchema, ok := pathProperties[field]; ok { + return fieldSchema.Default.Raw + } + return []byte{} +} + +func GetValueOrDefaultInt32(path []string, field string, value *int32) int32 { + if value != nil { + return *value + } + return GetFieldDefaultInt32(path, field) +} + +func GetValueOrDefaultMapString(path []string, field string, value *map[string]string) map[string]string { + if value != nil { + return *value + } + return GetFieldDefaultMapString(path, field) +} + +func getPathProperties(path []string) map[string]apiextensionsv1.JSONSchemaProps { + schema := getSchema() + if schema == nil { + log.Print("getPathProperties schema is nill") + return map[string]apiextensionsv1.JSONSchemaProps{} + } + properties := schema.Properties + for _, key := range path { + if val, ok := properties[key]; ok { + properties = val.Properties + } + } + return properties +} + +func getSchema() *apiextensionsv1.JSONSchemaProps { + if crd == nil { + log.Print("getSchema crd is nill") + return nil + } + versions := crd.Spec.Versions + if len(versions) > 0 { + lastVersion := versions[len(versions)-1] + return lastVersion.Schema.OpenAPIV3Schema + } + log.Print("getSchema versions is empty") + return nil +} diff --git a/pkg/helper/flowcollector.go b/pkg/helper/flowcollector.go index d0fbbe04d..dd4617e1b 100644 --- a/pkg/helper/flowcollector.go +++ b/pkg/helper/flowcollector.go @@ -6,6 +6,7 @@ import ( flowslatest "github.com/netobserv/network-observability-operator/api/v1beta2" "github.com/netobserv/network-observability-operator/controllers/constants" "github.com/netobserv/network-observability-operator/pkg/metrics" + "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -170,3 +171,105 @@ func GetNamespace(spec *flowslatest.FlowCollectorSpec) string { } return constants.DefaultOperatorNamespace } + +func GetDebugAgentConfig(specConfig *flowslatest.DebugAgentConfig) flowslatest.DebugAgentConfig { + debugConfig := flowslatest.DebugAgentConfig{ + Env: map[string]string{}, + } + + if specConfig != nil { + if len(specConfig.Env) > 0 { + debugConfig.Env = specConfig.Env + } + } + + return debugConfig +} + +func GetDebugProcessorConfig(specConfig *flowslatest.DebugProcessorConfig) flowslatest.DebugProcessorConfig { + debugConfig := flowslatest.DebugProcessorConfig{ + Env: map[string]string{}, + Port: ptr.To(GetFieldDefaultInt32(ProcessorDebugPath, "port")), + HealthPort: ptr.To(GetFieldDefaultInt32(ProcessorDebugPath, "healthPort")), + ProfilePort: ptr.To(GetFieldDefaultInt32(ProcessorDebugPath, "profilePort")), + EnableKubeProbes: ptr.To(GetFieldDefaultBool(ProcessorDebugPath, "enableKubeProbes")), + DropUnusedFields: ptr.To(GetFieldDefaultBool(ProcessorDebugPath, "dropUnusedFields")), + ConversationHeartbeatInterval: ptr.To(GetFieldDefaultDuration(ProcessorDebugPath, "conversationHeartbeatInterval")), + ConversationEndTimeout: ptr.To(GetFieldDefaultDuration(ProcessorDebugPath, "conversationEndTimeout")), + ConversationTerminatingTimeout: ptr.To(GetFieldDefaultDuration(ProcessorDebugPath, "conversationTerminatingTimeout")), + LokiMinBackoff: ptr.To(GetFieldDefaultDuration(ProcessorDebugPath, "lokiMinBackoff")), + LokiMaxBackoff: ptr.To(GetFieldDefaultDuration(ProcessorDebugPath, "lokiMaxBackoff")), + LokiMaxRetries: ptr.To(GetFieldDefaultInt32(ProcessorDebugPath, "lokiMaxRetries")), + LokiStaticLabels: ptr.To(GetFieldDefaultMapString(ProcessorDebugPath, "lokiStaticLabels")), + } + + if specConfig != nil { + if len(specConfig.Env) > 0 { + debugConfig.Env = specConfig.Env + } + if specConfig.Port != nil && *specConfig.Port > 0 { + debugConfig.Port = specConfig.Port + } + if specConfig.HealthPort != nil && *specConfig.HealthPort > 0 { + debugConfig.HealthPort = specConfig.HealthPort + } + if specConfig.ProfilePort != nil && *specConfig.ProfilePort > 0 { + debugConfig.ProfilePort = specConfig.ProfilePort + } + if specConfig.EnableKubeProbes != nil { + debugConfig.EnableKubeProbes = specConfig.EnableKubeProbes + } + if specConfig.DropUnusedFields != nil { + debugConfig.DropUnusedFields = specConfig.DropUnusedFields + } + if specConfig.ConversationHeartbeatInterval != nil { + debugConfig.ConversationHeartbeatInterval = specConfig.ConversationHeartbeatInterval + } + if specConfig.ConversationEndTimeout != nil { + debugConfig.ConversationEndTimeout = specConfig.ConversationEndTimeout + } + if specConfig.ConversationTerminatingTimeout != nil { + debugConfig.ConversationTerminatingTimeout = specConfig.ConversationTerminatingTimeout + } + if specConfig.LokiMinBackoff != nil { + debugConfig.LokiMinBackoff = specConfig.LokiMinBackoff + } + if specConfig.LokiMaxBackoff != nil { + debugConfig.LokiMaxBackoff = specConfig.LokiMaxBackoff + } + if specConfig.LokiMaxRetries != nil { + debugConfig.LokiMaxRetries = specConfig.LokiMaxRetries + } + if specConfig.LokiStaticLabels != nil { + debugConfig.LokiStaticLabels = specConfig.LokiStaticLabels + } + } + + return debugConfig +} + +func GetDebugPluginConfig(specConfig *flowslatest.DebugPluginConfig) flowslatest.DebugPluginConfig { + debugConfig := flowslatest.DebugPluginConfig{ + Env: map[string]string{}, + Args: []string{}, + Register: ptr.To(GetFieldDefaultBool(PluginDebugPath, "register")), + Port: ptr.To(GetFieldDefaultInt32(PluginDebugPath, "port")), + } + + if specConfig != nil { + if len(specConfig.Env) > 0 { + debugConfig.Env = specConfig.Env + } + if len(specConfig.Args) > 0 { + debugConfig.Args = specConfig.Args + } + if specConfig.Register != nil { + debugConfig.Register = specConfig.Register + } + if specConfig.Port != nil && *specConfig.Port > 0 { + debugConfig.Port = specConfig.Port + } + } + + return debugConfig +} diff --git a/pkg/helper/helpers_test.go b/pkg/helper/helpers_test.go index bacd5898c..3ac885fd4 100644 --- a/pkg/helper/helpers_test.go +++ b/pkg/helper/helpers_test.go @@ -5,7 +5,9 @@ import ( "time" "github.com/stretchr/testify/assert" + v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/yaml" ) func TestExtractVersion(t *testing.T) { @@ -95,3 +97,62 @@ func TestUnstructuredDuration(t *testing.T) { } }) } + +func TestCRDDefault(t *testing.T) { + labels := map[string]string{"app": "netobserv-flowcollector"} + labelBytes, _ := yaml.Marshal(labels) + + SetCRD(&v1.CustomResourceDefinition{ + Spec: v1.CustomResourceDefinitionSpec{ + Versions: []v1.CustomResourceDefinitionVersion{ + { + Name: "testAPI", + Schema: &v1.CustomResourceValidation{ + OpenAPIV3Schema: &v1.JSONSchemaProps{ + Properties: map[string]v1.JSONSchemaProps{ + "spec": { + Properties: map[string]v1.JSONSchemaProps{ + "processor": { + Properties: map[string]v1.JSONSchemaProps{ + "debug": { + Properties: map[string]v1.JSONSchemaProps{ + "conversationEndTimeout": { + Default: &v1.JSON{ + Raw: []byte("10s"), + }, + }, + "lokiMaxRetries": { + Default: &v1.JSON{ + Raw: []byte("2"), + }, + }, + "lokiStaticLabels": { + Default: &v1.JSON{ + Raw: labelBytes, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }) + // conversationEndTimeout + assert.Equal(t, "10s", GetFieldDefaultString([]string{"spec", "processor", "debug"}, "conversationEndTimeout")) + assert.Equal(t, true, IsDefaultValue([]string{"spec", "processor", "debug"}, "conversationEndTimeout", "10s")) + assert.Equal(t, false, IsDefaultValue([]string{"spec", "processor", "debug"}, "conversationEndTimeout", "1s")) + // lokiMaxRetries + assert.Equal(t, "2", GetFieldDefaultString([]string{"spec", "processor", "debug"}, "lokiMaxRetries")) + assert.Equal(t, true, IsDefaultValue([]string{"spec", "processor", "debug"}, "lokiMaxRetries", "2")) + assert.Equal(t, false, IsDefaultValue([]string{"spec", "processor", "debug"}, "lokiMaxRetries", "12")) + // lokiStaticLabels + assert.Equal(t, "app: netobserv-flowcollector\n", GetFieldDefaultString([]string{"spec", "processor", "debug"}, "lokiStaticLabels")) + +}