Skip to content

Commit

Permalink
Naïve process walker for Darwin
Browse files Browse the repository at this point in the history
This fixes the regression where process names weren't appearing for
Darwin probes. Makes testing easier.

Also, changes the process walker to operate on value types. There's no
performance advantage to using reference types for something of this
size, and there appeared to be a data race in the Darwin port that
caused nodes to gain and lose process names over time.

Also, restructures how to enable docker scraping. Default false when run
manually, and enabled via --probe.docker true in the scope script.
  • Loading branch information
peterbourgon committed Jul 16, 2015
1 parent c8079da commit b585a36
Show file tree
Hide file tree
Showing 11 changed files with 300 additions and 204 deletions.
40 changes: 16 additions & 24 deletions probe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
_ "net/http/pprof"
"os"
"os/signal"
"runtime"
"strconv"
"syscall"
"time"
Expand All @@ -24,8 +23,6 @@ import (

var version = "dev" // set at build time

const linux = "linux" // runtime.GOOS

func main() {
var (
httpListen = flag.String("http.listen", "", "listen address for HTTP profiling and instrumentation server")
Expand All @@ -34,7 +31,7 @@ func main() {
listen = flag.String("listen", ":"+strconv.Itoa(xfer.ProbePort), "listen address")
prometheusEndpoint = flag.String("prometheus.endpoint", "/metrics", "Prometheus metrics exposition endpoint (requires -http.listen)")
spyProcs = flag.Bool("processes", true, "report processes (needs root)")
dockerEnabled = flag.Bool("docker", true, "collect Docker-related attributes for processes")
dockerEnabled = flag.Bool("docker", false, "collect Docker-related attributes for processes")
dockerInterval = flag.Duration("docker.interval", 10*time.Second, "how often to update Docker attributes")
dockerBridge = flag.String("docker.bridge", "docker0", "the docker bridge name")
weaveRouterAddr = flag.String("weave.router.addr", "", "IP address or FQDN of the Weave router")
Expand Down Expand Up @@ -82,25 +79,22 @@ func main() {
processCache *process.CachingWalker
)

// TODO provide an alternate implementation for Darwin.
if runtime.GOOS == linux {
processCache = process.NewCachingWalker(process.NewWalker(*procRoot))
reporters = append(reporters, process.NewReporter(processCache, hostID))

if *dockerEnabled {
if err = report.AddLocalBridge(*dockerBridge); err != nil {
log.Fatalf("failed to get docker bridge address: %v", err)
}
processCache = process.NewCachingWalker(process.NewWalker(*procRoot))
reporters = append(reporters, process.NewReporter(processCache, hostID))

dockerRegistry, err := docker.NewRegistry(*dockerInterval)
if err != nil {
log.Fatalf("failed to start docker registry: %v", err)
}
defer dockerRegistry.Stop()
if *dockerEnabled {
if err := report.AddLocalBridge(*dockerBridge); err != nil {
log.Fatalf("failed to get docker bridge address: %v", err)
}

taggers = append(taggers, docker.NewTagger(dockerRegistry, processCache))
reporters = append(reporters, docker.NewReporter(dockerRegistry, hostID))
dockerRegistry, err := docker.NewRegistry(*dockerInterval)
if err != nil {
log.Fatalf("failed to start docker registry: %v", err)
}
defer dockerRegistry.Stop()

taggers = append(taggers, docker.NewTagger(dockerRegistry, processCache))
reporters = append(reporters, docker.NewReporter(dockerRegistry, hostID))
}

if *weaveRouterAddr != "" {
Expand Down Expand Up @@ -131,10 +125,8 @@ func main() {
r = report.MakeReport()

case <-spyTick:
if processCache != nil {
if err := processCache.Update(); err != nil {
log.Printf("error reading processes: %v", err)
}
if err := processCache.Update(); err != nil {
log.Printf("error reading processes: %v", err)
}

for _, reporter := range reporters {
Expand Down
2 changes: 1 addition & 1 deletion probe/process/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (r *Reporter) Report() (report.Report, error) {

func (r *Reporter) processTopology() (report.Topology, error) {
t := report.NewTopology()
err := r.walker.Walk(func(p *Process) {
err := r.walker.Walk(func(p Process) {
pidstr := strconv.Itoa(p.PID)
nodeID := report.MakeProcessNodeID(r.scope, pidstr)
t.NodeMetadatas[nodeID] = report.NodeMetadata{
Expand Down
6 changes: 3 additions & 3 deletions probe/process/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import (
)

type mockWalker struct {
processes []*process.Process
processes []process.Process
}

func (m *mockWalker) Walk(f func(*process.Process)) error {
func (m *mockWalker) Walk(f func(process.Process)) error {
for _, p := range m.processes {
f(p)
}
Expand All @@ -22,7 +22,7 @@ func (m *mockWalker) Walk(f func(*process.Process)) error {

func TestReporter(t *testing.T) {
walker := &mockWalker{
processes: []*process.Process{
processes: []process.Process{
{PID: 1, PPID: 0, Comm: "init"},
{PID: 2, PPID: 1, Comm: "bash"},
{PID: 3, PPID: 1, Comm: "apache", Threads: 2},
Expand Down
6 changes: 3 additions & 3 deletions probe/process/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ type Tree interface {
}

type tree struct {
processes map[int]*Process
processes map[int]Process
}

// NewTree returns a new Tree that can be polled.
func NewTree(walker Walker) (Tree, error) {
pt := tree{processes: map[int]*Process{}}
err := walker.Walk(func(p *Process) {
pt := tree{processes: map[int]Process{}}
err := walker.Walk(func(p Process) {
pt.processes[p.PID] = p
})

Expand Down
2 changes: 1 addition & 1 deletion probe/process/tree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

func TestTree(t *testing.T) {
walker := &mockWalker{
processes: []*process.Process{
processes: []process.Process{
{PID: 1, PPID: 0, Comm: "init"},
{PID: 2, PPID: 1, Comm: "bash"},
{PID: 3, PPID: 1, Comm: "apache", Threads: 2},
Expand Down
89 changes: 6 additions & 83 deletions probe/process/walker.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,6 @@
package process

import (
"bytes"
"io/ioutil"
"path"
"strconv"
"strings"
"sync"
)
import "sync"

// Process represents a single process.
type Process struct {
Expand All @@ -19,83 +12,13 @@ type Process struct {

// Walker is something that walks the /proc directory
type Walker interface {
Walk(func(*Process)) error
}

// Hooks exposed for mocking
var (
ReadDir = ioutil.ReadDir
ReadFile = ioutil.ReadFile
)

type walker struct {
procRoot string
}

// NewWalker creates a new process Walker
func NewWalker(procRoot string) Walker {
return &walker{procRoot: procRoot}
}

// Walk walks the supplied directory (expecting it to look like /proc)
// and marshalls the files into instances of Process, which it then
// passes one-by-one to the supplied function. Walk is only made public
// so that is can be tested.
func (w *walker) Walk(f func(*Process)) error {
dirEntries, err := ReadDir(w.procRoot)
if err != nil {
return err
}

for _, dirEntry := range dirEntries {
filename := dirEntry.Name()
pid, err := strconv.Atoi(filename)
if err != nil {
continue
}

stat, err := ReadFile(path.Join(w.procRoot, filename, "stat"))
if err != nil {
continue
}
splits := strings.Fields(string(stat))
ppid, err := strconv.Atoi(splits[3])
if err != nil {
return err
}

threads, err := strconv.Atoi(splits[19])
if err != nil {
return err
}

cmdline := ""
if cmdlineBuf, err := ReadFile(path.Join(w.procRoot, filename, "cmdline")); err == nil {
cmdlineBuf = bytes.Replace(cmdlineBuf, []byte{'\000'}, []byte{' '}, -1)
cmdline = string(cmdlineBuf)
}

comm := "(unknown)"
if commBuf, err := ReadFile(path.Join(w.procRoot, filename, "comm")); err == nil {
comm = strings.TrimSpace(string(commBuf))
}

f(&Process{
PID: pid,
PPID: ppid,
Comm: comm,
Cmdline: cmdline,
Threads: threads,
})
}

return nil
Walk(func(Process)) error
}

// CachingWalker is a walker than caches a copy of the output from another
// Walker, and then allows other concurrent readers to Walk that copy.
type CachingWalker struct {
cache []*Process
cache []Process
cacheLock sync.RWMutex
source Walker
}
Expand All @@ -106,7 +29,7 @@ func NewCachingWalker(source Walker) *CachingWalker {
}

// Walk walks a cached copy of process list
func (c *CachingWalker) Walk(f func(*Process)) error {
func (c *CachingWalker) Walk(f func(Process)) error {
c.cacheLock.RLock()
defer c.cacheLock.RUnlock()

Expand All @@ -118,8 +41,8 @@ func (c *CachingWalker) Walk(f func(*Process)) error {

// Update updates cached copy of process list
func (c *CachingWalker) Update() error {
newCache := []*Process{}
err := c.source.Walk(func(p *Process) {
newCache := []Process{}
err := c.source.Walk(func(p Process) {
newCache = append(newCache, p)
})
if err != nil {
Expand Down
93 changes: 93 additions & 0 deletions probe/process/walker_darwin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package process

import (
"fmt"
"os/exec"
"strconv"
"strings"
)

// NewWalker returns a Darwin (lsof-based) walker.
func NewWalker(_ string) Walker {
return &walker{}
}

type walker struct{}

const (
lsofBinary = "lsof"
lsofFields = "cn" // parseLSOF() depends on the order
)

// These functions copied from procspy.

func (walker) Walk(f func(Process)) error {
output, err := exec.Command(
lsofBinary,
"-i", // only Internet files
"-n", "-P", // no number resolving
"-w", // no warnings
"-F", lsofFields, // \n based output of only the fields we want.
).CombinedOutput()
if err != nil {
return err
}

processes, err := parseLSOF(string(output))
if err != nil {
return err
}

for _, process := range processes {
f(process)
}
return nil
}

func parseLSOF(output string) (map[string]Process, error) {
var (
processes = map[string]Process{} // Local addr -> Proc
process Process
)
for _, line := range strings.Split(output, "\n") {
if len(line) <= 1 {
continue
}

var (
field = line[0]
value = line[1:]
)
switch field {
case 'p':
pid, err := strconv.Atoi(value)
if err != nil {
return nil, fmt.Errorf("invalid 'p' field in lsof output: %#v", value)
}
process.PID = pid

case 'c':
process.Comm = value

case 'n':
// 'n' is the last field, with '-F cn'
// format examples:
// "192.168.2.111:44013->54.229.241.196:80"
// "[2003:45:2b57:8900:1869:2947:f942:aba7]:55711->[2a00:1450:4008:c01::11]:443"
// "*:111" <- a listen
addresses := strings.SplitN(value, "->", 2)
if len(addresses) != 2 {
// That's a listen entry.
continue
}
processes[addresses[0]] = Process{
PID: process.PID,
Comm: process.Comm,
}

default:
return nil, fmt.Errorf("unexpected lsof field: %c in %#v", field, value)
}
}
return processes, nil
}
Loading

0 comments on commit b585a36

Please sign in to comment.