Skip to content

Commit

Permalink
Moves locking processing to the engine
Browse files Browse the repository at this point in the history
* Adds `IMutexed` class. This can be implemented by an activity to signal that it can produce `Mutex` instances that are used to control when an activity should be run.
* The `GenerateBillOfMaterialsActivity` creates a `Mutex` for each unique directory that it's is asked to run on.
* If the `Mutex` can't be acquired right before the activity is handled, then it is placed back in the queue. This frees up the worker so that it can do other work. This is an improvement over the previous implementation because it was possible for all of the workers to be waiting on the same lock. With this approach that scenario is not possible. This also has the side benefit of it being to implement for future activities, and the activity's handle method doesn't have to be aware of the details of the locking mechanism.
  • Loading branch information
mscottford committed Oct 23, 2022
1 parent ea3fd1f commit 92965c6
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
using System;
using System.Collections.Concurrent;
using System.IO;
using Corgibytes.Freshli.Cli.DataModel;
using System.Threading;
using Corgibytes.Freshli.Cli.Functionality.Engine;
using Corgibytes.Freshli.Cli.Services;
using Microsoft.Extensions.DependencyInjection;

namespace Corgibytes.Freshli.Cli.Functionality.BillOfMaterials;

public class GenerateBillOfMaterialsActivity : IApplicationActivity
public class GenerateBillOfMaterialsActivity : IApplicationActivity, IMutexed
{
public readonly string AgentExecutablePath;
public readonly Guid AnalysisId;
public readonly int HistoryStopPointId;
public readonly string ManifestPath;

private static readonly ConcurrentDictionary<string, object> s_lockPoints = new();
private static readonly ConcurrentDictionary<string, Mutex> s_historyPointMutexes = new();

public GenerateBillOfMaterialsActivity(Guid analysisId, string agentExecutablePath,
int historyStopPointId, string manifestPath)
Expand All @@ -26,6 +27,20 @@ public GenerateBillOfMaterialsActivity(Guid analysisId, string agentExecutablePa
ManifestPath = manifestPath;
}

public Mutex GetMutex(IServiceProvider provider)
{
var cacheManager = provider.GetRequiredService<ICacheManager>();
var cacheDb = cacheManager.GetCacheDb();

var historyStopPoint = cacheDb.RetrieveHistoryStopPoint(HistoryStopPointId);
// TODO create an exception class for this exception and write a test to cover it getting generated
_ = historyStopPoint ?? throw new Exception($"Failed to retrieve history stop point {HistoryStopPointId}");

var historyPointPath = historyStopPoint.LocalPath;
EnsureHistoryPointMutexExists(historyPointPath);
return s_historyPointMutexes[historyPointPath];
}

public void Handle(IApplicationEventEngine eventClient)
{
var agentManager = eventClient.ServiceProvider.GetRequiredService<IAgentManager>();
Expand All @@ -40,23 +55,19 @@ public void Handle(IApplicationEventEngine eventClient)
var historyPointPath = historyStopPoint.LocalPath;
var asOfDateTime = historyStopPoint.AsOfDateTime;

EnsureLockPointExists(historyPointPath);
lock (s_lockPoints[historyPointPath])
{
var fullManifestPath = Path.Combine(historyPointPath, ManifestPath);
var bomFilePath = agentReader.ProcessManifest(fullManifestPath, asOfDateTime);
var cachedBomFilePath = cacheManager.StoreBomInCache(bomFilePath, AnalysisId, asOfDateTime);
var fullManifestPath = Path.Combine(historyPointPath, ManifestPath);
var bomFilePath = agentReader.ProcessManifest(fullManifestPath, asOfDateTime);
var cachedBomFilePath = cacheManager.StoreBomInCache(bomFilePath, AnalysisId, asOfDateTime);

eventClient.Fire(new BillOfMaterialsGeneratedEvent(
AnalysisId, HistoryStopPointId, cachedBomFilePath, AgentExecutablePath));
}
eventClient.Fire(new BillOfMaterialsGeneratedEvent(
AnalysisId, HistoryStopPointId, cachedBomFilePath, AgentExecutablePath));
}

private void EnsureLockPointExists(string path)
private static void EnsureHistoryPointMutexExists(string path)
{
if (!s_lockPoints.ContainsKey(path))
if (!s_historyPointMutexes.ContainsKey(path))
{
s_lockPoints[path] = new object();
s_historyPointMutexes[path] = new Mutex();
}
}
}
22 changes: 22 additions & 0 deletions Corgibytes.Freshli.Cli/Functionality/Engine/ApplicationEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,24 @@ public void FireEventAndHandler(IApplicationEvent applicationEvent)
}
}

const int MutexWaitTimeoutInMilliseconds = 50;
// ReSharper disable once MemberCanBePrivate.Global
public void HandleActivity(IApplicationActivity activity)
{
Mutex? mutex = null;
if (activity is IMutexed mutexSource)
{
mutex = mutexSource.GetMutex(ServiceProvider);
}

var mutexAcquired = mutex?.WaitOne(MutexWaitTimeoutInMilliseconds) ?? true;
if (!mutexAcquired)
{
// place the activity back in the queue and free up the worker to make progress on a different activity
Dispatch(activity);
return;
}

try
{
_logger.LogDebug(
Expand All @@ -149,6 +164,13 @@ public void HandleActivity(IApplicationActivity activity)
{
Fire(new UnhandledExceptionEvent(error));
}
finally
{
if (mutex != null && mutexAcquired)
{
mutex.ReleaseMutex();
}
}
}

// ReSharper disable once MemberCanBePrivate.Global
Expand Down
9 changes: 9 additions & 0 deletions Corgibytes.Freshli.Cli/Functionality/Engine/IMutexed.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using System;
using System.Threading;

namespace Corgibytes.Freshli.Cli.Functionality.Engine;

public interface IMutexed
{
public Mutex GetMutex(IServiceProvider serviceProvider);
}

0 comments on commit 92965c6

Please sign in to comment.