Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Kubernetes node name to filter pods if possible #2556

Merged
merged 2 commits into from
Jul 14, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 17 additions & 14 deletions probe/kubernetes/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,18 +111,20 @@ type Reporter struct {
probe *probe.Probe
hostID string
handlerRegistry *controls.HandlerRegistry
nodeName string
kubeletPort uint
}

// NewReporter makes a new Reporter
func NewReporter(client Client, pipes controls.PipeClient, probeID string, hostID string, probe *probe.Probe, handlerRegistry *controls.HandlerRegistry, kubeletPort uint) *Reporter {
func NewReporter(client Client, pipes controls.PipeClient, probeID string, hostID string, probe *probe.Probe, handlerRegistry *controls.HandlerRegistry, nodeName string, kubeletPort uint) *Reporter {

This comment was marked as abuse.

This comment was marked as abuse.

reporter := &Reporter{
client: client,
pipes: pipes,
probeID: probeID,
probe: probe,
hostID: hostID,
handlerRegistry: handlerRegistry,
nodeName: nodeName,

This comment was marked as abuse.

This comment was marked as abuse.

This comment was marked as abuse.

kubeletPort: kubeletPort,
}
reporter.registerControls()
Expand Down Expand Up @@ -424,21 +426,22 @@ func (r *Reporter) podTopology(services []Service, replicaSets []ReplicaSet, dae
))
}

// Obtain the local pods from kubelet since we only want to report those
// for performance reasons.
//
// In theory a simpler approach would be to obtain the current NodeName
// and filter local pods based on that. However that's hard since
// 1. reconstructing the NodeName requires cloud provider credentials
// 2. inferring the NodeName out of the hostname or system uuid is unreliable
// (uuids and hostnames can be duplicated across the cluster).
localPodUIDs, errUIDs := GetLocalPodUIDs(fmt.Sprintf("127.0.0.1:%d", r.kubeletPort))
if errUIDs != nil {
log.Warnf("Cannot obtain local pods, reporting all (which may impact performance): %v", errUIDs)
var localPodUIDs map[string]struct{}
if r.nodeName == "" {
// We don't know the node name: fall back to obtaining the local pods from kubelet
var err error
localPodUIDs, err = GetLocalPodUIDs(fmt.Sprintf("127.0.0.1:%d", r.kubeletPort))
if err != nil {
log.Warnf("No node name and cannot obtain local pods, reporting all (which may impact performance): %v", err)
}
}
err := r.client.WalkPods(func(p Pod) error {
// filter out non-local pods
if errUIDs == nil {
// filter out non-local pods: we only want to report local ones for performance reasons.
if r.nodeName != "" {
if p.NodeName() != r.nodeName {
return nil
}
} else if localPodUIDs != nil {
if _, ok := localPodUIDs[p.UID()]; !ok {
return nil
}
Expand Down
6 changes: 3 additions & 3 deletions probe/kubernetes/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func TestReporter(t *testing.T) {
pod2ID := report.MakePodNodeID(pod2UID)
serviceID := report.MakeServiceNodeID(serviceUID)
hr := controls.NewDefaultHandlerRegistry()
rpt, _ := kubernetes.NewReporter(newMockClient(), nil, "", "foo", nil, hr, 0).Report()
rpt, _ := kubernetes.NewReporter(newMockClient(), nil, "", "foo", nil, hr, "", 0).Report()

// Reporter should have added the following pods
for _, pod := range []struct {
Expand Down Expand Up @@ -255,7 +255,7 @@ func TestTagger(t *testing.T) {
}))

hr := controls.NewDefaultHandlerRegistry()
rpt, err := kubernetes.NewReporter(newMockClient(), nil, "", "", nil, hr, 0).Tag(rpt)
rpt, err := kubernetes.NewReporter(newMockClient(), nil, "", "", nil, hr, "", 0).Tag(rpt)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
Expand Down Expand Up @@ -284,7 +284,7 @@ func TestReporterGetLogs(t *testing.T) {
client := newMockClient()
pipes := mockPipeClient{}
hr := controls.NewDefaultHandlerRegistry()
reporter := kubernetes.NewReporter(client, pipes, "", "", nil, hr, 0)
reporter := kubernetes.NewReporter(client, pipes, "", "", nil, hr, "", 0)

// Should error on invalid IDs
{
Expand Down
2 changes: 2 additions & 0 deletions prog/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ type probeFlags struct {
dockerBridge string

kubernetesEnabled bool
kubernetesNodeName string
kubernetesClientConfig kubernetes.ClientConfig
kubernetesKubeletPort uint

Expand Down Expand Up @@ -314,6 +315,7 @@ func setupFlags(flags *flags) {
flag.StringVar(&flags.probe.kubernetesClientConfig.Token, kubernetesTokenFlag, "", "Bearer token for authentication to the API server")
flag.StringVar(&flags.probe.kubernetesClientConfig.User, "probe.kubernetes.user", "", "The name of the kubeconfig user to use")
flag.StringVar(&flags.probe.kubernetesClientConfig.Username, "probe.kubernetes.username", "", "Username for basic authentication to the API server")
flag.StringVar(&flags.probe.kubernetesNodeName, "probe.kubernetes.node-name", "", "Name of this node, for filtering pods")
flag.UintVar(&flags.probe.kubernetesKubeletPort, "probe.kubernetes.kubelet-port", 10255, "Node-local TCP port for contacting kubelet")

// AWS ECS
Expand Down
2 changes: 1 addition & 1 deletion prog/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func probeMain(flags probeFlags, targets []appclient.Target) {
if flags.kubernetesEnabled {
if client, err := kubernetes.NewClient(flags.kubernetesClientConfig); err == nil {
defer client.Stop()
reporter := kubernetes.NewReporter(client, clients, probeID, hostID, p, handlerRegistry, flags.kubernetesKubeletPort)
reporter := kubernetes.NewReporter(client, clients, probeID, hostID, p, handlerRegistry, flags.kubernetesNodeName, flags.kubernetesKubeletPort)
defer reporter.Stop()
p.AddReporter(reporter)
p.AddTagger(reporter)
Expand Down