diff --git a/kubebeat/beater/data.go b/kubebeat/beater/data.go
new file mode 100644
index 000000000000..7958d127984c
--- /dev/null
+++ b/kubebeat/beater/data.go
@@ -0,0 +1,140 @@
+package beater
+
+import (
+	"bytes"
+	"context"
+	"encoding/gob"
+	"fmt"
+	"time"
+
+	"github.com/elastic/beats/v7/libbeat/logp"
+)
+
+// Data maintains a cache that is updated by Fetcher implementations registered
+// against it. It sends the cache to an output channel at the defined interval.
+type Data struct {
+	interval time.Duration
+	output   chan interface{}
+
+	ctx      context.Context
+	cancel   context.CancelFunc
+	state    map[string]interface{}
+	fetchers map[string]Fetcher
+}
+
+// NewData returns a new Data instance with the given interval.
+func NewData(ctx context.Context, interval time.Duration) *Data {
+	ctx, cancel := context.WithCancel(ctx)
+
+	return &Data{
+		interval: interval,
+		output:   make(chan interface{}),
+		ctx:      ctx,
+		cancel:   cancel,
+		state:    make(map[string]interface{}),
+		fetchers: make(map[string]Fetcher),
+	}
+}
+
+// Output returns the output channel.
+func (d *Data) Output() <-chan interface{} {
+	return d.output
+}
+
+// RegisterFetcher registers a Fetcher implementation.
+func (d *Data) RegisterFetcher(key string, f Fetcher) error {
+	if _, ok := d.fetchers[key]; ok {
+		return fmt.Errorf("fetcher key collision: %q is already registered", key)
+	}
+
+	d.fetchers[key] = f
+	return nil
+}
+
+// Run updates the cache using Fetcher implementations.
+func (d *Data) Run() error {
+	updates := make(chan update)
+
+	for key, fetcher := range d.fetchers {
+		go d.fetchWorker(updates, key, fetcher)
+	}
+
+	go d.fetchManager(updates)
+
+	return nil
+}
+
+// update is a single update sent from a worker to a manager.
+type update struct {
+	key string
+	val interface{}
+}
+
+// NOTE(review): this loops calling Fetch back-to-back with no delay; it relies
+// on Fetch blocking or being cheap — confirm that is intended for all fetchers.
+func (d *Data) fetchWorker(updates chan update, k string, f Fetcher) {
+	for {
+		select {
+		case <-d.ctx.Done():
+			return
+		default:
+			val, err := f.Fetch()
+			if err != nil {
+				logp.L().Errorf("error running fetcher for key %q: %v", k, err)
+			}
+
+			updates <- update{k, val}
+		}
+	}
+}
+
+func (d *Data) fetchManager(updates chan update) {
+	ticker := time.NewTicker(d.interval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			// Generate input ID?
+
+			c, err := deepCopy(d.state)
+			if err != nil {
+				logp.L().Errorf("could not copy data state: %v", err)
+				return
+			}
+
+			d.output <- c
+
+		case u := <-updates:
+			d.state[u.key] = u.val
+
+		case <-d.ctx.Done():
+			return
+		}
+	}
+}
+
+// Stop cleans up Data resources gracefully.
+func (d *Data) Stop() {
+	d.cancel()
+
+	for key, fetcher := range d.fetchers {
+		fetcher.Stop()
+		logp.L().Infof("Fetcher for key %q stopped", key)
+	}
+}
+
+// deepCopy makes a deep copy of the given map by round-tripping it through
+// gob encoding. (Named deepCopy so it does not shadow the builtin copy.)
+func deepCopy(m map[string]interface{}) (map[string]interface{}, error) {
+	var buf bytes.Buffer
+	enc := gob.NewEncoder(&buf)
+	dec := gob.NewDecoder(&buf)
+	if err := enc.Encode(m); err != nil {
+		return nil, err
+	}
+	var out map[string]interface{}
+	if err := dec.Decode(&out); err != nil {
+		return nil, err
+	}
+	return out, nil
+}
diff --git a/kubebeat/beater/fetcher.go b/kubebeat/beater/fetcher.go
new file mode 100644
index 000000000000..b05939dd707e
--- /dev/null
+++ b/kubebeat/beater/fetcher.go
@@ -0,0 +1,7 @@
+package beater
+
+// Fetcher represents a data fetcher.
+type Fetcher interface {
+	Fetch() (interface{}, error)
+	Stop()
+}
diff --git a/kubebeat/beater/kube.go b/kubebeat/beater/kube.go
new file mode 100644
index 000000000000..96b53f49b7fe
--- /dev/null
+++ b/kubebeat/beater/kube.go
@@ -0,0 +1,70 @@
+package beater
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/elastic/beats/v7/libbeat/common/kubernetes"
+	"github.com/elastic/beats/v7/libbeat/logp"
+)
+
+// KubeFetcher fetches pod state via a Kubernetes resource watcher.
+type KubeFetcher struct {
+	watcher kubernetes.Watcher
+}
+
+// NewKubeFetcher returns a Fetcher that lists pods in the kube-system
+// namespace, syncing at the given interval.
+func NewKubeFetcher(kubeconfig string, interval time.Duration) (Fetcher, error) {
+	client, err := kubernetes.GetKubernetesClient(kubeconfig, kubernetes.KubeClientOptions{})
+	if err != nil {
+		return nil, fmt.Errorf("fail to get k8sclient client: %w", err)
+	}
+
+	logp.Info("Client initiated.")
+
+	watchOptions := kubernetes.WatchOptions{
+		SyncTimeout: interval,
+		Namespace:   "kube-system",
+	}
+
+	watcher, err := kubernetes.NewWatcher(client, &kubernetes.Pod{}, watchOptions, nil)
+	if err != nil {
+		return nil, fmt.Errorf("error creating k8s watcher: %w", err)
+	}
+
+	// The watcher must be started, otherwise its store stays empty and Fetch
+	// returns nothing. (The beater no longer starts it since moving to Data.)
+	if err := watcher.Start(); err != nil {
+		return nil, fmt.Errorf("error starting k8s watcher: %w", err)
+	}
+
+	logp.Info("Watcher initiated.")
+
+	return &KubeFetcher{
+		watcher: watcher,
+	}, nil
+}
+
+// Fetch returns the pods currently present in the watcher's store, with
+// noisy/mutable fields stripped.
+func (f *KubeFetcher) Fetch() (interface{}, error) {
+	pods := f.watcher.Store().List()
+
+	for _, p := range pods {
+		pod, ok := p.(*kubernetes.Pod)
+		if !ok {
+			logp.Info("could not convert to pod")
+			continue
+		}
+		pod.SetManagedFields(nil)
+		pod.Status.Reset()
+		pod.Kind = "Pod" // see https://github.com/kubernetes/kubernetes/issues/3030
+	}
+
+	return pods, nil
+}
+
+// Stop releases the underlying watcher.
+func (f *KubeFetcher) Stop() {
+	f.watcher.Stop()
+}
diff --git a/kubebeat/beater/kubebeat.go b/kubebeat/beater/kubebeat.go
index 439c792db4ca..3fb8f64735c5 100644
--- a/kubebeat/beater/kubebeat.go
+++ b/kubebeat/beater/kubebeat.go
@@ -4,14 +4,14 @@ import (
 	"bytes"
 	"context"
 	"fmt"
+	"time"
+
+	"github.com/elastic/beats/v7/kubebeat/bundle"
 	"github.com/mitchellh/mapstructure"
-	"time"
 
 	"github.com/elastic/beats/v7/kubebeat/config"
 	"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/common/kubernetes" "github.com/elastic/beats/v7/libbeat/logp" "github.com/open-policy-agent/opa/sdk" sdktest "github.com/open-policy-agent/opa/sdk/test" @@ -22,38 +22,32 @@ type kubebeat struct { done chan struct{} config config.Config client beat.Client - watcher kubernetes.Watcher opa *sdk.OPA bundleServer *sdktest.Server + data *Data } // New creates an instance of kubebeat. func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) { + ctx := context.Background() + c := config.DefaultConfig if err := cfg.Unpack(&c); err != nil { - return nil, fmt.Errorf("error reading config file: %v", err) + return nil, fmt.Errorf("error reading config file: %w", err) } logp.Info("Config initiated.") - client, err := kubernetes.GetKubernetesClient(c.KubeConfig, kubernetes.KubeClientOptions{}) - if err != nil { - return nil, fmt.Errorf("fail to get k8sclient client: %s", err.Error()) - } - - logp.Info("Client initiated.") + data := NewData(ctx, c.Period) - watchOptions := kubernetes.WatchOptions{ - SyncTimeout: c.Period, - Namespace: "kube-system", - } + data.RegisterFetcher("processes", NewProcessesFetcher(procfsdir)) - watcher, err := kubernetes.NewWatcher(client, &kubernetes.Pod{}, watchOptions, nil) + kubef, err := NewKubeFetcher(c.KubeConfig, c.Period) if err != nil { - return nil, fmt.Errorf("error creating k8s client set: %v", err) + return nil, err } - logp.Info("Watcher initiated.") + data.RegisterFetcher("kube_api", kubef) // create a mock HTTP bundle bundleServer bundleServer, err := sdktest.NewServer(sdktest.MockBundle("/bundles/bundle.tar.gz", bundle.Policies)) @@ -79,7 +73,7 @@ func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) { config: c, opa: opa, bundleServer: bundleServer, - watcher: watcher, + data: data, } return bt, nil } @@ -91,7 +85,7 @@ type RuleResult struct { } type Finding struct { - Compliant bool `json:"compliant""` + Compliant bool 
`json:"compliant"` Message string `json:"message"` Resource interface{} `json:"resource"` } @@ -100,88 +94,73 @@ type Finding struct { func (bt *kubebeat) Run(b *beat.Beat) error { logp.Info("kubebeat is running! Hit CTRL-C to stop it.") - err := bt.watcher.Start() + err := bt.data.Run() if err != nil { return err } - bt.client, err = b.Publisher.Connect() - if err != nil { + if bt.client, err = b.Publisher.Connect(); err != nil { return err } - ticker := time.NewTicker(bt.config.Period) + // ticker := time.NewTicker(bt.config.Period) + output := bt.data.Output() + for { select { case <-bt.done: return nil - case <-ticker.C: - } - - pods := bt.watcher.Store().List() - events := make([]beat.Event, 0) - timestamp := time.Now() - - for _, p := range pods { - pod, ok := p.(*kubernetes.Pod) - if !ok { - logp.Info("could not convert to pod") - continue - } - pod.SetManagedFields(nil) - pod.Status.Reset() - pod.Kind = "Pod" // see https://github.com/kubernetes/kubernetes/issues/3030 + case o := <-output: + events := make([]beat.Event, 0) + timestamp := time.Now() - result, err := bt.Decision(pod) + result, err := bt.Decision(o) if err != nil { errEvent := beat.Event{ Timestamp: timestamp, Fields: common.MapStr{ "type": b.Info.Name, "err": fmt.Errorf("error running the policy: %v", err.Error()), - "resource": pod, - }, - } - events = append(events, errEvent) - continue - } - - var decoded PolicyResult - err = mapstructure.Decode(result, &decoded) - if err != nil { - errEvent := beat.Event{ - Timestamp: timestamp, - Fields: common.MapStr{ - "type": b.Info.Name, - "err": fmt.Errorf("error parsing the policy result: %v", err.Error()), - "resource": pod, - "raw_result": result, + "resource": o, }, } events = append(events, errEvent) - continue - } - - for ruleName, ruleResult := range decoded { - for _, Finding := range ruleResult.Findings { - event := beat.Event{ + } else { + var decoded PolicyResult + err = mapstructure.Decode(result, &decoded) + if err != nil { + errEvent 
:= beat.Event{ Timestamp: timestamp, Fields: common.MapStr{ - "type": b.Info.Name, - "rule_id": ruleName, - "compliant": Finding.Compliant, - "resource": Finding.Resource, - "message": Finding.Message, + "type": b.Info.Name, + "err": fmt.Errorf("error parsing the policy result: %v", err.Error()), + "resource": o, + "raw_result": result, }, } - events = append(events, event) + events = append(events, errEvent) + } else { + for ruleName, ruleResult := range decoded { + for _, Finding := range ruleResult.Findings { + event := beat.Event{ + Timestamp: timestamp, + Fields: common.MapStr{ + "type": b.Info.Name, + "rule_id": ruleName, + "compliant": Finding.Compliant, + "resource": Finding.Resource, + "message": Finding.Message, + }, + } + events = append(events, event) + } + } } } + bt.client.PublishAll(events) + logp.Info("%v events sent", len(events)) } - - bt.client.PublishAll(events) - logp.Info("%v events sent", len(events)) } } diff --git a/kubebeat/beater/process.go b/kubebeat/beater/process.go new file mode 100644 index 000000000000..b61dbe1c9a10 --- /dev/null +++ b/kubebeat/beater/process.go @@ -0,0 +1,55 @@ +package beater + +import ( + "github.com/elastic/beats/v7/kubebeat/proc" +) + +const ( + procfsdir = "/hostfs" +) + +type Process struct { + PID string `json:"pid"` + Cmd string `json:"cmd"` + Stat proc.ProcStat `json:"stat"` +} + +type ProcessesFetcher struct { + directory string // parent directory of target procfs +} + +func NewProcessesFetcher(dir string) Fetcher { + return &ProcessesFetcher{ + directory: dir, + } +} + +func (f *ProcessesFetcher) Fetch() (interface{}, error) { + pids, err := proc.List(f.directory) + if err != nil { + return nil, err + } + + ret := make(map[string]Process) + + // If errors occur during read, then return what we have till now + // without reporting errors. 
+	for _, p := range pids {
+		cmd, err := proc.ReadCmdLine(f.directory, p)
+		if err != nil {
+			return ret, nil
+		}
+
+		stat, err := proc.ReadStat(f.directory, p)
+		if err != nil {
+			return ret, nil
+		}
+
+		ret[p] = Process{p, cmd, stat}
+	}
+
+	return ret, nil
+}
+
+func (f *ProcessesFetcher) Stop() {
+}
diff --git a/kubebeat/proc/cmdline.go b/kubebeat/proc/cmdline.go
new file mode 100644
index 000000000000..c72dd43a511f
--- /dev/null
+++ b/kubebeat/proc/cmdline.go
@@ -0,0 +1,26 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package proc
+
+import (
+	"bytes"
+	"io/ioutil"
+	"strings"
+)
+
+// ReadCmdLine returns the command line of pid, with the NUL separators
+// replaced by spaces.
+func ReadCmdLine(root string, pid string) (string, error) {
+	fn := getProcAttr(root, pid, "cmdline")
+
+	b, err := ioutil.ReadFile(fn)
+	if err != nil {
+		return "", err
+	}
+
+	b = bytes.ReplaceAll(b, []byte{0}, []byte{' '})
+
+	return strings.TrimSpace(string(b)), nil
+}
diff --git a/kubebeat/proc/io.go b/kubebeat/proc/io.go
new file mode 100644
index 000000000000..e57af9b9eae0
--- /dev/null
+++ b/kubebeat/proc/io.go
@@ -0,0 +1,57 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package proc
+
+import (
+	"bytes"
+	"io/ioutil"
+	"strings"
+)
+
+type ProcIO struct {
+	ReadBytes           string
+	WriteBytes          string
+	CancelledWriteBytes string
+}
+
+// ReadIO reads process I/O stats from /proc/<pid>/io.
+// The parsing code logic is borrowed from osquery C++ implementation and translated to Go.
+// This makes the data returned from the `host_processes` table
+// consistent with data returned from the original osquery `processes` table.
+// https://github.com/osquery/osquery/blob/master/osquery/tables/system/linux/processes.cpp
+func ReadIO(root string, pid string) (procio ProcIO, err error) {
+	// Proc IO example
+	// rchar: 1527371144
+	// wchar: 1495591102
+	// syscr: 481186
+	// syscw: 255942
+	// read_bytes: 14401536
+	// write_bytes: 815329280
+	// cancelled_write_bytes: 40976384
+	fn := getProcAttr(root, pid, "io")
+	b, err := ioutil.ReadFile(fn)
+	if err != nil {
+		return
+	}
+
+	lines := bytes.Split(b, []byte{'\n'})
+	for _, line := range lines {
+		detail := bytes.SplitN(line, []byte{':'}, 2)
+		if len(detail) != 2 {
+			continue
+		}
+
+		k := strings.TrimSpace(bytesToString(detail[0]))
+		switch k {
+		case "read_bytes":
+			procio.ReadBytes = strings.TrimSpace(bytesToString(detail[1]))
+		case "write_bytes":
+			procio.WriteBytes = strings.TrimSpace(bytesToString(detail[1]))
+		case "cancelled_write_bytes":
+			procio.CancelledWriteBytes = strings.TrimSpace(bytesToString(detail[1]))
+		}
+	}
+	return procio, nil
+}
diff --git a/kubebeat/proc/link.go b/kubebeat/proc/link.go
new file mode 100644
index 000000000000..4725af9e1aab
--- /dev/null
+++ b/kubebeat/proc/link.go
@@ -0,0 +1,20 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package proc
+
+import (
+	"os"
+)
+
+// ReadLink resolves the symlink attribute attr (e.g. "exe", "cwd") of pid.
+func ReadLink(root string, pid string, attr string) (string, error) {
+	fn := getProcAttr(root, pid, attr)
+
+	s, err := os.Readlink(fn)
+	if err != nil {
+		return "", err
+	}
+	return s, nil
+}
diff --git a/kubebeat/proc/list.go b/kubebeat/proc/list.go
new file mode 100644
index 000000000000..be24adc292f2
--- /dev/null
+++ b/kubebeat/proc/list.go
@@ -0,0 +1,38 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package proc
+
+import (
+	"os"
+	"path/filepath"
+	"strconv"
+)
+
+// List returns the numeric (PID) directory names found under root/proc.
+func List(root string) ([]string, error) {
+	var pids []string
+
+	root = filepath.Join(root, "/proc")
+
+	dirs, err := os.ReadDir(root)
+	if err != nil {
+		return nil, err
+	}
+
+	for _, dir := range dirs {
+		if !dir.IsDir() {
+			continue
+		}
+
+		name := dir.Name()
+		// Keep only directories whose name is a number (PIDs).
+		if _, err := strconv.Atoi(name); err != nil {
+			continue
+		}
+		pids = append(pids, name)
+	}
+
+	return pids, nil
+}
diff --git a/kubebeat/proc/stat.go b/kubebeat/proc/stat.go
new file mode 100644
index 000000000000..f36aa3461f0e
--- /dev/null
+++ b/kubebeat/proc/stat.go
@@ -0,0 +1,120 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package proc
+
+import (
+	"bytes"
+	"errors"
+	"io/ioutil"
+	"path/filepath"
+	"strings"
+)
+
+var (
+	ErrInvalidProcStatHeader  = errors.New("invalid /proc/stat header")
+	ErrInvalidProcStatContent = errors.New("invalid /proc/stat content")
+)
+
+type ProcStat struct {
+	Name         string
+	RealUID      string
+	RealGID      string
+	EffectiveUID string
+	EffectiveGID string
+	SavedUID     string
+	SavedGID     string
+	ResidentSize string
+	TotalSize    string
+	State        string
+	Parent       string
+	Group        string
+	Nice         string
+	Threads      string
+	UserTime     string
+	SystemTime   string
+	StartTime    string
+}
+
+func getProcAttr(root, pid, attr string) string {
+	return filepath.Join(root, "/proc", pid, attr)
+}
+
+// ReadStat reads process stats from /proc/<pid>/stat and /proc/<pid>/status.
+// The parsing code logic is borrowed from osquery C++ implementation and translated to Go.
+// This makes the data returned from the `host_processes` table
+// consistent with data returned from the original osquery `processes` table.
+// https://github.com/osquery/osquery/blob/master/osquery/tables/system/linux/processes.cpp
+func ReadStat(root string, pid string) (stat ProcStat, err error) {
+	fn := getProcAttr(root, pid, "stat")
+	b, err := ioutil.ReadFile(fn)
+	if err != nil {
+		return
+	}
+	// Proc stat example
+	// 6462 (bash) S 6402 6462 6462 34817 37849 4194304 14126 901131 0 191 15 9 3401 725 20 0 1 0 134150 20156416 1369 18446744073709551615 94186936238080 94186936960773 140723699470016 0 0 0 65536 3670020 1266777851 1 0 0 17 7 0 0 0 0 0 94186937191664 94186937239044 94186967023616 140723699476902 140723699476912 140723699476912 140723699478510 0
+	pos := bytes.IndexByte(b, ')')
+	if pos == -1 {
+		return stat, ErrInvalidProcStatHeader
+	}
+
+	content := bytesToString(b[pos+2:])
+	details := strings.Split(content, " ")
+	if len(details) < 20 { // details[19] (start time) is the highest index read below
+		return stat, ErrInvalidProcStatContent
+	}
+
+	stat.State = details[0]
+	stat.Parent = details[1]
+	stat.Group = details[2]
+	stat.UserTime = details[11]
+	stat.SystemTime = details[12]
+	stat.Nice = details[16]
+	stat.Threads = details[17]
+	stat.StartTime = details[19]
+
+	fn = getProcAttr(root, pid, "status")
+	b, err = ioutil.ReadFile(fn)
+	if err != nil {
+		return
+	}
+
+	lines := bytes.Split(b, []byte{'\n'})
+	for _, line := range lines {
+		detail := bytes.SplitN(line, []byte{':'}, 2)
+		if len(detail) != 2 {
+			continue
+		}
+
+		k := strings.TrimSpace(bytesToString(detail[0]))
+		v := bytesToString(detail[1])
+		switch k {
+		case "Name":
+			stat.Name = strings.TrimSpace(v)
+		case "VmRSS":
+			if len(v) >= 3 {
+				stat.ResidentSize = strings.TrimSpace(v[:len(v)-3] + "000")
+			}
+		case "VmSize":
+			if len(v) >= 3 {
+				stat.TotalSize = strings.TrimSpace(v[:len(v)-3] + "000")
+			}
+		case "Gid":
+			arr := strings.Split(v, "\t")
+			if len(arr) == 4 {
+				stat.RealGID = arr[0]
+				stat.EffectiveGID = arr[1]
+				stat.SavedGID = arr[2]
+			}
+		case "Uid":
+			arr := strings.Split(v, "\t")
+			if len(arr) == 4 {
+				stat.RealUID = arr[0]
+				stat.EffectiveUID = arr[1]
+				stat.SavedUID = arr[2]
+			}
+		}
+	}
+	return stat, err
+}
diff --git a/kubebeat/proc/strconv.go b/kubebeat/proc/strconv.go
new file mode 100644
index 000000000000..320b7f5da223
--- /dev/null
+++ b/kubebeat/proc/strconv.go
@@ -0,0 +1,13 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package proc
+
+import "unsafe"
+
+// bytesToString converts b to a string without copying.
+// The caller must not modify b after the conversion.
+func bytesToString(b []byte) string {
+	return *(*string)(unsafe.Pointer(&b))
+}
diff --git a/kubebeat/proc/uptime.go b/kubebeat/proc/uptime.go
new file mode 100644
index 000000000000..1ede0601762f
--- /dev/null
+++ b/kubebeat/proc/uptime.go
@@ -0,0 +1,36 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package proc
+
+import (
+	"bytes"
+	"errors"
+	"io/ioutil"
+	"path/filepath"
+	"strconv"
+)
+
+var (
+	ErrInvalidProcUptimecontent = errors.New("invalid /proc/uptime content")
+)
+
+// ReadUptime reads the system uptime, in whole seconds, from /proc/uptime.
+func ReadUptime(root string) (secs int64, err error) {
+	fp := filepath.Join(root, "/proc/uptime")
+	b, err := ioutil.ReadFile(fp)
+	if err != nil {
+		return
+	}
+	detail := bytes.Split(b, []byte{' '})
+	if len(detail) != 2 {
+		return secs, ErrInvalidProcUptimecontent
+	}
+
+	num, err := strconv.ParseFloat(bytesToString(detail[0]), 64)
+	if err != nil {
+		return secs, ErrInvalidProcUptimecontent
+	}
+	return int64(num), nil
+}