diff --git a/exporter/prometheusremotewriteexporter/README.md b/exporter/prometheusremotewriteexporter/README.md index 57a41798c2a..4d318f41fdb 100644 --- a/exporter/prometheusremotewriteexporter/README.md +++ b/exporter/prometheusremotewriteexporter/README.md @@ -36,6 +36,9 @@ The following settings can be optionally configured: - `headers`: additional headers attached to each HTTP request. - *Note the following headers cannot be changed: `Content-Encoding`, `Content-Type`, `X-Prometheus-Remote-Write-Version`, and `User-Agent`.* - `namespace`: prefix attached to each exported metric name. +- `remote_write_queue`: fine tuning for queueing and sending of the outgoing remote writes. + - `queue_size`: number of OTLP metrics that can be queued. + - `num_consumers`: minimum number of workers to use to fan out the outgoing requests. Example: @@ -51,5 +54,5 @@ Several helper files are leveraged to provide additional capabilities automatica - [HTTP settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/confighttp/README.md) - [TLS and mTLS settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md) -- [Retry and timeout settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md), note that the exporter doesn't support `sending_queue`. +- [Retry and timeout settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md), note that the exporter doesn't support `sending_queue` but provides `remote_write_queue`. - [Resource attributes to Metric labels](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md), diff --git a/exporter/prometheusremotewriteexporter/config.go b/exporter/prometheusremotewriteexporter/config.go index 2db89256f99..b211d622cb3 100644 --- a/exporter/prometheusremotewriteexporter/config.go +++ b/exporter/prometheusremotewriteexporter/config.go @@ -31,6 +31,10 @@ type Config struct { // See: https://prometheus.io/docs/practices/naming/#metric-names Namespace string `mapstructure:"namespace"` + // QueueConfig allows users to fine tune the queues + // that handle outgoing requests. + RemoteWriteQueue RemoteWriteQueue `mapstructure:"remote_write_queue"` + // ExternalLabels defines a map of label keys and values that are allowed to start with reserved prefix "__" ExternalLabels map[string]string `mapstructure:"external_labels"` @@ -42,6 +46,19 @@ type Config struct { exporterhelper.ResourceToTelemetrySettings `mapstructure:"resource_to_telemetry_conversion"` } +// RemoteWriteQueue allows to configure the remote write queue. +type RemoteWriteQueue struct { + // QueueSize is the maximum number of OTLP metric batches allowed + // in the queue at a given time. + QueueSize int `mapstructure:"queue_size"` + + // NumWorkers configures the number of workers used by + // the collector to fan out remote write requests. + NumConsumers int `mapstructure:"num_consumers"` +} + +// TODO(jbd): Add capacity, max_samples_per_send to QueueConfig. + var _ config.Exporter = (*Config)(nil) // Validate checks if the exporter configuration is valid diff --git a/exporter/prometheusremotewriteexporter/config_test.go b/exporter/prometheusremotewriteexporter/config_test.go index ce7ca7c691b..fffdc707a57 100644 --- a/exporter/prometheusremotewriteexporter/config_test.go +++ b/exporter/prometheusremotewriteexporter/config_test.go @@ -52,17 +52,16 @@ func Test_loadConfig(t *testing.T) { &Config{ ExporterSettings: config.NewExporterSettings(config.NewIDWithName(typeStr, "2")), TimeoutSettings: exporterhelper.DefaultTimeoutSettings(), - QueueSettings: exporterhelper.QueueSettings{ - Enabled: true, - NumConsumers: 2, - QueueSize: 10, - }, RetrySettings: exporterhelper.RetrySettings{ Enabled: true, InitialInterval: 10 * time.Second, MaxInterval: 1 * time.Minute, MaxElapsedTime: 10 * time.Minute, }, + RemoteWriteQueue: RemoteWriteQueue{ + QueueSize: 2000, + NumConsumers: 10, + }, Namespace: "test-space", ExternalLabels: map[string]string{"key1": "value1", "key2": "value2"}, HTTPClientSettings: confighttp.HTTPClientSettings{ diff --git a/exporter/prometheusremotewriteexporter/exporter.go b/exporter/prometheusremotewriteexporter/exporter.go index 04947215c0c..c165609c108 100644 --- a/exporter/prometheusremotewriteexporter/exporter.go +++ b/exporter/prometheusremotewriteexporter/exporter.go @@ -37,10 +37,7 @@ import ( "go.opentelemetry.io/collector/consumer/pdata" ) -const ( - maxConcurrentRequests = 5 - maxBatchByteSize = 3000000 -) +const maxBatchByteSize = 3000000 // PrwExporter converts OTLP metrics to Prometheus remote write TimeSeries and sends them to a remote endpoint. type PrwExporter struct { @@ -50,12 +47,13 @@ type PrwExporter struct { client *http.Client wg *sync.WaitGroup closeChan chan struct{} + concurrency int userAgentHeader string } // NewPrwExporter initializes a new PrwExporter instance and sets fields accordingly. // client parameter cannot be nil. -func NewPrwExporter(namespace string, endpoint string, client *http.Client, externalLabels map[string]string, buildInfo component.BuildInfo) (*PrwExporter, error) { +func NewPrwExporter(namespace string, endpoint string, client *http.Client, externalLabels map[string]string, concurrency int, buildInfo component.BuildInfo) (*PrwExporter, error) { if client == nil { return nil, errors.New("http client cannot be nil") } @@ -80,6 +78,7 @@ func NewPrwExporter(namespace string, endpoint string, client *http.Client, exte wg: new(sync.WaitGroup), closeChan: make(chan struct{}), userAgentHeader: userAgentHeader, + concurrency: concurrency, }, nil } @@ -285,7 +284,7 @@ func (prwe *PrwExporter) export(ctx context.Context, tsMap map[string]*prompb.Ti var mu sync.Mutex var wg sync.WaitGroup - concurrencyLimit := int(math.Min(maxConcurrentRequests, float64(len(requests)))) + concurrencyLimit := int(math.Min(float64(prwe.concurrency), float64(len(requests)))) wg.Add(concurrencyLimit) // used to wait for workers to be finished // Run concurrencyLimit of workers until there diff --git a/exporter/prometheusremotewriteexporter/exporter_test.go b/exporter/prometheusremotewriteexporter/exporter_test.go index 37c6741e419..f27eeb0bc40 100644 --- a/exporter/prometheusremotewriteexporter/exporter_test.go +++ b/exporter/prometheusremotewriteexporter/exporter_test.go @@ -58,6 +58,7 @@ func Test_NewPrwExporter(t *testing.T) { config *Config namespace string endpoint string + concurrency int externalLabels map[string]string client *http.Client returnError bool @@ -68,6 +69,7 @@ func Test_NewPrwExporter(t *testing.T) { cfg, "test", "invalid URL", + 5, map[string]string{"Key1": "Val1"}, http.DefaultClient, true, @@ -78,6 +80,7 @@ func Test_NewPrwExporter(t *testing.T) { cfg, "test", "http://some.url:9411/api/prom/push", + 5, map[string]string{"Key1": "Val1"}, nil, true, @@ -88,6 +91,7 @@ func Test_NewPrwExporter(t *testing.T) { cfg, "test", "http://some.url:9411/api/prom/push", + 5, map[string]string{"Key1": ""}, http.DefaultClient, true, @@ -98,6 +102,7 @@ func Test_NewPrwExporter(t *testing.T) { cfg, "test", "http://some.url:9411/api/prom/push", + 5, map[string]string{"Key1": "Val1"}, http.DefaultClient, false, @@ -108,6 +113,7 @@ func Test_NewPrwExporter(t *testing.T) { cfg, "test", "http://some.url:9411/api/prom/push", + 5, map[string]string{}, http.DefaultClient, false, @@ -117,7 +123,7 @@ func Test_NewPrwExporter(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - prwe, err := NewPrwExporter(tt.namespace, tt.endpoint, tt.client, tt.externalLabels, tt.buildInfo) + prwe, err := NewPrwExporter(tt.namespace, tt.endpoint, tt.client, tt.externalLabels, 1, tt.buildInfo) if tt.returnError { assert.Error(t, err) return @@ -260,7 +266,7 @@ func runExportPipeline(ts *prompb.TimeSeries, endpoint *url.URL) []error { Version: "1.0", } // after this, instantiate a CortexExporter with the current HTTP client and endpoint set to passed in endpoint - prwe, err := NewPrwExporter("test", endpoint.String(), HTTPClient, map[string]string{}, buildInfo) + prwe, err := NewPrwExporter("test", endpoint.String(), HTTPClient, map[string]string{}, 1, buildInfo) if err != nil { errs = append(errs, err) return errs @@ -515,7 +521,7 @@ func Test_PushMetrics(t *testing.T) { Description: "OpenTelemetry Collector", Version: "1.0", } - prwe, nErr := NewPrwExporter(config.Namespace, serverURL.String(), c, map[string]string{}, buildInfo) + prwe, nErr := NewPrwExporter(config.Namespace, serverURL.String(), c, map[string]string{}, 5, buildInfo) require.NoError(t, nErr) err := prwe.PushMetrics(context.Background(), *tt.md) if tt.returnErr { diff --git a/exporter/prometheusremotewriteexporter/factory.go b/exporter/prometheusremotewriteexporter/factory.go index 599430a0c17..a1566f11ad1 100644 --- a/exporter/prometheusremotewriteexporter/factory.go +++ b/exporter/prometheusremotewriteexporter/factory.go @@ -50,18 +50,24 @@ func createMetricsExporter(_ context.Context, params component.ExporterCreatePar return nil, err } - prwe, err := NewPrwExporter(prwCfg.Namespace, prwCfg.HTTPClientSettings.Endpoint, client, prwCfg.ExternalLabels, params.BuildInfo) + prwe, err := NewPrwExporter( + prwCfg.Namespace, + prwCfg.HTTPClientSettings.Endpoint, + client, prwCfg.ExternalLabels, + prwCfg.RemoteWriteQueue.NumConsumers, + params.BuildInfo, + ) if err != nil { return nil, err } - // Don't support the queue. + // Don't allow users to configure the queue. // See https://github.com/open-telemetry/opentelemetry-collector/issues/2949. // Prometheus remote write samples needs to be in chronological // order for each timeseries. If we shard the incoming metrics // without considering this limitation, we experience // "out of order samples" errors. - prwexp, err := exporterhelper.NewMetricsExporter( + return exporterhelper.NewMetricsExporter( cfg, params.Logger, prwe.PushMetrics, @@ -69,16 +75,12 @@ func createMetricsExporter(_ context.Context, params component.ExporterCreatePar exporterhelper.WithQueue(exporterhelper.QueueSettings{ Enabled: true, NumConsumers: 1, - QueueSize: 10000, - // TODO(jbd): Adjust the default queue size - // and allow users to modify the queue size. + QueueSize: prwCfg.RemoteWriteQueue.QueueSize, }), exporterhelper.WithRetry(prwCfg.RetrySettings), exporterhelper.WithResourceToTelemetryConversion(prwCfg.ResourceToTelemetrySettings), exporterhelper.WithShutdown(prwe.Shutdown), ) - - return prwexp, err } func createDefaultConfig() config.Exporter { @@ -96,5 +98,10 @@ func createDefaultConfig() config.Exporter { Timeout: exporterhelper.DefaultTimeoutSettings().Timeout, Headers: map[string]string{}, }, + // TODO(jbd): Adjust the default queue size. + RemoteWriteQueue: RemoteWriteQueue{ + QueueSize: 10000, + NumConsumers: 5, + }, } } diff --git a/exporter/prometheusremotewriteexporter/testdata/config.yaml b/exporter/prometheusremotewriteexporter/testdata/config.yaml index 0a64a130f97..c890bc97d3c 100644 --- a/exporter/prometheusremotewriteexporter/testdata/config.yaml +++ b/exporter/prometheusremotewriteexporter/testdata/config.yaml @@ -8,10 +8,6 @@ exporters: prometheusremotewrite: prometheusremotewrite/2: namespace: "test-space" - sending_queue: - enabled: true - num_consumers: 2 - queue_size: 10 retry_on_failure: enabled: true initial_interval: 10s @@ -28,6 +24,9 @@ exporters: key2: value2 resource_to_telemetry_conversion: enabled: true + remote_write_queue: + queue_size: 2000 + num_consumers: 10 service: pipelines: