diff --git a/Gopkg.lock b/Gopkg.lock index ed19a4ec5b..ae0e460a6a 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -903,6 +903,8 @@ "discovery/cached/disk", "discovery/fake", "dynamic", + "dynamic/dynamicinformer", + "dynamic/dynamiclister", "dynamic/fake", "informers", "informers/admissionregistration", @@ -1377,6 +1379,7 @@ "k8s.io/client-go/discovery", "k8s.io/client-go/discovery/fake", "k8s.io/client-go/dynamic", + "k8s.io/client-go/dynamic/dynamicinformer", "k8s.io/client-go/dynamic/fake", "k8s.io/client-go/informers", "k8s.io/client-go/informers/apps/v1", diff --git a/cmd/rollouts-controller/main.go b/cmd/rollouts-controller/main.go index 141aa8f2c2..41858d38de 100644 --- a/cmd/rollouts-controller/main.go +++ b/cmd/rollouts-controller/main.go @@ -97,7 +97,11 @@ func newCommand() *cobra.Command { kubeinformers.WithTweakListOptions(func(options *metav1.ListOptions) { options.LabelSelector = jobprovider.AnalysisRunUIDLabelKey })) - cm := controller.NewManager(kubeClient, rolloutClient, dynamicClient, + cm := controller.NewManager( + namespace, + kubeClient, + rolloutClient, + dynamicClient, kubeInformerFactory.Apps().V1().ReplicaSets(), kubeInformerFactory.Core().V1().Services(), jobInformerFactory.Batch().V1().Jobs(), diff --git a/controller/controller.go b/controller/controller.go index 7dc89337c9..5479a17226 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -73,10 +73,13 @@ type Manager struct { serviceWorkqueue workqueue.RateLimitingInterface experimentWorkqueue workqueue.RateLimitingInterface analysisRunWorkqueue workqueue.RateLimitingInterface + + defaultIstioVersion string } // NewManager returns a new manager to manage all the controllers func NewManager( + namespace string, kubeclientset kubernetes.Interface, argoprojclientset clientset.Interface, dynamicclientset dynamic.Interface, @@ -113,6 +116,7 @@ func NewManager( metricsServer := metrics.NewMetricsServer(metricsAddr, rolloutsInformer.Lister()) rolloutController := 
rollout.NewRolloutController( + namespace, kubeclientset, argoprojclientset, dynamicclientset, @@ -178,6 +182,7 @@ func NewManager( serviceController: serviceController, experimentController: experimentController, analysisController: analysisController, + defaultIstioVersion: defaultIstioVersion, } return cm diff --git a/rollout/bluegreen.go b/rollout/bluegreen.go index b4e0f224e3..da12593f81 100644 --- a/rollout/bluegreen.go +++ b/rollout/bluegreen.go @@ -33,6 +33,7 @@ func (c *RolloutController) rolloutBlueGreen(r *v1alpha1.Rollout, rsList []*apps allRSs := roCtx.AllRSs() if reconcileBlueGreenTemplateChange(roCtx) { roCtx.PauseContext().ClearPauseConditions() + roCtx.PauseContext().RemoveAbort() logCtx.Infof("New pod template or template change detected") return c.syncRolloutStatusBlueGreen(previewSvc, activeSvc, roCtx) } diff --git a/rollout/controller.go b/rollout/controller.go index fe152050f2..56f240fa47 100644 --- a/rollout/controller.go +++ b/rollout/controller.go @@ -9,6 +9,7 @@ import ( log "github.com/sirupsen/logrus" appsv1 "k8s.io/api/apps/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/dynamic" appsinformers "k8s.io/client-go/informers/apps/v1" @@ -19,6 +20,7 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" + "k8s.io/kubernetes/cmd/kubeadm/app/util" "k8s.io/kubernetes/pkg/controller" "k8s.io/utils/pointer" @@ -28,6 +30,7 @@ import ( clientset "github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned" informers "github.com/argoproj/argo-rollouts/pkg/client/informers/externalversions/rollouts/v1alpha1" listers "github.com/argoproj/argo-rollouts/pkg/client/listers/rollouts/v1alpha1" + "github.com/argoproj/argo-rollouts/rollout/trafficrouting/istio" "github.com/argoproj/argo-rollouts/utils/conditions" controllerutil "github.com/argoproj/argo-rollouts/utils/controller" 
"github.com/argoproj/argo-rollouts/utils/defaults" @@ -35,8 +38,14 @@ import ( serviceutil "github.com/argoproj/argo-rollouts/utils/service" ) +const ( + virtualServiceIndexName = "byVirtualService" +) + // RolloutController is the controller implementation for Rollout resources type RolloutController struct { + // namespace specifies which namespace(s) the controller operates on + namespace string // rsControl is used for adopting/releasing replica sets. replicaSetControl controller.RSControlInterface @@ -80,6 +89,7 @@ type RolloutController struct { // NewRolloutController returns a new rollout controller func NewRolloutController( + namespace string, kubeclientset kubernetes.Interface, argoprojclientset clientset.Interface, dynamicclientset dynamic.Interface, @@ -102,6 +112,7 @@ func NewRolloutController( } controller := &RolloutController{ + namespace: namespace, kubeclientset: kubeclientset, argoprojclientset: argoprojclientset, dynamicclientset: dynamicclientset, @@ -146,6 +157,15 @@ func NewRolloutController( }, }) + util.CheckErr(rolloutsInformer.Informer().AddIndexers(cache.Indexers{ + virtualServiceIndexName: func(obj interface{}) (strings []string, e error) { + if rollout, ok := obj.(*v1alpha1.Rollout); ok { + return istio.GetRolloutVirtualServiceKeys(rollout), nil + } + return + }, + })) + replicaSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { controllerutil.EnqueueParentObject(obj, register.RolloutKind, controller.enqueueRollout) @@ -197,6 +217,10 @@ func (c *RolloutController) Run(threadiness int, stopCh <-chan struct{}) error { }, time.Second, stopCh) } log.Info("Started Rollout workers") + + gvk := schema.ParseGroupResource("virtualservices.networking.istio.io").WithVersion(c.defaultIstioVersion) + go controllerutil.WatchResourceWithExponentialBackoff(stopCh, c.dynamicclientset, c.namespace, gvk, c.rolloutWorkqueue, c.rolloutsIndexer, virtualServiceIndexName) + <-stopCh log.Info("Shutting down workers") diff --git 
a/rollout/controller_test.go b/rollout/controller_test.go index da3db4f133..6a8506d9ba 100644 --- a/rollout/controller_test.go +++ b/rollout/controller_test.go @@ -372,7 +372,11 @@ func (f *fixture) newController(resync resyncFunc) (*RolloutController, informer rolloutWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts") serviceWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Services") - c := NewRolloutController(f.kubeclient, f.client, nil, + c := NewRolloutController( + metav1.NamespaceAll, + f.kubeclient, + f.client, + nil, i.Argoproj().V1alpha1().Experiments(), i.Argoproj().V1alpha1().AnalysisRuns(), i.Argoproj().V1alpha1().AnalysisTemplates(), diff --git a/rollout/trafficrouting/istio/istio.go b/rollout/trafficrouting/istio/istio.go index ada237c2e8..daabb85fc3 100644 --- a/rollout/trafficrouting/istio/istio.go +++ b/rollout/trafficrouting/istio/istio.go @@ -31,6 +31,23 @@ func NewReconciler(r *v1alpha1.Rollout, client dynamic.Interface, recorder recor } } +// GetRolloutVirtualServiceKeys gets the virtual service and its namespace from a rollout +func GetRolloutVirtualServiceKeys(rollout *v1alpha1.Rollout) []string { + if rollout.Spec.Strategy.Canary == nil { + return []string{} + } + if rollout.Spec.Strategy.Canary.TrafficRouting == nil { + return []string{} + } + if rollout.Spec.Strategy.Canary.TrafficRouting.Istio == nil { + return []string{} + } + if rollout.Spec.Strategy.Canary.TrafficRouting.Istio.VirtualService.Name == "" { + return []string{} + } + return []string{fmt.Sprintf("%s/%s", rollout.Namespace, rollout.Spec.Strategy.Canary.TrafficRouting.Istio.VirtualService.Name)} +} + // Reconciler holds required fields to reconcile Istio resources type Reconciler struct { rollout *v1alpha1.Rollout diff --git a/rollout/trafficrouting/istio/istio_test.go b/rollout/trafficrouting/istio/istio_test.go index 6e57889436..dd8d64d335 100644 --- 
a/rollout/trafficrouting/istio/istio_test.go +++ b/rollout/trafficrouting/istio/istio_test.go @@ -291,3 +291,28 @@ func TestValidateHosts(t *testing.T) { err = validateHosts(hr, "stable", "not-found-canary") assert.Equal(t, fmt.Errorf("Canary Service 'not-found-canary' not found in route"), err) } + +func TestGetRolloutVirtualServiceKeys(t *testing.T) { + ro := &v1alpha1.Rollout{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: metav1.NamespaceDefault, + }, + Spec: v1alpha1.RolloutSpec{ + Strategy: v1alpha1.RolloutStrategy{}, + }, + } + assert.Len(t, GetRolloutVirtualServiceKeys(ro), 0) + ro.Spec.Strategy.Canary = &v1alpha1.CanaryStrategy{} + assert.Len(t, GetRolloutVirtualServiceKeys(ro), 0) + ro.Spec.Strategy.Canary.TrafficRouting = &v1alpha1.RolloutTrafficRouting{} + assert.Len(t, GetRolloutVirtualServiceKeys(ro), 0) + ro.Spec.Strategy.Canary.TrafficRouting.Istio = &v1alpha1.IstioTrafficRouting{ + VirtualService: v1alpha1.IstioVirtualService{}, + } + assert.Len(t, GetRolloutVirtualServiceKeys(ro), 0) + ro.Spec.Strategy.Canary.TrafficRouting.Istio.VirtualService.Name = "test" + keys := GetRolloutVirtualServiceKeys(ro) + assert.Len(t, keys, 1) + assert.Equal(t, keys[0], "default/test") +} diff --git a/utils/controller/controller.go b/utils/controller/controller.go index f1664c2d15..f40f4861bc 100644 --- a/utils/controller/controller.go +++ b/utils/controller/controller.go @@ -6,10 +6,15 @@ import ( "time" log "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/selection" "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/dynamic" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" @@ -18,6 +23,78 @@ import ( logutil "github.com/argoproj/argo-rollouts/utils/log" ) +// processNextWatchObj will process a 
single object from the watch by seeing if +// that object is in an index and enqueueing the value object from the indexer +func processNextWatchObj(watchEvent watch.Event, queue workqueue.RateLimitingInterface, indexer cache.Indexer, index string) { + obj := watchEvent.Object + acc, err := meta.Accessor(obj) + if err != nil { + log.Errorf("Error processing object from watch: %v", err) + return + } + objsToEnqueue, err := indexer.ByIndex(index, fmt.Sprintf("%s/%s", acc.GetNamespace(), acc.GetName())) + if err != nil { + log.Errorf("Cannot process indexer: %s", err.Error()) + return + } + for i := range objsToEnqueue { + Enqueue(objsToEnqueue[i], queue) + } +} + +// WatchResource is a long-running function that will continually call the +// processNextWatchObj function in order to watch changes on a resource kind +// and enqueue a different resource kind that interacts with it +func WatchResource(client dynamic.Interface, namespace string, gvk schema.GroupVersionResource, queue workqueue.RateLimitingInterface, indexer cache.Indexer, index string) error { + log.Infof("Starting watch on resource '%s'", gvk.Resource) + var watchI watch.Interface + var err error + if namespace == metav1.NamespaceAll { + watchI, err = client.Resource(gvk).Watch(metav1.ListOptions{}) + } else { + watchI, err = client.Resource(gvk).Namespace(namespace).Watch(metav1.ListOptions{}) + } + + if err != nil { + log.Errorf("Error with watch: %v", err) + return err + } + for watchEvent := range watchI.ResultChan() { + processNextWatchObj(watchEvent, queue, indexer, index) + } + return nil +} + +// WatchResourceWithExponentialBackoff creates a watch for the gvk provided. If there is an error, +// the function will retry using exponential backoff. The wait starts at 1 second and afterwards +// increases by a factor of 2, capping at 5 minutes. 
+func WatchResourceWithExponentialBackoff(stopCh <-chan struct{}, client dynamic.Interface, namespace string, gvk schema.GroupVersionResource, queue workqueue.RateLimitingInterface, indexer cache.Indexer, index string) { + backoff := wait.Backoff{ + Duration: 1 * time.Second, + Cap: 5 * time.Minute, + Factor: float64(2), + Steps: 10, + } + for { + select { + case <-stopCh: + return + default: + } + err := WatchResource(client, namespace, gvk, queue, indexer, index) + if err == nil { + backoff = wait.Backoff{ + Duration: 1 * time.Second, + Cap: 5 * time.Minute, + Factor: float64(2), + Steps: 10, + } + continue + } + time.Sleep(backoff.Step()) + } +} + // RunWorker is a long-running function that will continually call the // processNextWorkItem function in order to read and process a message on the // workqueue. diff --git a/utils/controller/controller_test.go b/utils/controller/controller_test.go index c967b5d8a1..c71e7441a2 100644 --- a/utils/controller/controller_test.go +++ b/utils/controller/controller_test.go @@ -7,8 +7,17 @@ import ( "github.com/stretchr/testify/assert" appsv1 "k8s.io/api/apps/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/watch" + dynamicinformers "k8s.io/client-go/dynamic/dynamicinformer" + dynamicfake "k8s.io/client-go/dynamic/fake" + kubetesting "k8s.io/client-go/testing" "k8s.io/client-go/util/workqueue" "github.com/argoproj/argo-rollouts/controller/metrics" @@ -17,7 +26,6 @@ import ( "github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned/fake" informers "github.com/argoproj/argo-rollouts/pkg/client/informers/externalversions" "github.com/argoproj/argo-rollouts/utils/log" - "k8s.io/apimachinery/pkg/util/runtime" 
"k8s.io/client-go/tools/cache" ) @@ -141,7 +149,7 @@ func TestEnqueueRateLimitedInvalidObject(t *testing.T) { func TestEnqueueParentObjectInvalidObject(t *testing.T) { errorMessages := make([]error, 0) - runtime.ErrorHandlers = append(runtime.ErrorHandlers, func(err error) { + utilruntime.ErrorHandlers = append(utilruntime.ErrorHandlers, func(err error) { errorMessages = append(errorMessages, err) }) invalidObject := "invalid-object" @@ -153,7 +161,7 @@ func TestEnqueueParentObjectInvalidObject(t *testing.T) { func TestEnqueueParentObjectInvalidTombstoneObject(t *testing.T) { errorMessages := make([]string, 0) - runtime.ErrorHandlers = append(runtime.ErrorHandlers, func(err error) { + utilruntime.ErrorHandlers = append(utilruntime.ErrorHandlers, func(err error) { errorMessages = append(errorMessages, err.Error()) }) @@ -166,7 +174,7 @@ func TestEnqueueParentObjectInvalidTombstoneObject(t *testing.T) { func TestEnqueueParentObjectNoOwner(t *testing.T) { errorMessages := make([]string, 0) - runtime.ErrorHandlers = append(runtime.ErrorHandlers, func(err error) { + utilruntime.ErrorHandlers = append(utilruntime.ErrorHandlers, func(err error) { errorMessages = append(errorMessages, err.Error()) }) rs := &appsv1.ReplicaSet{ @@ -188,7 +196,7 @@ func TestEnqueueParentObjectDifferentOwnerKind(t *testing.T) { experimentKind := v1alpha1.SchemeGroupVersion.WithKind("Experiment") errorMessages := make([]string, 0) - runtime.ErrorHandlers = append(runtime.ErrorHandlers, func(err error) { + utilruntime.ErrorHandlers = append(utilruntime.ErrorHandlers, func(err error) { errorMessages = append(errorMessages, err.Error()) }) experiment := &v1alpha1.Experiment{ @@ -217,7 +225,7 @@ func TestEnqueueParentObjectOtherOwnerTypes(t *testing.T) { deploymentKind := appsv1.SchemeGroupVersion.WithKind("Deployment") errorMessages := make([]string, 0) - runtime.ErrorHandlers = append(runtime.ErrorHandlers, func(err error) { + utilruntime.ErrorHandlers = append(utilruntime.ErrorHandlers, func(err 
error) { errorMessages = append(errorMessages, err.Error()) }) deployment := &appsv1.Deployment{ @@ -246,7 +254,7 @@ func TestEnqueueParentObjectEnqueueExperiment(t *testing.T) { experimentKind := v1alpha1.SchemeGroupVersion.WithKind("Experiment") errorMessages := make([]string, 0) - runtime.ErrorHandlers = append(runtime.ErrorHandlers, func(err error) { + utilruntime.ErrorHandlers = append(utilruntime.ErrorHandlers, func(err error) { errorMessages = append(errorMessages, err.Error()) }) experiment := &v1alpha1.Experiment{ @@ -279,7 +287,7 @@ func TestEnqueueParentObjectEnqueueRollout(t *testing.T) { rolloutKind := v1alpha1.SchemeGroupVersion.WithKind("Rollout") errorMessages := make([]string, 0) - runtime.ErrorHandlers = append(runtime.ErrorHandlers, func(err error) { + utilruntime.ErrorHandlers = append(utilruntime.ErrorHandlers, func(err error) { errorMessages = append(errorMessages, err.Error()) }) rollout := &v1alpha1.Rollout{ @@ -311,7 +319,7 @@ func TestEnqueueParentObjectEnqueueRollout(t *testing.T) { func TestEnqueueParentObjectRecoverTombstoneObject(t *testing.T) { experimentKind := v1alpha1.SchemeGroupVersion.WithKind("Experiment") errorMessages := make([]string, 0) - runtime.ErrorHandlers = append(runtime.ErrorHandlers, func(err error) { + utilruntime.ErrorHandlers = append(utilruntime.ErrorHandlers, func(err error) { errorMessages = append(errorMessages, err.Error()) }) experiment := &v1alpha1.Experiment{ @@ -362,3 +370,82 @@ func TestInstanceIDRequirement(t *testing.T) { assert.Panics(t, func() { InstanceIDRequirement(".%&(") }) } + +func newObj(name, kind, apiVersion string) *unstructured.Unstructured { + obj := make(map[string]interface{}) + obj["apiVersion"] = apiVersion + obj["kind"] = kind + obj["metadata"] = map[string]interface{}{ + "name": name, + "namespace": metav1.NamespaceDefault, + } + return &unstructured.Unstructured{Object: obj} +} + +func TestWatchResourceNotFound(t *testing.T) { + obj := newObj("foo", "Object", "example.com/v1") + 
client := dynamicfake.NewSimpleDynamicClient(runtime.NewScheme(), obj) + gvk := schema.ParseGroupResource("objects.example.com").WithVersion("v1") + returnsError := false + client.PrependWatchReactor("*", func(action kubetesting.Action) (handled bool, ret watch.Interface, err error) { + returnsError = true + return true, nil, k8serrors.NewNotFound(gvk.GroupResource(), "virtualservices") + }) + err := WatchResource(client, metav1.NamespaceAll, gvk, nil, nil, "not-used") + assert.True(t, returnsError) + assert.Equal(t, k8serrors.NewNotFound(gvk.GroupResource(), "virtualservices"), err) + + returnsError = false + err = WatchResource(client, metav1.NamespaceDefault, gvk, nil, nil, "not-used") + assert.True(t, returnsError) + assert.Equal(t, k8serrors.NewNotFound(gvk.GroupResource(), "virtualservices"), err) +} + +func TestWatchResourceHandleStop(t *testing.T) { + obj := newObj("foo", "Object", "example.com/v1") + client := dynamicfake.NewSimpleDynamicClient(runtime.NewScheme(), obj) + gvk := schema.ParseGroupResource("objects.example.com").WithVersion("v1") + watchI := watch.NewRaceFreeFake() + watchI.Stop() + client.PrependWatchReactor("*", func(action kubetesting.Action) (handled bool, ret watch.Interface, err error) { + return true, watchI, nil + }) + + WatchResource(client, metav1.NamespaceAll, gvk, nil, nil, "not-used") +} + +func TestProcessNextWatchObj(t *testing.T) { + obj := newObj("foo", "Object", "example.com/v1") + client := dynamicfake.NewSimpleDynamicClient(runtime.NewScheme(), obj) + gvk := schema.ParseGroupResource("objects.example.com").WithVersion("v1") + dInformer := dynamicinformers.NewDynamicSharedInformerFactory(client, func() time.Duration { return 0 }()) + indexer := dInformer.ForResource(gvk).Informer().GetIndexer() + indexer.AddIndexers(cache.Indexers{ + "testIndexer": func(obj interface{}) (strings []string, e error) { + return []string{"default/foo"}, nil + }, + }) + indexer.Add(obj) + { + wq := 
workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts") + watchEvent := watch.Event{ + Object: obj, + } + processNextWatchObj(watchEvent, wq, indexer, "testIndexer") + assert.Equal(t, 1, wq.Len()) + } + { + wq := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts") + watchEvent := watch.Event{ + Object: obj, + } + processNextWatchObj(watchEvent, wq, indexer, "no-indexer") + assert.Equal(t, 0, wq.Len()) + } + { + wq := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts") + invalidWatchEvent := watch.Event{} + processNextWatchObj(invalidWatchEvent, wq, indexer, "testIndexer") + assert.Equal(t, 0, wq.Len()) + } +}