Skip to content

Commit

Permalink
feat: Implement watch for Istio resources (#354)
Browse files Browse the repository at this point in the history
  • Loading branch information
dthomson25 authored Jan 21, 2020
1 parent 0f52eac commit 96f3ca4
Show file tree
Hide file tree
Showing 10 changed files with 258 additions and 11 deletions.
3 changes: 3 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion cmd/rollouts-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,11 @@ func newCommand() *cobra.Command {
kubeinformers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.LabelSelector = jobprovider.AnalysisRunUIDLabelKey
}))
cm := controller.NewManager(kubeClient, rolloutClient, dynamicClient,
cm := controller.NewManager(
namespace,
kubeClient,
rolloutClient,
dynamicClient,
kubeInformerFactory.Apps().V1().ReplicaSets(),
kubeInformerFactory.Core().V1().Services(),
jobInformerFactory.Batch().V1().Jobs(),
Expand Down
5 changes: 5 additions & 0 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,13 @@ type Manager struct {
serviceWorkqueue workqueue.RateLimitingInterface
experimentWorkqueue workqueue.RateLimitingInterface
analysisRunWorkqueue workqueue.RateLimitingInterface

defaultIstioVersion string
}

// NewManager returns a new manager to manage all the controllers
func NewManager(
namespace string,
kubeclientset kubernetes.Interface,
argoprojclientset clientset.Interface,
dynamicclientset dynamic.Interface,
Expand Down Expand Up @@ -113,6 +116,7 @@ func NewManager(
metricsServer := metrics.NewMetricsServer(metricsAddr, rolloutsInformer.Lister())

rolloutController := rollout.NewRolloutController(
namespace,
kubeclientset,
argoprojclientset,
dynamicclientset,
Expand Down Expand Up @@ -178,6 +182,7 @@ func NewManager(
serviceController: serviceController,
experimentController: experimentController,
analysisController: analysisController,
defaultIstioVersion: defaultIstioVersion,
}

return cm
Expand Down
1 change: 1 addition & 0 deletions rollout/bluegreen.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func (c *RolloutController) rolloutBlueGreen(r *v1alpha1.Rollout, rsList []*apps
allRSs := roCtx.AllRSs()
if reconcileBlueGreenTemplateChange(roCtx) {
roCtx.PauseContext().ClearPauseConditions()
roCtx.PauseContext().RemoveAbort()
logCtx.Infof("New pod template or template change detected")
return c.syncRolloutStatusBlueGreen(previewSvc, activeSvc, roCtx)
}
Expand Down
24 changes: 24 additions & 0 deletions rollout/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
log "github.com/sirupsen/logrus"
appsv1 "k8s.io/api/apps/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
appsinformers "k8s.io/client-go/informers/apps/v1"
Expand All @@ -19,6 +20,7 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/cmd/kubeadm/app/util"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/utils/pointer"

Expand All @@ -28,15 +30,22 @@ import (
clientset "github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned"
informers "github.com/argoproj/argo-rollouts/pkg/client/informers/externalversions/rollouts/v1alpha1"
listers "github.com/argoproj/argo-rollouts/pkg/client/listers/rollouts/v1alpha1"
"github.com/argoproj/argo-rollouts/rollout/trafficrouting/istio"
"github.com/argoproj/argo-rollouts/utils/conditions"
controllerutil "github.com/argoproj/argo-rollouts/utils/controller"
"github.com/argoproj/argo-rollouts/utils/defaults"
logutil "github.com/argoproj/argo-rollouts/utils/log"
serviceutil "github.com/argoproj/argo-rollouts/utils/service"
)

const (
virtualServiceIndexName = "byVirtualService"
)

// RolloutController is the controller implementation for Rollout resources
type RolloutController struct {
// namespace which namespace(s) operates on
namespace string
// rsControl is used for adopting/releasing replica sets.
replicaSetControl controller.RSControlInterface

Expand Down Expand Up @@ -80,6 +89,7 @@ type RolloutController struct {

// NewRolloutController returns a new rollout controller
func NewRolloutController(
namespace string,
kubeclientset kubernetes.Interface,
argoprojclientset clientset.Interface,
dynamicclientset dynamic.Interface,
Expand All @@ -102,6 +112,7 @@ func NewRolloutController(
}

controller := &RolloutController{
namespace: namespace,
kubeclientset: kubeclientset,
argoprojclientset: argoprojclientset,
dynamicclientset: dynamicclientset,
Expand Down Expand Up @@ -146,6 +157,15 @@ func NewRolloutController(
},
})

util.CheckErr(rolloutsInformer.Informer().AddIndexers(cache.Indexers{
virtualServiceIndexName: func(obj interface{}) (strings []string, e error) {
if rollout, ok := obj.(*v1alpha1.Rollout); ok {
return istio.GetRolloutVirtualServiceKeys(rollout), nil
}
return
},
}))

replicaSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
controllerutil.EnqueueParentObject(obj, register.RolloutKind, controller.enqueueRollout)
Expand Down Expand Up @@ -197,6 +217,10 @@ func (c *RolloutController) Run(threadiness int, stopCh <-chan struct{}) error {
}, time.Second, stopCh)
}
log.Info("Started Rollout workers")

gvk := schema.ParseGroupResource("virtualservices.networking.istio.io").WithVersion(c.defaultIstioVersion)
go controllerutil.WatchResourceWithExponentialBackoff(stopCh, c.dynamicclientset, c.namespace, gvk, c.rolloutWorkqueue, c.rolloutsIndexer, virtualServiceIndexName)

<-stopCh
log.Info("Shutting down workers")

Expand Down
6 changes: 5 additions & 1 deletion rollout/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,11 @@ func (f *fixture) newController(resync resyncFunc) (*RolloutController, informer
rolloutWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts")
serviceWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Services")

c := NewRolloutController(f.kubeclient, f.client, nil,
c := NewRolloutController(
metav1.NamespaceAll,
f.kubeclient,
f.client,
nil,
i.Argoproj().V1alpha1().Experiments(),
i.Argoproj().V1alpha1().AnalysisRuns(),
i.Argoproj().V1alpha1().AnalysisTemplates(),
Expand Down
17 changes: 17 additions & 0 deletions rollout/trafficrouting/istio/istio.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,23 @@ func NewReconciler(r *v1alpha1.Rollout, client dynamic.Interface, recorder recor
}
}

// GetRolloutVirtualServiceKeys gets the virtual service and its namespace from a rollout
func GetRolloutVirtualServiceKeys(rollout *v1alpha1.Rollout) []string {
if rollout.Spec.Strategy.Canary == nil {
return []string{}
}
if rollout.Spec.Strategy.Canary.TrafficRouting == nil {
return []string{}
}
if rollout.Spec.Strategy.Canary.TrafficRouting.Istio == nil {
return []string{}
}
if rollout.Spec.Strategy.Canary.TrafficRouting.Istio.VirtualService.Name == "" {
return []string{}
}
return []string{fmt.Sprintf("%s/%s", rollout.Namespace, rollout.Spec.Strategy.Canary.TrafficRouting.Istio.VirtualService.Name)}
}

// Reconciler holds required fields to reconcile Istio resources
type Reconciler struct {
rollout *v1alpha1.Rollout
Expand Down
25 changes: 25 additions & 0 deletions rollout/trafficrouting/istio/istio_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,3 +291,28 @@ func TestValidateHosts(t *testing.T) {
err = validateHosts(hr, "stable", "not-found-canary")
assert.Equal(t, fmt.Errorf("Canary Service 'not-found-canary' not found in route"), err)
}

func TestGetRolloutVirtualServiceKeys(t *testing.T) {
ro := &v1alpha1.Rollout{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: metav1.NamespaceDefault,
},
Spec: v1alpha1.RolloutSpec{
Strategy: v1alpha1.RolloutStrategy{},
},
}
assert.Len(t, GetRolloutVirtualServiceKeys(ro), 0)
ro.Spec.Strategy.Canary = &v1alpha1.CanaryStrategy{}
assert.Len(t, GetRolloutVirtualServiceKeys(ro), 0)
ro.Spec.Strategy.Canary.TrafficRouting = &v1alpha1.RolloutTrafficRouting{}
assert.Len(t, GetRolloutVirtualServiceKeys(ro), 0)
ro.Spec.Strategy.Canary.TrafficRouting.Istio = &v1alpha1.IstioTrafficRouting{
VirtualService: v1alpha1.IstioVirtualService{},
}
assert.Len(t, GetRolloutVirtualServiceKeys(ro), 0)
ro.Spec.Strategy.Canary.TrafficRouting.Istio.VirtualService.Name = "test"
keys := GetRolloutVirtualServiceKeys(ro)
assert.Len(t, keys, 1)
assert.Equal(t, keys[0], "default/test")
}
77 changes: 77 additions & 0 deletions utils/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,15 @@ import (
"time"

log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"

Expand All @@ -18,6 +23,78 @@ import (
logutil "github.com/argoproj/argo-rollouts/utils/log"
)

// processNextWatchObj will process a single object from the watch by seeing if
// that object is in an index and enqueueing the value object from the indexer
func processNextWatchObj(watchEvent watch.Event, queue workqueue.RateLimitingInterface, indexer cache.Indexer, index string) {
obj := watchEvent.Object
acc, err := meta.Accessor(obj)
if err != nil {
log.Errorf("Error processing object from watch: %v", err)
return
}
objsToEnqueue, err := indexer.ByIndex(index, fmt.Sprintf("%s/%s", acc.GetNamespace(), acc.GetName()))
if err != nil {
log.Errorf("Cannot process indexer: %s", err.Error())
return
}
for i := range objsToEnqueue {
Enqueue(objsToEnqueue[i], queue)
}
}

// WatchResource is a long-running function that will continually call the
// processNextWatchObj function in order to watch changes on a resource kind
// and enqueue a different resources kind that interact with them
func WatchResource(client dynamic.Interface, namespace string, gvk schema.GroupVersionResource, queue workqueue.RateLimitingInterface, indexer cache.Indexer, index string) error {
log.Infof("Starting watch on resource '%s'", gvk.Resource)
var watchI watch.Interface
var err error
if namespace == metav1.NamespaceAll {
watchI, err = client.Resource(gvk).Watch(metav1.ListOptions{})
} else {
watchI, err = client.Resource(gvk).Namespace(namespace).Watch(metav1.ListOptions{})
}

if err != nil {
log.Errorf("Error with watch: %v", err)
return err
}
for watchEvent := range watchI.ResultChan() {
processNextWatchObj(watchEvent, queue, indexer, index)
}
return nil
}

// WatchResourceWithExponentialBackoff creates a watch for the gvk provided. If there are any error,
// the function will rety again using exponetial backoff. It starts at 1 second wait, and wait afterwards
// increases by a factor of 2 and caps at 5 minutes.
func WatchResourceWithExponentialBackoff(stopCh <-chan struct{}, client dynamic.Interface, namespace string, gvk schema.GroupVersionResource, queue workqueue.RateLimitingInterface, indexer cache.Indexer, index string) {
backoff := wait.Backoff{
Duration: 1 * time.Second,
Cap: 5 * time.Minute,
Factor: float64(2),
Steps: 10,
}
for {
select {
case <-stopCh:
return
default:
}
err := WatchResource(client, namespace, gvk, queue, indexer, index)
if err == nil {
backoff = wait.Backoff{
Duration: 1 * time.Second,
Cap: 5 * time.Minute,
Factor: float64(2),
Steps: 10,
}
continue
}
time.Sleep(backoff.Step())
}
}

// RunWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// workqueue.
Expand Down
Loading

0 comments on commit 96f3ca4

Please sign in to comment.