Skip to content

Commit

Permalink
Prevent host remove when service has multiple ports and one of them i…
Browse files Browse the repository at this point in the history
…s broken
  • Loading branch information
flupec authored and Vasilii Avtaev committed Jun 23, 2021
1 parent 6fb931e commit f18be8d
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 17 deletions.
4 changes: 2 additions & 2 deletions cmd/kubefwd/services/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ limitations under the License.
package services

import (
"context"
"fmt"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"
"context"

"github.com/bep/debounce"
"github.com/txn2/kubefwd/pkg/fwdcfg"
Expand Down Expand Up @@ -415,7 +415,7 @@ func (opts *NamespaceOpts) AddServiceHandler(obj interface{}) {
NamespaceServiceLock: opts.NamespaceIPLock,
Svc: svc,
Headless: svc.Spec.ClusterIP == "None",
PortForwards: make(map[string]*fwdport.PortForwardOpts),
PortForwards: make(map[string][]*fwdport.PortForwardOpts),
SyncDebouncer: debounce.New(5 * time.Second),
DoneChannel: make(chan struct{}),
PortMap: opts.ParsePortMap(mappings),
Expand Down
19 changes: 17 additions & 2 deletions pkg/fwdport/fwdport.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ import (
type ServiceFWD interface {
String() string
SyncPodForwards(bool)
ListServicePodNames() []string
AddServicePod(pfo *PortForwardOpts)
GetServicePodPortForwards(servicePodName string) []*PortForwardOpts
RemoveServicePod(servicePodName string)
RemoveServicePodByPort(servicePodName string, podPort string)
}

// HostFileWithLock
Expand Down Expand Up @@ -114,7 +119,7 @@ func (p pingingDialer) Dial(protocols ...string) (httpstream.Connection, string,
_ = pingStream.Reset()
}
case <-p.pingStopChan:
log.Debug(fmt.Sprintf("Ping process stopped for %s", p.pingTargetPodName))
log.Debugf("Ping process stopped for %s", p.pingTargetPodName)
return
}
}
Expand Down Expand Up @@ -161,6 +166,7 @@ func (pfo *PortForwardOpts) PortForward() error {
go func() {
<-pfo.ManualStopChan
close(downstreamStopChannel)
pfo.ServiceFwd.RemoveServicePodByPort(pfo.String(), pfo.PodPort)
pfo.removeHosts()
pfo.removeInterfaceAlias()
close(pfStopChannel)
Expand Down Expand Up @@ -343,14 +349,19 @@ func (pfo *PortForwardOpts) AddHosts() {
// associated with a forwarded pod
func (pfo *PortForwardOpts) removeHosts() {

// We must not remove hosts entries if port-forwarding on one of the service ports is failing
if pfAmount := len(pfo.ServiceFwd.GetServicePodPortForwards(pfo.String())); pfAmount > 0 {
return
}

// we should lock the pfo.HostFile here
// because sometimes other goroutine write the *txeh.Hosts
pfo.HostFile.Lock()
// other applications or process may have written to /etc/hosts
// since it was originally updated.
err := pfo.HostFile.Hosts.Reload()
if err != nil {
log.Error("Unable to reload /etc/hosts: " + err.Error())
log.Errorf("Unable to reload /etc/hosts: %s", err.Error())
return
}

Expand Down Expand Up @@ -460,3 +471,7 @@ func (pfo *PortForwardOpts) Stop() {
}
close(pfo.ManualStopChan)
}

func (pfo *PortForwardOpts) String() string {
return pfo.Service + "." + pfo.PodName
}
77 changes: 64 additions & 13 deletions pkg/fwdservice/fwdservice.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package fwdservice

import (
"context"
"fmt"
"strconv"
"sync"
"time"
"context"

log "github.com/sirupsen/logrus"
"github.com/txn2/kubefwd/pkg/fwdnet"
Expand Down Expand Up @@ -67,8 +67,8 @@ type ServiceFWD struct {
SyncDebouncer func(f func())

// A mapping of all the pods currently being forwarded.
// key = podName
PortForwards map[string]*fwdport.PortForwardOpts
// key = PortForwardOpts.String()
PortForwards map[string][]*fwdport.PortForwardOpts
DoneChannel chan struct{} // After shutdown is complete, this channel will be closed
}

Expand Down Expand Up @@ -338,14 +338,29 @@ func (svcFwd *ServiceFWD) LoopPodsToForward(pods []v1.Pod, includePodNameInHost
// AddServicePod
func (svcFwd *ServiceFWD) AddServicePod(pfo *fwdport.PortForwardOpts) {
svcFwd.NamespaceServiceLock.Lock()
ServicePod := pfo.Service + "." + pfo.PodName
if _, found := svcFwd.PortForwards[ServicePod]; !found {
svcFwd.PortForwards[ServicePod] = pfo
if _, found := svcFwd.PortForwards[pfo.String()]; !found {
portForwardsList := make([]*fwdport.PortForwardOpts, 0)
portForwardsList = append(portForwardsList, pfo)
svcFwd.PortForwards[pfo.String()] = portForwardsList
} else {
portForwardList := svcFwd.PortForwards[pfo.String()]
if !svcFwd.contains(portForwardList, pfo) {
portForwardList = append(portForwardList, pfo)
}
}
svcFwd.NamespaceServiceLock.Unlock()
}

// ListServicePodNames
func (svcFwd *ServiceFWD) contains(portForwards []*fwdport.PortForwardOpts, pfo *fwdport.PortForwardOpts) bool {
for _, pf := range portForwards {
if pfo.String() == pf.String() && pfo.PodPort == pf.PodPort {
return true
}
}
return false
}

// ListServicePodPortNames
func (svcFwd *ServiceFWD) ListServicePodNames() []string {
svcFwd.NamespaceServiceLock.Lock()
currentPodNames := make([]string, 0, len(svcFwd.PortForwards))
Expand All @@ -356,16 +371,52 @@ func (svcFwd *ServiceFWD) ListServicePodNames() []string {
return currentPodNames
}

func (svcFwd *ServiceFWD) GetServicePodPortForwards(servicePodName string) []*fwdport.PortForwardOpts {
svcFwd.NamespaceServiceLock.Lock()
defer svcFwd.NamespaceServiceLock.Unlock()
return svcFwd.PortForwards[servicePodName]
}

// RemoveServicePod removes all PortForwardOpts from mapping
func (svcFwd *ServiceFWD) RemoveServicePod(servicePodName string) {
if pod, found := svcFwd.PortForwards[servicePodName]; found {
pod.Stop()
<-pod.DoneChan
svcFwd.NamespaceServiceLock.Lock()
delete(svcFwd.PortForwards, servicePodName)
svcFwd.NamespaceServiceLock.Unlock()
log.Debugf("Removing all pods from serviceFwd by key=%s", servicePodName)
svcFwd.removeServicePodPort(servicePodName, svcFwd.allMatch)
}

func (svcFwd *ServiceFWD) allMatch(_ *fwdport.PortForwardOpts) bool {
return true
}

// removeServicePodPort removes PortForwardOpts from mapping according to filter function
func (svcFwd *ServiceFWD) removeServicePodPort(servicePodName string, filter func(pfo *fwdport.PortForwardOpts) bool) {
svcFwd.NamespaceServiceLock.Lock()
defer svcFwd.NamespaceServiceLock.Unlock()
if pods, found := svcFwd.PortForwards[servicePodName]; found {
stay := make([]*fwdport.PortForwardOpts, 0, len(pods))
for _, pod := range pods {
if filter(pod) {
pod.Stop()
<-pod.DoneChan
} else {
stay = append(stay, pod)
}
}
if len(stay) == 0 {
delete(svcFwd.PortForwards, servicePodName)
} else {
svcFwd.PortForwards[servicePodName] = stay
}
}
}

// RemoveServicePodByPort removes PortForwardOpts from mapping by specified pod port
func (svcFwd *ServiceFWD) RemoveServicePodByPort(servicePodName string, podPort string) {
log.Debugf("Removing all pods from serviceFwd by key=%s and port=%s", servicePodName, podPort)
svcFwd.removeServicePodPort(servicePodName, func(pfo *fwdport.PortForwardOpts) bool {
return pfo.PodPort == podPort
})
}

func portSearch(portName string, containers []v1.Container) (string, bool) {
for _, container := range containers {
for _, cp := range container.Ports {
Expand Down

0 comments on commit f18be8d

Please sign in to comment.