Skip to content

Commit

Permalink
Added wrapper for Marten repository instead of the internal logic for…
Browse files Browse the repository at this point in the history
… tracing
  • Loading branch information
oskardudycz committed Dec 8, 2022
1 parent c063d4d commit 7cc1744
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 99 deletions.
28 changes: 20 additions & 8 deletions Core.Marten/Repository/Config.cs
Original file line number Diff line number Diff line change
@@ -1,33 +1,45 @@
using Core.Aggregates;
using Core.OpenTelemetry;
using Core.OptimisticConcurrency;
using Marten;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

namespace Core.Marten.Repository;

public static class Config
{
public static IServiceCollection AddMartenRepository<T>(
this IServiceCollection services,
bool withAppendScope = true
bool withAppendScope = true,
bool withTelemetry = true
) where T : class, IAggregate
{
services.AddScoped<MartenRepository<T>, MartenRepository<T>>();
services.AddScoped<IMartenRepository<T>, MartenRepository<T>>();

if (!withAppendScope)
{
services.AddScoped<IMartenRepository<T>, MartenRepository<T>>();
}
else
if (withAppendScope)
{
services.AddScoped<IMartenRepository<T>, MartenRepositoryWithETagDecorator<T>>(
sp => new MartenRepositoryWithETagDecorator<T>(
sp.GetRequiredService<MartenRepository<T>>(),
sp.GetRequiredService<IMartenRepository<T>>(),
sp.GetRequiredService<IExpectedResourceVersionProvider>(),
sp.GetRequiredService<INextResourceVersionProvider>()
)
);
}

if (withTelemetry)
{
services.AddScoped<IMartenRepository<T>, MartenRepositoryWithTracingDecorator<T>>(
sp => new MartenRepositoryWithTracingDecorator<T>(
sp.GetRequiredService<IMartenRepository<T>>(),
sp.GetRequiredService<IDocumentSession>(),
sp.GetRequiredService<IActivityScope>(),
sp.GetRequiredService<ILogger<MartenRepositoryWithTracingDecorator<T>>>()
)
);
}

return services;
}
}
115 changes: 24 additions & 91 deletions Core.Marten/Repository/MartenRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,111 +17,44 @@ public interface IMartenRepository<T> where T : class, IAggregate
public class MartenRepository<T>: IMartenRepository<T> where T : class, IAggregate
{
private readonly IDocumentSession documentSession;
private readonly IActivityScope activityScope;
private readonly ILogger<MartenRepository<T>> logger;

public MartenRepository(
IDocumentSession documentSession,
IActivityScope activityScope,
ILogger<MartenRepository<T>> logger
)
{
public MartenRepository(IDocumentSession documentSession) =>
this.documentSession = documentSession;
this.activityScope = activityScope;
this.logger = logger;
}

public Task<T?> Find(Guid id, CancellationToken cancellationToken) =>
documentSession.Events.AggregateStreamAsync<T>(id, token: cancellationToken);

public Task<long> Add(T aggregate, CancellationToken cancellationToken = default) =>
activityScope.Run($"{typeof(MartenRepository<T>).Name}/{nameof(Add)}",
async (activity, ct) =>
{
PropagateTelemetry(activity);

var events = aggregate.DequeueUncommittedEvents();

documentSession.Events.StartStream<Aggregate>(
aggregate.Id,
events
);

await documentSession.SaveChangesAsync(ct).ConfigureAwait(false);

return (long)events.Length;
},
new StartActivityOptions { Tags = { { TelemetryTags.Logic.Entity, typeof(T).Name } } },
cancellationToken
);

public Task<long> Update(T aggregate, long? expectedVersion = null, CancellationToken token = default) =>
activityScope.Run($"MartenRepository/{nameof(Update)}",
async (activity, ct) =>
{
PropagateTelemetry(activity);

var events = aggregate.DequeueUncommittedEvents();
public Task<T?> Find(Guid id, CancellationToken ct) =>
documentSession.Events.AggregateStreamAsync<T>(id, token: ct);

var nextVersion = (expectedVersion ?? aggregate.Version) + events.Length;

documentSession.Events.Append(
aggregate.Id,
nextVersion,
events
);

await documentSession.SaveChangesAsync(ct).ConfigureAwait(false);
public async Task<long> Add(T aggregate, CancellationToken ct = default)
{
var events = aggregate.DequeueUncommittedEvents();

return nextVersion;
},
new StartActivityOptions { Tags = { { TelemetryTags.Logic.Entity, typeof(T).Name } } },
token
documentSession.Events.StartStream<Aggregate>(
aggregate.Id,
events
);

public Task<long> Delete(T aggregate, long? expectedVersion = null, CancellationToken token = default) =>
activityScope.Run($"MartenRepository/{nameof(Delete)}",
async (activity, ct) =>
{
PropagateTelemetry(activity);
await documentSession.SaveChangesAsync(ct).ConfigureAwait(false);

var events = aggregate.DequeueUncommittedEvents();

var nextVersion = (expectedVersion ?? aggregate.Version) + events.Length;
return events.Length;
}

documentSession.Events.Append(
aggregate.Id,
nextVersion,
events
);
public async Task<long> Update(T aggregate, long? expectedVersion = null, CancellationToken ct = default)
{
var events = aggregate.DequeueUncommittedEvents();

await documentSession.SaveChangesAsync(ct).ConfigureAwait(false);
var nextVersion = (expectedVersion ?? aggregate.Version) + events.Length;

return nextVersion;
},
new StartActivityOptions { Tags = { { TelemetryTags.Logic.Entity, typeof(T).Name } } },
token
documentSession.Events.Append(
aggregate.Id,
nextVersion,
events
);

private void PropagateTelemetry(Activity? activity)
{
var propagationContext = activity.Propagate(documentSession, InjectTelemetryIntoDocumentSession);

if (!propagationContext.HasValue) return;
await documentSession.SaveChangesAsync(ct).ConfigureAwait(false);

documentSession.CorrelationId = propagationContext.Value.ActivityContext.TraceId.ToHexString();
documentSession.CausationId = propagationContext.Value.ActivityContext.SpanId.ToHexString();
return nextVersion;
}

private void InjectTelemetryIntoDocumentSession(IDocumentSession session, string key, string value)
{
try
{
session.SetHeader(key, value);
}
catch (Exception ex)
{
logger.LogError(ex, "Failed to inject trace context");
}
}
public Task<long> Delete(T aggregate, long? expectedVersion = null, CancellationToken ct = default) =>
Update(aggregate, expectedVersion, ct);
}
90 changes: 90 additions & 0 deletions Core.Marten/Repository/MartenRepositoryWithTelemetryDecorator.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
using System.Diagnostics;
using Core.Aggregates;
using Core.OpenTelemetry;
using Marten;
using Microsoft.Extensions.Logging;

namespace Core.Marten.Repository;

public class MartenRepositoryWithTracingDecorator<T>: IMartenRepository<T>
where T : class, IAggregate
{
private readonly IMartenRepository<T> inner;
private readonly IDocumentSession documentSession;
private readonly IActivityScope activityScope;
private readonly ILogger<MartenRepositoryWithTracingDecorator<T>> logger;

public MartenRepositoryWithTracingDecorator(
IMartenRepository<T> inner,
IDocumentSession documentSession,
IActivityScope activityScope,
ILogger<MartenRepositoryWithTracingDecorator<T>> logger
)
{
this.inner = inner;
this.activityScope = activityScope;
this.logger = logger;
this.documentSession = documentSession;
}

public Task<T?> Find(Guid id, CancellationToken cancellationToken) =>
inner.Find(id, cancellationToken);

public Task<long> Add(T aggregate, CancellationToken cancellationToken = default) =>
activityScope.Run($"MartenRepository/{nameof(Add)}",
(activity, ct) =>
{
PropagateTelemetry(activity);

return inner.Add(aggregate, ct);
},
new StartActivityOptions { Tags = { { TelemetryTags.Logic.Entity, typeof(T).Name } } },
cancellationToken
);

public Task<long> Update(T aggregate, long? expectedVersion = null, CancellationToken token = default) =>
activityScope.Run($"MartenRepository/{nameof(Update)}",
(activity, ct) =>
{
PropagateTelemetry(activity);

return inner.Update(aggregate, expectedVersion, ct);
},
new StartActivityOptions { Tags = { { TelemetryTags.Logic.Entity, typeof(T).Name } } },
token
);

public Task<long> Delete(T aggregate, long? expectedVersion = null, CancellationToken token = default) =>
activityScope.Run($"MartenRepository/{nameof(Delete)}",
(activity, ct) =>
{
PropagateTelemetry(activity);

return inner.Delete(aggregate, expectedVersion, ct);
},
new StartActivityOptions { Tags = { { TelemetryTags.Logic.Entity, typeof(T).Name } } },
token
);

private void PropagateTelemetry(Activity? activity)
{
var propagationContext = activity.Propagate(documentSession, InjectTelemetryIntoDocumentSession);

if (!propagationContext.HasValue) return;

documentSession.CorrelationId = propagationContext.Value.ActivityContext.TraceId.ToHexString();
documentSession.CausationId = propagationContext.Value.ActivityContext.SpanId.ToHexString();
}

private void InjectTelemetryIntoDocumentSession(IDocumentSession session, string key, string value)
{
try
{
session.SetHeader(key, value);
}
catch (Exception ex)
{
logger.LogError(ex, "Failed to inject trace context");
}
}
}

0 comments on commit 7cc1744

Please sign in to comment.