Skip to content

Commit

Permalink
feat: support Ingress from Networking API version (argoproj#1529)
Browse files Browse the repository at this point in the history
Signed-off-by: Leonardo Luz Almeida <[email protected]>
  • Loading branch information
leoluz authored Oct 26, 2021
1 parent a398ff9 commit ac155f1
Show file tree
Hide file tree
Showing 20 changed files with 2,106 additions and 214 deletions.
9 changes: 8 additions & 1 deletion cmd/rollouts-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/argoproj/argo-rollouts/pkg/signals"
controllerutil "github.com/argoproj/argo-rollouts/utils/controller"
"github.com/argoproj/argo-rollouts/utils/defaults"
ingressutil "github.com/argoproj/argo-rollouts/utils/ingress"
istioutil "github.com/argoproj/argo-rollouts/utils/istio"
logutil "github.com/argoproj/argo-rollouts/utils/log"
"github.com/argoproj/argo-rollouts/utils/tolerantinformer"
Expand Down Expand Up @@ -54,6 +55,7 @@ func newCommand() *cobra.Command {
istioVersion string
trafficSplitVersion string
ambassadorVersion string
ingressVersion string
albIngressClasses []string
nginxIngressClasses []string
awsVerifyTargetGroup bool
Expand Down Expand Up @@ -144,6 +146,10 @@ func newCommand() *cobra.Command {

k8sRequestProvider := &metrics.K8sRequestsCountProvider{}
kubeclientmetrics.AddMetricsTransportWrapper(config, k8sRequestProvider.IncKubernetesRequest)
mode, err := ingressutil.DetermineIngressMode(ingressVersion, kubeClient.DiscoveryClient)
checkError(err)
ingressWrapper, err := ingressutil.NewIngressWrapper(mode, kubeClient, kubeInformerFactory)
checkError(err)

cm := controller.NewManager(
namespace,
Expand All @@ -154,7 +160,7 @@ func newCommand() *cobra.Command {
discoveryClient,
kubeInformerFactory.Apps().V1().ReplicaSets(),
kubeInformerFactory.Core().V1().Services(),
kubeInformerFactory.Extensions().V1beta1().Ingresses(),
ingressWrapper,
jobInformerFactory.Batch().V1().Jobs(),
tolerantinformer.NewTolerantRolloutInformer(dynamicInformerFactory),
tolerantinformer.NewTolerantExperimentInformer(dynamicInformerFactory),
Expand Down Expand Up @@ -212,6 +218,7 @@ func newCommand() *cobra.Command {
command.Flags().StringVar(&istioVersion, "istio-api-version", defaults.DefaultIstioVersion, "Set the default Istio apiVersion that controller should look when manipulating VirtualServices.")
command.Flags().StringVar(&ambassadorVersion, "ambassador-api-version", defaults.DefaultAmbassadorVersion, "Set the Ambassador apiVersion that controller should look when manipulating Ambassador Mappings.")
command.Flags().StringVar(&trafficSplitVersion, "traffic-split-api-version", defaults.DefaultSMITrafficSplitVersion, "Set the default TrafficSplit apiVersion that controller uses when creating TrafficSplits.")
command.Flags().StringVar(&ingressVersion, "ingress-api-version", "", "Set the Ingress apiVersion that the controller should use.")
command.Flags().StringArrayVar(&albIngressClasses, "alb-ingress-classes", defaultALBIngressClass, "Defines all the ingress class annotations that the alb ingress controller operates on. Defaults to alb")
command.Flags().StringArrayVar(&nginxIngressClasses, "nginx-ingress-classes", defaultNGINXIngressClass, "Defines all the ingress class annotations that the nginx ingress controller operates on. Defaults to nginx")
command.Flags().BoolVar(&awsVerifyTargetGroup, "alb-verify-weight", false, "Verify ALB target group weights before progressing through steps (requires AWS privileges)")
Expand Down
10 changes: 5 additions & 5 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
appsinformers "k8s.io/client-go/informers/apps/v1"
batchinformers "k8s.io/client-go/informers/batch/v1"
coreinformers "k8s.io/client-go/informers/core/v1"
extensionsinformers "k8s.io/client-go/informers/extensions/v1beta1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"
Expand All @@ -43,6 +42,7 @@ import (
"github.com/argoproj/argo-rollouts/rollout"
"github.com/argoproj/argo-rollouts/service"
"github.com/argoproj/argo-rollouts/utils/defaults"
ingressutil "github.com/argoproj/argo-rollouts/utils/ingress"
"github.com/argoproj/argo-rollouts/utils/queue"
"github.com/argoproj/argo-rollouts/utils/record"
)
Expand Down Expand Up @@ -148,7 +148,7 @@ func NewManager(
discoveryClient discovery.DiscoveryInterface,
replicaSetInformer appsinformers.ReplicaSetInformer,
servicesInformer coreinformers.ServiceInformer,
ingressesInformer extensionsinformers.IngressInformer,
ingressWrap *ingressutil.IngressWrap,
jobInformer batchinformers.JobInformer,
rolloutsInformer informers.RolloutInformer,
experimentsInformer informers.ExperimentInformer,
Expand Down Expand Up @@ -222,7 +222,7 @@ func NewManager(
IstioDestinationRuleInformer: istioDestinationRuleInformer,
ReplicaSetInformer: replicaSetInformer,
ServicesInformer: servicesInformer,
IngressInformer: ingressesInformer,
IngressWrapper: ingressWrap,
RolloutsInformer: rolloutsInformer,
ResyncPeriod: resyncPeriod,
RolloutWorkQueue: rolloutWorkqueue,
Expand Down Expand Up @@ -272,7 +272,7 @@ func NewManager(

ingressController := ingress.NewController(ingress.ControllerConfig{
Client: kubeclientset,
IngressInformer: ingressesInformer,
IngressWrap: ingressWrap,
IngressWorkQueue: ingressWorkqueue,

RolloutsInformer: rolloutsInformer,
Expand All @@ -288,7 +288,7 @@ func NewManager(
metricsServer: metricsServer,
rolloutSynced: rolloutsInformer.Informer().HasSynced,
serviceSynced: servicesInformer.Informer().HasSynced,
ingressSynced: ingressesInformer.Informer().HasSynced,
ingressSynced: ingressWrap.HasSynced,
jobSynced: jobInformer.Informer().HasSynced,
experimentSynced: experimentsInformer.Informer().HasSynced,
analysisRunSynced: analysisRunInformer.Informer().HasSynced,
Expand Down
29 changes: 18 additions & 11 deletions ingress/alb.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"strings"

log "github.com/sirupsen/logrus"
extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"

"github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
Expand All @@ -17,9 +15,10 @@ import (
logutil "github.com/argoproj/argo-rollouts/utils/log"
)

func (c *Controller) syncALBIngress(ingress *extensionsv1beta1.Ingress, rollouts []*v1alpha1.Rollout) error {
func (c *Controller) syncALBIngress(ingress *ingressutil.Ingress, rollouts []*v1alpha1.Rollout) error {
ctx := context.TODO()
managedActions, err := ingressutil.NewManagedALBActions(ingress.Annotations[ingressutil.ManagedActionsAnnotation])
annotations := ingress.GetAnnotations()
managedActions, err := ingressutil.NewManagedALBActions(annotations[ingressutil.ManagedActionsAnnotation])
if err != nil {
return nil
}
Expand All @@ -40,32 +39,40 @@ func (c *Controller) syncALBIngress(ingress *extensionsv1beta1.Ingress, rollouts
delete(managedActions, roName)
resetALBAction, err := getResetALBActionStr(ingress, actionKey)
if err != nil {
log.WithField(logutil.RolloutKey, roName).WithField(logutil.IngressKey, ingress.Name).WithField(logutil.NamespaceKey, ingress.Namespace).Error(err)
log.WithField(logutil.RolloutKey, roName).
WithField(logutil.IngressKey, ingress.GetName()).
WithField(logutil.NamespaceKey, ingress.GetNamespace()).
Error(err)
return nil
}
newIngress.Annotations[actionKey] = resetALBAction
annotations := newIngress.GetAnnotations()
annotations[actionKey] = resetALBAction
newIngress.SetAnnotations(annotations)
}
}
if !modified {
return nil
}
newManagedStr := managedActions.String()
newIngress.Annotations[ingressutil.ManagedActionsAnnotation] = newManagedStr
newAnnotations := newIngress.GetAnnotations()
newAnnotations[ingressutil.ManagedActionsAnnotation] = newManagedStr
newIngress.SetAnnotations(newAnnotations)
if newManagedStr == "" {
delete(newIngress.Annotations, ingressutil.ManagedActionsAnnotation)
delete(newIngress.GetAnnotations(), ingressutil.ManagedActionsAnnotation)
}
_, err = c.client.ExtensionsV1beta1().Ingresses(ingress.Namespace).Update(ctx, newIngress, metav1.UpdateOptions{})
_, err = c.ingressWrapper.Update(ctx, ingress.GetNamespace(), newIngress)
return err
}

func getResetALBActionStr(ingress *extensionsv1beta1.Ingress, action string) (string, error) {
func getResetALBActionStr(ingress *ingressutil.Ingress, action string) (string, error) {
parts := strings.Split(action, ingressutil.ALBActionPrefix)
if len(parts) != 2 {
return "", fmt.Errorf("unable to parse action to get the service %s", action)
}
service := parts[1]

previousActionStr := ingress.Annotations[action]
annotations := ingress.GetAnnotations()
previousActionStr := annotations[action]
var previousAction ingressutil.ALBAction
err := json.Unmarshal([]byte(previousActionStr), &previousAction)
if err != nil {
Expand Down
12 changes: 6 additions & 6 deletions ingress/alb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestInvalidManagedALBActions(t *testing.T) {
ing := newALBIngress("test-ingress", 80, "stable-service", rollout.Name)
ing.Annotations[ingressutil.ManagedActionsAnnotation] = "invalid-managed-by"

ctrl, kubeclient, enqueuedObjects := newFakeIngressController(ing, rollout)
ctrl, kubeclient, enqueuedObjects := newFakeIngressController(t, ing, rollout)

err := ctrl.syncIngress("default/test-ingress")
assert.Nil(t, err)
Expand All @@ -113,7 +113,7 @@ func TestInvalidPreviousALBActionAnnotationValue(t *testing.T) {
ing := newALBIngress("test-ingress", 80, "stable-service", "not-existing-rollout")
ing.Annotations[albActionAnnotation("stable-service")] = "{"

ctrl, kubeclient, enqueuedObjects := newFakeIngressController(ing, nil)
ctrl, kubeclient, enqueuedObjects := newFakeIngressController(t, ing, nil)

err := ctrl.syncIngress("default/test-ingress")
assert.Nil(t, err)
Expand All @@ -124,7 +124,7 @@ func TestInvalidPreviousALBActionAnnotationValue(t *testing.T) {
func TestInvalidPreviousALBActionAnnotationKey(t *testing.T) {
ing := newALBIngress("test-ingress", 80, "stable-service", "not-existing-rollout")
ing.Annotations[ingressutil.ManagedActionsAnnotation] = "invalid-action-key"
ctrl, kubeclient, enqueuedObjects := newFakeIngressController(ing, nil)
ctrl, kubeclient, enqueuedObjects := newFakeIngressController(t, ing, nil)

err := ctrl.syncIngress("default/test-ingress")
assert.Nil(t, err)
Expand All @@ -136,7 +136,7 @@ func TestResetActionFailureFindNoPort(t *testing.T) {
ing := newALBIngress("test-ingress", 80, "stable-service", "not-existing-rollout")
ing.Annotations[albActionAnnotation("stable-service")] = "{}"

ctrl, kubeclient, enqueuedObjects := newFakeIngressController(ing, nil)
ctrl, kubeclient, enqueuedObjects := newFakeIngressController(t, ing, nil)

err := ctrl.syncIngress("default/test-ingress")
assert.Nil(t, err)
Expand All @@ -148,7 +148,7 @@ func TestALBIngressNoModifications(t *testing.T) {
rollout := rollout("rollout", "stable-service", "test-ingress")
ing := newALBIngress("test-ingress", 80, "stable-service", rollout.Name)

ctrl, kubeclient, enqueuedObjects := newFakeIngressController(ing, rollout)
ctrl, kubeclient, enqueuedObjects := newFakeIngressController(t, ing, rollout)

err := ctrl.syncIngress("default/test-ingress")
assert.Nil(t, err)
Expand All @@ -159,7 +159,7 @@ func TestALBIngressNoModifications(t *testing.T) {
func TestALBIngressResetAction(t *testing.T) {
ing := newALBIngress("test-ingress", 80, "stable-service", "non-existing-rollout")

ctrl, kubeclient, enqueuedObjects := newFakeIngressController(ing, nil)
ctrl, kubeclient, enqueuedObjects := newFakeIngressController(t, ing, nil)
err := ctrl.syncIngress("default/test-ingress")
assert.Nil(t, err)
assert.Len(t, enqueuedObjects, 0)
Expand Down
25 changes: 15 additions & 10 deletions ingress/ingress.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
package ingress

import (
"context"
"fmt"
"strings"
"time"

log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/wait"
extensionsinformers "k8s.io/client-go/informers/extensions/v1beta1"
"k8s.io/client-go/kubernetes"
extentionslisters "k8s.io/client-go/listers/extensions/v1beta1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/cmd/kubeadm/app/util"
Expand All @@ -32,7 +31,7 @@ const (
// ControllerConfig describes the data required to instantiate a new ingress controller
type ControllerConfig struct {
Client kubernetes.Interface
IngressInformer extensionsinformers.IngressInformer
IngressWrap *ingressutil.IngressWrap
IngressWorkQueue workqueue.RateLimitingInterface

RolloutsInformer informers.RolloutInformer
Expand All @@ -47,7 +46,7 @@ type ControllerConfig struct {
type Controller struct {
client kubernetes.Interface
rolloutsIndexer cache.Indexer
ingressLister extentionslisters.IngressLister
ingressWrapper IngressWrapper
ingressWorkqueue workqueue.RateLimitingInterface

metricServer *metrics.MetricsServer
Expand All @@ -56,13 +55,18 @@ type Controller struct {
nginxClasses []string
}

type IngressWrapper interface {
GetCached(namespace, name string) (*ingressutil.Ingress, error)
Update(ctx context.Context, namespace string, ingress *ingressutil.Ingress) (*ingressutil.Ingress, error)
}

// NewController returns a new ingress controller
func NewController(cfg ControllerConfig) *Controller {

controller := &Controller{
client: cfg.Client,
rolloutsIndexer: cfg.RolloutsInformer.Informer().GetIndexer(),
ingressLister: cfg.IngressInformer.Lister(),
ingressWrapper: cfg.IngressWrap,

ingressWorkqueue: cfg.IngressWorkQueue,
metricServer: cfg.MetricsServer,
Expand All @@ -79,7 +83,7 @@ func NewController(cfg ControllerConfig) *Controller {
},
}))

cfg.IngressInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
cfg.IngressWrap.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
controllerutil.Enqueue(obj, cfg.IngressWorkQueue)
},
Expand Down Expand Up @@ -119,7 +123,7 @@ func (c *Controller) syncIngress(key string) error {
if err != nil {
return err
}
ingress, err := c.ingressLister.Ingresses(namespace).Get(name)
ingress, err := c.ingressWrapper.GetCached(namespace, name)
if err != nil {
if !errors.IsNotFound(err) {
// Unknown error occurred
Expand All @@ -132,15 +136,16 @@ func (c *Controller) syncIngress(key string) error {
}
return nil
}
rollouts, err := c.getRolloutsByIngress(ingress.Namespace, ingress.Name)
rollouts, err := c.getRolloutsByIngress(ingress.GetNamespace(), ingress.GetName())
if err != nil {
return nil
}
// An ingress without annotations cannot be a alb or nginx ingress
if ingress.Annotations == nil {
if ingress.GetAnnotations() == nil {
return nil
}
class := ingress.Annotations["kubernetes.io/ingress.class"]
annotations := ingress.GetAnnotations()
class := annotations["kubernetes.io/ingress.class"]
switch {
case hasClass(c.albClasses, class):
return c.syncALBIngress(ingress, rollouts)
Expand Down
18 changes: 12 additions & 6 deletions ingress/ingress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
"github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned/fake"
informers "github.com/argoproj/argo-rollouts/pkg/client/informers/externalversions"
ingressutil "github.com/argoproj/argo-rollouts/utils/ingress"
"k8s.io/client-go/tools/cache"
)

Expand Down Expand Up @@ -53,7 +54,8 @@ func newNginxIngress(name string, port int, serviceName string) *extensionsv1bet
}
}

func newFakeIngressController(ing *extensionsv1beta1.Ingress, rollout *v1alpha1.Rollout) (*Controller, *k8sfake.Clientset, map[string]int) {
func newFakeIngressController(t *testing.T, ing *extensionsv1beta1.Ingress, rollout *v1alpha1.Rollout) (*Controller, *k8sfake.Clientset, map[string]int) {
t.Helper()
client := fake.NewSimpleClientset()
if rollout != nil {
client = fake.NewSimpleClientset(rollout)
Expand All @@ -64,13 +66,17 @@ func newFakeIngressController(ing *extensionsv1beta1.Ingress, rollout *v1alpha1.
}
i := informers.NewSharedInformerFactory(client, 0)
k8sI := kubeinformers.NewSharedInformerFactory(kubeclient, 0)
ingressWrap, err := ingressutil.NewIngressWrapper(ingressutil.IngressModeExtensions, kubeclient, k8sI)
if err != nil {
t.Fatal(err)
}

rolloutWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Rollouts")
ingressWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Ingresses")

c := NewController(ControllerConfig{
Client: kubeclient,
IngressInformer: k8sI.Extensions().V1beta1().Ingresses(),
IngressWrap: ingressWrap,
IngressWorkQueue: ingressWorkqueue,

RolloutsInformer: i.Argoproj().V1alpha1().Rollouts(),
Expand Down Expand Up @@ -110,7 +116,7 @@ func newFakeIngressController(ing *extensionsv1beta1.Ingress, rollout *v1alpha1.
}

func TestSyncMissingIngress(t *testing.T) {
ctrl, _, _ := newFakeIngressController(nil, nil)
ctrl, _, _ := newFakeIngressController(t, nil, nil)

err := ctrl.syncIngress("default/test-ingress")
assert.NoError(t, err)
Expand All @@ -119,7 +125,7 @@ func TestSyncMissingIngress(t *testing.T) {
func TestSyncIngressNotReferencedByRollout(t *testing.T) {
ing := newNginxIngress("test-stable-ingress", 80, "test-stable-service")

ctrl, kubeclient, _ := newFakeIngressController(ing, nil)
ctrl, kubeclient, _ := newFakeIngressController(t, ing, nil)

err := ctrl.syncIngress("default/test-stable-ingress")
assert.NoError(t, err)
Expand Down Expand Up @@ -150,7 +156,7 @@ func TestSyncIngressReferencedByRollout(t *testing.T) {
},
}

ctrl, kubeclient, enqueuedObjects := newFakeIngressController(ing, rollout)
ctrl, kubeclient, enqueuedObjects := newFakeIngressController(t, ing, rollout)

err := ctrl.syncIngress("default/test-stable-ingress")
assert.NoError(t, err)
Expand Down Expand Up @@ -182,7 +188,7 @@ func TestSkipIngressWithNoAnnotations(t *testing.T) {
},
}

ctrl, kubeclient, enqueuedObjects := newFakeIngressController(ing, rollout)
ctrl, kubeclient, enqueuedObjects := newFakeIngressController(t, ing, rollout)

err := ctrl.syncIngress("default/test-stable-ingress")
assert.NoError(t, err)
Expand Down
Loading

0 comments on commit ac155f1

Please sign in to comment.