Use debounce for pod scanning
Instead of syncing the pod list for a service at most once every 10 minutes, apply a debounce to pod changes so that the pod list is only re-synced once the service has been stable for 5 seconds, or when more than 5 minutes have passed since the last sync.

This should help avoid cases where pods are restarted and kubefwd no longer seems to allow connecting to them, because pods will now be re-synced much sooner after a change than before.
dobesv committed Oct 20, 2020
1 parent 79af4b7 commit ad0e5ed
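
For context, the commit wires in the github.com/bep/debounce package. Below is a minimal standalone sketch of its documented behaviour, assuming nothing beyond the package's public API (an illustration, not code from this commit): debounce.New returns a func(f func()), and each call to that function replaces the pending callback and restarts the timer.

	package main

	import (
		"fmt"
		"time"

		"github.com/bep/debounce"
	)

	func main() {
		// The callback runs only after 5 seconds with no further calls.
		debounced := debounce.New(5 * time.Second)

		sync := func() { fmt.Println("syncing pod list") }

		// A burst of pod events collapses into a single sync that fires
		// once the service has been quiet for 5 seconds.
		for i := 0; i < 3; i++ {
			debounced(sync)
		}

		time.Sleep(6 * time.Second) // prints "syncing pod list" exactly once
	}

Because every call restarts the timer, a service whose pods never stop churning could postpone the sync indefinitely; that is why the commit also keeps a 5-minute cap on the time between syncs.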
Showing 2 changed files with 75 additions and 63 deletions.
12 changes: 7 additions & 5 deletions cmd/kubefwd/services/services.go
@@ -17,18 +17,19 @@ package services

 import (
 	"fmt"
-	"os"
-	"os/signal"
-	"sync"
-	"syscall"

+	"github.com/bep/debounce"
 	"github.com/txn2/kubefwd/pkg/fwdcfg"
 	"github.com/txn2/kubefwd/pkg/fwdhost"
 	"github.com/txn2/kubefwd/pkg/fwdport"
 	"github.com/txn2/kubefwd/pkg/fwdservice"
 	"github.com/txn2/kubefwd/pkg/fwdsvcregistry"
 	"github.com/txn2/kubefwd/pkg/utils"
 	"github.com/txn2/txeh"
+	"os"
+	"os/signal"
+	"sync"
+	"syscall"
+	"time"

 	log "github.com/sirupsen/logrus"
 	"github.com/spf13/cobra"
@@ -408,6 +409,7 @@ func (opts *NamespaceOpts) AddServiceHandler(obj interface{}) {
 		Svc:           svc,
 		Headless:      svc.Spec.ClusterIP == "None",
 		PortForwards:  make(map[string]*fwdport.PortForwardOpts),
+		SyncDebouncer: debounce.New(5 * time.Second),
 		DoneChannel:   make(chan struct{}),
 	}
126 changes: 68 additions & 58 deletions pkg/fwdservice/fwdservice.go
@@ -61,6 +61,9 @@ type ServiceFWD struct {

 	LastSyncedAt time.Time // When was the set of pods last synced

+	// Use debouncer for listing pods so we don't hammer the k8s when a bunch of changes happen at once
+	SyncDebouncer func(f func())

 	// A mapping of all the pods currently being forwarded.
 	// key = podName
 	PortForwards map[string]*fwdport.PortForwardOpts
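
A note on the field's shape: func(f func()) is exactly the type returned by debounce.New, so ServiceFWD stays decoupled from the library. A hedged sketch of why that shape is convenient, e.g. for swapping in a synchronous stand-in during tests (hypothetical, not part of this commit):

	package main

	import (
		"fmt"
		"time"

		"github.com/bep/debounce"
	)

	func main() {
		// Production-style wiring: the variable's type matches what
		// debounce.New returns.
		var syncDebouncer func(f func()) = debounce.New(5 * time.Second)

		// Hypothetical test wiring: same shape, but runs the callback
		// immediately so a test never waits out the debounce window.
		syncDebouncer = func(f func()) { f() }

		syncDebouncer(func() { fmt.Println("synced immediately") })
	}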
@@ -104,81 +107,88 @@ func (svcFwd *ServiceFWD) GetPodsForService() []v1.Pod {
 // SyncPodForwards selects one or all pods behind a service, and invokes
 // the forwarding setup for that or those pod(s). It will remove pods in-mem
 // that are no longer returned by k8s, should these not be correctly deleted.
 func (svcFwd *ServiceFWD) SyncPodForwards(force bool) {
+	sync := func() {

-	// When a whole set of pods gets deleted at once, they all will trigger a
-	// SyncPodForwards() call. This would hammer k8s with load needlessly.
-	// Therefore keep a timestamp from when this was last called and only allow
-	// call if the previous one was not too recent.
-	if !force && time.Since(svcFwd.LastSyncedAt) < 10*time.Minute {
-		log.Debugf("Skipping pods refresh for %s due to rate limiting", svcFwd)
-		return
-	}
-	defer func() { svcFwd.LastSyncedAt = time.Now() }()
+		defer func() { svcFwd.LastSyncedAt = time.Now() }()

-	k8sPods := svcFwd.GetPodsForService()
+		k8sPods := svcFwd.GetPodsForService()

-	// If no pods are found currently. Will try again next re-sync period.
-	if len(k8sPods) == 0 {
-		log.Warnf("WARNING: No Running Pods returned for service %s", svcFwd)
-		return
-	}
+		// If no pods are found currently. Will try again next re-sync period.
+		if len(k8sPods) == 0 {
+			log.Warnf("WARNING: No Running Pods returned for service %s", svcFwd)
+			return
+		}

-	// Check if the pods currently being forwarded still exist in k8s and if
-	// they are not in a (pre-)running state, if not: remove them
-	for _, podName := range svcFwd.ListServicePodNames() {
-		keep := false
-		for _, pod := range k8sPods {
-			if podName == pod.Name && (pod.Status.Phase == v1.PodPending || pod.Status.Phase == v1.PodRunning) {
-				keep = true
-				break
-			}
-		}
-		if !keep {
-			svcFwd.RemoveServicePod(podName)
-		}
-	}
+		// Check if the pods currently being forwarded still exist in k8s and if
+		// they are not in a (pre-)running state, if not: remove them
+		for _, podName := range svcFwd.ListServicePodNames() {
+			keep := false
+			for _, pod := range k8sPods {
+				if podName == pod.Name && (pod.Status.Phase == v1.PodPending || pod.Status.Phase == v1.PodRunning) {
+					keep = true
+					break
+				}
+			}
+			if !keep {
+				svcFwd.RemoveServicePod(podName)
+			}
+		}

-	// Set up port-forwarding for one or all of these pods normal service
-	// port-forward the first pod as service name. headless service not only
-	// forward first Pod as service name, but also port-forward all pods.
-	if len(k8sPods) != 0 {
+		// Set up port-forwarding for one or all of these pods normal service
+		// port-forward the first pod as service name. headless service not only
+		// forward first Pod as service name, but also port-forward all pods.
+		if len(k8sPods) != 0 {

-		// if this is a headless service forward the first pod from the
-		// service name, then subsequent pods from their pod name
-		if svcFwd.Headless {
-			svcFwd.LoopPodsToForward([]v1.Pod{k8sPods[0]}, false)
-			svcFwd.LoopPodsToForward(k8sPods, true)
-			return
-		}
+			// if this is a headless service forward the first pod from the
+			// service name, then subsequent pods from their pod name
+			if svcFwd.Headless {
+				svcFwd.LoopPodsToForward([]v1.Pod{k8sPods[0]}, false)
+				svcFwd.LoopPodsToForward(k8sPods, true)
+				return
+			}

-		// Check if currently we are forwarding a pod which is good to keep using
-		podNameToKeep := ""
-		for _, podName := range svcFwd.ListServicePodNames() {
-			if podNameToKeep != "" {
-				break
-			}
-			for _, pod := range k8sPods {
-				if podName == pod.Name && (pod.Status.Phase == v1.PodPending || pod.Status.Phase == v1.PodRunning) {
-					podNameToKeep = pod.Name
-					break
-				}
-			}
-		}
+			// Check if currently we are forwarding a pod which is good to keep using
+			podNameToKeep := ""
+			for _, podName := range svcFwd.ListServicePodNames() {
+				if podNameToKeep != "" {
+					break
+				}
+				for _, pod := range k8sPods {
+					if podName == pod.Name && (pod.Status.Phase == v1.PodPending || pod.Status.Phase == v1.PodRunning) {
+						podNameToKeep = pod.Name
+						break
+					}
+				}
+			}

-		// Stop forwarding others, should there be. In case none of the currently
-		// forwarded pods are good to keep, podNameToKeep will be the empty string,
-		// and the comparison will mean we will remove all pods, which is the desired behaviour.
-		for _, podName := range svcFwd.ListServicePodNames() {
-			if podName != podNameToKeep {
-				svcFwd.RemoveServicePod(podName)
-			}
-		}
+			// Stop forwarding others, should there be. In case none of the currently
+			// forwarded pods are good to keep, podNameToKeep will be the empty string,
+			// and the comparison will mean we will remove all pods, which is the desired behaviour.
+			for _, podName := range svcFwd.ListServicePodNames() {
+				if podName != podNameToKeep {
+					svcFwd.RemoveServicePod(podName)
+				}
+			}

-		// If no good pod was being forwarded already, start one
-		if podNameToKeep == "" {
-			svcFwd.LoopPodsToForward([]v1.Pod{k8sPods[0]}, false)
-		}
-	}
+			// If no good pod was being forwarded already, start one
+			if podNameToKeep == "" {
+				svcFwd.LoopPodsToForward([]v1.Pod{k8sPods[0]}, false)
+			}
+		}
+	}
+	// When a whole set of pods gets deleted at once, they all will trigger a SyncPodForwards() call.
+	// This would hammer k8s with load needlessly. We therefore use a debouncer to only update pods
+	// if things have been stable for at least a few seconds. However, if things never stabilize we
+	// will still reload this information at least once every 5 minutes.
+	if force || time.Since(svcFwd.LastSyncedAt) > 5*time.Minute {
+		// Replace current debounced function with no-op
+		svcFwd.SyncDebouncer(func() {})

+		// Do the syncing work
+		sync()
+	} else {
+		// Queue sync
+		svcFwd.SyncDebouncer(sync)
+	}
 }
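
The forced path relies on a property of bep/debounce worth spelling out: each call to the debounced function replaces the pending callback and restarts the timer, so passing a no-op effectively cancels any queued sync before the real one runs directly. A standalone sketch of that behaviour (an illustration under that assumption, not code from this commit):

	package main

	import (
		"fmt"
		"time"

		"github.com/bep/debounce"
	)

	func main() {
		debounced := debounce.New(2 * time.Second)

		sync := func() { fmt.Println("sync ran") }

		// Queue a sync; it would fire after 2 quiet seconds.
		debounced(sync)

		// Forced path, as in the commit: swap the queued callback for a
		// no-op, then run the sync directly. The queued sync never fires.
		debounced(func() {})
		sync() // prints "sync ran" immediately

		time.Sleep(3 * time.Second) // only the silent no-op fires here
	}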
