diff --git a/internal/services/controller/controller.go b/internal/services/controller/controller.go index be21113c..e4c85008 100644 --- a/internal/services/controller/controller.go +++ b/internal/services/controller/controller.go @@ -81,12 +81,13 @@ type Controller struct { type conditionalInformer struct { // if empty it means all namespaces - namespace string - resource schema.GroupVersionResource - apiType reflect.Type - informerFactory func() cache.SharedIndexInformer - permissionVerbs []string - isApplied bool + namespace string + resource schema.GroupVersionResource + apiType reflect.Type + informerFactory func() cache.SharedIndexInformer + permissionVerbs []string + isApplied bool + isResourceInError bool } func (i *conditionalInformer) Name() string { @@ -261,19 +262,24 @@ func (c *Controller) startConditionalInformersWithWatcher(ctx context.Context, c tryConditionalInformers := conditionalInformers if err := wait.PollUntilContextCancel(ctx, 2*time.Minute, true, func(ctx context.Context) (done bool, err error) { - apiResourceLists := fetchAPIResourceLists(c.discovery, c.log) - if apiResourceLists == nil { - return false, nil + _, apiResourceLists, err := c.discovery.ServerGroupsAndResources() + if err != nil { + c.log.Warnf("Error when getting server resources: %v", err.Error()) + resourcesInError := extractGroupVersionsFromApiResourceError(c.log, err) + for i, informer := range tryConditionalInformers { + tryConditionalInformers[i].isResourceInError = resourcesInError[informer.resource.GroupVersion()] + } } c.log.Infof("Cluster API server is available, trying to start conditional informers") - for i, informer := range tryConditionalInformers { - if informer.isApplied { + if informer.isApplied || informer.isResourceInError { + // reset error so we can try again + tryConditionalInformers[i].isResourceInError = false continue } apiResourceListForGroupVersion := getAPIResourceListByGroupVersion(informer.resource.GroupVersion().String(), apiResourceLists) if !isResourceAvailable(informer.apiType, apiResourceListForGroupVersion) { - c.log.Warnf("Skipping conditional informer name: %v, because API resource is not available", + c.log.Infof("Skipping conditional informer name: %v, because API resource is not available", informer.Name(), ) continue @@ -471,6 +477,22 @@ func (c *Controller) informerIsAllowedToAccessResource(ctx context.Context, name func (c *Controller) Start(done <-chan struct{}) { c.informerFactory.Start(done) } +func extractGroupVersionsFromApiResourceError(log logrus.FieldLogger, err error) map[schema.GroupVersion]bool { + cleanedString := strings.Split(err.Error(), "unable to retrieve the complete list of server APIs: ")[1] + paths := strings.Split(cleanedString, ",") + + result := make(map[schema.GroupVersion]bool) + for _, path := range paths { + apiPath := strings.Split(path, ":")[0] + gv, e := schema.ParseGroupVersion(apiPath) + if e != nil { + log.Errorf("Error when unmarshalling group version: %v", e) + continue + } + result[gv] = true + } + return result +} func getConditionalInformers(clientset kubernetes.Interface, cfg *config.Controller, f informers.SharedInformerFactory, df dynamicinformer.DynamicSharedInformerFactory, metricsClient versioned.Interface, logger logrus.FieldLogger) []conditionalInformer { conditionalInformers := []conditionalInformer{ diff --git a/internal/services/controller/controller_test.go b/internal/services/controller/controller_test.go index 62b02c34..1f3773a2 100644 --- a/internal/services/controller/controller_test.go +++ b/internal/services/controller/controller_test.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "strings" "sync/atomic" "testing" "time" @@ -16,6 +17,7 @@ import ( karpenter "github.com/aws/karpenter/pkg/apis/v1beta1" "github.com/golang/mock/gomock" "github.com/google/uuid" + "github.com/samber/lo" "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" "go.uber.org/goleak" @@ -26,19 +28,20 @@ import ( storagev1 "k8s.io/api/storage/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" dynamic_fake "k8s.io/client-go/dynamic/fake" "k8s.io/client-go/kubernetes/fake" authfakev1 "k8s.io/client-go/kubernetes/typed/authorization/v1/fake" k8stesting "k8s.io/client-go/testing" - "k8s.io/metrics/pkg/apis/external_metrics" metrics_fake "k8s.io/metrics/pkg/client/clientset/versioned/fake" "castai-agent/internal/castai" mock_castai "castai-agent/internal/castai/mock" "castai-agent/internal/config" "castai-agent/internal/services/controller/delta" + mock_discovery "castai-agent/internal/services/controller/mock/discovery" mock_types "castai-agent/internal/services/providers/types/mock" mock_version "castai-agent/internal/services/version/mock" "castai-agent/pkg/labels" @@ -60,12 +63,24 @@ func TestMain(m *testing.M) { ) } -func TestController_HappyPath(t *testing.T) { +func TestController_ShouldReceiveDeltasBasedOnAvailableResources(t *testing.T) { tests := map[string]struct { - objectCount int + expectedReceivedObjectsCount int + apiResourceError error }{ "All supported objects are found and received in delta": { - objectCount: 14, + expectedReceivedObjectsCount: 14, + }, + "when fetching api resources produces multiple errors should exclude those resources": { + apiResourceError: fmt.Errorf("unable to retrieve the complete list of server APIs: %v:"+ + "stale GroupVersion discovery: some error,%v: another error", + policyv1.SchemeGroupVersion.String(), storagev1.SchemeGroupVersion.String()), + expectedReceivedObjectsCount: 12, + }, + "when fetching api resources produces single error should exclude that resource": { + apiResourceError: fmt.Errorf("unable to retrieve the complete list of server APIs: %v:"+ + "stale GroupVersion discovery: some error", storagev1.SchemeGroupVersion.String()), + expectedReceivedObjectsCount: 13, }, } @@ -78,7 +93,6 @@ func TestController_HappyPath(t *testing.T) { utilruntime.Must(karpenter.SchemeBuilder.AddToScheme(scheme)) utilruntime.Must(datadoghqv1alpha1.SchemeBuilder.AddToScheme(scheme)) utilruntime.Must(argorollouts.SchemeBuilder.AddToScheme(scheme)) - utilruntime.Must(external_metrics.SchemeBuilder.AddToScheme(scheme)) mockctrl := gomock.NewController(t) castaiclient := mock_castai.NewMockClient(mockctrl) @@ -111,7 +125,32 @@ func TestController_HappyPath(t *testing.T) { version.EXPECT().Full().Return("1.21+").MaxTimes(3) clusterID := uuid.New() + var mockDiscovery *mock_discovery.MockDiscoveryInterface + _, apiResources, _ := clientset.Discovery().ServerGroupsAndResources() + if tt.apiResourceError != nil { + mockDiscovery = mock_discovery.NewMockDiscoveryInterface(mockctrl) + errors := extractGroupVersionsFromApiResourceError(log, tt.apiResourceError) + apiResources = lo.Filter(apiResources, func(apiResource *metav1.APIResourceList, _ int) bool { + gv, _ := schema.ParseGroupVersion(apiResource.GroupVersion) + return !errors[gv] + }) + // filter expected data based on available resources + for k := range objectsData { + kLowerCased := strings.ToLower(k) + found := false + for _, resource := range apiResources { + if strings.Contains(resource.APIResources[0].Name, kLowerCased) { + found = true + break + } + } + if !found { + delete(objectsData, k) + } + } + mockDiscovery.EXPECT().ServerGroupsAndResources().Return([]*metav1.APIGroup{}, apiResources, tt.apiResourceError).AnyTimes() + } var invocations int64 castaiclient.EXPECT(). @@ -122,7 +161,7 @@ func TestController_HappyPath(t *testing.T) { require.Equal(t, clusterID, d.ClusterID) require.Equal(t, "1.21+", d.ClusterVersion) require.True(t, d.FullSnapshot) - require.Len(t, d.Items, tt.objectCount) + require.Len(t, d.Items, tt.expectedReceivedObjectsCount) var actualValues []string for _, item := range d.Items { @@ -166,6 +205,10 @@ func TestController_HappyPath(t *testing.T) { fakeSelfSubjectAccessReviewsClient, ) + if mockDiscovery != nil { + ctrl.discovery = mockDiscovery + } + ctrl.Start(ctx.Done()) go func() { @@ -181,6 +224,20 @@ func TestController_HappyPath(t *testing.T) { } } +func TestController_ApiResourcesErrorProcessing(t *testing.T) { + err := fmt.Errorf("unable to retrieve the complete list of server APIs: external.metrics.k8s.io/v1beta1: stale GroupVersion discovery: external.metrics.k8s.io/v1beta1,external.metrics.k8s.io/v2beta2: stale GroupVersion discovery: external.metrics.k8s.io/v2beta2") + val := extractGroupVersionsFromApiResourceError(logrus.New(), err) + require.Len(t, val, 2) + require.True(t, val[schema.GroupVersion{ + Group: "external.metrics.k8s.io", + Version: "v1beta1", + }]) + require.True(t, val[schema.GroupVersion{ + Group: "external.metrics.k8s.io", + Version: "v2beta2", + }]) +} + func TestController_ShouldKeepDeltaAfterDelete(t *testing.T) { mockctrl := gomock.NewController(t) castaiclient := mock_castai.NewMockClient(mockctrl)