Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement context.destination.address/port for DB spans #665

Merged
merged 9 commits into from
Jan 29, 2020
4 changes: 2 additions & 2 deletions src/Elastic.Apm.EntityFramework6/Ef6Interceptor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ private void DoStartSpan<TResult>(IDbCommand command, DbCommandInterceptionConte

LogEvent("DB operation started - starting a new span...", command, interceptCtx, dbgOriginalCaller);

var span = DbSpanCommon.StartSpan(Agent.Instance, command);
var span = Agent.Instance.TracerInternal.DbSpanCommon.StartSpan(Agent.Instance, command);
interceptCtx.SetUserState(_userStateKey, span);
}

Expand All @@ -131,7 +131,7 @@ private void DoEndSpan<TResult>(IDbCommand command, DbCommandInterceptionContext

LogEvent("DB operation ended - ending the corresponding span...", command, interceptCtx, dbgOriginalCaller);

DbSpanCommon.EndSpan(span, command);
Agent.Instance.TracerInternal.DbSpanCommon.EndSpan(span, command);
}
}
}
Expand Down
18 changes: 8 additions & 10 deletions src/Elastic.Apm.EntityFrameworkCore/EfCoreDiagnosticListener.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data;
using Elastic.Apm.Api;
using Elastic.Apm.DiagnosticSource;
using Elastic.Apm.Model;
using Microsoft.EntityFrameworkCore.Diagnostics;
Expand All @@ -11,10 +9,10 @@ namespace Elastic.Apm.EntityFrameworkCore
{
internal class EfCoreDiagnosticListener : IDiagnosticListener
{
private readonly IApmAgent _agent;
private readonly ApmAgent _agent;
private readonly ConcurrentDictionary<Guid, Span> _spans = new ConcurrentDictionary<Guid, Span>();

public EfCoreDiagnosticListener(IApmAgent agent) => _agent = agent;
public EfCoreDiagnosticListener(IApmAgent agent) => _agent = (ApmAgent)agent;

public string Name => "Microsoft.EntityFrameworkCore";

Expand All @@ -26,27 +24,27 @@ public void OnNext(KeyValuePair<string, object> kv)
{
switch (kv.Key)
{
case string k when k == RelationalEventId.CommandExecuting.Name && _agent.Tracer.CurrentTransaction != null:
case { } k when k == RelationalEventId.CommandExecuting.Name && _agent.Tracer.CurrentTransaction != null:
if (kv.Value is CommandEventData commandEventData)
{
var newSpan = DbSpanCommon.StartSpan(_agent, commandEventData.Command);
var newSpan = _agent.TracerInternal.DbSpanCommon.StartSpan(_agent, commandEventData.Command);
_spans.TryAdd(commandEventData.CommandId, newSpan);
}
break;
case string k when k == RelationalEventId.CommandExecuted.Name:
case { } k when k == RelationalEventId.CommandExecuted.Name:
if (kv.Value is CommandExecutedEventData commandExecutedEventData)
{
if (_spans.TryRemove(commandExecutedEventData.CommandId, out var span))
DbSpanCommon.EndSpan(span, commandExecutedEventData.Command, commandExecutedEventData.Duration);
_agent.TracerInternal.DbSpanCommon.EndSpan(span, commandExecutedEventData.Command, commandExecutedEventData.Duration);
}
break;
case string k when k == RelationalEventId.CommandError.Name:
case { } k when k == RelationalEventId.CommandError.Name:
if (kv.Value is CommandErrorEventData commandErrorEventData)
{
if (_spans.TryRemove(commandErrorEventData.CommandId, out var span))
{
span.CaptureException(commandErrorEventData.Exception);
DbSpanCommon.EndSpan(span, commandErrorEventData.Command, commandErrorEventData.Duration);
_agent.TracerInternal.DbSpanCommon.EndSpan(span, commandErrorEventData.Command, commandErrorEventData.Duration);
}
}
break;
Expand Down
10 changes: 5 additions & 5 deletions src/Elastic.Apm.SqlClient/SqlClientDiagnosticListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ private class PropertyFetcherSet
public PropertyFetcher Exception { get; } = new PropertyFetcher("Exception");
}

private readonly IApmAgent _apmAgent;
private readonly ApmAgent _apmAgent;
private readonly IApmLogger _logger;

private readonly ConcurrentDictionary<Guid, Span> _spans = new ConcurrentDictionary<Guid, Span>();
Expand All @@ -36,7 +36,7 @@ private class PropertyFetcherSet

public SqlClientDiagnosticListener(IApmAgent apmAgent)
{
_apmAgent = apmAgent;
_apmAgent = (ApmAgent)apmAgent;
_logger = _apmAgent.Logger.Scoped(nameof(SqlClientDiagnosticListener));
}

Expand Down Expand Up @@ -69,7 +69,7 @@ private void HandleStartCommand(object payloadData, PropertyFetcherSet propertyF
if (propertyFetcherSet.StartCorrelationId.Fetch(payloadData) is Guid operationId
&& propertyFetcherSet.StartCommand.Fetch(payloadData) is IDbCommand dbCommand)
{
var span = DbSpanCommon.StartSpan(_apmAgent, dbCommand);
var span = _apmAgent.TracerInternal.DbSpanCommon.StartSpan(_apmAgent, dbCommand);
_spans.TryAdd(operationId, span);
}
}
Expand All @@ -95,7 +95,7 @@ private void HandleStopCommand(object payloadData, PropertyFetcherSet propertyFe
statistics.ContainsKey("ExecutionTime") && statistics["ExecutionTime"] is long durationInMs)
duration = TimeSpan.FromMilliseconds(durationInMs);

DbSpanCommon.EndSpan(span, dbCommand, duration);
_apmAgent.TracerInternal.DbSpanCommon.EndSpan(span, dbCommand, duration);
}
}
catch (Exception ex)
Expand All @@ -117,7 +117,7 @@ private void HandleErrorCommand(object payloadData, PropertyFetcherSet propertyF

if (propertyFetcherSet.ErrorCommand.Fetch(payloadData) is IDbCommand dbCommand)
{
DbSpanCommon.EndSpan(span, dbCommand);
_apmAgent.TracerInternal.DbSpanCommon.EndSpan(span, dbCommand);
}
else
{
Expand Down
3 changes: 3 additions & 0 deletions src/Elastic.Apm/Api/ApiConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ public struct ApiConstants
public const string SubtypeHttp = "http";
public const string SubtypeMssql = "mssql";
public const string SubtypeSqLite = "sqlite";
public const string SubtypeMySql = "mysql";
public const string SubtypeOracle = "oracle";
public const string SubtypePostgreSql = "postgresql";

public const string TypeDb = "db";
public const string TypeExternal = "external";
Expand Down
2 changes: 1 addition & 1 deletion src/Elastic.Apm/Api/Database.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ namespace Elastic.Apm.Api
public class Database
{
public const string TypeElasticsearch = "elasticsearch";

public const string TypeSql = "sql";

public string Instance { get; set; }

[JsonConverter(typeof(TrimmedStringJsonConverter), 10_000)]
Expand Down
3 changes: 3 additions & 0 deletions src/Elastic.Apm/Api/Tracer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ ICurrentExecutionSegmentsContainer currentExecutionSegmentsContainer
_sender = payloadSender.ThrowIfArgumentNull(nameof(payloadSender));
_configProvider = configProvider.ThrowIfArgumentNull(nameof(configProvider));
CurrentExecutionSegmentsContainer = currentExecutionSegmentsContainer.ThrowIfArgumentNull(nameof(currentExecutionSegmentsContainer));
DbSpanCommon = new DbSpanCommon(logger);
}

internal ICurrentExecutionSegmentsContainer CurrentExecutionSegmentsContainer { get; }
Expand All @@ -38,6 +39,8 @@ ICurrentExecutionSegmentsContainer currentExecutionSegmentsContainer

public ITransaction CurrentTransaction => CurrentExecutionSegmentsContainer.CurrentTransaction;

public DbSpanCommon DbSpanCommon { get; }

public ITransaction StartTransaction(string name, string type, DistributedTracingData distributedTracingData = null) =>
StartTransactionInternal(name, type, distributedTracingData);

Expand Down
233 changes: 233 additions & 0 deletions src/Elastic.Apm/Helpers/DbConnectionStringParser.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Globalization;
using System.Threading;
using Elastic.Apm.Api;
using Elastic.Apm.Logging;

namespace Elastic.Apm.Helpers
{
internal class DbConnectionStringParser
{
private const string ThisClassName = nameof(DbConnectionStringParser);
internal const int MaxCacheSize = 100;

private readonly IApmLogger _logger;
private readonly ConcurrentDictionary<string, Destination> _cache = new ConcurrentDictionary<string, Destination>();
// We keep count for _cache because ConcurrentDictionary.Count is very heavy operation
// (for example see https://github.com/dotnet/corefx/issues/3357)
private volatile int _cacheCount;

internal DbConnectionStringParser(IApmLogger logger) => _logger = logger.Scoped(ThisClassName);

/// <returns><c>Destination</c> if successful and <c>null</c> otherwise</returns>
internal Destination ExtractDestination(string dbConnectionString) => ExtractDestination(dbConnectionString, out _);

/// <summary>
/// Used only by tests.
/// </summary>
internal Destination ExtractDestination(string dbConnectionString, out bool wasFoundInCache)
{
if (_cache.TryGetValue(dbConnectionString, out var destination))
{
wasFoundInCache = true;
return destination;
}

wasFoundInCache = false;
destination = ParseConnectionString(dbConnectionString);
if (_cacheCount < MaxCacheSize && _cache.TryAdd(dbConnectionString, destination)) Interlocked.Increment(ref _cacheCount);
return destination;
}

private static readonly Dictionary<string, Action<string, Destination>> KeyToPropertySetter =
new Dictionary<string, Action<string, Destination>>(StringComparer.OrdinalIgnoreCase)
{
{ "Server" , ParseServerValue },
{ "Data Source" , ParseServerValue },
{ "Host" , ParseServerValue },
{ "Hostname" , ParseServerValue },
{ "Network Address" , ParseServerValue },
{ "dbq" , ParseServerValue },
{ "Port" , ParsePortValue }
};

private const char KeyValuePairsSeparator = ';';
private const char KeyValueSeparator = '=';
private const char ServerNameDbInstanceSeparator = '\\';
private const string SqlServerLocalDbPrefix = "(LocalDB)";
private const string SqlServerExpressUserInstancePrefix = @".\";
private const string SqlAzurePrefix = @"tcp:";
private const string OracleXeClientSuffix = @"/XE";
private static readonly IEnumerable<string> DiscardablePrefixes = new List<string>
{
SqlAzurePrefix
};
private static readonly IEnumerable<string> DiscardableSuffixes = new List<string>
{
OracleXeClientSuffix
};

/// <returns><c>Destination</c> if successful and <c>null</c> otherwise</returns>
private Destination ParseConnectionString(string dbConnectionString)
{
Destination result = null;
foreach (var keyValueString in dbConnectionString.Split(KeyValuePairsSeparator))
{
if (string.IsNullOrWhiteSpace(keyValueString)) continue;

var keyValueArray = keyValueString.Split(KeyValueSeparator);
if (keyValueArray.Length != 2)
{
_logger.Trace()?.Log("Encountered invalid key-value pair - skipping it."
+ " keyValueString: `{KeyValueString}'. keyValueArray: {KeyValueArray}. dbConnectionString: {DbConnectionString}."
, keyValueString, keyValueArray, dbConnectionString);
continue;
}

if (! KeyToPropertySetter.TryGetValue(keyValueArray[0].Trim(), out var propSetter)) continue;

if (result == null) result = new Destination();

try
{
propSetter(keyValueArray[1].Trim(), result);
}
catch (Exception ex)
{
_logger.Trace()?.LogException(ex, "Encountered invalid value for a known key"
+ " - considering the whole connection string as invalid and returning null."
+ " keyValueString: `{KeyValueString}'. keyValueArray: {KeyValueArray}. dbConnectionString: {DbConnectionString}."
, keyValueString, keyValueArray, dbConnectionString);
return null;
}
}

return result;
}

private static void ParseServerValue(string valueToParseArg, Destination destination)
{
var valueToParse = TrimDiscardable(valueToParseArg);

if (valueToParse.StartsWith(SqlServerLocalDbPrefix, StringComparison.OrdinalIgnoreCase)
|| valueToParse.StartsWith(SqlServerExpressUserInstancePrefix, StringComparison.OrdinalIgnoreCase))
{
destination.Address = "localhost";
return;
}

var dbInstanceSeparatorIndex = valueToParse.IndexOf(ServerNameDbInstanceSeparator);
if (dbInstanceSeparatorIndex != -1) valueToParse = valueToParse.Substring(0, dbInstanceSeparatorIndex);
ParseServerWithOptionalPort(valueToParse, destination);
}

private static string TrimDiscardable(string valueToParse)
{
var currentResult = valueToParse;

foreach (var discardablePrefix in DiscardablePrefixes)
{
if (currentResult.StartsWith(discardablePrefix, StringComparison.OrdinalIgnoreCase))
currentResult = currentResult.Substring(discardablePrefix.Length);
}

foreach (var discardableSuffix in DiscardableSuffixes)
{
if (currentResult.EndsWith(discardableSuffix, StringComparison.OrdinalIgnoreCase))
currentResult = currentResult.Substring(0, currentResult.Length - discardableSuffix.Length);
}

return currentResult;
}

private static void ParseServerWithOptionalPort(string valueToParseArg, Destination destination)
{
// Possible values:
// - Name/IPv4 with/without port: "Name_or_IPv4_address", "Name_or_IPv4_address:port", "Name_or_IPv4_address,port"
// - IPv6 with/without port: "IPv6_address", "[IPv6_address]", "[IPv6_address]:port", "IPv6_address,port" or even "[IPv6_address],port"

var valueToParse = valueToParseArg.Trim();
if (valueToParse.IsEmpty())
throw new FormatException($"Server address part is white space only/empty string. valueToParseArg: `{valueToParseArg}'.");

var commaIndex = valueToParse.IndexOf(',');
if (commaIndex != -1)
{
// Server part has comma which means it's "Name_or_IPv4_address,port", "IPv6_address,port" or "[IPv6_address],port"
destination.Address = ParseAddress(valueToParse.Substring(0, commaIndex));
ParsePortValue(valueToParse.Substring(commaIndex + 1), destination);
return;
}

var firstColumnIndex = valueToParse.IndexOf(':');
if (firstColumnIndex == -1)
{
// Server part doesn't have even one column which means it's "Name_or_IPv4_address"
destination.Address = valueToParse.Trim();
return;
}

var lastColumnIndex = valueToParse.LastIndexOf(':');
if (firstColumnIndex == lastColumnIndex)
{
// Server part has just one column which means it's "Name_or_IPv4_address:port"
destination.Address = ParseAddress(valueToParse.Substring(0, firstColumnIndex));
ParsePortValue(valueToParse.Substring(firstColumnIndex + 1), destination);
return;
}

// Server part has more than one column which means it's "IPv6_address", "[IPv6_address]" or "[IPv6_address]:port"

if (valueToParse[0] != '[')
{
// Server part doesn't start with '[' which means it's "IPv6_address"
destination.Address = valueToParse;
return;
}

// Server part starts with '[' which means it's "[IPv6_address]" or "[IPv6_address]:port"

if (valueToParse[valueToParse.Length - 1] == ']')
{
// Server part ends with ']' which means it's "[IPv6_address]"
destination.Address = ParseAddress(valueToParse);
return;
}

// Server part doesn't end with ']' which means it's "[IPv6_address]:port"

destination.Address = ParseAddress(valueToParse.Substring(0, lastColumnIndex));
ParsePortValue(valueToParse.Substring(lastColumnIndex + 1), destination);
}

private static string ParseAddress(string valueToParseArg)
{
// Possible values: "Name_or_IPv4_address", "IPv6_address", "[IPv6_address]"

var valueToParse = valueToParseArg.Trim();
if (valueToParse.IsEmpty())
throw new FormatException($"Server address part is white space only/empty string. valueToParseArg: `{valueToParseArg}'.");

var startIndex = valueToParse[0] == '[' ? 1 : 0;
var endIndex = valueToParse[valueToParse.Length - 1] == ']' ? valueToParse.Length - 1 : valueToParse.Length;

return valueToParse.Substring(startIndex, endIndex - startIndex).Trim();
}

private static void ParsePortValue(string valueToParse, Destination destination)
{
if (string.IsNullOrWhiteSpace(valueToParse))
throw new FormatException($"Port part of server value is white space only/empty string. valueToParse: `{valueToParse}'.");

if (! int.TryParse(valueToParse, NumberStyles.Integer, CultureInfo.InvariantCulture, out var port))
throw new FormatException($"Failed to parse port part of server value. valueToParse: `{valueToParse}'.");

if (port < 0)
throw new FormatException($"Port part of server value is a negative integer. port: {port}. valueToParse: `{valueToParse}'.");

destination.Port = port;
}
}
}
2 changes: 2 additions & 0 deletions src/Elastic.Apm/Helpers/StringExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,7 @@ internal static string Repeat(this string input, int count)
// Credit: https://stackoverflow.com/a/444818/973581
internal static bool ContainsOrdinalIgnoreCase(this string thisObj, string subStr) =>
thisObj.IndexOf(subStr, StringComparison.OrdinalIgnoreCase) >= 0;

internal static string ToLog(this string thisObj) => "`" + thisObj + "'";
}
}
Loading