From dd21a55a1e6c6f59a6dd61dc36a5559b77399ee6 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Fri, 20 Jul 2018 21:57:05 +0000 Subject: [PATCH 1/8] Refactor: implement kubernetes tagger in separate struct --- probe/kubernetes/reporter.go | 9 ++++++++- probe/kubernetes/reporter_test.go | 3 +-- prog/probe.go | 2 +- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/probe/kubernetes/reporter.go b/probe/kubernetes/reporter.go index 3dfa8c7939..f7e20acf0f 100644 --- a/probe/kubernetes/reporter.go +++ b/probe/kubernetes/reporter.go @@ -238,8 +238,15 @@ func isPauseContainer(n report.Node, rpt report.Report) bool { return false } +// Tagger adds pod parents to container nodes. +type Tagger struct { +} + +// Name of this tagger, for metrics gathering +func (Tagger) Name() string { return "K8s" } + // Tag adds pod parents to container nodes. -func (r *Reporter) Tag(rpt report.Report) (report.Report, error) { +func (r *Tagger) Tag(rpt report.Report) (report.Report, error) { for id, n := range rpt.Container.Nodes { uid, ok := n.Latest.Lookup(docker.LabelPrefix + "io.kubernetes.pod.uid") if !ok { diff --git a/probe/kubernetes/reporter_test.go b/probe/kubernetes/reporter_test.go index 70e083b3a6..bce58cd3a8 100644 --- a/probe/kubernetes/reporter_test.go +++ b/probe/kubernetes/reporter_test.go @@ -328,8 +328,7 @@ func TestTagger(t *testing.T) { docker.LabelPrefix + "io.kubernetes.pod.uid": "123456", })) - hr := controls.NewDefaultHandlerRegistry() - rpt, err := kubernetes.NewReporter(newMockClient(), nil, "", "", nil, hr, "", 0).Tag(rpt) + rpt, err := (&kubernetes.Tagger{}).Tag(rpt) if err != nil { t.Errorf("Unexpected error: %v", err) } diff --git a/prog/probe.go b/prog/probe.go index 756a5d00bf..abba705020 100644 --- a/prog/probe.go +++ b/prog/probe.go @@ -273,7 +273,7 @@ func probeMain(flags probeFlags, targets []appclient.Target) { reporter := kubernetes.NewReporter(client, clients, probeID, hostID, p, handlerRegistry, flags.kubernetesNodeName, flags.kubernetesKubeletPort) defer reporter.Stop() p.AddReporter(reporter) - p.AddTagger(reporter) + p.AddTagger(&kubernetes.Tagger{}) } else { log.Errorf("Kubernetes: failed to start client: %v", err) log.Errorf("Kubernetes: make sure to run Scope inside a POD with a service account or provide valid probe.kubernetes.* flags") From 88049b081754808b8956d588bcb448ce11339634 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Fri, 20 Jul 2018 22:10:13 +0000 Subject: [PATCH 2/8] Add option for Kubernetes tagging when kubernetes probing disabled This enables us to run Kubernetes probing on one node for the whole cluster. --- prog/main.go | 2 ++ prog/probe.go | 5 ++++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/prog/main.go b/prog/main.go index 9630b816bf..d9e51ec68e 100644 --- a/prog/main.go +++ b/prog/main.go @@ -124,6 +124,7 @@ type probeFlags struct { criEndpoint string kubernetesEnabled bool + kubernetesTagOnly bool kubernetesNodeName string kubernetesClientConfig kubernetes.ClientConfig kubernetesKubeletPort uint @@ -314,6 +315,7 @@ func setupFlags(flags *flags) { // K8s flag.BoolVar(&flags.probe.kubernetesEnabled, "probe.kubernetes", false, "collect kubernetes-related attributes for containers") + flag.BoolVar(&flags.probe.kubernetesTagOnly, "probe.kubernetes-tag", false, "tag containers with kubernetes parents") flag.StringVar(&flags.probe.kubernetesClientConfig.Server, "probe.kubernetes.api", "", "The address and port of the Kubernetes API server (deprecated in favor of equivalent probe.kubernetes.server)") flag.StringVar(&flags.probe.kubernetesClientConfig.CertificateAuthority, "probe.kubernetes.certificate-authority", "", "Path to a cert. file for the certificate authority") flag.StringVar(&flags.probe.kubernetesClientConfig.ClientCertificate, "probe.kubernetes.client-certificate", "", "Path to a client certificate file for TLS") diff --git a/prog/probe.go b/prog/probe.go index abba705020..a45833d173 100644 --- a/prog/probe.go +++ b/prog/probe.go @@ -273,13 +273,16 @@ func probeMain(flags probeFlags, targets []appclient.Target) { reporter := kubernetes.NewReporter(client, clients, probeID, hostID, p, handlerRegistry, flags.kubernetesNodeName, flags.kubernetesKubeletPort) defer reporter.Stop() p.AddReporter(reporter) - p.AddTagger(&kubernetes.Tagger{}) } else { log.Errorf("Kubernetes: failed to start client: %v", err) log.Errorf("Kubernetes: make sure to run Scope inside a POD with a service account or provide valid probe.kubernetes.* flags") } } + if flags.kubernetesEnabled || flags.kubernetesTagOnly { + p.AddTagger(&kubernetes.Tagger{}) + } + if flags.ecsEnabled { reporter := awsecs.Make(flags.ecsCacheSize, flags.ecsCacheExpiry, flags.ecsClusterRegion, handlerRegistry, probeID) defer reporter.Stop() From 38ea862bfd6e0660875e25c4f29c29595c7e75ea Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Fri, 20 Jul 2018 22:10:49 +0000 Subject: [PATCH 3/8] Check if dockerBridge is nonblank before using it This gives us the option of disabling the function --- prog/probe.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/prog/probe.go b/prog/probe.go index a45833d173..c1463ea85f 100644 --- a/prog/probe.go +++ b/prog/probe.go @@ -233,7 +233,7 @@ func probeMain(flags probeFlags, targets []appclient.Target) { if flags.dockerEnabled { // Don't add the bridge in Kubernetes since container IPs are global and // shouldn't be scoped - if !flags.kubernetesEnabled { + if flags.dockerBridge != "" && !flags.kubernetesEnabled { if err := report.AddLocalBridge(flags.dockerBridge); err != nil { log.Errorf("Docker: problem with bridge %s: %v", flags.dockerBridge, err) } From 98d52bd48006d8b2b9df21896dd772ec4a98f0d2 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Sun, 22 Jul 2018 21:37:34 +0000 Subject: [PATCH 4/8] Allow kubelet port to be disabled --- probe/kubernetes/reporter.go | 2 +- prog/main.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/probe/kubernetes/reporter.go b/probe/kubernetes/reporter.go index f7e20acf0f..58f5b13415 100644 --- a/probe/kubernetes/reporter.go +++ b/probe/kubernetes/reporter.go @@ -561,7 +561,7 @@ func (r *Reporter) podTopology(services []Service, deployments []Deployment, dae } var localPodUIDs map[string]struct{} - if r.nodeName == "" { + if r.nodeName == "" && r.kubeletPort != 0 { // We don't know the node name: fall back to obtaining the local pods from kubelet var err error localPodUIDs, err = GetLocalPodUIDs(fmt.Sprintf("127.0.0.1:%d", r.kubeletPort)) diff --git a/prog/main.go b/prog/main.go index d9e51ec68e..b133a69187 100644 --- a/prog/main.go +++ b/prog/main.go @@ -330,7 +330,7 @@ func setupFlags(flags *flags) { flag.StringVar(&flags.probe.kubernetesClientConfig.User, "probe.kubernetes.user", "", "The name of the kubeconfig user to use") flag.StringVar(&flags.probe.kubernetesClientConfig.Username, "probe.kubernetes.username", "", "Username for basic authentication to the API server") flag.StringVar(&flags.probe.kubernetesNodeName, "probe.kubernetes.node-name", "", "Name of this node, for filtering pods") - flag.UintVar(&flags.probe.kubernetesKubeletPort, "probe.kubernetes.kubelet-port", 10255, "Node-local TCP port for contacting kubelet") + flag.UintVar(&flags.probe.kubernetesKubeletPort, "probe.kubernetes.kubelet-port", 10255, "Node-local TCP port for contacting kubelet (zero to disable)") // AWS ECS flag.BoolVar(&flags.probe.ecsEnabled, "probe.ecs", false, "Collect ecs-related attributes for containers on this node") From 1279a02b7dc3ec2c412b8711b01d3184ce058381 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Sun, 22 Jul 2018 21:59:50 +0000 Subject: [PATCH 5/8] Stop tagging pods with host ID So they can be reported centrally, find the pod host ID from the child containers. --- probe/host/tagger.go | 3 ++- render/pod.go | 36 ++++++++++++++++++++++++++++-------- 2 files changed, 30 insertions(+), 9 deletions(-) diff --git a/probe/host/tagger.go b/probe/host/tagger.go index 29e5413243..f466c96e71 100644 --- a/probe/host/tagger.go +++ b/probe/host/tagger.go @@ -30,7 +30,8 @@ func (t Tagger) Tag(r report.Report) (report.Report, error) { // Explicitly don't tag Endpoints, Addresses and Overlay nodes - These topologies include pseudo nodes, // and as such do their own host tagging. - for _, topology := range []report.Topology{r.Process, r.Container, r.ContainerImage, r.Host, r.Pod} { + // Don't tag Pods so they can be reported centrally. + for _, topology := range []report.Topology{r.Process, r.Container, r.ContainerImage, r.Host} { for _, node := range topology.Nodes { topology.ReplaceNode(node.WithLatests(metadata).WithParent(report.Host, t.hostNodeID)) } diff --git a/render/pod.go b/render/pod.go index c08ab0e515..d03b71b13a 100644 --- a/render/pod.go +++ b/render/pod.go @@ -53,14 +53,16 @@ var PodRenderer = Memoise(ConditionalRenderer(renderKubernetesTopologies, }, MakeReduce( PropagateSingleMetrics(report.Container, - Map2Parent{topologies: []string{report.Pod}, noParentsPseudoID: UnmanagedID, - chainRenderer: MakeFilter( - ComposeFilterFuncs( - IsRunning, - Complement(isPauseContainer), - ), - ContainerWithImageNameRenderer, - )}, + MakeMap(propagateHostID, + Map2Parent{topologies: []string{report.Pod}, noParentsPseudoID: UnmanagedID, + chainRenderer: MakeFilter( + ComposeFilterFuncs( + IsRunning, + Complement(isPauseContainer), + ), + ContainerWithImageNameRenderer, + )}, + ), ), ConnectionJoin(MapPod2IP, report.Pod), KubernetesVolumesRenderer, @@ -68,6 +70,24 @@ var PodRenderer = Memoise(ConditionalRenderer(renderKubernetesTopologies, ), )) +// Pods are not tagged with a Host ID, but their container children are. +// If n doesn't already have a host ID, copy it from one of the children +func propagateHostID(n report.Node) report.Node { + if _, found := n.Latest.Lookup(report.HostNodeID); found { + return n + } + var first *report.Node + n.Children.ForEach(func(child report.Node) { + if first == nil { + first = &child + } + }) + if first != nil { + n.Latest = n.Latest.Propagate(first.Latest, report.HostNodeID) + } + return n +} + // PodServiceRenderer is a Renderer which produces a renderable kubernetes services // graph by merging the pods graph and the services topology. // From 78eaf93c2159fed1ad1b2d5f0299976cefa9a356 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Thu, 11 Oct 2018 18:10:39 +0000 Subject: [PATCH 6/8] Make flag names easier to understand Now you specify a role instead of controlling the internal behaviour --- prog/main.go | 4 ++-- prog/probe.go | 18 ++++++++++++++++-- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/prog/main.go b/prog/main.go index b133a69187..13d1ae1a01 100644 --- a/prog/main.go +++ b/prog/main.go @@ -124,7 +124,7 @@ type probeFlags struct { criEndpoint string kubernetesEnabled bool - kubernetesTagOnly bool + kubernetesRole string kubernetesNodeName string kubernetesClientConfig kubernetes.ClientConfig kubernetesKubeletPort uint @@ -315,7 +315,7 @@ func setupFlags(flags *flags) { // K8s flag.BoolVar(&flags.probe.kubernetesEnabled, "probe.kubernetes", false, "collect kubernetes-related attributes for containers") - flag.BoolVar(&flags.probe.kubernetesTagOnly, "probe.kubernetes-tag", false, "tag containers with kubernetes parents") + flag.StringVar(&flags.probe.kubernetesRole, "probe.kubernetes.role", "", "host, cluster or blank for everything") flag.StringVar(&flags.probe.kubernetesClientConfig.Server, "probe.kubernetes.api", "", "The address and port of the Kubernetes API server (deprecated in favor of equivalent probe.kubernetes.server)") flag.StringVar(&flags.probe.kubernetesClientConfig.CertificateAuthority, "probe.kubernetes.certificate-authority", "", "Path to a cert. file for the certificate authority") flag.StringVar(&flags.probe.kubernetesClientConfig.ClientCertificate, "probe.kubernetes.client-certificate", "", "Path to a client certificate file for TLS") diff --git a/prog/probe.go b/prog/probe.go index c1463ea85f..9896e9f326 100644 --- a/prog/probe.go +++ b/prog/probe.go @@ -41,6 +41,9 @@ import ( const ( versionCheckPeriod = 6 * time.Hour defaultServiceHost = "https://cloud.weave.works.:443" + + kubernetesRoleHost = "host" + kubernetesRoleCluster = "cluster" ) var ( @@ -230,6 +233,17 @@ func probeMain(flags probeFlags, targets []appclient.Target) { defer endpointReporter.Stop() p.AddReporter(endpointReporter) + switch flags.kubernetesRole { + case "": // nothing special + case kubernetesRoleHost: + flags.kubernetesEnabled = true + case kubernetesRoleCluster: + flags.kubernetesKubeletPort = 0 + flags.kubernetesEnabled = true + default: + log.Warnf("unrecognized --probe.kubernetes.role: %s", flags.kubernetesRole) + } + if flags.dockerEnabled { // Don't add the bridge in Kubernetes since container IPs are global and // shouldn't be scoped @@ -267,7 +281,7 @@ func probeMain(flags probeFlags, targets []appclient.Target) { } } - if flags.kubernetesEnabled { + if flags.kubernetesEnabled && flags.kubernetesRole != kubernetesRoleHost { if client, err := kubernetes.NewClient(flags.kubernetesClientConfig); err == nil { defer client.Stop() reporter := kubernetes.NewReporter(client, clients, probeID, hostID, p, handlerRegistry, flags.kubernetesNodeName, flags.kubernetesKubeletPort) @@ -279,7 +293,7 @@ func probeMain(flags probeFlags, targets []appclient.Target) { } } - if flags.kubernetesEnabled || flags.kubernetesTagOnly { + if flags.kubernetesEnabled { p.AddTagger(&kubernetes.Tagger{}) } From fb96fe0024bc5df495f88c8a88e146d4812d5cc1 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Fri, 12 Oct 2018 11:52:34 +0000 Subject: [PATCH 7/8] Turn everything else off in Kubernetes cluster probe --- prog/probe.go | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/prog/probe.go b/prog/probe.go index 9896e9f326..c06d6821dd 100644 --- a/prog/probe.go +++ b/prog/probe.go @@ -108,6 +108,21 @@ func probeMain(flags probeFlags, targets []appclient.Target) { logCensoredArgs() defer log.Info("probe exiting") + switch flags.kubernetesRole { + case "": // nothing special + case kubernetesRoleHost: + flags.kubernetesEnabled = true + case kubernetesRoleCluster: + flags.kubernetesKubeletPort = 0 + flags.kubernetesEnabled = true + flags.spyProcs = false + flags.procEnabled = false + flags.useConntrack = false + flags.useEbpfConn = false + default: + log.Warnf("unrecognized --probe.kubernetes.role: %s", flags.kubernetesRole) + } + if flags.spyProcs && os.Getegid() != 0 { log.Warn("--probe.proc.spy=true, but that requires root to find everything") } @@ -233,17 +248,6 @@ func probeMain(flags probeFlags, targets []appclient.Target) { defer endpointReporter.Stop() p.AddReporter(endpointReporter) - switch flags.kubernetesRole { - case "": // nothing special - case kubernetesRoleHost: - flags.kubernetesEnabled = true - case kubernetesRoleCluster: - flags.kubernetesKubeletPort = 0 - flags.kubernetesEnabled = true - default: - log.Warnf("unrecognized --probe.kubernetes.role: %s", flags.kubernetesRole) - } - if flags.dockerEnabled { // Don't add the bridge in Kubernetes since container IPs are global and // shouldn't be scoped From 0c394e689a25fc09692596c3c51311bb78abaf87 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Fri, 12 Oct 2018 13:47:39 +0000 Subject: [PATCH 8/8] Fix pod host propagation --- render/pod.go | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/render/pod.go b/render/pod.go index d03b71b13a..49b97be876 100644 --- a/render/pod.go +++ b/render/pod.go @@ -53,7 +53,7 @@ var PodRenderer = Memoise(ConditionalRenderer(renderKubernetesTopologies, }, MakeReduce( PropagateSingleMetrics(report.Container, - MakeMap(propagateHostID, + MakeMap(propagatePodHost, Map2Parent{topologies: []string{report.Pod}, noParentsPseudoID: UnmanagedID, chainRenderer: MakeFilter( ComposeFilterFuncs( @@ -70,21 +70,25 @@ var PodRenderer = Memoise(ConditionalRenderer(renderKubernetesTopologies, ), )) -// Pods are not tagged with a Host ID, but their container children are. -// If n doesn't already have a host ID, copy it from one of the children -func propagateHostID(n report.Node) report.Node { - if _, found := n.Latest.Lookup(report.HostNodeID); found { +// Pods are not tagged with a Host parent, but their container children are. +// If n doesn't already have a host, copy it from one of the children +func propagatePodHost(n report.Node) report.Node { + if n.Topology != report.Pod { + return n + } else if _, found := n.Parents.Lookup(report.Host); found { return n } - var first *report.Node + done := false n.Children.ForEach(func(child report.Node) { - if first == nil { - first = &child + if !done { + if hosts, found := child.Parents.Lookup(report.Host); found { + for _, h := range hosts { + n = n.WithParent(report.Host, h) + } + done = true + } } }) - if first != nil { - n.Latest = n.Latest.Propagate(first.Latest, report.HostNodeID) - } return n }