Skip to content

Commit

Permalink
Merge pull request #456 from prometheus/beorn7/process_collector
Browse files Browse the repository at this point in the history
Rework process collector
  • Loading branch information
beorn7 authored Sep 7, 2018
2 parents 3525612 + 773f502 commit 7858729
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 30 deletions.
105 changes: 81 additions & 24 deletions prometheus/process_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,46 +13,74 @@

package prometheus

import "github.com/prometheus/procfs"
import (
"errors"
"os"

"github.com/prometheus/procfs"
)

type processCollector struct {
collectFn func(chan<- Metric)
pidFn func() (int, error)
reportErrors bool
cpuTotal *Desc
openFDs, maxFDs *Desc
vsize, maxVsize *Desc
rss *Desc
startTime *Desc
}

// ProcessCollectorOpts defines the behavior of a process metrics collector
// created with NewProcessCollector.
type ProcessCollectorOpts struct {
// PidFn returns the PID of the process the collector collects metrics
// for. It is called upon each collection. By default, the PID of the
// current process is used, as determined on construction time by
// calling os.Getpid().
PidFn func() (int, error)
// If non-empty, each of the collected metrics is prefixed by the
// provided string and an underscore ("_").
Namespace string
// If true, any error encountered during collection is reported as an
// invalid metric (see NewInvalidMetric). Otherwise, errors are ignored
// and the collected metrics will be incomplete. (Possibly, no metrics
// will be collected at all.) While that's usually not desired, it is
// appropriate for the common "mix-in" of process metrics, where process
// metrics are nice to have, but failing to collect them should not
// disrupt the collection of the remaining metrics.
ReportErrors bool
}

// NewProcessCollector returns a collector which exports the current state of
// process metrics including CPU, memory and file descriptor usage as well as
// the process start time for the given process ID under the given namespace.
// the process start time. The detailed behavior is defined by the provided
// ProcessCollectorOpts. The zero value of ProcessCollectorOpts creates a
// collector for the current process with an empty namespace string and no error
// reporting.
//
// Currently, the collector depends on a Linux-style proc filesystem and
// therefore only exports metrics for Linux.
func NewProcessCollector(pid int, namespace string) Collector {
return NewProcessCollectorPIDFn(
func() (int, error) { return pid, nil },
namespace,
)
}

// NewProcessCollectorPIDFn works like NewProcessCollector but the process ID is
// determined on each collect anew by calling the given pidFn function.
func NewProcessCollectorPIDFn(
pidFn func() (int, error),
namespace string,
) Collector {
//
// Note: An older version of this function had the following signature:
//
// NewProcessCollector(pid int, namespace string) Collector
//
// Most commonly, it was called as
//
// NewProcessCollector(os.Getpid(), "")
//
// The following call of the current version is equivalent to the above:
//
// NewProcessCollector(ProcessCollectorOpts{})
func NewProcessCollector(opts ProcessCollectorOpts) Collector {
ns := ""
if len(namespace) > 0 {
ns = namespace + "_"
if len(opts.Namespace) > 0 {
ns = opts.Namespace + "_"
}

c := processCollector{
pidFn: pidFn,
collectFn: func(chan<- Metric) {},

c := &processCollector{
reportErrors: opts.ReportErrors,
cpuTotal: NewDesc(
ns+"process_cpu_seconds_total",
"Total user and system CPU time spent in seconds.",
Expand Down Expand Up @@ -90,12 +118,23 @@ func NewProcessCollectorPIDFn(
),
}

if opts.PidFn == nil {
pid := os.Getpid()
c.pidFn = func() (int, error) { return pid, nil }
} else {
c.pidFn = opts.PidFn
}

// Set up process metric collection if supported by the runtime.
if _, err := procfs.NewStat(); err == nil {
c.collectFn = c.processCollect
} else {
c.collectFn = func(ch chan<- Metric) {
c.reportError(ch, nil, errors.New("process metrics not supported on this platform"))
}
}

return &c
return c
}

// Describe returns all descriptions of the collector.
Expand All @@ -114,16 +153,16 @@ func (c *processCollector) Collect(ch chan<- Metric) {
c.collectFn(ch)
}

// TODO(ts): Bring back error reporting by reverting 7faf9e7 as soon as the
// client allows users to configure the error behavior.
func (c *processCollector) processCollect(ch chan<- Metric) {
pid, err := c.pidFn()
if err != nil {
c.reportError(ch, nil, err)
return
}

p, err := procfs.NewProc(pid)
if err != nil {
c.reportError(ch, nil, err)
return
}

Expand All @@ -133,15 +172,33 @@ func (c *processCollector) processCollect(ch chan<- Metric) {
ch <- MustNewConstMetric(c.rss, GaugeValue, float64(stat.ResidentMemory()))
if startTime, err := stat.StartTime(); err == nil {
ch <- MustNewConstMetric(c.startTime, GaugeValue, startTime)
} else {
c.reportError(ch, c.startTime, err)
}
} else {
c.reportError(ch, nil, err)
}

if fds, err := p.FileDescriptorsLen(); err == nil {
ch <- MustNewConstMetric(c.openFDs, GaugeValue, float64(fds))
} else {
c.reportError(ch, c.openFDs, err)
}

if limits, err := p.NewLimits(); err == nil {
ch <- MustNewConstMetric(c.maxFDs, GaugeValue, float64(limits.OpenFiles))
ch <- MustNewConstMetric(c.maxVsize, GaugeValue, float64(limits.AddressSpace))
} else {
c.reportError(ch, nil, err)
}
}

func (c *processCollector) reportError(ch chan<- Metric, desc *Desc, err error) {
if !c.reportErrors {
return
}
if desc == nil {
desc = NewInvalidDesc(err)
}
ch <- NewInvalidMetric(desc, err)
}
36 changes: 32 additions & 4 deletions prometheus/process_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@ package prometheus

import (
"bytes"
"errors"
"os"
"regexp"
"testing"

"github.com/prometheus/common/expfmt"
"github.com/prometheus/procfs"

dto "github.com/prometheus/client_model/go"
)

func TestProcessCollector(t *testing.T) {
Expand All @@ -31,12 +34,14 @@ func TestProcessCollector(t *testing.T) {
}

registry := NewRegistry()
if err := registry.Register(NewProcessCollector(os.Getpid(), "")); err != nil {
if err := registry.Register(NewProcessCollector(ProcessCollectorOpts{})); err != nil {
t.Fatal(err)
}
if err := registry.Register(NewProcessCollectorPIDFn(
func() (int, error) { return os.Getpid(), nil }, "foobar"),
); err != nil {
if err := registry.Register(NewProcessCollector(ProcessCollectorOpts{
PidFn: func() (int, error) { return os.Getpid(), nil },
Namespace: "foobar",
ReportErrors: true, // No errors expected, just to see if none are reported.
})); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -72,4 +77,27 @@ func TestProcessCollector(t *testing.T) {
t.Errorf("want body to match %s\n%s", re, buf.String())
}
}

brokenProcessCollector := NewProcessCollector(ProcessCollectorOpts{
PidFn: func() (int, error) { return 0, errors.New("boo") },
ReportErrors: true,
})

ch := make(chan Metric)
go func() {
brokenProcessCollector.Collect(ch)
close(ch)
}()
n := 0
for m := range ch {
n++
pb := &dto.Metric{}
err := m.Write(pb)
if err == nil {
t.Error("metric collected from broken process collector is unexpectedly valid")
}
}
if n != 1 {
t.Errorf("%d metrics collected, want 1", n)
}
}
3 changes: 1 addition & 2 deletions prometheus/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package prometheus
import (
"bytes"
"fmt"
"os"
"runtime"
"sort"
"strings"
Expand Down Expand Up @@ -54,7 +53,7 @@ var (
)

func init() {
MustRegister(NewProcessCollector(os.Getpid(), ""))
MustRegister(NewProcessCollector(ProcessCollectorOpts{}))
MustRegister(NewGoCollector())
}

Expand Down

0 comments on commit 7858729

Please sign in to comment.