Skip to content

Commit

Permalink
Use add_kubernetes_metadata IP & port matching in Packetbeat (#5707)
Browse files Browse the repository at this point in the history
The code was there for Metricbeat already, this PR moves it to libbeat
and adapts Packetbeat to include it
  • Loading branch information
exekias authored and ruflin committed Nov 28, 2017
1 parent 220284f commit 3cb7fba
Show file tree
Hide file tree
Showing 9 changed files with 176 additions and 164 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
- Update to Golang 1.9.2
- Add number_of_routing_shards config set to 30 {pull}5570[5570]
- Set log level for kafka output. {pull}5397[5397]
- Moved `ip_port` indexer for `add_kubernetes_metadata` to all beats. {pull}5707[5707]

*Auditbeat*

Expand Down Expand Up @@ -124,6 +125,8 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di

*Packetbeat*

- Configure good defaults for `add_kubernetes_metadata`. {pull}5707[5707]

*Winlogbeat*

==== Deprecated
Expand Down
79 changes: 79 additions & 0 deletions libbeat/processors/add_kubernetes_metadata/indexers.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
const (
ContainerIndexerName = "container"
PodNameIndexerName = "pod_name"
IPPortIndexerName = "ip_port"
)

// Indexer take known pods and generate all the metadata we need to enrich
Expand Down Expand Up @@ -248,3 +249,81 @@ func containerID(status PodContainerStatus) string {
}
return ""
}

// IPPortIndexer indexes pods based on all their host:port combinations
type IPPortIndexer struct {
genMeta GenMeta
}

// NewIPPortIndexer creates and returns a new indexer for pod IP & ports
func NewIPPortIndexer(_ common.Config, genMeta GenMeta) (Indexer, error) {
return &IPPortIndexer{genMeta: genMeta}, nil
}

// GetMetadata returns metadata for the given pod, if it matches the index
func (h *IPPortIndexer) GetMetadata(pod *Pod) []MetadataIndex {
commonMeta := h.genMeta.GenerateMetaData(pod)
hostPorts := h.GetIndexes(pod)
var metadata []MetadataIndex

if pod.Status.PodIP == "" {
return metadata
}
for i := 0; i < len(hostPorts); i++ {
dobreak := false
containerMeta := commonMeta.Clone()
for _, container := range pod.Spec.Containers {
ports := container.Ports

for _, port := range ports {
if port.ContainerPort == int64(0) {
continue
}
if strings.Index(hostPorts[i], fmt.Sprintf("%s:%d", pod.Status.PodIP, port.ContainerPort)) != -1 {
containerMeta["container"] = common.MapStr{
"name": container.Name,
}
dobreak = true
break
}
}

if dobreak {
break
}

}

metadata = append(metadata, MetadataIndex{
Index: hostPorts[i],
Data: containerMeta,
})
}

return metadata
}

// GetIndexes returns the indexes for the given Pod
func (h *IPPortIndexer) GetIndexes(pod *Pod) []string {
var hostPorts []string

ip := pod.Status.PodIP
if ip == "" {
return hostPorts
}
for _, container := range pod.Spec.Containers {
ports := container.Ports

for _, port := range ports {
if port.ContainerPort != int64(0) {
hostPorts = append(hostPorts, fmt.Sprintf("%s:%d", ip, port.ContainerPort))
} else {
hostPorts = append(hostPorts, ip)
}

}

}

return hostPorts
}
66 changes: 66 additions & 0 deletions libbeat/processors/add_kubernetes_metadata/indexers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,3 +237,69 @@ func TestFilteredGenMetaExclusion(t *testing.T) {
ok, _ = labelMap.HasKey("x")
assert.Equal(t, ok, false)
}

func TestIpPortIndexer(t *testing.T) {
var testConfig = common.NewConfig()

ipIndexer, err := NewIPPortIndexer(*testConfig, metagen)
assert.Nil(t, err)

podName := "testpod"
ns := "testns"
container := "container"
ip := "1.2.3.4"
port := int64(80)
pod := Pod{
Metadata: ObjectMeta{
Name: podName,
Namespace: ns,
Labels: map[string]string{
"labelkey": "labelvalue",
},
},
Spec: PodSpec{
Containers: make([]Container, 0),
},

Status: PodStatus{
PodIP: ip,
},
}

indexers := ipIndexer.GetMetadata(&pod)
indices := ipIndexer.GetIndexes(&pod)
assert.Equal(t, len(indexers), 0)
assert.Equal(t, len(indices), 0)
expected := common.MapStr{
"pod": common.MapStr{
"name": "testpod",
},
"namespace": "testns",
"labels": common.MapStr{
"labelkey": "labelvalue",
},
}

pod.Spec.Containers = []Container{
{
Name: container,
Ports: []ContainerPort{
{
Name: container,
ContainerPort: port,
},
},
},
}
expected["container"] = common.MapStr{"name": container}

indexers = ipIndexer.GetMetadata(&pod)
assert.Equal(t, len(indexers), 1)
assert.Equal(t, indexers[0].Index, fmt.Sprintf("%s:%d", ip, port))

indices = ipIndexer.GetIndexes(&pod)
assert.Equal(t, len(indices), 1)
assert.Equal(t, indices[0], fmt.Sprintf("%s:%d", ip, port))

assert.Equal(t, expected.String(), indexers[0].Data.String())
}
1 change: 1 addition & 0 deletions libbeat/processors/add_kubernetes_metadata/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func init() {
// Register default indexers
Indexing.AddIndexer(PodNameIndexerName, NewPodNameIndexer)
Indexing.AddIndexer(ContainerIndexerName, NewContainerIndexer)
Indexing.AddIndexer(IPPortIndexerName, NewIPPortIndexer)
Indexing.AddMatcher(FieldMatcherName, NewFieldMatcher)
Indexing.AddMatcher(FieldFormatMatcherName, NewFieldFormatMatcher)
}
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/beater/metricbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/module"

// Add metricbeat specific processors
// Add metricbeat default processors
_ "github.com/elastic/beats/metricbeat/processor/add_kubernetes_metadata"
)

Expand Down
85 changes: 1 addition & 84 deletions metricbeat/processor/add_kubernetes_metadata/indexers.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,16 @@
package add_kubernetes_metadata

import (
"fmt"
"strings"

"github.com/elastic/beats/libbeat/common"
kubernetes "github.com/elastic/beats/libbeat/processors/add_kubernetes_metadata"
)

const (
IpPortIndexerName = "ip_port"
)

func init() {
// Register default indexers
kubernetes.Indexing.AddIndexer(IpPortIndexerName, NewIpPortIndexer)
cfg := common.NewConfig()

//Add IP Port Indexer as a default indexer
kubernetes.Indexing.AddDefaultIndexerConfig(IpPortIndexerName, *cfg)
kubernetes.Indexing.AddDefaultIndexerConfig(kubernetes.IPPortIndexerName, *cfg)

config := map[string]interface{}{
"lookup_fields": []string{"metricset.host"},
Expand All @@ -29,78 +21,3 @@ func init() {
kubernetes.Indexing.AddDefaultMatcherConfig(kubernetes.FieldMatcherName, *fieldCfg)
}
}

// IpPortIndexer indexes pods based on all their host:port combinations
type IpPortIndexer struct {
genMeta kubernetes.GenMeta
}

func NewIpPortIndexer(_ common.Config, genMeta kubernetes.GenMeta) (kubernetes.Indexer, error) {
return &IpPortIndexer{genMeta: genMeta}, nil
}

func (h *IpPortIndexer) GetMetadata(pod *kubernetes.Pod) []kubernetes.MetadataIndex {
commonMeta := h.genMeta.GenerateMetaData(pod)
hostPorts := h.GetIndexes(pod)
var metadata []kubernetes.MetadataIndex

if pod.Status.PodIP == "" {
return metadata
}
for i := 0; i < len(hostPorts); i++ {
dobreak := false
containerMeta := commonMeta.Clone()
for _, container := range pod.Spec.Containers {
ports := container.Ports

for _, port := range ports {
if port.ContainerPort == int64(0) {
continue
}
if strings.Index(hostPorts[i], fmt.Sprintf("%s:%d", pod.Status.PodIP, port.ContainerPort)) != -1 {
containerMeta["container"] = common.MapStr{
"name": container.Name,
}
dobreak = true
break
}
}

if dobreak {
break
}

}

metadata = append(metadata, kubernetes.MetadataIndex{
Index: hostPorts[i],
Data: containerMeta,
})
}

return metadata
}

func (h *IpPortIndexer) GetIndexes(pod *kubernetes.Pod) []string {
var hostPorts []string

ip := pod.Status.PodIP
if ip == "" {
return hostPorts
}
for _, container := range pod.Spec.Containers {
ports := container.Ports

for _, port := range ports {
if port.ContainerPort != int64(0) {
hostPorts = append(hostPorts, fmt.Sprintf("%s:%d", ip, port.ContainerPort))
} else {
hostPorts = append(hostPorts, ip)
}

}

}

return hostPorts
}
79 changes: 0 additions & 79 deletions metricbeat/processor/add_kubernetes_metadata/indexers_test.go

This file was deleted.

3 changes: 3 additions & 0 deletions packetbeat/beater/packetbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import (
"github.com/elastic/beats/packetbeat/protos/udp"
"github.com/elastic/beats/packetbeat/publish"
"github.com/elastic/beats/packetbeat/sniffer"

// Add packetbeat default processors
_ "github.com/elastic/beats/packetbeat/processor/add_kubernetes_metadata"
)

// Beater object. Contains all objects needed to run the beat
Expand Down
Loading

0 comments on commit 3cb7fba

Please sign in to comment.