Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Remove ReplicaSet 0 replicas filter #209

Merged
merged 2 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/dump/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func run(ctx context.Context) error {

log = log.WithField("k8s_version", v.Full())

delta, err := controller.CollectSingleSnapshot(ctx, log, clusterID, clientset, dynamicClient, metricsClient, cfg.Controller, v, "")
delta, err := controller.CollectSingleSnapshot(ctx, log, clusterID, clientset, dynamicClient, metricsClient, cfg.Controller, v)
if err != nil {
return err
}
Expand Down
33 changes: 8 additions & 25 deletions internal/services/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,11 @@ func CollectSingleSnapshot(ctx context.Context,
metricsClient versioned.Interface,
cfg *config.Controller,
v version.Interface,
castwareNamespace string,
) (*castai.Delta, error) {
f := informers.NewSharedInformerFactory(clientset, 0)
df := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, 0)

defaultInformers := getDefaultInformers(f, castwareNamespace)
defaultInformers := getDefaultInformers(f)
conditionalInformers := getConditionalInformers(clientset, cfg, f, df, metricsClient, log)
additionalTransformers := createAdditionalTransformers(cfg)

Expand Down Expand Up @@ -199,7 +198,6 @@ func New(
agentVersion *config.AgentVersion,
healthzProvider *HealthzProvider,
selfSubjectAccessReview authorizationtypev1.SelfSubjectAccessReviewInterface,
castwareNamespace string,
) *Controller {
healthzProvider.Initializing()

Expand All @@ -210,7 +208,7 @@ func New(
df := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, defaultResync)
discovery := clientset.Discovery()

defaultInformers := getDefaultInformers(f, castwareNamespace)
defaultInformers := getDefaultInformers(f)
conditionalInformers := getConditionalInformers(clientset, cfg, f, df, metricsClient, log)
additionalTransformers := createAdditionalTransformers(cfg)

Expand Down Expand Up @@ -935,7 +933,7 @@ type defaultInformer struct {
filters filters.Filters
}

func getDefaultInformers(f informers.SharedInformerFactory, castwareNamespace string) map[reflect.Type]defaultInformer {
func getDefaultInformers(f informers.SharedInformerFactory) map[reflect.Type]defaultInformer {
return map[reflect.Type]defaultInformer{
reflect.TypeOf(&corev1.Node{}): {informer: f.Core().V1().Nodes().Informer()},
reflect.TypeOf(&corev1.Pod{}): {informer: f.Core().V1().Pods().Informer()},
Expand All @@ -944,26 +942,11 @@ func getDefaultInformers(f informers.SharedInformerFactory, castwareNamespace st
reflect.TypeOf(&corev1.ReplicationController{}): {informer: f.Core().V1().ReplicationControllers().Informer()},
reflect.TypeOf(&corev1.Namespace{}): {informer: f.Core().V1().Namespaces().Informer()},
reflect.TypeOf(&appsv1.Deployment{}): {informer: f.Apps().V1().Deployments().Informer()},
reflect.TypeOf(&appsv1.ReplicaSet{}): {
informer: f.Apps().V1().ReplicaSets().Informer(),
filters: filters.Filters{
{
func(e castai.EventType, obj interface{}) bool {
replicaSet, ok := obj.(*appsv1.ReplicaSet)
if !ok {
return false
}

return e == castai.EventDelete || replicaSet.Namespace == castwareNamespace ||
(replicaSet.Spec.Replicas != nil && *replicaSet.Spec.Replicas > 0 && replicaSet.Status.Replicas > 0)
},
},
},
},
reflect.TypeOf(&appsv1.DaemonSet{}): {informer: f.Apps().V1().DaemonSets().Informer()},
reflect.TypeOf(&appsv1.StatefulSet{}): {informer: f.Apps().V1().StatefulSets().Informer()},
reflect.TypeOf(&storagev1.StorageClass{}): {informer: f.Storage().V1().StorageClasses().Informer()},
reflect.TypeOf(&batchv1.Job{}): {informer: f.Batch().V1().Jobs().Informer()},
reflect.TypeOf(&appsv1.ReplicaSet{}): {informer: f.Apps().V1().ReplicaSets().Informer()},
reflect.TypeOf(&appsv1.DaemonSet{}): {informer: f.Apps().V1().DaemonSets().Informer()},
reflect.TypeOf(&appsv1.StatefulSet{}): {informer: f.Apps().V1().StatefulSets().Informer()},
reflect.TypeOf(&storagev1.StorageClass{}): {informer: f.Storage().V1().StorageClasses().Informer()},
reflect.TypeOf(&batchv1.Job{}): {informer: f.Batch().V1().Jobs().Informer()},
reflect.TypeOf(&corev1.Service{}): {
informer: f.Core().V1().Services().Informer(),
filters: filters.Filters{
Expand Down
78 changes: 0 additions & 78 deletions internal/services/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"reflect"
"strings"
"sync"
"sync/atomic"
Expand All @@ -19,7 +18,6 @@ import (
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
appsv1 "k8s.io/api/apps/v1"
authorizationv1 "k8s.io/api/authorization/v1"
autoscalingv1 "k8s.io/api/autoscaling/v1"
v1 "k8s.io/api/core/v1"
Expand All @@ -34,7 +32,6 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
dynamic_fake "k8s.io/client-go/dynamic/fake"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
authfakev1 "k8s.io/client-go/kubernetes/typed/authorization/v1/fake"
k8stesting "k8s.io/client-go/testing"
Expand Down Expand Up @@ -220,7 +217,6 @@ func TestController_ShouldReceiveDeltasBasedOnAvailableResources(t *testing.T) {
agentVersion,
NewHealthzProvider(defaultHealthzCfg, log),
fakeSelfSubjectAccessReviewsClient,
"castai-agent",
)

if mockDiscovery != nil {
Expand Down Expand Up @@ -382,7 +378,6 @@ func TestController_ShouldSendByInterval(t *testing.T) {
agentVersion,
NewHealthzProvider(defaultHealthzCfg, log),
clientset.AuthorizationV1().SelfSubjectAccessReviews(),
"castai-agent",
)

ctrl.Start(ctx.Done())
Expand Down Expand Up @@ -531,7 +526,6 @@ func TestController_ShouldKeepDeltaAfterDelete(t *testing.T) {
agentVersion,
NewHealthzProvider(defaultHealthzCfg, log),
clientset.AuthorizationV1().SelfSubjectAccessReviews(),
"castai-agent",
)

ctrl.Start(ctx.Done())
Expand Down Expand Up @@ -1182,77 +1176,6 @@ func loadInitialHappyPathData(t *testing.T, scheme *runtime.Scheme) ([]sampleObj
return objects, clientset, dynamicClient, metricsClient
}

func TestDefaultInformers_MatchFilters(t *testing.T) {
tests := map[string]struct {
obj runtime.Object
eventType castai.EventType
expectedMatch bool
}{
"keep if replicaset in castware namespace": {
obj: &appsv1.ReplicaSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: "castware",
},
},
eventType: castai.EventAdd,
expectedMatch: true,
},
"discard if replicaset has zero replicas": {
obj: &appsv1.ReplicaSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test",
},
Spec: appsv1.ReplicaSetSpec{
Replicas: lo.ToPtr(int32(0)),
},
Status: appsv1.ReplicaSetStatus{
Replicas: 0,
},
},
eventType: castai.EventAdd,
expectedMatch: false,
},
"keep if replicaset has more than zero replicas": {
obj: &appsv1.ReplicaSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test",
},
Spec: appsv1.ReplicaSetSpec{
Replicas: lo.ToPtr(int32(1)),
},
Status: appsv1.ReplicaSetStatus{
Replicas: 1,
},
},
eventType: castai.EventAdd,
expectedMatch: true,
},
"keep if delete event": {
obj: &appsv1.ReplicaSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test",
},
},
eventType: castai.EventDelete,
expectedMatch: true,
},
}

for name, data := range tests {
t.Run(name, func(t *testing.T) {
r := require.New(t)
f := informers.NewSharedInformerFactory(fake.NewSimpleClientset(data.obj), 0)

defaultInformers := getDefaultInformers(f, "castware")
objInformer := defaultInformers[reflect.TypeOf(data.obj)]

match := objInformer.filters.Apply(data.eventType, data.obj)

r.Equal(data.expectedMatch, match)
})
}
}

func TestCollectSingleSnapshot(t *testing.T) {
r := require.New(t)

Expand Down Expand Up @@ -1294,7 +1217,6 @@ func TestCollectSingleSnapshot(t *testing.T) {
PrepTimeout: 10 * time.Second,
},
version,
"",
)
r.NoError(err)
r.NotNil(snapshot)
Expand Down
1 change: 0 additions & 1 deletion internal/services/controller/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ func Loop(
agentVersion,
healthzProvider,
clientset.AuthorizationV1().SelfSubjectAccessReviews(),
cfg.SelfPod.Namespace,
)

ctrl.Start(ctrlCtx.Done())
Expand Down
Loading