Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow users to configure the Prometheus remote write queue #3046

Merged
merged 9 commits into from
May 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion exporter/prometheusremotewriteexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ The following settings can be optionally configured:
- `headers`: additional headers attached to each HTTP request.
- *Note the following headers cannot be changed: `Content-Encoding`, `Content-Type`, `X-Prometheus-Remote-Write-Version`, and `User-Agent`.*
- `namespace`: prefix attached to each exported metric name.
- `remote_write_queue`: fine tuning for queueing and sending of the outgoing remote writes.
- `queue_size`: number of OTLP metrics that can be queued.
- `num_consumers`: minimum number of workers to use to fan out the outgoing requests.

Example:

Expand All @@ -51,5 +54,5 @@ Several helper files are leveraged to provide additional capabilities automatica

- [HTTP settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/confighttp/README.md)
- [TLS and mTLS settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md)
- [Retry and timeout settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md), note that the exporter doesn't support `sending_queue`.
- [Retry and timeout settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md), note that the exporter doesn't support `sending_queue` but provides `remote_write_queue`.
- [Resource attributes to Metric labels](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md),
17 changes: 17 additions & 0 deletions exporter/prometheusremotewriteexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ type Config struct {
// See: https://prometheus.io/docs/practices/naming/#metric-names
Namespace string `mapstructure:"namespace"`

// QueueConfig allows users to fine tune the queues
// that handle outgoing requests.
RemoteWriteQueue RemoteWriteQueue `mapstructure:"remote_write_queue"`

// ExternalLabels defines a map of label keys and values that are allowed to start with reserved prefix "__"
ExternalLabels map[string]string `mapstructure:"external_labels"`

Expand All @@ -42,6 +46,19 @@ type Config struct {
exporterhelper.ResourceToTelemetrySettings `mapstructure:"resource_to_telemetry_conversion"`
}

// RemoteWriteQueue allows to configure the remote write queue.
type RemoteWriteQueue struct {
// QueueSize is the maximum number of OTLP metric batches allowed
// in the queue at a given time.
QueueSize int `mapstructure:"queue_size"`

// NumWorkers configures the number of workers used by
// the collector to fan out remote write requests.
NumConsumers int `mapstructure:"num_consumers"`
}

// TODO(jbd): Add capacity, max_samples_per_send to QueueConfig.

var _ config.Exporter = (*Config)(nil)

// Validate checks if the exporter configuration is valid
Expand Down
9 changes: 4 additions & 5 deletions exporter/prometheusremotewriteexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,16 @@ func Test_loadConfig(t *testing.T) {
&Config{
ExporterSettings: config.NewExporterSettings(config.NewIDWithName(typeStr, "2")),
TimeoutSettings: exporterhelper.DefaultTimeoutSettings(),
QueueSettings: exporterhelper.QueueSettings{
Enabled: true,
NumConsumers: 2,
QueueSize: 10,
},
RetrySettings: exporterhelper.RetrySettings{
Enabled: true,
InitialInterval: 10 * time.Second,
MaxInterval: 1 * time.Minute,
MaxElapsedTime: 10 * time.Minute,
},
RemoteWriteQueue: RemoteWriteQueue{
QueueSize: 2000,
NumConsumers: 10,
},
Namespace: "test-space",
ExternalLabels: map[string]string{"key1": "value1", "key2": "value2"},
HTTPClientSettings: confighttp.HTTPClientSettings{
Expand Down
11 changes: 5 additions & 6 deletions exporter/prometheusremotewriteexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,7 @@ import (
"go.opentelemetry.io/collector/consumer/pdata"
)

const (
maxConcurrentRequests = 5
maxBatchByteSize = 3000000
)
const maxBatchByteSize = 3000000

// PrwExporter converts OTLP metrics to Prometheus remote write TimeSeries and sends them to a remote endpoint.
type PrwExporter struct {
Expand All @@ -50,12 +47,13 @@ type PrwExporter struct {
client *http.Client
wg *sync.WaitGroup
closeChan chan struct{}
concurrency int
userAgentHeader string
}

// NewPrwExporter initializes a new PrwExporter instance and sets fields accordingly.
// client parameter cannot be nil.
func NewPrwExporter(namespace string, endpoint string, client *http.Client, externalLabels map[string]string, buildInfo component.BuildInfo) (*PrwExporter, error) {
func NewPrwExporter(namespace string, endpoint string, client *http.Client, externalLabels map[string]string, concurrency int, buildInfo component.BuildInfo) (*PrwExporter, error) {
if client == nil {
return nil, errors.New("http client cannot be nil")
}
Expand All @@ -80,6 +78,7 @@ func NewPrwExporter(namespace string, endpoint string, client *http.Client, exte
wg: new(sync.WaitGroup),
closeChan: make(chan struct{}),
userAgentHeader: userAgentHeader,
concurrency: concurrency,
}, nil
}

Expand Down Expand Up @@ -285,7 +284,7 @@ func (prwe *PrwExporter) export(ctx context.Context, tsMap map[string]*prompb.Ti
var mu sync.Mutex
var wg sync.WaitGroup

concurrencyLimit := int(math.Min(maxConcurrentRequests, float64(len(requests))))
concurrencyLimit := int(math.Min(float64(prwe.concurrency), float64(len(requests))))
wg.Add(concurrencyLimit) // used to wait for workers to be finished

// Run concurrencyLimit of workers until there
Expand Down
12 changes: 9 additions & 3 deletions exporter/prometheusremotewriteexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func Test_NewPrwExporter(t *testing.T) {
config *Config
namespace string
endpoint string
concurrency int
externalLabels map[string]string
client *http.Client
returnError bool
Expand All @@ -68,6 +69,7 @@ func Test_NewPrwExporter(t *testing.T) {
cfg,
"test",
"invalid URL",
5,
map[string]string{"Key1": "Val1"},
http.DefaultClient,
true,
Expand All @@ -78,6 +80,7 @@ func Test_NewPrwExporter(t *testing.T) {
cfg,
"test",
"http://some.url:9411/api/prom/push",
5,
map[string]string{"Key1": "Val1"},
nil,
true,
Expand All @@ -88,6 +91,7 @@ func Test_NewPrwExporter(t *testing.T) {
cfg,
"test",
"http://some.url:9411/api/prom/push",
5,
map[string]string{"Key1": ""},
http.DefaultClient,
true,
Expand All @@ -98,6 +102,7 @@ func Test_NewPrwExporter(t *testing.T) {
cfg,
"test",
"http://some.url:9411/api/prom/push",
5,
map[string]string{"Key1": "Val1"},
http.DefaultClient,
false,
Expand All @@ -108,6 +113,7 @@ func Test_NewPrwExporter(t *testing.T) {
cfg,
"test",
"http://some.url:9411/api/prom/push",
5,
map[string]string{},
http.DefaultClient,
false,
Expand All @@ -117,7 +123,7 @@ func Test_NewPrwExporter(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
prwe, err := NewPrwExporter(tt.namespace, tt.endpoint, tt.client, tt.externalLabels, tt.buildInfo)
prwe, err := NewPrwExporter(tt.namespace, tt.endpoint, tt.client, tt.externalLabels, 1, tt.buildInfo)
if tt.returnError {
assert.Error(t, err)
return
Expand Down Expand Up @@ -260,7 +266,7 @@ func runExportPipeline(ts *prompb.TimeSeries, endpoint *url.URL) []error {
Version: "1.0",
}
// after this, instantiate a CortexExporter with the current HTTP client and endpoint set to passed in endpoint
prwe, err := NewPrwExporter("test", endpoint.String(), HTTPClient, map[string]string{}, buildInfo)
prwe, err := NewPrwExporter("test", endpoint.String(), HTTPClient, map[string]string{}, 1, buildInfo)
if err != nil {
errs = append(errs, err)
return errs
Expand Down Expand Up @@ -515,7 +521,7 @@ func Test_PushMetrics(t *testing.T) {
Description: "OpenTelemetry Collector",
Version: "1.0",
}
prwe, nErr := NewPrwExporter(config.Namespace, serverURL.String(), c, map[string]string{}, buildInfo)
prwe, nErr := NewPrwExporter(config.Namespace, serverURL.String(), c, map[string]string{}, 5, buildInfo)
require.NoError(t, nErr)
err := prwe.PushMetrics(context.Background(), *tt.md)
if tt.returnErr {
Expand Down
23 changes: 15 additions & 8 deletions exporter/prometheusremotewriteexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,35 +50,37 @@ func createMetricsExporter(_ context.Context, params component.ExporterCreatePar
return nil, err
}

prwe, err := NewPrwExporter(prwCfg.Namespace, prwCfg.HTTPClientSettings.Endpoint, client, prwCfg.ExternalLabels, params.BuildInfo)
prwe, err := NewPrwExporter(
prwCfg.Namespace,
prwCfg.HTTPClientSettings.Endpoint,
client, prwCfg.ExternalLabels,
prwCfg.RemoteWriteQueue.NumConsumers,
params.BuildInfo,
)
if err != nil {
return nil, err
}

// Don't support the queue.
// Don't allow users to configure the queue.
// See https://github.com/open-telemetry/opentelemetry-collector/issues/2949.
// Prometheus remote write samples needs to be in chronological
// order for each timeseries. If we shard the incoming metrics
// without considering this limitation, we experience
// "out of order samples" errors.
prwexp, err := exporterhelper.NewMetricsExporter(
rakyll marked this conversation as resolved.
Show resolved Hide resolved
return exporterhelper.NewMetricsExporter(
cfg,
params.Logger,
prwe.PushMetrics,
exporterhelper.WithTimeout(prwCfg.TimeoutSettings),
exporterhelper.WithQueue(exporterhelper.QueueSettings{
Enabled: true,
NumConsumers: 1,
QueueSize: 10000,
// TODO(jbd): Adjust the default queue size
// and allow users to modify the queue size.
QueueSize: prwCfg.RemoteWriteQueue.QueueSize,
}),
exporterhelper.WithRetry(prwCfg.RetrySettings),
exporterhelper.WithResourceToTelemetryConversion(prwCfg.ResourceToTelemetrySettings),
exporterhelper.WithShutdown(prwe.Shutdown),
)

return prwexp, err
}

func createDefaultConfig() config.Exporter {
Expand All @@ -96,5 +98,10 @@ func createDefaultConfig() config.Exporter {
Timeout: exporterhelper.DefaultTimeoutSettings().Timeout,
Headers: map[string]string{},
},
// TODO(jbd): Adjust the default queue size.
RemoteWriteQueue: RemoteWriteQueue{
QueueSize: 10000,
NumConsumers: 5,
},
}
}
7 changes: 3 additions & 4 deletions exporter/prometheusremotewriteexporter/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@ exporters:
prometheusremotewrite:
prometheusremotewrite/2:
namespace: "test-space"
sending_queue:
enabled: true
num_consumers: 2
queue_size: 10
retry_on_failure:
enabled: true
initial_interval: 10s
Expand All @@ -28,6 +24,9 @@ exporters:
key2: value2
resource_to_telemetry_conversion:
enabled: true
remote_write_queue:
queue_size: 2000
num_consumers: 10

service:
pipelines:
Expand Down