
scale handler chores (#1903)
Signed-off-by: Zbynek Roubalik <[email protected]>
zroubalik authored Jun 28, 2021
1 parent 0afb18f commit a84c5cc
Showing 4 changed files with 130 additions and 114 deletions.
22 changes: 22 additions & 0 deletions api/v1alpha1/withtriggers_types.go
@@ -1,13 +1,21 @@
package v1alpha1

import (
"fmt"
"time"

"knative.dev/pkg/apis"
"knative.dev/pkg/apis/duck"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
)

const (
// Default polling interval for ScaledObject triggers if no pollingInterval is defined.
defaultPollingInterval = 30
)

// +kubebuilder:object:root=true

// WithTriggers is a specification for a resource with triggers
@@ -55,3 +63,17 @@ type WithTriggersList struct {
metav1.ListMeta `json:"metadata"`
Items []WithTriggers `json:"items"`
}

// GetPollingInterval returns the defined polling interval; if none is set, the default is returned
func (t *WithTriggers) GetPollingInterval() time.Duration {
if t.Spec.PollingInterval != nil {
return time.Second * time.Duration(*t.Spec.PollingInterval)
}

return time.Second * time.Duration(defaultPollingInterval)
}

// GenerateIdenitifier returns an identifier for the object in the form "kind.namespace.name"
func (t *WithTriggers) GenerateIdenitifier() string {
return fmt.Sprintf("%s.%s.%s", t.Kind, t.Namespace, t.Name)
}
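For reference, a minimal sketch (not part of this diff) of how these two new helpers are consumed by the scale handler changes further down; the WithTriggersSpec type name and the exact field layout are assumptions inferred from the surrounding code.

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1"
)

func main() {
	interval := int32(10)
	wt := &kedav1alpha1.WithTriggers{
		TypeMeta:   metav1.TypeMeta{Kind: "ScaledObject"},
		ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "my-app"},
		Spec:       kedav1alpha1.WithTriggersSpec{PollingInterval: &interval},
	}

	// Used as the key for the scale-loop context map: "ScaledObject.default.my-app".
	fmt.Println(wt.GenerateIdenitifier())

	// 10s here; 30s (defaultPollingInterval) when PollingInterval is nil.
	fmt.Println(wt.GetPollingInterval())
}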
93 changes: 90 additions & 3 deletions pkg/scaling/resolver/scale_resolvers.go
@@ -8,8 +8,12 @@ import (
"os"

"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"knative.dev/pkg/apis/duck"
duckv1 "knative.dev/pkg/apis/duck/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1"
@@ -21,6 +25,66 @@ const (
referenceCloser = ')'
)

// ResolveScaleTargetPodSpec for the given scalableObject inspects the scale target workload,
// which could be almost any k8s resource (Deployment, StatefulSet, CustomResource...),
// and for that resource returns the *corev1.PodTemplateSpec and the name of the container
// that is used for referencing environment variables
func ResolveScaleTargetPodSpec(kubeClient client.Client, logger logr.Logger, scalableObject interface{}) (*corev1.PodTemplateSpec, string, error) {
switch obj := scalableObject.(type) {
case *kedav1alpha1.ScaledObject:
// Try to get a real object instance for better cache usage, but fall back to an Unstructured if needed.
podTemplateSpec := corev1.PodTemplateSpec{}
gvk := obj.Status.ScaleTargetGVKR.GroupVersionKind()
objKey := client.ObjectKey{Namespace: obj.Namespace, Name: obj.Spec.ScaleTargetRef.Name}
switch {
// For core types, use a typed client so we get an informer-cache-backed Get to reduce API load.
case gvk.Group == "apps" && gvk.Kind == "Deployment":
deployment := &appsv1.Deployment{}
if err := kubeClient.Get(context.TODO(), objKey, deployment); err != nil {
// resource doesn't exist
logger.Error(err, "Target deployment doesn't exist", "resource", gvk.String(), "name", objKey.Name)
return nil, "", err
}
podTemplateSpec.ObjectMeta = deployment.ObjectMeta
podTemplateSpec.Spec = deployment.Spec.Template.Spec
case gvk.Group == "apps" && gvk.Kind == "StatefulSet":
statefulSet := &appsv1.StatefulSet{}
if err := kubeClient.Get(context.TODO(), objKey, statefulSet); err != nil {
// resource doesn't exist
logger.Error(err, "Target deployment doesn't exist", "resource", gvk.String(), "name", objKey.Name)
return nil, "", err
}
podTemplateSpec.ObjectMeta = statefulSet.ObjectMeta
podTemplateSpec.Spec = statefulSet.Spec.Template.Spec
default:
unstruct := &unstructured.Unstructured{}
unstruct.SetGroupVersionKind(gvk)
if err := kubeClient.Get(context.TODO(), objKey, unstruct); err != nil {
// resource doesn't exist
logger.Error(err, "Target resource doesn't exist", "resource", gvk.String(), "name", objKey.Name)
return nil, "", err
}
withPods := &duckv1.WithPod{}
if err := duck.FromUnstructured(unstruct, withPods); err != nil {
logger.Error(err, "Cannot convert Unstructured into PodSpecable Duck-type", "object", unstruct)
}
podTemplateSpec.ObjectMeta = withPods.ObjectMeta
podTemplateSpec.Spec = withPods.Spec.Template.Spec
}

if podTemplateSpec.Spec.Containers == nil || len(podTemplateSpec.Spec.Containers) == 0 {
logger.V(1).Info("There aren't any containers found in the ScaleTarget, therefore it is not possible to inject environment properties", "resource", gvk.String(), "name", obj.Spec.ScaleTargetRef.Name)
return nil, "", nil
}

return &podTemplateSpec, obj.Spec.ScaleTargetRef.EnvSourceContainerName, nil
case *kedav1alpha1.ScaledJob:
return &obj.Spec.JobTargetRef.Template, obj.Spec.EnvSourceContainerName, nil
default:
return nil, "", fmt.Errorf("unknown scalable object type %v", scalableObject)
}
}
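As a usage illustration (assumed caller code, not part of the commit), the handler's GetScalers now delegates to this function roughly like so; the wrapper name and the error wrapping are invented for the example.

package example

import (
	"fmt"

	"github.com/go-logr/logr"
	"sigs.k8s.io/controller-runtime/pkg/client"

	kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1"
	"github.com/kedacore/keda/v2/pkg/scaling/resolver"
)

// resolveTarget shows the intended call pattern: resolve the target's pod
// template once, then reuse it for env and auth resolution.
func resolveTarget(kubeClient client.Client, logger logr.Logger, so *kedav1alpha1.ScaledObject) (string, error) {
	podTemplateSpec, containerName, err := resolver.ResolveScaleTargetPodSpec(kubeClient, logger, so)
	if err != nil {
		return "", fmt.Errorf("resolving scale target: %w", err)
	}
	if podTemplateSpec == nil {
		// Target exists but has no containers; there is nothing to resolve env vars from.
		return "", nil
	}
	return containerName, nil
}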

// ResolveContainerEnv resolves all environment variables in a container.
// It returns either map of env variable key and value or error if there is any.
func ResolveContainerEnv(client client.Client, logger logr.Logger, podSpec *corev1.PodSpec, containerName, namespace string) (map[string]string, error) {
@@ -49,9 +113,32 @@ func ResolveContainerEnv(client client.Client, logger logr.Logger, podSpec *core
return resolveEnv(client, logger, &container, namespace)
}

// ResolveAuthRef provides authentication parameters needed authenticate scaler with the environment.
// based on authentication method define in TriggerAuthentication, authParams and podIdentity is returned
func ResolveAuthRef(client client.Client, logger logr.Logger, triggerAuthRef *kedav1alpha1.ScaledObjectAuthRef, podSpec *corev1.PodSpec, namespace string) (map[string]string, kedav1alpha1.PodIdentityProvider) {
// ResolveAuthRefAndPodIdentity provides authentication parameters and the pod identity needed to authenticate the scaler with the environment.
func ResolveAuthRefAndPodIdentity(client client.Client, logger logr.Logger, triggerAuthRef *kedav1alpha1.ScaledObjectAuthRef, podTemplateSpec *corev1.PodTemplateSpec, namespace string) (map[string]string, kedav1alpha1.PodIdentityProvider, error) {
if podTemplateSpec != nil {
authParams, podIdentity := resolveAuthRef(client, logger, triggerAuthRef, &podTemplateSpec.Spec, namespace)

if podIdentity == kedav1alpha1.PodIdentityProviderAwsEKS {
serviceAccountName := podTemplateSpec.Spec.ServiceAccountName
serviceAccount := &corev1.ServiceAccount{}
err := client.Get(context.TODO(), types.NamespacedName{Name: serviceAccountName, Namespace: namespace}, serviceAccount)
if err != nil {
return nil, kedav1alpha1.PodIdentityProviderNone, fmt.Errorf("error getting service account: %s", err)
}
authParams["awsRoleArn"] = serviceAccount.Annotations[kedav1alpha1.PodIdentityAnnotationEKS]
} else if podIdentity == kedav1alpha1.PodIdentityProviderAwsKiam {
authParams["awsRoleArn"] = podTemplateSpec.ObjectMeta.Annotations[kedav1alpha1.PodIdentityAnnotationKiam]
}
return authParams, podIdentity, nil
}

authParams, _ := resolveAuthRef(client, logger, triggerAuthRef, nil, namespace)
return authParams, kedav1alpha1.PodIdentityProviderNone, nil
}
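A hedged sketch of how a caller (such as buildScalers in scale_handler.go, per the hunk further down) uses the new exported function; the wrapper name here is an assumption for illustration.

package example

import (
	"github.com/go-logr/logr"
	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"

	kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1"
	"github.com/kedacore/keda/v2/pkg/scaling/resolver"
)

// resolveTriggerAuth is an illustrative wrapper. Passing a nil podTemplateSpec
// still resolves auth parameters from the TriggerAuthentication but reports
// PodIdentityProviderNone, matching the fallback branch above.
func resolveTriggerAuth(c client.Client, logger logr.Logger, authRef *kedav1alpha1.ScaledObjectAuthRef,
	podTemplateSpec *corev1.PodTemplateSpec, namespace string) (map[string]string, kedav1alpha1.PodIdentityProvider, error) {
	return resolver.ResolveAuthRefAndPodIdentity(c, logger, authRef, podTemplateSpec, namespace)
}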

// resolveAuthRef provides authentication parameters needed to authenticate the scaler with the environment.
// Based on the authentication method defined in TriggerAuthentication, authParams and podIdentity are returned.
func resolveAuthRef(client client.Client, logger logr.Logger, triggerAuthRef *kedav1alpha1.ScaledObjectAuthRef, podSpec *corev1.PodSpec, namespace string) (map[string]string, kedav1alpha1.PodIdentityProvider) {
result := make(map[string]string)
var podIdentity kedav1alpha1.PodIdentityProvider

2 changes: 1 addition & 1 deletion pkg/scaling/resolver/scale_resolvers_test.go
@@ -312,7 +312,7 @@ func TestResolveAuthRef(t *testing.T) {
test := test
t.Run(test.name, func(t *testing.T) {
clusterObjectNamespaceCache = &clusterNamespace // Inject test cluster namespace.
gotMap, gotPodIdentity := ResolveAuthRef(fake.NewFakeClientWithScheme(scheme.Scheme, test.existing...), logf.Log.WithName("test"), test.soar, test.podSpec, namespace)
gotMap, gotPodIdentity := resolveAuthRef(fake.NewFakeClientWithScheme(scheme.Scheme, test.existing...), logf.Log.WithName("test"), test.soar, test.podSpec, namespace)
if diff := cmp.Diff(gotMap, test.expected); diff != "" {
t.Errorf("Returned authParams are different: %s", diff)
}
127 changes: 17 additions & 110 deletions pkg/scaling/scale_handler.go
@@ -10,15 +10,10 @@ import (
"k8s.io/client-go/tools/record"

"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/api/autoscaling/v2beta2"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/scale"
"knative.dev/pkg/apis/duck"
duckv1 "knative.dev/pkg/apis/duck/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"

@@ -28,11 +23,6 @@ import (
"github.com/kedacore/keda/v2/pkg/scaling/resolver"
)

const (
// Default polling interval for a ScaledObject triggers if no pollingInterval is defined.
defaultPollingInterval = 30
)

// ScaleHandler encapsulates the logic of calling the right scalers for
// each ScaledObject and making the final scale decision and operation
type ScaleHandler interface {
@@ -68,7 +58,7 @@ func (h *scaleHandler) GetScalers(scalableObject interface{}) ([]scalers.Scaler,
return nil, err
}

podTemplateSpec, containerName, err := h.getPods(scalableObject)
podTemplateSpec, containerName, err := resolver.ResolveScaleTargetPodSpec(h.client, h.logger, scalableObject)
if err != nil {
return nil, err
}
@@ -83,8 +73,7 @@ func (h *scaleHandler) HandleScalableObject(scalableObject interface{}) error {
return err
}

key := generateKey(withTriggers)

key := withTriggers.GenerateIdenitifier()
ctx, cancel := context.WithCancel(context.TODO())

// cancel the outdated ScaleLoop for the same ScaledObject (if exists)
@@ -121,8 +110,7 @@ func (h *scaleHandler) DeleteScalableObject(scalableObject interface{}) error {
return err
}

key := generateKey(withTriggers)

key := withTriggers.GenerateIdenitifier()
result, ok := h.scaleLoopContexts.Load(key)
if ok {
cancel, ok := result.(context.CancelFunc)
@@ -145,7 +133,7 @@ func (h *scaleHandler) startScaleLoop(ctx context.Context, withTriggers *kedav1a
// kick off one check to the scalers now
h.checkScalers(ctx, scalableObject, scalingMutex)

pollingInterval := getPollingInterval(withTriggers)
pollingInterval := withTriggers.GetPollingInterval()
logger.V(1).Info("Watching with pollingInterval", "PollingInterval", pollingInterval)

for {
@@ -214,15 +202,15 @@ func (h *scaleHandler) checkScalers(ctx context.Context, scalableObject interfac
defer scalingMutex.Unlock()
switch obj := scalableObject.(type) {
case *kedav1alpha1.ScaledObject:
h.scaleExecutor.RequestScale(ctx, obj, h.checkScaledObjectScalers(ctx, scalers, obj))
h.scaleExecutor.RequestScale(ctx, obj, h.isScaledObjectActive(ctx, scalers, obj))
case *kedav1alpha1.ScaledJob:
scaledJob := scalableObject.(*kedav1alpha1.ScaledJob)
isActive, scaleTo, maxScale := h.checkScaledJobScalers(ctx, scalers, scaledJob)
isActive, scaleTo, maxScale := h.isScaledJobActive(ctx, scalers, scaledJob)
h.scaleExecutor.RequestJobScale(ctx, obj, isActive, scaleTo, maxScale)
}
}

func (h *scaleHandler) checkScaledObjectScalers(ctx context.Context, scalers []scalers.Scaler, scaledObject *kedav1alpha1.ScaledObject) bool {
func (h *scaleHandler) isScaledObjectActive(ctx context.Context, scalers []scalers.Scaler, scaledObject *kedav1alpha1.ScaledObject) bool {
isActive := false
for i, scaler := range scalers {
isTriggerActive, err := scaler.IsActive(ctx)
@@ -247,12 +235,14 @@ func (h *scaleHandler) checkScaledObjectScalers(ctx context.Context, scalers []s
return isActive
}

func (h *scaleHandler) checkScaledJobScalers(ctx context.Context, scalers []scalers.Scaler, scaledJob *kedav1alpha1.ScaledJob) (bool, int64, int64) {
func (h *scaleHandler) isScaledJobActive(ctx context.Context, scalers []scalers.Scaler, scaledJob *kedav1alpha1.ScaledJob) (bool, int64, int64) {
var queueLength int64
var targetAverageValue int64
var maxValue int64
isActive := false

// TODO refactor this, do chores, reduce the verbosity ie: V(1) and frequency of logs
// move relevant funcs getTargetAverageValue(), min() and divideWithCeil() out of scaler_handler.go
for _, scaler := range scalers {
scalerLogger := h.logger.WithValues("Scaler", scaler)

@@ -295,7 +285,7 @@ func (h *scaleHandler) checkScaledJobScalers(ctx context.Context, scalers []scal
}
}
if targetAverageValue != 0 {
maxValue = min(scaledJob.MaxReplicaCount(), devideWithCeil(queueLength, targetAverageValue))
maxValue = min(scaledJob.MaxReplicaCount(), divideWithCeil(queueLength, targetAverageValue))
}
h.logger.Info("Scaler maxValue", "maxValue", maxValue)
return isActive, queueLength, maxValue
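To make the maxValue computation above concrete, here is a tiny self-contained example of the (renamed) divideWithCeil helper together with min; the numbers are made up for illustration.

package main

import "fmt"

func divideWithCeil(x, y int64) int64 {
	ans := x / y
	if x%y != 0 {
		return ans + 1
	}
	return ans
}

func min(x, y int64) int64 {
	if x < y {
		return x
	}
	return y
}

func main() {
	queueLength, targetAverageValue, maxReplicaCount := int64(10), int64(4), int64(5)
	// ceil(10/4) = 3 Jobs would be needed, capped at maxReplicaCount: min(5, 3) = 3.
	fmt.Println(min(maxReplicaCount, divideWithCeil(queueLength, targetAverageValue)))
}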
@@ -324,7 +314,7 @@ func getTargetAverageValue(metricSpecs []v2beta2.MetricSpec) int64 {
return 0
}

func devideWithCeil(x, y int64) int64 {
func divideWithCeil(x, y int64) int64 {
ans := x / y
reminder := x % y
if reminder != 0 {
@@ -363,26 +353,11 @@ func (h *scaleHandler) buildScalers(withTriggers *kedav1alpha1.WithTriggers, pod
AuthParams: make(map[string]string),
GlobalHTTPTimeout: h.globalHTTPTimeout,
}
if podTemplateSpec != nil {
authParams, podIdentity := resolver.ResolveAuthRef(h.client, logger, trigger.AuthenticationRef, &podTemplateSpec.Spec, withTriggers.Namespace)

if podIdentity == kedav1alpha1.PodIdentityProviderAwsEKS {
serviceAccountName := podTemplateSpec.Spec.ServiceAccountName
serviceAccount := &corev1.ServiceAccount{}
err = h.client.Get(context.TODO(), types.NamespacedName{Name: serviceAccountName, Namespace: withTriggers.Namespace}, serviceAccount)
if err != nil {
closeScalers(scalersRes)
return []scalers.Scaler{}, fmt.Errorf("error getting service account: %s", err)
}
authParams["awsRoleArn"] = serviceAccount.Annotations[kedav1alpha1.PodIdentityAnnotationEKS]
} else if podIdentity == kedav1alpha1.PodIdentityProviderAwsKiam {
authParams["awsRoleArn"] = podTemplateSpec.ObjectMeta.Annotations[kedav1alpha1.PodIdentityAnnotationKiam]
}
config.AuthParams = authParams
config.PodIdentity = podIdentity
} else {
authParams, _ := resolver.ResolveAuthRef(h.client, logger, trigger.AuthenticationRef, nil, withTriggers.Namespace)
config.AuthParams = authParams

config.AuthParams, config.PodIdentity, err = resolver.ResolveAuthRefAndPodIdentity(h.client, logger, trigger.AuthenticationRef, podTemplateSpec, withTriggers.Namespace)
if err != nil {
closeScalers(scalersRes)
return []scalers.Scaler{}, err
}

scaler, err := buildScaler(trigger.Type, config)
Expand All @@ -398,62 +373,6 @@ func (h *scaleHandler) buildScalers(withTriggers *kedav1alpha1.WithTriggers, pod
return scalersRes, nil
}

func (h *scaleHandler) getPods(scalableObject interface{}) (*corev1.PodTemplateSpec, string, error) {
switch obj := scalableObject.(type) {
case *kedav1alpha1.ScaledObject:
// Try to get a real object instance for better cache usage, but fall back to an Unstructured if needed.
podTemplateSpec := corev1.PodTemplateSpec{}
gvk := obj.Status.ScaleTargetGVKR.GroupVersionKind()
objKey := client.ObjectKey{Namespace: obj.Namespace, Name: obj.Spec.ScaleTargetRef.Name}
switch {
// For core types, use a typed client so we get an informer-cache-backed Get to reduce API load.
case gvk.Group == "apps" && gvk.Kind == "Deployment":
deployment := &appsv1.Deployment{}
if err := h.client.Get(context.TODO(), objKey, deployment); err != nil {
// resource doesn't exist
h.logger.Error(err, "Target deployment doesn't exist", "resource", gvk.String(), "name", objKey.Name)
return nil, "", err
}
podTemplateSpec.ObjectMeta = deployment.ObjectMeta
podTemplateSpec.Spec = deployment.Spec.Template.Spec
case gvk.Group == "apps" && gvk.Kind == "StatefulSet":
statefulSet := &appsv1.StatefulSet{}
if err := h.client.Get(context.TODO(), objKey, statefulSet); err != nil {
// resource doesn't exist
h.logger.Error(err, "Target deployment doesn't exist", "resource", gvk.String(), "name", objKey.Name)
return nil, "", err
}
podTemplateSpec.ObjectMeta = statefulSet.ObjectMeta
podTemplateSpec.Spec = statefulSet.Spec.Template.Spec
default:
unstruct := &unstructured.Unstructured{}
unstruct.SetGroupVersionKind(gvk)
if err := h.client.Get(context.TODO(), objKey, unstruct); err != nil {
// resource doesn't exist
h.logger.Error(err, "Target resource doesn't exist", "resource", gvk.String(), "name", objKey.Name)
return nil, "", err
}
withPods := &duckv1.WithPod{}
if err := duck.FromUnstructured(unstruct, withPods); err != nil {
h.logger.Error(err, "Cannot convert Unstructured into PodSpecable Duck-type", "object", unstruct)
}
podTemplateSpec.ObjectMeta = withPods.ObjectMeta
podTemplateSpec.Spec = withPods.Spec.Template.Spec
}

if podTemplateSpec.Spec.Containers == nil || len(podTemplateSpec.Spec.Containers) == 0 {
h.logger.V(1).Info("There aren't any containers found in the ScaleTarget, therefore it is no possible to inject environment properties", "resource", gvk.String(), "name", obj.Spec.ScaleTargetRef.Name)
return nil, "", nil
}

return &podTemplateSpec, obj.Spec.ScaleTargetRef.EnvSourceContainerName, nil
case *kedav1alpha1.ScaledJob:
return &obj.Spec.JobTargetRef.Template, obj.Spec.EnvSourceContainerName, nil
default:
return nil, "", fmt.Errorf("unknown scalable object type %v", scalableObject)
}
}

func buildScaler(triggerType string, config *scalers.ScalerConfig) (scalers.Scaler, error) {
// TRIGGERS-START
switch triggerType {
@@ -566,15 +485,3 @@ func closeScalers(scalers []scalers.Scaler) {
defer scaler.Close()
}
}

func getPollingInterval(withTriggers *kedav1alpha1.WithTriggers) time.Duration {
if withTriggers.Spec.PollingInterval != nil {
return time.Second * time.Duration(*withTriggers.Spec.PollingInterval)
}

return time.Second * time.Duration(defaultPollingInterval)
}

func generateKey(scalableObject *kedav1alpha1.WithTriggers) string {
return fmt.Sprintf("%s.%s.%s", scalableObject.Kind, scalableObject.Namespace, scalableObject.Name)
}
