Skip to content

Commit

Permalink
Gather per-process CPU and memory metrics.
Browse files Browse the repository at this point in the history
  • Loading branch information
tomwilkie committed Dec 16, 2015
1 parent b3b2bcc commit 79e6e9a
Show file tree
Hide file tree
Showing 15 changed files with 172 additions and 81 deletions.
4 changes: 2 additions & 2 deletions experimental/demo/gce.sh
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,9 @@ function setup {
. ~/.profile
git clone http://github.com/weaveworks/scope.git
cd scope
git checkout 0.9
git checkout master
make deps
make
make RUN_FLAGS=
./scope launch
EOF
done
Expand Down
2 changes: 1 addition & 1 deletion probe/endpoint/procspy/proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func walkProcPid(buf *bytes.Buffer, walker process.Walker) (map[uint64]*Proc, er
statT syscall.Stat_t
)

walker.Walk(func(p process.Process) {
walker.Walk(func(p, _ process.Process) {
dirName := strconv.Itoa(p.PID)
fdBase := filepath.Join(procRoot, dirName, "fd")

Expand Down
2 changes: 1 addition & 1 deletion probe/endpoint/procspy/proc_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ var mockFS = fs.Dir("",
),
fs.File{
FName: "stat",
FContents: "1 na R 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1",
FContents: "1 na R 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0",
},
),
),
Expand Down
31 changes: 0 additions & 31 deletions probe/process/cache.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package process

import (
"strconv"
"strings"
"time"

"github.com/armon/go-metrics"
Expand Down Expand Up @@ -41,32 +39,3 @@ func cachedReadFile(path string) ([]byte, error) {
metrics.IncrCounter(missMetricsKey, 1.0)
return buf, err
}

// readStats returns the parent PID and the thread count parsed from the
// stat file at path. The raw file contents are cached, but for a shorter
// period (statsTimeout).
//
// On any failure it returns -1, -1 and a non-nil error: the read error if
// the file cannot be read, strconv.ErrSyntax if the file holds too few
// whitespace-separated fields, or the strconv.Atoi error if a field is not
// a valid integer.
func readStats(path string) (int, int, error) {
	// Highest field index read below; the file must contain at least
	// threadsField+1 whitespace-separated fields.
	const (
		ppidField    = 3
		threadsField = 19
	)

	key := []byte(path)
	buf, err := fileCache.Get(key)
	if err == nil {
		metrics.IncrCounter(hitMetricsKey, 1.0)
	} else {
		// Not in the cache: read the file and remember its contents.
		buf, err = fs.ReadFile(path)
		if err != nil {
			return -1, -1, err
		}
		fileCache.Set(key, buf, statsTimeout)
		metrics.IncrCounter(missMetricsKey, 1.0)
	}

	splits := strings.Fields(string(buf))
	if len(splits) <= threadsField {
		// An empty or truncated stat file would otherwise panic on the
		// index expressions below; report it as malformed input instead.
		return -1, -1, strconv.ErrSyntax
	}

	ppid, err := strconv.Atoi(splits[ppidField])
	if err != nil {
		return -1, -1, err
	}
	threads, err := strconv.Atoi(splits[threadsField])
	if err != nil {
		return -1, -1, err
	}
	return ppid, threads, nil
}
45 changes: 34 additions & 11 deletions probe/process/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,37 @@ package process
import (
"strconv"

"github.com/weaveworks/scope/common/mtime"
"github.com/weaveworks/scope/report"
)

// We use these keys in node metadata
const (
PID = "pid"
Comm = "comm"
PPID = "ppid"
Cmdline = "cmdline"
Threads = "threads"
PID = "pid"
Comm = "comm"
PPID = "ppid"
Cmdline = "cmdline"
Threads = "threads"
CPUUsage = "cpu_usage_percent"
MemoryUsage = "memory_usage_bytes"
)

// Reporter generates Reports containing the Process topology.
type Reporter struct {
scope string
walker Walker
scope string
walker Walker
jiffies Jiffies
}

// Jiffies is the type for the function used to fetch the elapsed jiffies.
//
// The Reporter calls it once per processTopology run and uses the results as
// follows: the uint64 is the total jiffies delta, used as the denominator
// when turning a process's own jiffies delta into a CPU usage percentage
// (the CPU metric is skipped when it is 0); the float64 is passed as the
// maximum of the CPU usage metric; a non-nil error aborts the report.
type Jiffies func() (uint64, float64, error)

// NewReporter makes a new Reporter.
func NewReporter(walker Walker, scope string) *Reporter {
func NewReporter(walker Walker, scope string, jiffies Jiffies) *Reporter {
return &Reporter{
scope: scope,
walker: walker,
scope: scope,
walker: walker,
jiffies: jiffies,
}
}

Expand All @@ -45,7 +53,13 @@ func (r *Reporter) Report() (report.Report, error) {

func (r *Reporter) processTopology() (report.Topology, error) {
t := report.MakeTopology()
err := r.walker.Walk(func(p Process) {
now := mtime.Now()
deltaTotal, maxCPU, err := r.jiffies()
if err != nil {
return t, err
}

err = r.walker.Walk(func(p, prev Process) {
pidstr := strconv.Itoa(p.PID)
nodeID := report.MakeProcessNodeID(r.scope, pidstr)
node := report.MakeNode()
Expand All @@ -59,9 +73,18 @@ func (r *Reporter) processTopology() (report.Topology, error) {
node.Metadata[tuple.key] = tuple.value
}
}

if p.PPID > 0 {
node.Metadata[PPID] = strconv.Itoa(p.PPID)
}

if deltaTotal > 0 {
cpuUsage := float64(p.Jiffies-prev.Jiffies) / float64(deltaTotal) * 100.
node = node.WithMetric(CPUUsage, report.MakeMetric().Add(now, cpuUsage).WithMax(maxCPU))
}

node = node.WithMetric(MemoryUsage, report.MakeMetric().Add(now, float64(p.RSSBytes)))

t.AddNode(nodeID, node)
})

Expand Down
22 changes: 14 additions & 8 deletions probe/process/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package process_test
import (
"reflect"
"testing"
"time"

"github.com/weaveworks/scope/common/mtime"
"github.com/weaveworks/scope/probe/process"
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/test"
Expand All @@ -13,9 +15,9 @@ type mockWalker struct {
processes []process.Process
}

func (m *mockWalker) Walk(f func(process.Process)) error {
func (m *mockWalker) Walk(f func(process.Process, process.Process)) error {
for _, p := range m.processes {
f(p)
f(p, process.Process{})
}
return nil
}
Expand All @@ -30,44 +32,48 @@ func TestReporter(t *testing.T) {
{PID: 5, PPID: 1, Cmdline: "tail -f /var/log/syslog"},
},
}
getDeltaTotalJiffies := func() (uint64, float64, error) { return 0, 0., nil }
now := time.Now()
mtime.NowForce(now)
defer mtime.NowReset()

reporter := process.NewReporter(walker, "")
reporter := process.NewReporter(walker, "", getDeltaTotalJiffies)
want := report.MakeReport()
want.Process = report.MakeTopology().AddNode(
report.MakeProcessNodeID("", "1"), report.MakeNodeWith(map[string]string{
process.PID: "1",
process.Comm: "init",
process.Threads: "0",
}),
}).WithMetric(process.MemoryUsage, report.MakeMetric().Add(now, 0.)),
).AddNode(
report.MakeProcessNodeID("", "2"), report.MakeNodeWith(map[string]string{
process.PID: "2",
process.Comm: "bash",
process.PPID: "1",
process.Threads: "0",
}),
}).WithMetric(process.MemoryUsage, report.MakeMetric().Add(now, 0.)),
).AddNode(
report.MakeProcessNodeID("", "3"), report.MakeNodeWith(map[string]string{
process.PID: "3",
process.Comm: "apache",
process.PPID: "1",
process.Threads: "2",
}),
}).WithMetric(process.MemoryUsage, report.MakeMetric().Add(now, 0.)),
).AddNode(
report.MakeProcessNodeID("", "4"), report.MakeNodeWith(map[string]string{
process.PID: "4",
process.Comm: "ping",
process.PPID: "2",
process.Cmdline: "ping foo.bar.local",
process.Threads: "0",
}),
}).WithMetric(process.MemoryUsage, report.MakeMetric().Add(now, 0.)),
).AddNode(
report.MakeProcessNodeID("", "5"), report.MakeNodeWith(map[string]string{
process.PID: "5",
process.PPID: "1",
process.Cmdline: "tail -f /var/log/syslog",
process.Threads: "0",
}),
}).WithMetric(process.MemoryUsage, report.MakeMetric().Add(now, 0.)),
)

have, err := reporter.Report()
Expand Down
2 changes: 1 addition & 1 deletion probe/process/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type tree struct {
// NewTree returns a new Tree that can be polled.
func NewTree(walker Walker) (Tree, error) {
pt := tree{processes: map[int]Process{}}
err := walker.Walk(func(p Process) {
err := walker.Walk(func(p, _ Process) {
pt.processes[p.PID] = p
})

Expand Down
21 changes: 14 additions & 7 deletions probe/process/walker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,22 @@ type Process struct {
Comm string
Cmdline string
Threads int
Jiffies uint64
RSSBytes uint64
}

// Walker is something that walks the /proc directory
type Walker interface {
Walk(func(Process)) error
Walk(func(Process, Process)) error
}

// CachingWalker is a walker that caches a copy of the output from another
// Walker, and then allows other concurrent readers to Walk that copy.
type CachingWalker struct {
cache []Process
cacheLock sync.RWMutex
source Walker
cache []Process
previousByPID map[int]Process
cacheLock sync.RWMutex
source Walker
}

// NewCachingWalker returns a new CachingWalker
Expand All @@ -32,20 +35,20 @@ func NewCachingWalker(source Walker) *CachingWalker {
func (*CachingWalker) Name() string {
	// The name is a fixed label; no receiver state is consulted.
	const name = "Process"
	return name
}

// Walk walks a cached copy of process list
func (c *CachingWalker) Walk(f func(Process)) error {
func (c *CachingWalker) Walk(f func(Process, Process)) error {
c.cacheLock.RLock()
defer c.cacheLock.RUnlock()

for _, p := range c.cache {
f(p)
f(p, c.previousByPID[p.PID])
}
return nil
}

// Tick updates cached copy of process list
func (c *CachingWalker) Tick() error {
newCache := []Process{}
err := c.source.Walk(func(p Process) {
err := c.source.Walk(func(p, _ Process) {
newCache = append(newCache, p)
})
if err != nil {
Expand All @@ -54,6 +57,10 @@ func (c *CachingWalker) Tick() error {

c.cacheLock.Lock()
defer c.cacheLock.Unlock()
c.previousByPID = map[int]Process{}
for _, p := range c.cache {
c.previousByPID[p.PID] = p
}
c.cache = newCache
return nil
}
9 changes: 7 additions & 2 deletions probe/process/walker_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const (

// These functions copied from procspy.

func (walker) Walk(f func(Process)) error {
func (walker) Walk(f func(Process, Process)) error {
output, err := exec.Command(
lsofBinary,
"-i", // only Internet files
Expand All @@ -40,7 +40,7 @@ func (walker) Walk(f func(Process)) error {
}

for _, process := range processes {
f(process)
f(process, Process{})
}
return nil
}
Expand Down Expand Up @@ -92,3 +92,8 @@ func parseLSOF(output string) (map[string]Process, error) {
}
return processes, nil
}

// GetDeltaTotalJiffies is the darwin implementation: darwin doesn't have
// jiffies, so both numeric results are always zero and the error is always
// nil.
func GetDeltaTotalJiffies() (uint64, float64, error) {
	var (
		deltaTotal uint64  // zero value: no jiffies to report
		maxCPU     float64 // zero value
	)
	return deltaTotal, maxCPU, nil
}
Loading

0 comments on commit 79e6e9a

Please sign in to comment.