Skip to content

Commit

Permalink
feat: Unexepected field managers reporting (#2251)
Browse files Browse the repository at this point in the history
Collect data for field managers during Manifest reconciliation
  • Loading branch information
Tomasz-Smelcerz-SAP authored Feb 20, 2025
1 parent 9ebdc7e commit e993711
Show file tree
Hide file tree
Showing 6 changed files with 358 additions and 4 deletions.
192 changes: 192 additions & 0 deletions internal/manifest/skrresources/collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
package skrresources

import (
"bytes"
"compress/gzip"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"os"
"regexp"
"slices"
"strconv"
"strings"
"sync"
"time"

"github.com/jellydator/ttlcache/v3"
apimetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"

"github.com/kyma-project/lifecycle-manager/api/shared"
"github.com/kyma-project/lifecycle-manager/internal"
"github.com/kyma-project/lifecycle-manager/internal/manifest/manifestclient"
)

const (
knownManagersDefault = string(manifestclient.DefaultFieldOwner) + ";" +
shared.OperatorName + ";" +
"k3s" // Applied in k3s environments.
knownManagersEnvVar = "KLM_EXPERIMENTAL_KNOWN_MANAGERS"
knownManagersRegexp = `^[a-zA-Z][a-zA-Z0-9.:_/-]{1,127}$`

frequencyLimiterTTLDefault = 60 * 5 // 5 minutes
frequencyLimiterTTLEnvVar = "KLM_EXPERIMENTAL_FREQUENCY_LIMITER_TTL"
frequencyLimiterTTLRegexp = `^[1-9][0-9]{1,3}$`

managedFieldsAnalysisLabelEnvVar = "KLM_EXPERIMENTAL_MANAGED_FIELDS_ANALYSIS_LABEL"
)

var (
allowedManagers = getAllowedManagers() //nolint:gochecknoglobals // list of managers is a global configuration
singletonFrequencyLimiter = newFrequencyLimiter() //nolint:gochecknoglobals // singleton cache is used to prevent emitting the same log multiple times in a short period
)

type LogCollectorEntry struct {
ObjectName string `json:"objectName"`
ObjectNamespace string `json:"objectNamespace"`
ObjectGVK string `json:"objectGvk"`
ManagedFields []apimetav1.ManagedFieldsEntry `json:"managedFields"`
}

// Implements skrresources.ManagedFieldsCollector interface, emits the collected data to the log stream.
// The collector is thread-safe.
// The collector is frequency-limited to prevent emitting entries for the same objectKey multiple times in a short time.
type LogCollector struct {
objectKey string
frequencyLimiter *ttlcache.Cache[string, bool]
owner client.FieldOwner
entries []LogCollectorEntry
mu sync.Mutex
}

func NewLogCollector(key string, owner client.FieldOwner) *LogCollector {
return &LogCollector{
objectKey: key,
owner: owner,
entries: []LogCollectorEntry{},
frequencyLimiter: singletonFrequencyLimiter,
}
}

// safeAddEntry adds a new entry to the collector's entries slice in a thread-safe way.
func (c *LogCollector) safeAddEntry(entry LogCollectorEntry) {
c.mu.Lock()
defer c.mu.Unlock()
c.entries = append(c.entries, entry)
}

func (c *LogCollector) Collect(ctx context.Context, remoteObj client.Object) {
managedFields := remoteObj.GetManagedFields()
for _, mf := range managedFields {
if isUnknownManager(mf.Manager) {
newEntry := LogCollectorEntry{
ObjectName: remoteObj.GetName(),
ObjectNamespace: remoteObj.GetNamespace(),
ObjectGVK: remoteObj.GetObjectKind().GroupVersionKind().String(),
ManagedFields: slices.Clone(remoteObj.GetManagedFields()),
}
c.safeAddEntry(newEntry)
return
}
}
}

func (c *LogCollector) Emit(ctx context.Context) error {
if c.frequencyLimiter.Has(c.objectKey) {
logger := logf.FromContext(ctx, "owner", c.owner)
logger.V(internal.TraceLogLevel).Info("Unknown managers detection skipped (frequency)")
return nil
}

c.mu.Lock()
defer c.mu.Unlock()

if len(c.entries) > 0 {
c.frequencyLimiter.Set(c.objectKey, true, ttlcache.DefaultTTL)

jsonSer, err := json.MarshalIndent(c.entries, "", " ")
if err != nil {
return fmt.Errorf("failed to serialize managed field data: %w", err)
}
logData, err := compressAndBase64(jsonSer)
if err != nil {
return err
}

logger := logf.FromContext(ctx, "owner", c.owner)
logger.V(internal.TraceLogLevel).Info("Unknown managers detected", "base64gzip", logData)
}
return nil
}

// compressAndBase64 compresses the input byte slice using gzip and encodes it to base64 so that it can be logged as a string.
func compressAndBase64(in []byte) (string, error) {
var buf bytes.Buffer
archive := gzip.NewWriter(&buf)

_, err := archive.Write(in)
if err != nil {
return "", fmt.Errorf("failed to write to gzip archive: %w", err)
}

if err := archive.Close(); err != nil {
return "", fmt.Errorf("failed to close gzip archive: %w", err)
}

return base64.StdEncoding.EncodeToString(buf.Bytes()), nil
}

func isUnknownManager(manager string) bool {
return !slices.Contains(allowedManagers, manager)
}

// allowedManagers returns either a list configured in the KLM_RECONCILECONFIG_KNOWN_MANAGERS environment variable or the default list.
// The values must be separated by semicolons and are case-sensitive!
func getAllowedManagers() []string {
configured := os.Getenv(knownManagersEnvVar)
if configured == "" {
return splitBySemicolons(knownManagersDefault)
} else {
rxp := regexp.MustCompile(knownManagersRegexp)
configuredValues := splitBySemicolons(configured)
res := []string{}
for _, name := range configuredValues {
if rxp.MatchString(name) {
res = append(res, name)
}
}
return res
}
}

func getFrequencyLimiterTTL() int {
var res int = frequencyLimiterTTLDefault

if configured := os.Getenv(frequencyLimiterTTLEnvVar); configured != "" {
rxp := regexp.MustCompile(frequencyLimiterTTLRegexp)
if rxp.MatchString(configured) {
if parsed, err := strconv.Atoi(configured); err == nil {
res = parsed
}
}
}

return res
}

func newFrequencyLimiter() *ttlcache.Cache[string, bool] {
cache := ttlcache.New(ttlcache.WithTTL[string, bool](time.Duration(getFrequencyLimiterTTL()) * time.Second))
go cache.Start()
return cache
}

func splitBySemicolons(value string) []string {
return strings.Split(value, ";")
}

func getManagedFieldsAnalysisLabel() string {
return os.Getenv(managedFieldsAnalysisLabelEnvVar)
}
83 changes: 83 additions & 0 deletions internal/manifest/skrresources/collector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package skrresources //nolint:testpackage // testing package internals

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestGetAllowedManagers(t *testing.T) {
tests := []struct {
name string
envValue string
want []string
}{
{
name: "default managers",
envValue: "",
want: []string{"declarative.kyma-project.io/applier", "lifecycle-manager", "k3s"},
},
{
name: "single manager in env",
envValue: "manager1",
want: []string{"manager1"},
},
{
name: "multiple managers in env",
envValue: "manager1;manager2;some-manager:3",
want: []string{"manager1", "manager2", "some-manager:3"},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.envValue != "" {
t.Setenv(knownManagersEnvVar, tt.envValue)
}
assert.Equal(t, tt.want, getAllowedManagers())
})
}
}

func TestGetCacheTTL(t *testing.T) {
tests := []struct {
name string
envValue string
want int
}{
{
name: "default TTL",
envValue: "",
want: 300,
},
{
name: "custom TTL",
envValue: "123",
want: 123,
},
{
name: "invalid value is ignored, default TTL is returned",
envValue: "abc",
want: 300,
},
{
name: "zero is invalid, default TTL is returned",
envValue: "0",
want: 300,
},
{
name: "Negative value is ignored, default TTL is returned",
envValue: "-123",
want: 300,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.envValue != "" {
t.Setenv(frequencyLimiterTTLEnvVar, tt.envValue)
}
assert.Equal(t, tt.want, getFrequencyLimiterTTL())
})
}
}
61 changes: 61 additions & 0 deletions internal/manifest/skrresources/manifestcollector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package skrresources

import (
"context"

"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/kyma-project/lifecycle-manager/api/v1beta2"
"github.com/kyma-project/lifecycle-manager/internal/manifest/manifestclient"
)

// ManifestLogCollector is a collector for remote Manifest objects. It delegates the calls to the embedded generic collector if collection is enabled for the given Manifest.
type ManifestLogCollector struct {
collector *LogCollector
enabled bool
}

func NewManifestLogCollector(manifest *v1beta2.Manifest, owner client.FieldOwner) *ManifestLogCollector {
key := ""
enabled := false
if manifest != nil {
key = string(manifest.GetUID())
enabled = isManifestCollectionEnabled(manifest)
}
return &ManifestLogCollector{
collector: NewLogCollector(key, manifestclient.DefaultFieldOwner),
enabled: enabled,
}
}

// Implements the skrresources.ManagedFieldsCollector interface.
func (c *ManifestLogCollector) Collect(ctx context.Context, obj client.Object) {
if c.enabled {
c.collector.Collect(ctx, obj)
}
}

// Implements the skrresources.ManagedFieldsCollector interface.
func (c *ManifestLogCollector) Emit(ctx context.Context) error {
if c.enabled {
return c.collector.Emit(ctx)
}
return nil
}

// isManifestCollectionEnabled checks if managed fields detection is enabled for the given manifest.
// The detection is disabled by default, but can be enabled by setting a specific label on the manifest CR.
func isManifestCollectionEnabled(obj *v1beta2.Manifest) bool {
if obj == nil {
return false
}

configuredLabelName := getManagedFieldsAnalysisLabel()

if configuredLabelName == "" {
return false
}

_, found := obj.GetLabels()[configuredLabelName]
return found
}
18 changes: 16 additions & 2 deletions internal/manifest/skrresources/ssa.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,28 @@ type SSA interface {
Run(ctx context.Context, resourceInfo []*resource.Info) error
}

type ManagedFieldsCollector interface {
// Collect collects managed fields data from the single object
Collect(ctx context.Context, obj client.Object)
// Emit emits collected data to some backing store
Emit(ctx context.Context) error
}

type ConcurrentDefaultSSA struct {
clnt client.Client
owner client.FieldOwner
versioner machineryruntime.GroupVersioner
converter machineryruntime.ObjectConvertor
collector ManagedFieldsCollector
}

func ConcurrentSSA(clnt client.Client, owner client.FieldOwner) *ConcurrentDefaultSSA {
func ConcurrentSSA(clnt client.Client, owner client.FieldOwner, managedFieldsCollector ManagedFieldsCollector) *ConcurrentDefaultSSA {
return &ConcurrentDefaultSSA{
clnt: clnt, owner: owner,
clnt: clnt,
owner: owner,
versioner: schema.GroupVersions(clnt.Scheme().PrioritizedVersionsAllGroups()),
converter: clnt.Scheme(),
collector: managedFieldsCollector,
}
}

Expand Down Expand Up @@ -70,6 +80,9 @@ func (c *ConcurrentDefaultSSA) Run(ctx context.Context, resources []*resource.In
return errors.Join(errs...)
}
logger.V(internal.DebugLogLevel).Info("ServerSideApply finished", "time", ssaFinish)
if err := c.collector.Emit(ctx); err != nil {
logger.V(internal.DebugLogLevel).Error(err, "error emitting data of unknown field managers")
}
return nil
}

Expand Down Expand Up @@ -123,6 +136,7 @@ func (c *ConcurrentDefaultSSA) serverSideApplyResourceInfo(
)
}

c.collector.Collect(ctx, obj)
return nil
}

Expand Down
Loading

0 comments on commit e993711

Please sign in to comment.