Add configMap informer #326

Merged 2 commits on Dec 14, 2021
14 changes: 5 additions & 9 deletions interceptor/config/serving.go
@@ -1,6 +1,8 @@
package config

import (
"time"

"github.com/kelseyhightower/envconfig"
)

@@ -16,15 +18,9 @@ type Serving struct {
// This is the server that the external scaler will issue metrics
// requests to
AdminPort int `envconfig:"KEDA_HTTP_ADMIN_PORT" required:"true"`
// RoutingTableUpdateDurationMS is the interval (in milliseconds) representing how
// often to do a complete update of the routing table ConfigMap.
//
// The interceptor will also open a watch stream to the routing table
// ConfigMap and attempt to update the routing table on every update.
//
// Since it does full updates alongside watch stream updates, it can
// only process one at a time. Therefore, this is a best effort timeout
RoutingTableUpdateDurationMS int `envconfig:"KEDA_HTTP_ROUTING_TABLE_UPDATE_DURATION_MS" default:"500"`
// ConfigMapCacheRsyncPeriod is the time interval
// for the ConfigMap informer to resync its local cache.
ConfigMapCacheRsyncPeriod time.Duration `envconfig:"KEDA_HTTP_SCALER_CONFIG_MAP_INFORMER_RSYNC_PERIOD" default:"60m"`
// The interceptor has an internal process that periodically fetches the state
// of deployment that is running the servers it forwards to.
//
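The new setting is parsed by envconfig directly into a time.Duration, so it accepts Go duration syntax such as 90s, 30m, or 2h. A minimal sketch of how the field gets populated, using an illustrative trimmed-down stand-in for the Serving struct (only the field added in this PR is reproduced):

package main

import (
	"fmt"
	"os"
	"time"

	"github.com/kelseyhightower/envconfig"
)

// serving is an illustrative stand-in for interceptor/config.Serving,
// trimmed to the field added in this PR.
type serving struct {
	ConfigMapCacheRsyncPeriod time.Duration `envconfig:"KEDA_HTTP_SCALER_CONFIG_MAP_INFORMER_RSYNC_PERIOD" default:"60m"`
}

func main() {
	// Override the 60m default with a 30-minute resync period.
	os.Setenv("KEDA_HTTP_SCALER_CONFIG_MAP_INFORMER_RSYNC_PERIOD", "30m")

	var cfg serving
	if err := envconfig.Process("", &cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.ConfigMapCacheRsyncPeriod) // prints 30m0s
}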
14 changes: 12 additions & 2 deletions interceptor/main.go
@@ -73,6 +73,15 @@ func main() {
q := queue.NewMemory()
routingTable := routing.NewTable()

// Create the informer for ConfigMap resources.
// The informer's resync period must not be less than 1s,
// see: https://github.com/kubernetes/client-go/blob/v0.22.2/tools/cache/shared_informer.go#L475
configMapInformer := k8s.NewInformerConfigMapUpdater(
lggr,
cl,
servingCfg.ConfigMapCacheRsyncPeriod,
)

lggr.Info(
"Fetching initial routing table",
)
@@ -109,10 +118,11 @@ func main() {
err := routing.StartConfigMapRoutingTableUpdater(
ctx,
lggr,
time.Duration(servingCfg.RoutingTableUpdateDurationMS)*time.Millisecond,
configMapsInterface,
configMapInformer,
servingCfg.CurrentNamespace,
routingTable,
q,
nil,
)
lggr.Error(err, "config map routing table updater failed")
return err
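Per the comment above, client-go treats one second as the minimum resync period and will warn and override smaller values. A caller could also clamp the configured value up front instead of relying on that behaviour. The guard below is not part of the PR, just a sketch that reuses the lggr, cl, and servingCfg variables from main above (and would need the time import):

// Hypothetical guard, not part of this PR: clamp the configured resync
// period to client-go's 1s minimum before building the informer.
const minResyncPeriod = 1 * time.Second

resync := servingCfg.ConfigMapCacheRsyncPeriod
if resync < minResyncPeriod {
	lggr.Info(
		"configured resync period is below the client-go minimum, clamping",
		"configured", resync,
		"minimum", minResyncPeriod,
	)
	resync = minResyncPeriod
}
configMapInformer := k8s.NewInformerConfigMapUpdater(lggr, cl, resync)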
132 changes: 132 additions & 0 deletions pkg/k8s/config_map_cache_informer.go
@@ -0,0 +1,132 @@
package k8s

import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/go-logr/logr"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/informers"
infcorev1 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)

type InformerConfigMapUpdater struct {
lggr logr.Logger
cmInformer infcorev1.ConfigMapInformer
bcaster *watch.Broadcaster
}

func (i *InformerConfigMapUpdater) MarshalJSON() ([]byte, error) {
lst := i.cmInformer.Lister()
cms, err := lst.List(labels.Everything())
if err != nil {
return nil, err
}
return json.Marshal(&cms)
}

func (i *InformerConfigMapUpdater) Start(ctx context.Context) error {
i.cmInformer.Informer().Run(ctx.Done())
return errors.Wrap(
ctx.Err(),
"configMap informer was stopped",
)
}

func (i *InformerConfigMapUpdater) Get(
ns,
name string,
) (corev1.ConfigMap, error) {
cm, err := i.cmInformer.Lister().ConfigMaps(ns).Get(name)
if err != nil {
return corev1.ConfigMap{}, err
}
return *cm, nil
}

func (i *InformerConfigMapUpdater) Watch(
ns,
name string,
) watch.Interface {
return watch.Filter(i.bcaster.Watch(), func(e watch.Event) (watch.Event, bool) {
cm, ok := e.Object.(*corev1.ConfigMap)
if !ok {
i.lggr.Error(
fmt.Errorf("informer expected ConfigMap, ignoring this event"),
"event",
e,
)
return e, false
}
if cm.Namespace == ns && cm.Name == name {
return e, true
}
return e, false
})
}

func (i *InformerConfigMapUpdater) addEvtHandler(obj interface{}) {
cm, ok := obj.(*corev1.ConfigMap)
if !ok {
i.lggr.Error(
fmt.Errorf("informer expected configMap, got %v", obj),
"not forwarding event",
)
return
}
i.bcaster.Action(watch.Added, cm)
}

func (i *InformerConfigMapUpdater) updateEvtHandler(oldObj, newObj interface{}) {
cm, ok := newObj.(*corev1.ConfigMap)
if !ok {
i.lggr.Error(
fmt.Errorf("informer expected configMap, got %v", newObj),
"not forwarding event",
)
return
}
i.bcaster.Action(watch.Modified, cm)
}

func (i *InformerConfigMapUpdater) deleteEvtHandler(obj interface{}) {
cm, ok := obj.(*corev1.ConfigMap)
if !ok {
i.lggr.Error(
fmt.Errorf("informer expected configMap, got %v", obj),
"not forwarding event",
)
return
}
i.bcaster.Action(watch.Deleted, cm)
}

func NewInformerConfigMapUpdater(
lggr logr.Logger,
cl kubernetes.Interface,
defaultResync time.Duration,
) *InformerConfigMapUpdater {
factory := informers.NewSharedInformerFactory(
cl,
defaultResync,
)
cmInformer := factory.Core().V1().ConfigMaps()
ret := &InformerConfigMapUpdater{
lggr: lggr,
bcaster: watch.NewBroadcaster(0, watch.WaitIfChannelFull),
cmInformer: cmInformer,
}
ret.cmInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: ret.addEvtHandler,
UpdateFunc: ret.updateEvtHandler,
DeleteFunc: ret.deleteEvtHandler,
})
return ret
}
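For reference, a self-contained usage sketch of the new type: construct it from a client and a resync period, run Start in its own goroutine, and read filtered events from Watch. The namespace and ConfigMap name below are placeholders, and logr.Discard() stands in for a real logger:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-logr/logr"
	"github.com/kedacore/http-add-on/pkg/k8s"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

func main() {
	lggr := logr.Discard() // placeholder logger

	// Any kubernetes.Interface works here, including a fake client in tests.
	restCfg, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	cl := kubernetes.NewForConfigOrDie(restCfg)

	updater := k8s.NewInformerConfigMapUpdater(lggr, cl, 60*time.Minute)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Run the underlying shared informer until the context is cancelled.
	go func() {
		if err := updater.Start(ctx); err != nil {
			lggr.Error(err, "informer stopped")
		}
	}()

	// Only events for the named ConfigMap in the given namespace are delivered.
	watcher := updater.Watch("keda", "my-routing-table")
	defer watcher.Stop()

	for event := range watcher.ResultChan() {
		cm := event.Object.(*corev1.ConfigMap)
		fmt.Println(event.Type, cm.Namespace, cm.Name)
	}
}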
80 changes: 44 additions & 36 deletions pkg/routing/config_map_updater.go
@@ -2,15 +2,13 @@ package routing

import (
"context"
"time"

"github.com/go-logr/logr"
"github.com/kedacore/http-add-on/pkg/k8s"
"github.com/kedacore/http-add-on/pkg/queue"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
)

// StartConfigMapRoutingTableUpdater starts a loop that does the following:
@@ -21,49 +19,42 @@ import (
// called ConfigMapRoutingTableName. On either of those events, decodes
// that ConfigMap into a routing table and stores the new table into table
// using table.Replace(newTable)
// - Execute the callback function, if one exists
// - Returns an appropriate non-nil error if ctx.Done() receives a signal
func StartConfigMapRoutingTableUpdater(
ctx context.Context,
lggr logr.Logger,
updateEvery time.Duration,
getterWatcher k8s.ConfigMapGetterWatcher,
cmInformer *k8s.InformerConfigMapUpdater,
ns string,
table *Table,
q queue.Counter,
cbFunc func() error,
) error {
lggr = lggr.WithName("pkg.routing.StartConfigMapRoutingTableUpdater")
watchIface, err := getterWatcher.Watch(ctx, metav1.ListOptions{})
if err != nil {
return err
}
defer watchIface.Stop()

ticker := time.NewTicker(updateEvery)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return errors.Wrap(ctx.Err(), "context is done")
case <-ticker.C:
if err := GetTable(ctx, lggr, getterWatcher, table, q); err != nil {
return errors.Wrap(err, "failed to fetch routing table")
}
watcher := cmInformer.Watch(ns, ConfigMapRoutingTableName)
defer watcher.Stop()

case evt := <-watchIface.ResultChan():
evtType := evt.Type
obj := evt.Object
if evtType == watch.Added || evtType == watch.Modified {
cm, ok := obj.(*corev1.ConfigMap)
// by definition of watchIface, all returned objects should
// be assertable to a ConfigMap. In the likely impossible
// case that it isn't, just ignore and move on.
// This check is just to be defensive.
ctx, done := context.WithCancel(ctx)
defer done()
grp, ctx := errgroup.WithContext(ctx)

grp.Go(func() error {
defer done()
return cmInformer.Start(ctx)
})

grp.Go(func() error {
defer done()
for {
select {
case event := <-watcher.ResultChan():
cm, ok := event.Object.(*corev1.ConfigMap)
// Theoretically this will not happen
if !ok {
continue
}
// the watcher is open on all ConfigMaps in the namespace, so
// bail out of this loop iteration immediately if the event
// isn't for the routing table ConfigMap.
if cm.Name != ConfigMapRoutingTableName {
lggr.Info(
"The event object observed is not a configmap",
)
continue
}
newTable, err := FetchTableFromConfigMap(cm, q)
@@ -81,8 +72,25 @@ func StartConfigMapRoutingTableUpdater(
)
continue
}
// Execute the callback function, if one exists
if cbFunc != nil {
if err := cbFunc(); err != nil {
lggr.Error(
err,
"failed to exec the callback function",
)
continue
}
}
case <-ctx.Done():
return errors.Wrap(ctx.Err(), "context is done")
}
}
}
})

if err := grp.Wait(); err != nil {
lggr.Error(err, "config map routing updater is failed")
return err
}
return nil
}
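For completeness, the call below mirrors the interceptor's invocation in main.go above, but with a callback supplied instead of nil; the onTableUpdate function is hypothetical and exists only to show where the new cbFunc hook fires (after each successful table replacement):

// Hypothetical wiring: same arguments as the interceptor's call above,
// except a callback is passed instead of nil.
onTableUpdate := func() error {
	lggr.Info("routing table replaced from ConfigMap")
	return nil
}

if err := routing.StartConfigMapRoutingTableUpdater(
	ctx,
	lggr,
	configMapInformer,
	servingCfg.CurrentNamespace,
	routingTable,
	q,
	onTableUpdate,
); err != nil {
	lggr.Error(err, "config map routing table updater failed")
}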