Skip to content

Commit

Permalink
Correct hosts entries remove
Browse files Browse the repository at this point in the history
  • Loading branch information
flupec authored and Vasilii Avtaev committed Jul 2, 2021
1 parent 99daa4a commit 938836b
Show file tree
Hide file tree
Showing 7 changed files with 448 additions and 60 deletions.
2 changes: 2 additions & 0 deletions cmd/kubefwd/services/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,8 @@ func (opts *NamespaceOpts) AddServiceHandler(obj interface{}) {
SyncDebouncer: debounce.New(5 * time.Second),
DoneChannel: make(chan struct{}),
PortMap: opts.ParsePortMap(mappings),
ManualStopChannel: opts.ManualStopChannel,
WaitAllPodsShutdown: &sync.WaitGroup{},
}

// Add the service to the catalog of services being forwarded
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/sirupsen/logrus v1.6.0
github.com/spf13/cobra v1.1.1
github.com/stretchr/testify v1.6.1
github.com/txn2/txeh v1.2.1
golang.org/x/sys v0.0.0-20201112073958-5cba982894dd
k8s.io/api v0.20.4
Expand Down
144 changes: 104 additions & 40 deletions pkg/fwdport/fwdport.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package fwdport
import (
"context"
"fmt"
"io"
"net"
"net/http"
"strconv"
Expand Down Expand Up @@ -34,10 +35,21 @@ type ServiceFWD interface {
ListServicePodNames() []string
AddServicePod(pfo *PortForwardOpts)
GetServicePodPortForwards(servicePodName string) []*PortForwardOpts
RemoveServicePod(servicePodName string)
RemoveServicePodByPort(servicePodName string, podPort string)
RemoveServicePod(servicePodName string, stop bool)
RemoveServicePodByPort(servicePodName string, podPort string, stop bool)
}

type PortForwardHelper interface {
GetPortForwardRequest(pfo *PortForwardOpts) *restclient.Request
NewOnAddresses(dialer httpstream.Dialer, addresses []string, ports []string, stopChan <-chan struct{}, readyChan chan struct{}, out, errOut io.Writer) (*portforward.PortForwarder, error)
ForwardPorts(forwarder *portforward.PortForwarder) error

RoundTripperFor(config *restclient.Config) (http.RoundTripper, spdy.Upgrader, error)
NewDialer(upgrader spdy.Upgrader, client *http.Client, method string, pfRequest *restclient.Request) httpstream.Dialer
}

type PortForwardHelperImpl struct {}

// HostFileWithLock
type HostFileWithLock struct {
Hosts *txeh.Hosts
Expand All @@ -57,7 +69,7 @@ type PortForwardOpts struct {
Out *fwdpub.Publisher
Config restclient.Config
ClientSet kubernetes.Clientset
RESTClient restclient.RESTClient
RESTClient restclient.Interface

Service string
ServiceFwd ServiceFWD
Expand Down Expand Up @@ -85,11 +97,13 @@ type PortForwardOpts struct {
// while > 0 is an external namespace
NamespaceN int

Domain string
HostsParams *HostsParams
Hosts []string
ManualStopChan chan struct{} // Send a signal on this to stop the portforwarding
DoneChan chan struct{} // Listen on this channel for when the shutdown is completed.
Domain string
HostsParams *HostsParams
Hosts []string
ManualStopChan chan struct{} // Send a signal on this to stop the portforwarding
DoneChan chan struct{} // Listen on this channel for when the shutdown is completed.
StateWaiter PodStateWaiter
PortForwardHelper PortForwardHelper
}

type pingingDialer struct {
Expand Down Expand Up @@ -127,10 +141,10 @@ func (p pingingDialer) Dial(protocols ...string) (httpstream.Connection, string,
// PortForward does the port-forward for a single pod.
// It is a blocking call and will return when an error occurred
// or after a cancellation signal has been received.
func (pfo *PortForwardOpts) PortForward() error {
func PortForward(pfo *PortForwardOpts) error {
defer close(pfo.DoneChan)

transport, upgrader, err := spdy.RoundTripperFor(&pfo.Config)
transport, upgrader, err := pfo.PortForwardHelper.RoundTripperFor(&pfo.Config)
if err != nil {
return err
}
Expand All @@ -142,14 +156,7 @@ func (pfo *PortForwardOpts) PortForward() error {
}

fwdPorts := []string{fmt.Sprintf("%s:%s", pfo.LocalPort, pfo.PodPort)}

// if need to set timeout, set it here.
// restClient.Client.Timeout = 32
req := pfo.RESTClient.Post().
Resource("pods").
Namespace(pfo.Namespace).
Name(pfo.PodName).
SubResource("portforward")
req := pfo.PortForwardHelper.GetPortForwardRequest(pfo)

pfStopChannel := make(chan struct{}, 1) // Signal that k8s forwarding takes as input for us to signal when to stop
downstreamStopChannel := make(chan struct{}) // @TODO: can this be the same as pfStopChannel?
Expand All @@ -158,19 +165,15 @@ func (pfo *PortForwardOpts) PortForward() error {

pfo.AddHosts()

// Wait until the stop signal is received from above
// Close created downstream channels if there are stop signal from above
go func() {
<-pfo.ManualStopChan
close(downstreamStopChannel)
pfo.ServiceFwd.RemoveServicePodByPort(pfo.String(), pfo.PodPort)
pfo.removeHosts()
pfo.removeInterfaceAlias()
close(pfStopChannel)

}()

// Waiting until the pod is running
pod, err := pfo.WaitUntilPodRunning(downstreamStopChannel)
pod, err := pfo.StateWaiter.WaitUntilPodRunning(downstreamStopChannel)
if err != nil {
pfo.Stop()
return err
Expand All @@ -188,12 +191,12 @@ func (pfo *PortForwardOpts) PortForward() error {

p := pfo.Out.MakeProducer(localNamedEndPoint)

dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, http.MethodPost, req.URL())
dialer := pfo.PortForwardHelper.NewDialer(upgrader, &http.Client{Transport: transport}, http.MethodPost, req)
dialerWithPing := pingingDialer{
wrappedDialer: dialer,
pingPeriod: time.Second * 30,
pingStopChan: pfo.ManualStopChan,
pingTargetPodName: fmt.Sprintf("%s:%s", pfo.String(), pfo.PodPort),
pingTargetPodName: pfo.String(),
}

var address []string
Expand All @@ -203,22 +206,41 @@ func (pfo *PortForwardOpts) PortForward() error {
address = []string{"localhost"}
}

fw, err := portforward.NewOnAddresses(dialerWithPing, address, fwdPorts, pfStopChannel, make(chan struct{}), &p, &p)
fw, err := pfo.PortForwardHelper.NewOnAddresses(dialerWithPing, address, fwdPorts, pfStopChannel, make(chan struct{}), &p, &p)
if err != nil {
pfo.Stop()
return err
}

// Blocking call
if err = fw.ForwardPorts(); err != nil {
if err = pfo.PortForwardHelper.ForwardPorts(fw); err != nil {
log.Errorf("ForwardPorts error: %s", err.Error())
pfo.Stop()
pfo.shutdown()

return err
} else {
pfo.shutdown()
}

return nil
}

func (pfo PortForwardOpts) shutdown() {
pfo.ServiceFwd.RemoveServicePodByPort(pfo.String(), pfo.PodPort, true)
pfo.removeHosts()
pfo.removeInterfaceAlias()

}

func (pfo PortForwardOpts) isExternalShutdown() bool {
select {
case <-pfo.ManualStopChan:
return true
default:
return false
}
}

//// BuildHostsParams constructs the basic hostnames for the service
//// based on the PortForwardOpts configuration
//func (pfo *PortForwardOpts) BuildHostsParams() {
Expand Down Expand Up @@ -254,6 +276,11 @@ func (pfo *PortForwardOpts) addHost(host string) {
// AddHosts adds hostname entries to /etc/hosts
func (pfo *PortForwardOpts) AddHosts() {

// We must not add multiple hosts entries for different ports on the same service
if pfo.getBrothersInPodsAmount() != 1 {
return
}

pfo.HostFile.Lock()

// pfo.Service holds only the service name
Expand Down Expand Up @@ -348,7 +375,6 @@ func (pfo *PortForwardOpts) getBrothersInPodsAmount() int {
// removeHosts removes hosts /etc/hosts
// associated with a forwarded pod
func (pfo *PortForwardOpts) removeHosts() {

// We must not remove hosts entries if port-forwarding on one of the service ports is cancelled and others not
if pfo.getBrothersInPodsAmount() > 0 {
return
Expand Down Expand Up @@ -384,9 +410,9 @@ func (pfo *PortForwardOpts) removeInterfaceAlias() {
fwdnet.RemoveInterfaceAlias(pfo.LocalIp)
}

// Waiting for the pod running
func (pfo *PortForwardOpts) WaitUntilPodRunning(stopChannel <-chan struct{}) (*v1.Pod, error) {
pod, err := pfo.ClientSet.CoreV1().Pods(pfo.Namespace).Get(context.TODO(), pfo.PodName, metav1.GetOptions{})
// WaitUntilPodRunning Waiting for the pod running
func (waiter *PodStateWaiterImpl) WaitUntilPodRunning(stopChannel <-chan struct{}) (*v1.Pod, error) {
pod, err := waiter.ClientSet.CoreV1().Pods(waiter.Namespace).Get(context.TODO(), waiter.PodName, metav1.GetOptions{})
if err != nil {
return nil, err
}
Expand All @@ -395,7 +421,7 @@ func (pfo *PortForwardOpts) WaitUntilPodRunning(stopChannel <-chan struct{}) (*v
return pod, nil
}

watcher, err := pfo.ClientSet.CoreV1().Pods(pfo.Namespace).Watch(context.TODO(), metav1.SingleObject(pod.ObjectMeta))
watcher, err := waiter.ClientSet.CoreV1().Pods(waiter.Namespace).Watch(context.TODO(), metav1.SingleObject(pod.ObjectMeta))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -430,10 +456,10 @@ func (pfo *PortForwardOpts) WaitUntilPodRunning(stopChannel <-chan struct{}) (*v
return nil, nil
}

// listen for pod is deleted
func (pfo *PortForwardOpts) ListenUntilPodDeleted(stopChannel <-chan struct{}, pod *v1.Pod) {
// ListenUntilPodDeleted listen for pod is deleted
func (waiter *PodStateWaiterImpl) ListenUntilPodDeleted(stopChannel <-chan struct{}, pod *v1.Pod) {

watcher, err := pfo.ClientSet.CoreV1().Pods(pfo.Namespace).Watch(context.TODO(), metav1.SingleObject(pod.ObjectMeta))
watcher, err := waiter.ClientSet.CoreV1().Pods(waiter.Namespace).Watch(context.TODO(), metav1.SingleObject(pod.ObjectMeta))
if err != nil {
return
}
Expand All @@ -452,8 +478,8 @@ func (pfo *PortForwardOpts) ListenUntilPodDeleted(stopChannel <-chan struct{}, p
}
switch event.Type {
case watch.Deleted:
log.Warnf("Pod %s deleted, resyncing the %s service pods.", pod.ObjectMeta.Name, pfo.ServiceFwd)
pfo.ServiceFwd.SyncPodForwards(false)
log.Warnf("Pod %s deleted, resyncing the %s service pods.", pod.ObjectMeta.Name, waiter.ServiceFwd)
waiter.ServiceFwd.SyncPodForwards(false)
return
}
}
Expand All @@ -473,5 +499,43 @@ func (pfo *PortForwardOpts) Stop() {
}

func (pfo *PortForwardOpts) String() string {
return pfo.Service + "." + pfo.PodName
return pfo.PodName
}

type PodStateWaiter interface {
WaitUntilPodRunning(stopChannel <-chan struct{}) (*v1.Pod, error)
//ListenUntilPodDeleted(stopChannel <-chan struct{}, pod *v1.Pod)
}

type PodStateWaiterImpl struct {
Namespace string
PodName string
ClientSet kubernetes.Clientset
ServiceFwd ServiceFWD
}

func (p *PortForwardHelperImpl) GetPortForwardRequest(pfo *PortForwardOpts) *restclient.Request {
// if need to set timeout, set it here.
// restClient.Client.Timeout = 32
return pfo.RESTClient.Post().
Resource("pods").
Namespace(pfo.Namespace).
Name(pfo.PodName).
SubResource("portforward")
}

func (p *PortForwardHelperImpl) NewOnAddresses(dialer httpstream.Dialer, addresses []string, ports []string, stopChan <-chan struct{}, readyChan chan struct{}, out, errOut io.Writer) (*portforward.PortForwarder, error) {
return portforward.NewOnAddresses(dialer, addresses, ports, stopChan, readyChan, out, errOut)
}

func (p *PortForwardHelperImpl) RoundTripperFor(config *restclient.Config) (http.RoundTripper, spdy.Upgrader, error) {
return spdy.RoundTripperFor(config)
}

func (p *PortForwardHelperImpl) NewDialer(upgrader spdy.Upgrader, client *http.Client, method string, pfRequest *restclient.Request) httpstream.Dialer {
return spdy.NewDialer(upgrader, client, method, pfRequest.URL())
}

func (p *PortForwardHelperImpl) ForwardPorts(forwarder *portforward.PortForwarder) error {
return forwarder.ForwardPorts()
}
Loading

0 comments on commit 938836b

Please sign in to comment.