diff --git a/internal/manifest/skrresources/collector.go b/internal/manifest/skrresources/collector.go
new file mode 100644
index 0000000000..f939f7f695
--- /dev/null
+++ b/internal/manifest/skrresources/collector.go
@@ -0,0 +1,209 @@
+package skrresources
+
+import (
+	"bytes"
+	"compress/gzip"
+	"context"
+	"encoding/base64"
+	"encoding/json"
+	"fmt"
+	"os"
+	"regexp"
+	"slices"
+	"strconv"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/jellydator/ttlcache/v3"
+	apimetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	logf "sigs.k8s.io/controller-runtime/pkg/log"
+
+	"github.com/kyma-project/lifecycle-manager/api/shared"
+	"github.com/kyma-project/lifecycle-manager/internal"
+	"github.com/kyma-project/lifecycle-manager/internal/manifest/manifestclient"
+)
+
+const (
+	knownManagersDefault = string(manifestclient.DefaultFieldOwner) + ";" +
+		shared.OperatorName + ";" +
+		"k3s" // Applied in k3s environments.
+	knownManagersEnvVar = "KLM_EXPERIMENTAL_KNOWN_MANAGERS"
+	knownManagersRegexp = `^[a-zA-Z][a-zA-Z0-9.:_/-]{1,127}$`
+
+	frequencyLimiterTTLDefault = 60 * 5 // 5 minutes
+	frequencyLimiterTTLEnvVar  = "KLM_EXPERIMENTAL_FREQUENCY_LIMITER_TTL"
+	frequencyLimiterTTLRegexp  = `^[1-9][0-9]{1,3}$`
+
+	managedFieldsAnalysisLabelEnvVar = "KLM_EXPERIMENTAL_MANAGED_FIELDS_ANALYSIS_LABEL"
+)
+
+var (
+	allowedManagers           = getAllowedManagers()  //nolint:gochecknoglobals // list of managers is a global configuration
+	singletonFrequencyLimiter = newFrequencyLimiter() //nolint:gochecknoglobals // singleton cache is used to prevent emitting the same log multiple times in a short period
+)
+
+// LogCollectorEntry identifies a single remote object (name, namespace, GVK) together with a copy of its managed fields.
+// Entries are created by LogCollector.Collect and serialized to JSON by LogCollector.Emit.
+type LogCollectorEntry struct {
+	ObjectName      string                         `json:"objectName"`
+	ObjectNamespace string                         `json:"objectNamespace"`
+	ObjectGVK       string                         `json:"objectGvk"`
+	ManagedFields   []apimetav1.ManagedFieldsEntry `json:"managedFields"`
+}
+
+// LogCollector implements the skrresources.ManagedFieldsCollector interface, emits the collected data to the log stream.
+// The collector is thread-safe.
+// The collector is frequency-limited to prevent emitting entries for the same objectKey multiple times in a short time.
+type LogCollector struct {
+	objectKey        string
+	frequencyLimiter *ttlcache.Cache[string, bool]
+	owner            client.FieldOwner
+	entries          []LogCollectorEntry
+	mu               sync.Mutex
+}
+
+// NewLogCollector returns a LogCollector for the given object key that shares the package-wide frequency limiter.
+// The owner is only attached as a key/value pair to the emitted log records.
+func NewLogCollector(key string, owner client.FieldOwner) *LogCollector {
+	return &LogCollector{
+		objectKey:        key,
+		owner:            owner,
+		entries:          []LogCollectorEntry{},
+		frequencyLimiter: singletonFrequencyLimiter,
+	}
+}
+
+// safeAddEntry adds a new entry to the collector's entries slice in a thread-safe way.
+func (c *LogCollector) safeAddEntry(entry LogCollectorEntry) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	c.entries = append(c.entries, entry)
+}
+
+// Collect records the object together with a copy of all its managed fields
+// if at least one of the managed-fields entries belongs to a manager that is not in the allowed list.
+// At most one entry is added per call. The context is not used.
+func (c *LogCollector) Collect(_ context.Context, remoteObj client.Object) {
+	managedFields := remoteObj.GetManagedFields()
+	for _, mf := range managedFields {
+		if isUnknownManager(mf.Manager) {
+			newEntry := LogCollectorEntry{
+				ObjectName:      remoteObj.GetName(),
+				ObjectNamespace: remoteObj.GetNamespace(),
+				ObjectGVK:       remoteObj.GetObjectKind().GroupVersionKind().String(),
+				ManagedFields:   slices.Clone(managedFields),
+			}
+			c.safeAddEntry(newEntry)
+			return
+		}
+	}
+}
+
+// Emit writes the collected entries to the log stream as gzip-compressed, base64-encoded JSON.
+// If the object key is still present in the frequency limiter, only a trace message about the skip is logged.
+// Nothing is logged if no entries were collected.
+func (c *LogCollector) Emit(ctx context.Context) error {
+	if c.frequencyLimiter.Has(c.objectKey) {
+		logger := logf.FromContext(ctx, "owner", c.owner)
+		logger.V(internal.TraceLogLevel).Info("Unknown managers detection skipped (frequency)")
+		return nil
+	}
+
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if len(c.entries) > 0 {
+		c.frequencyLimiter.Set(c.objectKey, true, ttlcache.DefaultTTL)
+
+		jsonSer, err := json.MarshalIndent(c.entries, "", " ")
+		if err != nil {
+			return fmt.Errorf("failed to serialize managed field data: %w", err)
+		}
+		logData, err := compressAndBase64(jsonSer)
+		if err != nil {
+			return err
+		}
+
+		logger := logf.FromContext(ctx, "owner", c.owner)
+		logger.V(internal.TraceLogLevel).Info("Unknown managers detected", "base64gzip", logData)
+	}
+	return nil
+}
+
+// compressAndBase64 compresses the input byte slice using gzip and encodes it to base64 so that it can be logged as a string.
+func compressAndBase64(in []byte) (string, error) {
+	var buf bytes.Buffer
+	archive := gzip.NewWriter(&buf)
+
+	_, err := archive.Write(in)
+	if err != nil {
+		return "", fmt.Errorf("failed to write to gzip archive: %w", err)
+	}
+
+	if err := archive.Close(); err != nil {
+		return "", fmt.Errorf("failed to close gzip archive: %w", err)
+	}
+
+	return base64.StdEncoding.EncodeToString(buf.Bytes()), nil
+}
+
+// isUnknownManager reports whether the given manager name is absent from the allowed managers list.
+func isUnknownManager(manager string) bool {
+	return !slices.Contains(allowedManagers, manager)
+}
+
+// getAllowedManagers returns either the list configured in the KLM_EXPERIMENTAL_KNOWN_MANAGERS environment variable or the default list.
+// The values must be separated by semicolons and are case-sensitive!
+// Configured values that do not match knownManagersRegexp are silently dropped.
+func getAllowedManagers() []string {
+	configured := os.Getenv(knownManagersEnvVar)
+	if configured == "" {
+		return splitBySemicolons(knownManagersDefault)
+	}
+
+	rxp := regexp.MustCompile(knownManagersRegexp)
+	res := []string{}
+	for _, name := range splitBySemicolons(configured) {
+		if rxp.MatchString(name) {
+			res = append(res, name)
+		}
+	}
+	return res
+}
+
+// getFrequencyLimiterTTL returns the frequency limiter TTL in seconds.
+// The value of the KLM_EXPERIMENTAL_FREQUENCY_LIMITER_TTL environment variable is used if it matches
+// frequencyLimiterTTLRegexp (an integer between 10 and 9999), otherwise the default of 5 minutes is returned.
+func getFrequencyLimiterTTL() int {
+	res := frequencyLimiterTTLDefault
+
+	if configured := os.Getenv(frequencyLimiterTTLEnvVar); configured != "" {
+		rxp := regexp.MustCompile(frequencyLimiterTTLRegexp)
+		if rxp.MatchString(configured) {
+			if parsed, err := strconv.Atoi(configured); err == nil {
+				res = parsed
+			}
+		}
+	}
+
+	return res
+}
+
+// newFrequencyLimiter creates the TTL cache used as the frequency limiter and runs its Start method in a background goroutine.
+func newFrequencyLimiter() *ttlcache.Cache[string, bool] {
+	cache := ttlcache.New(ttlcache.WithTTL[string, bool](time.Duration(getFrequencyLimiterTTL()) * time.Second))
+	go cache.Start()
+	return cache
+}
+
+// splitBySemicolons splits the value on every semicolon; an empty value yields a single empty element.
+func splitBySemicolons(value string) []string {
+	return strings.Split(value, ";")
+}
+
+// getManagedFieldsAnalysisLabel returns the label name configured in the KLM_EXPERIMENTAL_MANAGED_FIELDS_ANALYSIS_LABEL environment variable.
+func getManagedFieldsAnalysisLabel() string {
+	return os.Getenv(managedFieldsAnalysisLabelEnvVar)
+}
diff --git a/internal/manifest/skrresources/collector_test.go b/internal/manifest/skrresources/collector_test.go
new file mode 100644
index 0000000000..77c8939436
--- /dev/null
+++ b/internal/manifest/skrresources/collector_test.go
@@ -0,0 +1,81 @@
+package skrresources //nolint:testpackage // testing package internals
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestGetAllowedManagers(t *testing.T) {
+	tests := []struct {
+		name     string
+		envValue string
+		want     []string
+	}{
+		{
+			name:     "default managers",
+			envValue: "",
+			want:     []string{"declarative.kyma-project.io/applier", "lifecycle-manager", "k3s"},
+		},
+		{
+			name:     "single manager in env",
+			envValue: "manager1",
+			want:     []string{"manager1"},
+		},
+		{
+			name:     "multiple managers in env",
+			envValue: "manager1;manager2;some-manager:3",
+			want:     []string{"manager1", "manager2", "some-manager:3"},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			// Always set the variable so that a value present in the ambient environment cannot leak into the test.
+			t.Setenv(knownManagersEnvVar, tt.envValue)
+			assert.Equal(t, tt.want, getAllowedManagers())
+		})
+	}
+}
+
+func TestGetCacheTTL(t *testing.T) {
+	tests := []struct {
+		name     string
+		envValue string
+		want     int
+	}{
+		{
+			name:     "default TTL",
+			envValue: "",
+			want:     300,
+		},
+		{
+			name:     "custom TTL",
+			envValue: "123",
+			want:     123,
+		},
+		{
+			name:     "invalid value is ignored, default TTL is returned",
+			envValue: "abc",
+			want:     300,
+		},
+		{
+			name:     "zero is invalid, default TTL is returned",
+			envValue: "0",
+			want:     300,
+		},
+		{
+			name:     "Negative value is ignored, default TTL is returned",
+			envValue: "-123",
+			want:     300,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			// Always set the variable so that a value present in the ambient environment cannot leak into the test.
+			t.Setenv(frequencyLimiterTTLEnvVar, tt.envValue)
+			assert.Equal(t, tt.want, getFrequencyLimiterTTL())
+		})
+	}
+}
diff --git a/internal/manifest/skrresources/manifestcollector.go b/internal/manifest/skrresources/manifestcollector.go
new file mode 100644
index 0000000000..cbfff9323b
--- /dev/null
+++ b/internal/manifest/skrresources/manifestcollector.go
@@ -0,0 +1,67 @@
+package skrresources
+
+import (
+	"context"
+
+	"sigs.k8s.io/controller-runtime/pkg/client"
+
+	"github.com/kyma-project/lifecycle-manager/api/v1beta2"
+	"github.com/kyma-project/lifecycle-manager/internal/manifest/manifestclient"
+)
+
+// ManifestLogCollector is a collector for remote Manifest objects. It delegates the calls to the embedded generic collector if collection is enabled for the given Manifest.
+type ManifestLogCollector struct {
+	collector *LogCollector
+	enabled   bool
+}
+
+// NewManifestLogCollector returns a collector keyed by the UID of the given Manifest.
+// Collection is enabled only if the Manifest is non-nil and carries the configured analysis label.
+// The owner is passed to the underlying LogCollector; an empty owner falls back to manifestclient.DefaultFieldOwner.
+func NewManifestLogCollector(manifest *v1beta2.Manifest, owner client.FieldOwner) *ManifestLogCollector {
+	key := ""
+	enabled := false
+	if manifest != nil {
+		key = string(manifest.GetUID())
+		enabled = isManifestCollectionEnabled(manifest)
+	}
+	if owner == "" {
+		owner = manifestclient.DefaultFieldOwner
+	}
+	return &ManifestLogCollector{
+		collector: NewLogCollector(key, owner),
+		enabled:   enabled,
+	}
+}
+
+// Collect implements the skrresources.ManagedFieldsCollector interface.
+func (c *ManifestLogCollector) Collect(ctx context.Context, obj client.Object) {
+	if c.enabled {
+		c.collector.Collect(ctx, obj)
+	}
+}
+
+// Emit implements the skrresources.ManagedFieldsCollector interface.
+func (c *ManifestLogCollector) Emit(ctx context.Context) error {
+	if c.enabled {
+		return c.collector.Emit(ctx)
+	}
+	return nil
+}
+
+// isManifestCollectionEnabled checks if managed fields detection is enabled for the given manifest.
+// The detection is disabled by default, but can be enabled by setting a specific label on the manifest CR.
+func isManifestCollectionEnabled(obj *v1beta2.Manifest) bool {
+	if obj == nil {
+		return false
+	}
+
+	configuredLabelName := getManagedFieldsAnalysisLabel()
+
+	if configuredLabelName == "" {
+		return false
+	}
+
+	_, found := obj.GetLabels()[configuredLabelName]
+	return found
+}
diff --git a/internal/manifest/skrresources/ssa.go b/internal/manifest/skrresources/ssa.go
index 8a4aa70e0f..a55e5b821f 100644
--- a/internal/manifest/skrresources/ssa.go
+++ b/internal/manifest/skrresources/ssa.go
@@ -26,18 +26,29 @@ type SSA interface {
 	Run(ctx context.Context, resourceInfo []*resource.Info) error
 }
 
+// ManagedFieldsCollector collects managed fields data of applied objects and emits it to some backing store.
+type ManagedFieldsCollector interface {
+	// Collect collects managed fields data from the single object
+	Collect(ctx context.Context, obj client.Object)
+	// Emit emits collected data to some backing store
+	Emit(ctx context.Context) error
+}
+
 type ConcurrentDefaultSSA struct {
 	clnt      client.Client
 	owner     client.FieldOwner
 	versioner machineryruntime.GroupVersioner
 	converter machineryruntime.ObjectConvertor
+	collector ManagedFieldsCollector
 }
 
-func ConcurrentSSA(clnt client.Client, owner client.FieldOwner) *ConcurrentDefaultSSA {
+func ConcurrentSSA(clnt client.Client, owner client.FieldOwner, managedFieldsCollector ManagedFieldsCollector) *ConcurrentDefaultSSA {
 	return &ConcurrentDefaultSSA{
-		clnt: clnt, owner: owner,
+		clnt:      clnt,
+		owner:     owner,
 		versioner: schema.GroupVersions(clnt.Scheme().PrioritizedVersionsAllGroups()),
 		converter: clnt.Scheme(),
+		collector: managedFieldsCollector,
 	}
 }
 
@@ -70,6 +81,9 @@ func (c *ConcurrentDefaultSSA) Run(ctx context.Context, resources []*resource.In
 		return errors.Join(errs...)
 	}
 	logger.V(internal.DebugLogLevel).Info("ServerSideApply finished", "time", ssaFinish)
+	if err := c.collector.Emit(ctx); err != nil {
+		logger.V(internal.DebugLogLevel).Error(err, "error emitting data of unknown field managers")
+	}
 	return nil
 }
 
@@ -123,6 +137,7 @@ func (c *ConcurrentDefaultSSA) serverSideApplyResourceInfo(
 		)
 	}
 
+	c.collector.Collect(ctx, obj)
 	return nil
 }
 
diff --git a/internal/manifest/skrresources/ssa_test.go b/internal/manifest/skrresources/ssa_test.go
index 308c2a59b7..532a04e2fa 100644
--- a/internal/manifest/skrresources/ssa_test.go
+++ b/internal/manifest/skrresources/ssa_test.go
@@ -29,6 +29,8 @@ func TestConcurrentSSA(t *testing.T) {
 	fakeClientBuilder := fake.NewClientBuilder().WithRuntimeObjects(pod).Build()
 	_ = fakeClientBuilder.Create(context.Background(), pod)
 
+	inactiveCollector := skrresources.NewManifestLogCollector(nil, client.FieldOwner("test"))
+
 	type args struct {
 		clnt  client.Client
 		owner client.FieldOwner
@@ -54,7 +56,7 @@ func TestConcurrentSSA(t *testing.T) {
 		t.Run(
 			testCase.name, func(t *testing.T) {
 				t.Parallel()
-				ssa := skrresources.ConcurrentSSA(testCase.ssa.clnt, testCase.ssa.owner)
+				ssa := skrresources.ConcurrentSSA(testCase.ssa.clnt, testCase.ssa.owner, inactiveCollector)
 				if err := ssa.Run(context.Background(), testCase.apply); err != nil {
 					require.ErrorIs(t, err, testCase.err)
 				}
diff --git a/internal/manifest/skrresources/sync.go b/internal/manifest/skrresources/sync.go
index feae73400a..cd28dcee19 100644
--- a/internal/manifest/skrresources/sync.go
+++ b/internal/manifest/skrresources/sync.go
@@ -19,7 +19,9 @@ func SyncResources(ctx context.Context, skrClient client.Client, manifest *v1bet
 ) error {
 	manifestStatus := manifest.GetStatus()
 
-	if err := ConcurrentSSA(skrClient, manifestclient.DefaultFieldOwner).Run(ctx, target); err != nil {
+	managedFieldsCollector := NewManifestLogCollector(manifest, manifestclient.DefaultFieldOwner)
+
+	if err := ConcurrentSSA(skrClient, manifestclient.DefaultFieldOwner, managedFieldsCollector).Run(ctx, target); err != nil {
 		manifest.SetStatus(manifestStatus.WithState(shared.StateError).WithErr(err))
 		return err
 	}