Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for multiple readers #2596

Merged
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ static OpenTelemetry.Metrics.MetricTypeExtensions.IsGauge(this OpenTelemetry.Met
static OpenTelemetry.Metrics.MetricTypeExtensions.IsHistogram(this OpenTelemetry.Metrics.MetricType self) -> bool
static OpenTelemetry.Metrics.MetricTypeExtensions.IsLong(this OpenTelemetry.Metrics.MetricType self) -> bool
static OpenTelemetry.Metrics.MetricTypeExtensions.IsSum(this OpenTelemetry.Metrics.MetricType self) -> bool
static OpenTelemetry.ProviderExtensions.GetMetricCollect(this OpenTelemetry.BaseProvider baseProvider) -> System.Func<OpenTelemetry.Batch<OpenTelemetry.Metrics.Metric>>
static OpenTelemetry.ProviderExtensions.GetCollectObservableInstruments(this OpenTelemetry.BaseProvider baseProvider) -> System.Action
static OpenTelemetry.Sdk.CreateMeterProviderBuilder() -> OpenTelemetry.Metrics.MeterProviderBuilder
static readonly OpenTelemetry.Metrics.MetricStreamConfiguration.Drop -> OpenTelemetry.Metrics.MetricStreamConfiguration
virtual OpenTelemetry.BaseExporter<T>.OnForceFlush(int timeoutMilliseconds) -> bool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ static OpenTelemetry.Metrics.MetricTypeExtensions.IsGauge(this OpenTelemetry.Met
static OpenTelemetry.Metrics.MetricTypeExtensions.IsHistogram(this OpenTelemetry.Metrics.MetricType self) -> bool
static OpenTelemetry.Metrics.MetricTypeExtensions.IsLong(this OpenTelemetry.Metrics.MetricType self) -> bool
static OpenTelemetry.Metrics.MetricTypeExtensions.IsSum(this OpenTelemetry.Metrics.MetricType self) -> bool
static OpenTelemetry.ProviderExtensions.GetMetricCollect(this OpenTelemetry.BaseProvider baseProvider) -> System.Func<OpenTelemetry.Batch<OpenTelemetry.Metrics.Metric>>
static OpenTelemetry.ProviderExtensions.GetCollectObservableInstruments(this OpenTelemetry.BaseProvider baseProvider) -> System.Action
static OpenTelemetry.Sdk.CreateMeterProviderBuilder() -> OpenTelemetry.Metrics.MeterProviderBuilder
static readonly OpenTelemetry.Metrics.MetricStreamConfiguration.Drop -> OpenTelemetry.Metrics.MetricStreamConfiguration
virtual OpenTelemetry.BaseExporter<T>.OnForceFlush(int timeoutMilliseconds) -> bool
Expand Down
61 changes: 34 additions & 27 deletions src/OpenTelemetry/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,30 +26,33 @@
([#2542](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2542))

* Added wildcard support for AddMeter.
([#2459](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2459))
([#2459](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2459))

* Add support for multiple Metric readers
([#2596](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2596))

## 1.2.0-beta1

Released 2021-Oct-08

* Exception from Observable instrument callbacks does not
result in entire metrics being lost.
* Exception from Observable instrument callbacks does not result in entire
metrics being lost.

* SDK is allocation-free on recording of measurements with
upto 8 tags.
* SDK is allocation-free on recording of measurements with upto 8 tags.

* TracerProviderBuilder.AddLegacySource now supports wildcard activity names.
([#2183](https://github.com/open-telemetry/opentelemetry-dotnet/issues/2183))

* Instrument and View names are validated
[according with the spec](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#instrument).
* Instrument and View names are validated [according with the
spec](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#instrument).
([#2470](https://github.com/open-telemetry/opentelemetry-dotnet/issues/2470))

## 1.2.0-alpha4

Released 2021-Sep-23

* `BatchExportProcessor.OnShutdown` will now log the count of dropped telemetry items.
* `BatchExportProcessor.OnShutdown` will now log the count of dropped telemetry
items.
([#2331](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2331))
* Changed `CompositeProcessor<T>.OnForceFlush` to meet with the spec
requirement. Now the SDK will invoke `ForceFlush` on all registered
Expand All @@ -60,36 +63,39 @@ Released 2021-Sep-23

Released 2021-Sep-13

* Metrics perf improvements, bug fixes.
Replace MetricProcessor with MetricReader.
* Metrics perf improvements, bug fixes. Replace MetricProcessor with
MetricReader.
([#2306](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2306))

* Add `BatchExportActivityProcessorOptions` which supports field value overriding
using `OTEL_BSP_SCHEDULE_DELAY`, `OTEL_BSP_EXPORT_TIMEOUT`,
`OTEL_BSP_MAX_QUEUE_SIZE`, `OTEL_BSP_MAX_EXPORT_BATCH_SIZE`
envionmental variables as defined in the
* Add `BatchExportActivityProcessorOptions` which supports field value
overriding using `OTEL_BSP_SCHEDULE_DELAY`, `OTEL_BSP_EXPORT_TIMEOUT`,
`OTEL_BSP_MAX_QUEUE_SIZE`, `OTEL_BSP_MAX_EXPORT_BATCH_SIZE` envionmental
variables as defined in the
[specification](https://github.com/open-telemetry/opentelemetry-specification/blob/v1.5.0/specification/sdk-environment-variables.md#batch-span-processor).
([#2219](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2219))

## 1.2.0-alpha2

Released 2021-Aug-24

* More Metrics features. All instrument types, push/pull
exporters, Delta/Cumulative temporality supported.
* More Metrics features. All instrument types, push/pull exporters,
Delta/Cumulative temporality supported.

* `ResourceBuilder.CreateDefault` has detectors for
`OTEL_RESOURCE_ATTRIBUTES`, `OTEL_SERVICE_NAME` environment variables
so that explicit `AddEnvironmentVariableDetector` call is not needed. ([#2247](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2247))
* `ResourceBuilder.CreateDefault` has detectors for `OTEL_RESOURCE_ATTRIBUTES`,
`OTEL_SERVICE_NAME` environment variables so that explicit
`AddEnvironmentVariableDetector` call is not needed.
([#2247](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2247))

* `ResourceBuilder.AddEnvironmentVariableDetector` handles `OTEL_SERVICE_NAME`
environmental variable. ([#2209](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2209))
environmental variable.
([#2209](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2209))

* Removes upper constraint for Microsoft.Extensions.Logging
dependencies. ([#2179](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2179))
* Removes upper constraint for Microsoft.Extensions.Logging dependencies.
([#2179](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2179))

* OpenTelemetryLogger modified to not throw, when the
formatter supplied in ILogger.Log call is null. ([#2200](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2200))
* OpenTelemetryLogger modified to not throw, when the formatter supplied in
ILogger.Log call is null.
([#2200](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2200))

## 1.2.0-alpha1

Expand All @@ -100,7 +106,8 @@ Released 2021-Jul-23
([#2174](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2174))

* Removes .NET Framework 4.5.2, .NET 4.6 support. The minimum .NET Framework
version supported is .NET 4.6.1. ([#2138](https://github.com/open-telemetry/opentelemetry-dotnet/issues/2138))
version supported is .NET 4.6.1.
([#2138](https://github.com/open-telemetry/opentelemetry-dotnet/issues/2138))

## 1.1.0

Expand Down Expand Up @@ -137,8 +144,8 @@ Released 2021-May-11
Released 2021-Apr-23

* Use `AssemblyFileVersionAttribute` instead of `FileVersionInfo.GetVersionInfo`
to get the SDK version attribute to ensure that it works when the assembly
is not loaded directly from a file on disk
to get the SDK version attribute to ensure that it works when the assembly is
not loaded directly from a file on disk
([#1908](https://github.com/open-telemetry/opentelemetry-dotnet/issues/1908))

## 1.1.0-beta1
Expand Down
108 changes: 108 additions & 0 deletions src/OpenTelemetry/Metrics/CompositeMetricReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Threading;
using OpenTelemetry.Internal;

Expand All @@ -27,6 +28,7 @@ internal sealed class CompositeMetricReader : MetricReader
private readonly DoublyLinkedListNode head;
private DoublyLinkedListNode tail;
private bool disposed;
private int count;

public CompositeMetricReader(IEnumerable<MetricReader> readers)
{
Expand All @@ -40,6 +42,7 @@ public CompositeMetricReader(IEnumerable<MetricReader> readers)

this.head = new DoublyLinkedListNode(iter.Current);
this.tail = this.head;
this.count++;

while (iter.MoveNext())
{
Expand All @@ -57,12 +60,117 @@ public CompositeMetricReader AddReader(MetricReader reader)
};
this.tail.Next = node;
this.tail = node;
this.count++;

return this;
}

public Enumerator GetEnumerator() => new Enumerator(this.head);

internal List<Metric> AddMetricsWithNoViews(Instrument instrument)
{
var metrics = new List<Metric>();
utpilla marked this conversation as resolved.
Show resolved Hide resolved
for (var cur = this.head; cur != null; cur = cur.Next)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for a followup/todo - see if we need to catch exceptions from individual readers, so that anyone of them throwing won't affect the others.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't expect this method to throw any exceptions, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure. As a general principle, one reader's bad behaviour shouldn't affect other readers

{
var metric = cur.Value.AddMetricWithNoViews(instrument);
metrics.Add(metric);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add only if notnull

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and in the provider, you can check the list.Count > 0 before enable measurement event is called.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm adding null to preserve the order in which we process the metrics when we have multiple readers. CompositeMetricReader checks for null before calling the reader to update the value.

In future, if we allow readers to have their own MaxMetrics limit, this would still work as we will know which readers need to update the value and which don't.

}

return metrics;
}

internal void RecordSingleStreamLongMeasurements(List<Metric> metrics, long value, ReadOnlySpan<KeyValuePair<string, object>> tags)
{
Debug.Assert(metrics.Count == this.count, "The count of metrics to be updated for a CompositeReader must match the number of individual readers.");

int index = 0;
for (var cur = this.head; cur != null; cur = cur.Next)
{
if (metrics[index] != null)
{
cur.Value.RecordSingleStreamLongMeasurement(metrics[index], value, tags);
}

index++;
}
}

internal void RecordSingleStreamDoubleMeasurements(List<Metric> metrics, double value, ReadOnlySpan<KeyValuePair<string, object>> tags)
{
Debug.Assert(metrics.Count == this.count, "The count of metrics to be updated for a CompositeReader must match the number of individual readers.");

int index = 0;
for (var cur = this.head; cur != null; cur = cur.Next)
{
if (metrics[index] != null)
{
cur.Value.RecordSingleStreamDoubleMeasurement(metrics[index], value, tags);
}

index++;
}
}

internal List<List<Metric>> AddMetricsSuperListWithViews(Instrument instrument, List<MetricStreamConfiguration> metricStreamConfigs)
{
var metricsSuperList = new List<List<Metric>>();
for (var cur = this.head; cur != null; cur = cur.Next)
{
var metrics = cur.Value.AddMetricsListWithViews(instrument, metricStreamConfigs);
metricsSuperList.Add(metrics);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only add if the metric.Count > 0

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as #2596 (comment)

}

return metricsSuperList;
}

internal void RecordLongMeasurements(List<List<Metric>> metricsSuperList, long value, ReadOnlySpan<KeyValuePair<string, object>> tags)
{
Debug.Assert(metricsSuperList.Count == this.count, "The count of metrics to be updated for a CompositeReader must match the number of individual readers.");

int index = 0;
for (var cur = this.head; cur != null; cur = cur.Next)
{
if (metricsSuperList[index].Count > 0)
{
cur.Value.RecordLongMeasurement(metricsSuperList[index], value, tags);
}

index++;
}
}

internal void RecordDoubleMeasurements(List<List<Metric>> metricsSuperList, double value, ReadOnlySpan<KeyValuePair<string, object>> tags)
{
Debug.Assert(metricsSuperList.Count == this.count, "The count of metrics to be updated for a CompositeReader must match the number of individual readers.");

int index = 0;
for (var cur = this.head; cur != null; cur = cur.Next)
{
if (metricsSuperList[index].Count > 0)
{
cur.Value.RecordDoubleMeasurement(metricsSuperList[index], value, tags);
}

index++;
}
}

internal void CompleteSingleStreamMeasurements(List<Metric> metrics)
utpilla marked this conversation as resolved.
Show resolved Hide resolved
{
Debug.Assert(metrics.Count == this.count, "The count of metrics to be updated for a CompositeReader must match the number of individual readers.");

int index = 0;
for (var cur = this.head; cur != null; cur = cur.Next)
{
if (metrics[index] != null)
{
cur.Value.CompleteSingleStreamMeasurement(metrics[index]);
}

index++;
}
}

/// <inheritdoc/>
protected override bool ProcessMetrics(in Batch<Metric> metrics, int timeoutMilliseconds)
{
Expand Down
5 changes: 0 additions & 5 deletions src/OpenTelemetry/Metrics/MeterProviderBuilderBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,6 @@ public override MeterProviderBuilder AddMeter(params string[] names)

internal MeterProviderBuilder AddReader(MetricReader reader)
{
if (this.MetricReaders.Count >= 1)
{
throw new InvalidOperationException("Only one Metricreader is allowed.");
}

this.MetricReaders.Add(reader);
return this;
}
Expand Down
Loading