From ad0e5edc2acbe511fdf84c2ea93709b55841987f Mon Sep 17 00:00:00 2001
From: Dobes Vandermeer
Date: Tue, 20 Oct 2020 12:33:35 -0700
Subject: [PATCH] Use debounce for pod scanning

Instead of syncing the pod list for a service at most once every 10
minutes, apply a debounce to change events: sync the pod list only once
the service has been stable for 5 seconds, or immediately if more than
5 minutes have passed since the last sync.

This should help avoid cases where pods are restarted but kubefwd no
longer allows connections to them, because the pod list is now synced
promptly after changes instead of being rate-limited.
---
 cmd/kubefwd/services/services.go |  12 +--
 pkg/fwdservice/fwdservice.go     | 126 +++++++++++++++++--------------
 2 files changed, 75 insertions(+), 63 deletions(-)

diff --git a/cmd/kubefwd/services/services.go b/cmd/kubefwd/services/services.go
index efc54c0c..4f672ed3 100644
--- a/cmd/kubefwd/services/services.go
+++ b/cmd/kubefwd/services/services.go
@@ -17,11 +17,7 @@ package services

 import (
 	"fmt"
-	"os"
-	"os/signal"
-	"sync"
-	"syscall"
-
+	"github.com/bep/debounce"
 	"github.com/txn2/kubefwd/pkg/fwdcfg"
 	"github.com/txn2/kubefwd/pkg/fwdhost"
 	"github.com/txn2/kubefwd/pkg/fwdport"
@@ -29,6 +25,11 @@ import (
 	"github.com/txn2/kubefwd/pkg/fwdsvcregistry"
 	"github.com/txn2/kubefwd/pkg/utils"
 	"github.com/txn2/txeh"
+	"os"
+	"os/signal"
+	"sync"
+	"syscall"
+	"time"

 	log "github.com/sirupsen/logrus"
 	"github.com/spf13/cobra"
@@ -408,6 +409,7 @@ func (opts *NamespaceOpts) AddServiceHandler(obj interface{}) {
 		Svc:           svc,
 		Headless:      svc.Spec.ClusterIP == "None",
 		PortForwards:  make(map[string]*fwdport.PortForwardOpts),
+		SyncDebouncer: debounce.New(5 * time.Second),
 		DoneChannel:   make(chan struct{}),
 	}

diff --git a/pkg/fwdservice/fwdservice.go b/pkg/fwdservice/fwdservice.go
index 44e49cf0..55b6a279 100644
--- a/pkg/fwdservice/fwdservice.go
+++ b/pkg/fwdservice/fwdservice.go
@@ -61,6 +61,9 @@ type ServiceFWD struct {

 	LastSyncedAt time.Time // When was the set of pods last synced

+	// SyncDebouncer debounces pod-list syncs so we don't hammer the Kubernetes API when many changes arrive at once
+	SyncDebouncer func(f func())
+
 	// A mapping of all the pods currently being forwarded.
 	// key = podName
 	PortForwards map[string]*fwdport.PortForwardOpts
@@ -104,81 +107,88 @@ func (svcFwd *ServiceFWD) GetPodsForService() []v1.Pod {
 // the forwarding setup for that or those pod(s). It will remove pods in-mem
 // that are no longer returned by k8s, should these not be correctly deleted.
 func (svcFwd *ServiceFWD) SyncPodForwards(force bool) {
+	sync := func() {

-	// When a whole set of pods gets deleted at once, they all will trigger a
-	// SyncPodForwards() call. This would hammer k8s with load needlessly.
-	// Therefore keep a timestamp from when this was last called and only allow
-	// call if the previous one was not too recent.
-	if !force && time.Since(svcFwd.LastSyncedAt) < 10*time.Minute {
-		log.Debugf("Skipping pods refresh for %s due to rate limiting", svcFwd)
-		return
-	}
-	defer func() { svcFwd.LastSyncedAt = time.Now() }()
-
-	k8sPods := svcFwd.GetPodsForService()
-
-	// If no pods are found currently. Will try again next re-sync period.
-	if len(k8sPods) == 0 {
-		log.Warnf("WARNING: No Running Pods returned for service %s", svcFwd)
-		return
-	}
-
-	// Check if the pods currently being forwarded still exist in k8s and if
-	// they are not in a (pre-)running state, if not: remove them
-	for _, podName := range svcFwd.ListServicePodNames() {
-		keep := false
-		for _, pod := range k8sPods {
-			if podName == pod.Name && (pod.Status.Phase == v1.PodPending || pod.Status.Phase == v1.PodRunning) {
-				keep = true
-				break
-			}
-		}
-		if !keep {
-			svcFwd.RemoveServicePod(podName)
-		}
-	}
+
+		defer func() { svcFwd.LastSyncedAt = time.Now() }()

-	// Set up port-forwarding for one or all of these pods normal service
-	// port-forward the first pod as service name. headless service not only
-	// forward first Pod as service name, but also port-forward all pods.
-	if len(k8sPods) != 0 {
+		k8sPods := svcFwd.GetPodsForService()

-		// if this is a headless service forward the first pod from the
-		// service name, then subsequent pods from their pod name
-		if svcFwd.Headless {
-			svcFwd.LoopPodsToForward([]v1.Pod{k8sPods[0]}, false)
-			svcFwd.LoopPodsToForward(k8sPods, true)
+		// If no pods are found currently, we will try again at the next re-sync.
+		if len(k8sPods) == 0 {
+			log.Warnf("WARNING: No Running Pods returned for service %s", svcFwd)
 			return
 		}

-		// Check if currently we are forwarding a pod which is good to keep using
-		podNameToKeep := ""
+		// Remove any forwarded pods that no longer exist in k8s, or that are
+		// no longer in a pending or running state.
 		for _, podName := range svcFwd.ListServicePodNames() {
-			if podNameToKeep != "" {
-				break
-			}
+			keep := false
 			for _, pod := range k8sPods {
 				if podName == pod.Name && (pod.Status.Phase == v1.PodPending || pod.Status.Phase == v1.PodRunning) {
-					podNameToKeep = pod.Name
+					keep = true
 					break
 				}
 			}
-		}
-
-		// Stop forwarding others, should there be. In case none of the currently
-		// forwarded pods are good to keep, podNameToKeep will be the empty string,
-		// and the comparison will mean we will remove all pods, which is the desired behaviour.
-		for _, podName := range svcFwd.ListServicePodNames() {
-			if podName != podNameToKeep {
+			if !keep {
 				svcFwd.RemoveServicePod(podName)
 			}
 		}

-		// If no good pod was being forwarded already, start one
-		if podNameToKeep == "" {
-			svcFwd.LoopPodsToForward([]v1.Pod{k8sPods[0]}, false)
-		}
+		// Set up port-forwarding for one or all of these pods. A normal service
+		// forwards only the first pod, under the service name. A headless service
+		// forwards the first pod under the service name, and every pod by pod name.
+		if len(k8sPods) != 0 {
+
+			// If this is a headless service, forward the first pod under the
+			// service name, then every pod under its own pod name.
+			if svcFwd.Headless {
+				svcFwd.LoopPodsToForward([]v1.Pod{k8sPods[0]}, false)
+				svcFwd.LoopPodsToForward(k8sPods, true)
+				return
+			}
+
+			// Check whether we are already forwarding a pod that is good to keep using.
+			podNameToKeep := ""
+			for _, podName := range svcFwd.ListServicePodNames() {
+				if podNameToKeep != "" {
+					break
+				}
+				for _, pod := range k8sPods {
+					if podName == pod.Name && (pod.Status.Phase == v1.PodPending || pod.Status.Phase == v1.PodRunning) {
+						podNameToKeep = pod.Name
+						break
+					}
+				}
+			}
+
+			// Stop forwarding any other pods. If none of the currently forwarded
+			// pods is good to keep, podNameToKeep will be the empty string, so the
+			// comparison removes every pod, which is the desired behaviour.
+			for _, podName := range svcFwd.ListServicePodNames() {
+				if podName != podNameToKeep {
+					svcFwd.RemoveServicePod(podName)
+				}
+			}
+			// If no good pod was already being forwarded, start one.
+			if podNameToKeep == "" {
+				svcFwd.LoopPodsToForward([]v1.Pod{k8sPods[0]}, false)
+			}
+		}
+	}
+	// When a whole set of pods gets deleted at once, each deletion triggers a
+	// SyncPodForwards() call, which would needlessly hammer k8s. We therefore
+	// debounce: the pod list is only synced once things have been stable for a
+	// few seconds. If things never stabilize, we still sync every 5 minutes.
+	if force || time.Since(svcFwd.LastSyncedAt) > 5*time.Minute {
+		// Replace any queued debounced sync with a no-op; we sync immediately below
+		svcFwd.SyncDebouncer(func() {})
+
+		// Do the syncing work immediately
+		sync()
+	} else {
+		// Queue the sync; it runs once no new changes arrive for 5 seconds
+		svcFwd.SyncDebouncer(sync)
+	}
 }
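
For reviewers unfamiliar with github.com/bep/debounce: debounce.New(after)
returns a func(f func()); each call stores f and resets a timer, and only the
most recently stored f runs once the timer expires. The sketch below is a
minimal, self-contained illustration of the pattern used above; the event
burst, counter, and short durations are illustrative stand-ins, not kubefwd
code:

	package main

	import (
		"fmt"
		"time"

		"github.com/bep/debounce"
	)

	func main() {
		// Same constructor the patch uses; a short window keeps the demo fast.
		debounced := debounce.New(200 * time.Millisecond)

		syncCount := 0
		sync := func() {
			syncCount++
			fmt.Println("synced pod list")
		}

		// A burst of ten "pod changed" events. Each call replaces the queued
		// function and resets the timer, so sync runs only once, 200ms after
		// the last event of the burst.
		for i := 0; i < 10; i++ {
			debounced(sync)
			time.Sleep(20 * time.Millisecond)
		}
		time.Sleep(300 * time.Millisecond)
		fmt.Println("syncs after burst:", syncCount) // prints 1, not 10

		// The forced path from the patch: queue a no-op so any pending timer
		// fires harmlessly, then run the sync directly.
		debounced(func() {})
		sync()
		fmt.Println("syncs after force:", syncCount) // prints 2
	}

The no-op replacement in the forced path matters: without it, a sync queued
just before a forced refresh would still fire a few seconds later, causing a
redundant second pod-list sync right after the forced one.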