diff --git a/cmd/kubefwd/services/services.go b/cmd/kubefwd/services/services.go index 9626a4de..8a775cf2 100644 --- a/cmd/kubefwd/services/services.go +++ b/cmd/kubefwd/services/services.go @@ -413,11 +413,10 @@ func (opts *NamespaceOpts) AddServiceHandler(obj interface{}) { NamespaceServiceLock: opts.NamespaceIPLock, Svc: svc, Headless: svc.Spec.ClusterIP == "None", - PortForwards: make(map[string][]*fwdport.PortForwardOpts), + PortForwards: make(map[string]*fwdport.PortForwardOpts), SyncDebouncer: debounce.New(5 * time.Second), DoneChannel: make(chan struct{}), PortMap: opts.ParsePortMap(mappings), - ManualStopChannel: opts.ManualStopChannel, } // Add the service to the catalog of services being forwarded diff --git a/go.mod b/go.mod index c1c70e59..23c114c6 100644 --- a/go.mod +++ b/go.mod @@ -5,13 +5,11 @@ go 1.16 require ( github.com/bep/debounce v1.2.0 github.com/elazarl/goproxy v0.0.0-20181111060418-2ce16c963a8a // indirect - github.com/golang/mock v1.4.1 github.com/onsi/ginkgo v1.12.0 // indirect github.com/onsi/gomega v1.9.0 // indirect github.com/pkg/errors v0.9.1 github.com/sirupsen/logrus v1.6.0 github.com/spf13/cobra v1.1.1 - github.com/stretchr/testify v1.6.1 github.com/txn2/txeh v1.2.1 golang.org/x/sys v0.0.0-20201112073958-5cba982894dd k8s.io/api v0.20.4 diff --git a/pkg/fwdport/fwdport.go b/pkg/fwdport/fwdport.go index 1d1efee4..a0837c2b 100644 --- a/pkg/fwdport/fwdport.go +++ b/pkg/fwdport/fwdport.go @@ -1,10 +1,8 @@ -//go:generate ${gopath}/bin/mockgen -source=fwdport.go -destination=mock_fwdport.go -package=fwdport package fwdport import ( "context" "fmt" - "io" "net" "net/http" "strconv" @@ -33,32 +31,6 @@ import ( type ServiceFWD interface { String() string SyncPodForwards(bool) - ListServicePodNames() []string - AddServicePod(pfo *PortForwardOpts) - GetServicePodPortForwards(servicePodName string) []*PortForwardOpts - RemoveServicePod(servicePodName string, stop bool) - RemoveServicePodByPort(servicePodName string, podPort string, stop bool) -} - -type PortForwardHelper interface { - GetPortForwardRequest(pfo *PortForwardOpts) *restclient.Request - NewOnAddresses(dialer httpstream.Dialer, addresses []string, ports []string, stopChan <-chan struct{}, readyChan chan struct{}, out, errOut io.Writer) (*portforward.PortForwarder, error) - ForwardPorts(forwarder *portforward.PortForwarder) error - - RoundTripperFor(config *restclient.Config) (http.RoundTripper, spdy.Upgrader, error) - NewDialer(upgrader spdy.Upgrader, client *http.Client, method string, pfRequest *restclient.Request) httpstream.Dialer -} - -type HostsOperator interface { - AddHosts() - RemoveHosts() - RemoveInterfaceAlias() -} - -type PortForwardHelperImpl struct {} - -type PortForwardOptsHostsOperator struct { - Pfo *PortForwardOpts } // HostFileWithLock @@ -80,7 +52,7 @@ type PortForwardOpts struct { Out *fwdpub.Publisher Config restclient.Config ClientSet kubernetes.Clientset - RESTClient restclient.Interface + RESTClient restclient.RESTClient Service string ServiceFwd ServiceFWD @@ -108,15 +80,11 @@ type PortForwardOpts struct { // while > 0 is an external namespace NamespaceN int - Domain string - HostsParams *HostsParams - Hosts []string - ManualStopChan chan struct{} // Send a signal on this to stop the portforwarding - DoneChan chan struct{} // Listen on this channel for when the shutdown is completed. - - StateWaiter PodStateWaiter - PortForwardHelper PortForwardHelper - HostsOperator HostsOperator + Domain string + HostsParams *HostsParams + Hosts []string + ManualStopChan chan struct{} // Send a signal on this to stop the portforwarding + DoneChan chan struct{} // Listen on this channel for when the shutdown is completed. } type pingingDialer struct { @@ -126,6 +94,10 @@ type pingingDialer struct { pingTargetPodName string } +func (p pingingDialer) stopPing() { + p.pingStopChan <- struct{}{} +} + func (p pingingDialer) Dial(protocols ...string) (httpstream.Connection, string, error) { streamConn, streamProtocolVersion, dialErr := p.wrappedDialer.Dial(protocols...) if dialErr != nil { @@ -142,7 +114,7 @@ func (p pingingDialer) Dial(protocols ...string) (httpstream.Connection, string, _ = pingStream.Reset() } case <-p.pingStopChan: - log.Debugf("Ping process stopped for %s", p.pingTargetPodName) + log.Debug(fmt.Sprintf("Ping process stopped for %s", p.pingTargetPodName)) return } } @@ -154,10 +126,10 @@ func (p pingingDialer) Dial(protocols ...string) (httpstream.Connection, string, // PortForward does the port-forward for a single pod. // It is a blocking call and will return when an error occurred // or after a cancellation signal has been received. -func PortForward(pfo *PortForwardOpts) error { +func (pfo *PortForwardOpts) PortForward() error { defer close(pfo.DoneChan) - transport, upgrader, err := pfo.PortForwardHelper.RoundTripperFor(&pfo.Config) + transport, upgrader, err := spdy.RoundTripperFor(&pfo.Config) if err != nil { return err } @@ -169,32 +141,42 @@ func PortForward(pfo *PortForwardOpts) error { } fwdPorts := []string{fmt.Sprintf("%s:%s", pfo.LocalPort, pfo.PodPort)} - req := pfo.PortForwardHelper.GetPortForwardRequest(pfo) + + // if need to set timeout, set it here. + // restClient.Client.Timeout = 32 + req := pfo.RESTClient.Post(). + Resource("pods"). + Namespace(pfo.Namespace). + Name(pfo.PodName). + SubResource("portforward") pfStopChannel := make(chan struct{}, 1) // Signal that k8s forwarding takes as input for us to signal when to stop downstreamStopChannel := make(chan struct{}) // @TODO: can this be the same as pfStopChannel? localNamedEndPoint := fmt.Sprintf("%s:%s", pfo.Service, pfo.LocalPort) - pfo.HostsOperator.AddHosts() + pfo.AddHosts() - // Close created downstream channels if there are stop signal from above + // Wait until the stop signal is received from above go func() { <-pfo.ManualStopChan close(downstreamStopChannel) + pfo.removeHosts() + pfo.removeInterfaceAlias() close(pfStopChannel) + }() // Waiting until the pod is running - pod, err := pfo.StateWaiter.WaitUntilPodRunning(downstreamStopChannel) + pod, err := pfo.WaitUntilPodRunning(downstreamStopChannel) if err != nil { - pfo.stopAndShutdown() + pfo.Stop() return err } else if pod == nil { // if err is not nil but pod is nil // mean service deleted but pod is not runnning. // No error, just return - pfo.stopAndShutdown() + pfo.Stop() return nil } @@ -204,12 +186,12 @@ func PortForward(pfo *PortForwardOpts) error { p := pfo.Out.MakeProducer(localNamedEndPoint) - dialer := pfo.PortForwardHelper.NewDialer(upgrader, &http.Client{Transport: transport}, http.MethodPost, req) + dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, http.MethodPost, req.URL()) dialerWithPing := pingingDialer{ wrappedDialer: dialer, pingPeriod: time.Second * 30, - pingStopChan: pfo.ManualStopChan, - pingTargetPodName: pfo.String(), + pingStopChan: make(chan struct{}), + pingTargetPodName: pfo.PodName, } var address []string @@ -219,38 +201,23 @@ func PortForward(pfo *PortForwardOpts) error { address = []string{"localhost"} } - fw, err := pfo.PortForwardHelper.NewOnAddresses(dialerWithPing, address, fwdPorts, pfStopChannel, make(chan struct{}), &p, &p) + fw, err := portforward.NewOnAddresses(dialerWithPing, address, fwdPorts, pfStopChannel, make(chan struct{}), &p, &p) if err != nil { - pfo.stopAndShutdown() + pfo.Stop() return err } // Blocking call - if err = pfo.PortForwardHelper.ForwardPorts(fw); err != nil { + if err = fw.ForwardPorts(); err != nil { log.Errorf("ForwardPorts error: %s", err.Error()) - pfo.shutdown() - + pfo.Stop() + dialerWithPing.stopPing() return err - } else { - pfo.shutdown() } return nil } -// shutdown removes port-forward from ServiceFwd and removes hosts entries if it's necessary -func (pfo PortForwardOpts) shutdown() { - pfo.ServiceFwd.RemoveServicePodByPort(pfo.String(), pfo.PodPort, true) - pfo.HostsOperator.RemoveHosts() - pfo.HostsOperator.RemoveInterfaceAlias() -} - -// stopAndShutdown is shortcut for closing all downstream channels and shutdown -func (pfo PortForwardOpts) stopAndShutdown() { - pfo.Stop() - pfo.shutdown() -} - //// BuildHostsParams constructs the basic hostnames for the service //// based on the PortForwardOpts configuration //func (pfo *PortForwardOpts) BuildHostsParams() { @@ -271,14 +238,144 @@ func (pfo PortForwardOpts) stopAndShutdown() { // pfo.HostsParams.svcServiceName = svcServiceName //} -// getBrothersInPodsAmount returns amount of port-forwards that proceeds on different ports under same pod -func (pfo *PortForwardOpts) getBrothersInPodsAmount() int { - return len(pfo.ServiceFwd.GetServicePodPortForwards(pfo.String())) +// AddHost +func (pfo *PortForwardOpts) addHost(host string) { + // add to list of hostnames for this port-forward + pfo.Hosts = append(pfo.Hosts, host) + + // remove host if it already exists in /etc/hosts + pfo.HostFile.Hosts.RemoveHost(host) + + // add host to /etc/hosts + pfo.HostFile.Hosts.AddHost(pfo.LocalIp.String(), host) } -// WaitUntilPodRunning Waiting for the pod running -func (waiter *PodStateWaiterImpl) WaitUntilPodRunning(stopChannel <-chan struct{}) (*v1.Pod, error) { - pod, err := waiter.ClientSet.CoreV1().Pods(waiter.Namespace).Get(context.TODO(), waiter.PodName, metav1.GetOptions{}) +// AddHosts adds hostname entries to /etc/hosts +func (pfo *PortForwardOpts) AddHosts() { + + pfo.HostFile.Lock() + + // pfo.Service holds only the service name + // start with the smallest allowable hostname + + // bare service name + if pfo.ClusterN == 0 && pfo.NamespaceN == 0 { + pfo.addHost(pfo.Service) + + if pfo.Domain != "" { + pfo.addHost(fmt.Sprintf( + "%s.%s", + pfo.Service, + pfo.Domain, + )) + } + } + + // alternate cluster / first namespace + if pfo.ClusterN > 0 && pfo.NamespaceN == 0 { + pfo.addHost(fmt.Sprintf( + "%s.%s", + pfo.Service, + pfo.Context, + )) + } + + // namespaced without cluster + if pfo.ClusterN == 0 { + pfo.addHost(fmt.Sprintf( + "%s.%s", + pfo.Service, + pfo.Namespace, + )) + + pfo.addHost(fmt.Sprintf( + "%s.%s.svc", + pfo.Service, + pfo.Namespace, + )) + + pfo.addHost(fmt.Sprintf( + "%s.%s.svc.cluster.local", + pfo.Service, + pfo.Namespace, + )) + + if pfo.Domain != "" { + pfo.addHost(fmt.Sprintf( + "%s.%s.svc.cluster.%s", + pfo.Service, + pfo.Namespace, + pfo.Domain, + )) + } + + } + + pfo.addHost(fmt.Sprintf( + "%s.%s.%s", + pfo.Service, + pfo.Namespace, + pfo.Context, + )) + + pfo.addHost(fmt.Sprintf( + "%s.%s.svc.%s", + pfo.Service, + pfo.Namespace, + pfo.Context, + )) + + pfo.addHost(fmt.Sprintf( + "%s.%s.svc.cluster.%s", + pfo.Service, + pfo.Namespace, + pfo.Context, + )) + + err := pfo.HostFile.Hosts.Save() + if err != nil { + log.Error("Error saving hosts file", err) + } + pfo.HostFile.Unlock() +} + +// removeHosts removes hosts /etc/hosts +// associated with a forwarded pod +func (pfo *PortForwardOpts) removeHosts() { + + // we should lock the pfo.HostFile here + // because sometimes other goroutine write the *txeh.Hosts + pfo.HostFile.Lock() + // other applications or process may have written to /etc/hosts + // since it was originally updated. + err := pfo.HostFile.Hosts.Reload() + if err != nil { + log.Error("Unable to reload /etc/hosts: " + err.Error()) + return + } + + // remove all hosts + for _, host := range pfo.Hosts { + log.Debugf("Removing host %s for pod %s in namespace %s from context %s", host, pfo.PodName, pfo.Namespace, pfo.Context) + pfo.HostFile.Hosts.RemoveHost(host) + } + + // fmt.Printf("Delete Host And Save !\r\n") + err = pfo.HostFile.Hosts.Save() + if err != nil { + log.Errorf("Error saving /etc/hosts: %s\n", err.Error()) + } + pfo.HostFile.Unlock() +} + +// removeInterfaceAlias called on stop signal to +func (pfo *PortForwardOpts) removeInterfaceAlias() { + fwdnet.RemoveInterfaceAlias(pfo.LocalIp) +} + +// Waiting for the pod running +func (pfo *PortForwardOpts) WaitUntilPodRunning(stopChannel <-chan struct{}) (*v1.Pod, error) { + pod, err := pfo.ClientSet.CoreV1().Pods(pfo.Namespace).Get(context.TODO(), pfo.PodName, metav1.GetOptions{}) if err != nil { return nil, err } @@ -287,7 +384,7 @@ func (waiter *PodStateWaiterImpl) WaitUntilPodRunning(stopChannel <-chan struct{ return pod, nil } - watcher, err := waiter.ClientSet.CoreV1().Pods(waiter.Namespace).Watch(context.TODO(), metav1.SingleObject(pod.ObjectMeta)) + watcher, err := pfo.ClientSet.CoreV1().Pods(pfo.Namespace).Watch(context.TODO(), metav1.SingleObject(pod.ObjectMeta)) if err != nil { return nil, err } @@ -322,10 +419,10 @@ func (waiter *PodStateWaiterImpl) WaitUntilPodRunning(stopChannel <-chan struct{ return nil, nil } -// ListenUntilPodDeleted listen for pod is deleted -func (waiter *PodStateWaiterImpl) ListenUntilPodDeleted(stopChannel <-chan struct{}, pod *v1.Pod) { +// listen for pod is deleted +func (pfo *PortForwardOpts) ListenUntilPodDeleted(stopChannel <-chan struct{}, pod *v1.Pod) { - watcher, err := waiter.ClientSet.CoreV1().Pods(waiter.Namespace).Watch(context.TODO(), metav1.SingleObject(pod.ObjectMeta)) + watcher, err := pfo.ClientSet.CoreV1().Pods(pfo.Namespace).Watch(context.TODO(), metav1.SingleObject(pod.ObjectMeta)) if err != nil { return } @@ -344,8 +441,8 @@ func (waiter *PodStateWaiterImpl) ListenUntilPodDeleted(stopChannel <-chan struc } switch event.Type { case watch.Deleted: - log.Warnf("Pod %s deleted, resyncing the %s service pods.", pod.ObjectMeta.Name, waiter.ServiceFwd) - waiter.ServiceFwd.SyncPodForwards(false) + log.Warnf("Pod %s deleted, resyncing the %s service pods.", pod.ObjectMeta.Name, pfo.ServiceFwd) + pfo.ServiceFwd.SyncPodForwards(false) return } } @@ -363,186 +460,3 @@ func (pfo *PortForwardOpts) Stop() { } close(pfo.ManualStopChan) } - -func (pfo *PortForwardOpts) String() string { - return pfo.PodName -} - -type PodStateWaiter interface { - WaitUntilPodRunning(stopChannel <-chan struct{}) (*v1.Pod, error) - //ListenUntilPodDeleted(stopChannel <-chan struct{}, pod *v1.Pod) -} - -type PodStateWaiterImpl struct { - Namespace string - PodName string - ClientSet kubernetes.Clientset - ServiceFwd ServiceFWD -} - -func (p PortForwardHelperImpl) GetPortForwardRequest(pfo *PortForwardOpts) *restclient.Request { - // if need to set timeout, set it here. - // restClient.Client.Timeout = 32 - return pfo.RESTClient.Post(). - Resource("pods"). - Namespace(pfo.Namespace). - Name(pfo.PodName). - SubResource("portforward") -} - -func (p PortForwardHelperImpl) NewOnAddresses(dialer httpstream.Dialer, addresses []string, ports []string, stopChan <-chan struct{}, readyChan chan struct{}, out, errOut io.Writer) (*portforward.PortForwarder, error) { - return portforward.NewOnAddresses(dialer, addresses, ports, stopChan, readyChan, out, errOut) -} - -func (p PortForwardHelperImpl) RoundTripperFor(config *restclient.Config) (http.RoundTripper, spdy.Upgrader, error) { - return spdy.RoundTripperFor(config) -} - -func (p PortForwardHelperImpl) NewDialer(upgrader spdy.Upgrader, client *http.Client, method string, pfRequest *restclient.Request) httpstream.Dialer { - return spdy.NewDialer(upgrader, client, method, pfRequest.URL()) -} - -func (p PortForwardHelperImpl) ForwardPorts(forwarder *portforward.PortForwarder) error { - return forwarder.ForwardPorts() -} - -// AddHosts adds hostname entries to /etc/hosts -func (operator PortForwardOptsHostsOperator) AddHosts() { - - // We must not add multiple hosts entries for different ports on the same service - if operator.Pfo.getBrothersInPodsAmount() != 1 { - return - } - - operator.Pfo.HostFile.Lock() - - // pfo.Service holds only the service name - // start with the smallest allowable hostname - - // bare service name - if operator.Pfo.ClusterN == 0 && operator.Pfo.NamespaceN == 0 { - operator.addHost(operator.Pfo.Service) - - if operator.Pfo.Domain != "" { - operator.addHost(fmt.Sprintf( - "%s.%s", - operator.Pfo.Service, - operator.Pfo.Domain, - )) - } - } - - // alternate cluster / first namespace - if operator.Pfo.ClusterN > 0 && operator.Pfo.NamespaceN == 0 { - operator.addHost(fmt.Sprintf( - "%s.%s", - operator.Pfo.Service, - operator.Pfo.Context, - )) - } - - // namespaced without cluster - if operator.Pfo.ClusterN == 0 { - operator.addHost(fmt.Sprintf( - "%s.%s", - operator.Pfo.Service, - operator.Pfo.Namespace, - )) - - operator.addHost(fmt.Sprintf( - "%s.%s.svc", - operator.Pfo.Service, - operator.Pfo.Namespace, - )) - - operator.addHost(fmt.Sprintf( - "%s.%s.svc.cluster.local", - operator.Pfo.Service, - operator.Pfo.Namespace, - )) - - if operator.Pfo.Domain != "" { - operator.addHost(fmt.Sprintf( - "%s.%s.svc.cluster.%s", - operator.Pfo.Service, - operator.Pfo.Namespace, - operator.Pfo.Domain, - )) - } - - } - - operator.addHost(fmt.Sprintf( - "%s.%s.%s", - operator.Pfo.Service, - operator.Pfo.Namespace, - operator.Pfo.Context, - )) - - operator.addHost(fmt.Sprintf( - "%s.%s.svc.%s", - operator.Pfo.Service, - operator.Pfo.Namespace, - operator.Pfo.Context, - )) - - operator.addHost(fmt.Sprintf( - "%s.%s.svc.cluster.%s", - operator.Pfo.Service, - operator.Pfo.Namespace, - operator.Pfo.Context, - )) - - err := operator.Pfo.HostFile.Hosts.Save() - if err != nil { - log.Error("Error saving hosts file", err) - } - operator.Pfo.HostFile.Unlock() -} - -// RemoveHosts removes hosts /etc/hosts associated with a forwarded pod -func (operator PortForwardOptsHostsOperator) RemoveHosts() { - // We must not remove hosts entries if port-forwarding on one of the service ports is cancelled and others not - if operator.Pfo.getBrothersInPodsAmount() > 0 { - return - } - - // we should lock the pfo.HostFile here - // because sometimes other goroutine write the *txeh.Hosts - operator.Pfo.HostFile.Lock() - // other applications or process may have written to /etc/hosts - // since it was originally updated. - err := operator.Pfo.HostFile.Hosts.Reload() - if err != nil { - log.Errorf("Unable to reload /etc/hosts: %s", err.Error()) - return - } - - // remove all hosts - for _, host := range operator.Pfo.Hosts { - log.Debugf("Removing host %s for pod %s in namespace %s from context %s", host, operator.Pfo.PodName, operator.Pfo.Namespace, operator.Pfo.Context) - operator.Pfo.HostFile.Hosts.RemoveHost(host) - } - - // fmt.Printf("Delete Host And Save !\r\n") - err = operator.Pfo.HostFile.Hosts.Save() - if err != nil { - log.Errorf("Error saving /etc/hosts: %s\n", err.Error()) - } - operator.Pfo.HostFile.Unlock() -} - -func (operator PortForwardOptsHostsOperator) RemoveInterfaceAlias() { - fwdnet.RemoveInterfaceAlias(operator.Pfo.LocalIp) -} - -func (operator PortForwardOptsHostsOperator) addHost(host string) { - // add to list of hostnames for this port-forward - operator.Pfo.Hosts = append(operator.Pfo.Hosts, host) - - // remove host if it already exists in /etc/hosts - operator.Pfo.HostFile.Hosts.RemoveHost(host) - - // add host to /etc/hosts - operator.Pfo.HostFile.Hosts.AddHost(operator.Pfo.LocalIp.String(), host) -} \ No newline at end of file diff --git a/pkg/fwdport/fwdport_test.go b/pkg/fwdport/fwdport_test.go deleted file mode 100644 index 52cea879..00000000 --- a/pkg/fwdport/fwdport_test.go +++ /dev/null @@ -1,131 +0,0 @@ -package fwdport - -import ( - "github.com/golang/mock/gomock" - "github.com/pkg/errors" - "github.com/stretchr/testify/assert" - "github.com/txn2/kubefwd/pkg/fwdpub" - v1 "k8s.io/api/core/v1" - "testing" -) - -var ( - podName = "test-pod-name" - namespace = "test-namespace" - serviceName = "test-service-name" -) - -func TestPortForward_RemovesItselfFromServiceFwd_AfterPortForwardErr(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - svcFwd := NewMockServiceFWD(ctrl) - waiter := NewMockPodStateWaiter(ctrl) - hostsOperator := NewMockHostsOperator(ctrl) - - pfHelper := NewMockPortForwardHelper(ctrl) - pfo := &PortForwardOpts{ - Out: &fwdpub.Publisher{ - PublisherName: "Services", - Output: false, - }, - Service: serviceName, - ServiceFwd: svcFwd, - PodName: podName, - PodPort: "8080", - HostFile: nil, - LocalPort: "8080", - Namespace: namespace, - ManualStopChan: make(chan struct{}), - DoneChan: make(chan struct{}), - StateWaiter: waiter, - PortForwardHelper: pfHelper, - HostsOperator: hostsOperator, - } - pfErr := errors.New("pf error") - - pfHelper.EXPECT().RoundTripperFor(gomock.Any()).Return(nil, nil, nil) - pfHelper.EXPECT().GetPortForwardRequest(gomock.Any()).Return(nil) - hostsOperator.EXPECT().AddHosts().Times(1) - waiter.EXPECT().WaitUntilPodRunning(gomock.Any()).Return(&v1.Pod{Status: v1.PodStatus{Phase: v1.PodRunning}}, nil) - pfHelper.EXPECT().NewDialer(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) - pfHelper.EXPECT(). - NewOnAddresses(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). - Return(nil, nil) - - pfHelper.EXPECT().ForwardPorts(gomock.Any()).Return(pfErr) - - svcFwd.EXPECT().RemoveServicePodByPort(gomock.Eq(pfo.String()), gomock.Eq(pfo.PodPort), gomock.Eq(true)) - hostsOperator.EXPECT().RemoveHosts().Times(1) - hostsOperator.EXPECT().RemoveInterfaceAlias().Times(1) - - err := PortForward(pfo) - assert.NotNil(t, err) - assert.Equal(t, pfErr, err) - - <-pfo.DoneChan - assertChannelsClosed(t, - assertableChannel{ch: pfo.DoneChan, name: "DoneChan"}, - ) -} - -func TestPortForward_OnlyClosesDownstreamChannels_WhenErrorOnWaitUntilPodRunning(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - svcFwd := NewMockServiceFWD(ctrl) - waiter := NewMockPodStateWaiter(ctrl) - hostsOperator := NewMockHostsOperator(ctrl) - - pfHelper := NewMockPortForwardHelper(ctrl) - pfo := &PortForwardOpts{ - Out: &fwdpub.Publisher{ - PublisherName: "Services", - Output: false, - }, - Service: serviceName, - ServiceFwd: svcFwd, - PodName: podName, - PodPort: "8080", - HostFile: nil, - LocalPort: "8080", - Namespace: namespace, - ManualStopChan: make(chan struct{}), - DoneChan: make(chan struct{}), - StateWaiter: waiter, - PortForwardHelper: pfHelper, - HostsOperator: hostsOperator, - } - - untilPodRunningErr := errors.New("for example, bad credentials error from clientset") - - pfHelper.EXPECT().RoundTripperFor(gomock.Any()).Return(nil, nil, nil) - pfHelper.EXPECT().GetPortForwardRequest(gomock.Any()).Return(nil) - hostsOperator.EXPECT().AddHosts().Times(1) - waiter.EXPECT().WaitUntilPodRunning(gomock.Any()).Return(nil, untilPodRunningErr) - svcFwd.EXPECT().RemoveServicePodByPort(gomock.Eq(pfo.String()), gomock.Eq(pfo.PodPort), gomock.Eq(true)) - hostsOperator.EXPECT().RemoveHosts().Times(1) - hostsOperator.EXPECT().RemoveInterfaceAlias().Times(1) - - err := PortForward(pfo) - assert.NotNil(t, err) - assert.Equal(t, untilPodRunningErr, err) - - <-pfo.DoneChan - assertChannelsClosed(t, - assertableChannel{ch: pfo.DoneChan, name: "DoneChan"}, - assertableChannel{ch: pfo.ManualStopChan, name: "ManualStopChan"}, - ) -} - -func assertChannelsClosed(t *testing.T, channels ...assertableChannel) { - for _, assertableCh := range channels { - _, open := <-assertableCh.ch - assert.False(t, open, "%s must be closed", assertableCh.name) - } -} - -type assertableChannel struct { - ch chan struct{} - name string -} \ No newline at end of file diff --git a/pkg/fwdport/mock_fwdport.go b/pkg/fwdport/mock_fwdport.go deleted file mode 100644 index fc715eb4..00000000 --- a/pkg/fwdport/mock_fwdport.go +++ /dev/null @@ -1,323 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: fwdport.go - -// Package fwdport is a generated GoMock package. -package fwdport - -import ( - gomock "github.com/golang/mock/gomock" - io "io" - v1 "k8s.io/api/core/v1" - httpstream "k8s.io/apimachinery/pkg/util/httpstream" - rest "k8s.io/client-go/rest" - portforward "k8s.io/client-go/tools/portforward" - spdy "k8s.io/client-go/transport/spdy" - http "net/http" - reflect "reflect" -) - -// MockServiceFWD is a mock of ServiceFWD interface -type MockServiceFWD struct { - ctrl *gomock.Controller - recorder *MockServiceFWDMockRecorder -} - -// MockServiceFWDMockRecorder is the mock recorder for MockServiceFWD -type MockServiceFWDMockRecorder struct { - mock *MockServiceFWD -} - -// NewMockServiceFWD creates a new mock instance -func NewMockServiceFWD(ctrl *gomock.Controller) *MockServiceFWD { - mock := &MockServiceFWD{ctrl: ctrl} - mock.recorder = &MockServiceFWDMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use -func (m *MockServiceFWD) EXPECT() *MockServiceFWDMockRecorder { - return m.recorder -} - -// String mocks base method -func (m *MockServiceFWD) String() string { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "String") - ret0, _ := ret[0].(string) - return ret0 -} - -// String indicates an expected call of String -func (mr *MockServiceFWDMockRecorder) String() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "String", reflect.TypeOf((*MockServiceFWD)(nil).String)) -} - -// SyncPodForwards mocks base method -func (m *MockServiceFWD) SyncPodForwards(arg0 bool) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "SyncPodForwards", arg0) -} - -// SyncPodForwards indicates an expected call of SyncPodForwards -func (mr *MockServiceFWDMockRecorder) SyncPodForwards(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncPodForwards", reflect.TypeOf((*MockServiceFWD)(nil).SyncPodForwards), arg0) -} - -// ListServicePodNames mocks base method -func (m *MockServiceFWD) ListServicePodNames() []string { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ListServicePodNames") - ret0, _ := ret[0].([]string) - return ret0 -} - -// ListServicePodNames indicates an expected call of ListServicePodNames -func (mr *MockServiceFWDMockRecorder) ListServicePodNames() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListServicePodNames", reflect.TypeOf((*MockServiceFWD)(nil).ListServicePodNames)) -} - -// AddServicePod mocks base method -func (m *MockServiceFWD) AddServicePod(pfo *PortForwardOpts) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "AddServicePod", pfo) -} - -// AddServicePod indicates an expected call of AddServicePod -func (mr *MockServiceFWDMockRecorder) AddServicePod(pfo interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddServicePod", reflect.TypeOf((*MockServiceFWD)(nil).AddServicePod), pfo) -} - -// GetServicePodPortForwards mocks base method -func (m *MockServiceFWD) GetServicePodPortForwards(servicePodName string) []*PortForwardOpts { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetServicePodPortForwards", servicePodName) - ret0, _ := ret[0].([]*PortForwardOpts) - return ret0 -} - -// GetServicePodPortForwards indicates an expected call of GetServicePodPortForwards -func (mr *MockServiceFWDMockRecorder) GetServicePodPortForwards(servicePodName interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetServicePodPortForwards", reflect.TypeOf((*MockServiceFWD)(nil).GetServicePodPortForwards), servicePodName) -} - -// RemoveServicePod mocks base method -func (m *MockServiceFWD) RemoveServicePod(servicePodName string, stop bool) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "RemoveServicePod", servicePodName, stop) -} - -// RemoveServicePod indicates an expected call of RemoveServicePod -func (mr *MockServiceFWDMockRecorder) RemoveServicePod(servicePodName, stop interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveServicePod", reflect.TypeOf((*MockServiceFWD)(nil).RemoveServicePod), servicePodName, stop) -} - -// RemoveServicePodByPort mocks base method -func (m *MockServiceFWD) RemoveServicePodByPort(servicePodName, podPort string, stop bool) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "RemoveServicePodByPort", servicePodName, podPort, stop) -} - -// RemoveServicePodByPort indicates an expected call of RemoveServicePodByPort -func (mr *MockServiceFWDMockRecorder) RemoveServicePodByPort(servicePodName, podPort, stop interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveServicePodByPort", reflect.TypeOf((*MockServiceFWD)(nil).RemoveServicePodByPort), servicePodName, podPort, stop) -} - -// MockPortForwardHelper is a mock of PortForwardHelper interface -type MockPortForwardHelper struct { - ctrl *gomock.Controller - recorder *MockPortForwardHelperMockRecorder -} - -// MockPortForwardHelperMockRecorder is the mock recorder for MockPortForwardHelper -type MockPortForwardHelperMockRecorder struct { - mock *MockPortForwardHelper -} - -// NewMockPortForwardHelper creates a new mock instance -func NewMockPortForwardHelper(ctrl *gomock.Controller) *MockPortForwardHelper { - mock := &MockPortForwardHelper{ctrl: ctrl} - mock.recorder = &MockPortForwardHelperMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use -func (m *MockPortForwardHelper) EXPECT() *MockPortForwardHelperMockRecorder { - return m.recorder -} - -// GetPortForwardRequest mocks base method -func (m *MockPortForwardHelper) GetPortForwardRequest(pfo *PortForwardOpts) *rest.Request { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetPortForwardRequest", pfo) - ret0, _ := ret[0].(*rest.Request) - return ret0 -} - -// GetPortForwardRequest indicates an expected call of GetPortForwardRequest -func (mr *MockPortForwardHelperMockRecorder) GetPortForwardRequest(pfo interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPortForwardRequest", reflect.TypeOf((*MockPortForwardHelper)(nil).GetPortForwardRequest), pfo) -} - -// NewOnAddresses mocks base method -func (m *MockPortForwardHelper) NewOnAddresses(dialer httpstream.Dialer, addresses, ports []string, stopChan <-chan struct{}, readyChan chan struct{}, out, errOut io.Writer) (*portforward.PortForwarder, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "NewOnAddresses", dialer, addresses, ports, stopChan, readyChan, out, errOut) - ret0, _ := ret[0].(*portforward.PortForwarder) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// NewOnAddresses indicates an expected call of NewOnAddresses -func (mr *MockPortForwardHelperMockRecorder) NewOnAddresses(dialer, addresses, ports, stopChan, readyChan, out, errOut interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewOnAddresses", reflect.TypeOf((*MockPortForwardHelper)(nil).NewOnAddresses), dialer, addresses, ports, stopChan, readyChan, out, errOut) -} - -// ForwardPorts mocks base method -func (m *MockPortForwardHelper) ForwardPorts(forwarder *portforward.PortForwarder) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ForwardPorts", forwarder) - ret0, _ := ret[0].(error) - return ret0 -} - -// ForwardPorts indicates an expected call of ForwardPorts -func (mr *MockPortForwardHelperMockRecorder) ForwardPorts(forwarder interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ForwardPorts", reflect.TypeOf((*MockPortForwardHelper)(nil).ForwardPorts), forwarder) -} - -// RoundTripperFor mocks base method -func (m *MockPortForwardHelper) RoundTripperFor(config *rest.Config) (http.RoundTripper, spdy.Upgrader, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "RoundTripperFor", config) - ret0, _ := ret[0].(http.RoundTripper) - ret1, _ := ret[1].(spdy.Upgrader) - ret2, _ := ret[2].(error) - return ret0, ret1, ret2 -} - -// RoundTripperFor indicates an expected call of RoundTripperFor -func (mr *MockPortForwardHelperMockRecorder) RoundTripperFor(config interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RoundTripperFor", reflect.TypeOf((*MockPortForwardHelper)(nil).RoundTripperFor), config) -} - -// NewDialer mocks base method -func (m *MockPortForwardHelper) NewDialer(upgrader spdy.Upgrader, client *http.Client, method string, pfRequest *rest.Request) httpstream.Dialer { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "NewDialer", upgrader, client, method, pfRequest) - ret0, _ := ret[0].(httpstream.Dialer) - return ret0 -} - -// NewDialer indicates an expected call of NewDialer -func (mr *MockPortForwardHelperMockRecorder) NewDialer(upgrader, client, method, pfRequest interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewDialer", reflect.TypeOf((*MockPortForwardHelper)(nil).NewDialer), upgrader, client, method, pfRequest) -} - -// MockHostsOperator is a mock of HostsOperator interface -type MockHostsOperator struct { - ctrl *gomock.Controller - recorder *MockHostsOperatorMockRecorder -} - -// MockHostsOperatorMockRecorder is the mock recorder for MockHostsOperator -type MockHostsOperatorMockRecorder struct { - mock *MockHostsOperator -} - -// NewMockHostsOperator creates a new mock instance -func NewMockHostsOperator(ctrl *gomock.Controller) *MockHostsOperator { - mock := &MockHostsOperator{ctrl: ctrl} - mock.recorder = &MockHostsOperatorMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use -func (m *MockHostsOperator) EXPECT() *MockHostsOperatorMockRecorder { - return m.recorder -} - -// AddHosts mocks base method -func (m *MockHostsOperator) AddHosts() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "AddHosts") -} - -// AddHosts indicates an expected call of AddHosts -func (mr *MockHostsOperatorMockRecorder) AddHosts() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddHosts", reflect.TypeOf((*MockHostsOperator)(nil).AddHosts)) -} - -// RemoveHosts mocks base method -func (m *MockHostsOperator) RemoveHosts() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "RemoveHosts") -} - -// RemoveHosts indicates an expected call of RemoveHosts -func (mr *MockHostsOperatorMockRecorder) RemoveHosts() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveHosts", reflect.TypeOf((*MockHostsOperator)(nil).RemoveHosts)) -} - -// RemoveInterfaceAlias mocks base method -func (m *MockHostsOperator) RemoveInterfaceAlias() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "RemoveInterfaceAlias") -} - -// RemoveInterfaceAlias indicates an expected call of RemoveInterfaceAlias -func (mr *MockHostsOperatorMockRecorder) RemoveInterfaceAlias() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveInterfaceAlias", reflect.TypeOf((*MockHostsOperator)(nil).RemoveInterfaceAlias)) -} - -// MockPodStateWaiter is a mock of PodStateWaiter interface -type MockPodStateWaiter struct { - ctrl *gomock.Controller - recorder *MockPodStateWaiterMockRecorder -} - -// MockPodStateWaiterMockRecorder is the mock recorder for MockPodStateWaiter -type MockPodStateWaiterMockRecorder struct { - mock *MockPodStateWaiter -} - -// NewMockPodStateWaiter creates a new mock instance -func NewMockPodStateWaiter(ctrl *gomock.Controller) *MockPodStateWaiter { - mock := &MockPodStateWaiter{ctrl: ctrl} - mock.recorder = &MockPodStateWaiterMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use -func (m *MockPodStateWaiter) EXPECT() *MockPodStateWaiterMockRecorder { - return m.recorder -} - -// WaitUntilPodRunning mocks base method -func (m *MockPodStateWaiter) WaitUntilPodRunning(stopChannel <-chan struct{}) (*v1.Pod, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "WaitUntilPodRunning", stopChannel) - ret0, _ := ret[0].(*v1.Pod) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// WaitUntilPodRunning indicates an expected call of WaitUntilPodRunning -func (mr *MockPodStateWaiterMockRecorder) WaitUntilPodRunning(stopChannel interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WaitUntilPodRunning", reflect.TypeOf((*MockPodStateWaiter)(nil).WaitUntilPodRunning), stopChannel) -} diff --git a/pkg/fwdservice/fwdservice.go b/pkg/fwdservice/fwdservice.go index 681b4f03..a7c90718 100644 --- a/pkg/fwdservice/fwdservice.go +++ b/pkg/fwdservice/fwdservice.go @@ -1,12 +1,11 @@ package fwdservice import ( - "context" "fmt" - "net" "strconv" "sync" "time" + "context" log "github.com/sirupsen/logrus" "github.com/txn2/kubefwd/pkg/fwdnet" @@ -68,10 +67,9 @@ type ServiceFWD struct { SyncDebouncer func(f func()) // A mapping of all the pods currently being forwarded. - // key = PortForwardOpts.String() - PortForwards map[string][]*fwdport.PortForwardOpts - DoneChannel chan struct{} // After shutdown is complete, this channel will be closed - ManualStopChannel chan struct{} + // key = podName + PortForwards map[string]*fwdport.PortForwardOpts + DoneChannel chan struct{} // After shutdown is complete, this channel will be closed } /** @@ -143,7 +141,7 @@ func (svcFwd *ServiceFWD) SyncPodForwards(force bool) { } } if !keep { - svcFwd.RemoveServicePod(podName, true) + svcFwd.RemoveServicePod(podName) } } @@ -179,7 +177,7 @@ func (svcFwd *ServiceFWD) SyncPodForwards(force bool) { // and the comparison will mean we will remove all pods, which is the desired behaviour. for _, podName := range svcFwd.ListServicePodNames() { if podName != podNameToKeep { - svcFwd.RemoveServicePod(podName, true) + svcFwd.RemoveServicePod(podName) } } @@ -213,9 +211,17 @@ func (svcFwd *ServiceFWD) LoopPodsToForward(pods []v1.Pod, includePodNameInHost Output: false, } + // Ip address handout is a critical section for synchronization, + // use a lock which synchronizes inside each namespace. + svcFwd.NamespaceServiceLock.Lock() + defer svcFwd.NamespaceServiceLock.Unlock() + for _, pod := range pods { - var localIp net.IP - podIpReady := false + // If pod is already configured to be forwarded, skip it + if _, found := svcFwd.PortForwards[pod.Name]; found { + continue + } + podPort := "" serviceHostName := svcFwd.Svc.Name @@ -226,6 +232,11 @@ func (svcFwd *ServiceFWD) LoopPodsToForward(pods []v1.Pod, includePodNameInHost svcName = pod.Name + "." + svcFwd.Svc.Name } + localIp, err := fwdnet.ReadyInterface(svcName, pod.Name, svcFwd.ClusterN, svcFwd.NamespaceN, podPort) + if err != nil { + log.Warnf("WARNING: error readying interface: %s\n", err) + } + // if this is not the first namespace on the // first cluster then append the namespace if svcFwd.NamespaceN > 0 { @@ -265,17 +276,32 @@ func (svcFwd *ServiceFWD) LoopPodsToForward(pods []v1.Pod, includePodNameInHost } } + log.Debugf("Resolving: %s to %s (%s)\n", + serviceHostName, + localIp.String(), + svcName, + ) + + log.Printf("Port-Forward: %s %s:%d to pod %s:%s\n", + localIp.String(), + serviceHostName, + port.Port, + pod.Name, + podPort, + ) + pfo := &fwdport.PortForwardOpts{ Out: publisher, Config: svcFwd.ClientConfig, ClientSet: svcFwd.ClientSet, - RESTClient: &svcFwd.RESTClient, + RESTClient: svcFwd.RESTClient, Context: svcFwd.Context, Namespace: pod.Namespace, Service: svcName, ServiceFwd: svcFwd, PodName: pod.Name, PodPort: podPort, + LocalIp: localIp, LocalPort: localPort, HostFile: svcFwd.Hostfile, ClusterN: svcFwd.ClusterN, @@ -284,50 +310,12 @@ func (svcFwd *ServiceFWD) LoopPodsToForward(pods []v1.Pod, includePodNameInHost ManualStopChan: make(chan struct{}), DoneChan: make(chan struct{}), - - StateWaiter: &fwdport.PodStateWaiterImpl{ - Namespace: pod.Namespace, - PodName: pod.Name, - ClientSet: svcFwd.ClientSet, - ServiceFwd: svcFwd, - }, - PortForwardHelper: &fwdport.PortForwardHelperImpl{}, - } - pfo.HostsOperator = fwdport.PortForwardOptsHostsOperator{Pfo: pfo} - - // If port-forwarding on pod under exact port is already configured, then skip it - if runningPortForwards := svcFwd.GetServicePodPortForwards(pfo.Service); len(runningPortForwards) > 0 && svcFwd.contains(runningPortForwards, pfo) { - continue - } - - if !podIpReady { // We need to Ready interface only once per pod - if localIp, err = fwdnet.ReadyInterface(svcName, pod.Name, svcFwd.ClusterN, svcFwd.NamespaceN, ""); err == nil { - podIpReady = true - } else { - log.Warnf("WARNING: error readying interface: %s\n", err) - } } - log.Debugf("Resolving: %s to %s (%s)\n", - serviceHostName, - localIp.String(), - svcName, - ) - - log.Printf("Port-Forward: %s %s:%d to pod %s:%s\n", - localIp.String(), - serviceHostName, - port.Port, - pod.Name, - podPort, - ) - - pfo.LocalIp = localIp - // Fire and forget. The stopping is done in the service.Shutdown() method. go func() { svcFwd.AddServicePod(pfo) - if err := fwdport.PortForward(pfo); err != nil { + if err := pfo.PortForward(); err != nil { select { case <-pfo.ManualStopChan: // if shutdown was given, we don't bother with the error. default: @@ -347,31 +335,17 @@ func (svcFwd *ServiceFWD) LoopPodsToForward(pods []v1.Pod, includePodNameInHost } } -// AddServicePod adds PortForwardOpts to mapping +// AddServicePod func (svcFwd *ServiceFWD) AddServicePod(pfo *fwdport.PortForwardOpts) { - log.Debugf("ServiceForward: Add %s with %s port", pfo, pfo.PodPort) svcFwd.NamespaceServiceLock.Lock() - defer svcFwd.NamespaceServiceLock.Unlock() - if existPortForwards, found := svcFwd.PortForwards[pfo.String()]; !found { - svcFwd.PortForwards[pfo.String()] = []*fwdport.PortForwardOpts{pfo} - } else { - if !svcFwd.contains(existPortForwards, pfo) { - existPortForwards = append(existPortForwards, pfo) - svcFwd.PortForwards[pfo.String()] = existPortForwards - } + ServicePod := pfo.Service + "." + pfo.PodName + if _, found := svcFwd.PortForwards[ServicePod]; !found { + svcFwd.PortForwards[ServicePod] = pfo } + svcFwd.NamespaceServiceLock.Unlock() } -func (svcFwd *ServiceFWD) contains(portForwards []*fwdport.PortForwardOpts, pfo *fwdport.PortForwardOpts) bool { - for _, pf := range portForwards { - if pfo.PodName == pf.PodName && pfo.Service == pf.Service && pfo.PodPort == pf.PodPort { - return true - } - } - return false -} - -// ListServicePodNames returns list of keys for mapping +// ListServicePodNames func (svcFwd *ServiceFWD) ListServicePodNames() []string { svcFwd.NamespaceServiceLock.Lock() currentPodNames := make([]string, 0, len(svcFwd.PortForwards)) @@ -382,59 +356,14 @@ func (svcFwd *ServiceFWD) ListServicePodNames() []string { return currentPodNames } -func (svcFwd *ServiceFWD) GetServicePodPortForwards(servicePodName string) []*fwdport.PortForwardOpts { - svcFwd.NamespaceServiceLock.Lock() - defer svcFwd.NamespaceServiceLock.Unlock() - return svcFwd.PortForwards[servicePodName] -} - -// RemoveServicePod removes all PortForwardOpts from mapping with or without port-forward stop -func (svcFwd *ServiceFWD) RemoveServicePod(servicePodName string, stop bool) { - log.Debugf("ServiceForward: Removing all pods by key=%s", servicePodName) - svcFwd.removeServicePodPort(servicePodName, svcFwd.allMatch, stop) - log.Debugf("ServiceForward: Done removing all pods by key=%s", servicePodName) -} - -func (svcFwd *ServiceFWD) allMatch(_ *fwdport.PortForwardOpts) bool { - return true -} - -// removeServicePodPort removes PortForwardOpts from mapping according to filter function with or without port-forward stop -func (svcFwd *ServiceFWD) removeServicePodPort(servicePodName string, filter func(pfo *fwdport.PortForwardOpts) bool, stop bool) { - svcFwd.NamespaceServiceLock.Lock() - if pods, found := svcFwd.PortForwards[servicePodName]; found { - stay := make([]*fwdport.PortForwardOpts, 0, len(pods)) - for _, pod := range pods { - if filter(pod) { - if stop { - defer svcFwd.stop(pod) - } - } else { - stay = append(stay, pod) - } - } - if len(stay) == 0 { - delete(svcFwd.PortForwards, servicePodName) - } else { - svcFwd.PortForwards[servicePodName] = stay - } - log.Debugf("ServiceForward: Removed %d pods by key %s", len(pods) - len(stay), servicePodName) +func (svcFwd *ServiceFWD) RemoveServicePod(servicePodName string) { + if pod, found := svcFwd.PortForwards[servicePodName]; found { + pod.Stop() + <-pod.DoneChan + svcFwd.NamespaceServiceLock.Lock() + delete(svcFwd.PortForwards, servicePodName) + svcFwd.NamespaceServiceLock.Unlock() } - svcFwd.NamespaceServiceLock.Unlock() -} - -func (svcFwd *ServiceFWD) stop(pfo *fwdport.PortForwardOpts) { - pfo.Stop() - <-pfo.DoneChan -} - -// RemoveServicePodByPort removes PortForwardOpts from mapping by specified pod port with or without port-forward stop -func (svcFwd *ServiceFWD) RemoveServicePodByPort(servicePodName string, podPort string, stop bool) { - log.Debugf("ServiceForward: Removing all pods by key=%s and port=%s", servicePodName, podPort) - svcFwd.removeServicePodPort(servicePodName, func(pfo *fwdport.PortForwardOpts) bool { - return pfo.PodPort == podPort - }, stop) - log.Debugf("ServiceForward: Done removing all pods by key=%s and port=%s", servicePodName, podPort) } func portSearch(portName string, containers []v1.Container) (string, bool) { diff --git a/pkg/fwdservice/fwdservice_test.go b/pkg/fwdservice/fwdservice_test.go deleted file mode 100644 index c7ea6212..00000000 --- a/pkg/fwdservice/fwdservice_test.go +++ /dev/null @@ -1,143 +0,0 @@ -package fwdservice - -import ( - "github.com/stretchr/testify/assert" - "github.com/txn2/kubefwd/pkg/fwdport" - "sync" - "testing" -) - -var ( - firstPodName = "test-pod-name" - firstServiceName = "test-service-name" - firstPort = "80" - secondPort = "8080" -) - -func TestServiceFWD_AddServicePod(t *testing.T) { - svcFwd := &ServiceFWD{ - NamespaceServiceLock: &sync.Mutex{}, - PortForwards: make(map[string][]*fwdport.PortForwardOpts), - } - - pfOnFirstPort := &fwdport.PortForwardOpts{ - Service: firstServiceName, - ServiceFwd: svcFwd, - PodName: firstPodName, - PodPort: firstPort, - LocalPort: firstPort, - } - pfOnSecondPort := &fwdport.PortForwardOpts{ - Service: firstServiceName, - ServiceFwd: svcFwd, - PodName: firstPodName, - PodPort: secondPort, - LocalPort: secondPort, - } - - registeredPortForwards := svcFwd.GetServicePodPortForwards(pfOnFirstPort.String()) - assert.Len(t, registeredPortForwards, 0) - - svcFwd.AddServicePod(pfOnFirstPort) - - registeredPortForwards = svcFwd.GetServicePodPortForwards(pfOnFirstPort.String()) - assert.Len(t, registeredPortForwards, 1) - assert.Contains(t, registeredPortForwards, pfOnFirstPort) - - svcFwd.AddServicePod(pfOnSecondPort) - registeredPortForwards = svcFwd.GetServicePodPortForwards(pfOnFirstPort.String()) - assert.Len(t, registeredPortForwards, 2) - assert.Contains(t, registeredPortForwards, pfOnFirstPort) - assert.Contains(t, registeredPortForwards, pfOnSecondPort) -} - -func TestServiceFWD_RemoveServicePod(t *testing.T) { - svcFwd := &ServiceFWD{ - NamespaceServiceLock: &sync.Mutex{}, - PortForwards: make(map[string][]*fwdport.PortForwardOpts), - } - - pfOnFirstPort := &fwdport.PortForwardOpts{ - Service: firstServiceName, - ServiceFwd: svcFwd, - PodName: firstPodName, - PodPort: firstPort, - LocalPort: firstPort, - ManualStopChan: make(chan struct{}), - DoneChan: make(chan struct{}), - } - pfOnSecondPort := &fwdport.PortForwardOpts{ - Service: firstServiceName, - ServiceFwd: svcFwd, - PodName: firstPodName, - PodPort: secondPort, - LocalPort: secondPort, - ManualStopChan: make(chan struct{}), - DoneChan: make(chan struct{}), - } - - svcFwd.AddServicePod(pfOnFirstPort) - svcFwd.AddServicePod(pfOnSecondPort) - - registeredPortForwards := svcFwd.GetServicePodPortForwards(pfOnFirstPort.String()) - assert.Len(t, registeredPortForwards, 2) - - // Simulate PortForwardOpts.PortForward() finalization - close(pfOnFirstPort.DoneChan) - close(pfOnSecondPort.DoneChan) - - svcFwd.RemoveServicePod(pfOnFirstPort.String(), true) - - registeredPortForwards = svcFwd.GetServicePodPortForwards(pfOnFirstPort.String()) - assert.Len(t, registeredPortForwards, 0) -} - -func TestServiceFwd_RemoveServicePodByPort(t *testing.T) { - svcFwd := &ServiceFWD{ - NamespaceServiceLock: &sync.Mutex{}, - PortForwards: make(map[string][]*fwdport.PortForwardOpts), - } - - pfOnFirstPort := &fwdport.PortForwardOpts{ - Service: firstServiceName, - ServiceFwd: svcFwd, - PodName: firstPodName, - PodPort: firstPort, - LocalPort: firstPort, - ManualStopChan: make(chan struct{}), - DoneChan: make(chan struct{}), - } - pfOnSecondPort := &fwdport.PortForwardOpts{ - Service: firstServiceName, - ServiceFwd: svcFwd, - PodName: firstPodName, - PodPort: secondPort, - LocalPort: secondPort, - ManualStopChan: make(chan struct{}), - DoneChan: make(chan struct{}), - } - - svcFwd.AddServicePod(pfOnFirstPort) - svcFwd.AddServicePod(pfOnSecondPort) - - registeredPortForwards := svcFwd.GetServicePodPortForwards(pfOnFirstPort.String()) - assert.Len(t, registeredPortForwards, 2) - - // Simulate PortForwardOpts.PortForward() finalization - close(pfOnFirstPort.DoneChan) - close(pfOnSecondPort.DoneChan) - - svcFwd.RemoveServicePodByPort(pfOnFirstPort.String(), firstPort, true) - - registeredPortForwards = svcFwd.GetServicePodPortForwards(pfOnFirstPort.String()) - assert.Len(t, registeredPortForwards, 1) - assert.NotContains(t, registeredPortForwards, pfOnFirstPort) - assert.Contains(t, registeredPortForwards, pfOnSecondPort) - - svcFwd.RemoveServicePodByPort(pfOnFirstPort.String(), secondPort, true) - - registeredPortForwards = svcFwd.GetServicePodPortForwards(pfOnFirstPort.String()) - assert.Len(t, registeredPortForwards, 0) - assert.NotContains(t, registeredPortForwards, pfOnFirstPort) - assert.NotContains(t, registeredPortForwards, pfOnSecondPort) -} \ No newline at end of file diff --git a/pkg/fwdsvcregistry/fwdsvcregistry.go b/pkg/fwdsvcregistry/fwdsvcregistry.go index 6b279ff3..ebee98f4 100644 --- a/pkg/fwdsvcregistry/fwdsvcregistry.go +++ b/pkg/fwdsvcregistry/fwdsvcregistry.go @@ -133,7 +133,7 @@ func RemoveByName(name string) { podsAllDone.Add(len(activePodForwards)) for _, podName := range activePodForwards { go func(podName string) { - serviceFwd.RemoveServicePod(podName, true) + serviceFwd.RemoveServicePod(podName) podsAllDone.Done() }(podName) }