Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scale handler: minor chores #1903

Merged
merged 1 commit into from
Jun 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions api/v1alpha1/withtriggers_types.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
package v1alpha1

import (
"fmt"
"time"

"knative.dev/pkg/apis"
"knative.dev/pkg/apis/duck"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
)

const (
// Default polling interval for a ScaledObject triggers if no pollingInterval is defined.
defaultPollingInterval = 30
)

// +kubebuilder:object:root=true

// WithTriggers is a specification for a resource with triggers
Expand Down Expand Up @@ -55,3 +63,17 @@ type WithTriggersList struct {
metav1.ListMeta `json:"metadata"`
Items []WithTriggers `json:"items"`
}

// GetPollingInterval returns defined polling interval, if not set default is being returned
func (t *WithTriggers) GetPollingInterval() time.Duration {
if t.Spec.PollingInterval != nil {
return time.Second * time.Duration(*t.Spec.PollingInterval)
}

return time.Second * time.Duration(defaultPollingInterval)
}

// GenerateIdenitifier returns identifier for the object in for "kind.namespace.name"
func (t *WithTriggers) GenerateIdenitifier() string {
return fmt.Sprintf("%s.%s.%s", t.Kind, t.Namespace, t.Name)
}
93 changes: 90 additions & 3 deletions pkg/scaling/resolver/scale_resolvers.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@ import (
"os"

"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"knative.dev/pkg/apis/duck"
duckv1 "knative.dev/pkg/apis/duck/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1"
Expand All @@ -21,6 +25,66 @@ const (
referenceCloser = ')'
)

// ResolveScaleTargetPodSpec for given scalableObject inspects the scale target workload,
// which could be almost any k8s resource (Deployment, StatefulSet, CustomResource...)
// and for the given resource returns *corev1.PodTemplateSpec and a name of the container
// which is being used for referencing environment variables
func ResolveScaleTargetPodSpec(kubeClient client.Client, logger logr.Logger, scalableObject interface{}) (*corev1.PodTemplateSpec, string, error) {
switch obj := scalableObject.(type) {
case *kedav1alpha1.ScaledObject:
// Try to get a real object instance for better cache usage, but fall back to an Unstructured if needed.
podTemplateSpec := corev1.PodTemplateSpec{}
gvk := obj.Status.ScaleTargetGVKR.GroupVersionKind()
objKey := client.ObjectKey{Namespace: obj.Namespace, Name: obj.Spec.ScaleTargetRef.Name}
switch {
// For core types, use a typed client so we get an informer-cache-backed Get to reduce API load.
case gvk.Group == "apps" && gvk.Kind == "Deployment":
deployment := &appsv1.Deployment{}
if err := kubeClient.Get(context.TODO(), objKey, deployment); err != nil {
// resource doesn't exist
logger.Error(err, "Target deployment doesn't exist", "resource", gvk.String(), "name", objKey.Name)
return nil, "", err
}
podTemplateSpec.ObjectMeta = deployment.ObjectMeta
podTemplateSpec.Spec = deployment.Spec.Template.Spec
case gvk.Group == "apps" && gvk.Kind == "StatefulSet":
statefulSet := &appsv1.StatefulSet{}
if err := kubeClient.Get(context.TODO(), objKey, statefulSet); err != nil {
// resource doesn't exist
logger.Error(err, "Target deployment doesn't exist", "resource", gvk.String(), "name", objKey.Name)
return nil, "", err
}
podTemplateSpec.ObjectMeta = statefulSet.ObjectMeta
podTemplateSpec.Spec = statefulSet.Spec.Template.Spec
default:
unstruct := &unstructured.Unstructured{}
unstruct.SetGroupVersionKind(gvk)
if err := kubeClient.Get(context.TODO(), objKey, unstruct); err != nil {
// resource doesn't exist
logger.Error(err, "Target resource doesn't exist", "resource", gvk.String(), "name", objKey.Name)
return nil, "", err
}
withPods := &duckv1.WithPod{}
if err := duck.FromUnstructured(unstruct, withPods); err != nil {
logger.Error(err, "Cannot convert Unstructured into PodSpecable Duck-type", "object", unstruct)
}
podTemplateSpec.ObjectMeta = withPods.ObjectMeta
podTemplateSpec.Spec = withPods.Spec.Template.Spec
}

if podTemplateSpec.Spec.Containers == nil || len(podTemplateSpec.Spec.Containers) == 0 {
logger.V(1).Info("There aren't any containers found in the ScaleTarget, therefore it is no possible to inject environment properties", "resource", gvk.String(), "name", obj.Spec.ScaleTargetRef.Name)
return nil, "", nil
}

return &podTemplateSpec, obj.Spec.ScaleTargetRef.EnvSourceContainerName, nil
case *kedav1alpha1.ScaledJob:
return &obj.Spec.JobTargetRef.Template, obj.Spec.EnvSourceContainerName, nil
default:
return nil, "", fmt.Errorf("unknown scalable object type %v", scalableObject)
}
}

// ResolveContainerEnv resolves all environment variables in a container.
// It returns either map of env variable key and value or error if there is any.
func ResolveContainerEnv(client client.Client, logger logr.Logger, podSpec *corev1.PodSpec, containerName, namespace string) (map[string]string, error) {
Expand Down Expand Up @@ -49,9 +113,32 @@ func ResolveContainerEnv(client client.Client, logger logr.Logger, podSpec *core
return resolveEnv(client, logger, &container, namespace)
}

// ResolveAuthRef provides authentication parameters needed authenticate scaler with the environment.
// based on authentication method define in TriggerAuthentication, authParams and podIdentity is returned
func ResolveAuthRef(client client.Client, logger logr.Logger, triggerAuthRef *kedav1alpha1.ScaledObjectAuthRef, podSpec *corev1.PodSpec, namespace string) (map[string]string, kedav1alpha1.PodIdentityProvider) {
// ResolveAuthRefAndPodIdentity provides authentication parameters and pod identity needed authenticate scaler with the environment.
func ResolveAuthRefAndPodIdentity(client client.Client, logger logr.Logger, triggerAuthRef *kedav1alpha1.ScaledObjectAuthRef, podTemplateSpec *corev1.PodTemplateSpec, namespace string) (map[string]string, kedav1alpha1.PodIdentityProvider, error) {
if podTemplateSpec != nil {
authParams, podIdentity := resolveAuthRef(client, logger, triggerAuthRef, &podTemplateSpec.Spec, namespace)

if podIdentity == kedav1alpha1.PodIdentityProviderAwsEKS {
serviceAccountName := podTemplateSpec.Spec.ServiceAccountName
serviceAccount := &corev1.ServiceAccount{}
err := client.Get(context.TODO(), types.NamespacedName{Name: serviceAccountName, Namespace: namespace}, serviceAccount)
if err != nil {
return nil, kedav1alpha1.PodIdentityProviderNone, fmt.Errorf("error getting service account: %s", err)
}
authParams["awsRoleArn"] = serviceAccount.Annotations[kedav1alpha1.PodIdentityAnnotationEKS]
} else if podIdentity == kedav1alpha1.PodIdentityProviderAwsKiam {
authParams["awsRoleArn"] = podTemplateSpec.ObjectMeta.Annotations[kedav1alpha1.PodIdentityAnnotationKiam]
}
return authParams, podIdentity, nil
}

authParams, _ := resolveAuthRef(client, logger, triggerAuthRef, nil, namespace)
return authParams, kedav1alpha1.PodIdentityProviderNone, nil
}

// resolveAuthRef provides authentication parameters needed authenticate scaler with the environment.
// based on authentication method defined in TriggerAuthentication, authParams and podIdentity is returned
func resolveAuthRef(client client.Client, logger logr.Logger, triggerAuthRef *kedav1alpha1.ScaledObjectAuthRef, podSpec *corev1.PodSpec, namespace string) (map[string]string, kedav1alpha1.PodIdentityProvider) {
result := make(map[string]string)
var podIdentity kedav1alpha1.PodIdentityProvider

Expand Down
2 changes: 1 addition & 1 deletion pkg/scaling/resolver/scale_resolvers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func TestResolveAuthRef(t *testing.T) {
test := test
t.Run(test.name, func(t *testing.T) {
clusterObjectNamespaceCache = &clusterNamespace // Inject test cluster namespace.
gotMap, gotPodIdentity := ResolveAuthRef(fake.NewFakeClientWithScheme(scheme.Scheme, test.existing...), logf.Log.WithName("test"), test.soar, test.podSpec, namespace)
gotMap, gotPodIdentity := resolveAuthRef(fake.NewFakeClientWithScheme(scheme.Scheme, test.existing...), logf.Log.WithName("test"), test.soar, test.podSpec, namespace)
if diff := cmp.Diff(gotMap, test.expected); diff != "" {
t.Errorf("Returned authParams are different: %s", diff)
}
Expand Down
127 changes: 17 additions & 110 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,10 @@ import (
"k8s.io/client-go/tools/record"

"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/api/autoscaling/v2beta2"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/scale"
"knative.dev/pkg/apis/duck"
duckv1 "knative.dev/pkg/apis/duck/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"

Expand All @@ -28,11 +23,6 @@ import (
"github.com/kedacore/keda/v2/pkg/scaling/resolver"
)

const (
// Default polling interval for a ScaledObject triggers if no pollingInterval is defined.
defaultPollingInterval = 30
)

// ScaleHandler encapsulates the logic of calling the right scalers for
// each ScaledObject and making the final scale decision and operation
type ScaleHandler interface {
Expand Down Expand Up @@ -68,7 +58,7 @@ func (h *scaleHandler) GetScalers(scalableObject interface{}) ([]scalers.Scaler,
return nil, err
}

podTemplateSpec, containerName, err := h.getPods(scalableObject)
podTemplateSpec, containerName, err := resolver.ResolveScaleTargetPodSpec(h.client, h.logger, scalableObject)
if err != nil {
return nil, err
}
Expand All @@ -83,8 +73,7 @@ func (h *scaleHandler) HandleScalableObject(scalableObject interface{}) error {
return err
}

key := generateKey(withTriggers)

key := withTriggers.GenerateIdenitifier()
ctx, cancel := context.WithCancel(context.TODO())

// cancel the outdated ScaleLoop for the same ScaledObject (if exists)
Expand Down Expand Up @@ -121,8 +110,7 @@ func (h *scaleHandler) DeleteScalableObject(scalableObject interface{}) error {
return err
}

key := generateKey(withTriggers)

key := withTriggers.GenerateIdenitifier()
result, ok := h.scaleLoopContexts.Load(key)
if ok {
cancel, ok := result.(context.CancelFunc)
Expand All @@ -145,7 +133,7 @@ func (h *scaleHandler) startScaleLoop(ctx context.Context, withTriggers *kedav1a
// kick off one check to the scalers now
h.checkScalers(ctx, scalableObject, scalingMutex)

pollingInterval := getPollingInterval(withTriggers)
pollingInterval := withTriggers.GetPollingInterval()
logger.V(1).Info("Watching with pollingInterval", "PollingInterval", pollingInterval)

for {
Expand Down Expand Up @@ -214,15 +202,15 @@ func (h *scaleHandler) checkScalers(ctx context.Context, scalableObject interfac
defer scalingMutex.Unlock()
switch obj := scalableObject.(type) {
case *kedav1alpha1.ScaledObject:
h.scaleExecutor.RequestScale(ctx, obj, h.checkScaledObjectScalers(ctx, scalers, obj))
h.scaleExecutor.RequestScale(ctx, obj, h.isScaledObjectActive(ctx, scalers, obj))
case *kedav1alpha1.ScaledJob:
scaledJob := scalableObject.(*kedav1alpha1.ScaledJob)
isActive, scaleTo, maxScale := h.checkScaledJobScalers(ctx, scalers, scaledJob)
isActive, scaleTo, maxScale := h.isScaledJobActive(ctx, scalers, scaledJob)
h.scaleExecutor.RequestJobScale(ctx, obj, isActive, scaleTo, maxScale)
}
}

func (h *scaleHandler) checkScaledObjectScalers(ctx context.Context, scalers []scalers.Scaler, scaledObject *kedav1alpha1.ScaledObject) bool {
func (h *scaleHandler) isScaledObjectActive(ctx context.Context, scalers []scalers.Scaler, scaledObject *kedav1alpha1.ScaledObject) bool {
isActive := false
for i, scaler := range scalers {
isTriggerActive, err := scaler.IsActive(ctx)
Expand All @@ -247,12 +235,14 @@ func (h *scaleHandler) checkScaledObjectScalers(ctx context.Context, scalers []s
return isActive
}

func (h *scaleHandler) checkScaledJobScalers(ctx context.Context, scalers []scalers.Scaler, scaledJob *kedav1alpha1.ScaledJob) (bool, int64, int64) {
func (h *scaleHandler) isScaledJobActive(ctx context.Context, scalers []scalers.Scaler, scaledJob *kedav1alpha1.ScaledJob) (bool, int64, int64) {
var queueLength int64
var targetAverageValue int64
var maxValue int64
isActive := false

// TODO refactor this, do chores, reduce the verbosity ie: V(1) and frequency of logs
// move relevant funcs getTargetAverageValue(), min() and divideWithCeil() out of scaler_handler.go
for _, scaler := range scalers {
scalerLogger := h.logger.WithValues("Scaler", scaler)

Expand Down Expand Up @@ -295,7 +285,7 @@ func (h *scaleHandler) checkScaledJobScalers(ctx context.Context, scalers []scal
}
}
if targetAverageValue != 0 {
maxValue = min(scaledJob.MaxReplicaCount(), devideWithCeil(queueLength, targetAverageValue))
maxValue = min(scaledJob.MaxReplicaCount(), divideWithCeil(queueLength, targetAverageValue))
}
h.logger.Info("Scaler maxValue", "maxValue", maxValue)
return isActive, queueLength, maxValue
Expand Down Expand Up @@ -324,7 +314,7 @@ func getTargetAverageValue(metricSpecs []v2beta2.MetricSpec) int64 {
return 0
}

func devideWithCeil(x, y int64) int64 {
func divideWithCeil(x, y int64) int64 {
ans := x / y
reminder := x % y
if reminder != 0 {
Expand Down Expand Up @@ -363,26 +353,11 @@ func (h *scaleHandler) buildScalers(withTriggers *kedav1alpha1.WithTriggers, pod
AuthParams: make(map[string]string),
GlobalHTTPTimeout: h.globalHTTPTimeout,
}
if podTemplateSpec != nil {
authParams, podIdentity := resolver.ResolveAuthRef(h.client, logger, trigger.AuthenticationRef, &podTemplateSpec.Spec, withTriggers.Namespace)

if podIdentity == kedav1alpha1.PodIdentityProviderAwsEKS {
serviceAccountName := podTemplateSpec.Spec.ServiceAccountName
serviceAccount := &corev1.ServiceAccount{}
err = h.client.Get(context.TODO(), types.NamespacedName{Name: serviceAccountName, Namespace: withTriggers.Namespace}, serviceAccount)
if err != nil {
closeScalers(scalersRes)
return []scalers.Scaler{}, fmt.Errorf("error getting service account: %s", err)
}
authParams["awsRoleArn"] = serviceAccount.Annotations[kedav1alpha1.PodIdentityAnnotationEKS]
} else if podIdentity == kedav1alpha1.PodIdentityProviderAwsKiam {
authParams["awsRoleArn"] = podTemplateSpec.ObjectMeta.Annotations[kedav1alpha1.PodIdentityAnnotationKiam]
}
config.AuthParams = authParams
config.PodIdentity = podIdentity
} else {
authParams, _ := resolver.ResolveAuthRef(h.client, logger, trigger.AuthenticationRef, nil, withTriggers.Namespace)
config.AuthParams = authParams

config.AuthParams, config.PodIdentity, err = resolver.ResolveAuthRefAndPodIdentity(h.client, logger, trigger.AuthenticationRef, podTemplateSpec, withTriggers.Namespace)
if err != nil {
closeScalers(scalersRes)
return []scalers.Scaler{}, err
}

scaler, err := buildScaler(trigger.Type, config)
Expand All @@ -398,62 +373,6 @@ func (h *scaleHandler) buildScalers(withTriggers *kedav1alpha1.WithTriggers, pod
return scalersRes, nil
}

func (h *scaleHandler) getPods(scalableObject interface{}) (*corev1.PodTemplateSpec, string, error) {
switch obj := scalableObject.(type) {
case *kedav1alpha1.ScaledObject:
// Try to get a real object instance for better cache usage, but fall back to an Unstructured if needed.
podTemplateSpec := corev1.PodTemplateSpec{}
gvk := obj.Status.ScaleTargetGVKR.GroupVersionKind()
objKey := client.ObjectKey{Namespace: obj.Namespace, Name: obj.Spec.ScaleTargetRef.Name}
switch {
// For core types, use a typed client so we get an informer-cache-backed Get to reduce API load.
case gvk.Group == "apps" && gvk.Kind == "Deployment":
deployment := &appsv1.Deployment{}
if err := h.client.Get(context.TODO(), objKey, deployment); err != nil {
// resource doesn't exist
h.logger.Error(err, "Target deployment doesn't exist", "resource", gvk.String(), "name", objKey.Name)
return nil, "", err
}
podTemplateSpec.ObjectMeta = deployment.ObjectMeta
podTemplateSpec.Spec = deployment.Spec.Template.Spec
case gvk.Group == "apps" && gvk.Kind == "StatefulSet":
statefulSet := &appsv1.StatefulSet{}
if err := h.client.Get(context.TODO(), objKey, statefulSet); err != nil {
// resource doesn't exist
h.logger.Error(err, "Target deployment doesn't exist", "resource", gvk.String(), "name", objKey.Name)
return nil, "", err
}
podTemplateSpec.ObjectMeta = statefulSet.ObjectMeta
podTemplateSpec.Spec = statefulSet.Spec.Template.Spec
default:
unstruct := &unstructured.Unstructured{}
unstruct.SetGroupVersionKind(gvk)
if err := h.client.Get(context.TODO(), objKey, unstruct); err != nil {
// resource doesn't exist
h.logger.Error(err, "Target resource doesn't exist", "resource", gvk.String(), "name", objKey.Name)
return nil, "", err
}
withPods := &duckv1.WithPod{}
if err := duck.FromUnstructured(unstruct, withPods); err != nil {
h.logger.Error(err, "Cannot convert Unstructured into PodSpecable Duck-type", "object", unstruct)
}
podTemplateSpec.ObjectMeta = withPods.ObjectMeta
podTemplateSpec.Spec = withPods.Spec.Template.Spec
}

if podTemplateSpec.Spec.Containers == nil || len(podTemplateSpec.Spec.Containers) == 0 {
h.logger.V(1).Info("There aren't any containers found in the ScaleTarget, therefore it is no possible to inject environment properties", "resource", gvk.String(), "name", obj.Spec.ScaleTargetRef.Name)
return nil, "", nil
}

return &podTemplateSpec, obj.Spec.ScaleTargetRef.EnvSourceContainerName, nil
case *kedav1alpha1.ScaledJob:
return &obj.Spec.JobTargetRef.Template, obj.Spec.EnvSourceContainerName, nil
default:
return nil, "", fmt.Errorf("unknown scalable object type %v", scalableObject)
}
}

func buildScaler(triggerType string, config *scalers.ScalerConfig) (scalers.Scaler, error) {
// TRIGGERS-START
switch triggerType {
Expand Down Expand Up @@ -566,15 +485,3 @@ func closeScalers(scalers []scalers.Scaler) {
defer scaler.Close()
}
}

func getPollingInterval(withTriggers *kedav1alpha1.WithTriggers) time.Duration {
if withTriggers.Spec.PollingInterval != nil {
return time.Second * time.Duration(*withTriggers.Spec.PollingInterval)
}

return time.Second * time.Duration(defaultPollingInterval)
}

func generateKey(scalableObject *kedav1alpha1.WithTriggers) string {
return fmt.Sprintf("%s.%s.%s", scalableObject.Kind, scalableObject.Namespace, scalableObject.Name)
}