From 92965c645f6050c3ee7f08c90faf104f0b48cd5e Mon Sep 17 00:00:00 2001 From: "M. Scott Ford" Date: Sun, 23 Oct 2022 14:01:38 -0400 Subject: [PATCH] Moves locking processing to the engine * Adds `IMutexed` class. This can be implemented by an activity to signal that it can produce `Mutex` instances that are used to control when an activity should be run. * The `GenerateBillOfMaterialsActivity` creates a `Mutex` for each unique directory that it's is asked to run on. * If the `Mutex` can't be acquired right before the activity is handled, then it is placed back in the queue. This frees up the worker so that it can do other work. This is an improvement over the previous implementation because it was possible for all of the workers to be waiting on the same lock. With this approach that scenario is not possible. This also has the side benefit of it being to implement for future activities, and the activity's handle method doesn't have to be aware of the details of the locking mechanism. --- .../GenerateBillOfMaterialsActivity.cs | 39 ++++++++++++------- .../Functionality/Engine/ApplicationEngine.cs | 22 +++++++++++ .../Functionality/Engine/IMutexed.cs | 9 +++++ 3 files changed, 56 insertions(+), 14 deletions(-) create mode 100644 Corgibytes.Freshli.Cli/Functionality/Engine/IMutexed.cs diff --git a/Corgibytes.Freshli.Cli/Functionality/BillOfMaterials/GenerateBillOfMaterialsActivity.cs b/Corgibytes.Freshli.Cli/Functionality/BillOfMaterials/GenerateBillOfMaterialsActivity.cs index c5798fa33..fd5f6da99 100644 --- a/Corgibytes.Freshli.Cli/Functionality/BillOfMaterials/GenerateBillOfMaterialsActivity.cs +++ b/Corgibytes.Freshli.Cli/Functionality/BillOfMaterials/GenerateBillOfMaterialsActivity.cs @@ -1,14 +1,14 @@ using System; using System.Collections.Concurrent; using System.IO; -using Corgibytes.Freshli.Cli.DataModel; +using System.Threading; using Corgibytes.Freshli.Cli.Functionality.Engine; using Corgibytes.Freshli.Cli.Services; using Microsoft.Extensions.DependencyInjection; namespace Corgibytes.Freshli.Cli.Functionality.BillOfMaterials; -public class GenerateBillOfMaterialsActivity : IApplicationActivity +public class GenerateBillOfMaterialsActivity : IApplicationActivity, IMutexed { public readonly string AgentExecutablePath; public readonly Guid AnalysisId; @@ -16,6 +16,7 @@ public class GenerateBillOfMaterialsActivity : IApplicationActivity public readonly string ManifestPath; private static readonly ConcurrentDictionary s_lockPoints = new(); + private static readonly ConcurrentDictionary s_historyPointMutexes = new(); public GenerateBillOfMaterialsActivity(Guid analysisId, string agentExecutablePath, int historyStopPointId, string manifestPath) @@ -26,6 +27,20 @@ public GenerateBillOfMaterialsActivity(Guid analysisId, string agentExecutablePa ManifestPath = manifestPath; } + public Mutex GetMutex(IServiceProvider provider) + { + var cacheManager = provider.GetRequiredService(); + var cacheDb = cacheManager.GetCacheDb(); + + var historyStopPoint = cacheDb.RetrieveHistoryStopPoint(HistoryStopPointId); + // TODO create an exception class for this exception and write a test to cover it getting generated + _ = historyStopPoint ?? throw new Exception($"Failed to retrieve history stop point {HistoryStopPointId}"); + + var historyPointPath = historyStopPoint.LocalPath; + EnsureHistoryPointMutexExists(historyPointPath); + return s_historyPointMutexes[historyPointPath]; + } + public void Handle(IApplicationEventEngine eventClient) { var agentManager = eventClient.ServiceProvider.GetRequiredService(); @@ -40,23 +55,19 @@ public void Handle(IApplicationEventEngine eventClient) var historyPointPath = historyStopPoint.LocalPath; var asOfDateTime = historyStopPoint.AsOfDateTime; - EnsureLockPointExists(historyPointPath); - lock (s_lockPoints[historyPointPath]) - { - var fullManifestPath = Path.Combine(historyPointPath, ManifestPath); - var bomFilePath = agentReader.ProcessManifest(fullManifestPath, asOfDateTime); - var cachedBomFilePath = cacheManager.StoreBomInCache(bomFilePath, AnalysisId, asOfDateTime); + var fullManifestPath = Path.Combine(historyPointPath, ManifestPath); + var bomFilePath = agentReader.ProcessManifest(fullManifestPath, asOfDateTime); + var cachedBomFilePath = cacheManager.StoreBomInCache(bomFilePath, AnalysisId, asOfDateTime); - eventClient.Fire(new BillOfMaterialsGeneratedEvent( - AnalysisId, HistoryStopPointId, cachedBomFilePath, AgentExecutablePath)); - } + eventClient.Fire(new BillOfMaterialsGeneratedEvent( + AnalysisId, HistoryStopPointId, cachedBomFilePath, AgentExecutablePath)); } - private void EnsureLockPointExists(string path) + private static void EnsureHistoryPointMutexExists(string path) { - if (!s_lockPoints.ContainsKey(path)) + if (!s_historyPointMutexes.ContainsKey(path)) { - s_lockPoints[path] = new object(); + s_historyPointMutexes[path] = new Mutex(); } } } diff --git a/Corgibytes.Freshli.Cli/Functionality/Engine/ApplicationEngine.cs b/Corgibytes.Freshli.Cli/Functionality/Engine/ApplicationEngine.cs index c0f5cd243..c6963091e 100644 --- a/Corgibytes.Freshli.Cli/Functionality/Engine/ApplicationEngine.cs +++ b/Corgibytes.Freshli.Cli/Functionality/Engine/ApplicationEngine.cs @@ -133,9 +133,24 @@ public void FireEventAndHandler(IApplicationEvent applicationEvent) } } + const int MutexWaitTimeoutInMilliseconds = 50; // ReSharper disable once MemberCanBePrivate.Global public void HandleActivity(IApplicationActivity activity) { + Mutex? mutex = null; + if (activity is IMutexed mutexSource) + { + mutex = mutexSource.GetMutex(ServiceProvider); + } + + var mutexAcquired = mutex?.WaitOne(MutexWaitTimeoutInMilliseconds) ?? true; + if (!mutexAcquired) + { + // place the activity back in the queue and free up the worker to make progress on a different activity + Dispatch(activity); + return; + } + try { _logger.LogDebug( @@ -149,6 +164,13 @@ public void HandleActivity(IApplicationActivity activity) { Fire(new UnhandledExceptionEvent(error)); } + finally + { + if (mutex != null && mutexAcquired) + { + mutex.ReleaseMutex(); + } + } } // ReSharper disable once MemberCanBePrivate.Global diff --git a/Corgibytes.Freshli.Cli/Functionality/Engine/IMutexed.cs b/Corgibytes.Freshli.Cli/Functionality/Engine/IMutexed.cs new file mode 100644 index 000000000..94a9697cc --- /dev/null +++ b/Corgibytes.Freshli.Cli/Functionality/Engine/IMutexed.cs @@ -0,0 +1,9 @@ +using System; +using System.Threading; + +namespace Corgibytes.Freshli.Cli.Functionality.Engine; + +public interface IMutexed +{ + public Mutex GetMutex(IServiceProvider serviceProvider); +}