Skip to content

Commit

Permalink
feat: scrape timeout
Browse files: browse the repository at this point in the history
Signed-off-by: Johannes Würbach <[email protected]>
  • Loading branch information
johanneswuerbach committed Jul 13, 2021
1 parent 57719ba commit 2b6b78e
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 39 deletions.
15 changes: 8 additions & 7 deletions cmd/postgres_exporter/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package main

import (
"context"
"fmt"
"io/ioutil"
"net/url"
Expand All @@ -25,7 +26,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

func (e *Exporter) discoverDatabaseDSNs() []string {
func (e *Exporter) discoverDatabaseDSNs(ctx context.Context) []string {
// connstring syntax is complex (and possibly not even a regular language).
// we don't need to parse it, so just superficially validate that it starts
// with a valid-ish keyword pair
Expand All @@ -50,7 +51,7 @@ func (e *Exporter) discoverDatabaseDSNs() []string {
continue
}

server, err := e.servers.GetServer(dsn)
server, err := e.servers.GetServer(ctx, dsn)
if err != nil {
level.Error(logger).Log("msg", "Error opening connection to database", "dsn", loggableDSN(dsn), "err", err)
continue
Expand All @@ -60,7 +61,7 @@ func (e *Exporter) discoverDatabaseDSNs() []string {
// If autoDiscoverDatabases is true, set first dsn as master database (Default: false)
server.master = true

databaseNames, err := queryDatabases(server)
databaseNames, err := queryDatabases(ctx, server)
if err != nil {
level.Error(logger).Log("msg", "Error querying databases", "dsn", loggableDSN(dsn), "err", err)
continue
Expand Down Expand Up @@ -96,8 +97,8 @@ func (e *Exporter) discoverDatabaseDSNs() []string {
return result
}

func (e *Exporter) scrapeDSN(ch chan<- prometheus.Metric, dsn string) error {
server, err := e.servers.GetServer(dsn)
func (e *Exporter) scrapeDSN(ctx context.Context, ch chan<- prometheus.Metric, dsn string) error {
server, err := e.servers.GetServer(ctx, dsn)

if err != nil {
return &ErrorConnectToServer{fmt.Sprintf("Error opening connection to database (%s): %s", loggableDSN(dsn), err.Error())}
Expand All @@ -109,11 +110,11 @@ func (e *Exporter) scrapeDSN(ch chan<- prometheus.Metric, dsn string) error {
}

// Check if map versions need to be updated
if err := e.checkMapVersions(ch, server); err != nil {
if err := e.checkMapVersions(ctx, ch, server); err != nil {
level.Warn(logger).Log("msg", "Proceeding with outdated query maps, as the Postgres version could not be determined", "err", err)
}

return server.Scrape(ch, e.disableSettingsMetrics)
return server.Scrape(ctx, ch, e.disableSettingsMetrics)
}

// try to get the DataSource
Expand Down
3 changes: 2 additions & 1 deletion cmd/postgres_exporter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ var (
excludeDatabases = kingpin.Flag("exclude-databases", "A list of databases to remove when autoDiscoverDatabases is enabled").Default("").Envar("PG_EXPORTER_EXCLUDE_DATABASES").String()
includeDatabases = kingpin.Flag("include-databases", "A list of databases to include when autoDiscoverDatabases is enabled").Default("").Envar("PG_EXPORTER_INCLUDE_DATABASES").String()
metricPrefix = kingpin.Flag("metric-prefix", "A metric prefix can be used to have non-default (not \"pg\") prefixes for each of the metrics").Default("pg").Envar("PG_EXPORTER_METRIC_PREFIX").String()
scrapeTimeout = kingpin.Flag("scrape-timeout", "Maximum duration of a scrape").Default("60s").Envar("PG_EXPORTER_SCRAPE_TIMEOUT").Duration()
logger = log.NewNopLogger()
)

Expand Down Expand Up @@ -105,7 +106,7 @@ func main() {
IncludeDatabases(*includeDatabases),
}

exporter := NewExporter(dsn, opts...)
exporter := NewExporter(dsn, scrapeTimeout, opts...)
defer func() {
exporter.servers.Close()
}()
Expand Down
11 changes: 6 additions & 5 deletions cmd/postgres_exporter/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package main

import (
"context"
"database/sql"
"errors"
"fmt"
Expand All @@ -27,7 +28,7 @@ import (

// Query within a namespace mapping and emit metrics. Returns fatal errors if
// the scrape fails, and a slice of errors if they were non-fatal.
func queryNamespaceMapping(server *Server, namespace string, mapping MetricMapNamespace) ([]prometheus.Metric, []error, error) {
func queryNamespaceMapping(ctx context.Context, server *Server, namespace string, mapping MetricMapNamespace) ([]prometheus.Metric, []error, error) {
// Check for a query override for this namespace
query, found := server.queryOverrides[namespace]

Expand All @@ -45,9 +46,9 @@ func queryNamespaceMapping(server *Server, namespace string, mapping MetricMapNa
if !found {
// There is currently no proper way to avoid building this query by string
// formatting, but this is an admin tool, so the namespace is assumed to be
// trusted rather than a SQL injection vector.
rows, err = server.db.Query(fmt.Sprintf("SELECT * FROM %s;", namespace)) // nolint: gas
rows, err = server.db.QueryContext(ctx, fmt.Sprintf("SELECT * FROM %s;", namespace)) // nolint: gas
} else {
rows, err = server.db.Query(query)
rows, err = server.db.QueryContext(ctx, query)
}
if err != nil {
return []prometheus.Metric{}, []error{}, fmt.Errorf("Error running query on database %q: %s %v", server, namespace, err)
Expand Down Expand Up @@ -183,7 +184,7 @@ func queryNamespaceMapping(server *Server, namespace string, mapping MetricMapNa

// Iterate through all the namespace mappings in the exporter and run their
// queries.
func queryNamespaceMappings(ch chan<- prometheus.Metric, server *Server) map[string]error {
func queryNamespaceMappings(ctx context.Context, ch chan<- prometheus.Metric, server *Server) map[string]error {
// Return a map of namespace -> errors
namespaceErrors := make(map[string]error)

Expand Down Expand Up @@ -225,7 +226,7 @@ func queryNamespaceMappings(ch chan<- prometheus.Metric, server *Server) map[str
var nonFatalErrors []error
var err error
if scrapeMetric {
metrics, nonFatalErrors, err = queryNamespaceMapping(server, namespace, mapping)
metrics, nonFatalErrors, err = queryNamespaceMapping(ctx, server, namespace, mapping)
} else {
metrics = cachedMetric.metrics
}
Expand Down
5 changes: 3 additions & 2 deletions cmd/postgres_exporter/pg_setting.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package main

import (
"context"
"fmt"
"math"
"strconv"
Expand All @@ -24,7 +25,7 @@ import (
)

// Query the pg_settings view containing runtime variables
func querySettings(ch chan<- prometheus.Metric, server *Server) error {
func querySettings(ctx context.Context, ch chan<- prometheus.Metric, server *Server) error {
level.Debug(logger).Log("msg", "Querying pg_setting view", "server", server)

// pg_settings docs: https://www.postgresql.org/docs/current/static/view-pg-settings.html
Expand All @@ -33,7 +34,7 @@ func querySettings(ch chan<- prometheus.Metric, server *Server) error {
// types in normaliseUnit() below
query := "SELECT name, setting, COALESCE(unit, ''), short_desc, vartype FROM pg_settings WHERE vartype IN ('bool', 'integer', 'real');"

rows, err := server.db.Query(query)
rows, err := server.db.QueryContext(ctx, query)
if err != nil {
return fmt.Errorf("Error running query on database %q: %s %v", server, namespace, err)
}
Expand Down
30 changes: 21 additions & 9 deletions cmd/postgres_exporter/postgres_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package main

import (
"context"
"crypto/sha256"
"database/sql"
"errors"
Expand Down Expand Up @@ -468,6 +469,7 @@ type Exporter struct {
psqlUp prometheus.Gauge
userQueriesError *prometheus.GaugeVec
totalScrapes prometheus.Counter
scrapeTimeout *time.Duration

// servers are used to allow re-using the DB connection between scrapes.
// servers contains metrics map and query overrides.
Expand Down Expand Up @@ -555,7 +557,7 @@ func parseConstLabels(s string) prometheus.Labels {
}

// NewExporter returns a new PostgreSQL exporter for the provided DSN.
func NewExporter(dsn []string, opts ...ExporterOpt) *Exporter {
func NewExporter(dsn []string, scrapeTimeout *time.Duration, opts ...ExporterOpt) *Exporter {
e := &Exporter{
dsn: dsn,
builtinMetricMaps: builtinMetricMaps,
Expand All @@ -567,6 +569,7 @@ func NewExporter(dsn []string, opts ...ExporterOpt) *Exporter {

e.setupInternalMetrics()
e.servers = NewServers(ServerWithLabels(e.constantLabels))
e.scrapeTimeout = scrapeTimeout

return e
}
Expand Down Expand Up @@ -614,7 +617,16 @@ func (e *Exporter) Describe(ch chan<- *prometheus.Desc) {

// Collect implements prometheus.Collector.
func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
e.scrape(ch)
var ctx context.Context
var cancel context.CancelFunc
if e.scrapeTimeout != nil {
ctx, cancel = context.WithTimeout(context.Background(), *e.scrapeTimeout)
} else {
ctx, cancel = context.WithCancel(context.Background())
}
defer cancel()

e.scrape(ctx, ch)

ch <- e.duration
ch <- e.totalScrapes
Expand All @@ -630,9 +642,9 @@ func newDesc(subsystem, name, help string, labels prometheus.Labels) *prometheus
)
}

func checkPostgresVersion(db *sql.DB, server string) (semver.Version, string, error) {
func checkPostgresVersion(ctx context.Context, db *sql.DB, server string) (semver.Version, string, error) {
level.Debug(logger).Log("msg", "Querying PostgreSQL version", "server", server)
versionRow := db.QueryRow("SELECT version();")
versionRow := db.QueryRowContext(ctx, "SELECT version();")
var versionString string
err := versionRow.Scan(&versionString)
if err != nil {
Expand All @@ -647,8 +659,8 @@ func checkPostgresVersion(db *sql.DB, server string) (semver.Version, string, er
}

// Check and update the exporters query maps if the version has changed.
func (e *Exporter) checkMapVersions(ch chan<- prometheus.Metric, server *Server) error {
semanticVersion, versionString, err := checkPostgresVersion(server.db, server.String())
func (e *Exporter) checkMapVersions(ctx context.Context, ch chan<- prometheus.Metric, server *Server) error {
semanticVersion, versionString, err := checkPostgresVersion(ctx, server.db, server.String())
if err != nil {
return fmt.Errorf("Error fetching version string on %q: %v", server, err)
}
Expand Down Expand Up @@ -709,7 +721,7 @@ func (e *Exporter) checkMapVersions(ch chan<- prometheus.Metric, server *Server)
return nil
}

func (e *Exporter) scrape(ch chan<- prometheus.Metric) {
func (e *Exporter) scrape(ctx context.Context, ch chan<- prometheus.Metric) {
defer func(begun time.Time) {
e.duration.Set(time.Since(begun).Seconds())
}(time.Now())
Expand All @@ -718,14 +730,14 @@ func (e *Exporter) scrape(ch chan<- prometheus.Metric) {

dsns := e.dsn
if e.autoDiscoverDatabases {
dsns = e.discoverDatabaseDSNs()
dsns = e.discoverDatabaseDSNs(ctx)
}

var errorsCount int
var connectionErrorsCount int

for _, dsn := range dsns {
if err := e.scrapeDSN(ch, dsn); err != nil {
if err := e.scrapeDSN(ctx, ch, dsn); err != nil {
errorsCount++

level.Error(logger).Log("err", err)
Expand Down
13 changes: 7 additions & 6 deletions cmd/postgres_exporter/postgres_exporter_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (s *IntegrationSuite) SetUpSuite(c *C) {
dsn := os.Getenv("DATA_SOURCE_NAME")
c.Assert(dsn, Not(Equals), "")

exporter := NewExporter(strings.Split(dsn, ","))
exporter := NewExporter(strings.Split(dsn, ","), 10*time.Duration)
c.Assert(exporter, NotNil)
// Assign the exporter to the suite
s.e = exporter
Expand All @@ -66,7 +66,7 @@ func (s *IntegrationSuite) TestAllNamespacesReturnResults(c *C) {
c.Assert(err, IsNil)

// Do a version update
err = s.e.checkMapVersions(ch, server)
err = s.e.checkMapVersions(context.Background(), ch, server)
c.Assert(err, IsNil)

err = querySettings(ch, server)
Expand Down Expand Up @@ -99,12 +99,12 @@ func (s *IntegrationSuite) TestInvalidDsnDoesntCrash(c *C) {
}()

// Send a bad DSN
exporter := NewExporter([]string{"invalid dsn"})
exporter := NewExporter([]string{"invalid dsn"}, 10*time.Duration)
c.Assert(exporter, NotNil)
exporter.scrape(ch)

// Send a DSN to a non-listening port.
exporter = NewExporter([]string{"postgresql://nothing:nothing@127.0.0.1:1/nothing"})
exporter = NewExporter([]string{"postgresql://nothing:[email protected]:1/nothing"}, 10*time.Duration)
c.Assert(exporter, NotNil)
exporter.scrape(ch)
}
Expand All @@ -122,7 +122,7 @@ func (s *IntegrationSuite) TestUnknownMetricParsingDoesntCrash(c *C) {
dsn := os.Getenv("DATA_SOURCE_NAME")
c.Assert(dsn, Not(Equals), "")

exporter := NewExporter(strings.Split(dsn, ","))
exporter := NewExporter(strings.Split(dsn, ","), 10*time.Duration)
c.Assert(exporter, NotNil)

// Convert the default maps into a list of empty maps.
Expand Down Expand Up @@ -154,7 +154,7 @@ func (s *IntegrationSuite) TestExtendQueriesDoesntCrash(c *C) {
c.Assert(dsn, Not(Equals), "")

exporter := NewExporter(
strings.Split(dsn, ","),
strings.Split(dsn, ","), 10*time.Duration,
WithUserQueriesPath("../user_queries_test.yaml"),
)
c.Assert(exporter, NotNil)
Expand All @@ -168,6 +168,7 @@ func (s *IntegrationSuite) TestAutoDiscoverDatabases(c *C) {

exporter := NewExporter(
strings.Split(dsn, ","),
10*time.Duration,
)
c.Assert(exporter, NotNil)

Expand Down
5 changes: 3 additions & 2 deletions cmd/postgres_exporter/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package main

import (
"context"
"errors"
"fmt"

Expand Down Expand Up @@ -282,8 +283,8 @@ func addQueries(content []byte, pgVersion semver.Version, server *Server) error
return nil
}

func queryDatabases(server *Server) ([]string, error) {
rows, err := server.db.Query("SELECT datname FROM pg_database WHERE datallowconn = true AND datistemplate = false AND datname != current_database()")
func queryDatabases(ctx context.Context, server *Server) ([]string, error) {
rows, err := server.db.QueryContext(ctx, "SELECT datname FROM pg_database WHERE datallowconn = true AND datistemplate = false AND datname != current_database()")
if err != nil {
return nil, fmt.Errorf("Error retrieving databases: %v", err)
}
Expand Down
15 changes: 8 additions & 7 deletions cmd/postgres_exporter/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package main

import (
"context"
"database/sql"
"fmt"
"sync"
Expand Down Expand Up @@ -95,8 +96,8 @@ func (s *Server) Close() error {
}

// Ping checks connection availability and possibly invalidates the connection if it fails.
func (s *Server) Ping() error {
if err := s.db.Ping(); err != nil {
func (s *Server) Ping(ctx context.Context) error {
if err := s.db.PingContext(ctx); err != nil {
if cerr := s.Close(); cerr != nil {
level.Error(logger).Log("msg", "Error while closing non-pinging DB connection", "server", s, "err", cerr)
}
Expand All @@ -111,19 +112,19 @@ func (s *Server) String() string {
}

// Scrape loads metrics.
func (s *Server) Scrape(ch chan<- prometheus.Metric, disableSettingsMetrics bool) error {
func (s *Server) Scrape(ctx context.Context, ch chan<- prometheus.Metric, disableSettingsMetrics bool) error {
s.mappingMtx.RLock()
defer s.mappingMtx.RUnlock()

var err error

if !disableSettingsMetrics && s.master {
if err = querySettings(ch, s); err != nil {
if err = querySettings(ctx, ch, s); err != nil {
err = fmt.Errorf("error retrieving settings: %s", err)
}
}

errMap := queryNamespaceMappings(ch, s)
errMap := queryNamespaceMappings(ctx, ch, s)
if len(errMap) > 0 {
err = fmt.Errorf("queryNamespaceMappings returned %d errors", len(errMap))
}
Expand All @@ -147,7 +148,7 @@ func NewServers(opts ...ServerOpt) *Servers {
}

// GetServer returns established connection from a collection.
func (s *Servers) GetServer(dsn string) (*Server, error) {
func (s *Servers) GetServer(ctx context.Context, dsn string) (*Server, error) {
s.m.Lock()
defer s.m.Unlock()
var err error
Expand All @@ -168,7 +169,7 @@ func (s *Servers) GetServer(dsn string) (*Server, error) {
}
s.servers[dsn] = server
}
if err = server.Ping(); err != nil {
if err = server.Ping(ctx); err != nil {
delete(s.servers, dsn)
time.Sleep(time.Duration(errCount) * time.Second)
continue
Expand Down

0 comments on commit 2b6b78e

Please sign in to comment.