Gather per-process CPU and memory metrics.
tomwilkie committed Dec 15, 2015
1 parent b3b2bcc commit fe330bc
Showing 13 changed files with 115 additions and 59 deletions.
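In outline: the probe now records each process's cumulative user+system CPU time in jiffies on every walk of /proc, and the reporter converts the change since the previous walk into a percentage by dividing it by the machine-wide jiffy delta read from /proc/stat. For illustration (numbers invented): if a process accumulated 50 jiffies between two walks while the whole machine advanced 400 jiffies across its four CPUs, its cpu_usage_percent sample is 50 / 400 × 100 = 12.5, plotted against a maximum of 4 × 100 = 400.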
4 changes: 2 additions & 2 deletions experimental/demo/gce.sh
@@ -122,9 +122,9 @@ function setup {
. ~/.profile
git clone http://github.com/weaveworks/scope.git
cd scope
git checkout 0.9
git checkout master
make deps
make
make RUN_FLAGS=
./scope launch
EOF
done
2 changes: 1 addition & 1 deletion probe/endpoint/procspy/proc.go
@@ -31,7 +31,7 @@ func walkProcPid(buf *bytes.Buffer, walker process.Walker) (map[uint64]*Proc, er
statT syscall.Stat_t
)

walker.Walk(func(p process.Process) {
walker.Walk(func(p, _ process.Process) {
dirName := strconv.Itoa(p.PID)
fdBase := filepath.Join(procRoot, dirName, "fd")

31 changes: 0 additions & 31 deletions probe/process/cache.go
@@ -1,8 +1,6 @@
package process

import (
"strconv"
"strings"
"time"

"github.com/armon/go-metrics"
@@ -41,32 +39,3 @@ func cachedReadFile(path string) ([]byte, error) {
metrics.IncrCounter(missMetricsKey, 1.0)
return buf, err
}

// we cache the stats, but for a shorter period
func readStats(path string) (int, int, error) {
var (
key = []byte(path)
buf []byte
err error
)
if buf, err = fileCache.Get(key); err == nil {
metrics.IncrCounter(hitMetricsKey, 1.0)
} else {
buf, err = fs.ReadFile(path)
if err != nil {
return -1, -1, err
}
fileCache.Set(key, buf, statsTimeout)
metrics.IncrCounter(missMetricsKey, 1.0)
}
splits := strings.Fields(string(buf))
ppid, err := strconv.Atoi(splits[3])
if err != nil {
return -1, -1, err
}
threads, err := strconv.Atoi(splits[19])
if err != nil {
return -1, -1, err
}
return ppid, threads, nil
}
27 changes: 21 additions & 6 deletions probe/process/reporter.go
@@ -2,17 +2,19 @@ package process

import (
"strconv"
"time"

"github.com/weaveworks/scope/report"
)

// We use these keys in node metadata
const (
PID = "pid"
Comm = "comm"
PPID = "ppid"
Cmdline = "cmdline"
Threads = "threads"
PID = "pid"
Comm = "comm"
PPID = "ppid"
Cmdline = "cmdline"
Threads = "threads"
CPUUsage = "cpu_usage_percent"
)

// Reporter generates Reports containing the Process topology.
@@ -45,7 +47,13 @@ func (r *Reporter) Report() (report.Report, error) {

func (r *Reporter) processTopology() (report.Topology, error) {
t := report.MakeTopology()
err := r.walker.Walk(func(p Process) {
now := time.Now()
deltaTotal, maxCPU, err := getDeltaTotalJiffies()
if err != nil {
return t, err
}

err = r.walker.Walk(func(p, prev Process) {
pidstr := strconv.Itoa(p.PID)
nodeID := report.MakeProcessNodeID(r.scope, pidstr)
node := report.MakeNode()
@@ -59,9 +67,16 @@ func (r *Reporter) processTopology() (report.Topology, error) {
node.Metadata[tuple.key] = tuple.value
}
}

if p.PPID > 0 {
node.Metadata[PPID] = strconv.Itoa(p.PPID)
}

if deltaTotal > 0 {
cpuUsage := float64(p.Jiffies-prev.Jiffies) / float64(deltaTotal) * 100.
node = node.WithMetric(CPUUsage, report.MakeMetric().Add(now, cpuUsage).WithMax(maxCPU))
}

t.AddNode(nodeID, node)
})

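The line float64(p.Jiffies-prev.Jiffies) / float64(deltaTotal) * 100 above is the whole calculation. As a standalone illustration only — this is not Scope code; the helper names and the use of two timed samples in place of the walker's cached previous pass are assumptions of the sketch — the same arithmetic can be done directly against /proc:

package main

import (
	"fmt"
	"os"
	"strconv"
	"strings"
	"time"
)

// processJiffies returns utime+stime for a PID, i.e. fields 14 and 15 of
// /proc/<pid>/stat (the same splits[13] and splits[14] the probe reads in
// walker_linux.go below). Like the probe, it splits naively on whitespace,
// so it assumes the comm field contains no spaces.
func processJiffies(pid int) (uint64, error) {
	buf, err := os.ReadFile(fmt.Sprintf("/proc/%d/stat", pid))
	if err != nil {
		return 0, err
	}
	fields := strings.Fields(string(buf))
	if len(fields) < 15 {
		return 0, fmt.Errorf("unexpected stat format for pid %d", pid)
	}
	utime, err := strconv.ParseUint(fields[13], 10, 64)
	if err != nil {
		return 0, err
	}
	stime, err := strconv.ParseUint(fields[14], 10, 64)
	if err != nil {
		return 0, err
	}
	return utime + stime, nil
}

// totalJiffies sums the counters on the aggregate "cpu" line of /proc/stat,
// which is what getDeltaTotalJiffies does via goprocinfo.
func totalJiffies() (uint64, error) {
	buf, err := os.ReadFile("/proc/stat")
	if err != nil {
		return 0, err
	}
	fields := strings.Fields(strings.SplitN(string(buf), "\n", 2)[0])
	if len(fields) < 2 || fields[0] != "cpu" {
		return 0, fmt.Errorf("unexpected /proc/stat format")
	}
	var total uint64
	for _, f := range fields[1:] {
		v, err := strconv.ParseUint(f, 10, 64)
		if err != nil {
			return 0, err
		}
		total += v
	}
	return total, nil
}

func main() {
	// Sample the given PID (or ourselves) twice, one second apart.
	pid := os.Getpid()
	if len(os.Args) > 1 {
		if v, err := strconv.Atoi(os.Args[1]); err == nil {
			pid = v
		}
	}
	p0, err := processJiffies(pid)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	t0, _ := totalJiffies()
	time.Sleep(time.Second)
	p1, _ := processJiffies(pid)
	t1, _ := totalJiffies()
	if t1 > t0 {
		fmt.Printf("pid %d: %.1f%% CPU over the last second\n", pid, float64(p1-p0)/float64(t1-t0)*100)
	}
}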
4 changes: 2 additions & 2 deletions probe/process/reporter_test.go
@@ -13,9 +13,9 @@ type mockWalker struct {
processes []process.Process
}

func (m *mockWalker) Walk(f func(process.Process)) error {
func (m *mockWalker) Walk(f func(process.Process, process.Process)) error {
for _, p := range m.processes {
f(p)
f(p, process.Process{})
}
return nil
}
2 changes: 1 addition & 1 deletion probe/process/tree.go
@@ -16,7 +16,7 @@ type tree struct {
// NewTree returns a new Tree that can be polled.
func NewTree(walker Walker) (Tree, error) {
pt := tree{processes: map[int]Process{}}
err := walker.Walk(func(p Process) {
err := walker.Walk(func(p, _ Process) {
pt.processes[p.PID] = p
})

20 changes: 13 additions & 7 deletions probe/process/walker.go
@@ -8,19 +8,21 @@ type Process struct {
Comm string
Cmdline string
Threads int
Jiffies uint64
}

// Walker is something that walks the /proc directory
type Walker interface {
Walk(func(Process)) error
Walk(func(Process, Process)) error
}

// CachingWalker is a walker that caches a copy of the output from another
// Walker, and then allows other concurrent readers to Walk that copy.
type CachingWalker struct {
cache []Process
cacheLock sync.RWMutex
source Walker
cache []Process
previousByPID map[int]Process
cacheLock sync.RWMutex
source Walker
}

// NewCachingWalker returns a new CachingWalker
@@ -32,20 +34,20 @@ func NewCachingWalker(source Walker) *CachingWalker {
func (*CachingWalker) Name() string { return "Process" }

// Walk walks a cached copy of process list
func (c *CachingWalker) Walk(f func(Process)) error {
func (c *CachingWalker) Walk(f func(Process, Process)) error {
c.cacheLock.RLock()
defer c.cacheLock.RUnlock()

for _, p := range c.cache {
f(p)
f(p, c.previousByPID[p.PID])
}
return nil
}

// Tick updates cached copy of process list
func (c *CachingWalker) Tick() error {
newCache := []Process{}
err := c.source.Walk(func(p Process) {
err := c.source.Walk(func(p, _ Process) {
newCache = append(newCache, p)
})
if err != nil {
@@ -54,6 +56,10 @@ func (c *CachingWalker) Tick() error {

c.cacheLock.Lock()
defer c.cacheLock.Unlock()
c.previousByPID = map[int]Process{}
for _, p := range c.cache {
c.previousByPID[p.PID] = p
}
c.cache = newCache
return nil
}
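Tick copies the outgoing cache into previousByPID before installing the fresh list, so the next Walk can hand each callback both the current Process and the previous sample for the same PID; a PID seen for the first time gets the zero Process, whose Jiffies of 0 make its first delta equal its cumulative total.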
8 changes: 6 additions & 2 deletions probe/process/walker_darwin.go
@@ -22,7 +22,7 @@ const (

// These functions copied from procspy.

func (walker) Walk(f func(Process)) error {
func (walker) Walk(f func(Process, Process)) error {
output, err := exec.Command(
lsofBinary,
"-i", // only Internet files
@@ -40,7 +40,7 @@ func (walker) Walk(f func(Process)) error {
}

for _, process := range processes {
f(process)
f(process, Process{})
}
return nil
}
@@ -92,3 +92,7 @@ func parseLSOF(output string) (map[string]Process, error) {
}
return processes, nil
}

func getDeltaTotalJiffies() (uint64, float64, error) {
return 0, 0.0, nil
}
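On Darwin there is no /proc to sample, so getDeltaTotalJiffies is stubbed to return zero and Walk passes an empty previous Process; the deltaTotal > 0 guard in the reporter then simply omits the CPU metric on that platform.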
56 changes: 53 additions & 3 deletions probe/process/walker_linux.go
@@ -6,7 +6,10 @@ import (
"strconv"
"strings"

linuxproc "github.com/c9s/goprocinfo/linux"

"github.com/weaveworks/scope/common/fs"
"github.com/weaveworks/scope/probe/host"
)

type walker struct {
@@ -18,11 +21,36 @@ func NewWalker(procRoot string) Walker {
return &walker{procRoot: procRoot}
}

func readStats(path string) (int, int, uint64, error) {
buf, err := fs.ReadFile(path)
if err != nil {
return -1, -1, 0, err
}
splits := strings.Fields(string(buf))
ppid, err := strconv.Atoi(splits[3])
if err != nil {
return -1, -1, 0, err
}
threads, err := strconv.Atoi(splits[19])
if err != nil {
return -1, -1, 0, err
}
userJiffies, err := strconv.ParseUint(splits[13], 10, 64)
if err != nil {
return -1, -1, 0, err
}
sysJiffies, err := strconv.ParseUint(splits[14], 10, 64)
if err != nil {
return -1, -1, 0, err
}
return ppid, threads, userJiffies + sysJiffies, nil
}

// Walk walks the supplied directory (expecting it to look like /proc)
// and marshals the files into instances of Process, which it then
// passes one-by-one to the supplied function. Walk is only made public
// so that it can be tested.
func (w *walker) Walk(f func(Process)) error {
func (w *walker) Walk(f func(Process, Process)) error {
dirEntries, err := fs.ReadDirNames(w.procRoot)
if err != nil {
return err
@@ -34,7 +62,7 @@ func (w *walker) Walk(f func(Process)) error {
continue
}

ppid, threads, err := readStats(path.Join(w.procRoot, filename, "stat"))
ppid, threads, jiffies, err := readStats(path.Join(w.procRoot, filename, "stat"))
if err != nil {
continue
}
@@ -56,8 +84,30 @@ func (w *walker) Walk(f func(Process)) error {
Comm: comm,
Cmdline: cmdline,
Threads: threads,
})
Jiffies: jiffies,
}, Process{})
}

return nil
}

var previousStat = linuxproc.CPUStat{}

func getDeltaTotalJiffies() (uint64, float64, error) {
stat, err := linuxproc.ReadStat(host.ProcStat)
if err != nil {
return 0, 0.0, err
}

var (
currentStat = stat.CPUStatAll
prevTotal = (previousStat.Idle + previousStat.IOWait + previousStat.User +
previousStat.Nice + previousStat.System + previousStat.IRQ +
previousStat.SoftIRQ + previousStat.Steal)
currentTotal = (currentStat.Idle + currentStat.IOWait + currentStat.User +
currentStat.Nice + currentStat.System + currentStat.IRQ +
currentStat.SoftIRQ + currentStat.Steal)
)
previousStat = currentStat
return currentTotal - prevTotal, float64(len(stat.CPUStats)) * 100., nil
}
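For reference, the field indices above follow proc(5): in /proc/<pid>/stat, field 4 (splits[3]) is the parent PID, field 20 (splits[19]) the thread count, and fields 14 and 15 (splits[13] and splits[14]) the cumulative user and system time in clock ticks, which readStats sums into Jiffies. getDeltaTotalJiffies applies the same idea machine-wide: it differences the summed CPU counters from /proc/stat between calls and reports a maximum of 100% per CPU for the metric's ceiling.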
2 changes: 1 addition & 1 deletion probe/process/walker_linux_test.go
@@ -85,7 +85,7 @@ func TestWalker(t *testing.T) {

have := map[int]process.Process{}
walker := process.NewWalker("/proc")
err := walker.Walk(func(p process.Process) {
err := walker.Walk(func(p, _ process.Process) {
have[p.PID] = p
})

4 changes: 2 additions & 2 deletions probe/process/walker_test.go
@@ -11,7 +11,7 @@ import (
func TestBasicWalk(t *testing.T) {
var (
procRoot = "/proc"
procFunc = func(process.Process) {}
procFunc = func(process.Process, process.Process) {}
)
if err := process.NewWalker(procRoot).Walk(procFunc); err != nil {
t.Fatal(err)
@@ -59,7 +59,7 @@ func TestCache(t *testing.T) {

func all(w process.Walker) ([]process.Process, error) {
all := []process.Process{}
err := w.Walk(func(p process.Process) {
err := w.Walk(func(p, _ process.Process) {
all = append(all, p)
})
return all, err
12 changes: 12 additions & 0 deletions render/detailed_node.go
@@ -327,6 +327,18 @@ func processOriginTable(nmd report.Node, addHostTag bool, addContainerTag bool)
rows = append([]Row{{Key: "Host", ValueMajor: report.ExtractHostID(nmd)}}, rows...)
}

for _, tuple := range []struct {
key, human string
fmt formatter
}{
{process.CPUUsage, "CPU Usage", formatPercent},
{host.MemUsage, "Memory Usage", formatPercent},
} {
if val, ok := nmd.Metrics[tuple.key]; ok {
rows = append(rows, sparklineRow(tuple.human, val, tuple.fmt))
}
}

var (
title = "Process"
name, commFound = nmd.Metadata[process.Comm]
2 changes: 1 addition & 1 deletion report/topology.go
@@ -145,7 +145,7 @@ func (n Node) WithSets(sets Sets) Node {
// WithMetric returns a fresh copy of n, with metric merged in at key.
func (n Node) WithMetric(key string, metric Metric) Node {
result := n.Copy()
n.Metrics[key] = n.Metrics[key].Merge(metric)
result.Metrics[key] = n.Metrics[key].Merge(metric)
return result
}

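The one-line change to WithMetric fixes its copy-on-write contract: the merged metric was being stored back into the receiver's Metrics map rather than into the freshly copied result, mutating the node the method promises to leave untouched.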
