diff --git a/src/OpenTelemetry/Trace/SimpleActivityProcessor.cs b/src/OpenTelemetry/Trace/SimpleActivityProcessor.cs index 3dc28ba3299..623512956cd 100644 --- a/src/OpenTelemetry/Trace/SimpleActivityProcessor.cs +++ b/src/OpenTelemetry/Trace/SimpleActivityProcessor.cs @@ -14,6 +14,8 @@ // limitations under the License. // using System; +using System.Collections.Concurrent; +using System.Collections.Generic; using System.Diagnostics; using System.Threading; using System.Threading.Tasks; @@ -26,16 +28,73 @@ namespace OpenTelemetry.Trace /// public class SimpleActivityProcessor : ActivityProcessor, IDisposable { + private const int DefaultMaxQueueSize = 2048; + private const int DefaultMaxExportBatchSize = 512; + private static readonly TimeSpan DefaultExporterTimeout = TimeSpan.FromMilliseconds(30000); + private readonly ActivityExporter exporter; + private readonly int maxQueueSize; + private readonly TimeSpan exporterTimeout; + private readonly int maxExportBatchSize; + private readonly ConcurrentQueue activityQueue; + private readonly EventWaitHandle stopHandle = new EventWaitHandle(false, EventResetMode.ManualReset); + private readonly EventWaitHandle dataReadyHandle = new EventWaitHandle(false, EventResetMode.AutoReset); + private readonly Thread backgroundThread; + private volatile int currentQueueSize; private bool stopped; + /// + /// Initializes a new instance of the class with default parameters: + /// + /// + /// maxQueueSize = 2048, + /// + /// + /// scheduledDelay = 5 sec, + /// + /// + /// exporterTimeout = 30 sec, + /// + /// + /// maxExportBatchSize = 512 + /// + /// + /// + /// Exporter instance. + public SimpleActivityProcessor(ActivityExporter exporter) + : this(exporter, DefaultMaxQueueSize, DefaultExporterTimeout, DefaultMaxExportBatchSize) + { + } + /// /// Initializes a new instance of the class. /// /// Activity exporter instance. - public SimpleActivityProcessor(ActivityExporter exporter) + /// Maximum queue size. After the size is reached activities are dropped by processor. + /// Maximum allowed time to export data. + /// The maximum batch size of every export. It must be smaller or equal to maxQueueSize. + public SimpleActivityProcessor(ActivityExporter exporter, int maxQueueSize, TimeSpan exporterTimeout, int maxExportBatchSize) { + if (maxQueueSize <= 0) + { + throw new ArgumentOutOfRangeException(nameof(maxQueueSize)); + } + + if (maxExportBatchSize <= 0 || maxExportBatchSize > maxQueueSize) + { + throw new ArgumentOutOfRangeException(nameof(maxExportBatchSize)); + } + this.exporter = exporter ?? throw new ArgumentNullException(nameof(exporter)); + this.maxQueueSize = maxQueueSize; + this.exporterTimeout = exporterTimeout; + this.maxExportBatchSize = maxExportBatchSize; + + this.backgroundThread = new Thread(this.BackgroundThreadBody) + { + Name = "OpenTelemetry.Processor", + }; + this.backgroundThread.Start(); } /// @@ -46,17 +105,16 @@ public override void OnStart(Activity activity) /// public override void OnEnd(Activity activity) { - try + if (this.currentQueueSize >= this.maxQueueSize) { - // do not await, just start export - // it can still throw in synchronous part - - _ = this.exporter.ExportAsync(new[] { activity }, CancellationToken.None); - } - catch (Exception ex) - { - OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.OnEnd), ex); + OpenTelemetrySdkEventSource.Log.SpanProcessorQueueIsExhausted(); + return; } + + Interlocked.Increment(ref this.currentQueueSize); + + this.activityQueue.Enqueue(activity); + this.dataReadyHandle.Set(); } /// @@ -65,6 +123,10 @@ public override Task ShutdownAsync(CancellationToken cancellationToken) if (!this.stopped) { this.stopped = true; + + this.stopHandle.Set(); + this.backgroundThread.Join(); + return this.exporter.ShutdownAsync(cancellationToken); } @@ -114,6 +176,67 @@ protected virtual void Dispose(bool isDisposing) { OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.Dispose), e); } + + this.stopHandle.Dispose(); + this.dataReadyHandle.Dispose(); + } + } + } + + private void BackgroundThreadBody(object state) + { + WaitHandle[] handles = new WaitHandle[] { this.stopHandle, this.dataReadyHandle }; + List activities = new List(this.maxExportBatchSize); + + while (true) + { + int handleIndex = WaitHandle.WaitAny(handles); + + try + { + while (true) + { + // Read off the queue data that is ready to transmit, up to maxExportBatchSize. + while (this.activityQueue.TryDequeue(out Activity activity)) + { + Interlocked.Decrement(ref this.currentQueueSize); + activities.Add(activity); + if (activities.Count == this.maxExportBatchSize) + { + break; + } + } + + if (activities.Count == 0) + { + // No work to do, wait for signal to start again. + break; + } + + using var cts = new CancellationTokenSource(this.exporterTimeout); + try + { + this.exporter.ExportAsync(activities, cts.Token).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + OpenTelemetrySdkEventSource.Log.SpanExporterTimeout(activities.Count); + } + finally + { + activities.Clear(); + } + } + } + catch (Exception ex) + { + OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.BackgroundThreadBody), ex); + } + + if (handleIndex == 0) + { + // If shutdown was requested, exit thread. + return; } } }