Skip to content

Commit

Permalink
Merge pull request #287 from tomwilkie/284-cpu-usage
Browse files Browse the repository at this point in the history
Cache the walk of the process tree, reusing it in docker tagger and process reporter.
  • Loading branch information
tomwilkie committed Jun 29, 2015
2 parents 7e76fd3 + b9e968f commit 40b5fa5
Show file tree
Hide file tree
Showing 9 changed files with 178 additions and 64 deletions.
12 changes: 6 additions & 6 deletions probe/docker/tagger.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,21 @@ var (
// Tagger is a tagger that tags Docker container information to process
// nodes that have a PID.
type Tagger struct {
procRoot string
registry Registry
registry Registry
procWalker process.Walker
}

// NewTagger returns a usable Tagger.
func NewTagger(registry Registry, procRoot string) *Tagger {
func NewTagger(registry Registry, procWalker process.Walker) *Tagger {
return &Tagger{
registry: registry,
procRoot: procRoot,
registry: registry,
procWalker: procWalker,
}
}

// Tag implements Tagger.
func (t *Tagger) Tag(r report.Report) (report.Report, error) {
tree, err := NewProcessTreeStub(t.procRoot)
tree, err := NewProcessTreeStub(t.procWalker)
if err != nil {
return report.MakeReport(), err
}
Expand Down
4 changes: 2 additions & 2 deletions probe/docker/tagger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestTagger(t *testing.T) {
oldProcessTree := docker.NewProcessTreeStub
defer func() { docker.NewProcessTreeStub = oldProcessTree }()

docker.NewProcessTreeStub = func(procRoot string) (process.Tree, error) {
docker.NewProcessTreeStub = func(_ process.Walker) (process.Tree, error) {
return &mockProcessTree{map[int]int{2: 1}}, nil
}

Expand All @@ -45,7 +45,7 @@ func TestTagger(t *testing.T) {
want.Process.NodeMetadatas[pid1NodeID] = report.NodeMetadata{"pid": "1"}.Merge(wantNodeMetadata)
want.Process.NodeMetadatas[pid2NodeID] = report.NodeMetadata{"pid": "2"}.Merge(wantNodeMetadata)

tagger := docker.NewTagger(mockRegistryInstance, "/irrelevant")
tagger := docker.NewTagger(mockRegistryInstance, nil)
have, err := tagger.Tag(input)
if err != nil {
t.Errorf("%v", err)
Expand Down
48 changes: 30 additions & 18 deletions probe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"flag"
"log"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"runtime"
Expand Down Expand Up @@ -57,7 +58,10 @@ func main() {
log.Printf("exposing Prometheus endpoint at %s%s", *httpListen, *prometheusEndpoint)
http.Handle(*prometheusEndpoint, makePrometheusHandler())
}
go func(err error) { log.Print(err) }(http.ListenAndServe(*httpListen, nil))
go func() {
err := http.ListenAndServe(*httpListen, nil)
log.Print(err)
}()
}

if *spyProcs && os.Getegid() != 0 {
Expand All @@ -76,7 +80,8 @@ func main() {
)

var (
weaveTagger *tag.WeaveTagger
weaveTagger *tag.WeaveTagger
processCache *process.CachingWalker
)

taggers := []tag.Tagger{
Expand All @@ -89,19 +94,25 @@ func main() {
endpoint.NewReporter(hostID, hostName, *spyProcs),
}

if *dockerEnabled && runtime.GOOS == linux {
if err = report.AddLocalBridge(*dockerBridge); err != nil {
log.Fatalf("failed to get docker bridge address: %v", err)
}
// TODO provide an alternate implementation for Darwin.
if runtime.GOOS == linux {
processCache = process.NewCachingWalker(process.NewWalker(*procRoot))
reporters = append(reporters, process.NewReporter(processCache, hostID))

dockerRegistry, err := docker.NewRegistry(*dockerInterval)
if err != nil {
log.Fatalf("failed to start docker registry: %v", err)
}
defer dockerRegistry.Stop()
if *dockerEnabled {
if err = report.AddLocalBridge(*dockerBridge); err != nil {
log.Fatalf("failed to get docker bridge address: %v", err)
}

dockerRegistry, err := docker.NewRegistry(*dockerInterval)
if err != nil {
log.Fatalf("failed to start docker registry: %v", err)
}
defer dockerRegistry.Stop()

taggers = append(taggers, docker.NewTagger(dockerRegistry, *procRoot))
reporters = append(reporters, docker.NewReporter(dockerRegistry, hostID))
taggers = append(taggers, docker.NewTagger(dockerRegistry, processCache))
reporters = append(reporters, docker.NewReporter(dockerRegistry, hostID))
}
}

if *weaveRouterAddr != "" {
Expand All @@ -113,11 +124,6 @@ func main() {
taggers = append(taggers, weaveTagger)
}

// TODO provide an alternate implementation for Darwin.
if runtime.GOOS == linux {
reporters = append(reporters, process.NewReporter(*procRoot, hostID))
}

log.Printf("listening on %s", *listen)

quit := make(chan struct{})
Expand All @@ -137,6 +143,12 @@ func main() {
r = report.MakeReport()

case <-spyTick:
if processCache != nil {
if err := processCache.Update(); err != nil {
log.Printf("error reading processes: %v", err)
}
}

for _, reporter := range reporters {
newReport, err := reporter.Report()
if err != nil {
Expand Down
12 changes: 6 additions & 6 deletions probe/process/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ const (

// Reporter generate Reports containing the Process topology
type reporter struct {
procRoot string
scope string
scope string
walker Walker
}

// NewReporter makes a new Reporter
func NewReporter(procRoot, scope string) tag.Reporter {
func NewReporter(walker Walker, scope string) tag.Reporter {
return &reporter{
procRoot: procRoot,
scope: scope,
scope: scope,
walker: walker,
}
}

Expand All @@ -43,7 +43,7 @@ func (r *reporter) Report() (report.Report, error) {

func (r *reporter) processTopology() (report.Topology, error) {
t := report.NewTopology()
err := Walk(r.procRoot, func(p *Process) {
err := r.walker.Walk(func(p *Process) {
pidstr := strconv.Itoa(p.PID)
nodeID := report.MakeProcessNodeID(r.scope, pidstr)
t.NodeMetadatas[nodeID] = report.NodeMetadata{
Expand Down
25 changes: 15 additions & 10 deletions probe/process/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,28 @@ import (
"github.com/weaveworks/scope/test"
)

func TestReporter(t *testing.T) {
oldWalk := process.Walk
defer func() { process.Walk = oldWalk }()
type mockWalker struct {
processes []*process.Process
}

process.Walk = func(_ string, f func(*process.Process)) error {
for _, p := range []*process.Process{
func (m *mockWalker) Walk(f func(*process.Process)) error {
for _, p := range m.processes {
f(p)
}
return nil
}

func TestReporter(t *testing.T) {
walker := &mockWalker{
processes: []*process.Process{
{PID: 1, PPID: 0, Comm: "init"},
{PID: 2, PPID: 1, Comm: "bash"},
{PID: 3, PPID: 1, Comm: "apache", Threads: 2},
{PID: 4, PPID: 2, Comm: "ping", Cmdline: "ping foo.bar.local"},
} {
f(p)
}
return nil
},
}

reporter := process.NewReporter("", "")
reporter := process.NewReporter(walker, "")
want := report.MakeReport()
want.Process = report.Topology{
Adjacency: report.Adjacency{},
Expand Down
4 changes: 2 additions & 2 deletions probe/process/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ type tree struct {
}

// NewTree returns a new Tree that can be polled.
func NewTree(procRoot string) (Tree, error) {
func NewTree(walker Walker) (Tree, error) {
pt := tree{processes: map[int]*Process{}}
err := Walk(procRoot, func(p *Process) {
err := walker.Walk(func(p *Process) {
pt.processes[p.PID] = p
})

Expand Down
22 changes: 8 additions & 14 deletions probe/process/tree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,16 @@ import (
)

func TestTree(t *testing.T) {
oldWalk := process.Walk
defer func() { process.Walk = oldWalk }()

process.Walk = func(_ string, f func(*process.Process)) error {
for _, p := range []*process.Process{
{PID: 1, PPID: 0},
{PID: 2, PPID: 1},
{PID: 3, PPID: 1},
{PID: 4, PPID: 2},
} {
f(p)
}
return nil
walker := &mockWalker{
processes: []*process.Process{
{PID: 1, PPID: 0, Comm: "init"},
{PID: 2, PPID: 1, Comm: "bash"},
{PID: 3, PPID: 1, Comm: "apache", Threads: 2},
{PID: 4, PPID: 2, Comm: "ping", Cmdline: "ping foo.bar.local"},
},
}

tree, err := process.NewTree("foo")
tree, err := process.NewTree(walker)
if err != nil {
t.Fatalf("newProcessTree error: %v", err)
}
Expand Down
65 changes: 60 additions & 5 deletions probe/process/walker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"path"
"strconv"
"strings"
"sync"
)

// Process represents a single process.
Expand All @@ -16,18 +17,32 @@ type Process struct {
Threads int
}

// Walker is something that walks the /proc directory
type Walker interface {
Walk(func(*Process)) error
}

// Hooks exposed for mocking
var (
ReadDir = ioutil.ReadDir
ReadFile = ioutil.ReadFile
)

type walker struct {
procRoot string
}

// NewWalker creates a new process Walker
func NewWalker(procRoot string) Walker {
return &walker{procRoot: procRoot}
}

// Walk walks the supplied directory (expecting it to look like /proc)
// and marshalls the files into instances of Process, which it then
// passes one-by-one to the supplied function. Walk is only made public
// so that is can be tested.
var Walk = func(procRoot string, f func(*Process)) error {
dirEntries, err := ReadDir(procRoot)
func (w *walker) Walk(f func(*Process)) error {
dirEntries, err := ReadDir(w.procRoot)
if err != nil {
return err
}
Expand All @@ -39,7 +54,7 @@ var Walk = func(procRoot string, f func(*Process)) error {
continue
}

stat, err := ReadFile(path.Join(procRoot, filename, "stat"))
stat, err := ReadFile(path.Join(w.procRoot, filename, "stat"))
if err != nil {
continue
}
Expand All @@ -55,13 +70,13 @@ var Walk = func(procRoot string, f func(*Process)) error {
}

cmdline := ""
if cmdlineBuf, err := ReadFile(path.Join(procRoot, filename, "cmdline")); err == nil {
if cmdlineBuf, err := ReadFile(path.Join(w.procRoot, filename, "cmdline")); err == nil {
cmdlineBuf = bytes.Replace(cmdlineBuf, []byte{'\000'}, []byte{' '}, -1)
cmdline = string(cmdlineBuf)
}

comm := "(unknown)"
if commBuf, err := ReadFile(path.Join(procRoot, filename, "comm")); err == nil {
if commBuf, err := ReadFile(path.Join(w.procRoot, filename, "comm")); err == nil {
comm = strings.TrimSpace(string(commBuf))
}

Expand All @@ -76,3 +91,43 @@ var Walk = func(procRoot string, f func(*Process)) error {

return nil
}

// CachingWalker is a walker than caches a copy of the output from another
// Walker, and then allows other concurrent readers to Walk that copy.
type CachingWalker struct {
cache []*Process
cacheLock sync.RWMutex
source Walker
}

// NewCachingWalker returns a new CachingWalker
func NewCachingWalker(source Walker) *CachingWalker {
return &CachingWalker{source: source}
}

// Walk walks a cached copy of process list
func (c *CachingWalker) Walk(f func(*Process)) error {
c.cacheLock.RLock()
defer c.cacheLock.RUnlock()

for _, p := range c.cache {
f(p)
}
return nil
}

// Update updates cached copy of process list
func (c *CachingWalker) Update() error {
newCache := []*Process{}
err := c.source.Walk(func(p *Process) {
newCache = append(newCache, p)
})
if err != nil {
return err
}

c.cacheLock.Lock()
defer c.cacheLock.Unlock()
c.cache = newCache
return nil
}
Loading

0 comments on commit 40b5fa5

Please sign in to comment.