From a84c5cc1534bec5a9bc5e027fcebedd3673ac3d3 Mon Sep 17 00:00:00 2001 From: Zbynek Roubalik <726523+zroubalik@users.noreply.github.com> Date: Mon, 28 Jun 2021 14:40:44 +0200 Subject: [PATCH] scale handler chores (#1903) Signed-off-by: Zbynek Roubalik --- api/v1alpha1/withtriggers_types.go | 22 ++++ pkg/scaling/resolver/scale_resolvers.go | 93 +++++++++++++- pkg/scaling/resolver/scale_resolvers_test.go | 2 +- pkg/scaling/scale_handler.go | 127 +++---------------- 4 files changed, 130 insertions(+), 114 deletions(-) diff --git a/api/v1alpha1/withtriggers_types.go b/api/v1alpha1/withtriggers_types.go index 6264af86f8e..34ffa8cc3ef 100644 --- a/api/v1alpha1/withtriggers_types.go +++ b/api/v1alpha1/withtriggers_types.go @@ -1,6 +1,9 @@ package v1alpha1 import ( + "fmt" + "time" + "knative.dev/pkg/apis" "knative.dev/pkg/apis/duck" @@ -8,6 +11,11 @@ import ( runtime "k8s.io/apimachinery/pkg/runtime" ) +const ( + // Default polling interval for a ScaledObject triggers if no pollingInterval is defined. + defaultPollingInterval = 30 +) + // +kubebuilder:object:root=true // WithTriggers is a specification for a resource with triggers @@ -55,3 +63,17 @@ type WithTriggersList struct { metav1.ListMeta `json:"metadata"` Items []WithTriggers `json:"items"` } + +// GetPollingInterval returns defined polling interval, if not set default is being returned +func (t *WithTriggers) GetPollingInterval() time.Duration { + if t.Spec.PollingInterval != nil { + return time.Second * time.Duration(*t.Spec.PollingInterval) + } + + return time.Second * time.Duration(defaultPollingInterval) +} + +// GenerateIdenitifier returns identifier for the object in for "kind.namespace.name" +func (t *WithTriggers) GenerateIdenitifier() string { + return fmt.Sprintf("%s.%s.%s", t.Kind, t.Namespace, t.Name) +} diff --git a/pkg/scaling/resolver/scale_resolvers.go b/pkg/scaling/resolver/scale_resolvers.go index 92109f90d7a..33fb58cd785 100644 --- a/pkg/scaling/resolver/scale_resolvers.go +++ b/pkg/scaling/resolver/scale_resolvers.go @@ -8,8 +8,12 @@ import ( "os" "github.com/go-logr/logr" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" + "knative.dev/pkg/apis/duck" + duckv1 "knative.dev/pkg/apis/duck/v1" "sigs.k8s.io/controller-runtime/pkg/client" kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1" @@ -21,6 +25,66 @@ const ( referenceCloser = ')' ) +// ResolveScaleTargetPodSpec for given scalableObject inspects the scale target workload, +// which could be almost any k8s resource (Deployment, StatefulSet, CustomResource...) +// and for the given resource returns *corev1.PodTemplateSpec and a name of the container +// which is being used for referencing environment variables +func ResolveScaleTargetPodSpec(kubeClient client.Client, logger logr.Logger, scalableObject interface{}) (*corev1.PodTemplateSpec, string, error) { + switch obj := scalableObject.(type) { + case *kedav1alpha1.ScaledObject: + // Try to get a real object instance for better cache usage, but fall back to an Unstructured if needed. + podTemplateSpec := corev1.PodTemplateSpec{} + gvk := obj.Status.ScaleTargetGVKR.GroupVersionKind() + objKey := client.ObjectKey{Namespace: obj.Namespace, Name: obj.Spec.ScaleTargetRef.Name} + switch { + // For core types, use a typed client so we get an informer-cache-backed Get to reduce API load. + case gvk.Group == "apps" && gvk.Kind == "Deployment": + deployment := &appsv1.Deployment{} + if err := kubeClient.Get(context.TODO(), objKey, deployment); err != nil { + // resource doesn't exist + logger.Error(err, "Target deployment doesn't exist", "resource", gvk.String(), "name", objKey.Name) + return nil, "", err + } + podTemplateSpec.ObjectMeta = deployment.ObjectMeta + podTemplateSpec.Spec = deployment.Spec.Template.Spec + case gvk.Group == "apps" && gvk.Kind == "StatefulSet": + statefulSet := &appsv1.StatefulSet{} + if err := kubeClient.Get(context.TODO(), objKey, statefulSet); err != nil { + // resource doesn't exist + logger.Error(err, "Target deployment doesn't exist", "resource", gvk.String(), "name", objKey.Name) + return nil, "", err + } + podTemplateSpec.ObjectMeta = statefulSet.ObjectMeta + podTemplateSpec.Spec = statefulSet.Spec.Template.Spec + default: + unstruct := &unstructured.Unstructured{} + unstruct.SetGroupVersionKind(gvk) + if err := kubeClient.Get(context.TODO(), objKey, unstruct); err != nil { + // resource doesn't exist + logger.Error(err, "Target resource doesn't exist", "resource", gvk.String(), "name", objKey.Name) + return nil, "", err + } + withPods := &duckv1.WithPod{} + if err := duck.FromUnstructured(unstruct, withPods); err != nil { + logger.Error(err, "Cannot convert Unstructured into PodSpecable Duck-type", "object", unstruct) + } + podTemplateSpec.ObjectMeta = withPods.ObjectMeta + podTemplateSpec.Spec = withPods.Spec.Template.Spec + } + + if podTemplateSpec.Spec.Containers == nil || len(podTemplateSpec.Spec.Containers) == 0 { + logger.V(1).Info("There aren't any containers found in the ScaleTarget, therefore it is no possible to inject environment properties", "resource", gvk.String(), "name", obj.Spec.ScaleTargetRef.Name) + return nil, "", nil + } + + return &podTemplateSpec, obj.Spec.ScaleTargetRef.EnvSourceContainerName, nil + case *kedav1alpha1.ScaledJob: + return &obj.Spec.JobTargetRef.Template, obj.Spec.EnvSourceContainerName, nil + default: + return nil, "", fmt.Errorf("unknown scalable object type %v", scalableObject) + } +} + // ResolveContainerEnv resolves all environment variables in a container. // It returns either map of env variable key and value or error if there is any. func ResolveContainerEnv(client client.Client, logger logr.Logger, podSpec *corev1.PodSpec, containerName, namespace string) (map[string]string, error) { @@ -49,9 +113,32 @@ func ResolveContainerEnv(client client.Client, logger logr.Logger, podSpec *core return resolveEnv(client, logger, &container, namespace) } -// ResolveAuthRef provides authentication parameters needed authenticate scaler with the environment. -// based on authentication method define in TriggerAuthentication, authParams and podIdentity is returned -func ResolveAuthRef(client client.Client, logger logr.Logger, triggerAuthRef *kedav1alpha1.ScaledObjectAuthRef, podSpec *corev1.PodSpec, namespace string) (map[string]string, kedav1alpha1.PodIdentityProvider) { +// ResolveAuthRefAndPodIdentity provides authentication parameters and pod identity needed authenticate scaler with the environment. +func ResolveAuthRefAndPodIdentity(client client.Client, logger logr.Logger, triggerAuthRef *kedav1alpha1.ScaledObjectAuthRef, podTemplateSpec *corev1.PodTemplateSpec, namespace string) (map[string]string, kedav1alpha1.PodIdentityProvider, error) { + if podTemplateSpec != nil { + authParams, podIdentity := resolveAuthRef(client, logger, triggerAuthRef, &podTemplateSpec.Spec, namespace) + + if podIdentity == kedav1alpha1.PodIdentityProviderAwsEKS { + serviceAccountName := podTemplateSpec.Spec.ServiceAccountName + serviceAccount := &corev1.ServiceAccount{} + err := client.Get(context.TODO(), types.NamespacedName{Name: serviceAccountName, Namespace: namespace}, serviceAccount) + if err != nil { + return nil, kedav1alpha1.PodIdentityProviderNone, fmt.Errorf("error getting service account: %s", err) + } + authParams["awsRoleArn"] = serviceAccount.Annotations[kedav1alpha1.PodIdentityAnnotationEKS] + } else if podIdentity == kedav1alpha1.PodIdentityProviderAwsKiam { + authParams["awsRoleArn"] = podTemplateSpec.ObjectMeta.Annotations[kedav1alpha1.PodIdentityAnnotationKiam] + } + return authParams, podIdentity, nil + } + + authParams, _ := resolveAuthRef(client, logger, triggerAuthRef, nil, namespace) + return authParams, kedav1alpha1.PodIdentityProviderNone, nil +} + +// resolveAuthRef provides authentication parameters needed authenticate scaler with the environment. +// based on authentication method defined in TriggerAuthentication, authParams and podIdentity is returned +func resolveAuthRef(client client.Client, logger logr.Logger, triggerAuthRef *kedav1alpha1.ScaledObjectAuthRef, podSpec *corev1.PodSpec, namespace string) (map[string]string, kedav1alpha1.PodIdentityProvider) { result := make(map[string]string) var podIdentity kedav1alpha1.PodIdentityProvider diff --git a/pkg/scaling/resolver/scale_resolvers_test.go b/pkg/scaling/resolver/scale_resolvers_test.go index e77da21e79a..9b870ef14fa 100644 --- a/pkg/scaling/resolver/scale_resolvers_test.go +++ b/pkg/scaling/resolver/scale_resolvers_test.go @@ -312,7 +312,7 @@ func TestResolveAuthRef(t *testing.T) { test := test t.Run(test.name, func(t *testing.T) { clusterObjectNamespaceCache = &clusterNamespace // Inject test cluster namespace. - gotMap, gotPodIdentity := ResolveAuthRef(fake.NewFakeClientWithScheme(scheme.Scheme, test.existing...), logf.Log.WithName("test"), test.soar, test.podSpec, namespace) + gotMap, gotPodIdentity := resolveAuthRef(fake.NewFakeClientWithScheme(scheme.Scheme, test.existing...), logf.Log.WithName("test"), test.soar, test.podSpec, namespace) if diff := cmp.Diff(gotMap, test.expected); diff != "" { t.Errorf("Returned authParams are different: %s", diff) } diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index 0203fd7b3ff..1c1e7fe027d 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -10,15 +10,10 @@ import ( "k8s.io/client-go/tools/record" "github.com/go-logr/logr" - appsv1 "k8s.io/api/apps/v1" "k8s.io/api/autoscaling/v2beta2" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/scale" - "knative.dev/pkg/apis/duck" - duckv1 "knative.dev/pkg/apis/duck/v1" "sigs.k8s.io/controller-runtime/pkg/client" logf "sigs.k8s.io/controller-runtime/pkg/log" @@ -28,11 +23,6 @@ import ( "github.com/kedacore/keda/v2/pkg/scaling/resolver" ) -const ( - // Default polling interval for a ScaledObject triggers if no pollingInterval is defined. - defaultPollingInterval = 30 -) - // ScaleHandler encapsulates the logic of calling the right scalers for // each ScaledObject and making the final scale decision and operation type ScaleHandler interface { @@ -68,7 +58,7 @@ func (h *scaleHandler) GetScalers(scalableObject interface{}) ([]scalers.Scaler, return nil, err } - podTemplateSpec, containerName, err := h.getPods(scalableObject) + podTemplateSpec, containerName, err := resolver.ResolveScaleTargetPodSpec(h.client, h.logger, scalableObject) if err != nil { return nil, err } @@ -83,8 +73,7 @@ func (h *scaleHandler) HandleScalableObject(scalableObject interface{}) error { return err } - key := generateKey(withTriggers) - + key := withTriggers.GenerateIdenitifier() ctx, cancel := context.WithCancel(context.TODO()) // cancel the outdated ScaleLoop for the same ScaledObject (if exists) @@ -121,8 +110,7 @@ func (h *scaleHandler) DeleteScalableObject(scalableObject interface{}) error { return err } - key := generateKey(withTriggers) - + key := withTriggers.GenerateIdenitifier() result, ok := h.scaleLoopContexts.Load(key) if ok { cancel, ok := result.(context.CancelFunc) @@ -145,7 +133,7 @@ func (h *scaleHandler) startScaleLoop(ctx context.Context, withTriggers *kedav1a // kick off one check to the scalers now h.checkScalers(ctx, scalableObject, scalingMutex) - pollingInterval := getPollingInterval(withTriggers) + pollingInterval := withTriggers.GetPollingInterval() logger.V(1).Info("Watching with pollingInterval", "PollingInterval", pollingInterval) for { @@ -214,15 +202,15 @@ func (h *scaleHandler) checkScalers(ctx context.Context, scalableObject interfac defer scalingMutex.Unlock() switch obj := scalableObject.(type) { case *kedav1alpha1.ScaledObject: - h.scaleExecutor.RequestScale(ctx, obj, h.checkScaledObjectScalers(ctx, scalers, obj)) + h.scaleExecutor.RequestScale(ctx, obj, h.isScaledObjectActive(ctx, scalers, obj)) case *kedav1alpha1.ScaledJob: scaledJob := scalableObject.(*kedav1alpha1.ScaledJob) - isActive, scaleTo, maxScale := h.checkScaledJobScalers(ctx, scalers, scaledJob) + isActive, scaleTo, maxScale := h.isScaledJobActive(ctx, scalers, scaledJob) h.scaleExecutor.RequestJobScale(ctx, obj, isActive, scaleTo, maxScale) } } -func (h *scaleHandler) checkScaledObjectScalers(ctx context.Context, scalers []scalers.Scaler, scaledObject *kedav1alpha1.ScaledObject) bool { +func (h *scaleHandler) isScaledObjectActive(ctx context.Context, scalers []scalers.Scaler, scaledObject *kedav1alpha1.ScaledObject) bool { isActive := false for i, scaler := range scalers { isTriggerActive, err := scaler.IsActive(ctx) @@ -247,12 +235,14 @@ func (h *scaleHandler) checkScaledObjectScalers(ctx context.Context, scalers []s return isActive } -func (h *scaleHandler) checkScaledJobScalers(ctx context.Context, scalers []scalers.Scaler, scaledJob *kedav1alpha1.ScaledJob) (bool, int64, int64) { +func (h *scaleHandler) isScaledJobActive(ctx context.Context, scalers []scalers.Scaler, scaledJob *kedav1alpha1.ScaledJob) (bool, int64, int64) { var queueLength int64 var targetAverageValue int64 var maxValue int64 isActive := false + // TODO refactor this, do chores, reduce the verbosity ie: V(1) and frequency of logs + // move relevant funcs getTargetAverageValue(), min() and divideWithCeil() out of scaler_handler.go for _, scaler := range scalers { scalerLogger := h.logger.WithValues("Scaler", scaler) @@ -295,7 +285,7 @@ func (h *scaleHandler) checkScaledJobScalers(ctx context.Context, scalers []scal } } if targetAverageValue != 0 { - maxValue = min(scaledJob.MaxReplicaCount(), devideWithCeil(queueLength, targetAverageValue)) + maxValue = min(scaledJob.MaxReplicaCount(), divideWithCeil(queueLength, targetAverageValue)) } h.logger.Info("Scaler maxValue", "maxValue", maxValue) return isActive, queueLength, maxValue @@ -324,7 +314,7 @@ func getTargetAverageValue(metricSpecs []v2beta2.MetricSpec) int64 { return 0 } -func devideWithCeil(x, y int64) int64 { +func divideWithCeil(x, y int64) int64 { ans := x / y reminder := x % y if reminder != 0 { @@ -363,26 +353,11 @@ func (h *scaleHandler) buildScalers(withTriggers *kedav1alpha1.WithTriggers, pod AuthParams: make(map[string]string), GlobalHTTPTimeout: h.globalHTTPTimeout, } - if podTemplateSpec != nil { - authParams, podIdentity := resolver.ResolveAuthRef(h.client, logger, trigger.AuthenticationRef, &podTemplateSpec.Spec, withTriggers.Namespace) - - if podIdentity == kedav1alpha1.PodIdentityProviderAwsEKS { - serviceAccountName := podTemplateSpec.Spec.ServiceAccountName - serviceAccount := &corev1.ServiceAccount{} - err = h.client.Get(context.TODO(), types.NamespacedName{Name: serviceAccountName, Namespace: withTriggers.Namespace}, serviceAccount) - if err != nil { - closeScalers(scalersRes) - return []scalers.Scaler{}, fmt.Errorf("error getting service account: %s", err) - } - authParams["awsRoleArn"] = serviceAccount.Annotations[kedav1alpha1.PodIdentityAnnotationEKS] - } else if podIdentity == kedav1alpha1.PodIdentityProviderAwsKiam { - authParams["awsRoleArn"] = podTemplateSpec.ObjectMeta.Annotations[kedav1alpha1.PodIdentityAnnotationKiam] - } - config.AuthParams = authParams - config.PodIdentity = podIdentity - } else { - authParams, _ := resolver.ResolveAuthRef(h.client, logger, trigger.AuthenticationRef, nil, withTriggers.Namespace) - config.AuthParams = authParams + + config.AuthParams, config.PodIdentity, err = resolver.ResolveAuthRefAndPodIdentity(h.client, logger, trigger.AuthenticationRef, podTemplateSpec, withTriggers.Namespace) + if err != nil { + closeScalers(scalersRes) + return []scalers.Scaler{}, err } scaler, err := buildScaler(trigger.Type, config) @@ -398,62 +373,6 @@ func (h *scaleHandler) buildScalers(withTriggers *kedav1alpha1.WithTriggers, pod return scalersRes, nil } -func (h *scaleHandler) getPods(scalableObject interface{}) (*corev1.PodTemplateSpec, string, error) { - switch obj := scalableObject.(type) { - case *kedav1alpha1.ScaledObject: - // Try to get a real object instance for better cache usage, but fall back to an Unstructured if needed. - podTemplateSpec := corev1.PodTemplateSpec{} - gvk := obj.Status.ScaleTargetGVKR.GroupVersionKind() - objKey := client.ObjectKey{Namespace: obj.Namespace, Name: obj.Spec.ScaleTargetRef.Name} - switch { - // For core types, use a typed client so we get an informer-cache-backed Get to reduce API load. - case gvk.Group == "apps" && gvk.Kind == "Deployment": - deployment := &appsv1.Deployment{} - if err := h.client.Get(context.TODO(), objKey, deployment); err != nil { - // resource doesn't exist - h.logger.Error(err, "Target deployment doesn't exist", "resource", gvk.String(), "name", objKey.Name) - return nil, "", err - } - podTemplateSpec.ObjectMeta = deployment.ObjectMeta - podTemplateSpec.Spec = deployment.Spec.Template.Spec - case gvk.Group == "apps" && gvk.Kind == "StatefulSet": - statefulSet := &appsv1.StatefulSet{} - if err := h.client.Get(context.TODO(), objKey, statefulSet); err != nil { - // resource doesn't exist - h.logger.Error(err, "Target deployment doesn't exist", "resource", gvk.String(), "name", objKey.Name) - return nil, "", err - } - podTemplateSpec.ObjectMeta = statefulSet.ObjectMeta - podTemplateSpec.Spec = statefulSet.Spec.Template.Spec - default: - unstruct := &unstructured.Unstructured{} - unstruct.SetGroupVersionKind(gvk) - if err := h.client.Get(context.TODO(), objKey, unstruct); err != nil { - // resource doesn't exist - h.logger.Error(err, "Target resource doesn't exist", "resource", gvk.String(), "name", objKey.Name) - return nil, "", err - } - withPods := &duckv1.WithPod{} - if err := duck.FromUnstructured(unstruct, withPods); err != nil { - h.logger.Error(err, "Cannot convert Unstructured into PodSpecable Duck-type", "object", unstruct) - } - podTemplateSpec.ObjectMeta = withPods.ObjectMeta - podTemplateSpec.Spec = withPods.Spec.Template.Spec - } - - if podTemplateSpec.Spec.Containers == nil || len(podTemplateSpec.Spec.Containers) == 0 { - h.logger.V(1).Info("There aren't any containers found in the ScaleTarget, therefore it is no possible to inject environment properties", "resource", gvk.String(), "name", obj.Spec.ScaleTargetRef.Name) - return nil, "", nil - } - - return &podTemplateSpec, obj.Spec.ScaleTargetRef.EnvSourceContainerName, nil - case *kedav1alpha1.ScaledJob: - return &obj.Spec.JobTargetRef.Template, obj.Spec.EnvSourceContainerName, nil - default: - return nil, "", fmt.Errorf("unknown scalable object type %v", scalableObject) - } -} - func buildScaler(triggerType string, config *scalers.ScalerConfig) (scalers.Scaler, error) { // TRIGGERS-START switch triggerType { @@ -566,15 +485,3 @@ func closeScalers(scalers []scalers.Scaler) { defer scaler.Close() } } - -func getPollingInterval(withTriggers *kedav1alpha1.WithTriggers) time.Duration { - if withTriggers.Spec.PollingInterval != nil { - return time.Second * time.Duration(*withTriggers.Spec.PollingInterval) - } - - return time.Second * time.Duration(defaultPollingInterval) -} - -func generateKey(scalableObject *kedav1alpha1.WithTriggers) string { - return fmt.Sprintf("%s.%s.%s", scalableObject.Kind, scalableObject.Namespace, scalableObject.Name) -}