feat: Implement watch for Istio resources #354

Merged 3 commits on Jan 21, 2020
3 changes: 3 additions & 0 deletions Gopkg.lock


6 changes: 5 additions & 1 deletion cmd/rollouts-controller/main.go
@@ -97,7 +97,11 @@ func newCommand() *cobra.Command {
kubeinformers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.LabelSelector = jobprovider.AnalysisRunUIDLabelKey
}))
-cm := controller.NewManager(kubeClient, rolloutClient, dynamicClient,
+cm := controller.NewManager(
+	namespace,
+	kubeClient,
+	rolloutClient,
+	dynamicClient,
kubeInformerFactory.Apps().V1().ReplicaSets(),
kubeInformerFactory.Core().V1().Services(),
jobInformerFactory.Batch().V1().Jobs(),
5 changes: 5 additions & 0 deletions controller/controller.go
@@ -73,10 +73,13 @@ type Manager struct {
serviceWorkqueue workqueue.RateLimitingInterface
experimentWorkqueue workqueue.RateLimitingInterface
analysisRunWorkqueue workqueue.RateLimitingInterface

defaultIstioVersion string
}

// NewManager returns a new manager to manage all the controllers
func NewManager(
namespace string,
kubeclientset kubernetes.Interface,
argoprojclientset clientset.Interface,
dynamicclientset dynamic.Interface,
@@ -113,6 +116,7 @@ func NewManager(
metricsServer := metrics.NewMetricsServer(metricsAddr, rolloutsInformer.Lister())

rolloutController := rollout.NewRolloutController(
namespace,
kubeclientset,
argoprojclientset,
dynamicclientset,
@@ -178,6 +182,7 @@ func NewManager(
serviceController: serviceController,
experimentController: experimentController,
analysisController: analysisController,
defaultIstioVersion: defaultIstioVersion,
}

return cm
1 change: 1 addition & 0 deletions rollout/bluegreen.go
@@ -33,6 +33,7 @@ func (c *RolloutController) rolloutBlueGreen(r *v1alpha1.Rollout, rsList []*apps
allRSs := roCtx.AllRSs()
if reconcileBlueGreenTemplateChange(roCtx) {
roCtx.PauseContext().ClearPauseConditions()
roCtx.PauseContext().RemoveAbort()
logCtx.Infof("New pod template or template change detected")
return c.syncRolloutStatusBlueGreen(previewSvc, activeSvc, roCtx)
}
24 changes: 24 additions & 0 deletions rollout/controller.go
@@ -9,6 +9,7 @@ import (
log "github.com/sirupsen/logrus"
appsv1 "k8s.io/api/apps/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
appsinformers "k8s.io/client-go/informers/apps/v1"
@@ -19,6 +20,7 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/cmd/kubeadm/app/util"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/utils/pointer"

@@ -28,15 +30,22 @@ import (
clientset "github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned"
informers "github.com/argoproj/argo-rollouts/pkg/client/informers/externalversions/rollouts/v1alpha1"
listers "github.com/argoproj/argo-rollouts/pkg/client/listers/rollouts/v1alpha1"
"github.com/argoproj/argo-rollouts/rollout/trafficrouting/istio"
"github.com/argoproj/argo-rollouts/utils/conditions"
controllerutil "github.com/argoproj/argo-rollouts/utils/controller"
"github.com/argoproj/argo-rollouts/utils/defaults"
logutil "github.com/argoproj/argo-rollouts/utils/log"
serviceutil "github.com/argoproj/argo-rollouts/utils/service"
)

const (
virtualServiceIndexName = "byVirtualService"
)

// RolloutController is the controller implementation for Rollout resources
type RolloutController struct {
// namespace is the namespace(s) this controller operates on
namespace string
// rsControl is used for adopting/releasing replica sets.
replicaSetControl controller.RSControlInterface

@@ -80,6 +89,7 @@ type RolloutController struct {

// NewRolloutController returns a new rollout controller
func NewRolloutController(
namespace string,
kubeclientset kubernetes.Interface,
argoprojclientset clientset.Interface,
dynamicclientset dynamic.Interface,
@@ -102,6 +112,7 @@
}

controller := &RolloutController{
namespace: namespace,
kubeclientset: kubeclientset,
argoprojclientset: argoprojclientset,
dynamicclientset: dynamicclientset,
@@ -146,6 +157,15 @@ func NewRolloutController(
},
})

util.CheckErr(rolloutsInformer.Informer().AddIndexers(cache.Indexers{
virtualServiceIndexName: func(obj interface{}) (strings []string, e error) {
if rollout, ok := obj.(*v1alpha1.Rollout); ok {
return istio.GetRolloutVirtualServiceKeys(rollout), nil
}
return
},
}))

replicaSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
controllerutil.EnqueueParentObject(obj, register.RolloutKind, controller.enqueueRollout)
@@ -197,6 +217,10 @@ func (c *RolloutController) Run(threadiness int, stopCh <-chan struct{}) error {
}, time.Second, stopCh)
}
log.Info("Started Rollout workers")

gvk := schema.ParseGroupResource("virtualservices.networking.istio.io").WithVersion(c.defaultIstioVersion)
go controllerutil.WatchResourceWithExponentialBackoff(stopCh, c.dynamicclientset, c.namespace, gvk, c.rolloutWorkqueue, c.rolloutsIndexer, virtualServiceIndexName)

<-stopCh
log.Info("Shutting down workers")

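For context on the two lines added to Run: schema.ParseGroupResource splits the resource string at the first dot, so the watch targets the virtualservices resource in the networking.istio.io group, at whatever API version defaultIstioVersion holds. A minimal runnable sketch of what the call evaluates to, assuming "v1alpha3" as an illustrative version (the actual default is configured elsewhere and not shown in this diff):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/runtime/schema"
)

func main() {
	// ParseGroupResource("virtualservices.networking.istio.io") splits at the
	// first dot: Resource "virtualservices", Group "networking.istio.io".
	// "v1alpha3" stands in for c.defaultIstioVersion here (an assumption).
	gvr := schema.ParseGroupResource("virtualservices.networking.istio.io").WithVersion("v1alpha3")
	fmt.Printf("group=%s version=%s resource=%s\n", gvr.Group, gvr.Version, gvr.Resource)
}

Every VirtualService event received on this watch is then converted to a "namespace/name" key and looked up in the byVirtualService index registered above, so only the Rollouts that reference the changed VirtualService are requeued.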
6 changes: 5 additions & 1 deletion rollout/controller_test.go
@@ -372,7 +372,11 @@ func (f *fixture) newController(resync resyncFunc) (*RolloutController, informer
rolloutWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts")
serviceWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Services")

-c := NewRolloutController(f.kubeclient, f.client, nil,
+c := NewRolloutController(
+	metav1.NamespaceAll,
+	f.kubeclient,
+	f.client,
+	nil,
i.Argoproj().V1alpha1().Experiments(),
i.Argoproj().V1alpha1().AnalysisRuns(),
i.Argoproj().V1alpha1().AnalysisTemplates(),
17 changes: 17 additions & 0 deletions rollout/trafficrouting/istio/istio.go
@@ -31,6 +31,23 @@ func NewReconciler(r *v1alpha1.Rollout, client dynamic.Interface, recorder recor
}
}

// GetRolloutVirtualServiceKeys returns the namespace/name keys of the virtual services referenced by a rollout's canary strategy
func GetRolloutVirtualServiceKeys(rollout *v1alpha1.Rollout) []string {
if rollout.Spec.Strategy.Canary == nil {
return []string{}
}
if rollout.Spec.Strategy.Canary.TrafficRouting == nil {
return []string{}
}
if rollout.Spec.Strategy.Canary.TrafficRouting.Istio == nil {
return []string{}
}
if rollout.Spec.Strategy.Canary.TrafficRouting.Istio.VirtualService.Name == "" {
return []string{}
}
return []string{fmt.Sprintf("%s/%s", rollout.Namespace, rollout.Spec.Strategy.Canary.TrafficRouting.Istio.VirtualService.Name)}
}

// Reconciler holds required fields to reconcile Istio resources
type Reconciler struct {
rollout *v1alpha1.Rollout
25 changes: 25 additions & 0 deletions rollout/trafficrouting/istio/istio_test.go
@@ -291,3 +291,28 @@ func TestValidateHosts(t *testing.T) {
err = validateHosts(hr, "stable", "not-found-canary")
assert.Equal(t, fmt.Errorf("Canary Service 'not-found-canary' not found in route"), err)
}

func TestGetRolloutVirtualServiceKeys(t *testing.T) {
ro := &v1alpha1.Rollout{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: metav1.NamespaceDefault,
},
Spec: v1alpha1.RolloutSpec{
Strategy: v1alpha1.RolloutStrategy{},
},
}
assert.Len(t, GetRolloutVirtualServiceKeys(ro), 0)
ro.Spec.Strategy.Canary = &v1alpha1.CanaryStrategy{}
assert.Len(t, GetRolloutVirtualServiceKeys(ro), 0)
ro.Spec.Strategy.Canary.TrafficRouting = &v1alpha1.RolloutTrafficRouting{}
assert.Len(t, GetRolloutVirtualServiceKeys(ro), 0)
ro.Spec.Strategy.Canary.TrafficRouting.Istio = &v1alpha1.IstioTrafficRouting{
VirtualService: v1alpha1.IstioVirtualService{},
}
assert.Len(t, GetRolloutVirtualServiceKeys(ro), 0)
ro.Spec.Strategy.Canary.TrafficRouting.Istio.VirtualService.Name = "test"
keys := GetRolloutVirtualServiceKeys(ro)
assert.Len(t, keys, 1)
assert.Equal(t, keys[0], "default/test")
}
71 changes: 71 additions & 0 deletions utils/controller/controller.go
@@ -6,10 +6,15 @@ import (
"time"

log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"

@@ -18,6 +23,72 @@ import (
logutil "github.com/argoproj/argo-rollouts/utils/log"
)

// processNextWatchObj processes a single object from the watch by looking it
// up in the given index and enqueueing every object the indexer returns
func processNextWatchObj(watchEvent watch.Event, queue workqueue.RateLimitingInterface, indexer cache.Indexer, index string) {
obj := watchEvent.Object
acc, err := meta.Accessor(obj)
if err != nil {
log.Errorf("Error processing object from watch: %v", err)
return
}

[review comment from @jessesuen (Member), Jan 18, 2020]
Is there a reason you don't surface these errors to the caller? I'm afraid that we might get into an error hotloop which we will not back off from with the exponential backoff.

Actually, never mind: we might get here because of normal stream closes, which we would not want to back off from, and that case is covered by the closing of the ResultChan().

objsToEnqueue, err := indexer.ByIndex(index, fmt.Sprintf("%s/%s", acc.GetNamespace(), acc.GetName()))
if err != nil {
log.Errorf("Cannot process indexer: %s", err.Error())
return
}
for i := range objsToEnqueue {
Enqueue(objsToEnqueue[i], queue)
}
}

// WatchResource is a long-running function that continually calls
// processNextWatchObj in order to watch changes on one resource kind and
// enqueue objects of a different resource kind that reference it
func WatchResource(client dynamic.Interface, namespace string, gvk schema.GroupVersionResource, queue workqueue.RateLimitingInterface, indexer cache.Indexer, index string) error {
log.Infof("Starting watch on resource '%s'", gvk.Resource)
var watchI watch.Interface
var err error
if namespace == metav1.NamespaceAll {
watchI, err = client.Resource(gvk).Watch(metav1.ListOptions{})
} else {
watchI, err = client.Resource(gvk).Namespace(namespace).Watch(metav1.ListOptions{})
}

if err != nil {
log.Errorf("Error with watch: %v", err)
return err
}
for watchEvent := range watchI.ResultChan() {
processNextWatchObj(watchEvent, queue, indexer, index)
}
return nil
}

[review comment from a project member]
There should be a watchI.Stop() call which will clean up resources held by the watch.
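A minimal sketch of the change this comment suggests: the same watch loop with a deferred watchI.Stop(), so the watch's resources are released on every return path. This is illustrative, not the code as merged; it assumes this file's imports, the processNextWatchObj helper above, and the dynamic client's behavior that an empty namespace watches all namespaces.

func watchResourceOnce(client dynamic.Interface, namespace string, gvr schema.GroupVersionResource, queue workqueue.RateLimitingInterface, indexer cache.Indexer, index string) error {
	watchI, err := client.Resource(gvr).Namespace(namespace).Watch(metav1.ListOptions{})
	if err != nil {
		return err
	}
	// Stop the watch on every exit path so the underlying connection and its
	// goroutines are cleaned up.
	defer watchI.Stop()
	for watchEvent := range watchI.ResultChan() {
		processNextWatchObj(watchEvent, queue, indexer, index)
	}
	return nil
}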

// WatchResourceWithExponentialBackoff creates a watch for the provided GVR. If there is an error,
// the function retries using exponential backoff: the wait starts at 1 second and
// increases by a factor of 2 after each failure, capped at 5 minutes.
func WatchResourceWithExponentialBackoff(stopCh <-chan struct{}, client dynamic.Interface, namespace string, gvk schema.GroupVersionResource, queue workqueue.RateLimitingInterface, indexer cache.Indexer, index string) {
backoff := wait.Backoff{
Duration: 1 * time.Second,
Cap: 5 * time.Minute,
Factor: float64(2),
Steps: 10,
}
for {
select {
case <-stopCh:
return
default:
}
err := WatchResource(client, namespace, gvk, queue, indexer, index)
if err == nil {
continue
}

[review comment from a project member]
Minor, but if there is no error, I would suggest that we "reset" the exponential backoff by re-initializing the backoff variable.
time.Sleep(backoff.Step())
}
}
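And a sketch of this second suggestion: re-initializing the backoff after every clean watch termination, so that a failure following a long healthy stretch starts again from the 1 second base. Illustrative only; it mirrors WatchResourceWithExponentialBackoff above under the same assumptions.

func watchResourceWithBackoffReset(stopCh <-chan struct{}, client dynamic.Interface, namespace string, gvr schema.GroupVersionResource, queue workqueue.RateLimitingInterface, indexer cache.Indexer, index string) {
	newBackoff := func() wait.Backoff {
		return wait.Backoff{Duration: 1 * time.Second, Cap: 5 * time.Minute, Factor: 2, Steps: 10}
	}
	backoff := newBackoff()
	for {
		select {
		case <-stopCh:
			return
		default:
		}
		if err := WatchResource(client, namespace, gvr, queue, indexer, index); err == nil {
			// A clean stream close is not a failure: start the backoff over.
			backoff = newBackoff()
			continue
		}
		time.Sleep(backoff.Step())
	}
}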

// RunWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// workqueue.