Skip to content

Commit

Permalink
Merge branch 'feat-review-comments' into feat_performance_monitoring_…
Browse files Browse the repository at this point in the history
…v1_test
  • Loading branch information
tharun0064 committed Feb 5, 2025
2 parents 5aa675e + 0e54518 commit a9272c0
Show file tree
Hide file tree
Showing 10 changed files with 62 additions and 71 deletions.
4 changes: 2 additions & 2 deletions src/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ func main() {
log.Error(err.Error())
}

if args.EnableQueryMonitoring && args.HasMetrics() {
queryperformancemonitoring.QueryPerformanceMain(args, pgIntegration, collectionList,app)
if args.EnableQueryMonitoring {
queryperformancemonitoring.QueryPerformanceMain(args, pgIntegration, collectionList, app)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type CommonParameters struct {
func SetCommonParameters(args args.ArgumentList, version uint64, databases string) *CommonParameters {
return &CommonParameters{
Version: version,
Databases: databases,
Databases: databases, // comma separated database names
QueryMonitoringCountThreshold: validateAndGetQueryMonitoringCountThreshold(args),
QueryMonitoringResponseTimeThreshold: validateAndGetQueryMonitoringResponseTimeThreshold(args),
Host: args.Hostname,
Expand Down
4 changes: 2 additions & 2 deletions src/query-performance-monitoring/common-utils/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ const PublishThreshold = 600
const RandomIntRange = 1000000
const TimeFormat = "20060102150405"

// The maximum number of individual queries that can be fetched in a single metrics
const MaxIndividualQueryCountThreshold = 300
// The maximum number of individual queries that can be fetched in a single metrics, the value was chosen as the queries samples were with same query statements but with different parameters so 10 samples would be enough to check the execution plan
const MaxIndividualQueryCountThreshold = 10

var ErrUnsupportedVersion = errors.New("unsupported PostgreSQL version")
var ErrUnExpectedError = errors.New("unexpected error")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (
"github.com/newrelic/nri-postgresql/src/query-performance-monitoring/datamodels"
)

func PopulateBlockingMetrics(conn *performancedbconnection.PGSQLConnection, pgIntegration *integration.Integration, cp *commonparameters.CommonParameters, app *newrelic.Application) {
isEligible, enableCheckError := validations.CheckBlockingSessionMetricsFetchEligibility(conn, cp.Version, app)
func PopulateBlockingMetrics(conn *performancedbconnection.PGSQLConnection, pgIntegration *integration.Integration, cp *commonparameters.CommonParameters, enabledExtensions map[string]bool, app *newrelic.Application) {
isEligible, enableCheckError := validations.CheckBlockingSessionMetricsFetchEligibility(enabledExtensions, cp.Version, app)
if enableCheckError != nil {
log.Error("Error executing query: %v in PopulateBlockingMetrics", enableCheckError)
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (
type queryInfoMap map[string]string
type databaseQueryInfoMap map[string]queryInfoMap

func PopulateIndividualQueryMetrics(conn *performancedbconnection.PGSQLConnection, slowRunningQueries []datamodels.SlowRunningQueryMetrics, pgIntegration *integration.Integration, cp *commonparameters.CommonParameters, app *newrelic.Application) []datamodels.IndividualQueryMetrics {
isEligible, err := validations.CheckIndividualQueryMetricsFetchEligibility(conn, app)
func PopulateIndividualQueryMetrics(conn *performancedbconnection.PGSQLConnection, slowRunningQueries []datamodels.SlowRunningQueryMetrics, pgIntegration *integration.Integration, cp *commonparameters.CommonParameters, enabledExtensions map[string]bool, app *newrelic.Application) []datamodels.IndividualQueryMetrics {
isEligible, err := validations.CheckIndividualQueryMetricsFetchEligibility(enabledExtensions,app)
if err != nil {
log.Error("Error executing query: %v", err)
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ func GetSlowRunningMetrics(conn *performancedbconnection.PGSQLConnection, cp *co
return slowQueryMetricsList, slowQueryMetricsListInterface, nil
}

func PopulateSlowRunningMetrics(conn *performancedbconnection.PGSQLConnection, pgIntegration *integration.Integration, cp *commonparameters.CommonParameters, app *newrelic.Application) []datamodels.SlowRunningQueryMetrics {
isEligible, err := validations.CheckSlowQueryMetricsFetchEligibility(conn, app)
func PopulateSlowRunningMetrics(conn *performancedbconnection.PGSQLConnection, pgIntegration *integration.Integration, cp *commonparameters.CommonParameters, enabledExtensions map[string]bool, app *newrelic.Application) []datamodels.SlowRunningQueryMetrics {
isEligible, err := validations.CheckSlowQueryMetricsFetchEligibility(enabledExtensions, app)
if err != nil {
log.Error("Error executing query: %v", err)
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ import (
"github.com/newrelic/nri-postgresql/src/query-performance-monitoring/validations"
)

func PopulateWaitEventMetrics(conn *performancedbconnection.PGSQLConnection, pgIntegration *integration.Integration, cp *commonparameters.CommonParameters, app *newrelic.Application) error {
func PopulateWaitEventMetrics(conn *performancedbconnection.PGSQLConnection, pgIntegration *integration.Integration, cp *commonparameters.CommonParameters, enabledExtensions map[string]bool, app *newrelic.Application) error {
var isEligible bool
var eligibleCheckErr error
isEligible, eligibleCheckErr = validations.CheckWaitEventMetricsFetchEligibility(conn, app)
isEligible, eligibleCheckErr = validations.CheckWaitEventMetricsFetchEligibility(enabledExtensions, app)
if eligibleCheckErr != nil {
log.Error("Error executing query: %v", eligibleCheckErr)
return commonutils.ErrUnExpectedError
Expand Down
24 changes: 18 additions & 6 deletions src/query-performance-monitoring/query_performance_main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package queryperformancemonitoring
import (
"github.com/newrelic/go-agent/v3/newrelic"
common_package "github.com/newrelic/nri-postgresql/common-package"
"github.com/newrelic/nri-postgresql/src/query-performance-monitoring/validations"
"time"

common_parameters "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/common-parameters"
Expand Down Expand Up @@ -36,41 +37,52 @@ func QueryPerformanceMain(args args.ArgumentList, pgIntegration *integration.Int
log.Error("Error fetching version: ", versionErr)
return
}
gv := common_parameters.SetCommonParameters(args, version.Major, commonutils.GetDatabaseListInString(databaseMap))
populateQueryPerformanceMetrics(newConnection, pgIntegration, gv, connectionInfo, app)
versionInt := version.Major
if !validations.CheckPostgresVersionSupportForQueryMonitoring(versionInt) {
log.Debug("Postgres version is not supported for query monitoring")
return
}
cp := common_parameters.SetCommonParameters(args, versionInt, commonutils.GetDatabaseListInString(databaseMap))

populateQueryPerformanceMetrics(newConnection, pgIntegration, cp, connectionInfo, app)
}

func populateQueryPerformanceMetrics(newConnection *performancedbconnection.PGSQLConnection, pgIntegration *integration.Integration, cp *common_parameters.CommonParameters, connectionInfo performancedbconnection.Info, app *newrelic.Application) {
enabledExtensions, err := validations.FetchAllExtensions(newConnection, app)
if err != nil {
log.Error("Error fetching extensions: ", err)
return
}
start := time.Now()
txn := app.StartTransaction("slow_queries_metrics_go")
defer txn.End()
common_package.Txn = txn
log.Debug("Starting PopulateSlowRunningMetrics at ", start)
slowRunningQueries := performancemetrics.PopulateSlowRunningMetrics(newConnection, pgIntegration, cp, app)
slowRunningQueries := performancemetrics.PopulateSlowRunningMetrics(newConnection, pgIntegration, cp, enabledExtensions, app)
log.Debug("PopulateSlowRunningMetrics completed in ", time.Since(start))

waitTxn := app.StartTransaction("wait_queries_metrics_go")
defer waitTxn.End()
common_package.Txn = waitTxn
start = time.Now()
log.Debug("Starting PopulateWaitEventMetrics at ", start)
_ = performancemetrics.PopulateWaitEventMetrics(newConnection, pgIntegration, cp, app)
_ = performancemetrics.PopulateWaitEventMetrics(newConnection, pgIntegration, cp, enabledExtensions, app)
log.Debug("PopulateWaitEventMetrics completed in ", time.Since(start))

blockingEventsTxn := app.StartTransaction("blocking_queries_go")
defer blockingEventsTxn.End()
common_package.Txn = blockingEventsTxn
start = time.Now()
log.Debug("Starting PopulateBlockingMetrics at ", start)
performancemetrics.PopulateBlockingMetrics(newConnection, pgIntegration, cp, app)
performancemetrics.PopulateBlockingMetrics(newConnection, pgIntegration, cp, enabledExtensions, app)
log.Debug("PopulateBlockingMetrics completed in ", time.Since(start))

individualTxn := app.StartTransaction("individual_txns_go")
defer individualTxn.End()
common_package.Txn = individualTxn
start = time.Now()
log.Debug("Starting PopulateIndividualQueryMetrics at ", start)
individualQueries := performancemetrics.PopulateIndividualQueryMetrics(newConnection, slowRunningQueries, pgIntegration, cp, app)
individualQueries := performancemetrics.PopulateIndividualQueryMetrics(newConnection, slowRunningQueries, pgIntegration, cp, enabledExtensions, app)
log.Debug("PopulateIndividualQueryMetrics completed in ", time.Since(start))

execPlanTxn := app.StartTransaction("execution_plan_go")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,67 +7,45 @@ import (
commonutils "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/common-utils"
)

var extensions map[string]bool

func fetchAllExtensions(conn *performancedbconnection.PGSQLConnection, app *newrelic.Application) error {
func FetchAllExtensions(conn *performancedbconnection.PGSQLConnection, app *newrelic.Application) (map[string]bool, error) {
rows, err := conn.Queryx("SELECT extname FROM pg_extension", app)
if err != nil {
log.Error("Error executing query: ", err.Error())
return err
return nil, err
}
defer rows.Close()
extensions = make(map[string]bool)
var enabledExtensions = make(map[string]bool)
for rows.Next() {
var extname string
if err := rows.Scan(&extname); err != nil {
log.Error("Error scanning rows: ", err.Error())
return err
return nil, err
}
extensions[extname] = true
enabledExtensions[extname] = true
}
return nil
}

func isExtensionEnabled(extensionName string) bool {
return extensions[extensionName]
return enabledExtensions, nil
}

func CheckSlowQueryMetricsFetchEligibility(conn *performancedbconnection.PGSQLConnection, app *newrelic.Application) (bool, error) {
loadExtensionsMap(conn, app)
return isExtensionEnabled("pg_stat_statements"), nil
func CheckSlowQueryMetricsFetchEligibility(enabledExtensions map[string]bool, app *newrelic.Application) (bool, error) {
return enabledExtensions["pg_stat_statements"], nil
}

func CheckWaitEventMetricsFetchEligibility(conn *performancedbconnection.PGSQLConnection, app *newrelic.Application) (bool, error) {
loadExtensionsMap(conn, app)
return isExtensionEnabled("pg_wait_sampling") && isExtensionEnabled("pg_stat_statements"), nil
func CheckWaitEventMetricsFetchEligibility(enabledExtensions map[string]bool, app *newrelic.Application) (bool, error) {
return enabledExtensions["pg_wait_sampling"] && enabledExtensions["pg_stat_statements"], nil
}

func CheckBlockingSessionMetricsFetchEligibility(conn *performancedbconnection.PGSQLConnection, version uint64, app *newrelic.Application) (bool, error) {
func CheckBlockingSessionMetricsFetchEligibility(enabledExtensions map[string]bool, version uint64, app *newrelic.Application) (bool, error) {
// Version 12 and 13 do not require the pg_stat_statements extension
if version == commonutils.PostgresVersion12 || version == commonutils.PostgresVersion13 {
return true, nil
}
loadExtensionsMap(conn, app)
return isExtensionEnabled("pg_stat_statements"), nil
return enabledExtensions["pg_stat_statements"], nil
}

func CheckIndividualQueryMetricsFetchEligibility(conn *performancedbconnection.PGSQLConnection, app *newrelic.Application) (bool, error) {
loadExtensionsMap(conn, app)
return isExtensionEnabled("pg_stat_monitor"), nil
func CheckIndividualQueryMetricsFetchEligibility(enabledExtensions map[string]bool, app *newrelic.Application) (bool, error) {
return enabledExtensions["pg_stat_monitor"], nil
}

func CheckPostgresVersionSupportForQueryMonitoring(version uint64) bool {
return version >= commonutils.PostgresVersion12
}

func ClearExtensionsLoadCache() {
extensions = nil
}

func loadExtensionsMap(conn *performancedbconnection.PGSQLConnection, app *newrelic.Application) {
if extensions == nil {
if err := fetchAllExtensions(conn, app); err != nil {
log.Error("Error fetching all extensions: %v", err)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,52 +14,53 @@ import (
func TestCheckBlockingSessionMetricsFetchEligibilityExtensionNotRequired(t *testing.T) {
conn, mock := connection.CreateMockSQL(t)
version := uint64(12)
isExtensionEnabledTest, _ := validations.CheckBlockingSessionMetricsFetchEligibility(conn, version,nil)
enabledExtensions, _ := validations.FetchAllExtensions(conn, nil)
isExtensionEnabledTest, _ := validations.CheckBlockingSessionMetricsFetchEligibility(enabledExtensions, version, nil)
assert.Equal(t, isExtensionEnabledTest, true)
assert.NoError(t, mock.ExpectationsWereMet())
validations.ClearExtensionsLoadCache()
}

func TestCheckBlockingSessionMetricsFetchEligibilitySupportedVersionSuccess(t *testing.T) {
conn, mock := connection.CreateMockSQL(t)
version := uint64(14)
validationQueryStatStatements := "SELECT extname FROM pg_extension"
mock.ExpectQuery(regexp.QuoteMeta(validationQueryStatStatements)).WillReturnRows(sqlmock.NewRows([]string{"extname"}).AddRow("pg_stat_statements"))
isExtensionEnabledTest, _ := validations.CheckBlockingSessionMetricsFetchEligibility(conn, version, nil)
enabledExtensions, _ := validations.FetchAllExtensions(conn, nil)
isExtensionEnabledTest, _ := validations.CheckBlockingSessionMetricsFetchEligibility(enabledExtensions, version, nil)
assert.Equal(t, isExtensionEnabledTest, true)
assert.NoError(t, mock.ExpectationsWereMet())
validations.ClearExtensionsLoadCache()

}

func TestCheckBlockingSessionMetricsFetchEligibilitySupportedVersionFail(t *testing.T) {
conn, mock := connection.CreateMockSQL(t)
version := uint64(14)
validationQueryStatStatements := "SELECT extname FROM pg_extension"
mock.ExpectQuery(regexp.QuoteMeta(validationQueryStatStatements)).WillReturnRows(sqlmock.NewRows([]string{"extname"}).AddRow("pg_stat_statements"))
isExtensionEnabledTest, _ := validations.CheckBlockingSessionMetricsFetchEligibility(conn, version, nil)
enabledExtensions, _ := validations.FetchAllExtensions(conn, nil)
isExtensionEnabledTest, _ := validations.CheckBlockingSessionMetricsFetchEligibility(enabledExtensions, version, nil)
assert.Equal(t, isExtensionEnabledTest, true)
assert.NoError(t, mock.ExpectationsWereMet())
validations.ClearExtensionsLoadCache()
}

func TestIndividualQueryMetricsFetchEligibilitySupportedVersionSuccess(t *testing.T) {
conn, mock := connection.CreateMockSQL(t)
validationQueryStatStatements := "SELECT extname FROM pg_extension"
mock.ExpectQuery(regexp.QuoteMeta(validationQueryStatStatements)).WillReturnRows(sqlmock.NewRows([]string{"extname"}).AddRow("pg_stat_monitor"))
isExtensionEnabledTest, _ := validations.CheckIndividualQueryMetricsFetchEligibility(conn, nil)
enabledExtensions, _ := validations.FetchAllExtensions(conn, nil)
isExtensionEnabledTest, _ := validations.CheckIndividualQueryMetricsFetchEligibility(enabledExtensions, nil)
assert.Equal(t, isExtensionEnabledTest, true)
assert.NoError(t, mock.ExpectationsWereMet())
validations.ClearExtensionsLoadCache()
}

func TestIndividualQueryMetricsFetchEligibilitySupportedVersionFail(t *testing.T) {
conn, mock := connection.CreateMockSQL(t)
validationQueryStatStatements := "SELECT extname FROM pg_extension"
mock.ExpectQuery(regexp.QuoteMeta(validationQueryStatStatements)).WillReturnRows(sqlmock.NewRows([]string{"extname"}))
isExtensionEnabledTest, _ := validations.CheckIndividualQueryMetricsFetchEligibility(conn, nil)
enabledExtensions, _ := validations.FetchAllExtensions(conn, nil)
isExtensionEnabledTest, _ := validations.CheckIndividualQueryMetricsFetchEligibility(enabledExtensions, nil)
assert.Equal(t, isExtensionEnabledTest, false)
assert.NoError(t, mock.ExpectationsWereMet())
validations.ClearExtensionsLoadCache()
}

func TestCheckWaitEventMetricsFetchEligibility(t *testing.T) {
Expand All @@ -77,29 +78,29 @@ func TestCheckWaitEventMetricsFetchEligibility(t *testing.T) {
conn, mock := connection.CreateMockSQL(t)
for _, tc := range testCases {
mock.ExpectQuery(regexp.QuoteMeta(validationQuery)).WillReturnRows(sqlmock.NewRows([]string{"extname"}).AddRow(tc.waitExt).AddRow(tc.statExt))
isExtensionEnabledTest, _ := validations.CheckWaitEventMetricsFetchEligibility(conn, nil)
enabledExtensions, _ := validations.FetchAllExtensions(conn, nil)
isExtensionEnabledTest, _ := validations.CheckWaitEventMetricsFetchEligibility(enabledExtensions, nil)
assert.Equal(t, isExtensionEnabledTest, tc.expected)
assert.NoError(t, mock.ExpectationsWereMet())
validations.ClearExtensionsLoadCache()
}
}

func TestCheckSlowQueryMetricsFetchEligibilitySupportedVersionSuccess(t *testing.T) {
conn, mock := connection.CreateMockSQL(t)
validationQueryStatStatements := "SELECT extname FROM pg_extension"
mock.ExpectQuery(regexp.QuoteMeta(validationQueryStatStatements)).WillReturnRows(sqlmock.NewRows([]string{"extname"}).AddRow("pg_stat_statements"))
isExtensionEnabledTest, _ := validations.CheckSlowQueryMetricsFetchEligibility(conn, nil)
enabledExtensions, _ := validations.FetchAllExtensions(conn, nil)
isExtensionEnabledTest, _ := validations.CheckSlowQueryMetricsFetchEligibility(enabledExtensions, nil)
assert.Equal(t, isExtensionEnabledTest, true)
assert.NoError(t, mock.ExpectationsWereMet())
validations.ClearExtensionsLoadCache()
}

func TestCheckSlowQueryMetricsFetchEligibilitySupportedVersionFail(t *testing.T) {
conn, mock := connection.CreateMockSQL(t)
validationQueryStatStatements := "SELECT extname FROM pg_extension"
mock.ExpectQuery(regexp.QuoteMeta(validationQueryStatStatements)).WillReturnRows(sqlmock.NewRows([]string{"extname"}))
isExtensionEnabledTest, _ := validations.CheckSlowQueryMetricsFetchEligibility(conn, nil)
enabledExtensions, _ := validations.FetchAllExtensions(conn, nil)
isExtensionEnabledTest, _ := validations.CheckSlowQueryMetricsFetchEligibility(enabledExtensions, nil)
assert.Equal(t, isExtensionEnabledTest, false)
assert.NoError(t, mock.ExpectationsWereMet())
validations.ClearExtensionsLoadCache()
}

0 comments on commit a9272c0

Please sign in to comment.