From 9c889cb6171934259d55a0d21771e594d3c68358 Mon Sep 17 00:00:00 2001 From: Craig Johnston <cjimti@gmail.com> Date: Sat, 30 May 2020 01:39:17 -0700 Subject: [PATCH 01/14] updated selector example --- cmd/kubefwd/services/services.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/kubefwd/services/services.go b/cmd/kubefwd/services/services.go index f9567532..11eeab2e 100644 --- a/cmd/kubefwd/services/services.go +++ b/cmd/kubefwd/services/services.go @@ -75,6 +75,7 @@ var Cmd = &cobra.Command{ Long: `Forward multiple Kubernetes services from one or more namespaces. Filter services with selector.`, Example: " kubefwd svc -n the-project\n" + " kubefwd svc -n the-project -l app=wx,component=api\n" + + " kubefwd svc -n default -l \"app in (ws, api)\"\n" + " kubefwd svc -n default -n the-project\n" + " kubefwd svc -n default -d internal.example.com\n" + " kubefwd svc -n the-project -x prod-cluster\n", From 8afdbcad5214a040234e28788646554d31d3f956 Mon Sep 17 00:00:00 2001 From: Craig Johnston <cjimti@gmail.com> Date: Thu, 4 Jun 2020 15:14:35 -0700 Subject: [PATCH 02/14] modules update --- go.sum | 1 + 1 file changed, 1 insertion(+) diff --git a/go.sum b/go.sum index a36f05f6..a44625f5 100644 --- a/go.sum +++ b/go.sum @@ -188,6 +188,7 @@ github.com/opencontainers/go-digest v1.0.0-rc1/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQ github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/peterbourgon/diskv v2.0.1+incompatible h1:UBdAOUP5p4RWqPBg048CAvpKN+vxiaj6gdUUzhl4XmI= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= +github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= From 67188aeb944cc5975529356bff77a0b270df1d30 Mon Sep 17 00:00:00 2001 From: Craig Johnston <cj@imti.co> Date: Sat, 17 Oct 2020 16:34:54 -0700 Subject: [PATCH 03/14] host management cleanup --- pkg/fwdport/fwdport.go | 61 ++++++++++++++++++++---------------------- 1 file changed, 29 insertions(+), 32 deletions(-) diff --git a/pkg/fwdport/fwdport.go b/pkg/fwdport/fwdport.go index cbc0229a..9e1eaf5c 100644 --- a/pkg/fwdport/fwdport.go +++ b/pkg/fwdport/fwdport.go @@ -59,6 +59,7 @@ type PortForwardOpts struct { Remote bool Domain string HostsParams *HostsParams + Hosts []string ManualStopChan chan struct{} // Send a signal on this to stop the portforwarding DoneChan chan struct{} // Listen on this channel for when the shutdown is completed. } @@ -107,7 +108,7 @@ func (pfo *PortForwardOpts) PortForward() error { }() - // Waiting until the pod is runnning + // Waiting until the pod is running pod, err := pfo.WaitUntilPodRunning(downstreamStopChannel) if err != nil { pfo.Stop() @@ -165,7 +166,19 @@ func (pfo *PortForwardOpts) BuildTheHostsParams() { pfo.HostsParams.svcServiceName = svcServiceName } -// this method to add hosts obj in /etc/hosts +// AddHost +func (pfo *PortForwardOpts) addHost(host string) { + // add to list of hostnames for this port-forward + pfo.Hosts = append(pfo.Hosts, host) + + // remove host if it already exists in /etc/hosts + pfo.Hostfile.Hosts.RemoveHost(host) + + // add host to /etc/hosts + pfo.Hostfile.Hosts.AddHost(pfo.LocalIp.String(), host) +} + +// AddHosts adds hostname entries to /etc/hosts func (pfo *PortForwardOpts) AddHosts() { pfo.Hostfile.Lock() @@ -173,36 +186,30 @@ func (pfo *PortForwardOpts) AddHosts() { pfo.Hostfile.Hosts.RemoveHost(pfo.HostsParams.fullServiceName) pfo.Hostfile.Hosts.RemoveHost(pfo.HostsParams.svcServiceName) + if pfo.Domain != "" { - pfo.Hostfile.Hosts.AddHost(pfo.LocalIp.String(), pfo.Service+"."+pfo.Domain) + pfo.addHost(pfo.Service + "." + pfo.Domain) } - pfo.Hostfile.Hosts.AddHost(pfo.LocalIp.String(), pfo.Service) + pfo.addHost(pfo.Service) } else { if pfo.ShortName { if pfo.Domain != "" { - pfo.Hostfile.Hosts.RemoveHost(pfo.HostsParams.localServiceName + "." + pfo.Domain) - pfo.Hostfile.Hosts.AddHost(pfo.LocalIp.String(), pfo.HostsParams.localServiceName+"."+pfo.Domain) + pfo.addHost(pfo.HostsParams.localServiceName + "." + pfo.Domain) } - pfo.Hostfile.Hosts.RemoveHost(pfo.HostsParams.localServiceName) - pfo.Hostfile.Hosts.AddHost(pfo.LocalIp.String(), pfo.HostsParams.localServiceName) + pfo.addHost(pfo.HostsParams.localServiceName) } - pfo.Hostfile.Hosts.RemoveHost(pfo.HostsParams.fullServiceName) - pfo.Hostfile.Hosts.AddHost(pfo.LocalIp.String(), pfo.HostsParams.fullServiceName) - - pfo.Hostfile.Hosts.RemoveHost(pfo.HostsParams.svcServiceName) - pfo.Hostfile.Hosts.AddHost(pfo.LocalIp.String(), pfo.HostsParams.svcServiceName) + pfo.addHost(pfo.HostsParams.fullServiceName) + pfo.addHost(pfo.HostsParams.svcServiceName) if pfo.Domain != "" { - pfo.Hostfile.Hosts.RemoveHost(pfo.HostsParams.nsServiceName + "." + pfo.Domain) - pfo.Hostfile.Hosts.AddHost(pfo.LocalIp.String(), pfo.HostsParams.nsServiceName+"."+pfo.Domain) + pfo.addHost(pfo.HostsParams.nsServiceName + "." + pfo.Domain) } - pfo.Hostfile.Hosts.RemoveHost(pfo.HostsParams.nsServiceName) - pfo.Hostfile.Hosts.AddHost(pfo.LocalIp.String(), pfo.HostsParams.nsServiceName) - + pfo.addHost(pfo.HostsParams.nsServiceName) } + err := pfo.Hostfile.Hosts.Save() if err != nil { log.Error("Error saving hosts file", err) @@ -212,6 +219,7 @@ func (pfo *PortForwardOpts) AddHosts() { // this method to remove hosts obj in /etc/hosts func (pfo *PortForwardOpts) removeHosts() { + // we should lock the pfo.Hostfile here // because sometimes other goroutine write the *txeh.Hosts pfo.Hostfile.Lock() @@ -223,21 +231,10 @@ func (pfo *PortForwardOpts) removeHosts() { return } - if !pfo.Remote { - if pfo.Domain != "" { - // fmt.Printf("removeHost: %s\r\n", (pfo.HostsParams.localServiceName + "." + pfo.Domain)) - // fmt.Printf("removeHost: %s\r\n", (pfo.HostsParams.nsServiceName + "." + pfo.Domain)) - pfo.Hostfile.Hosts.RemoveHost(pfo.HostsParams.localServiceName + "." + pfo.Domain) - pfo.Hostfile.Hosts.RemoveHost(pfo.HostsParams.nsServiceName + "." + pfo.Domain) - } - // fmt.Printf("removeHost: %s\r\n", pfo.HostsParams.localServiceName) - // fmt.Printf("removeHost: %s\r\n", pfo.HostsParams.nsServiceName) - pfo.Hostfile.Hosts.RemoveHost(pfo.HostsParams.localServiceName) - pfo.Hostfile.Hosts.RemoveHost(pfo.HostsParams.nsServiceName) + // remove all hosts + for _, host := range pfo.Hosts { + pfo.Hostfile.Hosts.RemoveHost(host) } - // fmt.Printf("removeHost: %s\r\n", pfo.HostsParams.fullServiceName) - pfo.Hostfile.Hosts.RemoveHost(pfo.HostsParams.fullServiceName) - pfo.Hostfile.Hosts.RemoveHost(pfo.HostsParams.svcServiceName) // fmt.Printf("Delete Host And Save !\r\n") err = pfo.Hostfile.Hosts.Save() From cc42c050b1e0f184b24f77b43164a7dd65ad8499 Mon Sep 17 00:00:00 2001 From: Craig Johnston <cj@imti.co> Date: Sat, 17 Oct 2020 16:35:40 -0700 Subject: [PATCH 04/14] variable name housekeeping --- pkg/fwdservice/fwdservice.go | 171 ++++++++++++++++++----------------- 1 file changed, 89 insertions(+), 82 deletions(-) diff --git a/pkg/fwdservice/fwdservice.go b/pkg/fwdservice/fwdservice.go index 5f8e8be2..a9c3c619 100644 --- a/pkg/fwdservice/fwdservice.go +++ b/pkg/fwdservice/fwdservice.go @@ -2,11 +2,12 @@ package fwdservice import ( "fmt" - "k8s.io/apimachinery/pkg/api/errors" "strconv" "sync" "time" + "k8s.io/apimachinery/pkg/api/errors" + "github.com/txn2/kubefwd/pkg/fwdnet" "github.com/txn2/kubefwd/pkg/fwdport" "github.com/txn2/kubefwd/pkg/fwdpub" @@ -49,16 +50,16 @@ func (svcFwd *ServiceFWD) String() string { // GetPodsForService queries k8s and returns all pods backing this service // which are eligible for portforwarding; exclude some pods which are in final/failure state. -func (svcfwd *ServiceFWD) GetPodsForService() []v1.Pod { - listOpts := metav1.ListOptions{LabelSelector: svcfwd.PodLabelSelector} +func (svcFwd *ServiceFWD) GetPodsForService() []v1.Pod { + listOpts := metav1.ListOptions{LabelSelector: svcFwd.PodLabelSelector} - pods, err := svcfwd.ClientSet.CoreV1().Pods(svcfwd.Svc.Namespace).List(listOpts) + pods, err := svcFwd.ClientSet.CoreV1().Pods(svcFwd.Svc.Namespace).List(listOpts) if err != nil { if errors.IsNotFound(err) { - log.Warnf("WARNING: No Pods found for service %s: %s\n", svcfwd, err.Error()) + log.Warnf("WARNING: No Pods found for service %s: %s\n", svcFwd, err.Error()) } else { - log.Warnf("WARNING: Error in List pods for %s: %s\n", svcfwd, err.Error()) + log.Warnf("WARNING: Error in List pods for %s: %s\n", svcFwd, err.Error()) } return nil } @@ -76,25 +77,25 @@ func (svcfwd *ServiceFWD) GetPodsForService() []v1.Pod { // SyncPodForwards selects one or all pods behind a service, and invokes the forwarding setup for that or those pod(s). // It will remove pods in-mem that are no longer returned by k8s, should these not be correctly deleted. -func (svcfwd *ServiceFWD) SyncPodForwards(force bool) { +func (svcFwd *ServiceFWD) SyncPodForwards(force bool) { // When a whole set of pods gets deleted at once, they all will trigger a SyncPodForwards() call. This would hammer k8s with load needlessly. // Therefore keep a timestamp from when this was last called and only allow call if the previous one was not too recent. - if !force && time.Since(svcfwd.LastSyncedAt) < 10*time.Minute { - log.Debugf("Skipping pods refresh for %s due to rate limiting", svcfwd) + if !force && time.Since(svcFwd.LastSyncedAt) < 10*time.Minute { + log.Debugf("Skipping pods refresh for %s due to rate limiting", svcFwd) return } - defer func() { svcfwd.LastSyncedAt = time.Now() }() + defer func() { svcFwd.LastSyncedAt = time.Now() }() - k8sPods := svcfwd.GetPodsForService() + k8sPods := svcFwd.GetPodsForService() // If no pods are found currently. Will try again next resync period if len(k8sPods) == 0 { - log.Warnf("WARNING: No Running Pods returned for service %s", svcfwd) + log.Warnf("WARNING: No Running Pods returned for service %s", svcFwd) return } // Check if the pods currently being forwarded still exist in k8s and if they are not in a (pre-)running state, if not: remove them - for _, podName := range svcfwd.ListPodNames() { + for _, podName := range svcFwd.ListPodNames() { keep := false for _, pod := range k8sPods { if podName == pod.Name && (pod.Status.Phase == v1.PodPending || pod.Status.Phase == v1.PodRunning) { @@ -103,73 +104,78 @@ func (svcfwd *ServiceFWD) SyncPodForwards(force bool) { } } if !keep { - svcfwd.RemovePod(podName) + svcFwd.RemovePod(podName) } } // Set up portforwarding for one or all of these pods // normal service portforward the first pod as service name. headless service not only forward first Pod as service name, but also portforward all pods. if len(k8sPods) != 0 { - if svcfwd.Headless { - svcfwd.LoopPodsToForward([]v1.Pod{k8sPods[0]}, true) - svcfwd.LoopPodsToForward(k8sPods, false) - } else { - // Check if currently we are forwarding a pod which is good to keep using - podNameToKeep := "" - for _, podName := range svcfwd.ListPodNames() { - if podNameToKeep != "" { + + // if this is a headless service forward the first pod from the + // service name, then subsequent pods from their pod name + if svcFwd.Headless { + svcFwd.LoopPodsToForward([]v1.Pod{k8sPods[0]}, false) + svcFwd.LoopPodsToForward(k8sPods, true) + return + } + + // Check if currently we are forwarding a pod which is good to keep using + podNameToKeep := "" + for _, podName := range svcFwd.ListPodNames() { + if podNameToKeep != "" { + break + } + for _, pod := range k8sPods { + if podName == pod.Name && (pod.Status.Phase == v1.PodPending || pod.Status.Phase == v1.PodRunning) { + podNameToKeep = pod.Name break } - for _, pod := range k8sPods { - if podName == pod.Name && (pod.Status.Phase == v1.PodPending || pod.Status.Phase == v1.PodRunning) { - podNameToKeep = pod.Name - break - } - } } + } - // Stop forwarding others, should there be. In case none of the currently forwarded pods are good to keep, - // podNameToKeep will be the empty string, and the comparison will mean we will remove all pods, which is the desired behaviour. - for _, podName := range svcfwd.ListPodNames() { - if podName != podNameToKeep { - svcfwd.RemovePod(podName) - } + // Stop forwarding others, should there be. In case none of the currently forwarded pods are good to keep, + // podNameToKeep will be the empty string, and the comparison will mean we will remove all pods, which is the desired behaviour. + for _, podName := range svcFwd.ListPodNames() { + if podName != podNameToKeep { + svcFwd.RemovePod(podName) } + } - // If no good pod was being forwarded already, start one - if podNameToKeep == "" { - svcfwd.LoopPodsToForward([]v1.Pod{k8sPods[0]}, false) - } + // If no good pod was being forwarded already, start one + if podNameToKeep == "" { + svcFwd.LoopPodsToForward([]v1.Pod{k8sPods[0]}, false) } + } } // LoopPodsToForward starts the portforwarding for each pod in the given list -func (svcfwd *ServiceFWD) LoopPodsToForward(pods []v1.Pod, includePodNameInHost bool) { +func (svcFwd *ServiceFWD) LoopPodsToForward(pods []v1.Pod, includePodNameInHost bool) { publisher := &fwdpub.Publisher{ PublisherName: "Services", Output: false, } // Ip address handout is a critical section for synchronization, use a lock which synchronizes inside each namespace. - svcfwd.NamespaceIPLock.Lock() - defer svcfwd.NamespaceIPLock.Unlock() + svcFwd.NamespaceIPLock.Lock() + defer svcFwd.NamespaceIPLock.Unlock() for _, pod := range pods { // If pod is already configured to be forwarded, skip it - if _, found := svcfwd.PortForwards[pod.Name]; found { + if _, found := svcFwd.PortForwards[pod.Name]; found { continue } podPort := "" svcName := "" - localIp, dInc, err := fwdnet.ReadyInterface(127, 1, svcfwd.IpC, *svcfwd.IpD, podPort) + localIp, dInc, err := fwdnet.ReadyInterface(127, 1, svcFwd.IpC, *svcFwd.IpD, podPort) if err != nil { log.Warnf("WARNING: error readying interface: %s\n", err) } - *svcfwd.IpD = dInc + *svcFwd.IpD = dInc - for _, port := range svcfwd.Svc.Spec.Ports { + for _, port := range svcFwd.Svc.Spec.Ports { podPort = port.TargetPort.String() localPort := strconv.Itoa(int(port.Port)) @@ -181,33 +187,34 @@ func (svcfwd *ServiceFWD) LoopPodsToForward(pods []v1.Pod, includePodNameInHost } } - serviceHostName := svcfwd.Svc.Name + serviceHostName := svcFwd.Svc.Name if includePodNameInHost { serviceHostName = pod.Name + "." + serviceHostName } - svcName = serviceHostName - - if !svcfwd.ShortName { + if !svcFwd.ShortName { serviceHostName = serviceHostName + "." + pod.Namespace } - if svcfwd.Domain != "" { - serviceHostName = serviceHostName + "." + svcfwd.Domain + if svcFwd.Domain != "" { + serviceHostName = serviceHostName + "." + svcFwd.Domain } - if svcfwd.Remote { - serviceHostName = fmt.Sprintf("%s.svc.cluster.%s", serviceHostName, svcfwd.Context) + if svcFwd.Remote { + serviceHostName = fmt.Sprintf("%s.svc.cluster.%s", serviceHostName, svcFwd.Context) } + svcName = serviceHostName + log.Debugf("Resolving: %s to %s\n", - serviceHostName, + svcName, localIp.String(), ) - log.Printf("Port-Forward: %s:%d to pod %s:%s\n", - serviceHostName, + log.Printf("Port-Forward: %s %s:%d to pod %s:%s\n", + localIp.String(), + svcName, port.Port, pod.Name, podPort, @@ -215,21 +222,21 @@ func (svcfwd *ServiceFWD) LoopPodsToForward(pods []v1.Pod, includePodNameInHost pfo := &fwdport.PortForwardOpts{ Out: publisher, - Config: svcfwd.ClientConfig, - ClientSet: svcfwd.ClientSet, - RESTClient: svcfwd.RESTClient, - Context: svcfwd.Context, + Config: svcFwd.ClientConfig, + ClientSet: svcFwd.ClientSet, + RESTClient: svcFwd.RESTClient, + Context: svcFwd.Context, Namespace: pod.Namespace, Service: svcName, - ServiceFwd: svcfwd, + ServiceFwd: svcFwd, PodName: pod.Name, PodPort: podPort, LocalIp: localIp, LocalPort: localPort, - Hostfile: svcfwd.Hostfile, - ShortName: svcfwd.ShortName, - Remote: svcfwd.Remote, - Domain: svcfwd.Domain, + Hostfile: svcFwd.Hostfile, + ShortName: svcFwd.ShortName, + Remote: svcFwd.Remote, + Domain: svcFwd.Domain, ManualStopChan: make(chan struct{}), DoneChan: make(chan struct{}), @@ -237,7 +244,7 @@ func (svcfwd *ServiceFWD) LoopPodsToForward(pods []v1.Pod, includePodNameInHost // Fire and forget. The stopping is done in the service.Shutdown() method. go func() { - svcfwd.AddPod(pfo) + svcFwd.AddPod(pfo) if err := pfo.PortForward(); err != nil { select { case <-pfo.ManualStopChan: // if shutdown was given, we don't bother with the error. @@ -248,7 +255,7 @@ func (svcfwd *ServiceFWD) LoopPodsToForward(pods []v1.Pod, includePodNameInHost select { case <-pfo.ManualStopChan: // if shutdown was given, don't log a warning as it's an intented stopping. default: - log.Warnf("Stopped forwarding pod %s for %s", pfo.PodName, svcfwd) + log.Warnf("Stopped forwarding pod %s for %s", pfo.PodName, svcFwd) } } }() @@ -258,31 +265,31 @@ func (svcfwd *ServiceFWD) LoopPodsToForward(pods []v1.Pod, includePodNameInHost } } -func (svcfwd *ServiceFWD) AddPod(pfo *fwdport.PortForwardOpts) { - svcfwd.NamespaceIPLock.Lock() - if _, found := svcfwd.PortForwards[pfo.PodName]; !found { - svcfwd.PortForwards[pfo.PodName] = pfo +func (svcFwd *ServiceFWD) AddPod(pfo *fwdport.PortForwardOpts) { + svcFwd.NamespaceIPLock.Lock() + if _, found := svcFwd.PortForwards[pfo.PodName]; !found { + svcFwd.PortForwards[pfo.PodName] = pfo } - svcfwd.NamespaceIPLock.Unlock() + svcFwd.NamespaceIPLock.Unlock() } -func (svcfwd *ServiceFWD) ListPodNames() []string { - svcfwd.NamespaceIPLock.Lock() - currentPodNames := make([]string, 0, len(svcfwd.PortForwards)) - for podName := range svcfwd.PortForwards { +func (svcFwd *ServiceFWD) ListPodNames() []string { + svcFwd.NamespaceIPLock.Lock() + currentPodNames := make([]string, 0, len(svcFwd.PortForwards)) + for podName := range svcFwd.PortForwards { currentPodNames = append(currentPodNames, podName) } - svcfwd.NamespaceIPLock.Unlock() + svcFwd.NamespaceIPLock.Unlock() return currentPodNames } -func (svcfwd *ServiceFWD) RemovePod(podName string) { - if pod, found := svcfwd.PortForwards[podName]; found { +func (svcFwd *ServiceFWD) RemovePod(podName string) { + if pod, found := svcFwd.PortForwards[podName]; found { pod.Stop() <-pod.DoneChan - svcfwd.NamespaceIPLock.Lock() - delete(svcfwd.PortForwards, podName) - svcfwd.NamespaceIPLock.Unlock() + svcFwd.NamespaceIPLock.Lock() + delete(svcFwd.PortForwards, podName) + svcFwd.NamespaceIPLock.Unlock() } } From fc0537663edb4d6e8fb2e8f2d8ae1b1ee5097cbb Mon Sep 17 00:00:00 2001 From: Craig Johnston <cj@imti.co> Date: Sat, 17 Oct 2020 16:37:27 -0700 Subject: [PATCH 05/14] additional debug messages --- pkg/fwdsvcregistry/fwdsvcregistry.go | 55 +++++++++++++++------------- 1 file changed, 29 insertions(+), 26 deletions(-) diff --git a/pkg/fwdsvcregistry/fwdsvcregistry.go b/pkg/fwdsvcregistry/fwdsvcregistry.go index 793987ea..3e0f6fad 100644 --- a/pkg/fwdsvcregistry/fwdsvcregistry.go +++ b/pkg/fwdsvcregistry/fwdsvcregistry.go @@ -9,7 +9,8 @@ import ( _ "k8s.io/client-go/plugin/pkg/client/auth" ) -// ServicesRegistry is a structure to hold all of the services we need to do portforwarding for. +// ServicesRegistry is a structure to hold all of the kubernetes +// services to do port-forwarding for. type ServicesRegistry struct { mutex *sync.Mutex services map[string]*fwdservice.ServiceFWD @@ -44,8 +45,9 @@ func Done() <-chan struct{} { return ch } -// Add will add this service to the registry of services configured to do forwarding (if it wasn't already configured) and start the portforwarding process. -func Add(svcfwd *fwdservice.ServiceFWD) { +// Add will add this service to the registry of services configured to do forwarding +// (if it wasn't already configured) and start the port-forwarding process. +func Add(serviceFwd *fwdservice.ServiceFWD) { // If we are already shutting down, don't add a new service anymore. select { case <-svcRegistry.shutDownSignal: @@ -56,23 +58,25 @@ func Add(svcfwd *fwdservice.ServiceFWD) { svcRegistry.mutex.Lock() defer svcRegistry.mutex.Unlock() - if _, found := svcRegistry.services[svcfwd.String()]; !found { - svcRegistry.services[svcfwd.String()] = svcfwd - log.Debugf("Starting forwarding service %s", svcfwd) - } else { + if _, found := svcRegistry.services[serviceFwd.String()]; found { + log.Debugf("Registry: found existing service %s") return } - // Start the portforwarding - go svcfwd.SyncPodForwards(false) + svcRegistry.services[serviceFwd.String()] = serviceFwd + log.Debugf("Registry: Start forwarding service %s", serviceFwd) - // Schedule a resync every x minutes to deal with potential connection errors. + // Start port forwarding + go serviceFwd.SyncPodForwards(false) + + // Schedule a re sync every x minutes to deal with potential connection errors. + // @TODO review the need for this, if we keep it make if configurable go func() { for { select { case <-time.After(10 * time.Minute): - svcfwd.SyncPodForwards(false) - case <-svcfwd.DoneChannel: + serviceFwd.SyncPodForwards(false) + case <-serviceFwd.DoneChannel: return } } @@ -98,15 +102,21 @@ func ShutDownAll() { for name := range svcRegistry.services { RemoveByName(name) } - log.Debugf("All services have shut down") + log.Debugf("Registry: All services have shut down") } -// RemoveByName will shutdown and remove the service, identified by svcName.svcNamespace, from the inventory of services, if it was currently being configured to do forwarding. +// RemoveByName will shutdown and remove the service, identified by svcName.svcNamespace, +// from the inventory of services, if it was currently being configured to do forwarding. +// @TODO add context to name func RemoveByName(name string) { + + log.Debugf("Registry: Removing service %s", name) + // Pop the service from the registry svcRegistry.mutex.Lock() - svcfwd, found := svcRegistry.services[name] + serviceFwd, found := svcRegistry.services[name] if !found { + log.Debugf("Registry: Did not find service %s.", name) svcRegistry.mutex.Unlock() return } @@ -114,26 +124,19 @@ func RemoveByName(name string) { svcRegistry.mutex.Unlock() // Synchronously stop the forwarding of all active pods in it - activePodForwards := svcfwd.ListPodNames() - log.Debugf("Stopping service %s with %d portforward(s)", svcfwd, len(activePodForwards)) + activePodForwards := serviceFwd.ListPodNames() + log.Debugf("Registry: Stopping service %s with %d port-forward(s)", serviceFwd, len(activePodForwards)) podsAllDone := &sync.WaitGroup{} podsAllDone.Add(len(activePodForwards)) for _, podName := range activePodForwards { go func(podName string) { - svcfwd.RemovePod(podName) + serviceFwd.RemovePod(podName) podsAllDone.Done() }(podName) } podsAllDone.Wait() // Signal that the service has shut down - close(svcfwd.DoneChannel) -} - -// GetByName returns the ServiceFWD object, if it currently being forwarded. -func GetByName(name string) *fwdservice.ServiceFWD { - svcRegistry.mutex.Lock() - defer svcRegistry.mutex.Unlock() - return svcRegistry.services[name] + close(serviceFwd.DoneChannel) } From e5be8e357e1609144da702bdaa7a3e7bcf53974c Mon Sep 17 00:00:00 2001 From: Craig Johnston <cj@imti.co> Date: Sat, 17 Oct 2020 18:25:43 -0700 Subject: [PATCH 06/14] refactored method names for clarity --- pkg/fwdsvcregistry/fwdsvcregistry.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/fwdsvcregistry/fwdsvcregistry.go b/pkg/fwdsvcregistry/fwdsvcregistry.go index 3e0f6fad..1ad166e4 100644 --- a/pkg/fwdsvcregistry/fwdsvcregistry.go +++ b/pkg/fwdsvcregistry/fwdsvcregistry.go @@ -124,14 +124,14 @@ func RemoveByName(name string) { svcRegistry.mutex.Unlock() // Synchronously stop the forwarding of all active pods in it - activePodForwards := serviceFwd.ListPodNames() + activePodForwards := serviceFwd.ListServicePodNames() log.Debugf("Registry: Stopping service %s with %d port-forward(s)", serviceFwd, len(activePodForwards)) podsAllDone := &sync.WaitGroup{} podsAllDone.Add(len(activePodForwards)) for _, podName := range activePodForwards { go func(podName string) { - serviceFwd.RemovePod(podName) + serviceFwd.RemoveServicePod(podName) podsAllDone.Done() }(podName) } From c6bf53c9f14e1dbf5999be4ddc0d876bf6edbb23 Mon Sep 17 00:00:00 2001 From: Craig Johnston <cj@imti.co> Date: Sat, 17 Oct 2020 18:26:18 -0700 Subject: [PATCH 07/14] code documentation housekeeping --- pkg/fwdport/fwdport.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/pkg/fwdport/fwdport.go b/pkg/fwdport/fwdport.go index 9e1eaf5c..59019266 100644 --- a/pkg/fwdport/fwdport.go +++ b/pkg/fwdport/fwdport.go @@ -64,8 +64,9 @@ type PortForwardOpts struct { DoneChan chan struct{} // Listen on this channel for when the shutdown is completed. } -// PortForward does the portforward for a single pod. -// It is a blocking call and will return when an error occured of after a cancellation signal has been received. +// PortForward does the port-forward for a single pod. +// It is a blocking call and will return when an error occurred +// or after a cancellation signal has been received. func (pfo *PortForwardOpts) PortForward() error { defer close(pfo.DoneChan) @@ -233,6 +234,7 @@ func (pfo *PortForwardOpts) removeHosts() { // remove all hosts for _, host := range pfo.Hosts { + log.Debugf("REMOVING HOST %s FOR POD %s", host, pfo.PodName) pfo.Hostfile.Hosts.RemoveHost(host) } @@ -323,8 +325,8 @@ func (pfo *PortForwardOpts) ListenUntilPodDeleted(stopChannel <-chan struct{}, p } } -// Stop sends the shutdown signal to the portforwarding process. -// In case the shutdownsignal was already given before, this is a no-op. +// Stop sends the shutdown signal to the port-forwarding process. +// In case the shutdown signal was already given before, this is a no-op. func (pfo *PortForwardOpts) Stop() { select { case <-pfo.DoneChan: From 0c5475ca3fc73baa1fb8902936bcd6ef817119a0 Mon Sep 17 00:00:00 2001 From: Craig Johnston <cj@imti.co> Date: Sat, 17 Oct 2020 18:26:36 -0700 Subject: [PATCH 08/14] code documentation housekeeping --- pkg/fwdservice/fwdservice.go | 87 +++++++++++++++++++++--------------- 1 file changed, 52 insertions(+), 35 deletions(-) diff --git a/pkg/fwdservice/fwdservice.go b/pkg/fwdservice/fwdservice.go index a9c3c619..160fb843 100644 --- a/pkg/fwdservice/fwdservice.go +++ b/pkg/fwdservice/fwdservice.go @@ -20,7 +20,8 @@ import ( restclient "k8s.io/client-go/rest" ) -// Single service which we need to forward, with a reference to all the pods being forwarded for it +// ServiceFWD Single service to forward, with a reference to +// all the pods being forwarded for it type ServiceFWD struct { ClientSet *kubernetes.Clientset Context string @@ -35,21 +36,24 @@ type ServiceFWD struct { IpD *int Domain string - PodLabelSelector string // The label selector to query for matching pods. - NamespaceIPLock *sync.Mutex // Synchronization for IP handout for each portforward - Svc *v1.Service // Reference to the k8s service. - Headless bool // A headless service will forward all of the pods, while normally only a single pod is forwarded. - LastSyncedAt time.Time // When was the set of pods last synced - PortForwards map[string]*fwdport.PortForwardOpts // A mapping of all the pods currently being forwarded. key = podname - DoneChannel chan struct{} // After shutdown is complete, this channel will be closed + PodLabelSelector string // The label selector to query for matching pods. + NamespaceIPLock *sync.Mutex // Synchronization for IP handout for each portforward + Svc *v1.Service // Reference to the k8s service. + Headless bool // A headless service will forward all of the pods, while normally only a single pod is forwarded. + LastSyncedAt time.Time // When was the set of pods last synced + + // A mapping of all the pods currently being forwarded. + // key = podname + PortForwards map[string]*fwdport.PortForwardOpts + DoneChannel chan struct{} // After shutdown is complete, this channel will be closed } func (svcFwd *ServiceFWD) String() string { - return svcFwd.Svc.Name + "." + svcFwd.Namespace + return svcFwd.Svc.Name + "." + svcFwd.Namespace + "." + svcFwd.Context } // GetPodsForService queries k8s and returns all pods backing this service -// which are eligible for portforwarding; exclude some pods which are in final/failure state. +// which are eligible for port-forwarding; exclude some pods which are in final/failure state. func (svcFwd *ServiceFWD) GetPodsForService() []v1.Pod { listOpts := metav1.ListOptions{LabelSelector: svcFwd.PodLabelSelector} @@ -75,11 +79,15 @@ func (svcFwd *ServiceFWD) GetPodsForService() []v1.Pod { return podsEligible } -// SyncPodForwards selects one or all pods behind a service, and invokes the forwarding setup for that or those pod(s). -// It will remove pods in-mem that are no longer returned by k8s, should these not be correctly deleted. +// SyncPodForwards selects one or all pods behind a service, and invokes +// the forwarding setup for that or those pod(s). It will remove pods in-mem +// that are no longer returned by k8s, should these not be correctly deleted. func (svcFwd *ServiceFWD) SyncPodForwards(force bool) { - // When a whole set of pods gets deleted at once, they all will trigger a SyncPodForwards() call. This would hammer k8s with load needlessly. - // Therefore keep a timestamp from when this was last called and only allow call if the previous one was not too recent. + + // When a whole set of pods gets deleted at once, they all will trigger a + // SyncPodForwards() call. This would hammer k8s with load needlessly. + // Therefore keep a timestamp from when this was last called and only allow + // call if the previous one was not too recent. if !force && time.Since(svcFwd.LastSyncedAt) < 10*time.Minute { log.Debugf("Skipping pods refresh for %s due to rate limiting", svcFwd) return @@ -88,14 +96,15 @@ func (svcFwd *ServiceFWD) SyncPodForwards(force bool) { k8sPods := svcFwd.GetPodsForService() - // If no pods are found currently. Will try again next resync period + // If no pods are found currently. Will try again next re-sync period. if len(k8sPods) == 0 { log.Warnf("WARNING: No Running Pods returned for service %s", svcFwd) return } - // Check if the pods currently being forwarded still exist in k8s and if they are not in a (pre-)running state, if not: remove them - for _, podName := range svcFwd.ListPodNames() { + // Check if the pods currently being forwarded still exist in k8s and if + // they are not in a (pre-)running state, if not: remove them + for _, podName := range svcFwd.ListServicePodNames() { keep := false for _, pod := range k8sPods { if podName == pod.Name && (pod.Status.Phase == v1.PodPending || pod.Status.Phase == v1.PodRunning) { @@ -104,11 +113,13 @@ func (svcFwd *ServiceFWD) SyncPodForwards(force bool) { } } if !keep { - svcFwd.RemovePod(podName) + svcFwd.RemoveServicePod(podName) } } - // Set up portforwarding for one or all of these pods - // normal service portforward the first pod as service name. headless service not only forward first Pod as service name, but also portforward all pods. + + // Set up port-forwarding for one or all of these pods normal service + // port-forward the first pod as service name. headless service not only + // forward first Pod as service name, but also port-forward all pods. if len(k8sPods) != 0 { // if this is a headless service forward the first pod from the @@ -121,7 +132,7 @@ func (svcFwd *ServiceFWD) SyncPodForwards(force bool) { // Check if currently we are forwarding a pod which is good to keep using podNameToKeep := "" - for _, podName := range svcFwd.ListPodNames() { + for _, podName := range svcFwd.ListServicePodNames() { if podNameToKeep != "" { break } @@ -133,11 +144,12 @@ func (svcFwd *ServiceFWD) SyncPodForwards(force bool) { } } - // Stop forwarding others, should there be. In case none of the currently forwarded pods are good to keep, - // podNameToKeep will be the empty string, and the comparison will mean we will remove all pods, which is the desired behaviour. - for _, podName := range svcFwd.ListPodNames() { + // Stop forwarding others, should there be. In case none of the currently + // forwarded pods are good to keep, podNameToKeep will be the empty string, + // and the comparison will mean we will remove all pods, which is the desired behaviour. + for _, podName := range svcFwd.ListServicePodNames() { if podName != podNameToKeep { - svcFwd.RemovePod(podName) + svcFwd.RemoveServicePod(podName) } } @@ -149,14 +161,16 @@ func (svcFwd *ServiceFWD) SyncPodForwards(force bool) { } } -// LoopPodsToForward starts the portforwarding for each pod in the given list +// LoopPodsToForward starts the port-forwarding for each +// pod in the given list func (svcFwd *ServiceFWD) LoopPodsToForward(pods []v1.Pod, includePodNameInHost bool) { publisher := &fwdpub.Publisher{ PublisherName: "Services", Output: false, } - // Ip address handout is a critical section for synchronization, use a lock which synchronizes inside each namespace. + // Ip address handout is a critical section for synchronization, + // use a lock which synchronizes inside each namespace. svcFwd.NamespaceIPLock.Lock() defer svcFwd.NamespaceIPLock.Unlock() @@ -244,7 +258,7 @@ func (svcFwd *ServiceFWD) LoopPodsToForward(pods []v1.Pod, includePodNameInHost // Fire and forget. The stopping is done in the service.Shutdown() method. go func() { - svcFwd.AddPod(pfo) + svcFwd.AddServicePod(pfo) if err := pfo.PortForward(); err != nil { select { case <-pfo.ManualStopChan: // if shutdown was given, we don't bother with the error. @@ -265,15 +279,18 @@ func (svcFwd *ServiceFWD) LoopPodsToForward(pods []v1.Pod, includePodNameInHost } } -func (svcFwd *ServiceFWD) AddPod(pfo *fwdport.PortForwardOpts) { +// AddServicePod +func (svcFwd *ServiceFWD) AddServicePod(pfo *fwdport.PortForwardOpts) { svcFwd.NamespaceIPLock.Lock() - if _, found := svcFwd.PortForwards[pfo.PodName]; !found { - svcFwd.PortForwards[pfo.PodName] = pfo + ServicePod := pfo.Service + "." + pfo.PodName + if _, found := svcFwd.PortForwards[ServicePod]; !found { + svcFwd.PortForwards[ServicePod] = pfo } svcFwd.NamespaceIPLock.Unlock() } -func (svcFwd *ServiceFWD) ListPodNames() []string { +// ListServicePodNames +func (svcFwd *ServiceFWD) ListServicePodNames() []string { svcFwd.NamespaceIPLock.Lock() currentPodNames := make([]string, 0, len(svcFwd.PortForwards)) for podName := range svcFwd.PortForwards { @@ -283,12 +300,12 @@ func (svcFwd *ServiceFWD) ListPodNames() []string { return currentPodNames } -func (svcFwd *ServiceFWD) RemovePod(podName string) { - if pod, found := svcFwd.PortForwards[podName]; found { +func (svcFwd *ServiceFWD) RemoveServicePod(servicePodName string) { + if pod, found := svcFwd.PortForwards[servicePodName]; found { pod.Stop() <-pod.DoneChan svcFwd.NamespaceIPLock.Lock() - delete(svcFwd.PortForwards, podName) + delete(svcFwd.PortForwards, servicePodName) svcFwd.NamespaceIPLock.Unlock() } } From 7a0cdf3af6247f4109f3192aa3a2261271c1e5fb Mon Sep 17 00:00:00 2001 From: Craig Johnston <cj@imti.co> Date: Sat, 17 Oct 2020 18:33:12 -0700 Subject: [PATCH 09/14] code documentation housekeeping --- cmd/kubefwd/services/services.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/cmd/kubefwd/services/services.go b/cmd/kubefwd/services/services.go index d5565fa4..f8a711e7 100644 --- a/cmd/kubefwd/services/services.go +++ b/cmd/kubefwd/services/services.go @@ -48,7 +48,6 @@ import ( // cmdline arguments var namespaces []string -var regexNamespace string var contexts []string var exitOnFail bool var verbose bool @@ -119,7 +118,7 @@ func checkConnection(clientSet *kubernetes.Clientset, namespaces []string) error return nil } -func runCmd(cmd *cobra.Command, args []string) { +func runCmd(cmd *cobra.Command, _ []string) { if verbose { log.SetLevel(log.DebugLevel) @@ -151,6 +150,7 @@ Try: hostFile, err := txeh.NewHostsDefault() if err != nil { log.Fatalf("Hostfile error: %s", err.Error()) + os.Exit(1) } log.Printf("Loaded hosts file %s\n", hostFile.ReadFilePath) @@ -158,6 +158,7 @@ Try: msg, err := fwdhost.BackupHostFile(hostFile) if err != nil { log.Fatalf("Error backing up hostfile: %s\n", err.Error()) + os.Exit(1) } log.Printf("Hostfile management: %s", msg) @@ -185,6 +186,7 @@ Try: rawConfig, err := configGetter.GetClientConfig(cfgFilePath) if err != nil { log.Fatalf("Error in get rawConfig: %s\n", err.Error()) + os.Exit(1) } // labels selector to filter services @@ -361,7 +363,8 @@ func (opts *NamespaceOpts) watchServiceEvents(stopListenCh <-chan struct{}) { log.Infof("Stopped watching Service events in namespace %s", opts.Namespace) } -// AddServiceHandler is the event handler for when a new service comes in from k8s (the initial list of services will also be coming in using this event for each). +// AddServiceHandler is the event handler for when a new service comes in from k8s +// (the initial list of services will also be coming in using this event for each). func (opts *NamespaceOpts) AddServiceHandler(obj interface{}) { svc, ok := obj.(*v1.Service) if !ok { @@ -413,7 +416,7 @@ func (opts *NamespaceOpts) DeleteServiceHandler(obj interface{}) { // UpdateServiceHandler is the event handler to deal with service changes from k8s. // It currently does not do anything. -func (opts *NamespaceOpts) UpdateServiceHandler(old interface{}, new interface{}) { +func (opts *NamespaceOpts) UpdateServiceHandler(_ interface{}, new interface{}) { key, err := cache.MetaNamespaceKeyFunc(new) if err == nil { log.Printf("update service %s.", key) From d462a33a9ef79d8ea47a4c872897e9e3ac3253b7 Mon Sep 17 00:00:00 2001 From: Craig Johnston <cj@imti.co> Date: Sat, 17 Oct 2020 23:06:18 -0700 Subject: [PATCH 10/14] improved debug message --- pkg/fwdport/fwdport.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/fwdport/fwdport.go b/pkg/fwdport/fwdport.go index 59019266..7e2bc68f 100644 --- a/pkg/fwdport/fwdport.go +++ b/pkg/fwdport/fwdport.go @@ -234,7 +234,7 @@ func (pfo *PortForwardOpts) removeHosts() { // remove all hosts for _, host := range pfo.Hosts { - log.Debugf("REMOVING HOST %s FOR POD %s", host, pfo.PodName) + log.Debugf("Removing host %s for pod %s in namespace %s from context %s", host, pfo.PodName, pfo.Namespace, pfo.Context) pfo.Hostfile.Hosts.RemoveHost(host) } From a3bc4ca6c63915ddc3dede03674b887b0b9f0f07 Mon Sep 17 00:00:00 2001 From: Craig Johnston <cj@imti.co> Date: Sat, 17 Oct 2020 23:25:13 -0700 Subject: [PATCH 11/14] housekeeping --- cmd/kubefwd/services/services.go | 7 ++++--- pkg/fwdcfg/fwdcfg.go | 2 ++ pkg/fwdhost/fwdhost.go | 17 +++++++++-------- pkg/fwdnet/fwdnet.go | 15 +++++++++------ pkg/fwdport/fwdport.go | 7 ++++--- pkg/fwdpub/fwdpub.go | 3 +++ pkg/fwdservice/fwdservice.go | 6 ++---- pkg/fwdsvcregistry/fwdsvcregistry.go | 2 ++ 8 files changed, 35 insertions(+), 24 deletions(-) diff --git a/cmd/kubefwd/services/services.go b/cmd/kubefwd/services/services.go index f8a711e7..31fdcd93 100644 --- a/cmd/kubefwd/services/services.go +++ b/cmd/kubefwd/services/services.go @@ -100,16 +100,16 @@ func checkConnection(clientSet *kubernetes.Clientset, namespaces []string) error for _, namespace := range namespaces { for _, perm := range requiredPermissions { perm.Namespace = namespace - ssar := &authorizationv1.SelfSubjectAccessReview{ + var accessReview = &authorizationv1.SelfSubjectAccessReview{ Spec: authorizationv1.SelfSubjectAccessReviewSpec{ ResourceAttributes: &perm, }, } - ssar, err = clientSet.AuthorizationV1().SelfSubjectAccessReviews().Create(ssar) + accessReview, err = clientSet.AuthorizationV1().SelfSubjectAccessReviews().Create(accessReview) if err != nil { return err } - if !ssar.Status.Allowed { + if !accessReview.Status.Allowed { return fmt.Errorf("Missing RBAC permission: %v", perm) } } @@ -309,6 +309,7 @@ Try: log.Infof("Clean exit") } +// NamespaceOpts type NamespaceOpts struct { ClientSet *kubernetes.Clientset Context string diff --git a/pkg/fwdcfg/fwdcfg.go b/pkg/fwdcfg/fwdcfg.go index 7346b863..a52fbc65 100644 --- a/pkg/fwdcfg/fwdcfg.go +++ b/pkg/fwdcfg/fwdcfg.go @@ -7,10 +7,12 @@ import ( cmdutil "k8s.io/kubectl/pkg/cmd/util" ) +// ConfigGetter type ConfigGetter struct { ConfigFlag *genericclioptions.ConfigFlags } +// NewConfigGetter func NewConfigGetter() *ConfigGetter { configFlag := genericclioptions.NewConfigFlags(false) return &ConfigGetter{ diff --git a/pkg/fwdhost/fwdhost.go b/pkg/fwdhost/fwdhost.go index a696f625..4670a6eb 100644 --- a/pkg/fwdhost/fwdhost.go +++ b/pkg/fwdhost/fwdhost.go @@ -9,8 +9,9 @@ import ( "github.com/txn2/txeh" ) -// BackupHostFile will write a backup of the pre-modified hostfile to your home directory, if it does not exist already. -func BackupHostFile(hostfile *txeh.Hosts) (string, error) { +// BackupHostFile will write a backup of the pre-modified host file +// the users home directory, if it does already exist. +func BackupHostFile(hostFile *txeh.Hosts) (string, error) { homeDirLocation, err := os.UserHomeDir() if err != nil { return "", err @@ -18,24 +19,24 @@ func BackupHostFile(hostfile *txeh.Hosts) (string, error) { backupHostsPath := homeDirLocation + "/hosts.original" if _, err := os.Stat(backupHostsPath); os.IsNotExist(err) { - from, err := os.Open(hostfile.WriteFilePath) + from, err := os.Open(hostFile.WriteFilePath) if err != nil { return "", err } - defer from.Close() + defer func() { _ = from.Close() }() to, err := os.OpenFile(backupHostsPath, os.O_RDWR|os.O_CREATE, 0644) if err != nil { log.Fatal(err) } - defer to.Close() + defer func() { _ = to.Close() }() _, err = io.Copy(to, from) if err != nil { return "", err } - return fmt.Sprintf("Backing up your original hosts file %s to %s\n", hostfile.WriteFilePath, backupHostsPath), nil - } else { - return fmt.Sprintf("Original hosts backup already exists at %s\n", backupHostsPath), nil + return fmt.Sprintf("Backing up your original hosts file %s to %s\n", hostFile.WriteFilePath, backupHostsPath), nil } + + return fmt.Sprintf("Original hosts backup already exists at %s\n", backupHostsPath), nil } diff --git a/pkg/fwdnet/fwdnet.go b/pkg/fwdnet/fwdnet.go index df564512..24b58671 100644 --- a/pkg/fwdnet/fwdnet.go +++ b/pkg/fwdnet/fwdnet.go @@ -31,12 +31,12 @@ func ReadyInterface(a byte, b byte, c byte, d int, port string) (net.IP, int, er ip = net.IPv4(a, b, c, byte(i)) - iface, err := net.InterfaceByName("lo0") + networkInterface, err := net.InterfaceByName("lo0") if err != nil { return net.IP{}, i, err } - addrs, err := iface.Addrs() + addrs, err := networkInterface.Addrs() if err != nil { return net.IP{}, i, err } @@ -51,11 +51,11 @@ func ReadyInterface(a byte, b byte, c byte, d int, port string) (net.IP, int, er if err != nil { return net.IPv4(a, b, c, byte(i)), i + 1, nil } - conn.Close() + _ = conn.Close() } } - // ip is not in the list of addrs for iface + // ip is not in the list of addrs for networkInterface cmd := "ifconfig" args := []string{"lo0", "alias", ip.String(), "up"} if err := exec.Command(cmd, args...).Run(); err != nil { @@ -68,7 +68,7 @@ func ReadyInterface(a byte, b byte, c byte, d int, port string) (net.IP, int, er if err != nil { return net.IPv4(a, b, c, byte(i)), i + 1, nil } - conn.Close() + _ = conn.Close() } @@ -81,7 +81,10 @@ func RemoveInterfaceAlias(ip net.IP) { cmd := "ifconfig" args := []string{"lo0", "-alias", ip.String()} if err := exec.Command(cmd, args...).Run(); err != nil { - // suppress for now and @todo research why this would fail + // suppress for now + // @todo research alternative to ifconfig + // @todo suggest ifconfig or alternative + // @todo research libs for interface management //fmt.Println("Cannot ifconfig lo0 -alias " + ip.String() + "\r\n" + err.Error()) } } diff --git a/pkg/fwdport/fwdport.go b/pkg/fwdport/fwdport.go index 7e2bc68f..b765f527 100644 --- a/pkg/fwdport/fwdport.go +++ b/pkg/fwdport/fwdport.go @@ -11,7 +11,6 @@ import ( log "github.com/sirupsen/logrus" "github.com/txn2/kubefwd/pkg/fwdnet" "github.com/txn2/kubefwd/pkg/fwdpub" - "github.com/txn2/txeh" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -92,7 +91,7 @@ func (pfo *PortForwardOpts) PortForward() error { SubResource("portforward") pfStopChannel := make(chan struct{}, 1) // Signal that k8s forwarding takes as input for us to signal when to stop - downstreamStopChannel := make(chan struct{}) // TODO: can this be the same as pfStopChannel? + downstreamStopChannel := make(chan struct{}) // @TODO: can this be the same as pfStopChannel? localNamedEndPoint := fmt.Sprintf("%s:%s", pfo.Service, pfo.LocalPort) @@ -218,7 +217,8 @@ func (pfo *PortForwardOpts) AddHosts() { pfo.Hostfile.Unlock() } -// this method to remove hosts obj in /etc/hosts +// removeHosts removes hosts /etc/hosts +// associated with a forwarded pod func (pfo *PortForwardOpts) removeHosts() { // we should lock the pfo.Hostfile here @@ -246,6 +246,7 @@ func (pfo *PortForwardOpts) removeHosts() { pfo.Hostfile.Unlock() } +// removeInterfaceAlias called on stop signal to func (pfo *PortForwardOpts) removeInterfaceAlias() { fwdnet.RemoveInterfaceAlias(pfo.LocalIp) } diff --git a/pkg/fwdpub/fwdpub.go b/pkg/fwdpub/fwdpub.go index c8323268..39ffd6fc 100644 --- a/pkg/fwdpub/fwdpub.go +++ b/pkg/fwdpub/fwdpub.go @@ -5,17 +5,20 @@ import ( "strings" ) +// Publisher type Publisher struct { Output bool PublisherName string ProducerName string } +// MakeProducer func (p *Publisher) MakeProducer(producer string) Publisher { p.ProducerName = producer return *p } +// Write func (p *Publisher) Write(b []byte) (int, error) { outputString := string(b) outputString = strings.TrimSuffix(outputString, "\n") diff --git a/pkg/fwdservice/fwdservice.go b/pkg/fwdservice/fwdservice.go index 160fb843..811425c6 100644 --- a/pkg/fwdservice/fwdservice.go +++ b/pkg/fwdservice/fwdservice.go @@ -6,14 +6,12 @@ import ( "sync" "time" - "k8s.io/apimachinery/pkg/api/errors" - + log "github.com/sirupsen/logrus" "github.com/txn2/kubefwd/pkg/fwdnet" "github.com/txn2/kubefwd/pkg/fwdport" "github.com/txn2/kubefwd/pkg/fwdpub" - - log "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" _ "k8s.io/client-go/plugin/pkg/client/auth" diff --git a/pkg/fwdsvcregistry/fwdsvcregistry.go b/pkg/fwdsvcregistry/fwdsvcregistry.go index 1ad166e4..a2249c3b 100644 --- a/pkg/fwdsvcregistry/fwdsvcregistry.go +++ b/pkg/fwdsvcregistry/fwdsvcregistry.go @@ -20,6 +20,7 @@ type ServicesRegistry struct { var svcRegistry *ServicesRegistry +// Init func Init(shutDownSignal <-chan struct{}) { svcRegistry = &ServicesRegistry{ mutex: &sync.Mutex{}, @@ -35,6 +36,7 @@ func Init(shutDownSignal <-chan struct{}) { }() } +// Done func Done() <-chan struct{} { if svcRegistry != nil { return svcRegistry.doneSignal From 27ccc498080bd86855188f92db27c75af41d0dba Mon Sep 17 00:00:00 2001 From: Craig Johnston <cj@imti.co> Date: Sun, 18 Oct 2020 21:54:14 -0700 Subject: [PATCH 12/14] improved management for multiple namespaces and multiple clusters --- cmd/kubefwd/services/services.go | 139 +++++++------ cmd/kubefwd/services/services_test.go | 284 -------------------------- go.mod | 1 + k8s/test-env/kf-a.yml | 234 +++++++++++++++++++++ k8s/test-env/kf-b.yml | 234 +++++++++++++++++++++ pkg/fwdIp/fwdIp.go | 51 +++++ pkg/fwdnet/fwdnet.go | 78 ++++--- pkg/fwdport/fwdport.go | 198 ++++++++++++------ pkg/fwdservice/fwdservice.go | 106 ++++++---- pkg/fwdsvcregistry/fwdsvcregistry.go | 2 +- 10 files changed, 833 insertions(+), 494 deletions(-) delete mode 100644 cmd/kubefwd/services/services_test.go create mode 100644 k8s/test-env/kf-a.yml create mode 100644 k8s/test-env/kf-b.yml create mode 100644 pkg/fwdIp/fwdIp.go diff --git a/cmd/kubefwd/services/services.go b/cmd/kubefwd/services/services.go index 31fdcd93..efc54c0c 100644 --- a/cmd/kubefwd/services/services.go +++ b/cmd/kubefwd/services/services.go @@ -49,7 +49,6 @@ import ( // cmdline arguments var namespaces []string var contexts []string -var exitOnFail bool var verbose bool var domain string @@ -57,7 +56,9 @@ func init() { // override error output from k8s.io/apimachinery/pkg/util/runtime utilRuntime.ErrorHandlers[0] = func(err error) { log.Errorf("Runtime error: %s", err.Error()) - fwdsvcregistry.SyncAll() + // @todo determine when a SyncAll should happen, SyncAll + // for evey error is too aggressive. + //fwdsvcregistry.SyncAll() } Cmd.Flags().StringP("kubeconfig", "c", "", "absolute path to a kubectl config file") @@ -110,7 +111,7 @@ func checkConnection(clientSet *kubernetes.Clientset, namespaces []string) error return err } if !accessReview.Status.Allowed { - return fmt.Errorf("Missing RBAC permission: %v", perm) + return fmt.Errorf("missing RBAC permission: %v", perm) } } } @@ -149,7 +150,7 @@ Try: hostFile, err := txeh.NewHostsDefault() if err != nil { - log.Fatalf("Hostfile error: %s", err.Error()) + log.Fatalf("HostFile error: %s", err.Error()) os.Exit(1) } @@ -161,7 +162,7 @@ Try: os.Exit(1) } - log.Printf("Hostfile management: %s", msg) + log.Printf("HostFile management: %s", msg) if domain != "" { log.Printf("Adding custom domain %s to all forwarded entries\n", domain) @@ -218,13 +219,6 @@ Try: } } - // ipC is the class C for the local IP address - // increment this for each cluster - // ipD is the class D for the local IP address - // increment this for each service in each cluster - ipC := 27 - ipD := 1 - stopListenCh := make(chan struct{}) // Listen for shutdown signal from user @@ -266,37 +260,40 @@ Try: if err != nil { log.Fatalf("Error connecting to k8s cluster: %s\n", err.Error()) } - log.Infof("Succesfully connected context: %v", ctx) + log.Infof("Successfully connected context: %v", ctx) // create the k8s RESTclient restClient, err := configGetter.GetRESTClient() if err != nil { log.Fatalf("Error creating k8s RestClient: %s\n", err.Error()) + os.Exit(1) } for ii, namespace := range namespaces { nsWatchesDone.Add(1) - go func(ii int, namespace string) { - // ShortName field only use short name for the first namespace and context - nameSpaceOpts := NamespaceOpts{ - ClientSet: clientSet, - Context: ctx, - Namespace: namespace, - NamespaceIPLock: &sync.Mutex{}, // For parallelization of ip handout, each namespace has its own a.b.c.* range - ListOptions: listOptions, - Hostfile: &fwdport.HostFileWithLock{Hosts: hostFile}, - ClientConfig: restConfig, - RESTClient: restClient, - ShortName: i < 1 && ii < 1, - Remote: i > 0, - IpC: byte(ipC + ii), - IpD: ipD, - Domain: domain, - ManualStopChannel: stopListenCh, - } + + nameSpaceOpts := NamespaceOpts{ + ClientSet: *clientSet, + Context: ctx, + Namespace: namespace, + + // For parallelization of ip handout, + // each cluster and namespace has its own ip range + NamespaceIPLock: &sync.Mutex{}, + ListOptions: listOptions, + HostFile: &fwdport.HostFileWithLock{Hosts: hostFile}, + ClientConfig: *restConfig, + RESTClient: *restClient, + ClusterN: i, + NamespaceN: ii, + Domain: domain, + ManualStopChannel: stopListenCh, + } + + go func(npo NamespaceOpts) { nameSpaceOpts.watchServiceEvents(stopListenCh) nsWatchesDone.Done() - }(ii, namespace) + }(nameSpaceOpts) } } @@ -311,19 +308,35 @@ Try: // NamespaceOpts type NamespaceOpts struct { - ClientSet *kubernetes.Clientset - Context string - Namespace string - NamespaceIPLock *sync.Mutex - ListOptions metav1.ListOptions - Hostfile *fwdport.HostFileWithLock - ClientConfig *restclient.Config - RESTClient *restclient.RESTClient - ShortName bool - Remote bool - IpC byte - IpD int - Domain string + NamespaceIPLock *sync.Mutex + ListOptions metav1.ListOptions + HostFile *fwdport.HostFileWithLock + + ClientSet kubernetes.Clientset + ClientConfig restclient.Config + RESTClient restclient.RESTClient + + // Context is a unique key (string) in kubectl config representing + // a user/cluster combination. Kubefwd uses context as the + // cluster name when forwarding to more than one cluster. + Context string + + // Namespace is the current Kubernetes Namespace to locate services + // and the pods that back them for port-forwarding + Namespace string + + // ClusterN is the ordinal index of the cluster (from configuration) + // cluster 0 is considered local while > 0 is remote + ClusterN int + + // NamespaceN is the ordinal index of the namespace from the + // perspective of the user. Namespace 0 is considered local + // while > 0 is an external namespace + NamespaceN int + + // Domain is specified by the user and used in place of .local + Domain string + ManualStopChannel chan struct{} } @@ -361,7 +374,7 @@ func (opts *NamespaceOpts) watchServiceEvents(stopListenCh <-chan struct{}) { // Start the informer, blocking call until we receive a stop signal controller.Run(stopListenCh) - log.Infof("Stopped watching Service events in namespace %s", opts.Namespace) + log.Infof("Stopped watching Service events in namespace %s in %s context", opts.Namespace, opts.Context) } // AddServiceHandler is the event handler for when a new service comes in from k8s @@ -381,26 +394,24 @@ func (opts *NamespaceOpts) AddServiceHandler(obj interface{}) { // Define a service to forward svcfwd := &fwdservice.ServiceFWD{ - ClientSet: opts.ClientSet, - Context: opts.Context, - Namespace: opts.Namespace, - Hostfile: opts.Hostfile, - ClientConfig: opts.ClientConfig, - RESTClient: opts.RESTClient, - ShortName: opts.ShortName, - Remote: opts.Remote, - IpC: opts.IpC, - IpD: &opts.IpD, - Domain: opts.Domain, - PodLabelSelector: selector, - NamespaceIPLock: opts.NamespaceIPLock, - Svc: svc, - Headless: svc.Spec.ClusterIP == "None", - PortForwards: make(map[string]*fwdport.PortForwardOpts), - DoneChannel: make(chan struct{}), + ClientSet: opts.ClientSet, + Context: opts.Context, + Namespace: opts.Namespace, + Hostfile: opts.HostFile, + ClientConfig: opts.ClientConfig, + RESTClient: opts.RESTClient, + NamespaceN: opts.NamespaceN, + ClusterN: opts.ClusterN, + Domain: opts.Domain, + PodLabelSelector: selector, + NamespaceServiceLock: opts.NamespaceIPLock, + Svc: svc, + Headless: svc.Spec.ClusterIP == "None", + PortForwards: make(map[string]*fwdport.PortForwardOpts), + DoneChannel: make(chan struct{}), } - // Add the service to out catalog of services being forwarded + // Add the service to the catalog of services being forwarded fwdsvcregistry.Add(svcfwd) } diff --git a/cmd/kubefwd/services/services_test.go b/cmd/kubefwd/services/services_test.go deleted file mode 100644 index e6f9ca9c..00000000 --- a/cmd/kubefwd/services/services_test.go +++ /dev/null @@ -1,284 +0,0 @@ -package services - -import ( - "net/http" - "os" - "sync" - "testing" - "time" - - "github.com/txn2/kubefwd/pkg/fwdcfg" - "github.com/txn2/kubefwd/pkg/fwdhost" - "github.com/txn2/kubefwd/pkg/fwdport" - "github.com/txn2/kubefwd/pkg/utils" - "github.com/txn2/txeh" - appsv1 "k8s.io/api/apps/v1" - apiv1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/intstr" - "k8s.io/client-go/kubernetes" -) - -// plese run it in root permission. -// test the main pipe line. -// test will create a nginx service/deployments and portforward it. -// then test to http get the service. -func TestMainPipe(t *testing.T) { - - // we can change the namespace here - namespace := "default" - opts := buildFwdServiceOpts(t, namespace) - - stopListenCh := make(chan struct{}) - defer close(stopListenCh) - defer deleteTestService(t, opts.ClientSet, namespace) - - go opts.StartListen(stopListenCh) - - go testFwd(t, opts.ClientSet, opts.Wg, namespace) - - time.Sleep(2 * time.Second) - opts.Wg.Wait() - -} - -// build the FwdServiceOpts struct -func buildFwdServiceOpts(t *testing.T, namespace string) *FwdServiceOpts { - - hasRoot, err := utils.CheckRoot() - - if !hasRoot { - t.Fatal("Please run test use Root") - if err != nil { - t.Fatalf("Root check failure: %s", err.Error()) - } - } - - t.Log("Start buildFwdServiceOpts test") - - hostFile, err := txeh.NewHostsDefault() - if err != nil { - t.Fatalf("Hostfile error: %s", err.Error()) - } - - _, err = fwdhost.BackupHostFile(hostFile) - if err != nil { - t.Fatalf("Error backing up hostfile: %s\n", err.Error()) - } - - // default cfgFilePath is "$HOME/.kube/config" ; - // if you want to use other kubeconfig pls change it here ; - cfgFilePath := "" - - // create a ConfigGetter - configGetter := fwdcfg.NewConfigGetter() - // build the ClientConfig - rawConfig, err := configGetter.GetClientConfig(cfgFilePath) - if err != nil { - t.Fatalf("Error in get rawConfig: %s\n", err.Error()) - } - - // ipC is the class C for the local IP address - // increment this for each cluster - // ipD is the class D for the local IP address - // increment this for each service in each cluster - ipC := 27 - ipD := 1 - - stopListenCh := make(chan struct{}) - defer close(stopListenCh) - - restConfig, err := configGetter.GetRestConfig(cfgFilePath, rawConfig.CurrentContext) - if err != nil { - t.Fatalf("Error generating REST configuration: %s\n", err.Error()) - } - - // create the k8s clientSet - clientSet, err := kubernetes.NewForConfig(restConfig) - if err != nil { - t.Fatalf("Error creating k8s clientSet: %s\n", err.Error()) - } - - // create the k8s RESTclient - restClient, err := configGetter.GetRESTClient() - if err != nil { - t.Fatalf("Error creating k8s RestClient: %s\n", err.Error()) - } - // create the test service - createTestService(t, clientSet, namespace) - - listOptions := metav1.ListOptions{ - LabelSelector: "app=kubefwd-test-nginx-service", - } - - wg := &sync.WaitGroup{} - - return &FwdServiceOpts{ - Wg: wg, - ClientSet: clientSet, - Context: rawConfig.CurrentContext, - Namespace: namespace, - ListOptions: listOptions, - Hostfile: &fwdport.HostFileWithLock{Hosts: hostFile}, - ClientConfig: restConfig, - RESTClient: restClient, - ShortName: true, - Remote: false, - IpC: byte(ipC), - IpD: ipD, - ExitOnFail: exitOnFail, - Domain: domain, - } -} - -// create a test nginx service and deployments -func createTestService(t *testing.T, clientset *kubernetes.Clientset, namespace string) { - - // create the test nginx deployements - // default namespace is "default" - - deployment := &appsv1.Deployment{ - ObjectMeta: metav1.ObjectMeta{ - Name: "kubefwd-test-nginx-deployment", - Namespace: namespace, - Labels: map[string]string{ - "app": "kubefwd-test-nginx-deployment", - }, - }, - Spec: appsv1.DeploymentSpec{ - Replicas: int32Ptr(1), - Selector: &metav1.LabelSelector{ - MatchLabels: map[string]string{ - "app": "kubefwd-test-nginx-deployment", - }, - }, - Template: apiv1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - "app": "kubefwd-test-nginx-deployment", - }, - }, - Spec: apiv1.PodSpec{ - Containers: []apiv1.Container{ - { - Name: "nginx", - Image: "nginx:1.11.13-alpine", - Ports: []apiv1.ContainerPort{ - { - Name: "http", - Protocol: apiv1.ProtocolTCP, - ContainerPort: 80, - }, - }, - }, - }, - }, - }, - }, - } - - _, err := clientset.AppsV1().Deployments(namespace).Create(deployment) - if err != nil { - t.Fatalf("Error creating the test nginx deployment: %s\n", err.Error()) - } - - // create the test nginx service - // default namespace is "default" - service := &apiv1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "kubefwd-test-nginx-service", - Namespace: namespace, - }, - Spec: apiv1.ServiceSpec{ - Selector: map[string]string{ - "app": "kubefwd-test-nginx-deployment", - }, - Ports: []apiv1.ServicePort{ - { - Protocol: apiv1.ProtocolTCP, - Port: 80, - TargetPort: intstr.IntOrString{ - IntVal: 80, - }, - }, - }, - }, - } - - _, err = clientset.CoreV1().Services(namespace).Create(service) - if err != nil { - t.Fatalf("Error creating the test nginx deployment: %s\n", err.Error()) - } - t.Log("Create test nginx service and deployment success") -} - -// delete the test nginx service and deployments -func deleteTestService(t *testing.T, clientset *kubernetes.Clientset, namespace string) { - clientset.AppsV1().Deployments(namespace).Delete("kubefwd-test-nginx-deployment", &metav1.DeleteOptions{}) - clientset.CoreV1().Services(namespace).Delete("kubefwd-test-nginx-service", &metav1.DeleteOptions{}) - t.Log("Delete test nginx service and deployment success") -} - -// http get to test if the forward is success -func testFwd(t *testing.T, clientset *kubernetes.Clientset, wg *sync.WaitGroup, namespace string) { - pod := findFirstPodOfService(t, clientset, namespace) - if waitPodRunning(t, clientset, pod, namespace) { - resp, err := http.Get("http://kubefwd-test-nginx-service/") - if err != nil { - t.Fatalf("Forward Test nginx service faild, http get is err: %s", err.Error()) - } - if resp.StatusCode == 200 { - t.Log("Kubefwd PortForward Service success!") - os.Exit(0) - return - } - } - -} - -func findFirstPodOfService(t *testing.T, clientset *kubernetes.Clientset, namespace string) *apiv1.Pod { - pods, err := clientset.CoreV1().Pods(namespace).List(metav1.ListOptions{ - LabelSelector: "app=kubefwd-test-nginx-deployment", - }) - if err != nil { - t.Fatalf("Error get pod from Service, err: %s", err.Error()) - } - return &pods.Items[0] -} - -func waitPodRunning(t *testing.T, clientset *kubernetes.Clientset, pod *apiv1.Pod, namespace string) bool { - - if pod.Status.Phase == apiv1.PodRunning { - return true - } - - watcher, err := clientset.CoreV1().Pods(namespace).Watch(metav1.SingleObject(pod.ObjectMeta)) - if err != nil { - t.Fatalf("error in create pod watcher, err: %s", err.Error()) - } - RunningChannel := make(chan struct{}) - - defer close(RunningChannel) - - go func() { - defer watcher.Stop() - select { - case <-RunningChannel: - case <-time.After(time.Second * 330): - } - }() - - // watcher until the pod status is running - for { - event := <-watcher.ResultChan() - if event.Object != nil { - changedPod := event.Object.(*apiv1.Pod) - if changedPod.Status.Phase == apiv1.PodRunning { - return true - } - } - time.Sleep(time.Second * 3) - } -} - -func int32Ptr(i int32) *int32 { return &i } diff --git a/go.mod b/go.mod index 300cfdb4..3ec304b3 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/txn2/kubefwd go 1.13 require ( + github.com/davecgh/go-spew v1.1.1 github.com/elazarl/goproxy v0.0.0-20181111060418-2ce16c963a8a // indirect github.com/pingcap/errors v0.11.4 github.com/pkg/errors v0.8.1 diff --git a/k8s/test-env/kf-a.yml b/k8s/test-env/kf-a.yml new file mode 100644 index 00000000..96a0d059 --- /dev/null +++ b/k8s/test-env/kf-a.yml @@ -0,0 +1,234 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: kf-a +--- +apiVersion: v1 +kind: Service +metadata: + name: ok + namespace: kf-a +spec: + selector: + app: ok + component: deployment + ports: + - protocol: "TCP" + port: 8080 + targetPort: http-web + type: ClusterIP +--- +apiVersion: v1 +kind: Service +metadata: + name: ok-headless + namespace: kf-a +spec: + selector: + app: ok + component: deployment + ports: + - protocol: "TCP" + port: 8080 + targetPort: http-web + type: ClusterIP + clusterIP: None +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: ok + namespace: kf-a + labels: + app: ok +spec: + replicas: 5 + revisionHistoryLimit: 1 + selector: + matchLabels: + app: ok + template: + metadata: + labels: + app: ok + component: deployment + spec: + containers: + - name: ok + image: txn2/ok:3.0.0 + imagePullPolicy: IfNotPresent # IfNotPresent for production + env: + - name: "MESSAGE" + value: "ok deployment in kf-a" + - name: PORT + value: "8080" + - name: NODE_NAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: POD_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: POD_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + - name: SERVICE_ACCOUNT + valueFrom: + fieldRef: + fieldPath: spec.serviceAccountName + ports: + - name: http-web + containerPort: 8080 + resources: + requests: + cpu: 100m + memory: 16Mi + limits: + cpu: 200m + memory: 32Mi +--- +apiVersion: v1 +kind: Service +metadata: + name: ok-ss + namespace: kf-a +spec: + selector: + app: ok-ss + component: ss + ports: + - name: http-web + protocol: "TCP" + port: 8080 + targetPort: http-web + - name: http-web2 + protocol: "TCP" + port: 8081 + targetPort: http-web2 + type: ClusterIP +--- +apiVersion: v1 +kind: Service +metadata: + name: ok-ss-headless + namespace: kf-a +spec: + selector: + app: ok-ss + component: ss + ports: + - name: http-web + protocol: "TCP" + port: 8080 + targetPort: http-web + - name: http-web2 + protocol: "TCP" + port: 8081 + targetPort: http-web2 + type: ClusterIP + clusterIP: None +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: ok-ss + namespace: kf-a + labels: + app: ok-ss +spec: + replicas: 3 + revisionHistoryLimit: 1 + serviceName: ok-ss + selector: + matchLabels: + app: ok-ss + component: ss + template: + metadata: + labels: + app: ok-ss + component: ss + spec: + containers: + - name: ok + image: txn2/ok:3.0.0 + imagePullPolicy: IfNotPresent # IfNotPresent for production + env: + - name: "MESSAGE" + value: "ok ss in kf-a" + - name: PORT + value: "8080" + - name: NODE_NAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: POD_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: POD_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + - name: SERVICE_ACCOUNT + valueFrom: + fieldRef: + fieldPath: spec.serviceAccountName + ports: + - name: http-web + containerPort: 8080 + resources: + requests: + cpu: 100m + memory: 16Mi + limits: + cpu: 200m + memory: 32Mi + - name: ok2 + image: txn2/ok:3.0.0 + imagePullPolicy: IfNotPresent # IfNotPresent for production + env: + - name: "MESSAGE" + value: "ok2 container ss in kf-a" + - name: PORT + value: "8081" + - name: NODE_NAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: POD_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: POD_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + - name: SERVICE_ACCOUNT + valueFrom: + fieldRef: + fieldPath: spec.serviceAccountName + ports: + - name: http-web2 + containerPort: 8081 + resources: + requests: + cpu: 100m + memory: 16Mi + limits: + cpu: 200m + memory: 32Mi \ No newline at end of file diff --git a/k8s/test-env/kf-b.yml b/k8s/test-env/kf-b.yml new file mode 100644 index 00000000..a1286c25 --- /dev/null +++ b/k8s/test-env/kf-b.yml @@ -0,0 +1,234 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: kf-b +--- +apiVersion: v1 +kind: Service +metadata: + name: ok + namespace: kf-b +spec: + selector: + app: ok + component: deployment + ports: + - protocol: "TCP" + port: 8080 + targetPort: http-web + type: ClusterIP +--- +apiVersion: v1 +kind: Service +metadata: + name: ok-headless + namespace: kf-b +spec: + selector: + app: ok + component: deployment + ports: + - protocol: "TCP" + port: 8080 + targetPort: http-web + type: ClusterIP + clusterIP: None +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: ok + namespace: kf-b + labels: + app: ok +spec: + replicas: 5 + revisionHistoryLimit: 1 + selector: + matchLabels: + app: ok + template: + metadata: + labels: + app: ok + component: deployment + spec: + containers: + - name: ok + image: txn2/ok:3.0.0 + imagePullPolicy: IfNotPresent # IfNotPresent for production + env: + - name: "MESSAGE" + value: "ok deployment in kf-b" + - name: PORT + value: "8080" + - name: NODE_NAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: POD_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: POD_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + - name: SERVICE_ACCOUNT + valueFrom: + fieldRef: + fieldPath: spec.serviceAccountName + ports: + - name: http-web + containerPort: 8080 + resources: + requests: + cpu: 100m + memory: 16Mi + limits: + cpu: 200m + memory: 32Mi +--- +apiVersion: v1 +kind: Service +metadata: + name: ok-ss + namespace: kf-b +spec: + selector: + app: ok-ss + component: ss + ports: + - name: http-web + protocol: "TCP" + port: 8080 + targetPort: http-web + - name: http-web2 + protocol: "TCP" + port: 8081 + targetPort: http-web2 + type: ClusterIP +--- +apiVersion: v1 +kind: Service +metadata: + name: ok-ss-headless + namespace: kf-b +spec: + selector: + app: ok-ss + component: ss + ports: + - name: http-web + protocol: "TCP" + port: 8080 + targetPort: http-web + - name: http-web2 + protocol: "TCP" + port: 8081 + targetPort: http-web2 + type: ClusterIP + clusterIP: None +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: ok-ss + namespace: kf-b + labels: + app: ok-ss +spec: + replicas: 3 + revisionHistoryLimit: 1 + serviceName: ok-ss + selector: + matchLabels: + app: ok-ss + component: ss + template: + metadata: + labels: + app: ok-ss + component: ss + spec: + containers: + - name: ok + image: txn2/ok:3.0.0 + imagePullPolicy: IfNotPresent # IfNotPresent for production + env: + - name: "MESSAGE" + value: "ok ss in kf-b" + - name: PORT + value: "8080" + - name: NODE_NAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: POD_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: POD_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + - name: SERVICE_ACCOUNT + valueFrom: + fieldRef: + fieldPath: spec.serviceAccountName + ports: + - name: http-web + containerPort: 8080 + resources: + requests: + cpu: 100m + memory: 16Mi + limits: + cpu: 200m + memory: 32Mi + - name: ok2 + image: txn2/ok:3.0.0 + imagePullPolicy: IfNotPresent # IfNotPresent for production + env: + - name: "MESSAGE" + value: "ok2 container ss in kf-b" + - name: PORT + value: "8081" + - name: NODE_NAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: POD_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: POD_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + - name: SERVICE_ACCOUNT + valueFrom: + fieldRef: + fieldPath: spec.serviceAccountName + ports: + - name: http-web2 + containerPort: 8081 + resources: + requests: + cpu: 100m + memory: 16Mi + limits: + cpu: 200m + memory: 32Mi \ No newline at end of file diff --git a/pkg/fwdIp/fwdIp.go b/pkg/fwdIp/fwdIp.go new file mode 100644 index 00000000..7ae3f45e --- /dev/null +++ b/pkg/fwdIp/fwdIp.go @@ -0,0 +1,51 @@ +package fwdIp + +import ( + "fmt" + "net" + "sync" +) + +// Registry is a structure to create and hold all of the +// IP address assignments +type Registry struct { + mutex *sync.Mutex + inc map[int]map[int]int + reg map[string]net.IP +} + +var ipRegistry *Registry + +// Init +func init() { + ipRegistry = &Registry{ + mutex: &sync.Mutex{}, + inc: map[int]map[int]int{0: {0: 0}}, + reg: make(map[string]net.IP), + } +} + +func GetIp(podName string, clusterN int, NamespaceN int) (net.IP, error) { + ipRegistry.mutex.Lock() + defer ipRegistry.mutex.Unlock() + + regKey := fmt.Sprintf("%d-%d-%s", clusterN, NamespaceN, podName) + if ip, ok := ipRegistry.reg[regKey]; ok { + return ip, nil + } + + if ipRegistry.inc[clusterN] == nil { + ipRegistry.inc[clusterN] = map[int]int{0: 0} + } + + // @TODO check ranges + ip := net.IP{127, 1, 27, 1}.To4() + ip[1] += byte(clusterN) + ip[2] += byte(NamespaceN) + ip[3] += byte(ipRegistry.inc[clusterN][NamespaceN]) + + ipRegistry.inc[clusterN][NamespaceN]++ + ipRegistry.reg[regKey] = ip + + return ip, nil +} diff --git a/pkg/fwdnet/fwdnet.go b/pkg/fwdnet/fwdnet.go index 24b58671..492b10cd 100644 --- a/pkg/fwdnet/fwdnet.go +++ b/pkg/fwdnet/fwdnet.go @@ -7,13 +7,15 @@ import ( "os" "os/exec" "runtime" + + "github.com/txn2/kubefwd/pkg/fwdIp" ) // ReadyInterface prepares a local IP address on // the loopback interface. -func ReadyInterface(a byte, b byte, c byte, d int, port string) (net.IP, int, error) { +func ReadyInterface(podName string, clusterN int, namespaceN int, port string) (net.IP, error) { - ip := net.IPv4(a, b, c, byte(d)) + ip, _ := fwdIp.GetIp(podName, clusterN, namespaceN) // lo means we are probably on linux and not mac _, err := net.InterfaceByName("lo") @@ -21,58 +23,52 @@ func ReadyInterface(a byte, b byte, c byte, d int, port string) (net.IP, int, er // if no error then check to see if the ip:port are in use _, err := net.Dial("tcp", ip.String()+":"+port) if err != nil { - return ip, d + 1, nil + return ip, nil } - return ip, d + 1, errors.New("ip and port are in use") + return ip, errors.New("ip and port are in use") } - for i := d; i < 255; i++ { - - ip = net.IPv4(a, b, c, byte(i)) - - networkInterface, err := net.InterfaceByName("lo0") - if err != nil { - return net.IP{}, i, err - } + networkInterface, err := net.InterfaceByName("lo0") + if err != nil { + return net.IP{}, err + } - addrs, err := networkInterface.Addrs() - if err != nil { - return net.IP{}, i, err - } + addrs, err := networkInterface.Addrs() + if err != nil { + return net.IP{}, err + } - // check the addresses already assigned to the interface - for _, addr := range addrs { + // check the addresses already assigned to the interface + for _, addr := range addrs { - // found a match - if addr.String() == ip.String()+"/8" { - // found ip, now check for unused port - conn, err := net.Dial("tcp", ip.String()+":"+port) - if err != nil { - return net.IPv4(a, b, c, byte(i)), i + 1, nil - } - _ = conn.Close() + // found a match + if addr.String() == ip.String()+"/8" { + // found ip, now check for unused port + conn, err := net.Dial("tcp", ip.String()+":"+port) + if err != nil { + return ip, nil } + _ = conn.Close() } + } - // ip is not in the list of addrs for networkInterface - cmd := "ifconfig" - args := []string{"lo0", "alias", ip.String(), "up"} - if err := exec.Command(cmd, args...).Run(); err != nil { - fmt.Println("Cannot ifconfig lo0 alias " + ip.String() + " up") - fmt.Println("Error: " + err.Error()) - os.Exit(1) - } - - conn, err := net.Dial("tcp", ip.String()+":"+port) - if err != nil { - return net.IPv4(a, b, c, byte(i)), i + 1, nil - } - _ = conn.Close() + // ip is not in the list of addrs for networkInterface + cmd := "ifconfig" + args := []string{"lo0", "alias", ip.String(), "up"} + if err := exec.Command(cmd, args...).Run(); err != nil { + fmt.Println("Cannot ifconfig lo0 alias " + ip.String() + " up") + fmt.Println("Error: " + err.Error()) + os.Exit(1) + } + conn, err := net.Dial("tcp", ip.String()+":"+port) + if err != nil { + return ip, nil } + _ = conn.Close() - return net.IP{}, d, errors.New("unable to find an available IP/Port") + return net.IP{}, errors.New("unable to find an available IP/Port") } // RemoveInterfaceAlias can remove the Interface alias after port forwarding. diff --git a/pkg/fwdport/fwdport.go b/pkg/fwdport/fwdport.go index b765f527..712af4c9 100644 --- a/pkg/fwdport/fwdport.go +++ b/pkg/fwdport/fwdport.go @@ -21,18 +21,22 @@ import ( "k8s.io/client-go/transport/spdy" ) -// PodSyncer interface is used to represent a fwdservice.ServiceFWD reference, which cannot be used directly due to circular imports. -// It's a reference from a pod to it's parent service. +// ServiceFWD PodSyncer interface is used to represent a +// fwdservice.ServiceFWD reference, which cannot be used directly +// due to circular imports. It's a reference from a pod to it's +// parent service. type ServiceFWD interface { String() string SyncPodForwards(bool) } +// HostFileWithLock type HostFileWithLock struct { Hosts *txeh.Hosts sync.Mutex } +// HostsParams type HostsParams struct { localServiceName string nsServiceName string @@ -40,22 +44,39 @@ type HostsParams struct { svcServiceName string } +// PortForwardOpts type PortForwardOpts struct { - Out *fwdpub.Publisher - Config *restclient.Config - ClientSet *kubernetes.Clientset - RESTClient *restclient.RESTClient - Context string - Namespace string - Service string - ServiceFwd ServiceFWD - PodName string - PodPort string - LocalIp net.IP - LocalPort string - Hostfile *HostFileWithLock - ShortName bool - Remote bool + Out *fwdpub.Publisher + Config restclient.Config + ClientSet kubernetes.Clientset + RESTClient restclient.RESTClient + + Service string + ServiceFwd ServiceFWD + PodName string + PodPort string + LocalIp net.IP + LocalPort string + HostFile *HostFileWithLock + + // Context is a unique key (string) in kubectl config representing + // a user/cluster combination. Kubefwd uses context as the + // cluster name when forwarding to more than one cluster. + Context string + + // Namespace is the current Kubernetes Namespace to locate services + // and the pods that back them for port-forwarding + Namespace string + + // ClusterN is the ordinal index of the cluster (from configuration) + // cluster 0 is considered local while > 0 is remote + ClusterN int + + // NamespaceN is the ordinal index of the namespace from the + // perspective of the user. Namespace 0 is considered local + // while > 0 is an external namespace + NamespaceN int + Domain string HostsParams *HostsParams Hosts []string @@ -69,7 +90,7 @@ type PortForwardOpts struct { func (pfo *PortForwardOpts) PortForward() error { defer close(pfo.DoneChan) - transport, upgrader, err := spdy.RoundTripperFor(pfo.Config) + transport, upgrader, err := spdy.RoundTripperFor(&pfo.Config) if err != nil { return err } @@ -95,7 +116,6 @@ func (pfo *PortForwardOpts) PortForward() error { localNamedEndPoint := fmt.Sprintf("%s:%s", pfo.Service, pfo.LocalPort) - pfo.BuildTheHostsParams() pfo.AddHosts() // Wait until the stop signal is received from above @@ -150,21 +170,25 @@ func (pfo *PortForwardOpts) PortForward() error { return nil } -// this method to build the HostsParams -func (pfo *PortForwardOpts) BuildTheHostsParams() { - pfo.HostsParams = &HostsParams{} - localServiceName := pfo.Service - nsServiceName := pfo.Service + "." + pfo.Namespace - fullServiceName := fmt.Sprintf("%s.%s.svc.cluster.local", pfo.Service, pfo.Namespace) - svcServiceName := fmt.Sprintf("%s.%s.svc", pfo.Service, pfo.Namespace) - if pfo.Remote { - fullServiceName = fmt.Sprintf("%s.%s.svc.cluster.%s", pfo.Service, pfo.Namespace, pfo.Context) - } - pfo.HostsParams.localServiceName = localServiceName - pfo.HostsParams.nsServiceName = nsServiceName - pfo.HostsParams.fullServiceName = fullServiceName - pfo.HostsParams.svcServiceName = svcServiceName -} +//// BuildHostsParams constructs the basic hostnames for the service +//// based on the PortForwardOpts configuration +//func (pfo *PortForwardOpts) BuildHostsParams() { +// +// localServiceName := pfo.Service +// nsServiceName := pfo.Service + "." + pfo.Namespace +// fullServiceName := fmt.Sprintf("%s.%s.svc.cluster.local", pfo.Service, pfo.Namespace) +// svcServiceName := fmt.Sprintf("%s.%s.svc", pfo.Service, pfo.Namespace) +// +// // check if this is an additional cluster (remote from the +// // perspective of the user / argument order) +// if pfo.ClusterN > 0 { +// fullServiceName = fmt.Sprintf("%s.%s.svc.cluster.%s", pfo.Service, pfo.Namespace, pfo.Context) +// } +// pfo.HostsParams.localServiceName = localServiceName +// pfo.HostsParams.nsServiceName = nsServiceName +// pfo.HostsParams.fullServiceName = fullServiceName +// pfo.HostsParams.svcServiceName = svcServiceName +//} // AddHost func (pfo *PortForwardOpts) addHost(host string) { @@ -172,61 +196,111 @@ func (pfo *PortForwardOpts) addHost(host string) { pfo.Hosts = append(pfo.Hosts, host) // remove host if it already exists in /etc/hosts - pfo.Hostfile.Hosts.RemoveHost(host) + pfo.HostFile.Hosts.RemoveHost(host) // add host to /etc/hosts - pfo.Hostfile.Hosts.AddHost(pfo.LocalIp.String(), host) + pfo.HostFile.Hosts.AddHost(pfo.LocalIp.String(), host) } // AddHosts adds hostname entries to /etc/hosts func (pfo *PortForwardOpts) AddHosts() { - pfo.Hostfile.Lock() - if pfo.Remote { + pfo.HostFile.Lock() - pfo.Hostfile.Hosts.RemoveHost(pfo.HostsParams.fullServiceName) - pfo.Hostfile.Hosts.RemoveHost(pfo.HostsParams.svcServiceName) + // pfo.Service holds only the service name + // start with the smallest allowable hostname - if pfo.Domain != "" { - pfo.addHost(pfo.Service + "." + pfo.Domain) - } + // bare service name + if pfo.ClusterN == 0 && pfo.NamespaceN == 0 { pfo.addHost(pfo.Service) - } else { - - if pfo.ShortName { - if pfo.Domain != "" { - pfo.addHost(pfo.HostsParams.localServiceName + "." + pfo.Domain) - } - pfo.addHost(pfo.HostsParams.localServiceName) + if pfo.Domain != "" { + pfo.addHost(fmt.Sprintf( + "%s.%s", + pfo.Service, + pfo.Domain, + )) } + } - pfo.addHost(pfo.HostsParams.fullServiceName) - pfo.addHost(pfo.HostsParams.svcServiceName) + // alternate cluster / first namespace + if pfo.ClusterN > 0 && pfo.NamespaceN == 0 { + pfo.addHost(fmt.Sprintf( + "%s.%s", + pfo.Service, + pfo.Context, + )) + } + + // namespaced without cluster + if pfo.ClusterN == 0 { + pfo.addHost(fmt.Sprintf( + "%s.%s", + pfo.Service, + pfo.Namespace, + )) + + pfo.addHost(fmt.Sprintf( + "%s.%s.svc", + pfo.Service, + pfo.Namespace, + )) + + pfo.addHost(fmt.Sprintf( + "%s.%s.svc.cluster.local", + pfo.Service, + pfo.Namespace, + )) if pfo.Domain != "" { - pfo.addHost(pfo.HostsParams.nsServiceName + "." + pfo.Domain) + pfo.addHost(fmt.Sprintf( + "%s.%s.svc.cluster.%s", + pfo.Service, + pfo.Namespace, + pfo.Domain, + )) } - pfo.addHost(pfo.HostsParams.nsServiceName) + } - err := pfo.Hostfile.Hosts.Save() + pfo.addHost(fmt.Sprintf( + "%s.%s.%s", + pfo.Service, + pfo.Namespace, + pfo.Context, + )) + + pfo.addHost(fmt.Sprintf( + "%s.%s.svc.%s", + pfo.Service, + pfo.Namespace, + pfo.Context, + )) + + pfo.addHost(fmt.Sprintf( + "%s.%s.svc.cluster.%s", + pfo.Service, + pfo.Namespace, + pfo.Context, + )) + + err := pfo.HostFile.Hosts.Save() if err != nil { log.Error("Error saving hosts file", err) } - pfo.Hostfile.Unlock() + pfo.HostFile.Unlock() } // removeHosts removes hosts /etc/hosts // associated with a forwarded pod func (pfo *PortForwardOpts) removeHosts() { - // we should lock the pfo.Hostfile here + // we should lock the pfo.HostFile here // because sometimes other goroutine write the *txeh.Hosts - pfo.Hostfile.Lock() + pfo.HostFile.Lock() // other applications or process may have written to /etc/hosts // since it was originally updated. - err := pfo.Hostfile.Hosts.Reload() + err := pfo.HostFile.Hosts.Reload() if err != nil { log.Error("Unable to reload /etc/hosts: " + err.Error()) return @@ -235,15 +309,15 @@ func (pfo *PortForwardOpts) removeHosts() { // remove all hosts for _, host := range pfo.Hosts { log.Debugf("Removing host %s for pod %s in namespace %s from context %s", host, pfo.PodName, pfo.Namespace, pfo.Context) - pfo.Hostfile.Hosts.RemoveHost(host) + pfo.HostFile.Hosts.RemoveHost(host) } // fmt.Printf("Delete Host And Save !\r\n") - err = pfo.Hostfile.Hosts.Save() + err = pfo.HostFile.Hosts.Save() if err != nil { log.Errorf("Error saving /etc/hosts: %s\n", err.Error()) } - pfo.Hostfile.Unlock() + pfo.HostFile.Unlock() } // removeInterfaceAlias called on stop signal to diff --git a/pkg/fwdservice/fwdservice.go b/pkg/fwdservice/fwdservice.go index 811425c6..44e49cf0 100644 --- a/pkg/fwdservice/fwdservice.go +++ b/pkg/fwdservice/fwdservice.go @@ -21,31 +21,54 @@ import ( // ServiceFWD Single service to forward, with a reference to // all the pods being forwarded for it type ServiceFWD struct { - ClientSet *kubernetes.Clientset - Context string - Namespace string + ClientSet kubernetes.Clientset ListOptions metav1.ListOptions Hostfile *fwdport.HostFileWithLock - ClientConfig *restclient.Config - RESTClient *restclient.RESTClient - ShortName bool - Remote bool - IpC byte - IpD *int - Domain string - - PodLabelSelector string // The label selector to query for matching pods. - NamespaceIPLock *sync.Mutex // Synchronization for IP handout for each portforward - Svc *v1.Service // Reference to the k8s service. - Headless bool // A headless service will forward all of the pods, while normally only a single pod is forwarded. - LastSyncedAt time.Time // When was the set of pods last synced + ClientConfig restclient.Config + RESTClient restclient.RESTClient + + // Context is a unique key (string) in kubectl config representing + // a user/cluster combination. Kubefwd uses context as the + // cluster name when forwarding to more than one cluster. + Context string + + // Namespace is the current Kubernetes Namespace to locate services + // and the pods that back them for port-forwarding + Namespace string + + // ClusterN is the ordinal index of the cluster (from configuration) + // cluster 0 is considered local while > 0 is remote + ClusterN int + + // NamespaceN is the ordinal index of the namespace from the + // perspective of the user. Namespace 0 is considered local + // while > 0 is an external namespace + NamespaceN int + + // FwdInc the forward increment for ip + FwdInc *int + + // Domain is specified by the user and used in place of .local + Domain string + + PodLabelSelector string // The label selector to query for matching pods. + NamespaceServiceLock *sync.Mutex // + Svc *v1.Service // Reference to the k8s service. + + // Headless service will forward all of the pods, + // while normally only a single pod is forwarded. + Headless bool + + LastSyncedAt time.Time // When was the set of pods last synced // A mapping of all the pods currently being forwarded. - // key = podname + // key = podName PortForwards map[string]*fwdport.PortForwardOpts DoneChannel chan struct{} // After shutdown is complete, this channel will be closed } +// String representation of a ServiceFWD returns a unique name +// in the form SERVICE_NAME.NAMESPACE.CONTEXT func (svcFwd *ServiceFWD) String() string { return svcFwd.Svc.Name + "." + svcFwd.Namespace + "." + svcFwd.Context } @@ -169,8 +192,8 @@ func (svcFwd *ServiceFWD) LoopPodsToForward(pods []v1.Pod, includePodNameInHost // Ip address handout is a critical section for synchronization, // use a lock which synchronizes inside each namespace. - svcFwd.NamespaceIPLock.Lock() - defer svcFwd.NamespaceIPLock.Unlock() + svcFwd.NamespaceServiceLock.Lock() + defer svcFwd.NamespaceServiceLock.Unlock() for _, pod := range pods { // If pod is already configured to be forwarded, skip it @@ -181,11 +204,10 @@ func (svcFwd *ServiceFWD) LoopPodsToForward(pods []v1.Pod, includePodNameInHost podPort := "" svcName := "" - localIp, dInc, err := fwdnet.ReadyInterface(127, 1, svcFwd.IpC, *svcFwd.IpD, podPort) + localIp, err := fwdnet.ReadyInterface(pod.Name, svcFwd.ClusterN, svcFwd.NamespaceN, podPort) if err != nil { log.Warnf("WARNING: error readying interface: %s\n", err) } - *svcFwd.IpD = dInc for _, port := range svcFwd.Svc.Spec.Ports { @@ -200,33 +222,33 @@ func (svcFwd *ServiceFWD) LoopPodsToForward(pods []v1.Pod, includePodNameInHost } serviceHostName := svcFwd.Svc.Name + svcName = svcFwd.Svc.Name if includePodNameInHost { serviceHostName = pod.Name + "." + serviceHostName + svcName = pod.Name + "." + serviceHostName } - if !svcFwd.ShortName { + // if this is not the first namespace on the + // first cluster then append the namespace + if svcFwd.NamespaceN > 0 { serviceHostName = serviceHostName + "." + pod.Namespace } - if svcFwd.Domain != "" { - serviceHostName = serviceHostName + "." + svcFwd.Domain + // if this is not the first cluster append the full + // host name + if svcFwd.ClusterN > 0 { + serviceHostName = serviceHostName + "." + svcFwd.Context } - if svcFwd.Remote { - serviceHostName = fmt.Sprintf("%s.svc.cluster.%s", serviceHostName, svcFwd.Context) - } - - svcName = serviceHostName - - log.Debugf("Resolving: %s to %s\n", - svcName, + log.Debugf("Resolving: %s to %s\n", + serviceHostName, localIp.String(), ) log.Printf("Port-Forward: %s %s:%d to pod %s:%s\n", localIp.String(), - svcName, + serviceHostName, port.Port, pod.Name, podPort, @@ -245,9 +267,9 @@ func (svcFwd *ServiceFWD) LoopPodsToForward(pods []v1.Pod, includePodNameInHost PodPort: podPort, LocalIp: localIp, LocalPort: localPort, - Hostfile: svcFwd.Hostfile, - ShortName: svcFwd.ShortName, - Remote: svcFwd.Remote, + HostFile: svcFwd.Hostfile, + ClusterN: svcFwd.ClusterN, + NamespaceN: svcFwd.NamespaceN, Domain: svcFwd.Domain, ManualStopChan: make(chan struct{}), @@ -279,22 +301,22 @@ func (svcFwd *ServiceFWD) LoopPodsToForward(pods []v1.Pod, includePodNameInHost // AddServicePod func (svcFwd *ServiceFWD) AddServicePod(pfo *fwdport.PortForwardOpts) { - svcFwd.NamespaceIPLock.Lock() + svcFwd.NamespaceServiceLock.Lock() ServicePod := pfo.Service + "." + pfo.PodName if _, found := svcFwd.PortForwards[ServicePod]; !found { svcFwd.PortForwards[ServicePod] = pfo } - svcFwd.NamespaceIPLock.Unlock() + svcFwd.NamespaceServiceLock.Unlock() } // ListServicePodNames func (svcFwd *ServiceFWD) ListServicePodNames() []string { - svcFwd.NamespaceIPLock.Lock() + svcFwd.NamespaceServiceLock.Lock() currentPodNames := make([]string, 0, len(svcFwd.PortForwards)) for podName := range svcFwd.PortForwards { currentPodNames = append(currentPodNames, podName) } - svcFwd.NamespaceIPLock.Unlock() + svcFwd.NamespaceServiceLock.Unlock() return currentPodNames } @@ -302,9 +324,9 @@ func (svcFwd *ServiceFWD) RemoveServicePod(servicePodName string) { if pod, found := svcFwd.PortForwards[servicePodName]; found { pod.Stop() <-pod.DoneChan - svcFwd.NamespaceIPLock.Lock() + svcFwd.NamespaceServiceLock.Lock() delete(svcFwd.PortForwards, servicePodName) - svcFwd.NamespaceIPLock.Unlock() + svcFwd.NamespaceServiceLock.Unlock() } } diff --git a/pkg/fwdsvcregistry/fwdsvcregistry.go b/pkg/fwdsvcregistry/fwdsvcregistry.go index a2249c3b..78d03767 100644 --- a/pkg/fwdsvcregistry/fwdsvcregistry.go +++ b/pkg/fwdsvcregistry/fwdsvcregistry.go @@ -61,7 +61,7 @@ func Add(serviceFwd *fwdservice.ServiceFWD) { defer svcRegistry.mutex.Unlock() if _, found := svcRegistry.services[serviceFwd.String()]; found { - log.Debugf("Registry: found existing service %s") + log.Debugf("Registry: found existing service %s", serviceFwd.String()) return } From a60f977998bf75384e27380e5175c35ac2abffaf Mon Sep 17 00:00:00 2001 From: Craig Johnston <cj@imti.co> Date: Sun, 18 Oct 2020 22:22:36 -0700 Subject: [PATCH 13/14] go mod tidy --- go.mod | 2 -- go.sum | 5 ----- 2 files changed, 7 deletions(-) diff --git a/go.mod b/go.mod index 3ec304b3..50d7511d 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,7 @@ module github.com/txn2/kubefwd go 1.13 require ( - github.com/davecgh/go-spew v1.1.1 github.com/elazarl/goproxy v0.0.0-20181111060418-2ce16c963a8a // indirect - github.com/pingcap/errors v0.11.4 github.com/pkg/errors v0.8.1 github.com/sirupsen/logrus v1.4.2 github.com/spf13/cobra v0.0.5 diff --git a/go.sum b/go.sum index a44625f5..0aca29d3 100644 --- a/go.sum +++ b/go.sum @@ -188,8 +188,6 @@ github.com/opencontainers/go-digest v1.0.0-rc1/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQ github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/peterbourgon/diskv v2.0.1+incompatible h1:UBdAOUP5p4RWqPBg048CAvpKN+vxiaj6gdUUzhl4XmI= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= -github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= -github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= @@ -350,13 +348,10 @@ k8s.io/api v0.0.0-20191108065827-59e77acf588f h1:ZTXCVdYGBbAblNUJ5B19ztoy6WHMNrP k8s.io/api v0.0.0-20191108065827-59e77acf588f/go.mod h1:uQDmBYHoPSuhbg8FGTRzrOdaNqLiws/LAtBrHv0kN5U= k8s.io/apimachinery v0.0.0-20191108065633-c18f71bf2947 h1:f3H3Rf7KD9fjmmbIMwxBye3ctEuXnbskaX/l1xy+68E= k8s.io/apimachinery v0.0.0-20191108065633-c18f71bf2947/go.mod h1:nEP/6rwhzfljWYGVS6pfyES3ipZTR19vzMnSM+ur3ho= -k8s.io/apimachinery v0.18.3 h1:pOGcbVAhxADgUYnjS08EFXs9QMl8qaH5U4fr5LGUrSk= k8s.io/cli-runtime v0.0.0-20191108072024-9fe36560f3af h1:lWKOjXCJ/PVS6pJvHcDs1RyG47Zq+WFjs/DvK1CLNP0= k8s.io/cli-runtime v0.0.0-20191108072024-9fe36560f3af/go.mod h1:gS0U9D/luQtMGiNl4Y5IhcrmW+xwbuTiqPLv+plcW20= k8s.io/client-go v0.0.0-20191108070106-f8f007fd456c h1:TaCF427jtkNsXDzSNXOFFox1DePy6WX9Nf8E1vriHys= k8s.io/client-go v0.0.0-20191108070106-f8f007fd456c/go.mod h1:D6hkzmLWI59QLvDVt8tlD5J2X1YDjcattS6vw8lP1hc= -k8s.io/client-go v1.5.1 h1:XaX/lo2/u3/pmFau8HN+sB5C/b4dc4Dmm2eXjBH4p1E= -k8s.io/client-go v11.0.0+incompatible h1:LBbX2+lOwY9flffWlJM7f1Ct8V2SRNiMRDFeiwnJo9o= k8s.io/code-generator v0.0.0-20191108065441-3c1097069dc3/go.mod h1:OJTI2RPXj6kq4bfFqT1JrTEC1S4toTWinGOm1O8jUuY= k8s.io/component-base v0.0.0-20191108070619-4b9966ca0181 h1:dAWmrp5YxoO7PAbfQ1BAHYAdmoGLb/NbusxGyladzDA= k8s.io/component-base v0.0.0-20191108070619-4b9966ca0181/go.mod h1:OnLnQI0ti1wNBL+pCumQcG9Plt1S6L+lOXsQwQJ+m9g= From a21d2fb5361c1c3d5229f895222d7dce15eab34a Mon Sep 17 00:00:00 2001 From: Craig Johnston <cj@imti.co> Date: Sun, 18 Oct 2020 22:53:25 -0700 Subject: [PATCH 14/14] updated goreleaser config --- .goreleaser.yml | 267 ++++++++++++++++++------------------------------ 1 file changed, 98 insertions(+), 169 deletions(-) diff --git a/.goreleaser.yml b/.goreleaser.yml index 53bf0a7d..74599225 100644 --- a/.goreleaser.yml +++ b/.goreleaser.yml @@ -1,179 +1,108 @@ -# Build customization -build: - # Path to main.go file. - # Default is `main.go` - main: ./cmd/kubefwd/kubefwd.go - binary: kubefwd - - env: +env: - GO111MODULE=on - GOPROXY=https://gocenter.io - - CGO_ENABLED=0 - - # GOOS list to build in. - # For more info refer to https://golang.org/doc/install/source#environment - # Defaults are darwin and linux - goos: - - linux - - darwin - - arm - - windows +before: + hooks: + - go mod download + +builds: + - id: kubefwd + main: ./cmd/kubefwd/kubefwd.go + binary: kubefwd + goos: + - linux + - darwin + - windows + goarch: + - 386 + - amd64 + - arm + - arm64 + mod_timestamp: '{{ .CommitTimestamp }}' + flags: + - -trimpath + ldflags: -s -w -X main.Version={{.Version}} + +checksum: + name_template: '{{ .ProjectName }}_checksums.txt' + +changelog: + sort: asc + filters: + exclude: + - '^docs:' + - '^test:' + - Merge pull request + - Merge branch + - go mod tidy - # GOARCH to build in. - # For more info refer to https://golang.org/doc/install/source#environment - # Defaults are 386 and amd64 - goarch: - - amd64 - - arm - - ldflags: -s -w -X main.Version={{.Version}} +dockers: + - + goos: linux + goarch: amd64 + goarm: '' + binaries: + - kubefwd + dockerfile: Dockerfile + image_templates: + - "txn2/kubefwd:latest" + - "txn2/kubefwd:{{ .Tag }}" + - "txn2/kubefwd:v{{ .Major }}" + - "txn2/kubefwd:amd64-{{ .Tag }}" + - "txn2/kubefwd:amd64-v{{ .Major }}" + build_flag_templates: + - "--label=org.label-schema.schema-version=1.0" + - "--label=org.label-schema.version={{.Version}}" + - "--label=org.label-schema.name={{.ProjectName}}" + +archives: + - name_template: '{{ .ProjectName }}_{{ .Os }}_{{ .Arch }}{{ if .Arm }}v{{ .Arm }}{{ end }}' + replacements: + darwin: Darwin + linux: Linux + windows: Windows + 386: i386 + amd64: x86_64 + format_overrides: + - goos: windows + format: zip + +nfpms: + - file_name_template: '{{ .ProjectName }}_{{ .Arch }}{{ if .Arm }}v{{ .Arm }}{{ end }}' + homepage: https://github.com/txn2/kubefwd + description: Kubernetes bulk port forwarding utility. + maintainer: Craig Johnston https://twitter.com/cjimti + license: Apache 2.0 + vendor: https://github.com/txn2 + formats: + - apk + - deb + - rpm + dependencies: + - git + recommends: + - golang release: - # Repo in which the release will be created. - # Default is extracted from the origin remote URL. github: owner: txn2 name: kubefwd - - # If set to true, will not auto-publish the release. - # Default is false. - draft: false - - # If set to true, will mark the release as not ready for production. - # Default is false. - prerelease: false - - # You can change the name of the GitHub release. - # Default is `` name_template: "{{.ProjectName}}-v{{.Version}} {{.Env.USER}}" - # You can disable this pipe in order to not upload any artifacts to - # GitHub. - # Defaults to false. - disable: false - -nfpm: - name_template: '{{ .ProjectName }}_{{ .Arch }}{{ if .Arm }}v{{ .Arm }}{{ end }}' - homepage: https://github.com/txn2/kubefwd - description: Kubernetes bulk port forwarding utility. - maintainer: Craig Johnston <cjimti@gmail.com> - license: Apache 2.0 - vendor: TXN2 - formats: - - deb - - rpm - recommends: - - rpm - -# Archive customization -archive: - # You can change the name of the archive. - # This is parsed with Golang template engine and the following variables. - name_template: "{{.ProjectName}}_{{.Os}}_{{.Arch}}" - - # Archive format. Valid options are `tar.gz` and `zip`. - # Default is `zip` - format: tar.gz - - # Replacements for GOOS and GOARCH on the archive name. - # The keys should be valid GOOS or GOARCH values followed by your custom - # replacements. - # By default, `replacements` replace GOOS and GOARCH values with valid outputs - # of `uname -s` and `uname -m` respectively. - replacements: - amd64: amd64 - 386: 386 - darwin: Darwin - linux: linux - - format_overrides: - - goos: windows - format: zip - - # Additional files you want to add to the archive. - # Defaults are any files matching `LICENCE*`, `LICENSE*`, - # `README*` and `CHANGELOG*` (case-insensitive) - files: - - LICENSE - -brew: - name: kubefwd - - github: - owner: txn2 - name: homebrew-tap - - commit_author: - name: Craig Johnston - email: cjimti@gmail.com - - folder: Formula - - homepage: https://github.com/txn2/kubefwd - - description: "Kubernetes bulk port forwarding utility." - - skip_upload: false - - test: |- - kubefwd version - -snapcraft: - name_template: '{{ .ProjectName }}_{{ .Arch }}{{ if .Arm }}v{{ .Arm }}{{ end }}' - summary: Kubernetes bulk port forwarding command link utility for local development. - description: | - Kubernetes bulk port forwarding utility for local development. - kubefwd allows you to port forward all Kubernetes Pods connected to Services on one or more - Namespaces and contexts (clusters). - See https://github.com/txn2/kubefwd - - replacements: - amd64: 64-bit - 386: 32-bit - darwin: macOS - linux: Tux - - publish: false - - grade: stable - confinement: strict - - apps: - kubefwd: - plugs: ["home", "network", "network-bind"] - - -dockers: -- - goos: linux - goarch: amd64 - goarm: '' - binaries: - - kubefwd - dockerfile: Dockerfile - image_templates: - - "txn2/kubefwd:latest" - - "txn2/kubefwd:{{ .Tag }}" - - "txn2/kubefwd:v{{ .Major }}" - - "txn2/kubefwd:amd64-{{ .Tag }}" - - "txn2/kubefwd:amd64-v{{ .Major }}" - build_flag_templates: - - "--label=org.label-schema.schema-version=1.0" - - "--label=org.label-schema.version={{.Version}}" - - "--label=org.label-schema.name={{.ProjectName}}" -- - goos: linux - goarch: arm - goarm: 6 - binaries: - - kubefwd - dockerfile: Dockerfile - image_templates: - - "txn2/kubefwd:armv6-latest" - - "txn2/kubefwd:armv6-{{ .Tag }}" - - "txn2/kubefwd:armv6-{{ .Tag }}-arm6" - - "txn2/kubefwd:armv6-v{{ .Major }}" - build_flag_templates: - - "--label=org.label-schema.schema-version=1.0" - - "--label=org.label-schema.version={{.Version}}" - - "--label=org.label-schema.name={{.ProjectName}}" \ No newline at end of file +brews: + - name: kubefwd + tap: + owner: txn2 + name: homebrew-tap + commit_author: + name: Craig Johnston + email: cj@imti.co + folder: Formula + homepage: https://github.com/txn2/kubefwd + description: "Kubernetes bulk port forwarding utility." + skip_upload: false + dependencies: + - name: kubectl + type: optional + test: |- + kubefwd version \ No newline at end of file