From 6c79585e659508375e35a5e53e0aa2c1f7ae216a Mon Sep 17 00:00:00 2001 From: Tim Chan Date: Mon, 27 Jan 2025 13:24:01 -0800 Subject: [PATCH] Added mongodb replica metrics and routing logic for multiple mongodb instances --- .chloggen/mongodbReplicaMetrics.yaml | 27 ++ receiver/mongodbreceiver/client.go | 6 +- receiver/mongodbreceiver/client_test.go | 17 + receiver/mongodbreceiver/config.go | 23 +- receiver/mongodbreceiver/config_test.go | 4 +- receiver/mongodbreceiver/documentation.md | 48 +++ receiver/mongodbreceiver/go.mod | 12 +- receiver/mongodbreceiver/go.sum | 12 + .../internal/metadata/generated_config.go | 24 ++ .../metadata/generated_config_test.go | 12 + .../internal/metadata/generated_metrics.go | 342 ++++++++++++++++++ .../metadata/generated_metrics_test.go | 90 +++++ .../internal/metadata/testdata/config.yaml | 24 ++ receiver/mongodbreceiver/metadata.yaml | 50 ++- receiver/mongodbreceiver/metrics.go | 63 +++- receiver/mongodbreceiver/scraper.go | 112 +++++- receiver/mongodbreceiver/scraper_test.go | 7 + 17 files changed, 848 insertions(+), 25 deletions(-) create mode 100644 .chloggen/mongodbReplicaMetrics.yaml diff --git a/.chloggen/mongodbReplicaMetrics.yaml b/.chloggen/mongodbReplicaMetrics.yaml new file mode 100644 index 000000000000..4ae528669af4 --- /dev/null +++ b/.chloggen/mongodbReplicaMetrics.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: mongodbreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Added mongodb replica metrics and routing logic for multiple mongodb instances + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [37517] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/receiver/mongodbreceiver/client.go b/receiver/mongodbreceiver/client.go index 1cf92a5a2c79..ba7b2d2715b7 100644 --- a/receiver/mongodbreceiver/client.go +++ b/receiver/mongodbreceiver/client.go @@ -26,6 +26,7 @@ type client interface { DBStats(ctx context.Context, DBName string) (bson.M, error) TopStats(ctx context.Context) (bson.M, error) IndexStats(ctx context.Context, DBName, collectionName string) ([]bson.M, error) + RunCommand(ctx context.Context, db string, command bson.M) (bson.M, error) } // mongodbClient is a mongodb metric scraper client @@ -37,12 +38,11 @@ type mongodbClient struct { // newClient creates a new client to connect and query mongo for the // mongodbreceiver -func newClient(ctx context.Context, config *Config, logger *zap.Logger) (client, error) { - driver, err := mongo.Connect(ctx, config.ClientOptions()) +var newClient = func(ctx context.Context, config *Config, logger *zap.Logger, secondary bool) (client, error) { + driver, err := mongo.Connect(ctx, config.ClientOptions(secondary)) if err != nil { return nil, err } - return &mongodbClient{ cfg: config, logger: logger, diff --git a/receiver/mongodbreceiver/client_test.go b/receiver/mongodbreceiver/client_test.go index 9856e0a6389c..2e7487391a94 100644 --- a/receiver/mongodbreceiver/client_test.go +++ b/receiver/mongodbreceiver/client_test.go @@ -69,6 +69,23 @@ func (fc *fakeClient) IndexStats(ctx context.Context, dbName, collectionName str return args.Get(0).([]bson.M), args.Error(1) } +func (fc *fakeClient) RunCommand(ctx context.Context, db string, command bson.M) (bson.M, error) { + args := fc.Called(ctx, db, command) + if args.Get(0) == nil { + return nil, args.Error(1) + } + + result, ok := args.Get(0).(bson.M) + if !ok { + err := errors.New("mock returned invalid type") + zap.L().Error("type assertion failed", + zap.String("expected", "bson.M")) + return nil, err + } + + return result, args.Error(1) +} + func TestListDatabaseNames(t *testing.T) { mont := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) diff --git a/receiver/mongodbreceiver/config.go b/receiver/mongodbreceiver/config.go index 4d89797d324e..370a77ba24ec 100644 --- a/receiver/mongodbreceiver/config.go +++ b/receiver/mongodbreceiver/config.go @@ -11,6 +11,7 @@ import ( "time" "go.mongodb.org/mongo-driver/mongo/options" + "go.mongodb.org/mongo-driver/mongo/readpref" "go.opentelemetry.io/collector/config/confignet" "go.opentelemetry.io/collector/config/configopaque" "go.opentelemetry.io/collector/config/configtls" @@ -59,7 +60,27 @@ func (c *Config) Validate() error { return err } -func (c *Config) ClientOptions() *options.ClientOptions { +func (c *Config) ClientOptions(secondary bool) *options.ClientOptions { + if secondary { + // For secondary nodes, create a direct connection + clientOptions := options.Client(). + SetHosts(c.hostlist()). + SetDirect(true). + SetReadPreference(readpref.SecondaryPreferred()) + + if c.Timeout > 0 { + clientOptions.SetConnectTimeout(c.Timeout) + } + + if c.Username != "" && c.Password != "" { + clientOptions.SetAuth(options.Credential{ + Username: c.Username, + Password: string(c.Password), + }) + } + + return clientOptions + } clientOptions := options.Client() connString := "mongodb://" + strings.Join(c.hostlist(), ",") clientOptions.ApplyURI(connString) diff --git a/receiver/mongodbreceiver/config_test.go b/receiver/mongodbreceiver/config_test.go index 21f69c114092..b3d45d553b45 100644 --- a/receiver/mongodbreceiver/config_test.go +++ b/receiver/mongodbreceiver/config_test.go @@ -165,7 +165,7 @@ func TestOptions(t *testing.T) { ReplicaSet: "rs-1", } - clientOptions := cfg.ClientOptions() + clientOptions := cfg.ClientOptions(false) require.Equal(t, clientOptions.Auth.Username, cfg.Username) require.Equal(t, clientOptions.ConnectTimeout.Milliseconds(), @@ -191,7 +191,7 @@ func TestOptionsTLS(t *testing.T) { }, }, } - opts := cfg.ClientOptions() + opts := cfg.ClientOptions(false) require.NotNil(t, opts.TLSConfig) } diff --git a/receiver/mongodbreceiver/documentation.md b/receiver/mongodbreceiver/documentation.md index 1a605c560614..48f819e030bc 100644 --- a/receiver/mongodbreceiver/documentation.md +++ b/receiver/mongodbreceiver/documentation.md @@ -340,6 +340,54 @@ The number of replicated operations executed. | ---- | ----------- | ------ | | operation | The MongoDB operation being counted. | Str: ``insert``, ``query``, ``update``, ``delete``, ``getmore``, ``command`` | +### mongodb.repl_commands_per_sec + +The number of replicated commands executed per second. + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| {command}/s | Gauge | Double | + +### mongodb.repl_deletes_per_sec + +The number of replicated deletes executed per second. + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| {delete}/s | Gauge | Double | + +### mongodb.repl_getmores_per_sec + +The number of replicated getmores executed per second. + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| {getmore}/s | Gauge | Double | + +### mongodb.repl_inserts_per_sec + +The number of replicated insertions executed per second. + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| {insert}/s | Gauge | Double | + +### mongodb.repl_queries_per_sec + +The number of replicated queries executed per second. + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| {query}/s | Gauge | Double | + +### mongodb.repl_updates_per_sec + +The number of replicated updates executed per second. + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| {update}/s | Gauge | Double | + ### mongodb.uptime The amount of time that the server has been running. diff --git a/receiver/mongodbreceiver/go.mod b/receiver/mongodbreceiver/go.mod index 4f67cc1e8a88..c234260a22c6 100644 --- a/receiver/mongodbreceiver/go.mod +++ b/receiver/mongodbreceiver/go.mod @@ -11,8 +11,8 @@ require ( github.com/stretchr/testify v1.10.0 github.com/testcontainers/testcontainers-go v0.35.0 go.mongodb.org/mongo-driver v1.17.2 - go.opentelemetry.io/collector/component v0.117.1-0.20250114172347-71aae791d7f8 - go.opentelemetry.io/collector/component/componenttest v0.117.1-0.20250114172347-71aae791d7f8 + go.opentelemetry.io/collector/component v0.118.0 + go.opentelemetry.io/collector/component/componenttest v0.118.0 go.opentelemetry.io/collector/config/confignet v1.23.1-0.20250114172347-71aae791d7f8 go.opentelemetry.io/collector/config/configopaque v1.23.1-0.20250114172347-71aae791d7f8 go.opentelemetry.io/collector/config/configtls v1.23.1-0.20250114172347-71aae791d7f8 @@ -21,10 +21,10 @@ require ( go.opentelemetry.io/collector/consumer/consumertest v0.117.1-0.20250114172347-71aae791d7f8 go.opentelemetry.io/collector/featuregate v1.23.1-0.20250114172347-71aae791d7f8 go.opentelemetry.io/collector/filter v0.117.1-0.20250114172347-71aae791d7f8 - go.opentelemetry.io/collector/pdata v1.23.1-0.20250114172347-71aae791d7f8 + go.opentelemetry.io/collector/pdata v1.24.0 go.opentelemetry.io/collector/receiver v0.117.1-0.20250114172347-71aae791d7f8 go.opentelemetry.io/collector/receiver/receivertest v0.117.1-0.20250114172347-71aae791d7f8 - go.opentelemetry.io/collector/scraper v0.117.1-0.20250114172347-71aae791d7f8 + go.opentelemetry.io/collector/scraper v0.118.0 go.opentelemetry.io/collector/scraper/scraperhelper v0.117.1-0.20250114172347-71aae791d7f8 go.uber.org/goleak v1.3.0 go.uber.org/multierr v1.11.0 @@ -91,11 +91,11 @@ require ( github.com/xdg-go/stringprep v1.0.4 // indirect github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect - go.opentelemetry.io/collector/config/configtelemetry v0.117.1-0.20250114172347-71aae791d7f8 // indirect + go.opentelemetry.io/collector/config/configtelemetry v0.118.0 // indirect go.opentelemetry.io/collector/consumer/consumererror v0.117.1-0.20250114172347-71aae791d7f8 // indirect go.opentelemetry.io/collector/consumer/xconsumer v0.117.1-0.20250114172347-71aae791d7f8 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.117.1-0.20250114172347-71aae791d7f8 // indirect - go.opentelemetry.io/collector/pipeline v0.117.1-0.20250114172347-71aae791d7f8 // indirect + go.opentelemetry.io/collector/pipeline v0.118.0 // indirect go.opentelemetry.io/collector/receiver/xreceiver v0.117.1-0.20250114172347-71aae791d7f8 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect go.opentelemetry.io/otel v1.32.0 // indirect diff --git a/receiver/mongodbreceiver/go.sum b/receiver/mongodbreceiver/go.sum index 893268d76398..d270a414b99d 100644 --- a/receiver/mongodbreceiver/go.sum +++ b/receiver/mongodbreceiver/go.sum @@ -153,14 +153,20 @@ go.mongodb.org/mongo-driver v1.17.2 h1:gvZyk8352qSfzyZ2UMWcpDpMSGEr1eqE4T793Sqyh go.mongodb.org/mongo-driver v1.17.2/go.mod h1:Hy04i7O2kC4RS06ZrhPRqj/u4DTYkFDAAccj+rVKqgQ= go.opentelemetry.io/collector/component v0.117.1-0.20250114172347-71aae791d7f8 h1:8ez4jqjh9bL3HJ0w5lyLQG8KGaIZeoWBCo2SmWzxH/s= go.opentelemetry.io/collector/component v0.117.1-0.20250114172347-71aae791d7f8/go.mod h1:WEjJIJerT8OMT63dIwO5qvjikUdTn0wfPmLemCMzuOs= +go.opentelemetry.io/collector/component v0.118.0 h1:sSO/ObxJ+yH77Z4DmT1mlSuxhbgUmY1ztt7xCA1F/8w= +go.opentelemetry.io/collector/component v0.118.0/go.mod h1:LUJ3AL2b+tmFr3hZol3hzKzCMvNdqNq0M5CF3SWdv4M= go.opentelemetry.io/collector/component/componenttest v0.117.1-0.20250114172347-71aae791d7f8 h1:XqmRmP9zpwjFhvXgyBmizpG9pXRvbhebXRTsjSmKtjA= go.opentelemetry.io/collector/component/componenttest v0.117.1-0.20250114172347-71aae791d7f8/go.mod h1:RXXMJaRdf7aQNPOEK610lHpKGMCToz/xpa8wOwylO5c= +go.opentelemetry.io/collector/component/componenttest v0.118.0 h1:knEHckoiL2fEWSIc0iehg39zP4IXzi9sHa45O+oxKo8= +go.opentelemetry.io/collector/component/componenttest v0.118.0/go.mod h1:aHc7t7zVwCpbhrWIWY+GMuaMxMCUP8C8P7pJOt8r/vU= go.opentelemetry.io/collector/config/confignet v1.23.1-0.20250114172347-71aae791d7f8 h1:zRwqBWx0xNIRx4TlN9R/YbV8ZtGY4/31AOAT3N+D98U= go.opentelemetry.io/collector/config/confignet v1.23.1-0.20250114172347-71aae791d7f8/go.mod h1:ZppUH1hgUJOubawEsxsQ9MzEYFytqo2GnVSS7d4CVxc= go.opentelemetry.io/collector/config/configopaque v1.23.1-0.20250114172347-71aae791d7f8 h1:AsRtsuEHotwTX1plB8cvjXhjfutpQ0kCt+nnA6BZ0wU= go.opentelemetry.io/collector/config/configopaque v1.23.1-0.20250114172347-71aae791d7f8/go.mod h1:sW0t0iI/VfRL9VYX7Ik6XzVgPcR+Y5kejTLsYcMyDWs= go.opentelemetry.io/collector/config/configtelemetry v0.117.1-0.20250114172347-71aae791d7f8 h1:ZGItI3UNIAJNuMON4q750zPhjATyucKzMxVj+g2B8+I= go.opentelemetry.io/collector/config/configtelemetry v0.117.1-0.20250114172347-71aae791d7f8/go.mod h1:SlBEwQg0qly75rXZ6W1Ig8jN25KBVBkFIIAUI1GiAAE= +go.opentelemetry.io/collector/config/configtelemetry v0.118.0 h1:UlN46EViG2X42odWtXgWaqY7Y01ZKpsnswSwXTWx5mM= +go.opentelemetry.io/collector/config/configtelemetry v0.118.0/go.mod h1:SlBEwQg0qly75rXZ6W1Ig8jN25KBVBkFIIAUI1GiAAE= go.opentelemetry.io/collector/config/configtls v1.23.1-0.20250114172347-71aae791d7f8 h1:BP8qOBYl22g+gu3B9x4RFIj/NCn3nRsvC/2aRTp3ryo= go.opentelemetry.io/collector/config/configtls v1.23.1-0.20250114172347-71aae791d7f8/go.mod h1:cjMoqKm4MX9sc9qyEW5/kRepiKLuDYqFofGa0f/rqFE= go.opentelemetry.io/collector/confmap v1.23.1-0.20250114172347-71aae791d7f8 h1:GIHOyMs+I2mU2gHGmf5LoCFUYxlsCqOhmr/T2yzDcdk= @@ -179,12 +185,16 @@ go.opentelemetry.io/collector/filter v0.117.1-0.20250114172347-71aae791d7f8 h1:8 go.opentelemetry.io/collector/filter v0.117.1-0.20250114172347-71aae791d7f8/go.mod h1:MUyBRsy3IKVixSWIfA9BXzHhx24LHo4y38uQmFlqddA= go.opentelemetry.io/collector/pdata v1.23.1-0.20250114172347-71aae791d7f8 h1:H788KMJ8eG/44EZh33caRnJFu505JfQTwqHE9Kn+Wd8= go.opentelemetry.io/collector/pdata v1.23.1-0.20250114172347-71aae791d7f8/go.mod h1:cf3/W9E/uIvPS4MR26SnMFJhraUCattzzM6qusuONuc= +go.opentelemetry.io/collector/pdata v1.24.0 h1:D6j92eAzmAbQgivNBUnt8r9juOl8ugb+ihYynoFZIEg= +go.opentelemetry.io/collector/pdata v1.24.0/go.mod h1:cf3/W9E/uIvPS4MR26SnMFJhraUCattzzM6qusuONuc= go.opentelemetry.io/collector/pdata/pprofile v0.117.1-0.20250114172347-71aae791d7f8 h1:QXGUBw/2ebklQBwUffnv5XR9eI44qNfSrY3Ik+mWLiU= go.opentelemetry.io/collector/pdata/pprofile v0.117.1-0.20250114172347-71aae791d7f8/go.mod h1:7aB/796MdPcm5G3vyX7mI/9ttGNvOzyDYSAKtxOe/8E= go.opentelemetry.io/collector/pdata/testdata v0.117.0 h1:ainpacShKHaDkPK6lcvgJ0aPKYUD/E3+I0gYJZleedo= go.opentelemetry.io/collector/pdata/testdata v0.117.0/go.mod h1:LZAymmRKHQEqJqJUSO15rej3+V1rNRyBMF5mWCKCMBY= go.opentelemetry.io/collector/pipeline v0.117.1-0.20250114172347-71aae791d7f8 h1:oyC5rdSmmhoDoVk9L019cnka+Cf3oP0vRnQO3TiVwNQ= go.opentelemetry.io/collector/pipeline v0.117.1-0.20250114172347-71aae791d7f8/go.mod h1:qE3DmoB05AW0C3lmPvdxZqd/H4po84NPzd5MrqgtL74= +go.opentelemetry.io/collector/pipeline v0.118.0 h1:RI1DMe7L0+5hGkx0EDGxG00TaJoh96MEQppgOlGx1Oc= +go.opentelemetry.io/collector/pipeline v0.118.0/go.mod h1:qE3DmoB05AW0C3lmPvdxZqd/H4po84NPzd5MrqgtL74= go.opentelemetry.io/collector/receiver v0.117.1-0.20250114172347-71aae791d7f8 h1:G0cEWKJQW1xQUwmmJarAGzoMo/d73DRPzTmKpHWT6Rs= go.opentelemetry.io/collector/receiver v0.117.1-0.20250114172347-71aae791d7f8/go.mod h1:s6KBdRSFC9UD1Ssmb6Eq87Ifnd8/Z8N1K66htGeXC3s= go.opentelemetry.io/collector/receiver/receivertest v0.117.1-0.20250114172347-71aae791d7f8 h1:HtTZsxZUF2d6lDR/lxomt8dILuaezp/rNf01A3hK+SE= @@ -193,6 +203,8 @@ go.opentelemetry.io/collector/receiver/xreceiver v0.117.1-0.20250114172347-71aae go.opentelemetry.io/collector/receiver/xreceiver v0.117.1-0.20250114172347-71aae791d7f8/go.mod h1:rwYdewYxwosce0t/hZNBkiAgg4M51+Kmo1V3TZQ2Jso= go.opentelemetry.io/collector/scraper v0.117.1-0.20250114172347-71aae791d7f8 h1:5IY1+jbTk83YPj7nzEzb/7A18IPkd1Sut+vZLlpJOGE= go.opentelemetry.io/collector/scraper v0.117.1-0.20250114172347-71aae791d7f8/go.mod h1:EPk7kbf8/222ni0zQJ/a9qBSx2Tjay6yzAg1BNDt8hU= +go.opentelemetry.io/collector/scraper v0.118.0 h1:944QgQVZ7PM0L9WIwgRPY0LbbHX5qsk2x4uxDO1IOAQ= +go.opentelemetry.io/collector/scraper v0.118.0/go.mod h1:wIa4bIqiU9bkeg3v5QQybwz1+K5DjrP1Afc13Kt22Cw= go.opentelemetry.io/collector/scraper/scraperhelper v0.117.1-0.20250114172347-71aae791d7f8 h1:l1Y5Esp7Qf+TYIl/Z3UHvWYfB1bGImpE5D7CNtymcb4= go.opentelemetry.io/collector/scraper/scraperhelper v0.117.1-0.20250114172347-71aae791d7f8/go.mod h1:uF/wyxA3QvJlrE737LSOo8hn+R/A46myrRRK4DbO9D4= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u3so/bN+JPT166wjOI6/vQPF6Xe7nMNIltagk= diff --git a/receiver/mongodbreceiver/internal/metadata/generated_config.go b/receiver/mongodbreceiver/internal/metadata/generated_config.go index ab438c54658f..7531be6f6343 100644 --- a/receiver/mongodbreceiver/internal/metadata/generated_config.go +++ b/receiver/mongodbreceiver/internal/metadata/generated_config.go @@ -55,6 +55,12 @@ type MetricsConfig struct { MongodbOperationLatencyTime MetricConfig `mapstructure:"mongodb.operation.latency.time"` MongodbOperationReplCount MetricConfig `mapstructure:"mongodb.operation.repl.count"` MongodbOperationTime MetricConfig `mapstructure:"mongodb.operation.time"` + MongodbReplCommandsPerSec MetricConfig `mapstructure:"mongodb.repl_commands_per_sec"` + MongodbReplDeletesPerSec MetricConfig `mapstructure:"mongodb.repl_deletes_per_sec"` + MongodbReplGetmoresPerSec MetricConfig `mapstructure:"mongodb.repl_getmores_per_sec"` + MongodbReplInsertsPerSec MetricConfig `mapstructure:"mongodb.repl_inserts_per_sec"` + MongodbReplQueriesPerSec MetricConfig `mapstructure:"mongodb.repl_queries_per_sec"` + MongodbReplUpdatesPerSec MetricConfig `mapstructure:"mongodb.repl_updates_per_sec"` MongodbSessionCount MetricConfig `mapstructure:"mongodb.session.count"` MongodbStorageSize MetricConfig `mapstructure:"mongodb.storage.size"` MongodbUptime MetricConfig `mapstructure:"mongodb.uptime"` @@ -143,6 +149,24 @@ func DefaultMetricsConfig() MetricsConfig { MongodbOperationTime: MetricConfig{ Enabled: true, }, + MongodbReplCommandsPerSec: MetricConfig{ + Enabled: false, + }, + MongodbReplDeletesPerSec: MetricConfig{ + Enabled: false, + }, + MongodbReplGetmoresPerSec: MetricConfig{ + Enabled: false, + }, + MongodbReplInsertsPerSec: MetricConfig{ + Enabled: false, + }, + MongodbReplQueriesPerSec: MetricConfig{ + Enabled: false, + }, + MongodbReplUpdatesPerSec: MetricConfig{ + Enabled: false, + }, MongodbSessionCount: MetricConfig{ Enabled: true, }, diff --git a/receiver/mongodbreceiver/internal/metadata/generated_config_test.go b/receiver/mongodbreceiver/internal/metadata/generated_config_test.go index 8d245c9849ef..a2d9e13f578e 100644 --- a/receiver/mongodbreceiver/internal/metadata/generated_config_test.go +++ b/receiver/mongodbreceiver/internal/metadata/generated_config_test.go @@ -52,6 +52,12 @@ func TestMetricsBuilderConfig(t *testing.T) { MongodbOperationLatencyTime: MetricConfig{Enabled: true}, MongodbOperationReplCount: MetricConfig{Enabled: true}, MongodbOperationTime: MetricConfig{Enabled: true}, + MongodbReplCommandsPerSec: MetricConfig{Enabled: true}, + MongodbReplDeletesPerSec: MetricConfig{Enabled: true}, + MongodbReplGetmoresPerSec: MetricConfig{Enabled: true}, + MongodbReplInsertsPerSec: MetricConfig{Enabled: true}, + MongodbReplQueriesPerSec: MetricConfig{Enabled: true}, + MongodbReplUpdatesPerSec: MetricConfig{Enabled: true}, MongodbSessionCount: MetricConfig{Enabled: true}, MongodbStorageSize: MetricConfig{Enabled: true}, MongodbUptime: MetricConfig{Enabled: true}, @@ -94,6 +100,12 @@ func TestMetricsBuilderConfig(t *testing.T) { MongodbOperationLatencyTime: MetricConfig{Enabled: false}, MongodbOperationReplCount: MetricConfig{Enabled: false}, MongodbOperationTime: MetricConfig{Enabled: false}, + MongodbReplCommandsPerSec: MetricConfig{Enabled: false}, + MongodbReplDeletesPerSec: MetricConfig{Enabled: false}, + MongodbReplGetmoresPerSec: MetricConfig{Enabled: false}, + MongodbReplInsertsPerSec: MetricConfig{Enabled: false}, + MongodbReplQueriesPerSec: MetricConfig{Enabled: false}, + MongodbReplUpdatesPerSec: MetricConfig{Enabled: false}, MongodbSessionCount: MetricConfig{Enabled: false}, MongodbStorageSize: MetricConfig{Enabled: false}, MongodbUptime: MetricConfig{Enabled: false}, diff --git a/receiver/mongodbreceiver/internal/metadata/generated_metrics.go b/receiver/mongodbreceiver/internal/metadata/generated_metrics.go index 87533a58bc73..8df8c57f074d 100644 --- a/receiver/mongodbreceiver/internal/metadata/generated_metrics.go +++ b/receiver/mongodbreceiver/internal/metadata/generated_metrics.go @@ -1653,6 +1653,300 @@ func newMetricMongodbOperationTime(cfg MetricConfig) metricMongodbOperationTime return m } +type metricMongodbReplCommandsPerSec struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills mongodb.repl_commands_per_sec metric with initial data. +func (m *metricMongodbReplCommandsPerSec) init() { + m.data.SetName("mongodb.repl_commands_per_sec") + m.data.SetDescription("The number of replicated commands executed per second.") + m.data.SetUnit("{command}/s") + m.data.SetEmptyGauge() +} + +func (m *metricMongodbReplCommandsPerSec) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val float64) { + if !m.config.Enabled { + return + } + dp := m.data.Gauge().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetDoubleValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricMongodbReplCommandsPerSec) updateCapacity() { + if m.data.Gauge().DataPoints().Len() > m.capacity { + m.capacity = m.data.Gauge().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricMongodbReplCommandsPerSec) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Gauge().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricMongodbReplCommandsPerSec(cfg MetricConfig) metricMongodbReplCommandsPerSec { + m := metricMongodbReplCommandsPerSec{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricMongodbReplDeletesPerSec struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills mongodb.repl_deletes_per_sec metric with initial data. +func (m *metricMongodbReplDeletesPerSec) init() { + m.data.SetName("mongodb.repl_deletes_per_sec") + m.data.SetDescription("The number of replicated deletes executed per second.") + m.data.SetUnit("{delete}/s") + m.data.SetEmptyGauge() +} + +func (m *metricMongodbReplDeletesPerSec) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val float64) { + if !m.config.Enabled { + return + } + dp := m.data.Gauge().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetDoubleValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricMongodbReplDeletesPerSec) updateCapacity() { + if m.data.Gauge().DataPoints().Len() > m.capacity { + m.capacity = m.data.Gauge().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricMongodbReplDeletesPerSec) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Gauge().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricMongodbReplDeletesPerSec(cfg MetricConfig) metricMongodbReplDeletesPerSec { + m := metricMongodbReplDeletesPerSec{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricMongodbReplGetmoresPerSec struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills mongodb.repl_getmores_per_sec metric with initial data. +func (m *metricMongodbReplGetmoresPerSec) init() { + m.data.SetName("mongodb.repl_getmores_per_sec") + m.data.SetDescription("The number of replicated getmores executed per second.") + m.data.SetUnit("{getmore}/s") + m.data.SetEmptyGauge() +} + +func (m *metricMongodbReplGetmoresPerSec) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val float64) { + if !m.config.Enabled { + return + } + dp := m.data.Gauge().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetDoubleValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricMongodbReplGetmoresPerSec) updateCapacity() { + if m.data.Gauge().DataPoints().Len() > m.capacity { + m.capacity = m.data.Gauge().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricMongodbReplGetmoresPerSec) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Gauge().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricMongodbReplGetmoresPerSec(cfg MetricConfig) metricMongodbReplGetmoresPerSec { + m := metricMongodbReplGetmoresPerSec{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricMongodbReplInsertsPerSec struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills mongodb.repl_inserts_per_sec metric with initial data. +func (m *metricMongodbReplInsertsPerSec) init() { + m.data.SetName("mongodb.repl_inserts_per_sec") + m.data.SetDescription("The number of replicated insertions executed per second.") + m.data.SetUnit("{insert}/s") + m.data.SetEmptyGauge() +} + +func (m *metricMongodbReplInsertsPerSec) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val float64) { + if !m.config.Enabled { + return + } + dp := m.data.Gauge().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetDoubleValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricMongodbReplInsertsPerSec) updateCapacity() { + if m.data.Gauge().DataPoints().Len() > m.capacity { + m.capacity = m.data.Gauge().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricMongodbReplInsertsPerSec) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Gauge().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricMongodbReplInsertsPerSec(cfg MetricConfig) metricMongodbReplInsertsPerSec { + m := metricMongodbReplInsertsPerSec{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricMongodbReplQueriesPerSec struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills mongodb.repl_queries_per_sec metric with initial data. +func (m *metricMongodbReplQueriesPerSec) init() { + m.data.SetName("mongodb.repl_queries_per_sec") + m.data.SetDescription("The number of replicated queries executed per second.") + m.data.SetUnit("{query}/s") + m.data.SetEmptyGauge() +} + +func (m *metricMongodbReplQueriesPerSec) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val float64) { + if !m.config.Enabled { + return + } + dp := m.data.Gauge().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetDoubleValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricMongodbReplQueriesPerSec) updateCapacity() { + if m.data.Gauge().DataPoints().Len() > m.capacity { + m.capacity = m.data.Gauge().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricMongodbReplQueriesPerSec) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Gauge().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricMongodbReplQueriesPerSec(cfg MetricConfig) metricMongodbReplQueriesPerSec { + m := metricMongodbReplQueriesPerSec{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricMongodbReplUpdatesPerSec struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills mongodb.repl_updates_per_sec metric with initial data. +func (m *metricMongodbReplUpdatesPerSec) init() { + m.data.SetName("mongodb.repl_updates_per_sec") + m.data.SetDescription("The number of replicated updates executed per second.") + m.data.SetUnit("{update}/s") + m.data.SetEmptyGauge() +} + +func (m *metricMongodbReplUpdatesPerSec) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val float64) { + if !m.config.Enabled { + return + } + dp := m.data.Gauge().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetDoubleValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricMongodbReplUpdatesPerSec) updateCapacity() { + if m.data.Gauge().DataPoints().Len() > m.capacity { + m.capacity = m.data.Gauge().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricMongodbReplUpdatesPerSec) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Gauge().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricMongodbReplUpdatesPerSec(cfg MetricConfig) metricMongodbReplUpdatesPerSec { + m := metricMongodbReplUpdatesPerSec{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + type metricMongodbSessionCount struct { data pmetric.Metric // data buffer for generated metric. config MetricConfig // metric config provided by user. @@ -1843,6 +2137,12 @@ type MetricsBuilder struct { metricMongodbOperationLatencyTime metricMongodbOperationLatencyTime metricMongodbOperationReplCount metricMongodbOperationReplCount metricMongodbOperationTime metricMongodbOperationTime + metricMongodbReplCommandsPerSec metricMongodbReplCommandsPerSec + metricMongodbReplDeletesPerSec metricMongodbReplDeletesPerSec + metricMongodbReplGetmoresPerSec metricMongodbReplGetmoresPerSec + metricMongodbReplInsertsPerSec metricMongodbReplInsertsPerSec + metricMongodbReplQueriesPerSec metricMongodbReplQueriesPerSec + metricMongodbReplUpdatesPerSec metricMongodbReplUpdatesPerSec metricMongodbSessionCount metricMongodbSessionCount metricMongodbStorageSize metricMongodbStorageSize metricMongodbUptime metricMongodbUptime @@ -1899,6 +2199,12 @@ func NewMetricsBuilder(mbc MetricsBuilderConfig, settings receiver.Settings, opt metricMongodbOperationLatencyTime: newMetricMongodbOperationLatencyTime(mbc.Metrics.MongodbOperationLatencyTime), metricMongodbOperationReplCount: newMetricMongodbOperationReplCount(mbc.Metrics.MongodbOperationReplCount), metricMongodbOperationTime: newMetricMongodbOperationTime(mbc.Metrics.MongodbOperationTime), + metricMongodbReplCommandsPerSec: newMetricMongodbReplCommandsPerSec(mbc.Metrics.MongodbReplCommandsPerSec), + metricMongodbReplDeletesPerSec: newMetricMongodbReplDeletesPerSec(mbc.Metrics.MongodbReplDeletesPerSec), + metricMongodbReplGetmoresPerSec: newMetricMongodbReplGetmoresPerSec(mbc.Metrics.MongodbReplGetmoresPerSec), + metricMongodbReplInsertsPerSec: newMetricMongodbReplInsertsPerSec(mbc.Metrics.MongodbReplInsertsPerSec), + metricMongodbReplQueriesPerSec: newMetricMongodbReplQueriesPerSec(mbc.Metrics.MongodbReplQueriesPerSec), + metricMongodbReplUpdatesPerSec: newMetricMongodbReplUpdatesPerSec(mbc.Metrics.MongodbReplUpdatesPerSec), metricMongodbSessionCount: newMetricMongodbSessionCount(mbc.Metrics.MongodbSessionCount), metricMongodbStorageSize: newMetricMongodbStorageSize(mbc.Metrics.MongodbStorageSize), metricMongodbUptime: newMetricMongodbUptime(mbc.Metrics.MongodbUptime), @@ -2019,6 +2325,12 @@ func (mb *MetricsBuilder) EmitForResource(options ...ResourceMetricsOption) { mb.metricMongodbOperationLatencyTime.emit(ils.Metrics()) mb.metricMongodbOperationReplCount.emit(ils.Metrics()) mb.metricMongodbOperationTime.emit(ils.Metrics()) + mb.metricMongodbReplCommandsPerSec.emit(ils.Metrics()) + mb.metricMongodbReplDeletesPerSec.emit(ils.Metrics()) + mb.metricMongodbReplGetmoresPerSec.emit(ils.Metrics()) + mb.metricMongodbReplInsertsPerSec.emit(ils.Metrics()) + mb.metricMongodbReplQueriesPerSec.emit(ils.Metrics()) + mb.metricMongodbReplUpdatesPerSec.emit(ils.Metrics()) mb.metricMongodbSessionCount.emit(ils.Metrics()) mb.metricMongodbStorageSize.emit(ils.Metrics()) mb.metricMongodbUptime.emit(ils.Metrics()) @@ -2188,6 +2500,36 @@ func (mb *MetricsBuilder) RecordMongodbOperationTimeDataPoint(ts pcommon.Timesta mb.metricMongodbOperationTime.recordDataPoint(mb.startTime, ts, val, operationAttributeValue.String()) } +// RecordMongodbReplCommandsPerSecDataPoint adds a data point to mongodb.repl_commands_per_sec metric. +func (mb *MetricsBuilder) RecordMongodbReplCommandsPerSecDataPoint(ts pcommon.Timestamp, val float64) { + mb.metricMongodbReplCommandsPerSec.recordDataPoint(mb.startTime, ts, val) +} + +// RecordMongodbReplDeletesPerSecDataPoint adds a data point to mongodb.repl_deletes_per_sec metric. +func (mb *MetricsBuilder) RecordMongodbReplDeletesPerSecDataPoint(ts pcommon.Timestamp, val float64) { + mb.metricMongodbReplDeletesPerSec.recordDataPoint(mb.startTime, ts, val) +} + +// RecordMongodbReplGetmoresPerSecDataPoint adds a data point to mongodb.repl_getmores_per_sec metric. +func (mb *MetricsBuilder) RecordMongodbReplGetmoresPerSecDataPoint(ts pcommon.Timestamp, val float64) { + mb.metricMongodbReplGetmoresPerSec.recordDataPoint(mb.startTime, ts, val) +} + +// RecordMongodbReplInsertsPerSecDataPoint adds a data point to mongodb.repl_inserts_per_sec metric. +func (mb *MetricsBuilder) RecordMongodbReplInsertsPerSecDataPoint(ts pcommon.Timestamp, val float64) { + mb.metricMongodbReplInsertsPerSec.recordDataPoint(mb.startTime, ts, val) +} + +// RecordMongodbReplQueriesPerSecDataPoint adds a data point to mongodb.repl_queries_per_sec metric. +func (mb *MetricsBuilder) RecordMongodbReplQueriesPerSecDataPoint(ts pcommon.Timestamp, val float64) { + mb.metricMongodbReplQueriesPerSec.recordDataPoint(mb.startTime, ts, val) +} + +// RecordMongodbReplUpdatesPerSecDataPoint adds a data point to mongodb.repl_updates_per_sec metric. +func (mb *MetricsBuilder) RecordMongodbReplUpdatesPerSecDataPoint(ts pcommon.Timestamp, val float64) { + mb.metricMongodbReplUpdatesPerSec.recordDataPoint(mb.startTime, ts, val) +} + // RecordMongodbSessionCountDataPoint adds a data point to mongodb.session.count metric. func (mb *MetricsBuilder) RecordMongodbSessionCountDataPoint(ts pcommon.Timestamp, val int64) { mb.metricMongodbSessionCount.recordDataPoint(mb.startTime, ts, val) diff --git a/receiver/mongodbreceiver/internal/metadata/generated_metrics_test.go b/receiver/mongodbreceiver/internal/metadata/generated_metrics_test.go index 86b91fd60500..c9cf9444b89c 100644 --- a/receiver/mongodbreceiver/internal/metadata/generated_metrics_test.go +++ b/receiver/mongodbreceiver/internal/metadata/generated_metrics_test.go @@ -169,6 +169,24 @@ func TestMetricsBuilder(t *testing.T) { allMetricsCount++ mb.RecordMongodbOperationTimeDataPoint(ts, 1, AttributeOperationInsert) + allMetricsCount++ + mb.RecordMongodbReplCommandsPerSecDataPoint(ts, 1) + + allMetricsCount++ + mb.RecordMongodbReplDeletesPerSecDataPoint(ts, 1) + + allMetricsCount++ + mb.RecordMongodbReplGetmoresPerSecDataPoint(ts, 1) + + allMetricsCount++ + mb.RecordMongodbReplInsertsPerSecDataPoint(ts, 1) + + allMetricsCount++ + mb.RecordMongodbReplQueriesPerSecDataPoint(ts, 1) + + allMetricsCount++ + mb.RecordMongodbReplUpdatesPerSecDataPoint(ts, 1) + defaultMetricsCount++ allMetricsCount++ mb.RecordMongodbSessionCountDataPoint(ts, 1) @@ -631,6 +649,78 @@ func TestMetricsBuilder(t *testing.T) { attrVal, ok := dp.Attributes().Get("operation") assert.True(t, ok) assert.EqualValues(t, "insert", attrVal.Str()) + case "mongodb.repl_commands_per_sec": + assert.False(t, validatedMetrics["mongodb.repl_commands_per_sec"], "Found a duplicate in the metrics slice: mongodb.repl_commands_per_sec") + validatedMetrics["mongodb.repl_commands_per_sec"] = true + assert.Equal(t, pmetric.MetricTypeGauge, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Gauge().DataPoints().Len()) + assert.Equal(t, "The number of replicated commands executed per second.", ms.At(i).Description()) + assert.Equal(t, "{command}/s", ms.At(i).Unit()) + dp := ms.At(i).Gauge().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeDouble, dp.ValueType()) + assert.InDelta(t, float64(1), dp.DoubleValue(), 0.01) + case "mongodb.repl_deletes_per_sec": + assert.False(t, validatedMetrics["mongodb.repl_deletes_per_sec"], "Found a duplicate in the metrics slice: mongodb.repl_deletes_per_sec") + validatedMetrics["mongodb.repl_deletes_per_sec"] = true + assert.Equal(t, pmetric.MetricTypeGauge, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Gauge().DataPoints().Len()) + assert.Equal(t, "The number of replicated deletes executed per second.", ms.At(i).Description()) + assert.Equal(t, "{delete}/s", ms.At(i).Unit()) + dp := ms.At(i).Gauge().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeDouble, dp.ValueType()) + assert.InDelta(t, float64(1), dp.DoubleValue(), 0.01) + case "mongodb.repl_getmores_per_sec": + assert.False(t, validatedMetrics["mongodb.repl_getmores_per_sec"], "Found a duplicate in the metrics slice: mongodb.repl_getmores_per_sec") + validatedMetrics["mongodb.repl_getmores_per_sec"] = true + assert.Equal(t, pmetric.MetricTypeGauge, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Gauge().DataPoints().Len()) + assert.Equal(t, "The number of replicated getmores executed per second.", ms.At(i).Description()) + assert.Equal(t, "{getmore}/s", ms.At(i).Unit()) + dp := ms.At(i).Gauge().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeDouble, dp.ValueType()) + assert.InDelta(t, float64(1), dp.DoubleValue(), 0.01) + case "mongodb.repl_inserts_per_sec": + assert.False(t, validatedMetrics["mongodb.repl_inserts_per_sec"], "Found a duplicate in the metrics slice: mongodb.repl_inserts_per_sec") + validatedMetrics["mongodb.repl_inserts_per_sec"] = true + assert.Equal(t, pmetric.MetricTypeGauge, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Gauge().DataPoints().Len()) + assert.Equal(t, "The number of replicated insertions executed per second.", ms.At(i).Description()) + assert.Equal(t, "{insert}/s", ms.At(i).Unit()) + dp := ms.At(i).Gauge().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeDouble, dp.ValueType()) + assert.InDelta(t, float64(1), dp.DoubleValue(), 0.01) + case "mongodb.repl_queries_per_sec": + assert.False(t, validatedMetrics["mongodb.repl_queries_per_sec"], "Found a duplicate in the metrics slice: mongodb.repl_queries_per_sec") + validatedMetrics["mongodb.repl_queries_per_sec"] = true + assert.Equal(t, pmetric.MetricTypeGauge, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Gauge().DataPoints().Len()) + assert.Equal(t, "The number of replicated queries executed per second.", ms.At(i).Description()) + assert.Equal(t, "{query}/s", ms.At(i).Unit()) + dp := ms.At(i).Gauge().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeDouble, dp.ValueType()) + assert.InDelta(t, float64(1), dp.DoubleValue(), 0.01) + case "mongodb.repl_updates_per_sec": + assert.False(t, validatedMetrics["mongodb.repl_updates_per_sec"], "Found a duplicate in the metrics slice: mongodb.repl_updates_per_sec") + validatedMetrics["mongodb.repl_updates_per_sec"] = true + assert.Equal(t, pmetric.MetricTypeGauge, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Gauge().DataPoints().Len()) + assert.Equal(t, "The number of replicated updates executed per second.", ms.At(i).Description()) + assert.Equal(t, "{update}/s", ms.At(i).Unit()) + dp := ms.At(i).Gauge().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeDouble, dp.ValueType()) + assert.InDelta(t, float64(1), dp.DoubleValue(), 0.01) case "mongodb.session.count": assert.False(t, validatedMetrics["mongodb.session.count"], "Found a duplicate in the metrics slice: mongodb.session.count") validatedMetrics["mongodb.session.count"] = true diff --git a/receiver/mongodbreceiver/internal/metadata/testdata/config.yaml b/receiver/mongodbreceiver/internal/metadata/testdata/config.yaml index 5f97ca6b4081..69facb8fd75e 100644 --- a/receiver/mongodbreceiver/internal/metadata/testdata/config.yaml +++ b/receiver/mongodbreceiver/internal/metadata/testdata/config.yaml @@ -55,6 +55,18 @@ all_set: enabled: true mongodb.operation.time: enabled: true + mongodb.repl_commands_per_sec: + enabled: true + mongodb.repl_deletes_per_sec: + enabled: true + mongodb.repl_getmores_per_sec: + enabled: true + mongodb.repl_inserts_per_sec: + enabled: true + mongodb.repl_queries_per_sec: + enabled: true + mongodb.repl_updates_per_sec: + enabled: true mongodb.session.count: enabled: true mongodb.storage.size: @@ -124,6 +136,18 @@ none_set: enabled: false mongodb.operation.time: enabled: false + mongodb.repl_commands_per_sec: + enabled: false + mongodb.repl_deletes_per_sec: + enabled: false + mongodb.repl_getmores_per_sec: + enabled: false + mongodb.repl_inserts_per_sec: + enabled: false + mongodb.repl_queries_per_sec: + enabled: false + mongodb.repl_updates_per_sec: + enabled: false mongodb.session.count: enabled: false mongodb.storage.size: diff --git a/receiver/mongodbreceiver/metadata.yaml b/receiver/mongodbreceiver/metadata.yaml index b95816191cef..aa9083314fb4 100644 --- a/receiver/mongodbreceiver/metadata.yaml +++ b/receiver/mongodbreceiver/metadata.yaml @@ -357,7 +357,55 @@ metrics: value_type: int monotonic: true aggregation_temporality: cumulative - attributes: [ ] + attributes: [ ] + mongodb.repl_queries_per_sec: + description: The number of replicated queries executed per second. + unit: "{query}/s" + enabled: false + gauge: + value_type: double + aggregation_temporality: delta + monotonic: false + mongodb.repl_inserts_per_sec: + description: The number of replicated insertions executed per second. + unit: "{insert}/s" + enabled: false + gauge: + value_type: double + aggregation_temporality: delta + monotonic: false + mongodb.repl_commands_per_sec: + description: The number of replicated commands executed per second. + unit: "{command}/s" + enabled: false + gauge: + value_type: double + aggregation_temporality: delta + monotonic: false + mongodb.repl_getmores_per_sec: + description: The number of replicated getmores executed per second. + unit: "{getmore}/s" + enabled: false + gauge: + value_type: double + aggregation_temporality: delta + monotonic: false + mongodb.repl_deletes_per_sec: + description: The number of replicated deletes executed per second. + unit: "{delete}/s" + enabled: false + gauge: + value_type: double + aggregation_temporality: delta + monotonic: false + mongodb.repl_updates_per_sec: + description: The number of replicated updates executed per second. + unit: "{update}/s" + enabled: false + gauge: + value_type: double + aggregation_temporality: delta + monotonic: false tests: config: diff --git a/receiver/mongodbreceiver/metrics.go b/receiver/mongodbreceiver/metrics.go index c70627f5988c..60b7d8af871c 100644 --- a/receiver/mongodbreceiver/metrics.go +++ b/receiver/mongodbreceiver/metrics.go @@ -4,6 +4,7 @@ package mongodbreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/mongodbreceiver" import ( + "context" "errors" "fmt" "reflect" @@ -12,6 +13,7 @@ import ( "go.mongodb.org/mongo-driver/bson" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/scraper/scrapererror" + "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/mongodbreceiver/internal/metadata" ) @@ -233,15 +235,74 @@ func (s *mongodbScraper) recordOperations(now pcommon.Timestamp, doc bson.M, err } func (s *mongodbScraper) recordOperationsRepl(now pcommon.Timestamp, doc bson.M, errs *scrapererror.ScrapeErrors) { + var replDoc bson.M = doc + var highestInsertCount int64 = -1 + + if len(s.secondaryClients) > 0 { + ctx := context.Background() + for _, secondaryClient := range s.secondaryClients { + status, err := secondaryClient.ServerStatus(ctx, "admin") + if err != nil { + s.logger.Debug("Failed to get secondary server status", zap.Error(err)) + continue + } + + if opcountersRepl, ok := status["opcountersRepl"].(bson.M); ok { + if insertCount, ok := opcountersRepl["insert"].(int64); ok { + if insertCount > highestInsertCount { + highestInsertCount = insertCount + replDoc = status + } + } + } + } + } + + currentCounts := make(map[string]int64) for operationVal, operation := range metadata.MapAttributeOperation { metricPath := []string{"opcountersRepl", operationVal} metricName := "mongodb.operation.repl.count" - val, err := collectMetric(doc, metricPath) + val, err := collectMetric(replDoc, metricPath) if err != nil { errs.AddPartial(1, fmt.Errorf(collectMetricWithAttributes, metricName, operationVal, err)) continue } s.mb.RecordMongodbOperationReplCountDataPoint(now, val, operation) + + currentCounts[operationVal] = val + s.recordReplOperationPerSecond(now, operationVal, val) + } + + s.prevReplCounts = currentCounts + s.prevReplTimestamp = now +} + +func (s *mongodbScraper) recordReplOperationPerSecond(now pcommon.Timestamp, operationVal string, currentCount int64) { + if s.prevReplTimestamp > 0 { + timeDelta := float64(now-s.prevReplTimestamp) / 1e9 + if timeDelta > 0 { + if prevReplCount, exists := s.prevReplCounts[operationVal]; exists { + delta := currentCount - prevReplCount + queriesPerSec := float64(delta) / timeDelta + + switch operationVal { + case "query": + s.mb.RecordMongodbReplQueriesPerSecDataPoint(now, queriesPerSec) + case "insert": + s.mb.RecordMongodbReplInsertsPerSecDataPoint(now, queriesPerSec) + case "command": + s.mb.RecordMongodbReplCommandsPerSecDataPoint(now, queriesPerSec) + case "getmore": + s.mb.RecordMongodbReplGetmoresPerSecDataPoint(now, queriesPerSec) + case "delete": + s.mb.RecordMongodbReplDeletesPerSecDataPoint(now, queriesPerSec) + case "update": + s.mb.RecordMongodbReplUpdatesPerSecDataPoint(now, queriesPerSec) + default: + fmt.Printf("Unhandled repl operation: %s\n", operationVal) + } + } + } } } diff --git a/receiver/mongodbreceiver/scraper.go b/receiver/mongodbreceiver/scraper.go index 234884d07523..b2f67730b0ce 100644 --- a/receiver/mongodbreceiver/scraper.go +++ b/receiver/mongodbreceiver/scraper.go @@ -13,7 +13,9 @@ import ( "github.com/hashicorp/go-version" "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/confignet" "go.opentelemetry.io/collector/featuregate" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" @@ -37,34 +39,81 @@ var ( ) type mongodbScraper struct { - logger *zap.Logger - config *Config - client client - mongoVersion *version.Version - mb *metadata.MetricsBuilder + logger *zap.Logger + config *Config + client client + secondaryClients []client + mongoVersion *version.Version + mb *metadata.MetricsBuilder + prevReplTimestamp pcommon.Timestamp + prevReplCounts map[string]int64 } func newMongodbScraper(settings receiver.Settings, config *Config) *mongodbScraper { return &mongodbScraper{ - logger: settings.Logger, - config: config, - mb: metadata.NewMetricsBuilder(config.MetricsBuilderConfig, settings), - mongoVersion: unknownVersion(), + logger: settings.Logger, + config: config, + mb: metadata.NewMetricsBuilder(config.MetricsBuilderConfig, settings), + mongoVersion: unknownVersion(), + prevReplTimestamp: pcommon.Timestamp(0), + prevReplCounts: make(map[string]int64), } } func (s *mongodbScraper) start(ctx context.Context, _ component.Host) error { - c, err := newClient(ctx, s.config, s.logger) + c, err := newClient(ctx, s.config, s.logger, false) if err != nil { return fmt.Errorf("create mongo client: %w", err) } s.client = c + + // Skip secondary host discovery if direct connection is enabled + if s.config.DirectConnection { + return nil + } + + secondaries, err := s.findSecondaryHosts(ctx) + if err != nil { + s.logger.Warn("failed to find secondary hosts", zap.Error(err)) + return nil + } + + for _, secondary := range secondaries { + secondaryConfig := *s.config + secondaryConfig.Hosts = []confignet.TCPAddrConfig{ + { + Endpoint: secondary, + }, + } + + client, err := newClient(ctx, &secondaryConfig, s.logger, true) + if err != nil { + s.logger.Warn("failed to connect to secondary", zap.String("host", secondary), zap.Error(err)) + continue + } + s.secondaryClients = append(s.secondaryClients, client) + } + return nil } func (s *mongodbScraper) shutdown(ctx context.Context) error { + var errs []error + if s.client != nil { - return s.client.Disconnect(ctx) + if err := s.client.Disconnect(ctx); err != nil { + errs = append(errs, err) + } + } + + for _, client := range s.secondaryClients { + if err := client.Disconnect(ctx); err != nil { + errs = append(errs, err) + } + } + + if len(errs) > 0 { + return fmt.Errorf("multiple disconnect errors: %v", errs) } return nil } @@ -231,3 +280,44 @@ func serverAddressAndPort(serverStatus bson.M) (string, int64, error) { return "", 0, fmt.Errorf("unexpected host format: %s", host) } } + +func (s *mongodbScraper) findSecondaryHosts(ctx context.Context) ([]string, error) { + result, err := s.client.RunCommand(ctx, "admin", bson.M{"replSetGetStatus": 1}) + if err != nil { + s.logger.Error("Failed to get replica set status", zap.Error(err)) + return nil, fmt.Errorf("failed to get replica set status: %w", err) + } + + members, ok := result["members"].(primitive.A) + if !ok { + return nil, fmt.Errorf("invalid members format: expected type primitive.A but got %T, value: %v", result["members"], result["members"]) + } + + var hosts []string + for _, member := range members { + m, ok := member.(bson.M) + if !ok { + continue + } + + state, ok := m["stateStr"].(string) + if !ok { + continue + } + + name, ok := m["name"].(string) + if !ok { + continue + } + + // Only add actual secondaries, not arbiters or other states + if state == "SECONDARY" { + s.logger.Debug("Found secondary", + zap.String("host", name), + zap.String("state", state)) + hosts = append(hosts, name) + } + } + + return hosts, nil +} diff --git a/receiver/mongodbreceiver/scraper_test.go b/receiver/mongodbreceiver/scraper_test.go index cee21c69154b..4d428872bd88 100644 --- a/receiver/mongodbreceiver/scraper_test.go +++ b/receiver/mongodbreceiver/scraper_test.go @@ -40,6 +40,13 @@ func TestScraperLifecycle(t *testing.T) { f := NewFactory() cfg := f.CreateDefaultConfig().(*Config) + /* NOTE: + setting direct connection to true because originally, the scraper tests only ONE mongodb instance. + added in routing logic to detect multiple mongodb instances which takes longer than 2 milliseconds. + since this test is testing for lifecycle (start and shutting down ONE instance). + */ + cfg.DirectConnection = true + scraper := newMongodbScraper(receivertest.NewNopSettings(), cfg) require.NoError(t, scraper.start(context.Background(), componenttest.NewNopHost())) require.NoError(t, scraper.shutdown(context.Background()))