Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SimpleActivityProcessor improvements. #896

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 133 additions & 10 deletions src/OpenTelemetry/Trace/SimpleActivityProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
// limitations under the License.
// </copyright>
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -26,16 +28,73 @@ namespace OpenTelemetry.Trace
/// </summary>
public class SimpleActivityProcessor : ActivityProcessor, IDisposable
{
private const int DefaultMaxQueueSize = 2048;
private const int DefaultMaxExportBatchSize = 512;
private static readonly TimeSpan DefaultExporterTimeout = TimeSpan.FromMilliseconds(30000);

private readonly ActivityExporter exporter;
private readonly int maxQueueSize;
private readonly TimeSpan exporterTimeout;
private readonly int maxExportBatchSize;
private readonly ConcurrentQueue<Activity> activityQueue;
private readonly EventWaitHandle stopHandle = new EventWaitHandle(false, EventResetMode.ManualReset);
private readonly EventWaitHandle dataReadyHandle = new EventWaitHandle(false, EventResetMode.AutoReset);
private readonly Thread backgroundThread;
private volatile int currentQueueSize;
private bool stopped;

/// <summary>
/// Initializes a new instance of the <see cref="SimpleActivityProcessor"/> class with default parameters:
/// <list type="bullet">
/// <item>
/// <description>maxQueueSize = 2048,</description>
/// </item>
/// <item>
/// <description>scheduledDelay = 5 sec,</description>
/// </item>
/// <item>
/// <description>exporterTimeout = 30 sec,</description>
/// </item>
/// <item>
/// <description>maxExportBatchSize = 512</description>
/// </item>
/// </list>
/// </summary>
/// <param name="exporter">Exporter instance.</param>
public SimpleActivityProcessor(ActivityExporter exporter)
: this(exporter, DefaultMaxQueueSize, DefaultExporterTimeout, DefaultMaxExportBatchSize)
{
}

/// <summary>
/// Initializes a new instance of the <see cref="SimpleActivityProcessor"/> class.
/// </summary>
/// <param name="exporter">Activity exporter instance.</param>
public SimpleActivityProcessor(ActivityExporter exporter)
/// <param name="maxQueueSize">Maximum queue size. After the size is reached activities are dropped by processor.</param>
/// <param name="exporterTimeout">Maximum allowed time to export data.</param>
/// <param name="maxExportBatchSize">The maximum batch size of every export. It must be smaller or equal to maxQueueSize.</param>
public SimpleActivityProcessor(ActivityExporter exporter, int maxQueueSize, TimeSpan exporterTimeout, int maxExportBatchSize)
{
if (maxQueueSize <= 0)
{
throw new ArgumentOutOfRangeException(nameof(maxQueueSize));
}

if (maxExportBatchSize <= 0 || maxExportBatchSize > maxQueueSize)
{
throw new ArgumentOutOfRangeException(nameof(maxExportBatchSize));
}

this.exporter = exporter ?? throw new ArgumentNullException(nameof(exporter));
this.maxQueueSize = maxQueueSize;
this.exporterTimeout = exporterTimeout;
this.maxExportBatchSize = maxExportBatchSize;

this.backgroundThread = new Thread(this.BackgroundThreadBody)
{
Name = "OpenTelemetry.Processor",
};
this.backgroundThread.Start();
}

/// <inheritdoc />
Expand All @@ -46,17 +105,16 @@ public override void OnStart(Activity activity)
/// <inheritdoc />
public override void OnEnd(Activity activity)
{
try
if (this.currentQueueSize >= this.maxQueueSize)
{
// do not await, just start export
// it can still throw in synchronous part

_ = this.exporter.ExportAsync(new[] { activity }, CancellationToken.None);
}
catch (Exception ex)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.OnEnd), ex);
OpenTelemetrySdkEventSource.Log.SpanProcessorQueueIsExhausted();
return;
}

Interlocked.Increment(ref this.currentQueueSize);

this.activityQueue.Enqueue(activity);
this.dataReadyHandle.Set();
}

/// <inheritdoc />
Expand All @@ -65,6 +123,10 @@ public override Task ShutdownAsync(CancellationToken cancellationToken)
if (!this.stopped)
{
this.stopped = true;

this.stopHandle.Set();
this.backgroundThread.Join();

return this.exporter.ShutdownAsync(cancellationToken);
}

Expand Down Expand Up @@ -114,6 +176,67 @@ protected virtual void Dispose(bool isDisposing)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.Dispose), e);
}

this.stopHandle.Dispose();
this.dataReadyHandle.Dispose();
}
}
}

private void BackgroundThreadBody(object state)
{
WaitHandle[] handles = new WaitHandle[] { this.stopHandle, this.dataReadyHandle };
List<Activity> activities = new List<Activity>(this.maxExportBatchSize);

while (true)
{
int handleIndex = WaitHandle.WaitAny(handles);

try
{
while (true)
{
// Read off the queue data that is ready to transmit, up to maxExportBatchSize.
while (this.activityQueue.TryDequeue(out Activity activity))
{
Interlocked.Decrement(ref this.currentQueueSize);
activities.Add(activity);
if (activities.Count == this.maxExportBatchSize)
{
break;
}
}

if (activities.Count == 0)
{
// No work to do, wait for signal to start again.
break;
}

using var cts = new CancellationTokenSource(this.exporterTimeout);
try
{
this.exporter.ExportAsync(activities, cts.Token).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
OpenTelemetrySdkEventSource.Log.SpanExporterTimeout(activities.Count);
}
finally
{
activities.Clear();
}
}
}
catch (Exception ex)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.BackgroundThreadBody), ex);
}

if (handleIndex == 0)
{
// If shutdown was requested, exit thread.
return;
}
}
}
Expand Down