Skip to content

Commit

Permalink
Merge pull request #168 from castai/CO-3027-agent-conditional-informe…
Browse files Browse the repository at this point in the history
…rs-need-better-error-handling-impl

Agent conditional informers have better error handling
  • Loading branch information
linkas45 authored Jun 4, 2024
2 parents addeeef + 51a5ccd commit d15bc45
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 18 deletions.
46 changes: 34 additions & 12 deletions internal/services/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,13 @@ type Controller struct {

// conditionalInformer describes an informer that the controller tries to start
// only when its API resource is usable; the polling loop skips entries that are
// already applied or whose group version was reported as failing by discovery.
// NOTE(review): the field list below appears twice because this text is a rendered
// diff (old alignment followed by new alignment) — confirm against the real file.
type conditionalInformer struct {
// if empty it means all namespaces
namespace string
resource schema.GroupVersionResource
apiType reflect.Type
informerFactory func() cache.SharedIndexInformer
permissionVerbs []string
isApplied bool
namespace string
// resource identifies the API resource; its GroupVersion is what gets matched
// against discovery results and discovery errors.
resource schema.GroupVersionResource
// apiType is handed to isResourceAvailable together with the discovered
// resource list for this informer's group version.
apiType reflect.Type
// informerFactory builds the underlying shared index informer.
informerFactory func() cache.SharedIndexInformer
// permissionVerbs — presumably the verbs checked before the informer is
// allowed to access the resource; verify against informerIsAllowedToAccessResource.
permissionVerbs []string
// isApplied makes the polling loop skip this informer.
isApplied bool
// isResourceInError is set when the latest discovery error names this
// informer's group version; the polling loop skips the informer for that
// round and clears the flag so the next round can retry.
isResourceInError bool
}

func (i *conditionalInformer) Name() string {
Expand Down Expand Up @@ -261,19 +262,24 @@ func (c *Controller) startConditionalInformersWithWatcher(ctx context.Context, c
tryConditionalInformers := conditionalInformers

if err := wait.PollUntilContextCancel(ctx, 2*time.Minute, true, func(ctx context.Context) (done bool, err error) {
apiResourceLists := fetchAPIResourceLists(c.discovery, c.log)
if apiResourceLists == nil {
return false, nil
_, apiResourceLists, err := c.discovery.ServerGroupsAndResources()
if err != nil {
c.log.Warnf("Error when getting server resources: %v", err.Error())
resourcesInError := extractGroupVersionsFromApiResourceError(c.log, err)
for i, informer := range tryConditionalInformers {
tryConditionalInformers[i].isResourceInError = resourcesInError[informer.resource.GroupVersion()]
}
}
c.log.Infof("Cluster API server is available, trying to start conditional informers")

for i, informer := range tryConditionalInformers {
if informer.isApplied {
if informer.isApplied || informer.isResourceInError {
// reset error so we can try again
tryConditionalInformers[i].isResourceInError = false
continue
}
apiResourceListForGroupVersion := getAPIResourceListByGroupVersion(informer.resource.GroupVersion().String(), apiResourceLists)
if !isResourceAvailable(informer.apiType, apiResourceListForGroupVersion) {
c.log.Warnf("Skipping conditional informer name: %v, because API resource is not available",
c.log.Infof("Skipping conditional informer name: %v, because API resource is not available",
informer.Name(),
)
continue
Expand Down Expand Up @@ -471,6 +477,22 @@ func (c *Controller) informerIsAllowedToAccessResource(ctx context.Context, name
// Start starts the controller's informerFactory, passing done through to it.
// NOTE(review): presumably closing done stops the started informers — confirm
// against the informer factory's documentation.
func (c *Controller) Start(done <-chan struct{}) {
c.informerFactory.Start(done)
}
// extractGroupVersionsFromApiResourceError parses the message of a partial API
// discovery failure and returns the set of group versions it names.
//
// The expected message shape is
//
//	unable to retrieve the complete list of server APIs: <group/version>: <cause>, <group/version>: <cause>
//
// Entries are separated by commas, optionally followed by whitespace, so each
// entry is trimmed before parsing; otherwise every group version after the
// first would carry a leading space and never match an informer's GroupVersion.
//
// A nil err, or an err whose message does not contain the expected prefix,
// yields an empty (non-nil) map instead of panicking. Comma-separated fragments
// without a ':' are treated as the continuation of a previous cause message and
// are ignored. Entries that fail to parse are logged via log and skipped.
func extractGroupVersionsFromApiResourceError(log logrus.FieldLogger, err error) map[schema.GroupVersion]bool {
	result := make(map[schema.GroupVersion]bool)
	if err == nil {
		return result
	}

	const prefix = "unable to retrieve the complete list of server APIs: "
	_, failedList, found := strings.Cut(err.Error(), prefix)
	if !found {
		// Not a partial-discovery error: no group version can be singled out.
		return result
	}

	for _, entry := range strings.Split(failedList, ",") {
		apiPath, _, hasCause := strings.Cut(entry, ":")
		apiPath = strings.TrimSpace(apiPath)
		if !hasCause || apiPath == "" {
			// A cause message may itself contain commas; such fragments are not
			// "<group/version>: <cause>" entries.
			continue
		}
		gv, e := schema.ParseGroupVersion(apiPath)
		if e != nil {
			log.Errorf("Error when unmarshalling group version: %v", e)
			continue
		}
		result[gv] = true
	}
	return result
}

func getConditionalInformers(clientset kubernetes.Interface, cfg *config.Controller, f informers.SharedInformerFactory, df dynamicinformer.DynamicSharedInformerFactory, metricsClient versioned.Interface, logger logrus.FieldLogger) []conditionalInformer {
conditionalInformers := []conditionalInformer{
Expand Down
69 changes: 63 additions & 6 deletions internal/services/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"sync/atomic"
"testing"
"time"
Expand All @@ -16,6 +17,7 @@ import (
karpenter "github.com/aws/karpenter/pkg/apis/v1beta1"
"github.com/golang/mock/gomock"
"github.com/google/uuid"
"github.com/samber/lo"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
Expand All @@ -26,19 +28,20 @@ import (
storagev1 "k8s.io/api/storage/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
dynamic_fake "k8s.io/client-go/dynamic/fake"
"k8s.io/client-go/kubernetes/fake"
authfakev1 "k8s.io/client-go/kubernetes/typed/authorization/v1/fake"
k8stesting "k8s.io/client-go/testing"
"k8s.io/metrics/pkg/apis/external_metrics"
metrics_fake "k8s.io/metrics/pkg/client/clientset/versioned/fake"

"castai-agent/internal/castai"
mock_castai "castai-agent/internal/castai/mock"
"castai-agent/internal/config"
"castai-agent/internal/services/controller/delta"
mock_discovery "castai-agent/internal/services/controller/mock/discovery"
mock_types "castai-agent/internal/services/providers/types/mock"
mock_version "castai-agent/internal/services/version/mock"
"castai-agent/pkg/labels"
Expand All @@ -60,12 +63,24 @@ func TestMain(m *testing.M) {
)
}

func TestController_HappyPath(t *testing.T) {
func TestController_ShouldReceiveDeltasBasedOnAvailableResources(t *testing.T) {
tests := map[string]struct {
objectCount int
expectedReceivedObjectsCount int
apiResourceError error
}{
"All supported objects are found and received in delta": {
objectCount: 14,
expectedReceivedObjectsCount: 14,
},
"when fetching api resources produces multiple errors should exclude those resources": {
apiResourceError: fmt.Errorf("unable to retrieve the complete list of server APIs: %v:"+
"stale GroupVersion discovery: some error,%v: another error",
policyv1.SchemeGroupVersion.String(), storagev1.SchemeGroupVersion.String()),
expectedReceivedObjectsCount: 12,
},
"when fetching api resources produces single error should exclude that resource": {
apiResourceError: fmt.Errorf("unable to retrieve the complete list of server APIs: %v:"+
"stale GroupVersion discovery: some error", storagev1.SchemeGroupVersion.String()),
expectedReceivedObjectsCount: 13,
},
}

Expand All @@ -78,7 +93,6 @@ func TestController_HappyPath(t *testing.T) {
utilruntime.Must(karpenter.SchemeBuilder.AddToScheme(scheme))
utilruntime.Must(datadoghqv1alpha1.SchemeBuilder.AddToScheme(scheme))
utilruntime.Must(argorollouts.SchemeBuilder.AddToScheme(scheme))
utilruntime.Must(external_metrics.SchemeBuilder.AddToScheme(scheme))

mockctrl := gomock.NewController(t)
castaiclient := mock_castai.NewMockClient(mockctrl)
Expand Down Expand Up @@ -111,7 +125,32 @@ func TestController_HappyPath(t *testing.T) {
version.EXPECT().Full().Return("1.21+").MaxTimes(3)

clusterID := uuid.New()
var mockDiscovery *mock_discovery.MockDiscoveryInterface
_, apiResources, _ := clientset.Discovery().ServerGroupsAndResources()
if tt.apiResourceError != nil {
mockDiscovery = mock_discovery.NewMockDiscoveryInterface(mockctrl)
errors := extractGroupVersionsFromApiResourceError(log, tt.apiResourceError)
apiResources = lo.Filter(apiResources, func(apiResource *metav1.APIResourceList, _ int) bool {
gv, _ := schema.ParseGroupVersion(apiResource.GroupVersion)
return !errors[gv]
})

// filter expected data based on available resources
for k := range objectsData {
kLowerCased := strings.ToLower(k)
found := false
for _, resource := range apiResources {
if strings.Contains(resource.APIResources[0].Name, kLowerCased) {
found = true
break
}
}
if !found {
delete(objectsData, k)
}
}
mockDiscovery.EXPECT().ServerGroupsAndResources().Return([]*metav1.APIGroup{}, apiResources, tt.apiResourceError).AnyTimes()
}
var invocations int64

castaiclient.EXPECT().
Expand All @@ -122,7 +161,7 @@ func TestController_HappyPath(t *testing.T) {
require.Equal(t, clusterID, d.ClusterID)
require.Equal(t, "1.21+", d.ClusterVersion)
require.True(t, d.FullSnapshot)
require.Len(t, d.Items, tt.objectCount)
require.Len(t, d.Items, tt.expectedReceivedObjectsCount)

var actualValues []string
for _, item := range d.Items {
Expand Down Expand Up @@ -166,6 +205,10 @@ func TestController_HappyPath(t *testing.T) {
fakeSelfSubjectAccessReviewsClient,
)

if mockDiscovery != nil {
ctrl.discovery = mockDiscovery
}

ctrl.Start(ctx.Done())

go func() {
Expand All @@ -181,6 +224,20 @@ func TestController_HappyPath(t *testing.T) {
}
}

// TestController_ApiResourcesErrorProcessing verifies that both group versions
// named in a partial-discovery error message are reported as failing.
func TestController_ApiResourcesErrorProcessing(t *testing.T) {
	discoveryErr := fmt.Errorf("unable to retrieve the complete list of server APIs: external.metrics.k8s.io/v1beta1: stale GroupVersion discovery: external.metrics.k8s.io/v1beta1,external.metrics.k8s.io/v2beta2: stale GroupVersion discovery: external.metrics.k8s.io/v2beta2")

	failing := extractGroupVersionsFromApiResourceError(logrus.New(), discoveryErr)

	require.Len(t, failing, 2)
	for _, version := range []string{"v1beta1", "v2beta2"} {
		expected := schema.GroupVersion{Group: "external.metrics.k8s.io", Version: version}
		require.True(t, failing[expected])
	}
}

func TestController_ShouldKeepDeltaAfterDelete(t *testing.T) {
mockctrl := gomock.NewController(t)
castaiclient := mock_castai.NewMockClient(mockctrl)
Expand Down

0 comments on commit d15bc45

Please sign in to comment.