From 2b6b78ef5ca5db915f09d05804e3df0b44bae71c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20W=C3=BCrbach?= Date: Tue, 13 Jul 2021 14:41:48 +0200 Subject: [PATCH] feat: scrape timeout MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Johannes Würbach --- cmd/postgres_exporter/datasource.go | 15 +++++----- cmd/postgres_exporter/main.go | 3 +- cmd/postgres_exporter/namespace.go | 11 +++---- cmd/postgres_exporter/pg_setting.go | 5 ++-- cmd/postgres_exporter/postgres_exporter.go | 30 +++++++++++++------ .../postgres_exporter_integration_test.go | 13 ++++---- cmd/postgres_exporter/queries.go | 5 ++-- cmd/postgres_exporter/server.go | 15 +++++----- 8 files changed, 58 insertions(+), 39 deletions(-) diff --git a/cmd/postgres_exporter/datasource.go b/cmd/postgres_exporter/datasource.go index 3bbe2f0a7..efdb3c8ff 100644 --- a/cmd/postgres_exporter/datasource.go +++ b/cmd/postgres_exporter/datasource.go @@ -14,6 +14,7 @@ package main import ( + "context" "fmt" "io/ioutil" "net/url" @@ -25,7 +26,7 @@ import ( "github.com/prometheus/client_golang/prometheus" ) -func (e *Exporter) discoverDatabaseDSNs() []string { +func (e *Exporter) discoverDatabaseDSNs(ctx context.Context) []string { // connstring syntax is complex (and not sure if even regular). // we don't need to parse it, so just superficially validate that it starts // with a valid-ish keyword pair @@ -50,7 +51,7 @@ func (e *Exporter) discoverDatabaseDSNs() []string { continue } - server, err := e.servers.GetServer(dsn) + server, err := e.servers.GetServer(ctx, dsn) if err != nil { level.Error(logger).Log("msg", "Error opening connection to database", "dsn", loggableDSN(dsn), "err", err) continue @@ -60,7 +61,7 @@ func (e *Exporter) discoverDatabaseDSNs() []string { // If autoDiscoverDatabases is true, set first dsn as master database (Default: false) server.master = true - databaseNames, err := queryDatabases(server) + databaseNames, err := queryDatabases(ctx, server) if err != nil { level.Error(logger).Log("msg", "Error querying databases", "dsn", loggableDSN(dsn), "err", err) continue @@ -96,8 +97,8 @@ func (e *Exporter) discoverDatabaseDSNs() []string { return result } -func (e *Exporter) scrapeDSN(ch chan<- prometheus.Metric, dsn string) error { - server, err := e.servers.GetServer(dsn) +func (e *Exporter) scrapeDSN(ctx context.Context, ch chan<- prometheus.Metric, dsn string) error { + server, err := e.servers.GetServer(ctx, dsn) if err != nil { return &ErrorConnectToServer{fmt.Sprintf("Error opening connection to database (%s): %s", loggableDSN(dsn), err.Error())} @@ -109,11 +110,11 @@ func (e *Exporter) scrapeDSN(ch chan<- prometheus.Metric, dsn string) error { } // Check if map versions need to be updated - if err := e.checkMapVersions(ch, server); err != nil { + if err := e.checkMapVersions(ctx, ch, server); err != nil { level.Warn(logger).Log("msg", "Proceeding with outdated query maps, as the Postgres version could not be determined", "err", err) } - return server.Scrape(ch, e.disableSettingsMetrics) + return server.Scrape(ctx, ch, e.disableSettingsMetrics) } // try to get the DataSource diff --git a/cmd/postgres_exporter/main.go b/cmd/postgres_exporter/main.go index 25f94a745..0ae33e001 100644 --- a/cmd/postgres_exporter/main.go +++ b/cmd/postgres_exporter/main.go @@ -42,6 +42,7 @@ var ( excludeDatabases = kingpin.Flag("exclude-databases", "A list of databases to remove when autoDiscoverDatabases is enabled").Default("").Envar("PG_EXPORTER_EXCLUDE_DATABASES").String() includeDatabases = kingpin.Flag("include-databases", "A list of databases to include when autoDiscoverDatabases is enabled").Default("").Envar("PG_EXPORTER_INCLUDE_DATABASES").String() metricPrefix = kingpin.Flag("metric-prefix", "A metric prefix can be used to have non-default (not \"pg\") prefixes for each of the metrics").Default("pg").Envar("PG_EXPORTER_METRIC_PREFIX").String() + scrapeTimeout = kingpin.Flag("scrape-timeout", "Maximum duration of a scrape").Default("60s").Envar("PG_EXPORTER_SCRAPE_TIMEOUT").Duration() logger = log.NewNopLogger() ) @@ -105,7 +106,7 @@ func main() { IncludeDatabases(*includeDatabases), } - exporter := NewExporter(dsn, opts...) + exporter := NewExporter(dsn, scrapeTimeout, opts...) defer func() { exporter.servers.Close() }() diff --git a/cmd/postgres_exporter/namespace.go b/cmd/postgres_exporter/namespace.go index 1b9e970ef..1c62dd611 100644 --- a/cmd/postgres_exporter/namespace.go +++ b/cmd/postgres_exporter/namespace.go @@ -14,6 +14,7 @@ package main import ( + "context" "database/sql" "errors" "fmt" @@ -27,7 +28,7 @@ import ( // Query within a namespace mapping and emit metrics. Returns fatal errors if // the scrape fails, and a slice of errors if they were non-fatal. -func queryNamespaceMapping(server *Server, namespace string, mapping MetricMapNamespace) ([]prometheus.Metric, []error, error) { +func queryNamespaceMapping(ctx context.Context, server *Server, namespace string, mapping MetricMapNamespace) ([]prometheus.Metric, []error, error) { // Check for a query override for this namespace query, found := server.queryOverrides[namespace] @@ -45,9 +46,9 @@ func queryNamespaceMapping(server *Server, namespace string, mapping MetricMapNa if !found { // I've no idea how to avoid this properly at the moment, but this is // an admin tool so you're not injecting SQL right? - rows, err = server.db.Query(fmt.Sprintf("SELECT * FROM %s;", namespace)) // nolint: gas + rows, err = server.db.QueryContext(ctx, fmt.Sprintf("SELECT * FROM %s;", namespace)) // nolint: gas } else { - rows, err = server.db.Query(query) + rows, err = server.db.QueryContext(ctx, query) } if err != nil { return []prometheus.Metric{}, []error{}, fmt.Errorf("Error running query on database %q: %s %v", server, namespace, err) @@ -183,7 +184,7 @@ func queryNamespaceMapping(server *Server, namespace string, mapping MetricMapNa // Iterate through all the namespace mappings in the exporter and run their // queries. -func queryNamespaceMappings(ch chan<- prometheus.Metric, server *Server) map[string]error { +func queryNamespaceMappings(ctx context.Context, ch chan<- prometheus.Metric, server *Server) map[string]error { // Return a map of namespace -> errors namespaceErrors := make(map[string]error) @@ -225,7 +226,7 @@ func queryNamespaceMappings(ch chan<- prometheus.Metric, server *Server) map[str var nonFatalErrors []error var err error if scrapeMetric { - metrics, nonFatalErrors, err = queryNamespaceMapping(server, namespace, mapping) + metrics, nonFatalErrors, err = queryNamespaceMapping(ctx, server, namespace, mapping) } else { metrics = cachedMetric.metrics } diff --git a/cmd/postgres_exporter/pg_setting.go b/cmd/postgres_exporter/pg_setting.go index 4b0e2124f..724b1011f 100644 --- a/cmd/postgres_exporter/pg_setting.go +++ b/cmd/postgres_exporter/pg_setting.go @@ -14,6 +14,7 @@ package main import ( + "context" "fmt" "math" "strconv" @@ -24,7 +25,7 @@ import ( ) // Query the pg_settings view containing runtime variables -func querySettings(ch chan<- prometheus.Metric, server *Server) error { +func querySettings(ctx context.Context, ch chan<- prometheus.Metric, server *Server) error { level.Debug(logger).Log("msg", "Querying pg_setting view", "server", server) // pg_settings docs: https://www.postgresql.org/docs/current/static/view-pg-settings.html @@ -33,7 +34,7 @@ func querySettings(ch chan<- prometheus.Metric, server *Server) error { // types in normaliseUnit() below query := "SELECT name, setting, COALESCE(unit, ''), short_desc, vartype FROM pg_settings WHERE vartype IN ('bool', 'integer', 'real');" - rows, err := server.db.Query(query) + rows, err := server.db.QueryContext(ctx, query) if err != nil { return fmt.Errorf("Error running query on database %q: %s %v", server, namespace, err) } diff --git a/cmd/postgres_exporter/postgres_exporter.go b/cmd/postgres_exporter/postgres_exporter.go index 158048112..6381638fd 100644 --- a/cmd/postgres_exporter/postgres_exporter.go +++ b/cmd/postgres_exporter/postgres_exporter.go @@ -14,6 +14,7 @@ package main import ( + "context" "crypto/sha256" "database/sql" "errors" @@ -468,6 +469,7 @@ type Exporter struct { psqlUp prometheus.Gauge userQueriesError *prometheus.GaugeVec totalScrapes prometheus.Counter + scrapeTimeout *time.Duration // servers are used to allow re-using the DB connection between scrapes. // servers contains metrics map and query overrides. @@ -555,7 +557,7 @@ func parseConstLabels(s string) prometheus.Labels { } // NewExporter returns a new PostgreSQL exporter for the provided DSN. -func NewExporter(dsn []string, opts ...ExporterOpt) *Exporter { +func NewExporter(dsn []string, scrapeTimeout *time.Duration, opts ...ExporterOpt) *Exporter { e := &Exporter{ dsn: dsn, builtinMetricMaps: builtinMetricMaps, @@ -567,6 +569,7 @@ func NewExporter(dsn []string, opts ...ExporterOpt) *Exporter { e.setupInternalMetrics() e.servers = NewServers(ServerWithLabels(e.constantLabels)) + e.scrapeTimeout = scrapeTimeout return e } @@ -614,7 +617,16 @@ func (e *Exporter) Describe(ch chan<- *prometheus.Desc) { // Collect implements prometheus.Collector. func (e *Exporter) Collect(ch chan<- prometheus.Metric) { - e.scrape(ch) + var ctx context.Context + var cancel context.CancelFunc + if e.scrapeTimeout != nil { + ctx, cancel = context.WithTimeout(context.Background(), *e.scrapeTimeout) + } else { + ctx, cancel = context.WithCancel(context.Background()) + } + defer cancel() + + e.scrape(ctx, ch) ch <- e.duration ch <- e.totalScrapes @@ -630,9 +642,9 @@ func newDesc(subsystem, name, help string, labels prometheus.Labels) *prometheus ) } -func checkPostgresVersion(db *sql.DB, server string) (semver.Version, string, error) { +func checkPostgresVersion(ctx context.Context, db *sql.DB, server string) (semver.Version, string, error) { level.Debug(logger).Log("msg", "Querying PostgreSQL version", "server", server) - versionRow := db.QueryRow("SELECT version();") + versionRow := db.QueryRowContext(ctx, "SELECT version();") var versionString string err := versionRow.Scan(&versionString) if err != nil { @@ -647,8 +659,8 @@ func checkPostgresVersion(db *sql.DB, server string) (semver.Version, string, er } // Check and update the exporters query maps if the version has changed. -func (e *Exporter) checkMapVersions(ch chan<- prometheus.Metric, server *Server) error { - semanticVersion, versionString, err := checkPostgresVersion(server.db, server.String()) +func (e *Exporter) checkMapVersions(ctx context.Context, ch chan<- prometheus.Metric, server *Server) error { + semanticVersion, versionString, err := checkPostgresVersion(ctx, server.db, server.String()) if err != nil { return fmt.Errorf("Error fetching version string on %q: %v", server, err) } @@ -709,7 +721,7 @@ func (e *Exporter) checkMapVersions(ch chan<- prometheus.Metric, server *Server) return nil } -func (e *Exporter) scrape(ch chan<- prometheus.Metric) { +func (e *Exporter) scrape(ctx context.Context, ch chan<- prometheus.Metric) { defer func(begun time.Time) { e.duration.Set(time.Since(begun).Seconds()) }(time.Now()) @@ -718,14 +730,14 @@ func (e *Exporter) scrape(ch chan<- prometheus.Metric) { dsns := e.dsn if e.autoDiscoverDatabases { - dsns = e.discoverDatabaseDSNs() + dsns = e.discoverDatabaseDSNs(ctx) } var errorsCount int var connectionErrorsCount int for _, dsn := range dsns { - if err := e.scrapeDSN(ch, dsn); err != nil { + if err := e.scrapeDSN(ctx, ch, dsn); err != nil { errorsCount++ level.Error(logger).Log("err", err) diff --git a/cmd/postgres_exporter/postgres_exporter_integration_test.go b/cmd/postgres_exporter/postgres_exporter_integration_test.go index 5d479d7a5..c55433287 100644 --- a/cmd/postgres_exporter/postgres_exporter_integration_test.go +++ b/cmd/postgres_exporter/postgres_exporter_integration_test.go @@ -42,7 +42,7 @@ func (s *IntegrationSuite) SetUpSuite(c *C) { dsn := os.Getenv("DATA_SOURCE_NAME") c.Assert(dsn, Not(Equals), "") - exporter := NewExporter(strings.Split(dsn, ",")) + exporter := NewExporter(strings.Split(dsn, ","), 10*time.Duration) c.Assert(exporter, NotNil) // Assign the exporter to the suite s.e = exporter @@ -66,7 +66,7 @@ func (s *IntegrationSuite) TestAllNamespacesReturnResults(c *C) { c.Assert(err, IsNil) // Do a version update - err = s.e.checkMapVersions(ch, server) + err = s.e.checkMapVersions(context.Background(), ch, server) c.Assert(err, IsNil) err = querySettings(ch, server) @@ -99,12 +99,12 @@ func (s *IntegrationSuite) TestInvalidDsnDoesntCrash(c *C) { }() // Send a bad DSN - exporter := NewExporter([]string{"invalid dsn"}) + exporter := NewExporter([]string{"invalid dsn"}, 10*time.Duration) c.Assert(exporter, NotNil) exporter.scrape(ch) // Send a DSN to a non-listening port. - exporter = NewExporter([]string{"postgresql://nothing:nothing@127.0.0.1:1/nothing"}) + exporter = NewExporter([]string{"postgresql://nothing:nothing@127.0.0.1:1/nothing"}, 10*time.Duration) c.Assert(exporter, NotNil) exporter.scrape(ch) } @@ -122,7 +122,7 @@ func (s *IntegrationSuite) TestUnknownMetricParsingDoesntCrash(c *C) { dsn := os.Getenv("DATA_SOURCE_NAME") c.Assert(dsn, Not(Equals), "") - exporter := NewExporter(strings.Split(dsn, ",")) + exporter := NewExporter(strings.Split(dsn, ","), 10*time.Duration) c.Assert(exporter, NotNil) // Convert the default maps into a list of empty maps. @@ -154,7 +154,7 @@ func (s *IntegrationSuite) TestExtendQueriesDoesntCrash(c *C) { c.Assert(dsn, Not(Equals), "") exporter := NewExporter( - strings.Split(dsn, ","), + strings.Split(dsn, ","), 10*time.Duration, WithUserQueriesPath("../user_queries_test.yaml"), ) c.Assert(exporter, NotNil) @@ -168,6 +168,7 @@ func (s *IntegrationSuite) TestAutoDiscoverDatabases(c *C) { exporter := NewExporter( strings.Split(dsn, ","), + 10*time.Duration, ) c.Assert(exporter, NotNil) diff --git a/cmd/postgres_exporter/queries.go b/cmd/postgres_exporter/queries.go index 903e1a277..d014d3b4e 100644 --- a/cmd/postgres_exporter/queries.go +++ b/cmd/postgres_exporter/queries.go @@ -14,6 +14,7 @@ package main import ( + "context" "errors" "fmt" @@ -282,8 +283,8 @@ func addQueries(content []byte, pgVersion semver.Version, server *Server) error return nil } -func queryDatabases(server *Server) ([]string, error) { - rows, err := server.db.Query("SELECT datname FROM pg_database WHERE datallowconn = true AND datistemplate = false AND datname != current_database()") +func queryDatabases(ctx context.Context, server *Server) ([]string, error) { + rows, err := server.db.QueryContext(ctx, "SELECT datname FROM pg_database WHERE datallowconn = true AND datistemplate = false AND datname != current_database()") if err != nil { return nil, fmt.Errorf("Error retrieving databases: %v", err) } diff --git a/cmd/postgres_exporter/server.go b/cmd/postgres_exporter/server.go index cf2469cc9..ce808f778 100644 --- a/cmd/postgres_exporter/server.go +++ b/cmd/postgres_exporter/server.go @@ -14,6 +14,7 @@ package main import ( + "context" "database/sql" "fmt" "sync" @@ -95,8 +96,8 @@ func (s *Server) Close() error { } // Ping checks connection availability and possibly invalidates the connection if it fails. -func (s *Server) Ping() error { - if err := s.db.Ping(); err != nil { +func (s *Server) Ping(ctx context.Context) error { + if err := s.db.PingContext(ctx); err != nil { if cerr := s.Close(); cerr != nil { level.Error(logger).Log("msg", "Error while closing non-pinging DB connection", "server", s, "err", cerr) } @@ -111,19 +112,19 @@ func (s *Server) String() string { } // Scrape loads metrics. -func (s *Server) Scrape(ch chan<- prometheus.Metric, disableSettingsMetrics bool) error { +func (s *Server) Scrape(ctx context.Context, ch chan<- prometheus.Metric, disableSettingsMetrics bool) error { s.mappingMtx.RLock() defer s.mappingMtx.RUnlock() var err error if !disableSettingsMetrics && s.master { - if err = querySettings(ch, s); err != nil { + if err = querySettings(ctx, ch, s); err != nil { err = fmt.Errorf("error retrieving settings: %s", err) } } - errMap := queryNamespaceMappings(ch, s) + errMap := queryNamespaceMappings(ctx, ch, s) if len(errMap) > 0 { err = fmt.Errorf("queryNamespaceMappings returned %d errors", len(errMap)) } @@ -147,7 +148,7 @@ func NewServers(opts ...ServerOpt) *Servers { } // GetServer returns established connection from a collection. -func (s *Servers) GetServer(dsn string) (*Server, error) { +func (s *Servers) GetServer(ctx context.Context, dsn string) (*Server, error) { s.m.Lock() defer s.m.Unlock() var err error @@ -168,7 +169,7 @@ func (s *Servers) GetServer(dsn string) (*Server, error) { } s.servers[dsn] = server } - if err = server.Ping(); err != nil { + if err = server.Ping(ctx); err != nil { delete(s.servers, dsn) time.Sleep(time.Duration(errCount) * time.Second) continue