Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Naïve process walker for Darwin #320

Merged
merged 1 commit into from
Jul 20, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 16 additions & 24 deletions probe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
_ "net/http/pprof"
"os"
"os/signal"
"runtime"
"strconv"
"syscall"
"time"
Expand All @@ -24,8 +23,6 @@ import (

var version = "dev" // set at build time

const linux = "linux" // runtime.GOOS

func main() {
var (
httpListen = flag.String("http.listen", "", "listen address for HTTP profiling and instrumentation server")
Expand All @@ -34,7 +31,7 @@ func main() {
listen = flag.String("listen", ":"+strconv.Itoa(xfer.ProbePort), "listen address")
prometheusEndpoint = flag.String("prometheus.endpoint", "/metrics", "Prometheus metrics exposition endpoint (requires -http.listen)")
spyProcs = flag.Bool("processes", true, "report processes (needs root)")
dockerEnabled = flag.Bool("docker", true, "collect Docker-related attributes for processes")
dockerEnabled = flag.Bool("docker", false, "collect Docker-related attributes for processes")
dockerInterval = flag.Duration("docker.interval", 10*time.Second, "how often to update Docker attributes")
dockerBridge = flag.String("docker.bridge", "docker0", "the docker bridge name")
weaveRouterAddr = flag.String("weave.router.addr", "", "IP address or FQDN of the Weave router")
Expand Down Expand Up @@ -82,25 +79,22 @@ func main() {
processCache *process.CachingWalker
)

// TODO provide an alternate implementation for Darwin.
if runtime.GOOS == linux {
processCache = process.NewCachingWalker(process.NewWalker(*procRoot))
reporters = append(reporters, process.NewReporter(processCache, hostID))

if *dockerEnabled {
if err = report.AddLocalBridge(*dockerBridge); err != nil {
log.Fatalf("failed to get docker bridge address: %v", err)
}
processCache = process.NewCachingWalker(process.NewWalker(*procRoot))
reporters = append(reporters, process.NewReporter(processCache, hostID))

dockerRegistry, err := docker.NewRegistry(*dockerInterval)
if err != nil {
log.Fatalf("failed to start docker registry: %v", err)
}
defer dockerRegistry.Stop()
if *dockerEnabled {
if err := report.AddLocalBridge(*dockerBridge); err != nil {
log.Fatalf("failed to get docker bridge address: %v", err)
}

taggers = append(taggers, docker.NewTagger(dockerRegistry, processCache))
reporters = append(reporters, docker.NewReporter(dockerRegistry, hostID))
dockerRegistry, err := docker.NewRegistry(*dockerInterval)
if err != nil {
log.Fatalf("failed to start docker registry: %v", err)
}
defer dockerRegistry.Stop()

taggers = append(taggers, docker.NewTagger(dockerRegistry, processCache))
reporters = append(reporters, docker.NewReporter(dockerRegistry, hostID))
}

if *weaveRouterAddr != "" {
Expand Down Expand Up @@ -131,10 +125,8 @@ func main() {
r = report.MakeReport()

case <-spyTick:
if processCache != nil {
if err := processCache.Update(); err != nil {
log.Printf("error reading processes: %v", err)
}
if err := processCache.Update(); err != nil {
log.Printf("error reading processes: %v", err)
}

for _, reporter := range reporters {
Expand Down
2 changes: 1 addition & 1 deletion probe/process/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (r *Reporter) Report() (report.Report, error) {

func (r *Reporter) processTopology() (report.Topology, error) {
t := report.NewTopology()
err := r.walker.Walk(func(p *Process) {
err := r.walker.Walk(func(p Process) {
pidstr := strconv.Itoa(p.PID)
nodeID := report.MakeProcessNodeID(r.scope, pidstr)
t.NodeMetadatas[nodeID] = report.NodeMetadata{
Expand Down
6 changes: 3 additions & 3 deletions probe/process/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import (
)

type mockWalker struct {
processes []*process.Process
processes []process.Process
}

func (m *mockWalker) Walk(f func(*process.Process)) error {
func (m *mockWalker) Walk(f func(process.Process)) error {
for _, p := range m.processes {
f(p)
}
Expand All @@ -22,7 +22,7 @@ func (m *mockWalker) Walk(f func(*process.Process)) error {

func TestReporter(t *testing.T) {
walker := &mockWalker{
processes: []*process.Process{
processes: []process.Process{
{PID: 1, PPID: 0, Comm: "init"},
{PID: 2, PPID: 1, Comm: "bash"},
{PID: 3, PPID: 1, Comm: "apache", Threads: 2},
Expand Down
6 changes: 3 additions & 3 deletions probe/process/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ type Tree interface {
}

type tree struct {
processes map[int]*Process
processes map[int]Process
}

// NewTree returns a new Tree that can be polled.
func NewTree(walker Walker) (Tree, error) {
pt := tree{processes: map[int]*Process{}}
err := walker.Walk(func(p *Process) {
pt := tree{processes: map[int]Process{}}
err := walker.Walk(func(p Process) {
pt.processes[p.PID] = p
})

Expand Down
2 changes: 1 addition & 1 deletion probe/process/tree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

func TestTree(t *testing.T) {
walker := &mockWalker{
processes: []*process.Process{
processes: []process.Process{
{PID: 1, PPID: 0, Comm: "init"},
{PID: 2, PPID: 1, Comm: "bash"},
{PID: 3, PPID: 1, Comm: "apache", Threads: 2},
Expand Down
89 changes: 6 additions & 83 deletions probe/process/walker.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,6 @@
package process

import (
"bytes"
"io/ioutil"
"path"
"strconv"
"strings"
"sync"
)
import "sync"

// Process represents a single process.
type Process struct {
Expand All @@ -19,83 +12,13 @@ type Process struct {

// Walker is something that walks the /proc directory
type Walker interface {
Walk(func(*Process)) error
}

// Hooks exposed for mocking
var (
ReadDir = ioutil.ReadDir
ReadFile = ioutil.ReadFile
)

type walker struct {
procRoot string
}

// NewWalker creates a new process Walker
func NewWalker(procRoot string) Walker {
return &walker{procRoot: procRoot}
}

// Walk walks the supplied directory (expecting it to look like /proc)
// and marshals the files into instances of Process, which it then
// passes one-by-one to the supplied function. Walk is only made public
// so that it can be tested.
func (w *walker) Walk(f func(*Process)) error {
dirEntries, err := ReadDir(w.procRoot)
if err != nil {
return err
}

for _, dirEntry := range dirEntries {
filename := dirEntry.Name()
pid, err := strconv.Atoi(filename)
if err != nil {
continue
}

stat, err := ReadFile(path.Join(w.procRoot, filename, "stat"))
if err != nil {
continue
}
splits := strings.Fields(string(stat))
ppid, err := strconv.Atoi(splits[3])
if err != nil {
return err
}

threads, err := strconv.Atoi(splits[19])
if err != nil {
return err
}

cmdline := ""
if cmdlineBuf, err := ReadFile(path.Join(w.procRoot, filename, "cmdline")); err == nil {
cmdlineBuf = bytes.Replace(cmdlineBuf, []byte{'\000'}, []byte{' '}, -1)
cmdline = string(cmdlineBuf)
}

comm := "(unknown)"
if commBuf, err := ReadFile(path.Join(w.procRoot, filename, "comm")); err == nil {
comm = strings.TrimSpace(string(commBuf))
}

f(&Process{
PID: pid,
PPID: ppid,
Comm: comm,
Cmdline: cmdline,
Threads: threads,
})
}

return nil
Walk(func(Process)) error
}

// CachingWalker is a walker that caches a copy of the output from another
// Walker, and then allows other concurrent readers to Walk that copy.
type CachingWalker struct {
cache []*Process
cache []Process
cacheLock sync.RWMutex
source Walker
}
Expand All @@ -106,7 +29,7 @@ func NewCachingWalker(source Walker) *CachingWalker {
}

// Walk walks a cached copy of process list
func (c *CachingWalker) Walk(f func(*Process)) error {
func (c *CachingWalker) Walk(f func(Process)) error {
c.cacheLock.RLock()
defer c.cacheLock.RUnlock()

Expand All @@ -118,8 +41,8 @@ func (c *CachingWalker) Walk(f func(*Process)) error {

// Update updates cached copy of process list
func (c *CachingWalker) Update() error {
newCache := []*Process{}
err := c.source.Walk(func(p *Process) {
newCache := []Process{}
err := c.source.Walk(func(p Process) {
newCache = append(newCache, p)
})
if err != nil {
Expand Down
93 changes: 93 additions & 0 deletions probe/process/walker_darwin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package process

import (
"fmt"
"os/exec"
"strconv"
"strings"
)

// NewWalker builds the Walker used on Darwin, which gathers its data by
// running lsof. The string argument is accepted only to keep the constructor
// signature and is otherwise ignored.
func NewWalker(_ string) Walker {
	return new(walker)
}

// walker is the Darwin implementation returned by NewWalker. It holds no
// state; all work happens in its Walk method.
type walker struct{}

// Settings used by Walk when invoking lsof.
const (
// lsofBinary is the executable name handed to exec.Command.
lsofBinary = "lsof"
// lsofFields is the field selector passed to lsof's -F flag.
lsofFields = "cn" // parseLSOF() depends on the order
)

// The functions below were copied from procspy.

func (walker) Walk(f func(Process)) error {
output, err := exec.Command(
lsofBinary,
"-i", // only Internet files
"-n", "-P", // no number resolving
"-w", // no warnings
"-F", lsofFields, // \n based output of only the fields we want.
).CombinedOutput()

This comment was marked as abuse.

if err != nil {
return err
}

processes, err := parseLSOF(string(output))
if err != nil {
return err
}

for _, process := range processes {
f(process)
}
return nil
}

// parseLSOF converts the newline-separated field output of `lsof -F cn` into
// a map from local address to the process that owns the connection.
//
// Each non-empty line consists of a one-byte field identifier followed by the
// field value:
//
//	p  process ID; starts a new process set
//	c  command name of the current process
//	f  file descriptor; starts a file set and is ignored
//	n  Internet address of one file, "local->remote" for a connection
//
// Only 'n' values containing "->" produce a map entry; listen entries such as
// "*:111" are skipped. The returned Process values carry just PID and Comm.
//
// An error is returned when a 'p' value is not an integer or when a line
// starts with any other field identifier.
func parseLSOF(output string) (map[string]Process, error) {
	var (
		processes = map[string]Process{} // Local addr -> Proc
		process   Process                // process set currently being read
	)
	for _, line := range strings.Split(output, "\n") {
		// Skip blank lines and fields that carry no value.
		if len(line) <= 1 {
			continue
		}

		var (
			field = line[0]
			value = line[1:]
		)
		switch field {
		case 'p':
			pid, err := strconv.Atoi(value)
			if err != nil {
				return nil, fmt.Errorf("invalid 'p' field in lsof output: %#v", value)
			}
			// Start from a clean slate so that a process set without a 'c'
			// line cannot inherit the previous process's command name.
			process = Process{PID: pid}

		case 'c':
			process.Comm = value

		case 'f':
			// lsof may emit the file descriptor field even though it was
			// not requested; it carries nothing we need.

		case 'n':
			// 'n' is the last field, with '-F cn'
			// format examples:
			// "192.168.2.111:44013->54.229.241.196:80"
			// "[2003:45:2b57:8900:1869:2947:f942:aba7]:55711->[2a00:1450:4008:c01::11]:443"
			// "*:111" <- a listen
			addresses := strings.SplitN(value, "->", 2)
			if len(addresses) != 2 {
				// That's a listen entry.
				continue
			}
			processes[addresses[0]] = Process{
				PID:  process.PID,
				Comm: process.Comm,
			}

		default:
			return nil, fmt.Errorf("unexpected lsof field: %c in %#v", field, value)
		}
	}
	return processes, nil
}
Loading