Skip to content

Commit

Permalink
Allow users to configure the Prometheus remote write queue (#3046)
Browse files Browse the repository at this point in the history
* Allow users to configure the Prometheus remote write queue

* Fix lint

* Fix godoc

* Fix docs

* Revert wait group change

* Limit concurrency to the write reqs

* Rename min_shards to concurrency

* Renaming the queue settings

* Renaming the queue settings
  • Loading branch information
rakyll authored May 17, 2021
1 parent ff91f42 commit 709d8a8
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 27 deletions.
5 changes: 4 additions & 1 deletion exporter/prometheusremotewriteexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ The following settings can be optionally configured:
- `headers`: additional headers attached to each HTTP request.
- *Note the following headers cannot be changed: `Content-Encoding`, `Content-Type`, `X-Prometheus-Remote-Write-Version`, and `User-Agent`.*
- `namespace`: prefix attached to each exported metric name.
- `remote_write_queue`: fine tuning for queueing and sending of the outgoing remote writes.
- `queue_size`: maximum number of OTLP metric batches that can be queued.
- `num_consumers`: maximum number of workers used to fan out the outgoing requests.

Example:

Expand All @@ -51,5 +54,5 @@ Several helper files are leveraged to provide additional capabilities automatica
- [HTTP settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/confighttp/README.md)
- [TLS and mTLS settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md)
- [Retry and timeout settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md), note that the exporter doesn't support `sending_queue`.
- [Retry and timeout settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md), note that the exporter doesn't support `sending_queue` but provides `remote_write_queue`.
- [Resource attributes to Metric labels](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md),
17 changes: 17 additions & 0 deletions exporter/prometheusremotewriteexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ type Config struct {
// See: https://prometheus.io/docs/practices/naming/#metric-names
Namespace string `mapstructure:"namespace"`

// RemoteWriteQueue allows users to fine tune the queues
// that handle outgoing requests.
RemoteWriteQueue RemoteWriteQueue `mapstructure:"remote_write_queue"`

// ExternalLabels defines a map of label keys and values that are allowed to start with reserved prefix "__"
ExternalLabels map[string]string `mapstructure:"external_labels"`

Expand All @@ -42,6 +46,19 @@ type Config struct {
exporterhelper.ResourceToTelemetrySettings `mapstructure:"resource_to_telemetry_conversion"`
}

// RemoteWriteQueue holds the user-configurable settings of the
// remote write queue (the `remote_write_queue` config section).
type RemoteWriteQueue struct {
// QueueSize is the maximum number of OTLP metric batches allowed
// in the queue at a given time.
QueueSize int `mapstructure:"queue_size"`

// NumConsumers configures the number of workers used by
// the collector to fan out remote write requests.
NumConsumers int `mapstructure:"num_consumers"`
}

// TODO(jbd): Add capacity, max_samples_per_send to RemoteWriteQueue.

var _ config.Exporter = (*Config)(nil)

// Validate checks if the exporter configuration is valid
Expand Down
9 changes: 4 additions & 5 deletions exporter/prometheusremotewriteexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,16 @@ func Test_loadConfig(t *testing.T) {
&Config{
ExporterSettings: config.NewExporterSettings(config.NewIDWithName(typeStr, "2")),
TimeoutSettings: exporterhelper.DefaultTimeoutSettings(),
QueueSettings: exporterhelper.QueueSettings{
Enabled: true,
NumConsumers: 2,
QueueSize: 10,
},
RetrySettings: exporterhelper.RetrySettings{
Enabled: true,
InitialInterval: 10 * time.Second,
MaxInterval: 1 * time.Minute,
MaxElapsedTime: 10 * time.Minute,
},
RemoteWriteQueue: RemoteWriteQueue{
QueueSize: 2000,
NumConsumers: 10,
},
Namespace: "test-space",
ExternalLabels: map[string]string{"key1": "value1", "key2": "value2"},
HTTPClientSettings: confighttp.HTTPClientSettings{
Expand Down
11 changes: 5 additions & 6 deletions exporter/prometheusremotewriteexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,7 @@ import (
"go.opentelemetry.io/collector/consumer/pdata"
)

const (
maxConcurrentRequests = 5
maxBatchByteSize = 3000000
)
const maxBatchByteSize = 3000000

// PrwExporter converts OTLP metrics to Prometheus remote write TimeSeries and sends them to a remote endpoint.
type PrwExporter struct {
Expand All @@ -50,12 +47,13 @@ type PrwExporter struct {
client *http.Client
wg *sync.WaitGroup
closeChan chan struct{}
concurrency int
userAgentHeader string
}

// NewPrwExporter initializes a new PrwExporter instance and sets fields accordingly.
// client parameter cannot be nil.
func NewPrwExporter(namespace string, endpoint string, client *http.Client, externalLabels map[string]string, buildInfo component.BuildInfo) (*PrwExporter, error) {
func NewPrwExporter(namespace string, endpoint string, client *http.Client, externalLabels map[string]string, concurrency int, buildInfo component.BuildInfo) (*PrwExporter, error) {
if client == nil {
return nil, errors.New("http client cannot be nil")
}
Expand All @@ -80,6 +78,7 @@ func NewPrwExporter(namespace string, endpoint string, client *http.Client, exte
wg: new(sync.WaitGroup),
closeChan: make(chan struct{}),
userAgentHeader: userAgentHeader,
concurrency: concurrency,
}, nil
}

Expand Down Expand Up @@ -285,7 +284,7 @@ func (prwe *PrwExporter) export(ctx context.Context, tsMap map[string]*prompb.Ti
var mu sync.Mutex
var wg sync.WaitGroup

concurrencyLimit := int(math.Min(maxConcurrentRequests, float64(len(requests))))
concurrencyLimit := int(math.Min(float64(prwe.concurrency), float64(len(requests))))
wg.Add(concurrencyLimit) // used to wait for workers to be finished

// Run concurrencyLimit of workers until there
Expand Down
12 changes: 9 additions & 3 deletions exporter/prometheusremotewriteexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func Test_NewPrwExporter(t *testing.T) {
config *Config
namespace string
endpoint string
concurrency int
externalLabels map[string]string
client *http.Client
returnError bool
Expand All @@ -68,6 +69,7 @@ func Test_NewPrwExporter(t *testing.T) {
cfg,
"test",
"invalid URL",
5,
map[string]string{"Key1": "Val1"},
http.DefaultClient,
true,
Expand All @@ -78,6 +80,7 @@ func Test_NewPrwExporter(t *testing.T) {
cfg,
"test",
"http://some.url:9411/api/prom/push",
5,
map[string]string{"Key1": "Val1"},
nil,
true,
Expand All @@ -88,6 +91,7 @@ func Test_NewPrwExporter(t *testing.T) {
cfg,
"test",
"http://some.url:9411/api/prom/push",
5,
map[string]string{"Key1": ""},
http.DefaultClient,
true,
Expand All @@ -98,6 +102,7 @@ func Test_NewPrwExporter(t *testing.T) {
cfg,
"test",
"http://some.url:9411/api/prom/push",
5,
map[string]string{"Key1": "Val1"},
http.DefaultClient,
false,
Expand All @@ -108,6 +113,7 @@ func Test_NewPrwExporter(t *testing.T) {
cfg,
"test",
"http://some.url:9411/api/prom/push",
5,
map[string]string{},
http.DefaultClient,
false,
Expand All @@ -117,7 +123,7 @@ func Test_NewPrwExporter(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
prwe, err := NewPrwExporter(tt.namespace, tt.endpoint, tt.client, tt.externalLabels, tt.buildInfo)
prwe, err := NewPrwExporter(tt.namespace, tt.endpoint, tt.client, tt.externalLabels, 1, tt.buildInfo)
if tt.returnError {
assert.Error(t, err)
return
Expand Down Expand Up @@ -260,7 +266,7 @@ func runExportPipeline(ts *prompb.TimeSeries, endpoint *url.URL) []error {
Version: "1.0",
}
// after this, instantiate a PrwExporter with the current HTTP client and the endpoint set to the passed-in endpoint
prwe, err := NewPrwExporter("test", endpoint.String(), HTTPClient, map[string]string{}, buildInfo)
prwe, err := NewPrwExporter("test", endpoint.String(), HTTPClient, map[string]string{}, 1, buildInfo)
if err != nil {
errs = append(errs, err)
return errs
Expand Down Expand Up @@ -515,7 +521,7 @@ func Test_PushMetrics(t *testing.T) {
Description: "OpenTelemetry Collector",
Version: "1.0",
}
prwe, nErr := NewPrwExporter(config.Namespace, serverURL.String(), c, map[string]string{}, buildInfo)
prwe, nErr := NewPrwExporter(config.Namespace, serverURL.String(), c, map[string]string{}, 5, buildInfo)
require.NoError(t, nErr)
err := prwe.PushMetrics(context.Background(), *tt.md)
if tt.returnErr {
Expand Down
23 changes: 15 additions & 8 deletions exporter/prometheusremotewriteexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,35 +50,37 @@ func createMetricsExporter(_ context.Context, params component.ExporterCreatePar
return nil, err
}

prwe, err := NewPrwExporter(prwCfg.Namespace, prwCfg.HTTPClientSettings.Endpoint, client, prwCfg.ExternalLabels, params.BuildInfo)
prwe, err := NewPrwExporter(
prwCfg.Namespace,
prwCfg.HTTPClientSettings.Endpoint,
client, prwCfg.ExternalLabels,
prwCfg.RemoteWriteQueue.NumConsumers,
params.BuildInfo,
)
if err != nil {
return nil, err
}

// Don't support the queue.
// Don't allow users to configure the queue.
// See https://github.com/open-telemetry/opentelemetry-collector/issues/2949.
// Prometheus remote write samples needs to be in chronological
// order for each timeseries. If we shard the incoming metrics
// without considering this limitation, we experience
// "out of order samples" errors.
prwexp, err := exporterhelper.NewMetricsExporter(
return exporterhelper.NewMetricsExporter(
cfg,
params.Logger,
prwe.PushMetrics,
exporterhelper.WithTimeout(prwCfg.TimeoutSettings),
exporterhelper.WithQueue(exporterhelper.QueueSettings{
Enabled: true,
NumConsumers: 1,
QueueSize: 10000,
// TODO(jbd): Adjust the default queue size
// and allow users to modify the queue size.
QueueSize: prwCfg.RemoteWriteQueue.QueueSize,
}),
exporterhelper.WithRetry(prwCfg.RetrySettings),
exporterhelper.WithResourceToTelemetryConversion(prwCfg.ResourceToTelemetrySettings),
exporterhelper.WithShutdown(prwe.Shutdown),
)

return prwexp, err
}

func createDefaultConfig() config.Exporter {
Expand All @@ -96,5 +98,10 @@ func createDefaultConfig() config.Exporter {
Timeout: exporterhelper.DefaultTimeoutSettings().Timeout,
Headers: map[string]string{},
},
// TODO(jbd): Adjust the default queue size.
RemoteWriteQueue: RemoteWriteQueue{
QueueSize: 10000,
NumConsumers: 5,
},
}
}
7 changes: 3 additions & 4 deletions exporter/prometheusremotewriteexporter/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@ exporters:
prometheusremotewrite:
prometheusremotewrite/2:
namespace: "test-space"
sending_queue:
enabled: true
num_consumers: 2
queue_size: 10
retry_on_failure:
enabled: true
initial_interval: 10s
Expand All @@ -28,6 +24,9 @@ exporters:
key2: value2
resource_to_telemetry_conversion:
enabled: true
remote_write_queue:
queue_size: 2000
num_consumers: 10

service:
pipelines:
Expand Down

0 comments on commit 709d8a8

Please sign in to comment.