diff --git a/docs/trace/building-your-own-exporter/README.md b/docs/trace/building-your-own-exporter/README.md index 499a79ec404..0d171695a78 100644 --- a/docs/trace/building-your-own-exporter/README.md +++ b/docs/trace/building-your-own-exporter/README.md @@ -2,10 +2,10 @@ * To export telemetry to a specific destination, custom exporters must be written. -* Exporters should inherit from `ActivityExporter` and implement `ExportAsync` - and `ShutdownAsync` methods. `ActivityExporter` is part of the [OpenTelemetry +* Exporters should inherit from `ActivityExporter` and implement `Export` and + `Shutdown` methods. `ActivityExporter` is part of the [OpenTelemetry Package](https://www.nuget.org/packages/opentelemetry). -* Depending on user's choice and load on the application, `ExportAsync` may get +* Depending on user's choice and load on the application, `Export` may get called with zero or more activities. * Exporters will only receive sampled-in and ended activities. * Exporters must not throw. diff --git a/docs/trace/building-your-own-processor/MyActivityProcessor.cs b/docs/trace/building-your-own-processor/MyActivityProcessor.cs index 92a71ecd8e0..5707b7ee743 100644 --- a/docs/trace/building-your-own-processor/MyActivityProcessor.cs +++ b/docs/trace/building-your-own-processor/MyActivityProcessor.cs @@ -39,15 +39,14 @@ public override void OnEnd(Activity activity) Console.WriteLine($"{this}.OnEnd"); } - public override Task ForceFlushAsync(CancellationToken cancellationToken) + public override bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite) { - Console.WriteLine($"{this}.ForceFlushAsync"); - return Task.CompletedTask; + Console.WriteLine($"{this}.ForceFlush"); + return true; } - public override Task ShutdownAsync(CancellationToken cancellationToken) + public override void Shutdown(int timeoutMilliseconds = Timeout.Infinite) { - Console.WriteLine($"{this}.ShutdownAsync"); - return Task.CompletedTask; + Console.WriteLine($"{this}.Shutdown"); } } diff --git a/docs/trace/building-your-own-sampler/README.md b/docs/trace/building-your-own-sampler/README.md index 2271ad58ea2..1b21ecc196b 100644 --- a/docs/trace/building-your-own-sampler/README.md +++ b/docs/trace/building-your-own-sampler/README.md @@ -6,13 +6,11 @@ critical code path. ```csharp -class MySampler : Sampler +internal class MySampler : Sampler { public override SamplingResult ShouldSample(in SamplingParameters samplingParameters) { - var shouldSample = true; - - return new SamplingResult(shouldSample); + return new SamplingResult(SamplingDecision.RecordAndSampled); } } ``` diff --git a/src/OpenTelemetry.Exporter.Jaeger/JaegerExporter.cs b/src/OpenTelemetry.Exporter.Jaeger/JaegerExporter.cs index ab2ee1452db..8ca39414d64 100644 --- a/src/OpenTelemetry.Exporter.Jaeger/JaegerExporter.cs +++ b/src/OpenTelemetry.Exporter.Jaeger/JaegerExporter.cs @@ -18,6 +18,7 @@ using System.Collections.Generic; using System.Diagnostics; using System.Linq; +using System.Threading; using OpenTelemetry.Exporter.Jaeger.Implementation; using OpenTelemetry.Resources; using OpenTelemetry.Trace; @@ -61,7 +62,7 @@ public override ExportResult Export(in Batch activityBatch) } /// - public override void Shutdown() + public override void Shutdown(int timeoutMilliseconds = Timeout.Infinite) { this.JaegerAgentUdpBatcher.FlushAsync(default).GetAwaiter().GetResult(); } diff --git a/src/OpenTelemetry/CHANGELOG.md b/src/OpenTelemetry/CHANGELOG.md index 3bb44b82ecc..7394811bfda 100644 --- a/src/OpenTelemetry/CHANGELOG.md +++ b/src/OpenTelemetry/CHANGELOG.md @@ -32,7 +32,8 @@ [#1094](https://github.com/open-telemetry/opentelemetry-dotnet/pull/1094) [#1113](https://github.com/open-telemetry/opentelemetry-dotnet/pull/1113) [#1127](https://github.com/open-telemetry/opentelemetry-dotnet/pull/1127) - [#1129](https://github.com/open-telemetry/opentelemetry-dotnet/pull/1129)) + [#1129](https://github.com/open-telemetry/opentelemetry-dotnet/pull/1129) + [#1135](https://github.com/open-telemetry/opentelemetry-dotnet/pull/1135)) ## 0.4.0-beta.2 diff --git a/src/OpenTelemetry/Trace/ActivityExporter.cs b/src/OpenTelemetry/Trace/ActivityExporter.cs index 4c0aa6889bb..357c199cbef 100644 --- a/src/OpenTelemetry/Trace/ActivityExporter.cs +++ b/src/OpenTelemetry/Trace/ActivityExporter.cs @@ -16,6 +16,7 @@ using System; using System.Diagnostics; +using System.Threading; namespace OpenTelemetry.Trace { @@ -48,9 +49,14 @@ public abstract class ActivityExporter : IDisposable public abstract ExportResult Export(in Batch batch); /// - /// Shuts down the exporter. + /// Attempts to shutdown the exporter, blocks the current thread until + /// shutdown completed or timed out. /// - public virtual void Shutdown() + /// + /// The number of milliseconds to wait, or Timeout.Infinite to + /// wait indefinitely. + /// + public virtual void Shutdown(int timeoutMilliseconds = Timeout.Infinite) { } diff --git a/src/OpenTelemetry/Trace/ActivityProcessor.cs b/src/OpenTelemetry/Trace/ActivityProcessor.cs index 45e6950ac3b..7acf08c0b17 100644 --- a/src/OpenTelemetry/Trace/ActivityProcessor.cs +++ b/src/OpenTelemetry/Trace/ActivityProcessor.cs @@ -46,31 +46,31 @@ public virtual void OnEnd(Activity activity) } /// - /// Shuts down Activity processor asynchronously. + /// Flushes the , blocks the current + /// thread until flush completed, shutdown signaled or timed out. /// - /// Cancellation token. - /// Returns . - public virtual Task ShutdownAsync(CancellationToken cancellationToken) + /// + /// The number of milliseconds to wait, or Timeout.Infinite to + /// wait indefinitely. + /// + /// + /// Returns true when flush completed; otherwise, false. + /// + public virtual bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite) { -#if NET452 - return Task.FromResult(0); -#else - return Task.CompletedTask; -#endif + return true; } /// - /// Flushes all activities that have not yet been processed. + /// Attempts to shutdown the processor, blocks the current thread until + /// shutdown completed or timed out. /// - /// Cancellation token. - /// Returns . - public virtual Task ForceFlushAsync(CancellationToken cancellationToken) + /// + /// The number of milliseconds to wait, or Timeout.Infinite to + /// wait indefinitely. + /// + public virtual void Shutdown(int timeoutMilliseconds = Timeout.Infinite) { -#if NET452 - return Task.FromResult(0); -#else - return Task.CompletedTask; -#endif } /// @@ -91,7 +91,7 @@ protected virtual void Dispose(bool disposing) { try { - this.ShutdownAsync(CancellationToken.None).GetAwaiter().GetResult(); + this.Shutdown(); } catch (Exception ex) { diff --git a/src/OpenTelemetry/Trace/BatchExportActivityProcessor.cs b/src/OpenTelemetry/Trace/BatchExportActivityProcessor.cs index 93dd0c602d3..9aab34dfd57 100644 --- a/src/OpenTelemetry/Trace/BatchExportActivityProcessor.cs +++ b/src/OpenTelemetry/Trace/BatchExportActivityProcessor.cs @@ -143,18 +143,18 @@ public override void OnEnd(Activity activity) /// the current thread until flush completed, shutdown signaled or /// timed out. /// - /// + /// /// The number of milliseconds to wait, or Timeout.Infinite to /// wait indefinitely. /// /// /// Returns true when flush completed; otherwise, false. /// - public bool ForceFlush(int timeoutMillis = Timeout.Infinite) + public override bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite) { - if (timeoutMillis < 0 && timeoutMillis != Timeout.Infinite) + if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite) { - throw new ArgumentOutOfRangeException(nameof(timeoutMillis)); + throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds)); } var tail = this.circularBuffer.RemovedCount; @@ -167,7 +167,7 @@ public bool ForceFlush(int timeoutMillis = Timeout.Infinite) this.exportTrigger.Set(); - if (timeoutMillis == 0) + if (timeoutMilliseconds == 0) { return false; } @@ -182,13 +182,13 @@ public bool ForceFlush(int timeoutMillis = Timeout.Infinite) while (true) { - if (timeoutMillis == Timeout.Infinite) + if (timeoutMilliseconds == Timeout.Infinite) { WaitHandle.WaitAny(triggers, pollingMillis); } else { - var timeout = (long)timeoutMillis - sw.ElapsedMilliseconds; + var timeout = (long)timeoutMilliseconds - sw.ElapsedMilliseconds; if (timeout <= 0) { @@ -210,46 +210,30 @@ public bool ForceFlush(int timeoutMillis = Timeout.Infinite) } } - /// - /// If the is canceled. - public override Task ForceFlushAsync(CancellationToken cancellationToken) - { - // TODO - throw new NotImplementedException(); - } - /// - /// Attempt to drain the queue and shutdown the exporter, blocks the + /// Attempts to drain the queue and shutdown the exporter, blocks the /// current thread until shutdown completed or timed out. /// - /// + /// /// The number of milliseconds to wait, or Timeout.Infinite to /// wait indefinitely. /// - public void Shutdown(int timeoutMillis = Timeout.Infinite) + public override void Shutdown(int timeoutMilliseconds = Timeout.Infinite) { - if (timeoutMillis < 0 && timeoutMillis != Timeout.Infinite) + if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite) { - throw new ArgumentOutOfRangeException(nameof(timeoutMillis)); + throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds)); } this.shutdownDrainTarget = this.circularBuffer.AddedCount; this.shutdownTrigger.Set(); - if (timeoutMillis != 0) + if (timeoutMilliseconds != 0) { - this.exporterThread.Join(timeoutMillis); + this.exporterThread.Join(timeoutMilliseconds); } } - /// - /// If the is canceled. - public override Task ShutdownAsync(CancellationToken cancellationToken) - { - // TODO - throw new NotImplementedException(); - } - /// /// Releases the unmanaged resources used by this class and optionally releases the managed resources. /// diff --git a/src/OpenTelemetry/Trace/CompositeActivityProcessor.cs b/src/OpenTelemetry/Trace/CompositeActivityProcessor.cs index 975b179ef35..37d8178b20a 100644 --- a/src/OpenTelemetry/Trace/CompositeActivityProcessor.cs +++ b/src/OpenTelemetry/Trace/CompositeActivityProcessor.cs @@ -69,6 +69,7 @@ public CompositeActivityProcessor AddProcessor(ActivityProcessor processor) return this; } + /// public override void OnEnd(Activity activity) { var cur = this.head; @@ -80,6 +81,7 @@ public override void OnEnd(Activity activity) } } + /// public override void OnStart(Activity activity) { var cur = this.head; @@ -91,32 +93,75 @@ public override void OnStart(Activity activity) } } - public override Task ShutdownAsync(CancellationToken cancellationToken) + /// + public override bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite) { + if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite) + { + throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds)); + } + var cur = this.head; - var task = cur.Value.ShutdownAsync(cancellationToken); - for (cur = cur.Next; cur != null; cur = cur.Next) + var sw = Stopwatch.StartNew(); + + while (cur != null) { - var processor = cur.Value; - task = task.ContinueWith(t => processor.ShutdownAsync(cancellationToken)); + if (timeoutMilliseconds == Timeout.Infinite) + { + var succeeded = cur.Value.ForceFlush(Timeout.Infinite); + } + else + { + var timeout = (long)timeoutMilliseconds - sw.ElapsedMilliseconds; + + if (timeout <= 0) + { + return false; + } + + var succeeded = cur.Value.ForceFlush((int)timeout); + + if (!succeeded) + { + return false; + } + } + + cur = cur.Next; } - return task; + return true; } - public override Task ForceFlushAsync(CancellationToken cancellationToken) + /// + public override void Shutdown(int timeoutMilliseconds = Timeout.Infinite) { + if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite) + { + throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds)); + } + var cur = this.head; - var task = cur.Value.ForceFlushAsync(cancellationToken); - for (cur = cur.Next; cur != null; cur = cur.Next) + var sw = Stopwatch.StartNew(); + + while (cur != null) { - var processor = cur.Value; - task = task.ContinueWith(t => processor.ForceFlushAsync(cancellationToken)); - } + if (timeoutMilliseconds == Timeout.Infinite) + { + cur.Value.Shutdown(Timeout.Infinite); + } + else + { + var timeout = (long)timeoutMilliseconds - sw.ElapsedMilliseconds; - return task; + // notify all the processors, even if we run overtime + cur.Value.Shutdown((int)Math.Max(timeout, 0)); + } + + cur = cur.Next; + } } protected override void Dispose(bool disposing) diff --git a/src/OpenTelemetry/Trace/ReentrantExportActivityProcessor.cs b/src/OpenTelemetry/Trace/ReentrantExportActivityProcessor.cs index c7847b138d4..75926080970 100644 --- a/src/OpenTelemetry/Trace/ReentrantExportActivityProcessor.cs +++ b/src/OpenTelemetry/Trace/ReentrantExportActivityProcessor.cs @@ -53,19 +53,14 @@ public override void OnEnd(Activity activity) } /// - public override Task ShutdownAsync(CancellationToken cancellationToken) + public override void Shutdown(int timeoutMilliseconds = Timeout.Infinite) { if (!this.stopped) { + // TODO: pass down the timeout to exporter this.exporter.Shutdown(); this.stopped = true; } - -#if NET452 - return Task.FromResult(0); -#else - return Task.CompletedTask; -#endif } /// diff --git a/test/OpenTelemetry.Exporter.Jaeger.Tests/TestActivityProcessor.cs b/test/OpenTelemetry.Exporter.Jaeger.Tests/TestActivityProcessor.cs index a4dfb3715a8..175997c215d 100644 --- a/test/OpenTelemetry.Exporter.Jaeger.Tests/TestActivityProcessor.cs +++ b/test/OpenTelemetry.Exporter.Jaeger.Tests/TestActivityProcessor.cs @@ -53,24 +53,15 @@ public override void OnEnd(Activity activity) this.EndAction?.Invoke(activity); } - public override Task ShutdownAsync(CancellationToken cancellationToken) + public override bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite) { - this.ShutdownCalled = true; -#if NET452 - return Task.FromResult(0); -#else - return Task.CompletedTask; -#endif + this.ForceFlushCalled = true; + return true; } - public override Task ForceFlushAsync(CancellationToken cancellationToken) + public override void Shutdown(int timeoutMilliseconds = Timeout.Infinite) { - this.ForceFlushCalled = true; -#if NET452 - return Task.FromResult(0); -#else - return Task.CompletedTask; -#endif + this.ShutdownCalled = true; } protected override void Dispose(bool disposing) diff --git a/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/TestActivityProcessor.cs b/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/TestActivityProcessor.cs index b90ab90be95..c3daaaeeeaf 100644 --- a/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/TestActivityProcessor.cs +++ b/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/TestActivityProcessor.cs @@ -53,24 +53,15 @@ public override void OnEnd(Activity span) this.EndAction?.Invoke(span); } - public override Task ShutdownAsync(CancellationToken cancellationToken) + public override bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite) { - this.ShutdownCalled = true; -#if NET452 - return Task.FromResult(0); -#else - return Task.CompletedTask; -#endif + this.ForceFlushCalled = true; + return true; } - public override Task ForceFlushAsync(CancellationToken cancellationToken) + public override void Shutdown(int timeoutMilliseconds = Timeout.Infinite) { - this.ForceFlushCalled = true; -#if NET452 - return Task.FromResult(0); -#else - return Task.CompletedTask; -#endif + this.ShutdownCalled = true; } protected override void Dispose(bool disposing) diff --git a/test/OpenTelemetry.Exporter.ZPages.Tests/TestActivityProcessor.cs b/test/OpenTelemetry.Exporter.ZPages.Tests/TestActivityProcessor.cs index 94bab79600c..7c01cc3a73a 100644 --- a/test/OpenTelemetry.Exporter.ZPages.Tests/TestActivityProcessor.cs +++ b/test/OpenTelemetry.Exporter.ZPages.Tests/TestActivityProcessor.cs @@ -54,24 +54,15 @@ public override void OnEnd(Activity activity) this.EndAction?.Invoke(activity); } - public override Task ShutdownAsync(CancellationToken cancellationToken) + public override bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite) { - this.ShutdownCalled = true; -#if NET452 - return Task.FromResult(0); -#else - return Task.CompletedTask; -#endif + this.ForceFlushCalled = true; + return true; } - public override Task ForceFlushAsync(CancellationToken cancellationToken) + public override void Shutdown(int timeoutMilliseconds = Timeout.Infinite) { - this.ForceFlushCalled = true; -#if NET452 - return Task.FromResult(0); -#else - return Task.CompletedTask; -#endif + this.ShutdownCalled = true; } protected override void Dispose(bool disposing) diff --git a/test/OpenTelemetry.Exporter.Zipkin.Tests/TestActivityProcessor.cs b/test/OpenTelemetry.Exporter.Zipkin.Tests/TestActivityProcessor.cs index 414a9d6aea4..0d60e4a6905 100644 --- a/test/OpenTelemetry.Exporter.Zipkin.Tests/TestActivityProcessor.cs +++ b/test/OpenTelemetry.Exporter.Zipkin.Tests/TestActivityProcessor.cs @@ -53,24 +53,15 @@ public override void OnEnd(Activity span) this.EndAction?.Invoke(span); } - public override Task ShutdownAsync(CancellationToken cancellationToken) + public override bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite) { - this.ShutdownCalled = true; -#if NET452 - return Task.FromResult(0); -#else - return Task.CompletedTask; -#endif + this.ForceFlushCalled = true; + return true; } - public override Task ForceFlushAsync(CancellationToken cancellationToken) + public override void Shutdown(int timeoutMilliseconds = Timeout.Infinite) { - this.ForceFlushCalled = true; -#if NET452 - return Task.FromResult(0); -#else - return Task.CompletedTask; -#endif + this.ShutdownCalled = true; } protected override void Dispose(bool disposing) diff --git a/test/OpenTelemetry.Tests/Shared/TestActivityProcessor.cs b/test/OpenTelemetry.Tests/Shared/TestActivityProcessor.cs index 4dd51585231..134666eab88 100644 --- a/test/OpenTelemetry.Tests/Shared/TestActivityProcessor.cs +++ b/test/OpenTelemetry.Tests/Shared/TestActivityProcessor.cs @@ -53,24 +53,15 @@ public override void OnEnd(Activity span) this.EndAction?.Invoke(span); } - public override Task ShutdownAsync(CancellationToken cancellationToken) + public override bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite) { - this.ShutdownCalled = true; -#if NET452 - return Task.FromResult(0); -#else - return Task.CompletedTask; -#endif + this.ForceFlushCalled = true; + return true; } - public override Task ForceFlushAsync(CancellationToken cancellationToken) + public override void Shutdown(int timeoutMilliseconds = Timeout.Infinite) { - this.ForceFlushCalled = true; -#if NET452 - return Task.FromResult(0); -#else - return Task.CompletedTask; -#endif + this.ShutdownCalled = true; } protected override void Dispose(bool disposing) diff --git a/test/OpenTelemetry.Tests/Trace/CompositeActivityProcessorTests.cs b/test/OpenTelemetry.Tests/Trace/CompositeActivityProcessorTests.cs index b7b81a52748..1a3f59d4c99 100644 --- a/test/OpenTelemetry.Tests/Trace/CompositeActivityProcessorTests.cs +++ b/test/OpenTelemetry.Tests/Trace/CompositeActivityProcessorTests.cs @@ -77,7 +77,7 @@ public void CompositeActivityProcessor_ShutsDownAll() using (var processor = new CompositeActivityProcessor(new[] { p1, p2 })) { - processor.ShutdownAsync(default).Wait(); + processor.Shutdown(); Assert.True(p1.ShutdownCalled); Assert.True(p2.ShutdownCalled); } @@ -91,7 +91,7 @@ public void CompositeActivityProcessor_ForceFlush() using (var processor = new CompositeActivityProcessor(new[] { p1, p2 })) { - processor.ForceFlushAsync(default).Wait(); + processor.ForceFlush(); Assert.True(p1.ForceFlushCalled); Assert.True(p2.ForceFlushCalled); }