Skip to content

Commit

Permalink
Merge pull request #8 from build-security/kubebeat-with-data-and-fetc…
Browse files Browse the repository at this point in the history
…hers

Data, Fetcher, Fetcher implementations.
  • Loading branch information
yashtewari authored Nov 15, 2021
2 parents 8a66261 + 39d945a commit 5cf562a
Show file tree
Hide file tree
Showing 12 changed files with 619 additions and 72 deletions.
139 changes: 139 additions & 0 deletions kubebeat/beater/data.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package beater

import (
"bytes"
"context"
"encoding/gob"
"fmt"
"time"

"github.com/elastic/beats/v7/libbeat/logp"
)

// Data maintains a cache that is updated by Fetcher implementations registered
// against it. It sends the cache to an output channel at the defined interval.
type Data struct {
interval time.Duration
output chan interface{}

ctx context.Context
cancel context.CancelFunc
state map[string]interface{}
fetchers map[string]Fetcher
}

// NewData returns a new Data instance with the given interval.
func NewData(ctx context.Context, interval time.Duration) *Data {
ctx, cancel := context.WithCancel(ctx)

return &Data{
interval: interval,
output: make(chan interface{}),
ctx: ctx,
cancel: cancel,
state: make(map[string]interface{}),
fetchers: make(map[string]Fetcher),
}
}

// Output returns the output channel.
func (d *Data) Output() <-chan interface{} {
return d.output
}

// RegisterFetcher registers a Fetcher implementation.
func (d *Data) RegisterFetcher(key string, f Fetcher) error {
if _, ok := d.fetchers[key]; ok {
return fmt.Errorf("fetcher key collision: %q is already registered", key)
}

d.fetchers[key] = f
return nil
}

// Run updates the cache using Fetcher implementations.
func (d *Data) Run() error {
updates := make(chan update)

for key, fetcher := range d.fetchers {
go d.fetchWorker(updates, key, fetcher)
}

go d.fetchManager(updates)

return nil
}

// update is a sigle update sent from a worker to a manager.
type update struct {
key string
val interface{}
}

func (d *Data) fetchWorker(updates chan update, k string, f Fetcher) {
for {
select {
case <-d.ctx.Done():
return
default:
val, err := f.Fetch()
if err != nil {
logp.L().Errorf("error running fetcher for key %q: %w", k, err)
}

updates <- update{k, val}
}
}
}

func (d *Data) fetchManager(updates chan update) {
ticker := time.NewTicker(d.interval)

for {
select {
case <-ticker.C:
// Generate input ID?

c, err := copy(d.state)
if err != nil {
logp.L().Errorf("could not copy data state: %w", err)
return
}

d.output <- c

case u := <-updates:
d.state[u.key] = u.val

case <-d.ctx.Done():
return
}
}
}

// Stop cleans up Data resources gracefully.
func (d *Data) Stop() {
d.cancel()

for key, fetcher := range d.fetchers {
fetcher.Stop()
logp.L().Infof("Fetcher for key %q stopped", key)
}
}

// copy makes a copy of the given map.
func copy(m map[string]interface{}) (map[string]interface{}, error) {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
dec := gob.NewDecoder(&buf)
err := enc.Encode(m)
if err != nil {
return nil, err
}
var copy map[string]interface{}
err = dec.Decode(&copy)
if err != nil {
return nil, err
}
return copy, nil
}
7 changes: 7 additions & 0 deletions kubebeat/beater/fetcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package beater

// Fetcher represents a data fetcher.
type Fetcher interface {
Fetch() (interface{}, error)
Stop()
}
60 changes: 60 additions & 0 deletions kubebeat/beater/kube.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package beater

import (
"fmt"
"time"

"github.com/elastic/beats/v7/libbeat/common/kubernetes"
"github.com/elastic/beats/v7/libbeat/logp"
)

const ()

type KubeFetcher struct {
watcher kubernetes.Watcher
}

func NewKubeFetcher(kubeconfig string, interval time.Duration) (Fetcher, error) {
client, err := kubernetes.GetKubernetesClient(kubeconfig, kubernetes.KubeClientOptions{})
if err != nil {
return nil, fmt.Errorf("fail to get k8sclient client: %s", err.Error())
}

logp.Info("Client initiated.")

watchOptions := kubernetes.WatchOptions{
SyncTimeout: interval,
Namespace: "kube-system",
}

watcher, err := kubernetes.NewWatcher(client, &kubernetes.Pod{}, watchOptions, nil)
if err != nil {
return nil, fmt.Errorf("error creating k8s client set: %v", err)
}

logp.Info("Watcher initiated.")

return &KubeFetcher{
watcher: watcher,
}, nil
}

func (f *KubeFetcher) Fetch() (interface{}, error) {
pods := f.watcher.Store().List()

for _, p := range pods {
pod, ok := p.(*kubernetes.Pod)
if !ok {
logp.Info("could not convert to pod")
continue
}
pod.SetManagedFields(nil)
pod.Status.Reset()
pod.Kind = "Pod" // see https://github.com/kubernetes/kubernetes/issues/3030
}

return pods, nil
}

func (f *KubeFetcher) Stop() {
}
Loading

0 comments on commit 5cf562a

Please sign in to comment.